diff options
| -rw-r--r-- | src/core/endpt.c | 129 | ||||
| -rw-r--r-- | src/core/endpt.h | 6 | ||||
| -rw-r--r-- | src/core/pipe.c | 45 | ||||
| -rw-r--r-- | src/core/pipe.h | 19 | ||||
| -rw-r--r-- | src/core/socket.c | 98 | ||||
| -rw-r--r-- | src/core/socket.h | 19 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 158 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 75 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 16 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 71 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 90 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 13 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 100 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 113 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 143 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 86 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 8 |
17 files changed, 714 insertions, 475 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index efd3eefb..00690cd4 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -19,13 +19,54 @@ static nni_objhash *nni_eps = NULL; static void *nni_ep_ctor(uint32_t); static void nni_ep_dtor(void *); +// Because we can't reap threads from themselves, we need to have a separate +// task to reap endpoints. We use one global task to do this, and we just +// add to the reap list as needed. +static nni_taskq_ent nni_ep_reap_tqe; +static nni_mtx nni_ep_reap_mx; +static nni_list nni_ep_reap_list; + +static void +nni_ep_reaper(void *arg) +{ + nni_ep *ep; + + NNI_ARG_UNUSED(arg); + + nni_mtx_lock(&nni_ep_reap_mx); + while ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) { + nni_list_remove(&nni_ep_reap_list, ep); + + nni_mtx_unlock(&nni_ep_reap_mx); + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); + nni_mtx_lock(&nni_ep_reap_mx); + + continue; + } + + nni_mtx_unlock(&nni_ep_reap_mx); +} + + int nni_ep_sys_init(void) { int rv; rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor); + if (rv != 0) { + return (rv); + } + rv = nni_mtx_init(&nni_ep_reap_mx); + if (rv != 0) { + nni_objhash_fini(nni_eps); + nni_eps = NULL; + return (rv); + } + nni_ep_list_init(&nni_ep_reap_list); + nni_taskq_ent_init(&nni_ep_reap_tqe, nni_ep_reaper, NULL); return (rv); } @@ -33,6 +74,8 @@ nni_ep_sys_init(void) void nni_ep_sys_fini(void) { + nni_taskq_cancel(NULL, &nni_ep_reap_tqe); + nni_mtx_fini(&nni_ep_reap_mx); nni_objhash_fini(nni_eps); nni_eps = NULL; } @@ -129,13 +172,6 @@ nni_ep_dtor(void *ptr) { nni_ep *ep = ptr; - // If a thread is running, make sure it is stopped. - nni_thr_fini(&ep->ep_thr); - - if (ep->ep_sock != NULL) { - // This is idempotent; harmless if not already on the list. - nni_sock_rem_ep(ep->ep_sock, ep); - } if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } @@ -181,7 +217,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) return (rv); } - if ((rv = nni_sock_add_ep(sock, ep)) != 0) { + if ((rv = nni_sock_ep_add(sock, ep)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } @@ -192,22 +228,50 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) void +nni_ep_stop(nni_ep *ep) +{ + nni_mtx_lock(&ep->ep_mtx); + if (ep->ep_closed == 0) { + ep->ep_closed = 1; + ep->ep_ops.ep_close(ep->ep_data); + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); +} + + +void nni_ep_close(nni_ep *ep) { nni_pipe *pipe; + + nni_ep_stop(ep); + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&ep->ep_mtx); +} + + +void +nni_ep_remove(nni_ep *ep) +{ + nni_pipe *pipe; nni_sock *sock = ep->ep_sock; + nni_ep_close(ep); + nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); - if ((pipe = ep->ep_pipe) != NULL) { - pipe->p_ep = NULL; - ep->ep_pipe = NULL; - } - nni_cv_wake(&ep->ep_cv); + while (nni_list_first(&ep->ep_pipes) != NULL) { + nni_cv_wait(&ep->ep_cv); } nni_mtx_unlock(&ep->ep_mtx); + + nni_sock_ep_remove(sock, ep); + + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); } @@ -222,29 +286,27 @@ nni_ep_connect(nni_ep *ep) } rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } ep->ep_pipe = pipe; - pipe->p_ep = ep; return (0); } int -nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe) { - nni_ep_hold(ep); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); return (NNG_ECLOSED); } + nni_list_append(&ep->ep_pipes, pipe); nni_mtx_unlock(&ep->ep_mtx); @@ -253,16 +315,19 @@ nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) void -nni_ep_rem_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) { - nni_mtx_lock(&ep->ep_mtx); - if (!nni_list_active(&ep->ep_pipes, pipe)) { + if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) { + nni_mtx_lock(&ep->ep_mtx); + nni_list_remove(&ep->ep_pipes, pipe); + + if (ep->ep_pipe == pipe) { + ep->ep_pipe = NULL; + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); - return; } - nni_list_remove(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); } @@ -295,7 +360,7 @@ nni_dialer(void *arg) } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - break; + return; } nni_mtx_unlock(&ep->ep_mtx); @@ -382,11 +447,11 @@ nni_ep_accept(nni_ep *ep) } rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } return (0); diff --git a/src/core/endpt.h b/src/core/endpt.h index cfb332bb..3d353cdf 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -47,11 +47,13 @@ extern void nni_ep_hold(nni_ep *); extern void nni_ep_rele(nni_ep *); extern uint32_t nni_ep_id(nni_ep *); extern int nni_ep_create(nni_ep **, nni_sock *, const char *); +extern void nni_ep_stop(nni_ep *); extern void nni_ep_close(nni_ep *); +extern void nni_ep_remove(nni_ep *); extern int nni_ep_dial(nni_ep *, int); extern int nni_ep_listen(nni_ep *, int); extern void nni_ep_list_init(nni_list *); -extern int nni_ep_add_pipe(nni_ep *, nni_pipe *); -extern void nni_ep_rem_pipe(nni_ep *, nni_pipe *); +extern int nni_ep_pipe_add(nni_ep *, nni_pipe *); +extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); #endif // CORE_ENDPT_H diff --git a/src/core/pipe.c b/src/core/pipe.c index 80b39171..5d774e6d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -31,6 +31,7 @@ nni_pipe_ctor(uint32_t id) p->p_tran_data = NULL; p->p_proto_data = NULL; + p->p_proto_dtor = NULL; p->p_id = id; NNI_LIST_NODE_INIT(&p->p_sock_node); @@ -45,8 +46,9 @@ nni_pipe_dtor(void *ptr) { nni_pipe *p = ptr; - nni_sock_pipe_rem(p->p_sock, p); - + if (p->p_proto_dtor != NULL) { + p->p_proto_dtor(p->p_proto_data); + } if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -140,11 +142,23 @@ nni_pipe_close(nni_pipe *p) } nni_mtx_unlock(&p->p_mtx); +} - // Let the socket (and endpoint) know we have closed. - nni_sock_pipe_closed(sock, p); - nni_objhash_unref(nni_pipes, p->p_id); +// nni_pipe_remove is called by protocol implementations to indicate that +// they are finished using the pipe (it should be closed already), and the +// owning socket and endpoint should de-register it. +void +nni_pipe_remove(nni_pipe *p) +{ + // Make sure the pipe is closed, in case it wasn't already done. + nni_pipe_close(p); + + nni_ep_pipe_remove(p->p_ep, p); + nni_sock_pipe_remove(p->p_sock, p); + + // XXX: would be simpler to just do a destroy here + nni_pipe_rele(p); } @@ -167,6 +181,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) return (rv); } p->p_sock = sock; + p->p_ep = ep; // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. @@ -178,8 +193,12 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) return (rv); } + if ((rv = nni_ep_pipe_add(ep, p)) != 0) { + nni_pipe_remove(p); + } if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_objhash_unref(nni_pipes, p->p_id); + nni_pipe_remove(p); + //nni_objhash_unref(nni_pipes, p->p_id); return (rv); } @@ -188,15 +207,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) } -void -nni_pipe_destroy(nni_pipe *p) -{ - NNI_ASSERT(p->p_refcnt == 0); - - nni_objhash_unref(nni_pipes, p->p_id); -} - - int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { @@ -221,7 +231,7 @@ nni_pipe_start(nni_pipe *p) NNI_ASSERT(p == scratch); if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { - nni_pipe_close(p); + nni_pipe_remove(p); return (rv); } @@ -232,9 +242,10 @@ nni_pipe_start(nni_pipe *p) void -nni_pipe_set_proto_data(nni_pipe *p, void *data) +nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor) { p->p_proto_data = data; + p->p_proto_dtor = dtor; } diff --git a/src/core/pipe.h b/src/core/pipe.h index 379b1d8a..1f911480 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -23,13 +23,13 @@ struct nni_pipe { nni_tran_pipe p_tran_ops; void * p_tran_data; void * p_proto_data; + nni_cb p_proto_dtor; nni_list_node p_sock_node; nni_list_node p_ep_node; nni_sock * p_sock; nni_ep * p_ep; int p_reap; nni_mtx p_mtx; - int p_refcnt; }; extern int nni_pipe_sys_init(void); @@ -41,16 +41,24 @@ extern int nni_pipe_aio_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); + +// nni_pipe_close closes the underlying transport for the pipe. Further +// operations against will return NNG_ECLOSED. extern void nni_pipe_close(nni_pipe *); + extern void nni_pipe_hold(nni_pipe *); extern void nni_pipe_rele(nni_pipe *); +// nni_pipe_remove is called by the protocol when it is done with the socket. +// The pipe should already be closed; it will be unregistered and it's +// resources released back to the system. The protocol MUST not reference +// the pipe after this. +extern void nni_pipe_remove(nni_pipe *); + // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. extern int nni_pipe_create(nni_pipe **, nni_ep *, nni_sock *, nni_tran *); -extern void nni_pipe_destroy(nni_pipe *); - extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); extern int nni_pipe_start(nni_pipe *); @@ -58,8 +66,9 @@ extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); // nni_pipe_set_proto_data sets the protocol private data. No locking is // performed, and this routine should only be called once per pipe at -// initialization. -extern void nni_pipe_set_proto_data(nni_pipe *, void *); +// initialization. The third argument is called to destroy the data, +// at termination. +extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb); // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. diff --git a/src/core/socket.c b/src/core/socket.c index 257f7fa3..67b3f978 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -101,7 +101,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) sock->s_pipe_ops.pipe_fini(pdata); return (NNG_ECLOSED); } - nni_pipe_set_proto_data(pipe, pdata); + nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini); nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); @@ -137,58 +137,18 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void -nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); - - nni_mtx_lock(&sock->s_mx); - - // NB: nni_list_remove doesn't really care *which* list the pipe - // is on, and so if the pipe is already on the idle list these - // two statements are effectively a no-op. - nni_list_remove(&sock->s_pipes, pipe); - if (nni_list_first(&sock->s_pipes) == NULL) { - nni_cv_wake(&sock->s_cv); - } - - sock->s_pipe_ops.pipe_stop(pdata); - - // Notify the endpoint that the pipe has closed. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - - -void -nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) -{ - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); + void *pdata; nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); + if (sock->s_closing) { + nni_cv_wake(&sock->s_cv); + } } - - if (pdata != NULL) { - sock->s_pipe_ops.pipe_fini(pdata); - } - - // XXX: Move this to a seperate ep-specific API. - // Notify the endpoint that the pipe has closed - if not already done. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - - // XXX release the hold on the pipe } @@ -556,12 +516,13 @@ nni_sock_shutdown(nni_sock *sock) linger = nni_clock() + sock->s_linger; } + // Stop the EPs. This prevents new connections from forming but + // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_stop(ep); } nni_mtx_unlock(&sock->s_mx); - // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* @@ -588,23 +549,36 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Stop all EPS. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); + // For each ep, close it; this will also tell it to force any + // of its pipes to close. + NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_close(ep); - nni_ep_rele(ep); - nni_mtx_lock(&sock->s_mx); } // For each pipe, close the underlying transport. Also move it // to the idle list so we won't keep looping. - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_mtx_unlock(&sock->s_mx); + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_close(pipe); + } + + // Wait for the eps to be reaped. + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + + // This has to be done without the lock held, as the remove + // operation requires shutting down a thread which might be + // trying to acquire the socket lock. + nni_mtx_unlock(&sock->s_mx); + nni_ep_remove(ep); nni_mtx_lock(&sock->s_mx); } + // Wait for the pipes to be reaped (there should not be any because + // we have already reaped the EPs.) + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_cv_wait(&sock->s_cv); + } + sock->s_sock_ops.sock_close(sock->s_data); nni_cv_wake(&sock->s_cv); @@ -620,14 +594,14 @@ nni_sock_shutdown(nni_sock *sock) } -// nni_sock_add_ep adds a newly created endpoint to the socket. The +// nni_sock_ep_add adds a newly created endpoint to the socket. The // caller must hold references on the sock and the ep, and not be holding // the socket lock. The ep acquires a reference against the sock, // which will be dropped later by nni_sock_rem_ep. The endpoint must not // already be associated with a socket. (Note, the ep holds the reference // on the socket, not the other way around.) int -nni_sock_add_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_add(nni_sock *sock, nni_ep *ep) { int rv; @@ -643,7 +617,7 @@ nni_sock_add_ep(nni_sock *sock, nni_ep *ep) void -nni_sock_rem_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { // If we're not on the list, then nothing to do. Be idempotent. // Note that if the ep is not on a list, then we assume that we have @@ -832,8 +806,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } @@ -853,8 +826,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } diff --git a/src/core/socket.h b/src/core/socket.h index 68f05705..7d5e0f20 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -85,18 +85,19 @@ extern void nni_sock_unlock(nni_sock *); extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); -extern int nni_sock_add_ep(nni_sock *, nni_ep *); -extern void nni_sock_rem_ep(nni_sock *, nni_ep *); +extern int nni_sock_ep_add(nni_sock *, nni_ep *); +extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // nni_sock_pipe_add is called by the pipe to register the pipe with // with the socket. The pipe is added to the idle list. The protocol // private pipe data is initialized as well. extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); -// nni_sock_pipe_rem deregisters the pipe from the socket. The socket -// will block during close if there are registered pipes outstanding. -// This also frees any protocol private pipe data. -extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *); +// nni_sock_pipe_remove is called by the pipe when the protocol is +// done with it. This is the sockets indication that it should be +// removed, and freed. The protocol MUST guarantee that the pipe is +// no longer in use when this function is called. +extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for // business. This also calls the socket/protocol specific add function, @@ -104,12 +105,6 @@ extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *); // on success. The reference count should be dropped by nni_sock_pipe_closed. extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *); -// nni_sock_pipe_closed lets the socket know that the pipe is closed. -// This keeps the socket from trying to schedule traffic to it. It -// also lets the endpoint know about it, to possibly restart a dial -// operation. -extern void nni_sock_pipe_closed(nni_sock *, nni_pipe *); - // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus // already have the socket lock held. diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index ca24c32c..cb624a10 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -37,6 +37,7 @@ struct nni_bus_sock { int raw; nni_aio aio_getq; nni_list pipes; + nni_mtx mtx; }; // An nni_bus_pipe is our per-pipe protocol private structure. @@ -49,8 +50,24 @@ struct nni_bus_pipe { nni_aio aio_recv; nni_aio aio_send; nni_aio aio_putq; + nni_mtx mtx; + int refcnt; }; + +static void +nni_bus_sock_fini(void *arg) +{ + nni_bus_sock *psock = arg; + + if (psock != NULL) { + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); + } +} + + static int nni_bus_sock_init(void **sp, nni_sock *nsock) { @@ -60,38 +77,49 @@ nni_bus_sock_init(void **sp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); if (rv != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } - NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); psock->nsock = nsock; psock->raw = 0; *sp = psock; return (0); + +fail: + nni_bus_sock_fini(psock); + return (rv); } static void -nni_bus_sock_fini(void *arg) +nni_bus_sock_open(void *arg) { nni_bus_sock *psock = arg; - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - NNI_FREE_STRUCT(psock); - } + nni_bus_sock_getq(psock); } static void -nni_bus_sock_open(void *arg) +nni_bus_pipe_fini(void *arg) { - nni_bus_sock *psock = arg; + nni_bus_pipe *ppipe = arg; - nni_bus_sock_getq(psock); + if (ppipe != NULL) { + nni_msgq_fini(ppipe->sendq); + nni_mtx_fini(&ppipe->mtx); + nni_aio_fini(&ppipe->aio_getq); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + NNI_FREE_STRUCT(ppipe); + } } @@ -105,62 +133,36 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } NNI_LIST_NODE_INIT(&ppipe->node); - // This depth could be tunable. - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - NNI_FREE_STRUCT(ppipe); - return (rv); + ppipe->refcnt = 0; + if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) || + ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) { + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); if (rv != 0) { - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); if (rv != 0) { - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_getq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); -} - - -static void -nni_bus_pipe_fini(void *arg) -{ - nni_bus_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_msgq_fini(ppipe->sendq); - nni_aio_fini(&ppipe->aio_getq); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - NNI_FREE_STRUCT(ppipe); - } +fail: + nni_bus_pipe_fini(ppipe); + return (rv); } @@ -170,29 +172,56 @@ nni_bus_pipe_start(void *arg) nni_bus_pipe *ppipe = arg; nni_bus_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); nni_list_append(&psock->pipes, ppipe); + nni_mtx_unlock(&psock->mtx); + + // Mark the ppipe busy twice -- once for each of the oustanding + // asynchronous "threads" of operation. + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); nni_bus_pipe_recv(ppipe); - nni_pipe_hold(ppipe->npipe); nni_bus_pipe_getq(ppipe); return (0); } +// nni_bus_pipe_stop is called only internally when one of our handlers notices +// that the transport layer has closed. This allows us to stop all further +// actions. static void -nni_bus_pipe_stop(void *arg) +nni_bus_pipe_stop(nni_bus_pipe *ppipe) { - nni_bus_pipe *ppipe = arg; + int refcnt; nni_bus_sock *psock = ppipe->psock; - nni_sock *nsock = psock->nsock; + // As we are called only on error paths, shut down the underlying + // pipe transport. This should cause any other consumer to also get + // a suitable error (NNG_ECLOSED), so that we can shut down completely. + nni_pipe_close(ppipe->npipe); + + nni_mtx_lock(&ppipe->psock->mtx); if (nni_list_active(&psock->pipes, ppipe)) { nni_list_remove(&psock->pipes, ppipe); nni_msgq_close(ppipe->sendq); - nni_msgq_aio_cancel(nni_sock_recvq(nsock), &ppipe->aio_putq); + nni_msgq_aio_cancel(nni_sock_recvq(psock->nsock), + &ppipe->aio_putq); + } + nni_mtx_unlock(&ppipe->psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + refcnt = --ppipe->refcnt; + nni_mtx_unlock(&ppipe->mtx); + + // If we are done with the pipe, let the system know so it can + // deregister it. + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); } } @@ -204,8 +233,7 @@ nni_bus_pipe_getq_cb(void *arg) if (nni_aio_result(&ppipe->aio_getq) != 0) { // closed? - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg; @@ -224,8 +252,7 @@ nni_bus_pipe_send_cb(void *arg) // closed? nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -242,8 +269,7 @@ nni_bus_pipe_recv_cb(void *arg) uint32_t id; if (nni_aio_result(&ppipe->aio_recv) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } msg = ppipe->aio_recv.a_msg; @@ -252,8 +278,7 @@ nni_bus_pipe_recv_cb(void *arg) if (nni_msg_prepend_header(msg, &id, 4) != 0) { // XXX: bump a nomemory stat nni_msg_free(msg); - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -270,8 +295,7 @@ nni_bus_pipe_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_bus_pipe_stop(ppipe); return; } @@ -287,7 +311,6 @@ nni_bus_sock_getq_cb(void *arg) nni_bus_pipe *ppipe; nni_bus_pipe *lpipe; nni_msgq *uwq = nni_sock_sendq(psock->nsock); - nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg, *dup; uint32_t sender; @@ -308,7 +331,7 @@ nni_bus_sock_getq_cb(void *arg) sender = 0; } - nni_mtx_lock(mx); + nni_mtx_lock(&psock->mtx); lpipe = nni_list_last(&psock->pipes); NNI_LIST_FOREACH (&psock->pipes, ppipe) { if (nni_pipe_id(ppipe->npipe) == sender) { @@ -325,7 +348,7 @@ nni_bus_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&psock->mtx); if (lpipe == NULL) { nni_msg_free(msg); @@ -394,7 +417,6 @@ static nni_proto_pipe_ops nni_bus_pipe_ops = { .pipe_init = nni_bus_pipe_init, .pipe_fini = nni_bus_pipe_fini, .pipe_start = nni_bus_pipe_start, - .pipe_stop = nni_bus_pipe_stop, }; static nni_proto_sock_ops nni_bus_sock_ops = { diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index a2b97fe0..c2d4fa0d 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -32,6 +32,7 @@ struct nni_pair_sock { nni_msgq * uwq; nni_msgq * urq; int raw; + nni_mtx mtx; }; // An nni_pair_pipe is our per-pipe protocol private structure. We keep @@ -47,16 +48,23 @@ struct nni_pair_pipe { nni_aio aio_putq; int busy; int closed; + nni_mtx mtx; + int refcnt; }; static int nni_pair_sock_init(void **sp, nni_sock *nsock) { nni_pair_sock *psock; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } psock->nsock = nsock; psock->ppipe = NULL; psock->raw = 0; @@ -73,6 +81,8 @@ nni_pair_sock_fini(void *arg) nni_pair_sock *psock = arg; if (psock != NULL) { + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } } @@ -87,31 +97,33 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - + if ((rv = nni_mtx_init(&ppipe->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe); if (rv != 0) { - nni_pair_pipe_fini(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); + +fail: + nni_pair_pipe_fini(ppipe); + return (rv); } @@ -125,6 +137,7 @@ nni_pair_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_recv); nni_aio_fini(&ppipe->aio_putq); nni_aio_fini(&ppipe->aio_getq); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -135,16 +148,21 @@ nni_pair_pipe_start(void *arg) nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); if (psock->ppipe != NULL) { + nni_mtx_unlock(&psock->mtx); return (NNG_EBUSY); // Already have a peer, denied. } psock->ppipe = ppipe; + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + nni_mtx_unlock(&ppipe->mtx); // Schedule a getq on the upper, and a read from the pipe. // Each of these also sets up another hold on the pipe itself. - nni_pipe_hold(ppipe->npipe); nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); - nni_pipe_hold(ppipe->npipe); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); return (0); @@ -152,17 +170,31 @@ nni_pair_pipe_start(void *arg) static void -nni_pair_pipe_stop(void *arg) +nni_pair_pipe_stop(nni_pair_pipe *ppipe) { - nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + int refcnt; - nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); - nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + nni_mtx_lock(&psock->mtx); if (psock->ppipe == ppipe) { psock->ppipe = NULL; } + nni_mtx_unlock(&psock->mtx); + + // These operations are idempotent. + nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + nni_mtx_unlock(&ppipe->mtx); + + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -173,8 +205,7 @@ nni_pair_recv_cb(void *arg) nni_pair_sock *psock = ppipe->psock; if (nni_aio_result(&ppipe->aio_recv) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -192,8 +223,7 @@ nni_pair_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -208,8 +238,7 @@ nni_pair_getq_cb(void *arg) nni_msg *msg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -228,8 +257,7 @@ nni_pair_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_pair_pipe_stop(ppipe); return; } @@ -278,7 +306,6 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_init = nni_pair_pipe_init, .pipe_fini = nni_pair_pipe_fini, .pipe_start = nni_pair_pipe_start, - .pipe_stop = nni_pair_pipe_stop, }; static nni_proto_sock_ops nni_pair_sock_ops = { diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index eb14be81..eb66bc21 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -107,7 +107,6 @@ nni_pull_pipe_start(void *arg) nni_pull_pipe *pp = arg; // Start the pending pull... - nni_pipe_hold(pp->pipe); nni_pull_recv(pp); return (0); @@ -115,12 +114,11 @@ nni_pull_pipe_start(void *arg) static void -nni_pull_pipe_stop(void *arg) +nni_pull_pipe_stop(nni_pull_pipe *pp) { - nni_pull_pipe *pp = arg; - // Cancel any pending sendup. nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio); + nni_pipe_remove(pp->pipe); } @@ -133,8 +131,7 @@ nni_pull_recv_cb(void *arg) if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pull_pipe_stop(pp); return; } @@ -157,8 +154,7 @@ nni_pull_putq_cb(void *arg) // we can do. Just close the pipe. nni_msg_free(aio->a_msg); aio->a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pull_pipe_stop(pp); return; } @@ -172,8 +168,7 @@ nni_pull_recv(nni_pull_pipe *pp) { // Schedule the aio with callback. if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pipe_remove(pp->pipe); } } @@ -230,7 +225,6 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, .pipe_start = nni_pull_pipe_start, - .pipe_stop = nni_pull_pipe_stop, }; static nni_proto_sock_ops nni_pull_sock_ops = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 9554b2be..e69ebdbf 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -39,6 +39,8 @@ struct nni_push_pipe { nni_aio aio_recv; nni_aio aio_send; nni_aio aio_getq; + int refcnt; + nni_mtx mtx; }; static int @@ -70,6 +72,19 @@ nni_push_sock_fini(void *arg) } +static void +nni_push_pipe_fini(void *arg) +{ + nni_push_pipe *pp = arg; + + nni_aio_fini(&pp->aio_recv); + nni_aio_fini(&pp->aio_send); + nni_aio_fini(&pp->aio_getq); + nni_mtx_fini(&pp->mtx); + NNI_FREE_STRUCT(pp); +} + + static int nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { @@ -80,19 +95,17 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) return (NNG_ENOMEM); } if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { - NNI_FREE_STRUCT(pp); - return (rv); + goto fail; } if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); + goto fail; return (rv); } if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) { - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); - return (rv); + goto fail; + } + if ((rv = nni_mtx_init(&pp->mtx)) != 0) { + goto fail; } NNI_LIST_NODE_INIT(&pp->node); @@ -100,18 +113,10 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) pp->push = psock; *ppp = pp; return (0); -} - -static void -nni_push_pipe_fini(void *arg) -{ - nni_push_pipe *pp = arg; - - nni_aio_fini(&pp->aio_recv); - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_getq); - NNI_FREE_STRUCT(pp); +fail: + nni_push_pipe_fini(pp); + return (rv); } @@ -125,6 +130,10 @@ nni_push_pipe_start(void *arg) return (NNG_EPROTO); } + nni_mtx_lock(&pp->mtx); + pp->refcnt = 2; + nni_mtx_unlock(&pp->mtx); + // Schedule a receiver. This is mostly so that we can detect // a closed transport pipe. nni_pipe_hold(pp->pipe); @@ -139,12 +148,22 @@ nni_push_pipe_start(void *arg) static void -nni_push_pipe_stop(void *arg) +nni_push_pipe_stop(nni_push_pipe *pp) { - nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; + int refcnt; nni_msgq_aio_cancel(push->uwq, &pp->aio_getq); + + nni_mtx_lock(&pp->mtx); + NNI_ASSERT(pp->refcnt > 0); + pp->refcnt--; + refcnt = pp->refcnt; + nni_mtx_unlock(&pp->mtx); + + if (refcnt == 0) { + nni_pipe_remove(pp->pipe); + } } @@ -156,8 +175,7 @@ nni_push_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. if (nni_aio_result(&pp->aio_recv) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } nni_msg_free(pp->aio_recv.a_msg); @@ -175,8 +193,7 @@ nni_push_send_cb(void *arg) if (nni_aio_result(&pp->aio_send) != 0) { nni_msg_free(pp->aio_send.a_msg); pp->aio_send.a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } @@ -192,8 +209,7 @@ nni_push_getq_cb(void *arg) if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_push_pipe_stop(pp); return; } @@ -244,7 +260,6 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, .pipe_start = nni_push_pipe_start, - .pipe_stop = nni_push_pipe_stop, }; static nni_proto_sock_ops nni_push_sock_ops = { diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 5ea16d2d..8ad9bb6d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -34,6 +34,7 @@ struct nni_pub_sock { int raw; nni_aio aio_getq; nni_list pipes; + nni_mtx mtx; }; // An nni_pub_pipe is our per-pipe protocol private structure. @@ -45,6 +46,8 @@ struct nni_pub_pipe { nni_aio aio_send; nni_aio aio_recv; nni_list_node node; + int refcnt; + nni_mtx mtx; }; static int @@ -56,6 +59,10 @@ nni_pub_sock_init(void **pubp, nni_sock *sock) if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&pub->mtx)) != 0) { + nni_pub_sock_fini(pub); + return (rv); + } rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); if (rv != 0) { nni_pub_sock_fini(pub); @@ -79,6 +86,7 @@ nni_pub_sock_fini(void *arg) nni_pub_sock *pub = arg; nni_aio_fini(&pub->aio_getq); + nni_mtx_fini(&pub->mtx); NNI_FREE_STRUCT(pub); } @@ -92,6 +100,20 @@ nni_pub_sock_open(void *arg) } +static void +nni_pub_pipe_fini(void *arg) +{ + nni_pub_pipe *pp = arg; + + nni_msgq_fini(pp->sendq); + nni_aio_fini(&pp->aio_getq); + nni_aio_fini(&pp->aio_send); + nni_aio_fini(&pp->aio_recv); + nni_mtx_fini(&pp->mtx); + NNI_FREE_STRUCT(pp); +} + + static int nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { @@ -102,45 +124,33 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) return (NNG_ENOMEM); } // XXX: consider making this depth tunable - if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) { - nni_pub_pipe_fini(pp); - return (rv); + if (((rv = nni_msgq_init(&pp->sendq, 16)) != 0) || + ((rv = nni_mtx_init(&pp->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); if (rv != 0) { - nni_pub_pipe_fini(pp); - return (rv); + goto fail; } pp->pipe = pipe; pp->pub = psock; *ppp = pp; return (0); -} - - -static void -nni_pub_pipe_fini(void *arg) -{ - nni_pub_pipe *pp = arg; - nni_msgq_fini(pp->sendq); - nni_aio_fini(&pp->aio_getq); - nni_aio_fini(&pp->aio_send); - nni_aio_fini(&pp->aio_recv); - NNI_FREE_STRUCT(pp); +fail: + nni_pub_pipe_fini(pp); + return (rv); } @@ -155,26 +165,41 @@ nni_pub_pipe_start(void *arg) } nni_list_append(&pub->pipes, pp); + nni_mtx_lock(&pp->mtx); + pp->refcnt = 2; + nni_mtx_unlock(&pp->mtx); + // Start the receiver and the queue reader. - nni_pipe_hold(pp->pipe); nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); - nni_pipe_hold(pp->pipe); nni_msgq_aio_get(pp->sendq, &pp->aio_getq); + return (0); } static void -nni_pub_pipe_stop(void *arg) +nni_pub_pipe_stop(nni_pub_pipe *pp) { - nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; + int refcnt; + nni_mtx_lock(&pub->mtx); if (nni_list_active(&pub->pipes, pp)) { nni_list_remove(&pub->pipes, pp); nni_msgq_close(pp->sendq); } + nni_mtx_unlock(&pub->mtx); + + nni_mtx_lock(&pp->mtx); + NNI_ASSERT(pp->refcnt > 0); + pp->refcnt--; + refcnt = pp->refcnt; + nni_mtx_unlock(&pp->mtx); + + if (refcnt == 0) { + nni_pipe_remove(pp->pipe); + } } @@ -184,7 +209,6 @@ nni_pub_sock_getq_cb(void *arg) nni_pub_sock *pub = arg; nni_msgq *uwq = pub->uwq; nni_msg *msg, *dup; - nni_mtx *mx = nni_sock_mtx(pub->sock); nni_pub_pipe *pp; nni_pub_pipe *last; @@ -197,7 +221,7 @@ nni_pub_sock_getq_cb(void *arg) msg = pub->aio_getq.a_msg; pub->aio_getq.a_msg = NULL; - nni_mtx_lock(mx); + nni_mtx_lock(&pub->mtx); last = nni_list_last(&pub->pipes); NNI_LIST_FOREACH (&pub->pipes, pp) { if (pp != last) { @@ -212,7 +236,7 @@ nni_pub_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&pub->mtx); if (last == NULL) { nni_msg_free(msg); @@ -228,8 +252,7 @@ nni_pub_pipe_recv_cb(void *arg) nni_pub_pipe *pp = arg; if (nni_aio_result(&pp->aio_recv) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -245,8 +268,7 @@ nni_pub_pipe_getq_cb(void *arg) nni_pub_pipe *pp = arg; if (nni_aio_result(&pp->aio_getq) != 0) { - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -265,8 +287,7 @@ nni_pub_pipe_send_cb(void *arg) if (nni_aio_result(&pp->aio_send) != 0) { nni_msg_free(pp->aio_send.a_msg); pp->aio_send.a_msg = NULL; - nni_pipe_close(pp->pipe); - nni_pipe_rele(pp->pipe); + nni_pub_pipe_stop(pp); return; } @@ -315,7 +336,6 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_init = nni_pub_pipe_init, .pipe_fini = nni_pub_pipe_fini, .pipe_start = nni_pub_pipe_start, - .pipe_stop = nni_pub_pipe_stop, }; nni_proto_sock_ops nni_pub_sock_ops = { diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 8340da77..cab745b5 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -117,18 +117,16 @@ nni_sub_pipe_start(void *arg) { nni_sub_pipe *sp = arg; - nni_pipe_hold(sp->pipe); nni_pipe_aio_recv(sp->pipe, &sp->aio_recv); return (0); } static void -nni_sub_pipe_stop(void *arg) +nni_sub_pipe_stop(nni_sub_pipe *sp) { - nni_sub_pipe *sp = arg; - nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq); + nni_pipe_remove(sp->pipe); } @@ -141,8 +139,7 @@ nni_sub_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&sp->aio_recv) != 0) { - nni_pipe_close(sp->pipe); - nni_pipe_rele(sp->pipe); + nni_sub_pipe_stop(sp); return; } @@ -160,8 +157,7 @@ nni_sub_putq_cb(void *arg) if (nni_aio_result(&sp->aio_putq) != 0) { nni_msg_free(sp->aio_putq.a_msg); sp->aio_putq.a_msg = NULL; - nni_pipe_close(sp->pipe); - nni_pipe_rele(sp->pipe); + nni_sub_pipe_stop(sp); return; } @@ -339,7 +335,6 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, .pipe_start = nni_sub_pipe_start, - .pipe_stop = nni_sub_pipe_stop, }; static nni_proto_sock_ops nni_sub_sock_ops = { diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 822758ef..507edf66 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -38,6 +38,7 @@ struct nni_rep_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -51,8 +52,25 @@ struct nni_rep_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_rep_sock_fini(void *arg) +{ + nni_rep_sock *rep = arg; + + nni_aio_fini(&rep->aio_getq); + nni_idhash_fini(&rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + nni_mtx_fini(&rep->mtx); + NNI_FREE_STRUCT(rep); +} + + static int nni_rep_sock_init(void **repp, nni_sock *sock) { @@ -67,15 +85,14 @@ nni_rep_sock_init(void **repp, nni_sock *sock) rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); + if (((rv = nni_mtx_init(&rep->mtx)) != 0) || + ((rv = nni_idhash_init(&rep->pipes)) != 0)) { + goto fail; } rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); if (rv != 0) { - nni_idhash_fini(&rep->pipes); - return (rv); + goto fail; } rep->uwq = nni_sock_sendq(sock); @@ -85,6 +102,10 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); + +fail: + nni_rep_sock_fini(rep); + return (rv); } @@ -106,20 +127,6 @@ nni_rep_sock_close(void *arg) } -static void -nni_rep_sock_fini(void *arg) -{ - nni_rep_sock *rep = arg; - - nni_aio_fini(&rep->aio_getq); - nni_idhash_fini(&rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); - } - NNI_FREE_STRUCT(rep); -} - - static int nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { @@ -129,7 +136,8 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + if (((rv = nni_msgq_init(&rp->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&rp->mtx)) != 0)) { goto fail; } if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) { @@ -165,6 +173,7 @@ nni_rep_pipe_fini(void *arg) nni_aio_fini(&rp->aio_send); nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_putq); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } @@ -177,31 +186,53 @@ nni_rep_pipe_start(void *arg) int rv; rp->id = nni_pipe_id(rp->pipe); + + nni_mtx_lock(&rep->mtx); rv = nni_idhash_insert(&rep->pipes, rp->id, rp); + nni_mtx_unlock(&rep->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(rp->pipe); + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); + nni_msgq_aio_get(rp->sendq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_rep_pipe_stop(void *arg) +nni_rep_pipe_stop(nni_rep_pipe *rp) { - nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; + int refcnt; + uint32_t id; + nni_mtx_lock(&rp->mtx); + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + id = rp->id; + rp->id = 0; if (rp->running) { rp->running = 0; nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - nni_idhash_remove(&rep->pipes, rp->id); + } + nni_mtx_unlock(&rp->mtx); + + if (id != 0) { + nni_mtx_lock(&rep->mtx); + nni_idhash_remove(&rep->pipes, id); + nni_mtx_unlock(&rep->mtx); + } + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); } } @@ -246,12 +277,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_sock_lock(rep->sock); + nni_mtx_lock(&rep->mtx); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); + nni_mtx_unlock(&rep->mtx); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -267,8 +298,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -287,8 +317,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -308,8 +337,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -371,8 +399,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -494,7 +521,6 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, - .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 8268ecd6..519b224e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -44,6 +44,7 @@ struct nni_req_sock { uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; }; // An nni_req_pipe is our per-pipe protocol private structure. @@ -57,6 +58,8 @@ struct nni_req_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; static void nni_req_resender(void *); @@ -75,6 +78,11 @@ nni_req_sock_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&req->mtx)) != 0) { + NNI_FREE_STRUCT(req); + return (rv); + } + NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); nni_timer_init(&req->timer, nni_req_timeout, req); @@ -114,6 +122,7 @@ nni_req_sock_fini(void *arg) if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } + nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -127,6 +136,9 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&rp->mtx)) != 0) { + goto failed; + } if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) { goto failed; } @@ -168,6 +180,7 @@ nni_req_pipe_fini(void *arg) nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_sendcooked); nni_aio_fini(&rp->aio_sendraw); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } } @@ -182,45 +195,61 @@ nni_req_pipe_start(void *arg) if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { nni_req_resend(req); } + nni_mtx_unlock(&req->mtx); + + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); - nni_pipe_hold(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_req_pipe_stop(void *arg) +nni_req_pipe_stop(nni_req_pipe *rp) { - nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; + int refcnt; + int running; - if (!rp->running) { - return; - } + nni_mtx_lock(&rp->mtx); + running = rp->running; rp->running = 0; + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + nni_mtx_unlock(&rp->mtx); + + if (running) { + nni_mtx_lock(&req->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + nni_list_remove(&req->readypipes, rp); - // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. - nni_list_remove(&req->readypipes, rp); - - if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { - // we are removing the pipe we sent the last request on... - // schedule immediate resend. - req->resend = NNI_TIME_ZERO; - req->wantw = 1; - nni_req_resend(req); + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + req->resend = NNI_TIME_ZERO; + req->wantw = 1; + nni_req_resend(req); + } + nni_mtx_unlock(&req->mtx); } nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); + } } @@ -293,8 +322,7 @@ nni_req_getq_cb(void *arg) // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -315,8 +343,7 @@ nni_req_sendraw_cb(void *arg) if (nni_aio_result(&rp->aio_sendraw) != 0) { nni_msg_free(rp->aio_sendraw.a_msg); rp->aio_sendraw.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -338,8 +365,7 @@ nni_req_sendcooked_cb(void *arg) // means no new asynchronous traffic can occur here. nni_msg_free(rp->aio_sendcooked.a_msg); rp->aio_sendcooked.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -347,12 +373,12 @@ nni_req_sendcooked_cb(void *arg) // reinsert ourselves in the ready list, and possibly schedule // a resend. - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); nni_list_remove(&req->busypipes, rp); nni_list_append(&req->readypipes, rp); nni_req_resend(req); - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -363,8 +389,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } rp->aio_putq.a_msg = NULL; @@ -380,8 +405,7 @@ nni_req_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -411,8 +435,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); } @@ -420,14 +443,13 @@ static void nni_req_timeout(void *arg) { nni_req_sock *req = arg; - nni_mtx *mx = nni_sock_mtx(req->sock); - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); if (req->reqmsg != NULL) { req->wantw = 1; nni_req_resend(req); } - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -435,7 +457,6 @@ static void nni_req_resend(nni_req_sock *req) { nni_req_pipe *rp; - nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int i; @@ -512,6 +533,10 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) return (NULL); } + // NB: The socket lock is also held, so this is always self-serialized. + // But we have to serialize against other async callbacks. + nni_mtx_lock(&req->mtx); + // If another message is there, this cancels it. if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); @@ -525,6 +550,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) req->wantw = 1; nni_req_resend(req); + nni_mtx_unlock(&req->mtx); // Clear the error condition. nni_sock_recverr(req->sock, 0); @@ -537,6 +563,7 @@ static nni_msg * nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; + nni_msg *rmsg; if (req->raw) { // Pass it unmolested @@ -548,21 +575,28 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (NULL); } - if (req->reqmsg == NULL) { + nni_mtx_lock(&req->mtx); + + if ((rmsg = req->reqmsg) == NULL) { // We had no outstanding request. + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } - nni_sock_recverr(req->sock, NNG_ESTATE); - nni_msg_free(req->reqmsg); req->reqmsg = NULL; req->pendpipe = NULL; + nni_mtx_unlock(&req->mtx); + + nni_sock_recverr(req->sock, NNG_ESTATE); + nni_msg_free(rmsg); + return (msg); } @@ -573,7 +607,6 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_start = nni_req_pipe_start, - .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 71220678..33360ab5 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -37,6 +37,7 @@ struct nni_resp_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_resp_pipe is our per-pipe protocol private structure. @@ -50,8 +51,28 @@ struct nni_resp_pipe { nni_aio aio_send; nni_aio aio_recv; int running; + int refcnt; + nni_mtx mtx; }; + +static void +nni_resp_sock_fini(void *arg) +{ + nni_resp_sock *psock = arg; + + if (psock != NULL) { + nni_aio_fini(&psock->aio_getq); + nni_idhash_fini(&psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); + } + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); + } +} + + static int nni_resp_sock_init(void **pp, nni_sock *nsock) { @@ -68,36 +89,22 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->btrace_len = 0; psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); - if ((rv = nni_idhash_init(&psock->pipes)) != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + if (((rv = nni_idhash_init(&psock->pipes)) != 0) || + ((rv = nni_mtx_init(&psock->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); if (rv != 0) { - nni_idhash_fini(&psock->pipes); - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } *pp = psock; nni_sock_senderr(nsock, NNG_ESTATE); return (0); -} - -static void -nni_resp_sock_fini(void *arg) -{ - nni_resp_sock *psock = arg; - - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - nni_idhash_fini(&psock->pipes); - if (psock->btrace != NULL) { - nni_free(psock->btrace, psock->btrace_len); - } - NNI_FREE_STRUCT(psock); - } +fail: + nni_resp_sock_fini(psock); + return (rv); } @@ -128,35 +135,35 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + if (((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) { + goto fail; } rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); if (rv != 0) { - nni_resp_pipe_fini(ppipe); - return (rv); + goto fail; } ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); + +fail: + nni_resp_pipe_fini(ppipe); + return (rv); } @@ -170,6 +177,7 @@ nni_resp_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_getq); nni_aio_fini(&ppipe->aio_send); nni_aio_fini(&ppipe->aio_recv); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -182,34 +190,56 @@ nni_resp_pipe_start(void *arg) int rv; ppipe->id = nni_pipe_id(ppipe->npipe); + + nni_mtx_lock(&psock->mtx); rv = nni_idhash_insert(&psock->pipes, ppipe->id, ppipe); + nni_mtx_unlock(&psock->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(ppipe->npipe); - nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + ppipe->running = 1; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); + nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); - ppipe->running = 1; + return (rv); } static void -nni_resp_pipe_stop(void *arg) +nni_resp_pipe_stop(nni_resp_pipe *ppipe) { - nni_resp_pipe *ppipe = arg; nni_resp_sock *psock = ppipe->psock; + int refcnt; + int running; - if (ppipe->running) { - ppipe->running = 0; + nni_mtx_lock(&psock->mtx); + if (ppipe->id != 0) { nni_idhash_remove(&psock->pipes, ppipe->id); + ppipe->id = 0; + } + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + running = ppipe->running; + ppipe->running = 0; + nni_mtx_unlock(&ppipe->mtx); + + if (running) { nni_msgq_close(ppipe->sendq); nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); } + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -246,19 +276,19 @@ nni_resp_sock_getq_cb(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); - nni_sock_lock(psock->nsock); - if (nni_idhash_find(&psock->pipes, id, (void **) &ppipe) != 0) { - nni_sock_unlock(psock->nsock); - nni_msg_free(msg); - nni_msgq_aio_get(psock->uwq, &psock->aio_getq); - return; - } + nni_mtx_lock(&psock->mtx); + rv = nni_idhash_find(&psock->pipes, id, (void **) &ppipe); - // Non-blocking put. - if (nni_msgq_tryput(ppipe->sendq, msg) != 0) { + if (rv != 0) { nni_msg_free(msg); + nni_msgq_aio_get(psock->uwq, &psock->aio_getq); + } else { + // Non-blocking put. + if (nni_msgq_tryput(ppipe->sendq, msg) != 0) { + nni_msg_free(msg); + } } - nni_sock_unlock(psock->nsock); + nni_mtx_unlock(&psock->mtx); } @@ -268,8 +298,7 @@ nni_resp_getq_cb(void *arg) nni_resp_pipe *ppipe = arg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); return; } @@ -288,8 +317,7 @@ nni_resp_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); return; } @@ -358,8 +386,7 @@ nni_resp_recv_cb(void *arg) return; error: - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); } @@ -371,8 +398,7 @@ nni_resp_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_resp_pipe_stop(ppipe); } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -499,7 +525,6 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = { .pipe_init = nni_resp_pipe_init, .pipe_fini = nni_resp_pipe_fini, .pipe_start = nni_resp_pipe_start, - .pipe_stop = nni_resp_pipe_stop, }; static nni_proto_sock_ops nni_resp_sock_ops = { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index f72532de..962371c1 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -39,6 +39,7 @@ struct nni_surv_sock { nni_timer_node timer; nni_msgq * uwq; nni_msgq * urq; + nni_mtx mtx; }; // An nni_surv_pipe is our per-pipe protocol private structure. @@ -52,8 +53,21 @@ struct nni_surv_pipe { nni_aio aio_send; nni_aio aio_recv; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_surv_sock_fini(void *arg) +{ + nni_surv_sock *psock = arg; + + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); +} + + static int nni_surv_sock_init(void **sp, nni_sock *nsock) { @@ -63,10 +77,12 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); if (rv != 0) { - NNI_FREE_STRUCT(psock); - return (rv); + goto fail; } NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); nni_timer_init(&psock->timer, nni_surv_timeout, psock); @@ -82,6 +98,10 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) *sp = psock; nni_sock_recverr(nsock, NNG_ESTATE); return (0); + +fail: + nni_surv_sock_fini(psock); + return (rv); } @@ -105,16 +125,6 @@ nni_surv_sock_close(void *arg) static void -nni_surv_sock_fini(void *arg) -{ - nni_surv_sock *psock = arg; - - nni_aio_fini(&psock->aio_getq); - NNI_FREE_STRUCT(psock); -} - - -static void nni_surv_pipe_fini(void *arg) { nni_surv_pipe *ppipe = arg; @@ -124,6 +134,7 @@ nni_surv_pipe_fini(void *arg) nni_aio_fini(&ppipe->aio_recv); nni_aio_fini(&ppipe->aio_putq); nni_msgq_fini(ppipe->sendq); + nni_mtx_fini(&ppipe->mtx); NNI_FREE_STRUCT(ppipe); } @@ -138,7 +149,8 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } // This depth could be tunable. - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + if (((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) || + ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) { goto failed; } rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); @@ -174,29 +186,46 @@ nni_surv_pipe_start(void *arg) nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; + nni_mtx_lock(&psock->mtx); nni_list_append(&psock->pipes, ppipe); + nni_mtx_unlock(&psock->mtx); - nni_pipe_hold(ppipe->npipe); - nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); + nni_mtx_lock(&ppipe->mtx); + ppipe->refcnt = 2; + ppipe->running = 1; + nni_mtx_unlock(&ppipe->mtx); - nni_pipe_hold(ppipe->npipe); + nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); - ppipe->running = 1; return (0); } static void -nni_surv_pipe_stop(void *arg) +nni_surv_pipe_stop(nni_surv_pipe *ppipe) { - nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; + int refcnt; - if (ppipe->running) { + nni_mtx_lock(&psock->mtx); + if (nni_list_active(&psock->pipes, ppipe)) { nni_list_remove(&psock->pipes, ppipe); + } + nni_mtx_unlock(&psock->mtx); + + nni_mtx_lock(&ppipe->mtx); + NNI_ASSERT(ppipe->refcnt > 0); + ppipe->refcnt--; + refcnt = ppipe->refcnt; + if (ppipe->running) { nni_msgq_close(ppipe->sendq); nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); } + nni_mtx_unlock(&ppipe->mtx); + + if (refcnt == 0) { + nni_pipe_remove(ppipe->npipe); + } } @@ -206,8 +235,7 @@ nni_surv_getq_cb(void *arg) nni_surv_pipe *ppipe = arg; if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -226,8 +254,7 @@ nni_surv_send_cb(void *arg) if (nni_aio_result(&ppipe->aio_send) != 0) { nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -243,8 +270,7 @@ nni_surv_putq_cb(void *arg) if (nni_aio_result(&ppipe->aio_putq) != 0) { nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); return; } @@ -287,8 +313,7 @@ nni_surv_recv_cb(void *arg) return; failed: - nni_pipe_close(ppipe->npipe); - nni_pipe_rele(ppipe->npipe); + nni_surv_pipe_stop(ppipe); } @@ -358,7 +383,7 @@ nni_surv_sock_getq_cb(void *arg) msg = psock->aio_getq.a_msg; psock->aio_getq.a_msg = NULL; - nni_sock_lock(psock->nsock); + nni_mtx_lock(&psock->mtx); last = nni_list_last(&psock->pipes); NNI_LIST_FOREACH (&psock->pipes, ppipe) { if (ppipe != last) { @@ -372,7 +397,7 @@ nni_surv_sock_getq_cb(void *arg) nni_msg_free(dup); } } - nni_sock_unlock(psock->nsock); + nni_mtx_unlock(&psock->mtx); if (last == NULL) { // If there were no pipes to send on, just toss the message. @@ -465,7 +490,6 @@ static nni_proto_pipe_ops nni_surv_pipe_ops = { .pipe_init = nni_surv_pipe_init, .pipe_fini = nni_surv_pipe_fini, .pipe_start = nni_surv_pipe_start, - .pipe_stop = nni_surv_pipe_stop, }; static nni_proto_sock_ops nni_surv_sock_ops = { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index c6f908f8..9a39208d 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -91,8 +91,12 @@ nni_inproc_pipe_close(void *arg) { nni_inproc_pipe *pipe = arg; - nni_msgq_close(pipe->rq); - nni_msgq_close(pipe->wq); + if (pipe->rq != NULL) { + nni_msgq_close(pipe->rq); + } + if (pipe->wq != NULL) { + nni_msgq_close(pipe->wq); + } } |
