From b93d5759c9b39ff153a14d474d800cd981f7dc97 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 22 Jan 2017 02:32:32 -0800 Subject: Event notification via pollable FDs verified working. --- src/core/msgqueue.c | 139 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 93 insertions(+), 46 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 3ec194fc..09f58a33 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -31,10 +31,61 @@ struct nni_msgq { int mq_wwait; nni_msg ** mq_msgs; + int mq_notify_sig; + nni_cv mq_notify_cv; + nni_thr mq_notify_thr; nni_msgq_notify_fn mq_notify_fn; void * mq_notify_arg; }; + +// nni_msgq_notifier thread runs if events callbacks are registered on the +// message queue, and calls the notification callbacks outside of the +// lock. It looks at the actual msgq state to trigger the right events. +static void +nni_msgq_notifier(void *arg) +{ + nni_msgq *mq = arg; + int sig; + nni_msgq_notify_fn fn; + void *fnarg; + + for (;;) { + nni_mtx_lock(&mq->mq_lock); + while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) { + nni_cv_wait(&mq->mq_notify_cv); + } + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + break; + } + + sig = mq->mq_notify_sig; + mq->mq_notify_sig = 0; + + fn = mq->mq_notify_fn; + fnarg = mq->mq_notify_arg; + nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, sig, fnarg); + } + } +} + + +// nni_msgq_kick kicks the msgq notification thread. It should be called +// with the lock held. +static void +nni_msgq_kick(nni_msgq *mq, int sig) +{ + if (mq->mq_notify_fn != NULL) { + mq->mq_notify_sig |= sig; + nni_cv_wake(&mq->mq_notify_cv); + } +} + + int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -59,13 +110,10 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; } - if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) { - goto fail; - } - if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { - goto fail; - } - if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { + if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) || + ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) || + ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) || + ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) { goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { @@ -110,6 +158,7 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } + nni_thr_fini(&mq->mq_notify_thr); nni_cv_fini(&mq->mq_drained); nni_cv_fini(&mq->mq_writeable); nni_cv_fini(&mq->mq_readable); @@ -131,13 +180,24 @@ nni_msgq_fini(nni_msgq *mq) } -void +int nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) { + int rv; + + nni_thr_fini(&mq->mq_notify_thr); + nni_mtx_lock(&mq->mq_lock); + rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq); + if (rv != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } mq->mq_notify_fn = fn; mq->mq_notify_arg = arg; + nni_thr_run(&mq->mq_notify_thr); nni_mtx_unlock(&mq->mq_lock); + return (0); } @@ -197,6 +257,7 @@ nni_msgq_signal(nni_msgq *mq, int *signal) mq->mq_wwait = 0; nni_cv_wake(&mq->mq_readable); nni_cv_wake(&mq->mq_writeable); + nni_cv_wake(&mq->mq_notify_cv); nni_mtx_unlock(&mq->mq_lock); } @@ -205,9 +266,6 @@ int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; - int notify = 0; - nni_msgq_notify_fn fn; - void *arg; nni_mtx_lock(&mq->mq_lock); @@ -247,8 +305,16 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) return (NNG_EAGAIN); } - // not writeable, so wait until something changes + // waiting.... mq->mq_wwait = 1; + + // if we are unbuffered, kick the notifier, because we're + // writable. + if (mq->mq_cap == 0) { + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); + } + + // not writeable, so wait until something changes rv = nni_cv_until(&mq->mq_writeable, expire); if (rv == NNG_ETIMEDOUT) { nni_mtx_unlock(&mq->mq_lock); @@ -267,18 +333,12 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) mq->mq_rwait = 0; nni_cv_wake(&mq->mq_readable); } - notify = NNI_MSGQ_NOTIFY_CANGET; if (mq->mq_len < mq->mq_cap) { - notify |= NNI_MSGQ_NOTIFY_CANPUT; + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); } - fn = mq->mq_notify_fn; - arg = mq->mq_notify_arg; + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); nni_mtx_unlock(&mq->mq_lock); - // The notify callback is executed outside of the lock. - if (fn != NULL) { - fn(mq, notify, arg); - } return (0); } @@ -289,10 +349,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) int nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { - int notify = 0; - nni_msgq_notify_fn fn; - void *arg; - nni_mtx_lock(&mq->mq_lock); // if closed, we don't put more... this check is first! @@ -319,18 +375,9 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg) nni_cv_wake(&mq->mq_readable); } - notify = NNI_MSGQ_NOTIFY_CANGET; - if (mq->mq_len < mq->mq_cap) { - notify |= NNI_MSGQ_NOTIFY_CANPUT; - } - fn = mq->mq_notify_fn; - arg = mq->mq_notify_arg; + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); nni_mtx_unlock(&mq->mq_lock); - if (fn != NULL) { - fn(mq, notify, arg); - } - return (0); } @@ -339,9 +386,6 @@ static int nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; - int notify = 0; - nni_msgq_notify_fn fn; - void *arg; nni_mtx_lock(&mq->mq_lock); @@ -372,6 +416,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_cv_wake(&mq->mq_writeable); } mq->mq_rwait = 1; + + if (mq->mq_cap == 0) { + // If unbuffered, kick it since a writer would not + // block. + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); + } + rv = nni_cv_until(&mq->mq_readable, expire); if (rv == NNG_ETIMEDOUT) { nni_mtx_unlock(&mq->mq_lock); @@ -391,18 +442,12 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) mq->mq_wwait = 0; nni_cv_wake(&mq->mq_writeable); } - notify = NNI_MSGQ_NOTIFY_CANPUT; - if (mq->mq_len > 0) { - notify |= NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len) { + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); } - fn = mq->mq_notify_fn; - arg = mq->mq_notify_arg; + nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); nni_mtx_unlock(&mq->mq_lock); - if (fn != NULL) { - fn(mq, notify, arg); - } - return (0); } @@ -475,6 +520,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) mq->mq_rwait = 0; nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_notify_cv); while (mq->mq_len > 0) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { break; @@ -502,6 +548,7 @@ nni_msgq_close(nni_msgq *mq) mq->mq_rwait = 0; nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_notify_cv); // Free the messages orphaned in the queue. while (mq->mq_len > 0) { -- cgit v1.2.3-70-g09d2