diff options
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus/bus.c | 158 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 75 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 16 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 71 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 90 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 13 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 100 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 113 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 143 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 86 |
10 files changed, 523 insertions, 342 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index ca24c32c..cb624a10 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -37,6 +37,7 @@ struct nni_bus_sock { int raw; nni_aio aio_getq; nni_list pipes; + nni_mtx mtx; }; // An nni_bus_pipe is our per-pipe protocol private structure. @@ -49,8 +50,24 @@ struct nni_bus_pipe { nni_aio aio_recv; nni_aio aio_send; nni_aio aio_putq; + nni_mtx mtx; + int refcnt; }; + +static void +nni_bus_sock_fini(void *arg) +{ + nni_bus_sock *psock = arg; + + if (psock != NULL) { + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); + } +} + + static int nni_bus_sock_init(void **sp, nni_sock *nsock) { @@ -60,38 +77,49 @@ nni_bus_sock_init(void **sp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); if (rv != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } - NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); psock->nsock = nsock; psock->raw = 0; *sp = psock; return (0); + +fail: + nni_bus_sock_fini(psock); + return (rv); } static void -nni_bus_sock_fini(void *arg) +nni_bus_sock_open(void *arg) { nni_bus_sock *psock = arg; - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - NNI_FREE_STRUCT(psock); - } + nni_bus_sock_getq(psock); } static void -nni_bus_sock_open(void *arg) +nni_bus_pipe_fini(void *arg) { - nni_bus_sock *psock = arg; + nni_bus_pipe *ppipe = arg; - nni_bus_sock_getq(psock); + if (ppipe != NULL) { + nni_msgq_fini(ppipe->sendq); + nni_mtx_fini(&ppipe->mtx); + nni_aio_fini(&ppipe->aio_getq); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + NNI_FREE_STRUCT(ppipe); + } } @@ -105,62 +133,36 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } NNI_LIST_NODE_INIT(&ppipe->node); - // This depth could be tunable. - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - NNI_FREE_STRUCT(ppipe); - return (rv); + ppipe->refcnt = 0; + if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) || + ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) { + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); if (rv != 0) { - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); -} - - -static void -nni_bus_pipe_fini(void *arg) -{ - nni_bus_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_msgq_fini(ppipe->sendq); - nni_aio_fini(&ppipe->aio_getq); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - NNI_FREE_STRUCT(ppipe); - } +fail: + nni_bus_pipe_fini(ppipe); + return (rv); } @@ -170,29 +172,56 @@ nni_bus_pipe_start(void *arg) nni_bus_pipe *ppipe = arg; nni_bus_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); nni_list_append(&psock->pipes, ppipe); + nni_mtx_unlock(&psock->mtx); + + // Mark the ppipe busy twice -- once for each of the oustanding + // asynchronous "threads" of operation. + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); nni_bus_pipe_recv(ppipe); - nni_pipe_hold(ppipe->npipe); nni_bus_pipe_getq(ppipe); return (0); } +// nni_bus_pipe_stop is called only internally when one of our handlers notices +// that the transport layer has closed. This allows us to stop all further +// actions. static void -nni_bus_pipe_stop(void *arg) +nni_bus_pipe_stop(nni_bus_pipe *ppipe) { - nni_bus_pipe *ppipe = arg; + int refcnt; nni_bus_sock *psock = ppipe->psock; - nni_sock *nsock = psock->nsock; + // As we are called only on error paths, shut down the underlying + // pipe transport. This should cause any other consumer to also get + // a suitable error (NNG_ECLOSED), so that we can shut down completely. + nni_pipe_close(ppipe->npipe); + + nni_mtx_lock(&ppipe->psock->mtx); if (nni_list_active(&psock->pipes, ppipe)) { nni_list_remove(&psock->pipes, ppipe); nni_msgq_close(ppipe->sendq); - nni_msgq_aio_cancel(nni_sock_recvq(nsock), &ppipe->aio_putq); + nni_msgq_aio_cancel(nni_sock_recvq(psock->nsock), + &ppipe->aio_putq); + } + nni_mtx_unlock(&ppipe->psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + refcnt = --ppipe->refcnt; + nni_mtx_unlock(&ppipe->mtx); + + // If we are done with the pipe, let the system know so it can + // deregister it. + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); } } @@ -204,8 +233,7 @@ nni_bus_pipe_getq_cb(void *arg) if (nni_aio_result(&ppipe->aio_getq) != 0) { // closed? - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg; @@ -224,8 +252,7 @@ nni_bus_pipe_send_cb(void *arg) // closed? nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -242,8 +269,7 @@ nni_bus_pipe_recv_cb(void *arg) uint32_t id; if (nni_aio_result(&ppipe->aio_recv) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } msg = ppipe->aio_recv.a_msg; @@ -252,8 +278,7 @@ nni_bus_pipe_recv_cb(void *arg) if (nni_msg_prepend_header(msg, &id, 4) != 0) { // XXX: bump a nomemory stat nni_msg_free(msg); - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -270,8 +295,7 @@ nni_bus_pipe_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -287,7 +311,6 @@ nni_bus_sock_getq_cb(void *arg) nni_bus_pipe *ppipe; nni_bus_pipe *lpipe; nni_msgq *uwq = nni_sock_sendq(psock->nsock); - nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg, *dup; uint32_t sender; @@ -308,7 +331,7 @@ nni_bus_sock_getq_cb(void *arg) sender = 0; } - nni_mtx_lock(mx); + nni_mtx_lock(&psock->mtx); lpipe = nni_list_last(&psock->pipes); NNI_LIST_FOREACH (&psock->pipes, ppipe) { if (nni_pipe_id(ppipe->npipe) == sender) { @@ -325,7 +348,7 @@ nni_bus_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&psock->mtx); if (lpipe == NULL) { nni_msg_free(msg); @@ -394,7 +417,6 @@ static nni_proto_pipe_ops nni_bus_pipe_ops = { .pipe_init = nni_bus_pipe_init, .pipe_fini = nni_bus_pipe_fini, .pipe_start = nni_bus_pipe_start, - .pipe_stop = nni_bus_pipe_stop, }; static nni_proto_sock_ops nni_bus_sock_ops = { diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index a2b97fe0..c2d4fa0d 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -32,6 +32,7 @@ struct nni_pair_sock { nni_msgq * uwq; nni_msgq * urq; int raw; + nni_mtx mtx; }; // An nni_pair_pipe is our per-pipe protocol private structure. We keep @@ -47,16 +48,23 @@ struct nni_pair_pipe { nni_aio aio_putq; int busy; int closed; + nni_mtx mtx; + int refcnt; }; static int nni_pair_sock_init(void **sp, nni_sock *nsock) { nni_pair_sock *psock; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } psock->nsock = nsock; psock->ppipe = NULL; psock->raw = 0; @@ -73,6 +81,8 @@ nni_pair_sock_fini(void *arg) nni_pair_sock *psock = arg; if (psock != NULL) { + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } } @@ -87,31 +97,33 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - + if ((rv = nni_mtx_init(&ppipe->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); + +fail: + nni_pair_pipe_fini(ppipe); + return (rv); } @@ -125,6 +137,7 @@ nni_pair_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_recv); nni_aio_fini(&ppipe->aio_putq); nni_aio_fini(&ppipe->aio_getq); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -135,16 +148,21 @@ nni_pair_pipe_start(void *arg) nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); if (psock->ppipe != NULL) { + nni_mtx_unlock(&psock->mtx); return (NNG_EBUSY); // Already have a peer, denied. } psock->ppipe = ppipe; + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + nni_mtx_unlock(&ppipe->mtx); // Schedule a getq on the upper, and a read from the pipe. // Each of these also sets up another hold on the pipe itself. - nni_pipe_hold(ppipe->npipe); nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); - nni_pipe_hold(ppipe->npipe); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); return (0); @@ -152,17 +170,31 @@ nni_pair_pipe_start(void *arg) static void -nni_pair_pipe_stop(void *arg) +nni_pair_pipe_stop(nni_pair_pipe *ppipe) { - nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + int refcnt; - nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); - nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + nni_mtx_lock(&psock->mtx); if (psock->ppipe == ppipe) { psock->ppipe = NULL; } + nni_mtx_unlock(&psock->mtx); + + // These operations are idempotent. + nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + nni_mtx_unlock(&ppipe->mtx); + + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -173,8 +205,7 @@ nni_pair_recv_cb(void *arg) nni_pair_sock *psock = ppipe->psock; if (nni_aio_result(&ppipe->aio_recv) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -192,8 +223,7 @@ nni_pair_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -208,8 +238,7 @@ nni_pair_getq_cb(void *arg) nni_msg *msg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -228,8 +257,7 @@ nni_pair_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -278,7 +306,6 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_init = nni_pair_pipe_init, .pipe_fini = nni_pair_pipe_fini, .pipe_start = nni_pair_pipe_start, - .pipe_stop = nni_pair_pipe_stop, }; static nni_proto_sock_ops nni_pair_sock_ops = { diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index eb14be81..eb66bc21 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -107,7 +107,6 @@ nni_pull_pipe_start(void *arg) nni_pull_pipe *pp = arg; // Start the pending pull... - nni_pipe_hold(pp->pipe); nni_pull_recv(pp); return (0); @@ -115,12 +114,11 @@ nni_pull_pipe_start(void *arg) static void -nni_pull_pipe_stop(void *arg) +nni_pull_pipe_stop(nni_pull_pipe *pp) { - nni_pull_pipe *pp = arg; - // Cancel any pending sendup. nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio); + nni_pipe_remove(pp->pipe); } @@ -133,8 +131,7 @@ nni_pull_recv_cb(void *arg) if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pull_pipe_stop(pp); return; } @@ -157,8 +154,7 @@ nni_pull_putq_cb(void *arg) // we can do. Just close the pipe. nni_msg_free(aio->a_msg); aio->a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pull_pipe_stop(pp); return; } @@ -172,8 +168,7 @@ nni_pull_recv(nni_pull_pipe *pp) { // Schedule the aio with callback. if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pipe_remove(pp->pipe); } } @@ -230,7 +225,6 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, .pipe_start = nni_pull_pipe_start, - .pipe_stop = nni_pull_pipe_stop, }; static nni_proto_sock_ops nni_pull_sock_ops = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 9554b2be..e69ebdbf 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -39,6 +39,8 @@ struct nni_push_pipe { nni_aio aio_recv; nni_aio aio_send; nni_aio aio_getq; + int refcnt; + nni_mtx mtx; }; static int @@ -70,6 +72,19 @@ nni_push_sock_fini(void *arg) } +static void +nni_push_pipe_fini(void *arg) +{ + nni_push_pipe *pp = arg; + + nni_aio_fini(&pp->aio_recv); + nni_aio_fini(&pp->aio_send); + nni_aio_fini(&pp->aio_getq); + nni_mtx_fini(&pp->mtx); + NNI_FREE_STRUCT(pp); +} + + static int nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { @@ -80,19 +95,17 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) return (NNG_ENOMEM); } if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { - NNI_FREE_STRUCT(pp); - return (rv); + goto fail; } if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); + goto fail; return (rv); } if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) { - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); - return (rv); + goto fail; + } + if ((rv = nni_mtx_init(&pp->mtx)) != 0) { + goto fail; } NNI_LIST_NODE_INIT(&pp->node); @@ -100,18 +113,10 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) pp->push = psock; *ppp = pp; return (0); -} - -static void -nni_push_pipe_fini(void *arg) -{ - nni_push_pipe *pp = arg; - - nni_aio_fini(&pp->aio_recv); - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_getq); - NNI_FREE_STRUCT(pp); +fail: + nni_push_pipe_fini(pp); + return (rv); } @@ -125,6 +130,10 @@ nni_push_pipe_start(void *arg) return (NNG_EPROTO); } + nni_mtx_lock(&pp->mtx); + pp->refcnt = 2; + nni_mtx_unlock(&pp->mtx); + // Schedule a receiver. This is mostly so that we can detect // a closed transport pipe. nni_pipe_hold(pp->pipe); @@ -139,12 +148,22 @@ nni_push_pipe_start(void *arg) static void -nni_push_pipe_stop(void *arg) +nni_push_pipe_stop(nni_push_pipe *pp) { - nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; + int refcnt; nni_msgq_aio_cancel(push->uwq, &pp->aio_getq); + + nni_mtx_lock(&pp->mtx); + NNI_ASSERT(pp->refcnt > 0); + pp->refcnt--; + refcnt = pp->refcnt; + nni_mtx_unlock(&pp->mtx); + + if (refcnt == 0) { + nni_pipe_remove(pp->pipe); + } } @@ -156,8 +175,7 @@ nni_push_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. if (nni_aio_result(&pp->aio_recv) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } nni_msg_free(pp->aio_recv.a_msg); @@ -175,8 +193,7 @@ nni_push_send_cb(void *arg) if (nni_aio_result(&pp->aio_send) != 0) { nni_msg_free(pp->aio_send.a_msg); pp->aio_send.a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } @@ -192,8 +209,7 @@ nni_push_getq_cb(void *arg) if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } @@ -244,7 +260,6 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, .pipe_start = nni_push_pipe_start, - .pipe_stop = nni_push_pipe_stop, }; static nni_proto_sock_ops nni_push_sock_ops = { diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 5ea16d2d..8ad9bb6d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -34,6 +34,7 @@ struct nni_pub_sock { int raw; nni_aio aio_getq; nni_list pipes; + nni_mtx mtx; }; // An nni_pub_pipe is our per-pipe protocol private structure. @@ -45,6 +46,8 @@ struct nni_pub_pipe { nni_aio aio_send; nni_aio aio_recv; nni_list_node node; + int refcnt; + nni_mtx mtx; }; static int @@ -56,6 +59,10 @@ nni_pub_sock_init(void **pubp, nni_sock *sock) if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&pub->mtx)) != 0) { + nni_pub_sock_fini(pub); + return (rv); + } rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); if (rv != 0) { nni_pub_sock_fini(pub); @@ -79,6 +86,7 @@ nni_pub_sock_fini(void *arg) nni_pub_sock *pub = arg; nni_aio_fini(&pub->aio_getq); + nni_mtx_fini(&pub->mtx); NNI_FREE_STRUCT(pub); } @@ -92,6 +100,20 @@ nni_pub_sock_open(void *arg) } +static void +nni_pub_pipe_fini(void *arg) +{ + nni_pub_pipe *pp = arg; + + nni_msgq_fini(pp->sendq); + nni_aio_fini(&pp->aio_getq); + nni_aio_fini(&pp->aio_send); + nni_aio_fini(&pp->aio_recv); + nni_mtx_fini(&pp->mtx); + NNI_FREE_STRUCT(pp); +} + + static int nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { @@ -102,45 +124,33 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) return (NNG_ENOMEM); } // XXX: consider making this depth tunable - if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) { - nni_pub_pipe_fini(pp); - return (rv); + if (((rv = nni_msgq_init(&pp->sendq, 16)) != 0) || + ((rv = nni_mtx_init(&pp->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } pp->pipe = pipe; pp->pub = psock; *ppp = pp; return (0); -} - - -static void -nni_pub_pipe_fini(void *arg) -{ - nni_pub_pipe *pp = arg; - nni_msgq_fini(pp->sendq); - nni_aio_fini(&pp->aio_getq); - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); +fail: + nni_pub_pipe_fini(pp); + return (rv); } @@ -155,26 +165,41 @@ nni_pub_pipe_start(void *arg) } nni_list_append(&pub->pipes, pp); + nni_mtx_lock(&pp->mtx); + pp->refcnt = 2; + nni_mtx_unlock(&pp->mtx); + // Start the receiver and the queue reader. - nni_pipe_hold(pp->pipe); nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); - nni_pipe_hold(pp->pipe); nni_msgq_aio_get(pp->sendq, &pp->aio_getq); + return (0); } static void -nni_pub_pipe_stop(void *arg) +nni_pub_pipe_stop(nni_pub_pipe *pp) { - nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; + int refcnt; + nni_mtx_lock(&pub->mtx); if (nni_list_active(&pub->pipes, pp)) { nni_list_remove(&pub->pipes, pp); nni_msgq_close(pp->sendq); } + nni_mtx_unlock(&pub->mtx); + + nni_mtx_lock(&pp->mtx); + NNI_ASSERT(pp->refcnt > 0); + pp->refcnt--; + refcnt = pp->refcnt; + nni_mtx_unlock(&pp->mtx); + + if (refcnt == 0) { + nni_pipe_remove(pp->pipe); + } } @@ -184,7 +209,6 @@ nni_pub_sock_getq_cb(void *arg) nni_pub_sock *pub = arg; nni_msgq *uwq = pub->uwq; nni_msg *msg, *dup; - nni_mtx *mx = nni_sock_mtx(pub->sock); nni_pub_pipe *pp; nni_pub_pipe *last; @@ -197,7 +221,7 @@ nni_pub_sock_getq_cb(void *arg) msg = pub->aio_getq.a_msg; pub->aio_getq.a_msg = NULL; - nni_mtx_lock(mx); + nni_mtx_lock(&pub->mtx); last = nni_list_last(&pub->pipes); NNI_LIST_FOREACH (&pub->pipes, pp) { if (pp != last) { @@ -212,7 +236,7 @@ nni_pub_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&pub->mtx); if (last == NULL) { nni_msg_free(msg); @@ -228,8 +252,7 @@ nni_pub_pipe_recv_cb(void *arg) nni_pub_pipe *pp = arg; if (nni_aio_result(&pp->aio_recv) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -245,8 +268,7 @@ nni_pub_pipe_getq_cb(void *arg) nni_pub_pipe *pp = arg; if (nni_aio_result(&pp->aio_getq) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -265,8 +287,7 @@ nni_pub_pipe_send_cb(void *arg) if (nni_aio_result(&pp->aio_send) != 0) { nni_msg_free(pp->aio_send.a_msg); pp->aio_send.a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -315,7 +336,6 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_init = nni_pub_pipe_init, .pipe_fini = nni_pub_pipe_fini, .pipe_start = nni_pub_pipe_start, - .pipe_stop = nni_pub_pipe_stop, }; nni_proto_sock_ops nni_pub_sock_ops = { diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 8340da77..cab745b5 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -117,18 +117,16 @@ nni_sub_pipe_start(void *arg) { nni_sub_pipe *sp = arg; - nni_pipe_hold(sp->pipe); nni_pipe_aio_recv(sp->pipe, &sp->aio_recv); return (0); } static void -nni_sub_pipe_stop(void *arg) +nni_sub_pipe_stop(nni_sub_pipe *sp) { - nni_sub_pipe *sp = arg; - nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq); + nni_pipe_remove(sp->pipe); } @@ -141,8 +139,7 @@ nni_sub_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&sp->aio_recv) != 0) { - nni_pipe_close(sp->pipe); - nni_pipe_rele(sp->pipe); + nni_sub_pipe_stop(sp); return; } @@ -160,8 +157,7 @@ nni_sub_putq_cb(void *arg) if (nni_aio_result(&sp->aio_putq) != 0) { nni_msg_free(sp->aio_putq.a_msg); sp->aio_putq.a_msg = NULL; - nni_pipe_close(sp->pipe); - nni_pipe_rele(sp->pipe); + nni_sub_pipe_stop(sp); return; } @@ -339,7 +335,6 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, .pipe_start = nni_sub_pipe_start, - .pipe_stop = nni_sub_pipe_stop, }; static nni_proto_sock_ops nni_sub_sock_ops = { diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 822758ef..507edf66 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -38,6 +38,7 @@ struct nni_rep_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -51,8 +52,25 @@ struct nni_rep_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_rep_sock_fini(void *arg) +{ + nni_rep_sock *rep = arg; + + nni_aio_fini(&rep->aio_getq); + nni_idhash_fini(&rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + nni_mtx_fini(&rep->mtx); + NNI_FREE_STRUCT(rep); +} + + static int nni_rep_sock_init(void **repp, nni_sock *sock) { @@ -67,15 +85,14 @@ nni_rep_sock_init(void **repp, nni_sock *sock) rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); + if (((rv = nni_mtx_init(&rep->mtx)) != 0) || + ((rv = nni_idhash_init(&rep->pipes)) != 0)) { + goto fail; } rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); if (rv != 0) { - nni_idhash_fini(&rep->pipes); - return (rv); + goto fail; } rep->uwq = nni_sock_sendq(sock); @@ -85,6 +102,10 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); + +fail: + nni_rep_sock_fini(rep); + return (rv); } @@ -106,20 +127,6 @@ nni_rep_sock_close(void *arg) } -static void -nni_rep_sock_fini(void *arg) -{ - nni_rep_sock *rep = arg; - - nni_aio_fini(&rep->aio_getq); - nni_idhash_fini(&rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); - } - NNI_FREE_STRUCT(rep); -} - - static int nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { @@ -129,7 +136,8 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + if (((rv = nni_msgq_init(&rp->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&rp->mtx)) != 0)) { goto fail; } if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) { @@ -165,6 +173,7 @@ nni_rep_pipe_fini(void *arg) nni_aio_fini(&rp->aio_send); nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_putq); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } @@ -177,31 +186,53 @@ nni_rep_pipe_start(void *arg) int rv; rp->id = nni_pipe_id(rp->pipe); + + nni_mtx_lock(&rep->mtx); rv = nni_idhash_insert(&rep->pipes, rp->id, rp); + nni_mtx_unlock(&rep->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(rp->pipe); + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); + nni_msgq_aio_get(rp->sendq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_rep_pipe_stop(void *arg) +nni_rep_pipe_stop(nni_rep_pipe *rp) { - nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; + int refcnt; + uint32_t id; + nni_mtx_lock(&rp->mtx); + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + id = rp->id; + rp->id = 0; if (rp->running) { rp->running = 0; nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - nni_idhash_remove(&rep->pipes, rp->id); + } + nni_mtx_unlock(&rp->mtx); + + if (id != 0) { + nni_mtx_lock(&rep->mtx); + nni_idhash_remove(&rep->pipes, id); + nni_mtx_unlock(&rep->mtx); + } + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); } } @@ -246,12 +277,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_sock_lock(rep->sock); + nni_mtx_lock(&rep->mtx); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); + nni_mtx_unlock(&rep->mtx); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -267,8 +298,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -287,8 +317,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -308,8 +337,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -371,8 +399,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -494,7 +521,6 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, - .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 8268ecd6..519b224e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -44,6 +44,7 @@ struct nni_req_sock { uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; }; // An nni_req_pipe is our per-pipe protocol private structure. @@ -57,6 +58,8 @@ struct nni_req_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; static void nni_req_resender(void *); @@ -75,6 +78,11 @@ nni_req_sock_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&req->mtx)) != 0) { + NNI_FREE_STRUCT(req); + return (rv); + } + NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); nni_timer_init(&req->timer, nni_req_timeout, req); @@ -114,6 +122,7 @@ nni_req_sock_fini(void *arg) if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } + nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -127,6 +136,9 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&rp->mtx)) != 0) { + goto failed; + } if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) { goto failed; } @@ -168,6 +180,7 @@ nni_req_pipe_fini(void *arg) nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_sendcooked); nni_aio_fini(&rp->aio_sendraw); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } } @@ -182,45 +195,61 @@ nni_req_pipe_start(void *arg) if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { nni_req_resend(req); } + nni_mtx_unlock(&req->mtx); + + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); - nni_pipe_hold(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_req_pipe_stop(void *arg) +nni_req_pipe_stop(nni_req_pipe *rp) { - nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; + int refcnt; + int running; - if (!rp->running) { - return; - } + nni_mtx_lock(&rp->mtx); + running = rp->running; rp->running = 0; + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + nni_mtx_unlock(&rp->mtx); + + if (running) { + nni_mtx_lock(&req->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + nni_list_remove(&req->readypipes, rp); - // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. - nni_list_remove(&req->readypipes, rp); - - if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { - // we are removing the pipe we sent the last request on... - // schedule immediate resend. - req->resend = NNI_TIME_ZERO; - req->wantw = 1; - nni_req_resend(req); + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + req->resend = NNI_TIME_ZERO; + req->wantw = 1; + nni_req_resend(req); + } + nni_mtx_unlock(&req->mtx); } nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); + } } @@ -293,8 +322,7 @@ nni_req_getq_cb(void *arg) // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -315,8 +343,7 @@ nni_req_sendraw_cb(void *arg) if (nni_aio_result(&rp->aio_sendraw) != 0) { nni_msg_free(rp->aio_sendraw.a_msg); rp->aio_sendraw.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -338,8 +365,7 @@ nni_req_sendcooked_cb(void *arg) // means no new asynchronous traffic can occur here. nni_msg_free(rp->aio_sendcooked.a_msg); rp->aio_sendcooked.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -347,12 +373,12 @@ nni_req_sendcooked_cb(void *arg) // reinsert ourselves in the ready list, and possibly schedule // a resend. - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); nni_list_remove(&req->busypipes, rp); nni_list_append(&req->readypipes, rp); nni_req_resend(req); - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -363,8 +389,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } rp->aio_putq.a_msg = NULL; @@ -380,8 +405,7 @@ nni_req_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -411,8 +435,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); } @@ -420,14 +443,13 @@ static void nni_req_timeout(void *arg) { nni_req_sock *req = arg; - nni_mtx *mx = nni_sock_mtx(req->sock); - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); if (req->reqmsg != NULL) { req->wantw = 1; nni_req_resend(req); } - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -435,7 +457,6 @@ static void nni_req_resend(nni_req_sock *req) { nni_req_pipe *rp; - nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int i; @@ -512,6 +533,10 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) return (NULL); } + // NB: The socket lock is also held, so this is always self-serialized. + // But we have to serialize against other async callbacks. + nni_mtx_lock(&req->mtx); + // If another message is there, this cancels it. if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); @@ -525,6 +550,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) req->wantw = 1; nni_req_resend(req); + nni_mtx_unlock(&req->mtx); // Clear the error condition. nni_sock_recverr(req->sock, 0); @@ -537,6 +563,7 @@ static nni_msg * nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; + nni_msg *rmsg; if (req->raw) { // Pass it unmolested @@ -548,21 +575,28 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (NULL); } - if (req->reqmsg == NULL) { + nni_mtx_lock(&req->mtx); + + if ((rmsg = req->reqmsg) == NULL) { // We had no outstanding request. + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } - nni_sock_recverr(req->sock, NNG_ESTATE); - nni_msg_free(req->reqmsg); req->reqmsg = NULL; req->pendpipe = NULL; + nni_mtx_unlock(&req->mtx); + + nni_sock_recverr(req->sock, NNG_ESTATE); + nni_msg_free(rmsg); + return (msg); } @@ -573,7 +607,6 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_start = nni_req_pipe_start, - .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 71220678..33360ab5 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -37,6 +37,7 @@ struct nni_resp_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_resp_pipe is our per-pipe protocol private structure. @@ -50,8 +51,28 @@ struct nni_resp_pipe { nni_aio aio_send; nni_aio aio_recv; int running; + int refcnt; + nni_mtx mtx; }; + +static void +nni_resp_sock_fini(void *arg) +{ + nni_resp_sock *psock = arg; + + if (psock != NULL) { + nni_aio_fini(&psock->aio_getq); + nni_idhash_fini(&psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); + } + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); + } +} + + static int nni_resp_sock_init(void **pp, nni_sock *nsock) { @@ -68,36 +89,22 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->btrace_len = 0; psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); - if ((rv = nni_idhash_init(&psock->pipes)) != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + if (((rv = nni_idhash_init(&psock->pipes)) != 0) || + ((rv = nni_mtx_init(&psock->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); if (rv != 0) { - nni_idhash_fini(&psock->pipes); - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } *pp = psock; nni_sock_senderr(nsock, NNG_ESTATE); return (0); -} - -static void -nni_resp_sock_fini(void *arg) -{ - nni_resp_sock *psock = arg; - - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - nni_idhash_fini(&psock->pipes); - if (psock->btrace != NULL) { - nni_free(psock->btrace, psock->btrace_len); - } - NNI_FREE_STRUCT(psock); - } +fail: + nni_resp_sock_fini(psock); + return (rv); } @@ -128,35 +135,35 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + if (((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); + +fail: + nni_resp_pipe_fini(ppipe); + return (rv); } @@ -170,6 +177,7 @@ nni_resp_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_getq); nni_aio_fini(&ppipe->aio_send); nni_aio_fini(&ppipe->aio_recv); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -182,34 +190,56 @@ nni_resp_pipe_start(void *arg) int rv; ppipe->id = nni_pipe_id(ppipe->npipe); + + nni_mtx_lock(&psock->mtx); rv = nni_idhash_insert(&psock->pipes, ppipe->id, ppipe); + nni_mtx_unlock(&psock->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(ppipe->npipe); - nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + ppipe->running = 1; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); + nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); - ppipe->running = 1; + return (rv); } static void -nni_resp_pipe_stop(void *arg) +nni_resp_pipe_stop(nni_resp_pipe *ppipe) { - nni_resp_pipe *ppipe = arg; nni_resp_sock *psock = ppipe->psock; + int refcnt; + int running; - if (ppipe->running) { - ppipe->running = 0; + nni_mtx_lock(&psock->mtx); + if (ppipe->id != 0) { nni_idhash_remove(&psock->pipes, ppipe->id); + ppipe->id = 0; + } + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + running = ppipe->running; + ppipe->running = 0; + nni_mtx_unlock(&ppipe->mtx); + + if (running) { nni_msgq_close(ppipe->sendq); nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); } + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -246,19 +276,19 @@ nni_resp_sock_getq_cb(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); - nni_sock_lock(psock->nsock); - if (nni_idhash_find(&psock->pipes, id, (void **) &ppipe) != 0) { - nni_sock_unlock(psock->nsock); - nni_msg_free(msg); - nni_msgq_aio_get(psock->uwq, &psock->aio_getq); - return; - } + nni_mtx_lock(&psock->mtx); + rv = nni_idhash_find(&psock->pipes, id, (void **) &ppipe); - // Non-blocking put. - if (nni_msgq_tryput(ppipe->sendq, msg) != 0) { + if (rv != 0) { nni_msg_free(msg); + nni_msgq_aio_get(psock->uwq, &psock->aio_getq); + } else { + // Non-blocking put. + if (nni_msgq_tryput(ppipe->sendq, msg) != 0) { + nni_msg_free(msg); + } } - nni_sock_unlock(psock->nsock); + nni_mtx_unlock(&psock->mtx); } @@ -268,8 +298,7 @@ nni_resp_getq_cb(void *arg) nni_resp_pipe *ppipe = arg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); return; } @@ -288,8 +317,7 @@ nni_resp_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); return; } @@ -358,8 +386,7 @@ nni_resp_recv_cb(void *arg) return; error: - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); } @@ -371,8 +398,7 @@ nni_resp_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -499,7 +525,6 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = { .pipe_init = nni_resp_pipe_init, .pipe_fini = nni_resp_pipe_fini, .pipe_start = nni_resp_pipe_start, - .pipe_stop = nni_resp_pipe_stop, }; static nni_proto_sock_ops nni_resp_sock_ops = { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index f72532de..962371c1 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -39,6 +39,7 @@ struct nni_surv_sock { nni_timer_node timer; nni_msgq * uwq; nni_msgq * urq; + nni_mtx mtx; }; // An nni_surv_pipe is our per-pipe protocol private structure. @@ -52,8 +53,21 @@ struct nni_surv_pipe { nni_aio aio_send; nni_aio aio_recv; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_surv_sock_fini(void *arg) +{ + nni_surv_sock *psock = arg; + + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); +} + + static int nni_surv_sock_init(void **sp, nni_sock *nsock) { @@ -63,10 +77,12 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); if (rv != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); nni_timer_init(&psock->timer, nni_surv_timeout, psock); @@ -82,6 +98,10 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) *sp = psock; nni_sock_recverr(nsock, NNG_ESTATE); return (0); + +fail: + nni_surv_sock_fini(psock); + return (rv); } @@ -105,16 +125,6 @@ nni_surv_sock_close(void *arg) static void -nni_surv_sock_fini(void *arg) -{ - nni_surv_sock *psock = arg; - - nni_aio_fini(&psock->aio_getq); - NNI_FREE_STRUCT(psock); -} - - -static void nni_surv_pipe_fini(void *arg) { nni_surv_pipe *ppipe = arg; @@ -124,6 +134,7 @@ nni_surv_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_recv); nni_aio_fini(&ppipe->aio_putq); nni_msgq_fini(ppipe->sendq); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -138,7 +149,8 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } // This depth could be tunable. - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + if (((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) || + ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) { goto failed; } rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); @@ -174,29 +186,46 @@ nni_surv_pipe_start(void *arg) nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); nni_list_append(&psock->pipes, ppipe); + nni_mtx_unlock(&psock->mtx); - nni_pipe_hold(ppipe->npipe); - nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + ppipe->running = 1; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); + nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); - ppipe->running = 1; return (0); } static void -nni_surv_pipe_stop(void *arg) +nni_surv_pipe_stop(nni_surv_pipe *ppipe) { - nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; + int refcnt; - if (ppipe->running) { + nni_mtx_lock(&psock->mtx); + if (nni_list_active(&psock->pipes, ppipe)) { nni_list_remove(&psock->pipes, ppipe); + } + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + if (ppipe->running) { nni_msgq_close(ppipe->sendq); nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); } + nni_mtx_unlock(&ppipe->mtx); + + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -206,8 +235,7 @@ nni_surv_getq_cb(void *arg) nni_surv_pipe *ppipe = arg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -226,8 +254,7 @@ nni_surv_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -243,8 +270,7 @@ nni_surv_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -287,8 +313,7 @@ nni_surv_recv_cb(void *arg) return; failed: - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); } @@ -358,7 +383,7 @@ nni_surv_sock_getq_cb(void *arg) msg = psock->aio_getq.a_msg; psock->aio_getq.a_msg = NULL; - nni_sock_lock(psock->nsock); + nni_mtx_lock(&psock->mtx); last = nni_list_last(&psock->pipes); NNI_LIST_FOREACH (&psock->pipes, ppipe) { if (ppipe != last) { @@ -372,7 +397,7 @@ nni_surv_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_sock_unlock(psock->nsock); + nni_mtx_unlock(&psock->mtx); if (last == NULL) { // If there were no pipes to send on, just toss the message. @@ -465,7 +490,6 @@ static nni_proto_pipe_ops nni_surv_pipe_ops = { .pipe_init = nni_surv_pipe_init, .pipe_fini = nni_surv_pipe_fini, .pipe_start = nni_surv_pipe_start, - .pipe_stop = nni_surv_pipe_stop, }; static nni_proto_sock_ops nni_surv_sock_ops = { |
