diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-12 16:11:19 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-12 23:03:06 -0800 |
| commit | 7b4a0a996aa6ed3e8fbbd9fd0e28811725707605 (patch) | |
| tree | c06211c9972d2b311116c8b4fee536896f38b394 /src/protocol/pubsub0/xsub.c | |
| parent | 4299a5b4edc59753a6ec857fabedadf1504c4243 (diff) | |
| download | nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.gz nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.bz2 nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.zip | |
Add PUB/SUB test suite.
This gets near 100% coverage of the PUB/SUB protocols.
The remaining uncovered bits will need to have a mock protocol that
runs slower, so that we can inject both back pressure, and also so
that we can inject "erroroneous" messages.
Diffstat (limited to 'src/protocol/pubsub0/xsub.c')
| -rw-r--r-- | src/protocol/pubsub0/xsub.c | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c index baa4f8eb..0013b8b3 100644 --- a/src/protocol/pubsub0/xsub.c +++ b/src/protocol/pubsub0/xsub.c @@ -41,7 +41,7 @@ struct xsub0_sock { struct xsub0_pipe { nni_pipe * pipe; xsub0_sock *sub; - nni_aio * aio_recv; + nni_aio aio_recv; }; static int @@ -77,7 +77,7 @@ xsub0_pipe_stop(void *arg) { xsub0_pipe *p = arg; - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_recv); } static void @@ -85,19 +85,15 @@ xsub0_pipe_fini(void *arg) { xsub0_pipe *p = arg; - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_recv); } static int xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { xsub0_pipe *p = arg; - int rv; - if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) { - xsub0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, xsub0_recv_cb, p); p->pipe = pipe; p->sub = s; @@ -114,7 +110,7 @@ xsub0_pipe_start(void *arg) return (NNG_EPROTO); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -123,7 +119,7 @@ xsub0_pipe_close(void *arg) { xsub0_pipe *p = arg; - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_recv); } static void @@ -134,29 +130,25 @@ xsub0_recv_cb(void *arg) nni_msgq * urq = s->urq; nni_msg * msg; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - switch (nni_msgq_tryput(urq, msg)) { - case 0: - break; - case NNG_EAGAIN: + if (nni_msgq_tryput(urq, msg) != 0) { + // This only happens for two reasons. For flow control, + // in which case we just want to discard the message and + // carry on, and for a close of the socket (which is very + // hard to achieve, since we close the pipes.) In either + // case the easiest thing to do is just free the message + // and try again. nni_msg_free(msg); - break; - default: - // Any other error we stop the pipe for. It's probably - // NNG_ECLOSED anyway. - nng_msg_free(msg); - nni_pipe_close(p->pipe); - return; } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); } static void |
