aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/xsub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-12 16:11:19 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-12 23:03:06 -0800
commit7b4a0a996aa6ed3e8fbbd9fd0e28811725707605 (patch)
treec06211c9972d2b311116c8b4fee536896f38b394 /src/protocol/pubsub0/xsub.c
parent4299a5b4edc59753a6ec857fabedadf1504c4243 (diff)
downloadnng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.gz
nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.bz2
nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.zip
Add PUB/SUB test suite.
This gets near 100% coverage of the PUB/SUB protocols. The remaining uncovered bits will need to have a mock protocol that runs slower, so that we can inject both back pressure, and also so that we can inject "erroroneous" messages.
Diffstat (limited to 'src/protocol/pubsub0/xsub.c')
-rw-r--r--src/protocol/pubsub0/xsub.c42
1 files changed, 17 insertions, 25 deletions
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
index baa4f8eb..0013b8b3 100644
--- a/src/protocol/pubsub0/xsub.c
+++ b/src/protocol/pubsub0/xsub.c
@@ -41,7 +41,7 @@ struct xsub0_sock {
struct xsub0_pipe {
nni_pipe * pipe;
xsub0_sock *sub;
- nni_aio * aio_recv;
+ nni_aio aio_recv;
};
static int
@@ -77,7 +77,7 @@ xsub0_pipe_stop(void *arg)
{
xsub0_pipe *p = arg;
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -85,19 +85,15 @@ xsub0_pipe_fini(void *arg)
{
xsub0_pipe *p = arg;
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_recv);
}
static int
xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
xsub0_pipe *p = arg;
- int rv;
- if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
- xsub0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_recv, xsub0_recv_cb, p);
p->pipe = pipe;
p->sub = s;
@@ -114,7 +110,7 @@ xsub0_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -123,7 +119,7 @@ xsub0_pipe_close(void *arg)
{
xsub0_pipe *p = arg;
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_recv);
}
static void
@@ -134,29 +130,25 @@ xsub0_recv_cb(void *arg)
nni_msgq * urq = s->urq;
nni_msg * msg;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- switch (nni_msgq_tryput(urq, msg)) {
- case 0:
- break;
- case NNG_EAGAIN:
+ if (nni_msgq_tryput(urq, msg) != 0) {
+ // This only happens for two reasons. For flow control,
+ // in which case we just want to discard the message and
+ // carry on, and for a close of the socket (which is very
+ // hard to achieve, since we close the pipes.) In either
+ // case the easiest thing to do is just free the message
+ // and try again.
nni_msg_free(msg);
- break;
- default:
- // Any other error we stop the pipe for. It's probably
- // NNG_ECLOSED anyway.
- nng_msg_free(msg);
- nni_pipe_close(p->pipe);
- return;
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static void