summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-20 17:03:12 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-23 16:14:53 -0700
commit3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch)
treea45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/core/msgqueue.c
parentfdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff)
downloadnng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c113
1 files changed, 65 insertions, 48 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6fd95f98..c1f3701c 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -27,12 +27,17 @@ struct nni_msgq {
int mq_puterr;
int mq_geterr;
int mq_draining;
+ int mq_besteffort;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;
+
+ // Filters.
+ nni_msgq_filter mq_filter_fn;
+ void * mq_filter_arg;
};
int
@@ -157,6 +162,13 @@ nni_msgq_set_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}
+void
+nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg)
+{
+ mq->mq_filter_fn = filter;
+ mq->mq_filter_arg = arg;
+}
+
static void
nni_msgq_run_putq(nni_msgq *mq)
{
@@ -173,12 +185,19 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- nni_aio_set_msg(waio, NULL);
- nni_aio_list_remove(raio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
+
nni_aio_finish(waio, 0, len);
- nni_aio_finish_msg(raio, msg);
continue;
}
@@ -195,40 +214,76 @@ nni_msgq_run_putq(nni_msgq *mq)
continue;
}
+ // If we are in best effort mode, just drop the message
+ // as if we delivered.
+ if (mq->mq_besteffort) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ nni_aio_set_msg(waio, NULL);
+ nni_msg_free(msg);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
// Unable to make progress, leave the aio where it is.
break;
}
}
+void
+nni_msgq_set_best_effort(nni_msgq *mq, int on)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_besteffort = on;
+ if (on) {
+ nni_msgq_run_putq(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
static void
nni_msgq_run_getq(nni_msgq *mq)
{
nni_aio *raio;
nni_aio *waio;
- nni_msg *msg;
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
- msg = mq->mq_msgs[mq->mq_get++];
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
+
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
continue;
}
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_msg *msg;
+ size_t len;
msg = nni_aio_get_msg(waio);
+ len = nni_msg_len(msg);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
- nni_aio_list_remove(raio);
- nni_aio_finish(waio, 0, nni_msg_len(msg));
- nni_aio_finish_msg(raio, msg);
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
+
+ nni_aio_finish(waio, 0, len);
continue;
}
@@ -393,44 +448,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-int
-nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
-{
- nni_aio *aio;
- int rv;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
- nni_aio_set_timeout(aio, expire);
- nni_msgq_aio_get(mq, aio);
- nni_aio_wait(aio);
- if ((rv = nni_aio_result(aio)) == 0) {
- *msgp = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- }
- nni_aio_fini(aio);
- return (rv);
-}
-
-int
-nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
-{
- nni_aio *aio;
- int rv;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
- nni_aio_set_timeout(aio, expire);
- nni_aio_set_msg(aio, msg);
- nni_msgq_aio_put(mq, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- return (rv);
-}
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{