diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/event.c | 21 | ||||
| -rw-r--r-- | src/core/event.h | 6 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 123 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 11 | ||||
| -rw-r--r-- | src/core/socket.c | 51 | ||||
| -rw-r--r-- | src/core/socket.h | 2 |
6 files changed, 175 insertions, 39 deletions
diff --git a/src/core/event.c b/src/core/event.c index 3f7d1143..1b264632 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -13,7 +13,7 @@ #include <string.h> int -nni_event_init(nni_event *event, int type, nni_sock *sock) +nni_ev_init(nni_event *event, int type, nni_sock *sock) { int rv; @@ -29,15 +29,26 @@ nni_event_init(nni_event *event, int type, nni_sock *sock) void -nni_event_fini(nni_event *event) +nni_ev_fini(nni_event *event) { nni_cv_fini(&event->e_cv); } void -nni_event_submit(nni_sock *sock, nni_event *event) +nni_ev_submit(nni_event *event) { + nni_sock *sock = event->e_sock; + + // If nobody is listening, don't bother submitting anything. + // This reduces pressure on the socket locks & condvars, in the + // typical case. + if (nni_list_first(&sock->s_notify) == NULL) { + event->e_pending = 0; + event->e_done = 1; + return; + } + // Call with socket mutex owned! if (event->e_pending == 0) { event->e_pending = 1; @@ -49,7 +60,7 @@ nni_event_submit(nni_sock *sock, nni_event *event) void -nni_event_wait(nni_sock *sock, nni_event *event) +nni_ev_wait(nni_event *event) { // Call with socket mutex owned! // Note that the socket mutex is dropped during the call. @@ -60,7 +71,7 @@ nni_event_wait(nni_sock *sock, nni_event *event) void -nni_event_notifier(void *arg) +nni_notifier(void *arg) { nni_sock *sock = arg; nni_event *event; diff --git a/src/core/event.h b/src/core/event.h index d7faad2e..8b160a89 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -32,4 +32,10 @@ struct nng_notify { int n_mask; }; +extern void nni_notifier(void *); +extern int nni_ev_init(nni_event *, int, nni_sock *); +extern void nni_ev_fini(nni_event *); +extern void nni_ev_submit(nni_event *); // call holding sock lock +extern void nni_ev_wait(nni_event *); // call holding sock lock + #endif // CORE_EVENT_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index bc381cfc..344798d7 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,21 +15,24 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_readable; - nni_cv mq_writeable; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - int mq_rwait; // readers waiting (unbuffered) - int mq_wwait; - nni_msg ** mq_msgs; + nni_mtx mq_lock; + nni_cv mq_readable; + nni_cv mq_writeable; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_rwait; // readers waiting (unbuffered) + int mq_wwait; + nni_msg ** mq_msgs; + + nni_msgq_notify_fn mq_notify_fn; + void * mq_notify_arg; }; int @@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap) return (NNG_ENOMEM); } if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { - nni_free(mq, sizeof (*mq)); - return (rv); + goto fail; } if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) { - nni_mtx_fini(&mq->mq_lock); - nni_free(mq, sizeof (*mq)); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { - nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + rv = NNG_ENOMEM; + goto fail; } mq->mq_cap = cap; @@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_geterr = 0; mq->mq_wwait = 0; mq->mq_rwait = 0; + mq->mq_notify_fn = NULL; + mq->mq_notify_arg = NULL; *mqp = mq; return (0); + +fail: + nni_cv_fini(&mq->mq_drained); + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); + if (mq->mq_msgs != NULL) { + nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); + } + NNI_FREE_STRUCT(mq); + return (rv); } @@ -127,6 +132,16 @@ nni_msgq_fini(nni_msgq *mq) void +nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_notify_fn = fn; + mq->mq_notify_arg = arg; + nni_mtx_unlock(&mq->mq_lock); +} + + +void nni_msgq_set_put_error(nni_msgq *mq, int error) { nni_mtx_lock(&mq->mq_lock); @@ -184,6 +199,9 @@ int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + // The notify callback is executed outside of the lock. + if (fn != NULL) { + fn(mq, notify, arg); + } return (0); } @@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) int nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; + nni_mtx_lock(&mq->mq_lock); // if closed, we don't put more... this check is first! @@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } @@ -287,6 +332,9 @@ static int nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) if (mq->mq_wwait) { nni_cv_wake(&mq->mq_writeable); } + notify = NNI_MSGQ_NOTIFY_CANPUT; + if (mq->mq_len > 0) { + notify |= NNI_MSGQ_NOTIFY_CANGET; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 450c55a4..07f6aebb 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -131,4 +131,15 @@ extern int nni_msgq_cap(nni_msgq *mq); // nni_msgq_len returns the number of messages currently in the queue. extern int nni_msgq_len(nni_msgq *mq); +#define NNI_MSGQ_NOTIFY_CANPUT 1 +#define NNI_MSGQ_NOTIFY_CANGET 2 + +typedef void (*nni_msgq_notify_fn)(nni_msgq *, int, void *); + +// nni_msgq_notify registers a function to be called when the message +// queue state changes. It notifies that the queue is readable, or writeable. +// Only one function can be registered (for simplicity), and it is called +// outside of the queue's lock. +extern void nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *); + #endif // CORE_MSQUEUE_H diff --git a/src/core/socket.c b/src/core/socket.c index 56347e4e..734e4da8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s) } +// XXX: don't expose the upper queues to protocols, because we need to +// trap on activity in those queues! + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -82,6 +85,34 @@ nni_reaper(void *arg) } +static void +nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { + return; // No interest in writability of read queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_recv_ev); + nni_mtx_unlock(&sock->s_mx); +} + + +static void +nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +{ + nni_sock *sock = arg; + + if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { + return; // No interest in readability of write queue. + } + nni_mtx_lock(&sock->s_mx); + nni_ev_submit(&sock->s_send_ev); + nni_mtx_unlock(&sock->s_mx); +} + + nni_mtx * nni_sock_mtx(nni_sock *sock) { @@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || + ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + goto fail; + } + + if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) || + ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) { goto fail; } @@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); + nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -239,7 +280,10 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_notify_cv); @@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); @@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_wait(&sock->s_worker_thr[i]); } + nni_thr_wait(&sock->s_notifier); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock) for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); diff --git a/src/core/socket.h b/src/core/socket.h index d8b37252..424b1321 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -44,7 +44,7 @@ struct nng_socket { nni_list s_reaps; // pipes to reap nni_thr s_reaper; - nni_thr s_notify_thr; + nni_thr s_notifier; nni_thr s_worker_thr[NNI_MAXWORKERS]; int s_ep_pend; // EP dial/listen in progress |
