diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-16 19:35:51 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-16 19:35:51 -0800 |
| commit | 50e1484af0d443b46aa04fd4a8096b157dc160aa (patch) | |
| tree | ab517217c82e785b0c851c986ee44dfc4a4a65fe /src/core/socket.c | |
| parent | ac8415c24ffea645105c3859e814843e81c97f8a (diff) | |
| download | nng-50e1484af0d443b46aa04fd4a8096b157dc160aa.tar.gz nng-50e1484af0d443b46aa04fd4a8096b157dc160aa.tar.bz2 nng-50e1484af0d443b46aa04fd4a8096b157dc160aa.zip | |
Recv/Send event plumbing implemented (msgqueue and up).
This change provides for a private callback in the message queues,
which can be used to notify the socket, and which than arranges for
the appropriate event thread to run.
Upper layer hooks to access this still need to be written.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 51 |
1 files changed, 50 insertions, 1 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 56347e4e..734e4da8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s) } +// XXX: don't expose the upper queues to protocols, because we need to +// trap on activity in those queues! + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -82,6 +85,34 @@ nni_reaper(void *arg) } +static void +nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { + return; // No interest in writability of read queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_recv_ev); + nni_mtx_unlock(&sock->s_mx); +} + + +static void +nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { + return; // No interest in readability of write queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_send_ev); + nni_mtx_unlock(&sock->s_mx); +} + + nni_mtx * nni_sock_mtx(nni_sock *sock) { @@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || + ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + goto fail; + } + + if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) || + ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) { goto fail; } @@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); + nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -239,7 +280,10 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_notify_cv); @@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); @@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_wait(&sock->s_worker_thr[i]); } + nni_thr_wait(&sock->s_notifier); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); |
