diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-24 15:26:14 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-24 16:30:09 -0700 |
| commit | 4250fc119057eb6a6b534e9c0758488cc5fb034e (patch) | |
| tree | c64efe1ae1115cf3aacdedf365c11398c8d99ba8 /src/core/msgqueue.c | |
| parent | a7b20c3babd965b12dec8cb5ff0883a4d8d1116d (diff) | |
| download | nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.gz nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.bz2 nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.zip | |
fixes #132 Implement saner notification for file descriptors
This eliminates the "quasi-functional" notify API altogether.
The aio framework will be coming soon to replace it.
As a bonus, apps (legacy apps) that use the notification FDs
will see improved performance, since we don't have to context
switch to give them a notification.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 74 |
1 files changed, 32 insertions, 42 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index c1f3701c..3f1ffcb5 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -32,8 +32,12 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; + + // Callback - this function is executed with the lock held, and + // provides information about the current queue state anytime + // a message enters or leaves the queue, or a waiter is blocked. + nni_msgq_cb mq_cb_fn; + void * mq_cb_arg; // Filters. nni_msgq_filter mq_filter_fn; @@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); - nni_aio_list_init(&mq->mq_aio_notify_get); - nni_aio_list_init(&mq->mq_aio_notify_put); - nni_mtx_init(&mq->mq_lock); nni_cv_init(&mq->mq_drained, &mq->mq_lock); @@ -297,22 +298,25 @@ static void nni_msgq_run_notify(nni_msgq *mq) { nni_aio *aio; + if (mq->mq_cb_fn != NULL) { + int flags = 0; - if (mq->mq_closed) { - return; - } - if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_closed) { + flags |= nni_msgq_f_closed; } - } - - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_len == 0) { + flags |= nni_msgq_f_empty; + } else if (mq->mq_len == mq->mq_cap) { + flags |= nni_msgq_f_full; + } + if (mq->mq_len < mq->mq_cap || + !nni_list_empty(&mq->mq_aio_getq)) { + flags |= nni_msgq_f_can_put; + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + flags |= nni_msgq_f_can_get; } + mq->mq_cb_fn(mq->mq_cb_arg, flags); } if (mq->mq_draining) { @@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq) } } +void +nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_cb_fn = fn; + mq->mq_cb_arg = arg; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + static void nni_msgq_cancel(nni_aio *aio, int rv) { @@ -336,30 +350,6 @@ nni_msgq_cancel(nni_aio *aio, int rv) } void -nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_put, aio); - nni_mtx_unlock(&mq->mq_lock); -} - -void -nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_get, aio); - nni_mtx_unlock(&mq->mq_lock); -} - -void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); |
