diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-20 17:03:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-23 16:14:53 -0700 |
| commit | 3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch) | |
| tree | a45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/core/msgqueue.c | |
| parent | fdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff) | |
| download | nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2 nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip | |
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 113 |
1 files changed, 65 insertions, 48 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6fd95f98..c1f3701c 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -27,12 +27,17 @@ struct nni_msgq { int mq_puterr; int mq_geterr; int mq_draining; + int mq_besteffort; nni_msg **mq_msgs; nni_list mq_aio_putq; nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; + + // Filters. + nni_msgq_filter mq_filter_fn; + void * mq_filter_arg; }; int @@ -157,6 +162,13 @@ nni_msgq_set_error(nni_msgq *mq, int error) nni_mtx_unlock(&mq->mq_lock); } +void +nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg) +{ + mq->mq_filter_fn = filter; + mq->mq_filter_arg = arg; +} + static void nni_msgq_run_putq(nni_msgq *mq) { @@ -173,12 +185,19 @@ nni_msgq_run_putq(nni_msgq *mq) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - nni_aio_set_msg(waio, NULL); - nni_aio_list_remove(raio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } + nni_aio_finish(waio, 0, len); - nni_aio_finish_msg(raio, msg); continue; } @@ -195,40 +214,76 @@ nni_msgq_run_putq(nni_msgq *mq) continue; } + // If we are in best effort mode, just drop the message + // as if we delivered. + if (mq->mq_besteffort) { + nni_list_remove(&mq->mq_aio_putq, waio); + nni_aio_set_msg(waio, NULL); + nni_msg_free(msg); + nni_aio_finish(waio, 0, len); + continue; + } + // Unable to make progress, leave the aio where it is. break; } } +void +nni_msgq_set_best_effort(nni_msgq *mq, int on) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_besteffort = on; + if (on) { + nni_msgq_run_putq(mq); + } + nni_mtx_unlock(&mq->mq_lock); +} + static void nni_msgq_run_getq(nni_msgq *mq) { nni_aio *raio; nni_aio *waio; - nni_msg *msg; while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { - msg = mq->mq_msgs[mq->mq_get++]; + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); + + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } continue; } // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_msg *msg; + size_t len; msg = nni_aio_get_msg(waio); + len = nni_msg_len(msg); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); - nni_aio_list_remove(raio); - nni_aio_finish(waio, 0, nni_msg_len(msg)); - nni_aio_finish_msg(raio, msg); + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } + + nni_aio_finish(waio, 0, len); continue; } @@ -393,44 +448,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -int -nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) -{ - nni_aio *aio; - int rv; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_timeout(aio, expire); - nni_msgq_aio_get(mq, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) == 0) { - *msgp = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - } - nni_aio_fini(aio); - return (rv); -} - -int -nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) -{ - nni_aio *aio; - int rv; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_timeout(aio, expire); - nni_aio_set_msg(aio, msg); - nni_msgq_aio_put(mq, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - return (rv); -} - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { |
