diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-11 22:38:21 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-11 22:38:21 -0800 |
| commit | 3d4be5126f91978b7d7349de79334ecfc8fc2afe (patch) | |
| tree | c8cfadbb1096e99cad21bddbb9fe9ff7b5dd175a /src | |
| parent | 3d90bae8eda62fecdf367932fca591b965838e20 (diff) | |
| download | nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.gz nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.bz2 nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.zip | |
Notification working - separate thread now.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/event.c | 111 | ||||
| -rw-r--r-- | src/core/event.h | 14 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 303 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 4 | ||||
| -rw-r--r-- | src/core/options.c | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 129 | ||||
| -rw-r--r-- | src/core/socket.h | 9 | ||||
| -rw-r--r-- | src/nng.c | 4 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 11 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 1 |
10 files changed, 159 insertions, 429 deletions
diff --git a/src/core/event.c b/src/core/event.c index 98156a92..b0e02b16 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -18,10 +18,6 @@ nni_ev_init(nni_event *event, int type, nni_sock *sock) int rv; memset(event, 0, sizeof (*event)); - if ((rv = nni_cv_init(&event->e_cv, &sock->s_mx)) != 0) { - return (rv); - } - NNI_LIST_NODE_INIT(&event->e_node); event->e_type = type; event->e_sock = sock; return (0); @@ -31,110 +27,5 @@ nni_ev_init(nni_event *event, int type, nni_sock *sock) void nni_ev_fini(nni_event *event) { - nni_cv_fini(&event->e_cv); -} - - -void -nni_ev_submit(nni_event *event) -{ - nni_sock *sock = event->e_sock; - - // If nobody is listening, don't bother submitting anything. - // This reduces pressure on the socket locks & condvars, in the - // typical case. - if (nni_list_first(&sock->s_notify) == NULL) { - event->e_pending = 0; - event->e_done = 1; - return; - } - - // XXX: taskq_dispatch the event processing. - // This probably should bump a reference count on the socket - // first. - // XXX: One question of note... the aio structures we use elsewhere - // would be better than this. So instead of the handler doing two - // context switches we can just do one. - - // Call with socket mutex owned! - if (event->e_pending == 0) { - event->e_pending = 1; - event->e_done = 0; - nni_list_append(&sock->s_events, event); - nni_cv_wake(&sock->s_notify_cv); - } -} - - -void -nni_notifier(void *arg) -{ - nni_sock *sock = arg; - nni_event *event; - nni_notify *notify; - - nni_mtx_lock(&sock->s_mx); - for (;;) { - if (sock->s_closing) { - break; - } - - if ((event = nni_list_first(&sock->s_events)) != NULL) { - event->e_pending = 0; - nni_list_remove(&sock->s_events, event); - nni_mtx_unlock(&sock->s_mx); - - // Lock the notify list, it must not change. - nni_mtx_lock(&sock->s_notify_mx); - NNI_LIST_FOREACH (&sock->s_notify, notify) { - if ((notify->n_mask & event->e_type) == 0) { - // No interest. - continue; - } - notify->n_func(event, notify->n_arg); - } - nni_mtx_unlock(&sock->s_notify_mx); - - nni_mtx_lock(&sock->s_mx); - // Let the event submitter know we are done, unless - // they have resubmitted. Submitters can wait on this - // lock. - event->e_done = 1; - nni_cv_wake(&event->e_cv); - continue; - } - - nni_cv_wait(&sock->s_notify_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - - -nni_notify * -nni_add_notify(nni_sock *sock, int mask, nng_notify_func fn, void *arg) -{ - nni_notify *notify; - - if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { - return (NULL); - } - notify->n_func = fn; - notify->n_arg = arg; - notify->n_mask = mask; - NNI_LIST_NODE_INIT(¬ify->n_node); - - nni_mtx_lock(&sock->s_notify_mx); - nni_list_append(&sock->s_notify, notify); - nni_mtx_unlock(&sock->s_notify_mx); - return (notify); -} - - -void -nni_rem_notify(nni_sock *sock, nni_notify *notify) -{ - nni_mtx_lock(&sock->s_notify_mx); - nni_list_remove(&sock->s_notify, notify); - nni_mtx_unlock(&sock->s_notify_mx); - NNI_FREE_STRUCT(notify); + NNI_ARG_UNUSED(event); } diff --git a/src/core/event.h b/src/core/event.h index d07aa9dd..e43306c9 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -18,25 +18,17 @@ struct nng_event { nni_sock * e_sock; nni_ep * e_ep; nni_pipe * e_pipe; - - int e_done; // true when notify thr is finished - int e_pending; // true if event is queued - nni_cv e_cv; // signaled when e_done is noted - nni_list_node e_node; // location on the socket list }; struct nng_notify { - nni_list_node n_node; nng_notify_func n_func; void * n_arg; - int n_mask; + int n_type; + nni_sock * n_sock; + nni_aio n_aio; }; -extern void nni_notifier(void *); extern int nni_ev_init(nni_event *, int, nni_sock *); extern void nni_ev_fini(nni_event *); -extern void nni_ev_submit(nni_event *); // call holding sock lock -extern nni_notify *nni_add_notify(nni_sock *, int, nng_notify_func, void *); -extern void nni_rem_notify(nni_sock *, nni_notify *); #endif // CORE_EVENT_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 153d7c15..37d6d5ff 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,83 +15,29 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_readable; - nni_cv mq_writeable; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - nni_msg ** mq_msgs; - - int mq_notify_sig; - nni_cv mq_notify_cv; - nni_thr mq_notify_thr; - nni_msgq_notify_fn mq_notify_fn; - void * mq_notify_arg; - - nni_list mq_aio_putq; - nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; + nni_mtx mq_lock; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_draining; + nni_msg ** mq_msgs; + + nni_list mq_aio_putq; + nni_list mq_aio_getq; + nni_list mq_aio_notify_get; + nni_list mq_aio_notify_put; + + nni_timer_node mq_timer; + nni_time mq_expire; }; -// nni_msgq_notifier thread runs if events callbacks are registered on the -// message queue, and calls the notification callbacks outside of the -// lock. It looks at the actual msgq state to trigger the right events. -static void -nni_msgq_notifier(void *arg) -{ - nni_msgq *mq = arg; - int sig; - nni_msgq_notify_fn fn; - void *fnarg; - - for (;;) { - nni_mtx_lock(&mq->mq_lock); - while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) { - nni_cv_wait(&mq->mq_notify_cv); - } - if (mq->mq_closed) { - nni_mtx_unlock(&mq->mq_lock); - break; - } - - sig = mq->mq_notify_sig; - mq->mq_notify_sig = 0; - - fn = mq->mq_notify_fn; - fnarg = mq->mq_notify_arg; - nni_mtx_unlock(&mq->mq_lock); - - if (fn != NULL) { - fn(mq, sig, fnarg); - } - } -} - - -// nni_msgq_kick kicks the msgq notification thread. It should be called -// with the lock held. -static void -nni_msgq_kick(nni_msgq *mq, int sig) -{ - if (mq->mq_notify_fn != NULL) { - mq->mq_notify_sig |= sig; - nni_cv_wake(&mq->mq_notify_cv); - } -} - - static void nni_msgq_run_timeout(void *); int @@ -123,10 +69,7 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; } - if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) { + if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { @@ -144,17 +87,14 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_notify_fn = NULL; - mq->mq_notify_arg = NULL; mq->mq_expire = NNI_TIME_NEVER; + mq->mq_draining = 0; *mqp = mq; return (0); fail: nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); nni_mtx_fini(&mq->mq_lock); if (mq->mq_msgs != NULL) { nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); @@ -173,10 +113,7 @@ nni_msgq_fini(nni_msgq *mq) return; } nni_timer_cancel(&mq->mq_timer); - nni_thr_fini(&mq->mq_notify_thr); nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ @@ -195,27 +132,6 @@ nni_msgq_fini(nni_msgq *mq) } -int -nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) -{ - int rv; - - nni_thr_fini(&mq->mq_notify_thr); - - nni_mtx_lock(&mq->mq_lock); - rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq); - if (rv != 0) { - nni_mtx_unlock(&mq->mq_lock); - return (rv); - } - mq->mq_notify_fn = fn; - mq->mq_notify_arg = arg; - nni_thr_run(&mq->mq_notify_thr); - nni_mtx_unlock(&mq->mq_lock); - return (0); -} - - void nni_msgq_set_get_error(nni_msgq *mq, int error) { @@ -329,14 +245,6 @@ nni_msgq_run_putq(nni_msgq *mq) // Unable to make progress, leave the aio where it is. break; } - - // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC. - if (mq->mq_len != 0) { - nni_cv_wake(&mq->mq_readable); - } - if (mq->mq_len < mq->mq_cap) { - nni_cv_wake(&mq->mq_writeable); - } } @@ -394,7 +302,6 @@ nni_msgq_run_notify(nni_msgq *mq) } if ((mq->mq_len < mq->mq_cap) || (nni_list_first(&mq->mq_aio_getq) != NULL)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); @@ -406,6 +313,37 @@ nni_msgq_run_notify(nni_msgq *mq) nni_aio_finish(aio, 0, 0); } } + + if (mq->mq_draining) { + if ((mq->mq_len == 0) && + (nni_list_first(&mq->mq_aio_putq) == NULL)) { + nni_cv_wake(&mq->mq_drained); + } + } +} + + +void +nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + if (nni_list_active(&mq->mq_aio_notify_put, aio)) { + nni_list_remove(&mq->mq_aio_notify_put, aio); + } + nni_list_append(&mq->mq_aio_notify_put, aio); + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + if (nni_list_active(&mq->mq_aio_notify_get, aio)) { + nni_list_remove(&mq->mq_aio_notify_get, aio); + } + nni_list_append(&mq->mq_aio_notify_get, aio); + nni_mtx_unlock(&mq->mq_lock); } @@ -608,122 +546,6 @@ nni_msgq_run_timeout(void *arg) } -#if 0 -int -nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) -{ - int rv; - - nni_mtx_lock(&mq->mq_lock); - - for (;;) { - // if closed, we don't put more... this check is first! - if (mq->mq_closed) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_ECLOSED); - } - - if ((rv = mq->mq_puterr) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return (rv); - } - - // room in the queue? - if (mq->mq_len < mq->mq_cap) { - break; - } - - // unbuffered, room for one, and a reader waiting? - if (mq->mq_rwait && - (mq->mq_cap == 0) && - (mq->mq_len == mq->mq_cap)) { - break; - } - - // interrupted? - if (*sig) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_EINTR); - } - - // single poll? - if (expire == NNI_TIME_ZERO) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_EAGAIN); - } - - // waiting.... - mq->mq_wwait = 1; - - // if we are unbuffered, kick the notifier, because we're - // writable. - if (mq->mq_cap == 0) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - } - - // not writeable, so wait until something changes - rv = nni_cv_until(&mq->mq_writeable, expire); - if (rv == NNG_ETIMEDOUT) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_ETIMEDOUT); - } - } - - // Writeable! Yay!! - mq->mq_msgs[mq->mq_put] = msg; - mq->mq_put++; - if (mq->mq_put == mq->mq_alloc) { - mq->mq_put = 0; - } - mq->mq_len++; - if (mq->mq_rwait) { - mq->mq_rwait = 0; - nni_cv_wake(&mq->mq_readable); - } - if (mq->mq_len < mq->mq_cap) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); - } - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - nni_msgq_run_getq(mq); - nni_mtx_unlock(&mq->mq_lock); - - return (0); -} - - -static int -nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) -{ - int rv; - - nni_mtx_lock(&mq->mq_lock); - - // Readable! Yay!! - - *msgp = mq->mq_msgs[mq->mq_get]; - mq->mq_len--; - mq->mq_get++; - if (mq->mq_get == mq->mq_alloc) { - mq->mq_get = 0; - } - if (mq->mq_wwait) { - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_writeable); - } - if (mq->mq_len) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - } - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); - nni_msgq_run_putq(mq); - nni_mtx_unlock(&mq->mq_lock); - - return (0); -} - - -#endif - - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { @@ -781,17 +603,23 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) void nni_msgq_drain(nni_msgq *mq, nni_time expire) { + nni_aio *aio; + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_notify_cv); - while (mq->mq_len > 0) { + mq->mq_draining = 1; + while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { break; } } // If we timedout, free any remaining messages in the queue. + // Also complete the putq as NNG_ECLOSED. + + while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } while (mq->mq_len > 0) { nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_alloc) { @@ -812,9 +640,6 @@ nni_msgq_close(nni_msgq *mq) nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_notify_cv); // Free the messages orphaned in the queue. while (mq->mq_len > 0) { @@ -937,8 +762,6 @@ nni_msgq_resize(nni_msgq *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_drained); nni_mtx_unlock(&mq->mq_lock); return (0); diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 51143706..682c9821 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -39,8 +39,8 @@ extern int nni_msgq_canget(nni_msgq *); extern int nni_msgq_canput(nni_msgq *); extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *); // nni_msgq_put puts the message to the queue. It blocks until it diff --git a/src/core/options.c b/src/core/options.c index cd67e0d0..245172bf 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -195,7 +195,7 @@ nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp) return (rv); } - if (nni_add_notify(s, mask, nni_notifyfd_push, fd) == NULL) { + if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); return (NNG_ENOMEM); } diff --git a/src/core/socket.c b/src/core/socket.c index 5beffaf0..abbd2c2c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -229,30 +229,93 @@ nni_sock_unlock(nni_sock *sock) static void -nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +nni_sock_cansend_cb(void *arg) { - nni_sock *sock = arg; + nni_notify *notify = arg; + nni_sock *sock = notify->n_sock; - if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { - return; // No interest in writability of read queue. + if (nni_aio_result(¬ify->n_aio) != 0) { + return; } - nni_mtx_lock(&sock->s_mx); - nni_ev_submit(&sock->s_recv_ev); - nni_mtx_unlock(&sock->s_mx); + + notify->n_func(&sock->s_send_ev, notify->n_arg); } static void -nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +nni_sock_canrecv_cb(void *arg) { - nni_sock *sock = arg; + nni_notify *notify = arg; + nni_sock *sock = notify->n_sock; - if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { - return; // No interest in readability of write queue. + if (nni_aio_result(¬ify->n_aio) != 0) { + return; } - nni_mtx_lock(&sock->s_mx); - nni_ev_submit(&sock->s_send_ev); - nni_mtx_unlock(&sock->s_mx); + + notify->n_func(&sock->s_recv_ev, notify->n_arg); +} + + +nni_notify * +nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) +{ + nni_notify *notify; + int rv; + + if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { + return (NULL); + } + + notify->n_func = fn; + notify->n_arg = arg; + notify->n_type = type; + notify->n_sock = sock; + + switch (type) { + case NNG_EV_CAN_RCV: + rv = nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); + if (rv != 0) { + goto fail; + } + nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); + break; + case NNG_EV_CAN_SND: + rv = nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); + if (rv != 0) { + goto fail; + } + nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); + break; + default: + rv = NNG_ENOTSUP; + goto fail; + break; + } + + return (notify); + +fail: + nni_aio_fini(¬ify->n_aio); + NNI_FREE_STRUCT(notify); + return (NULL); +} + + +void +nni_sock_unnotify(nni_sock *sock, nni_notify *notify) +{ + switch (notify->n_type) { + case NNG_EV_CAN_RCV: + nni_msgq_aio_cancel(sock->s_urq, ¬ify->n_aio); + break; + case NNG_EV_CAN_SND: + nni_msgq_aio_cancel(sock->s_uwq, ¬ify->n_aio); + break; + default: + return; + } + nni_aio_fini(¬ify->n_aio); + NNI_FREE_STRUCT(notify); } @@ -344,8 +407,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); - NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); - NNI_LIST_INIT(&sock->s_events, nni_event, e_node); sock->s_send_fd.sn_init = 0; sock->s_recv_fd.sn_init = 0; @@ -379,9 +440,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || - ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) { + ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { goto fail; } @@ -398,10 +457,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0) { - goto fail; - } - if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { goto fail; @@ -421,21 +476,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - // XXX: This kills performance. Look at moving this to - // be conditional - if nobody has callbacks because their code is - // also threaded, then we don't need to jump through these hoops. - rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); - if (rv != 0) { - goto fail; - } - rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); - if (rv != 0) { - goto fail; - } - sops->sock_open(sock->s_data); - nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -453,15 +495,12 @@ fail: } nni_mtx_unlock(nni_idlock); } - nni_thr_fini(&sock->s_notifier); nni_ev_fini(&sock->s_send_ev); nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); @@ -545,7 +584,6 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); - nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); while ((nni_list_first(&sock->s_idles) != NULL) || @@ -554,9 +592,6 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); - // Wait for the threads to exit. - nni_thr_wait(&sock->s_notifier); - // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -611,20 +646,12 @@ nni_sock_close(nni_sock *sock) // The protocol needs to clean up its state. sock->s_sock_ops.sock_fini(sock->s_data); - // And we need to clean up *our* state. - while ((notify = nni_list_first(&sock->s_notify)) != NULL) { - nni_list_remove(&sock->s_notify, notify); - NNI_FREE_STRUCT(notify); - } - nni_thr_fini(&sock->s_notifier); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_ev_fini(&sock->s_send_ev); nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); } diff --git a/src/core/socket.h b/src/core/socket.h index 6a608d0a..a746a170 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -44,15 +44,9 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // ready pipes (started) nni_list s_idles; // idle pipes (not ready) - nni_list s_events; // pending events - nni_list s_notify; // event watchers - nni_cv s_notify_cv; // wakes notify thread - nni_mtx s_notify_mx; // protects s_notify list size_t s_rcvmaxsz; // maximum receive size - nni_thr s_notifier; - int s_ep_pend; // EP dial/listen in progress int s_closing; // Socket is closing int s_closed; // Socket closed @@ -89,6 +83,9 @@ extern uint32_t nni_sock_id(nni_sock *); extern void nni_sock_lock(nni_sock *); extern void nni_sock_unlock(nni_sock *); +extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); +extern void nni_sock_unnotify(nni_sock *, nni_notify *); + // nni_sock_pipe_add is called by the pipe to register the pipe with // with the socket. The pipe is added to the idle list. extern void nni_sock_pipe_add(nni_sock *, nni_pipe *); @@ -313,7 +313,7 @@ nng_setnotify(nng_socket sid, int mask, nng_notify_func fn, void *arg) if ((rv = nni_sock_hold(&sock, sid)) != 0) { return (NULL); } - notify = nni_add_notify(sock, mask, fn, arg); + notify = nni_sock_notify(sock, mask, fn, arg); nni_sock_rele(sock); return (notify); } @@ -328,7 +328,7 @@ nng_unsetnotify(nng_socket sid, nng_notify *notify) if ((rv = nni_sock_hold(&sock, sid)) != 0) { return; } - nni_rem_notify(sock, notify); + nni_sock_unnotify(sock, notify); nni_sock_rele(sock); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 736e4099..55a6855c 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -109,13 +109,12 @@ nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; - if (rep != NULL) { - nni_idhash_fini(&rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); - } - NNI_FREE_STRUCT(rep); + nni_idhash_fini(&rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); } + nni_aio_fini(&rep->aio_getq); + NNI_FREE_STRUCT(rep); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 323a262b..4fa89af3 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -108,6 +108,7 @@ nni_surv_sock_fini(void *arg) { nni_surv_sock *psock = arg; + nni_aio_fini(&psock->aio_getq); NNI_FREE_STRUCT(psock); } |
