aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/sub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-12 16:11:19 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-12 23:03:06 -0800
commit7b4a0a996aa6ed3e8fbbd9fd0e28811725707605 (patch)
treec06211c9972d2b311116c8b4fee536896f38b394 /src/protocol/pubsub0/sub.c
parent4299a5b4edc59753a6ec857fabedadf1504c4243 (diff)
downloadnng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.gz
nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.tar.bz2
nng-7b4a0a996aa6ed3e8fbbd9fd0e28811725707605.zip
Add PUB/SUB test suite.
This gets near 100% coverage of the PUB/SUB protocols. The remaining uncovered bits will need to have a mock protocol that runs slower, so that we can inject both back pressure, and also so that we can inject "erroroneous" messages.
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
-rw-r--r--src/protocol/pubsub0/sub.c35
1 files changed, 11 insertions, 24 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index c5b84313..b5dd7834 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -10,7 +10,6 @@
//
#include <stdbool.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -55,7 +54,6 @@ struct sub0_ctx {
sub0_sock * sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
- bool closed;
nni_lmq lmq;
bool prefer_new;
};
@@ -74,7 +72,7 @@ struct sub0_sock {
struct sub0_pipe {
nni_pipe * pipe;
sub0_sock *sub;
- nni_aio * aio_recv;
+ nni_aio aio_recv;
};
static void
@@ -103,12 +101,6 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&sock->lk);
- if (ctx->closed) {
- nni_mtx_unlock(&sock->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
if (nni_lmq_empty(&ctx->lmq)) {
int rv;
if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -148,7 +140,6 @@ sub0_ctx_close(void *arg)
nni_aio * aio;
nni_mtx_lock(&sock->lk);
- ctx->closed = true;
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
nni_list_remove(&ctx->recv_queue, aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -213,7 +204,7 @@ sub0_sock_fini(void *arg)
sub0_sock *sock = arg;
sub0_ctx_fini(&sock->master);
- nni_pollable_fini(&sock->readable);
+ nni_pollable_fini(&sock->readable);
nni_mtx_fini(&sock->lk);
}
@@ -257,7 +248,7 @@ sub0_pipe_stop(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -265,19 +256,15 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_recv);
}
static int
sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
sub0_pipe *p = arg;
- int rv;
- if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) {
- sub0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_recv, sub0_recv_cb, p);
p->pipe = pipe;
p->sub = s;
@@ -294,7 +281,7 @@ sub0_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -303,7 +290,7 @@ sub0_pipe_close(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_recv);
}
static bool
@@ -338,15 +325,15 @@ sub0_recv_cb(void *arg)
nng_aio * aio;
bool submatch;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
nni_aio_list_init(&finish);
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
body = nni_msg_body(msg);
@@ -415,7 +402,7 @@ sub0_recv_cb(void *arg)
nni_pollable_raise(&sock->readable);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static int