summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-08 11:18:16 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-08 11:18:16 -0800
commitec2574b09a746709f15d2a3f5de135e29f4bcb52 (patch)
tree25f970232f8093b9ce94969eeed2a5f230e94a89 /src/core/socket.c
parent360d19001b90d92ac2f232efb67e356979b0bc4b (diff)
downloadnng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.gz
nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.bz2
nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.zip
Move to generic socket & pipe workers, and up to 4 each.
This should eliminate all need for protocols to do their own thread management tasks.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c62
1 files changed, 31 insertions, 31 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index ad8e5703..c6b0408e 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -142,6 +142,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_sock *sock;
nni_proto *proto;
int rv;
+ int i;
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
@@ -217,7 +218,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_recver);
+ nni_thr_fini(&sock->s_reaper);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
@@ -234,36 +235,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
return (rv);
}
- // NB: If these functions are NULL, the thread initialization is
- // largely a NO-OP. The system won't actually create the threads.
- rv = nni_thr_init(&sock->s_sender, sops->sock_send, sock->s_data);
- if (rv != 0) {
- nni_thr_wait(&sock->s_reaper);
- sops->sock_fini(&sock->s_data);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- }
- rv = nni_thr_init(&sock->s_recver, sops->sock_recv, sock->s_data);
- if (rv != 0) {
- nni_thr_wait(&sock->s_sender);
- sops->sock_fini(&sock->s_data);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_sender);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
+ // NB: If worker functions are null, then the thread initialization
+ // turns into a NOP, and no actual thread will be started.
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_worker fn = sops->sock_worker[i];
+ rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data);
+ if (rv != 0) {
+ while (i > 0) {
+ i--;
+ nni_thr_fini(&sock->s_worker_thr[i]);
+ }
+ sops->sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
+ }
}
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_run(&sock->s_worker_thr[i]);
+ }
nni_thr_run(&sock->s_reaper);
- nni_thr_run(&sock->s_recver);
- nni_thr_run(&sock->s_sender);
*sockp = sock;
return (0);
}
@@ -279,6 +274,7 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe *pipe;
nni_ep *ep;
nni_time linger;
+ int i;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -350,8 +346,9 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&sock->s_mx);
// Wait for the threads to exit.
- nni_thr_wait(&sock->s_sender);
- nni_thr_wait(&sock->s_recver);
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_wait(&sock->s_worker_thr[i]);
+ }
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -368,6 +365,8 @@ nni_sock_shutdown(nni_sock *sock)
void
nni_sock_close(nni_sock *sock)
{
+ int i;
+
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
@@ -383,9 +382,10 @@ nni_sock_close(nni_sock *sock)
sock->s_sock_ops.sock_fini(sock->s_data);
// And we need to clean up *our* state.
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_fini(&sock->s_worker_thr[i]);
+ }
nni_thr_fini(&sock->s_reaper);
- nni_thr_fini(&sock->s_sender);
- nni_thr_fini(&sock->s_recver);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_cv);