diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 62 |
1 files changed, 31 insertions, 31 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index ad8e5703..c6b0408e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -142,6 +142,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_sock *sock; nni_proto *proto; int rv; + int i; nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; @@ -217,7 +218,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_recver); + nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); @@ -234,36 +235,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } - // NB: If these functions are NULL, the thread initialization is - // largely a NO-OP. The system won't actually create the threads. - rv = nni_thr_init(&sock->s_sender, sops->sock_send, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_reaper); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - } - rv = nni_thr_init(&sock->s_recver, sops->sock_recv, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_sender); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + // NB: If worker functions are null, then the thread initialization + // turns into a NOP, and no actual thread will be started. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_worker fn = sops->sock_worker[i]; + rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data); + if (rv != 0) { + while (i > 0) { + i--; + nni_thr_fini(&sock->s_worker_thr[i]); + } + sops->sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + } } + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_run(&sock->s_worker_thr[i]); + } nni_thr_run(&sock->s_reaper); - nni_thr_run(&sock->s_recver); - nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -279,6 +274,7 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe *pipe; nni_ep *ep; nni_time linger; + int i; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -350,8 +346,9 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock->s_mx); // Wait for the threads to exit. - nni_thr_wait(&sock->s_sender); - nni_thr_wait(&sock->s_recver); + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_wait(&sock->s_worker_thr[i]); + } nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -368,6 +365,8 @@ nni_sock_shutdown(nni_sock *sock) void nni_sock_close(nni_sock *sock) { + int i; + // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); @@ -383,9 +382,10 @@ nni_sock_close(nni_sock *sock) sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&sock->s_worker_thr[i]); + } nni_thr_fini(&sock->s_reaper); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_recver); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); |
