From 50e1484af0d443b46aa04fd4a8096b157dc160aa Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Jan 2017 19:35:51 -0800 Subject: Recv/Send event plumbing implemented (msgqueue and up). This change provides for a private callback in the message queues, which can be used to notify the socket, and which than arranges for the appropriate event thread to run. Upper layer hooks to access this still need to be written. --- src/core/socket.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 56347e4e..734e4da8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s) } +// XXX: don't expose the upper queues to protocols, because we need to +// trap on activity in those queues! + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -82,6 +85,34 @@ nni_reaper(void *arg) } +static void +nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { + return; // No interest in writability of read queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_recv_ev); + nni_mtx_unlock(&sock->s_mx); +} + + +static void +nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { + return; // No interest in readability of write queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_send_ev); + nni_mtx_unlock(&sock->s_mx); +} + + nni_mtx * nni_sock_mtx(nni_sock *sock) { @@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || + ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + goto fail; + } + + if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) || + ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) { goto fail; } @@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); + nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -239,7 +280,10 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_notify_cv); @@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); @@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_wait(&sock->s_worker_thr[i]); } + nni_thr_wait(&sock->s_notifier); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); -- cgit v1.2.3-70-g09d2