From b21d7805523a407a14567017edbdef57ca81781f Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 8 Jan 2020 20:34:26 -0800 Subject: fixes #1094 Consider in-lining task and aio This only does it for rep, but it also has changes that should increase the overall test coverage for the REP protocol --- src/protocol/bus0/bus.c | 22 ++++++------ src/protocol/pair0/pair.c | 16 ++++----- src/protocol/pair1/pair.c | 20 +++++------ src/protocol/pipeline0/pull.c | 8 ++--- src/protocol/pipeline0/push.c | 12 +++---- src/protocol/pubsub0/pub.c | 8 ++--- src/protocol/pubsub0/sub.c | 4 +-- src/protocol/pubsub0/xsub.c | 4 +-- src/protocol/reqrep0/rep.c | 80 ++++++++++++++++++++--------------------- src/protocol/reqrep0/rep_test.c | 21 +++++++++++ src/protocol/reqrep0/req.c | 8 ++--- src/protocol/reqrep0/xrep.c | 20 +++++------ src/protocol/reqrep0/xreq.c | 16 ++++----- src/protocol/survey0/respond.c | 8 ++--- src/protocol/survey0/survey.c | 12 +++---- src/protocol/survey0/xrespond.c | 20 +++++------ src/protocol/survey0/xsurvey.c | 20 +++++------ 17 files changed, 158 insertions(+), 141 deletions(-) (limited to 'src/protocol') diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index afb12ef6..dea228a1 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -68,7 +68,7 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -80,7 +80,7 @@ bus0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -99,7 +99,7 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -142,10 +142,10 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); nni_mtx_fini(&p->mtx); } @@ -159,10 +159,10 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { bus0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 860ac17f..730e5f5e 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -88,10 +88,10 @@ pair0_pipe_fini(void *arg) { pair0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); } static int @@ -100,10 +100,10 @@ pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock) pair0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair0_putq_cb, p)) != 0)) { pair0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 2838cb5d..b3b64a79 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -69,7 +69,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -88,7 +88,7 @@ pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { pair1_sock_fini(s); return (rv); } @@ -147,10 +147,10 @@ pair1_pipe_fini(void *arg) { pair1_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); nni_msgq_fini(p->sendq); } @@ -161,10 +161,10 @@ pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { pair1_pipe_fini(p); return (NNG_ENOMEM); } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 64b47cef..8feb08b8 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -73,8 +73,8 @@ pull0_pipe_fini(void *arg) { pull0_pipe *p = arg; - nni_aio_fini(p->putq_aio); - nni_aio_fini(p->recv_aio); + nni_aio_free(p->putq_aio); + nni_aio_free(p->recv_aio); } static int @@ -83,8 +83,8 @@ pull0_pipe_init(void *arg, nni_pipe *pipe, void *s) pull0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) { pull0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 5a932ece..90c94af9 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -89,9 +89,9 @@ push0_pipe_fini(void *arg) { push0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_getq); } static int @@ -100,9 +100,9 @@ push0_pipe_init(void *arg, nni_pipe *pipe, void *s) push0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) { push0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index a42e95ff..9b995c33 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -111,8 +111,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -130,8 +130,8 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { pub0_pipe_fini(p); return (rv); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 56da98f8..c5b84313 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -265,7 +265,7 @@ sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -274,7 +274,7 @@ sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) sub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) { sub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c index be300df4..baa4f8eb 100644 --- a/src/protocol/pubsub0/xsub.c +++ b/src/protocol/pubsub0/xsub.c @@ -85,7 +85,7 @@ xsub0_pipe_fini(void *arg) { xsub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -94,7 +94,7 @@ xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) xsub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) { xsub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index a715ab59..a29c3120 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -48,14 +48,14 @@ struct rep0_ctx { // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_mtx lk; - int ttl; - nni_idhash * pipes; - nni_list recvpipes; // list of pipes with data to receive - nni_list recvq; - rep0_ctx ctx; - nni_pollable readable; - nni_pollable writable; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + rep0_ctx ctx; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -63,8 +63,8 @@ struct rep0_pipe { nni_pipe * pipe; rep0_sock * rep; uint32_t id; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable list linkage nni_list sendq; // contexts waiting to send bool busy; @@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); nni_aio_set_msg(aio, NULL); @@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg) rep0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int rep0_pipe_init(void *arg, nni_pipe *pipe, void *s) { rep0_pipe *p = arg; - int rv; - if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { - rep0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p); NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); @@ -329,7 +325,7 @@ rep0_pipe_start(void *arg) } // By definition, we have not received a request yet on this pipe, // so it cannot cause us to become writable. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -340,8 +336,8 @@ rep0_pipe_close(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { @@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); @@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->lk); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } @@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg) size_t len; int hops; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); @@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->pipe); return; } @@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } // schedule another receive - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 879d6ae4..f339e68d 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -272,6 +272,26 @@ test_rep_close_pipe_during_send(void) TEST_NNG_PASS(nng_close(rep)); } +void +test_rep_ctx_recv_aio_stopped(void) +{ + nng_socket rep; + nng_ctx ctx; + nng_aio * aio; + + TEST_NNG_PASS(nng_rep0_open(&rep)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + TEST_NNG_PASS(nng_ctx_open(&ctx, rep)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED); + TEST_NNG_PASS(nng_ctx_close(ctx)); + TEST_NNG_PASS(nng_close(rep)); + nng_aio_free(aio); +} + void test_rep_close_pipe_context_send(void) { @@ -424,6 +444,7 @@ TEST_LIST = { { "rep double recv", test_rep_double_recv }, { "rep close pipe before send", test_rep_close_pipe_before_send }, { "rep close pipe during send", test_rep_close_pipe_during_send }, + { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped }, { "rep close pipe context send", test_rep_close_pipe_context_send }, { "rep close context send", test_rep_close_context_send }, { "rep recv garbage", test_rep_recv_garbage }, diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 33629abc..14da7143 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -184,8 +184,8 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -194,8 +194,8 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) req0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) { req0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 48f74075..308c0f0e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -62,7 +62,7 @@ xrep0_sock_fini(void *arg) { xrep0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->lk); } @@ -75,7 +75,7 @@ xrep0_sock_init(void *arg, nni_sock *sock) nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { xrep0_sock_fini(s); return (rv); } @@ -120,10 +120,10 @@ xrep0_pipe_fini(void *arg) { xrep0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -146,10 +146,10 @@ xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s) // willing to receive replies. Something to think about for the // future.) if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { xrep0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 7455c986..15652f4f 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -96,10 +96,10 @@ xreq0_pipe_fini(void *arg) { xreq0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -108,10 +108,10 @@ xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s) xreq0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) { xreq0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index b4ffc917..06010d99 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -281,8 +281,8 @@ resp0_pipe_fini(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); } static int @@ -291,8 +291,8 @@ resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) resp0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { resp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 8aa05dd4..35a14de7 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -286,9 +286,9 @@ surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -303,9 +303,9 @@ surv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // is best effort, and a deep queue doesn't really do much for us. // Note that surveys can be *outstanding*, but not yet put on the wire. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { surv0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 66b340ee..6318fe8b 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -62,7 +62,7 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -75,7 +75,7 @@ xresp0_sock_init(void *arg, nni_sock *nsock) nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { xresp0_sock_fini(s); return (rv); } @@ -119,10 +119,10 @@ xresp0_pipe_fini(void *arg) { xresp0_pipe *p = arg; - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -133,10 +133,10 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) { xresp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 43c83793..86f912a2 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -60,7 +60,7 @@ xsurv0_sock_fini(void *arg) { xsurv0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -70,7 +70,7 @@ xsurv0_sock_init(void *arg, nni_sock *nsock) xsurv0_sock *s = arg; int rv; - if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { xsurv0_sock_fini(s); return (rv); } @@ -116,10 +116,10 @@ xsurv0_pipe_fini(void *arg) { xsurv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -136,10 +136,10 @@ xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // an expiration with them, so that we could discard any that are // not delivered before their expiration date. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xsurv0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { xsurv0_pipe_fini(p); return (rv); } -- cgit v1.2.3-70-g09d2