From 7b4a0a996aa6ed3e8fbbd9fd0e28811725707605 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 12 Jan 2020 16:11:19 -0800 Subject: Add PUB/SUB test suite. This gets near 100% coverage of the PUB/SUB protocols. The remaining uncovered bits will need to have a mock protocol that runs slower, so that we can inject both back pressure, and also so that we can inject "erroroneous" messages. --- src/protocol/pubsub0/sub.c | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) (limited to 'src/protocol/pubsub0/sub.c') diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index c5b84313..b5dd7834 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -10,7 +10,6 @@ // #include -#include #include #include "core/nng_impl.h" @@ -55,7 +54,6 @@ struct sub0_ctx { sub0_sock * sock; nni_list topics; // TODO: Consider patricia trie nni_list recv_queue; // can have multiple pending receives - bool closed; nni_lmq lmq; bool prefer_new; }; @@ -74,7 +72,7 @@ struct sub0_sock { struct sub0_pipe { nni_pipe * pipe; sub0_sock *sub; - nni_aio * aio_recv; + nni_aio aio_recv; }; static void @@ -103,12 +101,6 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); - if (ctx->closed) { - nni_mtx_unlock(&sock->lk); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -148,7 +140,6 @@ sub0_ctx_close(void *arg) nni_aio * aio; nni_mtx_lock(&sock->lk); - ctx->closed = true; while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -213,7 +204,7 @@ sub0_sock_fini(void *arg) sub0_sock *sock = arg; sub0_ctx_fini(&sock->master); - nni_pollable_fini(&sock->readable); + nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->lk); } @@ -257,7 +248,7 @@ sub0_pipe_stop(void *arg) { sub0_pipe *p = arg; - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_recv); } static void @@ -265,19 +256,15 @@ sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_recv); } static int sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { sub0_pipe *p = arg; - int rv; - if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) { - sub0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, sub0_recv_cb, p); p->pipe = pipe; p->sub = s; @@ -294,7 +281,7 @@ sub0_pipe_start(void *arg) return (NNG_EPROTO); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -303,7 +290,7 @@ sub0_pipe_close(void *arg) { sub0_pipe *p = arg; - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_recv); } static bool @@ -338,15 +325,15 @@ sub0_recv_cb(void *arg) nng_aio * aio; bool submatch; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } nni_aio_list_init(&finish); - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); body = nni_msg_body(msg); @@ -415,7 +402,7 @@ sub0_recv_cb(void *arg) nni_pollable_raise(&sock->readable); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int -- cgit v1.2.3-70-g09d2