From 50e1484af0d443b46aa04fd4a8096b157dc160aa Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Jan 2017 19:35:51 -0800 Subject: Recv/Send event plumbing implemented (msgqueue and up). This change provides for a private callback in the message queues, which can be used to notify the socket, and which than arranges for the appropriate event thread to run. Upper layer hooks to access this still need to be written. --- src/core/event.c | 21 ++++++--- src/core/event.h | 6 +++ src/core/msgqueue.c | 123 ++++++++++++++++++++++++++++++++++++++-------------- src/core/msgqueue.h | 11 +++++ src/core/socket.c | 51 +++++++++++++++++++++- src/core/socket.h | 2 +- 6 files changed, 175 insertions(+), 39 deletions(-) (limited to 'src/core') diff --git a/src/core/event.c b/src/core/event.c index 3f7d1143..1b264632 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -13,7 +13,7 @@ #include int -nni_event_init(nni_event *event, int type, nni_sock *sock) +nni_ev_init(nni_event *event, int type, nni_sock *sock) { int rv; @@ -29,15 +29,26 @@ nni_event_init(nni_event *event, int type, nni_sock *sock) void -nni_event_fini(nni_event *event) +nni_ev_fini(nni_event *event) { nni_cv_fini(&event->e_cv); } void -nni_event_submit(nni_sock *sock, nni_event *event) +nni_ev_submit(nni_event *event) { + nni_sock *sock = event->e_sock; + + // If nobody is listening, don't bother submitting anything. + // This reduces pressure on the socket locks & condvars, in the + // typical case. + if (nni_list_first(&sock->s_notify) == NULL) { + event->e_pending = 0; + event->e_done = 1; + return; + } + // Call with socket mutex owned! if (event->e_pending == 0) { event->e_pending = 1; @@ -49,7 +60,7 @@ nni_event_submit(nni_sock *sock, nni_event *event) void -nni_event_wait(nni_sock *sock, nni_event *event) +nni_ev_wait(nni_event *event) { // Call with socket mutex owned! // Note that the socket mutex is dropped during the call. @@ -60,7 +71,7 @@ nni_event_wait(nni_sock *sock, nni_event *event) void -nni_event_notifier(void *arg) +nni_notifier(void *arg) { nni_sock *sock = arg; nni_event *event; diff --git a/src/core/event.h b/src/core/event.h index d7faad2e..8b160a89 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -32,4 +32,10 @@ struct nng_notify { int n_mask; }; +extern void nni_notifier(void *); +extern int nni_ev_init(nni_event *, int, nni_sock *); +extern void nni_ev_fini(nni_event *); +extern void nni_ev_submit(nni_event *); // call holding sock lock +extern void nni_ev_wait(nni_event *); // call holding sock lock + #endif // CORE_EVENT_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index bc381cfc..344798d7 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,21 +15,24 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_readable; - nni_cv mq_writeable; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - int mq_rwait; // readers waiting (unbuffered) - int mq_wwait; - nni_msg ** mq_msgs; + nni_mtx mq_lock; + nni_cv mq_readable; + nni_cv mq_writeable; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_rwait; // readers waiting (unbuffered) + int mq_wwait; + nni_msg ** mq_msgs; + + nni_msgq_notify_fn mq_notify_fn; + void * mq_notify_arg; }; int @@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap) return (NNG_ENOMEM); } if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { - nni_free(mq, sizeof (*mq)); - return (rv); + goto fail; } if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) { - nni_mtx_fini(&mq->mq_lock); - nni_free(mq, sizeof (*mq)); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { - nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + rv = NNG_ENOMEM; + goto fail; } mq->mq_cap = cap; @@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_geterr = 0; mq->mq_wwait = 0; mq->mq_rwait = 0; + mq->mq_notify_fn = NULL; + mq->mq_notify_arg = NULL; *mqp = mq; return (0); + +fail: + nni_cv_fini(&mq->mq_drained); + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); + if (mq->mq_msgs != NULL) { + nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); + } + NNI_FREE_STRUCT(mq); + return (rv); } @@ -126,6 +131,16 @@ nni_msgq_fini(nni_msgq *mq) } +void +nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_notify_fn = fn; + mq->mq_notify_arg = arg; + nni_mtx_unlock(&mq->mq_lock); +} + + void nni_msgq_set_put_error(nni_msgq *mq, int error) { @@ -184,6 +199,9 @@ int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + // The notify callback is executed outside of the lock. + if (fn != NULL) { + fn(mq, notify, arg); + } return (0); } @@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) int nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; + nni_mtx_lock(&mq->mq_lock); // if closed, we don't put more... this check is first! @@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } @@ -287,6 +332,9 @@ static int nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) if (mq->mq_wwait) { nni_cv_wake(&mq->mq_writeable); } + notify = NNI_MSGQ_NOTIFY_CANPUT; + if (mq->mq_len > 0) { + notify |= NNI_MSGQ_NOTIFY_CANGET; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 450c55a4..07f6aebb 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -131,4 +131,15 @@ extern int nni_msgq_cap(nni_msgq *mq); // nni_msgq_len returns the number of messages currently in the queue. extern int nni_msgq_len(nni_msgq *mq); +#define NNI_MSGQ_NOTIFY_CANPUT 1 +#define NNI_MSGQ_NOTIFY_CANGET 2 + +typedef void (*nni_msgq_notify_fn)(nni_msgq *, int, void *); + +// nni_msgq_notify registers a function to be called when the message +// queue state changes. It notifies that the queue is readable, or writeable. +// Only one function can be registered (for simplicity), and it is called +// outside of the queue's lock. +extern void nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *); + #endif // CORE_MSQUEUE_H diff --git a/src/core/socket.c b/src/core/socket.c index 56347e4e..734e4da8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s) } +// XXX: don't expose the upper queues to protocols, because we need to +// trap on activity in those queues! + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -82,6 +85,34 @@ nni_reaper(void *arg) } +static void +nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { + return; // No interest in writability of read queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_recv_ev); + nni_mtx_unlock(&sock->s_mx); +} + + +static void +nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { + return; // No interest in readability of write queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_send_ev); + nni_mtx_unlock(&sock->s_mx); +} + + nni_mtx * nni_sock_mtx(nni_sock *sock) { @@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || + ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + goto fail; + } + + if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) || + ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) { goto fail; } @@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); + nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -239,7 +280,10 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_notify_cv); @@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); @@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_wait(&sock->s_worker_thr[i]); } + nni_thr_wait(&sock->s_notifier); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); diff --git a/src/core/socket.h b/src/core/socket.h index d8b37252..424b1321 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -44,7 +44,7 @@ struct nng_socket { nni_list s_reaps; // pipes to reap nni_thr s_reaper; - nni_thr s_notify_thr; + nni_thr s_notifier; nni_thr s_worker_thr[NNI_MAXWORKERS]; int s_ep_pend; // EP dial/listen in progress -- cgit v1.2.3-70-g09d2