diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-19 10:33:11 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-21 13:52:18 -0700 |
| commit | b2c9baba988347c5bf15423f2ea40ce9d05da075 (patch) | |
| tree | af08f823cec5f5e0e8b68d380d49cc8c659cb7b8 /src | |
| parent | 6abb328523509d35663f54ee0012254232df4a0a (diff) | |
| download | nng-b2c9baba988347c5bf15423f2ea40ce9d05da075.tar.gz nng-b2c9baba988347c5bf15423f2ea40ce9d05da075.tar.bz2 nng-b2c9baba988347c5bf15423f2ea40ce9d05da075.zip | |
fixes #459 SUB should be more aggressive about discarding messages
As part of this code fix, we needed to add filtering support to the
msgq_tryput code path -- it turns out that code path was bypassing
the filterfn altogether.
Eventually we'll remove all this filtering stuff from the msgq code
and replace it with inline filtering directly in sub.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/msgqueue.c | 11 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 |
2 files changed, 19 insertions, 22 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index fa94e32f..62f57553 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -403,10 +403,15 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - nni_list_remove(&mq->mq_aio_getq, raio); - nni_aio_finish_msg(raio, msg); - nni_msgq_run_notify(mq); + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_list_remove(&mq->mq_aio_getq, raio); + nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); + } nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b4fa2e2a..a7d6b9f5 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -31,7 +31,6 @@ typedef struct sub0_sock sub0_sock; typedef struct sub0_topic sub0_topic; static void sub0_recv_cb(void *); -static void sub0_putq_cb(void *); static void sub0_pipe_fini(void *); struct sub0_topic { @@ -52,7 +51,6 @@ struct sub0_pipe { nni_pipe * pipe; sub0_sock *sub; nni_aio * aio_recv; - nni_aio * aio_putq; }; static int @@ -103,7 +101,6 @@ sub0_pipe_stop(void *arg) { sub0_pipe *p = arg; - nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); } @@ -112,7 +109,6 @@ sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; - nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); NNI_FREE_STRUCT(p); } @@ -126,8 +122,7 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) { sub0_pipe_fini(p); return (rv); } @@ -157,7 +152,6 @@ sub0_pipe_close(void *arg) { sub0_pipe *p = arg; - nni_aio_close(p->aio_putq); nni_aio_close(p->aio_recv); } @@ -177,22 +171,20 @@ sub0_recv_cb(void *arg) msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); -} -static void -sub0_putq_cb(void *arg) -{ - sub0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); + switch (nni_msgq_tryput(urq, msg)) { + case 0: + break; + case NNG_EAGAIN: + nni_msg_free(msg); + break; + default: + // Any other error we stop the pipe for. It's probably + // NNG_ECLOSED anyway. + nng_msg_free(msg); nni_pipe_stop(p->pipe); return; } - nni_pipe_recv(p->pipe, p->aio_recv); } |
