From ec2574b09a746709f15d2a3f5de135e29f4bcb52 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 8 Jan 2017 11:18:16 -0800 Subject: Move to generic socket & pipe workers, and up to 4 each. This should eliminate all need for protocols to do their own thread management tasks. --- src/core/socket.c | 62 +++++++++++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index ad8e5703..c6b0408e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -142,6 +142,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_sock *sock; nni_proto *proto; int rv; + int i; nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; @@ -217,7 +218,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_recver); + nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); @@ -234,36 +235,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } - // NB: If these functions are NULL, the thread initialization is - // largely a NO-OP. The system won't actually create the threads. - rv = nni_thr_init(&sock->s_sender, sops->sock_send, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_reaper); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - } - rv = nni_thr_init(&sock->s_recver, sops->sock_recv, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_sender); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + // NB: If worker functions are null, then the thread initialization + // turns into a NOP, and no actual thread will be started. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_worker fn = sops->sock_worker[i]; + rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data); + if (rv != 0) { + while (i > 0) { + i--; + nni_thr_fini(&sock->s_worker_thr[i]); + } + sops->sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + } } + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_run(&sock->s_worker_thr[i]); + } nni_thr_run(&sock->s_reaper); - nni_thr_run(&sock->s_recver); - nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -279,6 +274,7 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe *pipe; nni_ep *ep; nni_time linger; + int i; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -350,8 +346,9 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock->s_mx); // Wait for the threads to exit. - nni_thr_wait(&sock->s_sender); - nni_thr_wait(&sock->s_recver); + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_wait(&sock->s_worker_thr[i]); + } nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -368,6 +365,8 @@ nni_sock_shutdown(nni_sock *sock) void nni_sock_close(nni_sock *sock) { + int i; + // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); @@ -383,9 +382,10 @@ nni_sock_close(nni_sock *sock) sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&sock->s_worker_thr[i]); + } nni_thr_fini(&sock->s_reaper); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_recver); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); -- cgit v1.2.3-70-g09d2