aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-08 20:34:26 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-08 21:16:30 -0800
commitb21d7805523a407a14567017edbdef57ca81781f (patch)
treee07f08bdc047ee4dfb057b670766e3de5bf2f981 /src/protocol
parent8479b4c8861c77cfd9eb64e724615605bdd1cbcb (diff)
downloadnng-b21d7805523a407a14567017edbdef57ca81781f.tar.gz
nng-b21d7805523a407a14567017edbdef57ca81781f.tar.bz2
nng-b21d7805523a407a14567017edbdef57ca81781f.zip
fixes #1094 Consider in-lining task and aio
This only does it for rep, but it also has changes that should increase the overall test coverage for the REP protocol
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/bus.c22
-rw-r--r--src/protocol/pair0/pair.c16
-rw-r--r--src/protocol/pair1/pair.c20
-rw-r--r--src/protocol/pipeline0/pull.c8
-rw-r--r--src/protocol/pipeline0/push.c12
-rw-r--r--src/protocol/pubsub0/pub.c8
-rw-r--r--src/protocol/pubsub0/sub.c4
-rw-r--r--src/protocol/pubsub0/xsub.c4
-rw-r--r--src/protocol/reqrep0/rep.c80
-rw-r--r--src/protocol/reqrep0/rep_test.c21
-rw-r--r--src/protocol/reqrep0/req.c8
-rw-r--r--src/protocol/reqrep0/xrep.c20
-rw-r--r--src/protocol/reqrep0/xreq.c16
-rw-r--r--src/protocol/survey0/respond.c8
-rw-r--r--src/protocol/survey0/survey.c12
-rw-r--r--src/protocol/survey0/xrespond.c20
-rw-r--r--src/protocol/survey0/xsurvey.c20
17 files changed, 158 insertions, 141 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index afb12ef6..dea228a1 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -68,7 +68,7 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_mtx_fini(&s->mtx);
}
@@ -80,7 +80,7 @@ bus0_sock_init(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
bus0_sock_fini(s);
return (rv);
}
@@ -99,7 +99,7 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
bus0_sock_fini(s);
return (rv);
}
@@ -142,10 +142,10 @@ bus0_pipe_fini(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
nni_mtx_fini(&p->mtx);
}
@@ -159,10 +159,10 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
bus0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 860ac17f..730e5f5e 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -88,10 +88,10 @@ pair0_pipe_fini(void *arg)
{
pair0_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
}
static int
@@ -100,10 +100,10 @@ pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock)
pair0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
pair0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 2838cb5d..b3b64a79 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -69,7 +69,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -88,7 +88,7 @@ pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw)
// Raw mode uses this.
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) {
pair1_sock_fini(s);
return (rv);
}
@@ -147,10 +147,10 @@ pair1_pipe_fini(void *arg)
{
pair1_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
nni_msgq_fini(p->sendq);
}
@@ -161,10 +161,10 @@ pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock)
int rv;
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 64b47cef..8feb08b8 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -73,8 +73,8 @@ pull0_pipe_fini(void *arg)
{
pull0_pipe *p = arg;
- nni_aio_fini(p->putq_aio);
- nni_aio_fini(p->recv_aio);
+ nni_aio_free(p->putq_aio);
+ nni_aio_free(p->recv_aio);
}
static int
@@ -83,8 +83,8 @@ pull0_pipe_init(void *arg, nni_pipe *pipe, void *s)
pull0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
pull0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 5a932ece..90c94af9 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -89,9 +89,9 @@ push0_pipe_fini(void *arg)
{
push0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_getq);
}
static int
@@ -100,9 +100,9 @@ push0_pipe_init(void *arg, nni_pipe *pipe, void *s)
push0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) {
push0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index a42e95ff..9b995c33 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -111,8 +111,8 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_lmq_fini(&p->sendq);
}
@@ -130,8 +130,8 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
// XXX: consider making this depth tunable
if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
pub0_pipe_fini(p);
return (rv);
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 56da98f8..c5b84313 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -265,7 +265,7 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -274,7 +274,7 @@ sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
sub0_pipe *p = arg;
int rv;
- if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
index be300df4..baa4f8eb 100644
--- a/src/protocol/pubsub0/xsub.c
+++ b/src/protocol/pubsub0/xsub.c
@@ -85,7 +85,7 @@ xsub0_pipe_fini(void *arg)
{
xsub0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -94,7 +94,7 @@ xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
xsub0_pipe *p = arg;
int rv;
- if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
xsub0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index a715ab59..a29c3120 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -48,14 +48,14 @@ struct rep0_ctx {
// rep0_sock is our per-socket protocol private structure.
struct rep0_sock {
- nni_mtx lk;
- int ttl;
- nni_idhash * pipes;
- nni_list recvpipes; // list of pipes with data to receive
- nni_list recvq;
- rep0_ctx ctx;
- nni_pollable readable;
- nni_pollable writable;
+ nni_mtx lk;
+ int ttl;
+ nni_idhash * pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ rep0_ctx ctx;
+ nni_pollable readable;
+ nni_pollable writable;
};
// rep0_pipe is our per-pipe protocol private structure.
@@ -63,8 +63,8 @@ struct rep0_pipe {
nni_pipe * pipe;
rep0_sock * rep;
uint32_t id;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node rnode; // receivable list linkage
nni_list sendq; // contexts waiting to send
bool busy;
@@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio)
if (!p->busy) {
p->busy = true;
len = nni_msg_len(msg);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
nni_aio_set_msg(aio, NULL);
@@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg)
{
rep0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg)
rep0_pipe *p = arg;
nng_msg * msg;
- if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
- nni_aio_set_msg(p->aio_recv, NULL);
+ if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
}
static int
rep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
rep0_pipe *p = arg;
- int rv;
- if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) {
- rep0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p);
NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode);
@@ -329,7 +325,7 @@ rep0_pipe_start(void *arg)
}
// By definition, we have not received a request yet on this pipe,
// so it cannot cause us to become writable.
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -340,8 +336,8 @@ rep0_pipe_close(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
@@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
@@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->lk);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_list_remove(&s->recvpipes, p);
if (nni_list_empty(&s->recvpipes)) {
nni_pollable_clear(&s->readable);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
@@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg)
size_t len;
int hops;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
+ msg = nni_aio_get_msg(&p->aio_recv);
nni_msg_set_pipe(msg, p->id);
@@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
// schedule another receive
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
ctx->btrace_len = len;
memcpy(ctx->btrace, nni_msg_header(msg), len);
@@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static int
diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c
index 879d6ae4..f339e68d 100644
--- a/src/protocol/reqrep0/rep_test.c
+++ b/src/protocol/reqrep0/rep_test.c
@@ -273,6 +273,26 @@ test_rep_close_pipe_during_send(void)
}
void
+test_rep_ctx_recv_aio_stopped(void)
+{
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ TEST_NNG_PASS(nng_ctx_open(&ctx, rep));
+
+ nng_aio_stop(aio);
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ TEST_NNG_PASS(nng_ctx_close(ctx));
+ TEST_NNG_PASS(nng_close(rep));
+ nng_aio_free(aio);
+}
+
+void
test_rep_close_pipe_context_send(void)
{
nng_socket rep;
@@ -424,6 +444,7 @@ TEST_LIST = {
{ "rep double recv", test_rep_double_recv },
{ "rep close pipe before send", test_rep_close_pipe_before_send },
{ "rep close pipe during send", test_rep_close_pipe_during_send },
+ { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped },
{ "rep close pipe context send", test_rep_close_pipe_context_send },
{ "rep close context send", test_rep_close_context_send },
{ "rep recv garbage", test_rep_recv_garbage },
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 33629abc..14da7143 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -184,8 +184,8 @@ req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
}
static int
@@ -194,8 +194,8 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
req0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) {
req0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 48f74075..308c0f0e 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -62,7 +62,7 @@ xrep0_sock_fini(void *arg)
{
xrep0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->lk);
}
@@ -75,7 +75,7 @@ xrep0_sock_init(void *arg, nni_sock *sock)
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
+ ((rv = nni_aio_alloc(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
xrep0_sock_fini(s);
return (rv);
}
@@ -120,10 +120,10 @@ xrep0_pipe_fini(void *arg)
{
xrep0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
}
@@ -146,10 +146,10 @@ xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
// willing to receive replies. Something to think about for the
// future.)
if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) {
xrep0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 7455c986..15652f4f 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -96,10 +96,10 @@ xreq0_pipe_fini(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
}
static int
@@ -108,10 +108,10 @@ xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s)
xreq0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) {
xreq0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index b4ffc917..06010d99 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -281,8 +281,8 @@ resp0_pipe_fini(void *arg)
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -291,8 +291,8 @@ resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
resp0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
resp0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 8aa05dd4..35a14de7 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -286,9 +286,9 @@ surv0_pipe_fini(void *arg)
{
surv0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_msgq_fini(p->sendq);
}
@@ -303,9 +303,9 @@ surv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
// is best effort, and a deep queue doesn't really do much for us.
// Note that surveys can be *outstanding*, but not yet put on the wire.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
surv0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 66b340ee..6318fe8b 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -62,7 +62,7 @@ xresp0_sock_fini(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -75,7 +75,7 @@ xresp0_sock_init(void *arg, nni_sock *nsock)
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
+ ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
xresp0_sock_fini(s);
return (rv);
}
@@ -119,10 +119,10 @@ xresp0_pipe_fini(void *arg)
{
xresp0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_msgq_fini(p->sendq);
}
@@ -133,10 +133,10 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
int rv;
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) {
xresp0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 43c83793..86f912a2 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -60,7 +60,7 @@ xsurv0_sock_fini(void *arg)
{
xsurv0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_mtx_fini(&s->mtx);
}
@@ -70,7 +70,7 @@ xsurv0_sock_init(void *arg, nni_sock *nsock)
xsurv0_sock *s = arg;
int rv;
- if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
xsurv0_sock_fini(s);
return (rv);
}
@@ -116,10 +116,10 @@ xsurv0_pipe_fini(void *arg)
{
xsurv0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
}
@@ -136,10 +136,10 @@ xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
// an expiration with them, so that we could discard any that are
// not delivered before their expiration date.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, xsurv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xsurv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xsurv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) {
xsurv0_pipe_fini(p);
return (rv);
}