diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 51 |
1 files changed, 50 insertions, 1 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 56347e4e..734e4da8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s) } +// XXX: don't expose the upper queues to protocols, because we need to +// trap on activity in those queues! + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -82,6 +85,34 @@ nni_reaper(void *arg) } +static void +nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { + return; // No interest in writability of read queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_recv_ev); + nni_mtx_unlock(&sock->s_mx); +} + + +static void +nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { + return; // No interest in readability of write queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_send_ev); + nni_mtx_unlock(&sock->s_mx); +} + + nni_mtx * nni_sock_mtx(nni_sock *sock) { @@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || + ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + goto fail; + } + + if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) || + ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) { goto fail; } @@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); + nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -239,7 +280,10 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_notify_cv); @@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); @@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_wait(&sock->s_worker_thr[i]); } + nni_thr_wait(&sock->s_notifier); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); |
