summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-19 10:33:11 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-21 13:52:18 -0700
commitb2c9baba988347c5bf15423f2ea40ce9d05da075 (patch)
treeaf08f823cec5f5e0e8b68d380d49cc8c659cb7b8 /src/protocol
parent6abb328523509d35663f54ee0012254232df4a0a (diff)
downloadnng-b2c9baba988347c5bf15423f2ea40ce9d05da075.tar.gz
nng-b2c9baba988347c5bf15423f2ea40ce9d05da075.tar.bz2
nng-b2c9baba988347c5bf15423f2ea40ce9d05da075.zip
fixes #459 SUB should be more aggressive about discarding messages
As part of this code fix, we needed to add filtering support to the msgq_tryput code path -- it turns out that code path was bypassing the filterfn altogether. Eventually we'll remove all this filtering stuff from the msgq code and replace it with inline filtering directly in sub.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b4fa2e2a..a7d6b9f5 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -31,7 +31,6 @@ typedef struct sub0_sock sub0_sock;
typedef struct sub0_topic sub0_topic;
static void sub0_recv_cb(void *);
-static void sub0_putq_cb(void *);
static void sub0_pipe_fini(void *);
struct sub0_topic {
@@ -52,7 +51,6 @@ struct sub0_pipe {
nni_pipe * pipe;
sub0_sock *sub;
nni_aio * aio_recv;
- nni_aio * aio_putq;
};
static int
@@ -103,7 +101,6 @@ sub0_pipe_stop(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
@@ -112,7 +109,6 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
NNI_FREE_STRUCT(p);
}
@@ -126,8 +122,7 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
}
@@ -157,7 +152,6 @@ sub0_pipe_close(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_close(p->aio_putq);
nni_aio_close(p->aio_recv);
}
@@ -177,22 +171,20 @@ sub0_recv_cb(void *arg)
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(urq, p->aio_putq);
-}
-static void
-sub0_putq_cb(void *arg)
-{
- sub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ switch (nni_msgq_tryput(urq, msg)) {
+ case 0:
+ break;
+ case NNG_EAGAIN:
+ nni_msg_free(msg);
+ break;
+ default:
+ // Any other error we stop the pipe for. It's probably
+ // NNG_ECLOSED anyway.
+ nng_msg_free(msg);
nni_pipe_stop(p->pipe);
return;
}
-
nni_pipe_recv(p->pipe, p->aio_recv);
}