From 3d4be5126f91978b7d7349de79334ecfc8fc2afe Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 11 Mar 2017 22:38:21 -0800 Subject: Notification working - separate thread now. --- src/core/msgqueue.c | 303 +++++++++++----------------------------------------- 1 file changed, 63 insertions(+), 240 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 153d7c15..37d6d5ff 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,83 +15,29 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_readable; - nni_cv mq_writeable; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - nni_msg ** mq_msgs; - - int mq_notify_sig; - nni_cv mq_notify_cv; - nni_thr mq_notify_thr; - nni_msgq_notify_fn mq_notify_fn; - void * mq_notify_arg; - - nni_list mq_aio_putq; - nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; + nni_mtx mq_lock; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_draining; + nni_msg ** mq_msgs; + + nni_list mq_aio_putq; + nni_list mq_aio_getq; + nni_list mq_aio_notify_get; + nni_list mq_aio_notify_put; + + nni_timer_node mq_timer; + nni_time mq_expire; }; -// nni_msgq_notifier thread runs if events callbacks are registered on the -// message queue, and calls the notification callbacks outside of the -// lock. It looks at the actual msgq state to trigger the right events. -static void -nni_msgq_notifier(void *arg) -{ - nni_msgq *mq = arg; - int sig; - nni_msgq_notify_fn fn; - void *fnarg; - - for (;;) { - nni_mtx_lock(&mq->mq_lock); - while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) { - nni_cv_wait(&mq->mq_notify_cv); - } - if (mq->mq_closed) { - nni_mtx_unlock(&mq->mq_lock); - break; - } - - sig = mq->mq_notify_sig; - mq->mq_notify_sig = 0; - - fn = mq->mq_notify_fn; - fnarg = mq->mq_notify_arg; - nni_mtx_unlock(&mq->mq_lock); - - if (fn != NULL) { - fn(mq, sig, fnarg); - } - } -} - - -// nni_msgq_kick kicks the msgq notification thread. It should be called -// with the lock held. -static void -nni_msgq_kick(nni_msgq *mq, int sig) -{ - if (mq->mq_notify_fn != NULL) { - mq->mq_notify_sig |= sig; - nni_cv_wake(&mq->mq_notify_cv); - } -} - - static void nni_msgq_run_timeout(void *); int @@ -123,10 +69,7 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; } - if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) || - ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) { + if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { @@ -144,17 +87,14 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_notify_fn = NULL; - mq->mq_notify_arg = NULL; mq->mq_expire = NNI_TIME_NEVER; + mq->mq_draining = 0; *mqp = mq; return (0); fail: nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); nni_mtx_fini(&mq->mq_lock); if (mq->mq_msgs != NULL) { nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); @@ -173,10 +113,7 @@ nni_msgq_fini(nni_msgq *mq) return; } nni_timer_cancel(&mq->mq_timer); - nni_thr_fini(&mq->mq_notify_thr); nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ @@ -195,27 +132,6 @@ nni_msgq_fini(nni_msgq *mq) } -int -nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) -{ - int rv; - - nni_thr_fini(&mq->mq_notify_thr); - - nni_mtx_lock(&mq->mq_lock); - rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq); - if (rv != 0) { - nni_mtx_unlock(&mq->mq_lock); - return (rv); - } - mq->mq_notify_fn = fn; - mq->mq_notify_arg = arg; - nni_thr_run(&mq->mq_notify_thr); - nni_mtx_unlock(&mq->mq_lock); - return (0); -} - - void nni_msgq_set_get_error(nni_msgq *mq, int error) { @@ -329,14 +245,6 @@ nni_msgq_run_putq(nni_msgq *mq) // Unable to make progress, leave the aio where it is. break; } - - // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC. - if (mq->mq_len != 0) { - nni_cv_wake(&mq->mq_readable); - } - if (mq->mq_len < mq->mq_cap) { - nni_cv_wake(&mq->mq_writeable); - } } @@ -394,7 +302,6 @@ nni_msgq_run_notify(nni_msgq *mq) } if ((mq->mq_len < mq->mq_cap) || (nni_list_first(&mq->mq_aio_getq) != NULL)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); @@ -406,6 +313,37 @@ nni_msgq_run_notify(nni_msgq *mq) nni_aio_finish(aio, 0, 0); } } + + if (mq->mq_draining) { + if ((mq->mq_len == 0) && + (nni_list_first(&mq->mq_aio_putq) == NULL)) { + nni_cv_wake(&mq->mq_drained); + } + } +} + + +void +nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + if (nni_list_active(&mq->mq_aio_notify_put, aio)) { + nni_list_remove(&mq->mq_aio_notify_put, aio); + } + nni_list_append(&mq->mq_aio_notify_put, aio); + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + if (nni_list_active(&mq->mq_aio_notify_get, aio)) { + nni_list_remove(&mq->mq_aio_notify_get, aio); + } + nni_list_append(&mq->mq_aio_notify_get, aio); + nni_mtx_unlock(&mq->mq_lock); } @@ -608,122 +546,6 @@ nni_msgq_run_timeout(void *arg) } -#if 0 -int -nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) -{ - int rv; - - nni_mtx_lock(&mq->mq_lock); - - for (;;) { - // if closed, we don't put more... this check is first! - if (mq->mq_closed) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_ECLOSED); - } - - if ((rv = mq->mq_puterr) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return (rv); - } - - // room in the queue? - if (mq->mq_len < mq->mq_cap) { - break; - } - - // unbuffered, room for one, and a reader waiting? - if (mq->mq_rwait && - (mq->mq_cap == 0) && - (mq->mq_len == mq->mq_cap)) { - break; - } - - // interrupted? - if (*sig) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_EINTR); - } - - // single poll? - if (expire == NNI_TIME_ZERO) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_EAGAIN); - } - - // waiting.... - mq->mq_wwait = 1; - - // if we are unbuffered, kick the notifier, because we're - // writable. - if (mq->mq_cap == 0) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - } - - // not writeable, so wait until something changes - rv = nni_cv_until(&mq->mq_writeable, expire); - if (rv == NNG_ETIMEDOUT) { - nni_mtx_unlock(&mq->mq_lock); - return (NNG_ETIMEDOUT); - } - } - - // Writeable! Yay!! - mq->mq_msgs[mq->mq_put] = msg; - mq->mq_put++; - if (mq->mq_put == mq->mq_alloc) { - mq->mq_put = 0; - } - mq->mq_len++; - if (mq->mq_rwait) { - mq->mq_rwait = 0; - nni_cv_wake(&mq->mq_readable); - } - if (mq->mq_len < mq->mq_cap) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); - } - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - nni_msgq_run_getq(mq); - nni_mtx_unlock(&mq->mq_lock); - - return (0); -} - - -static int -nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) -{ - int rv; - - nni_mtx_lock(&mq->mq_lock); - - // Readable! Yay!! - - *msgp = mq->mq_msgs[mq->mq_get]; - mq->mq_len--; - mq->mq_get++; - if (mq->mq_get == mq->mq_alloc) { - mq->mq_get = 0; - } - if (mq->mq_wwait) { - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_writeable); - } - if (mq->mq_len) { - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); - } - nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); - nni_msgq_run_putq(mq); - nni_mtx_unlock(&mq->mq_lock); - - return (0); -} - - -#endif - - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { @@ -781,17 +603,23 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) void nni_msgq_drain(nni_msgq *mq, nni_time expire) { + nni_aio *aio; + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_notify_cv); - while (mq->mq_len > 0) { + mq->mq_draining = 1; + while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { break; } } // If we timedout, free any remaining messages in the queue. + // Also complete the putq as NNG_ECLOSED. + + while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } while (mq->mq_len > 0) { nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_alloc) { @@ -812,9 +640,6 @@ nni_msgq_close(nni_msgq *mq) nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_notify_cv); // Free the messages orphaned in the queue. while (mq->mq_len > 0) { @@ -937,8 +762,6 @@ nni_msgq_resize(nni_msgq *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_drained); nni_mtx_unlock(&mq->mq_lock); return (0); -- cgit v1.2.3-70-g09d2