From 0a51aa7bfc88d55b98fdde0d497b072e6911457d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 24 Jun 2017 14:11:35 -0700 Subject: Protocols keep their own reference counts. --- src/core/endpt.c | 129 ++++++++++++++++++++++++++++++++++++++++-------------- src/core/endpt.h | 6 ++- src/core/pipe.c | 45 ++++++++++++------- src/core/pipe.h | 19 +++++--- src/core/socket.c | 98 +++++++++++++++-------------------------- src/core/socket.h | 19 +++----- 6 files changed, 185 insertions(+), 131 deletions(-) (limited to 'src/core') diff --git a/src/core/endpt.c b/src/core/endpt.c index efd3eefb..00690cd4 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -19,13 +19,54 @@ static nni_objhash *nni_eps = NULL; static void *nni_ep_ctor(uint32_t); static void nni_ep_dtor(void *); +// Because we can't reap threads from themselves, we need to have a separate +// task to reap endpoints. We use one global task to do this, and we just +// add to the reap list as needed. +static nni_taskq_ent nni_ep_reap_tqe; +static nni_mtx nni_ep_reap_mx; +static nni_list nni_ep_reap_list; + +static void +nni_ep_reaper(void *arg) +{ + nni_ep *ep; + + NNI_ARG_UNUSED(arg); + + nni_mtx_lock(&nni_ep_reap_mx); + while ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) { + nni_list_remove(&nni_ep_reap_list, ep); + + nni_mtx_unlock(&nni_ep_reap_mx); + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); + nni_mtx_lock(&nni_ep_reap_mx); + + continue; + } + + nni_mtx_unlock(&nni_ep_reap_mx); +} + + int nni_ep_sys_init(void) { int rv; rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor); + if (rv != 0) { + return (rv); + } + rv = nni_mtx_init(&nni_ep_reap_mx); + if (rv != 0) { + nni_objhash_fini(nni_eps); + nni_eps = NULL; + return (rv); + } + nni_ep_list_init(&nni_ep_reap_list); + nni_taskq_ent_init(&nni_ep_reap_tqe, nni_ep_reaper, NULL); return (rv); } @@ -33,6 +74,8 @@ nni_ep_sys_init(void) void nni_ep_sys_fini(void) { + nni_taskq_cancel(NULL, &nni_ep_reap_tqe); + nni_mtx_fini(&nni_ep_reap_mx); nni_objhash_fini(nni_eps); nni_eps = NULL; } @@ -129,13 +172,6 @@ nni_ep_dtor(void *ptr) { nni_ep *ep = ptr; - // If a thread is running, make sure it is stopped. - nni_thr_fini(&ep->ep_thr); - - if (ep->ep_sock != NULL) { - // This is idempotent; harmless if not already on the list. - nni_sock_rem_ep(ep->ep_sock, ep); - } if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } @@ -181,7 +217,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) return (rv); } - if ((rv = nni_sock_add_ep(sock, ep)) != 0) { + if ((rv = nni_sock_ep_add(sock, ep)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } @@ -191,23 +227,51 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) } +void +nni_ep_stop(nni_ep *ep) +{ + nni_mtx_lock(&ep->ep_mtx); + if (ep->ep_closed == 0) { + ep->ep_closed = 1; + ep->ep_ops.ep_close(ep->ep_data); + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); +} + + void nni_ep_close(nni_ep *ep) +{ + nni_pipe *pipe; + + nni_ep_stop(ep); + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&ep->ep_mtx); +} + + +void +nni_ep_remove(nni_ep *ep) { nni_pipe *pipe; nni_sock *sock = ep->ep_sock; + nni_ep_close(ep); + nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); - if ((pipe = ep->ep_pipe) != NULL) { - pipe->p_ep = NULL; - ep->ep_pipe = NULL; - } - nni_cv_wake(&ep->ep_cv); + while (nni_list_first(&ep->ep_pipes) != NULL) { + nni_cv_wait(&ep->ep_cv); } nni_mtx_unlock(&ep->ep_mtx); + + nni_sock_ep_remove(sock, ep); + + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); } @@ -222,29 +286,27 @@ nni_ep_connect(nni_ep *ep) } rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } ep->ep_pipe = pipe; - pipe->p_ep = ep; return (0); } int -nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe) { - nni_ep_hold(ep); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); return (NNG_ECLOSED); } + nni_list_append(&ep->ep_pipes, pipe); nni_mtx_unlock(&ep->ep_mtx); @@ -253,16 +315,19 @@ nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) void -nni_ep_rem_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) { - nni_mtx_lock(&ep->ep_mtx); - if (!nni_list_active(&ep->ep_pipes, pipe)) { + if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) { + nni_mtx_lock(&ep->ep_mtx); + nni_list_remove(&ep->ep_pipes, pipe); + + if (ep->ep_pipe == pipe) { + ep->ep_pipe = NULL; + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); - return; } - nni_list_remove(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); } @@ -295,7 +360,7 @@ nni_dialer(void *arg) } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - break; + return; } nni_mtx_unlock(&ep->ep_mtx); @@ -382,11 +447,11 @@ nni_ep_accept(nni_ep *ep) } rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } return (0); diff --git a/src/core/endpt.h b/src/core/endpt.h index cfb332bb..3d353cdf 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -47,11 +47,13 @@ extern void nni_ep_hold(nni_ep *); extern void nni_ep_rele(nni_ep *); extern uint32_t nni_ep_id(nni_ep *); extern int nni_ep_create(nni_ep **, nni_sock *, const char *); +extern void nni_ep_stop(nni_ep *); extern void nni_ep_close(nni_ep *); +extern void nni_ep_remove(nni_ep *); extern int nni_ep_dial(nni_ep *, int); extern int nni_ep_listen(nni_ep *, int); extern void nni_ep_list_init(nni_list *); -extern int nni_ep_add_pipe(nni_ep *, nni_pipe *); -extern void nni_ep_rem_pipe(nni_ep *, nni_pipe *); +extern int nni_ep_pipe_add(nni_ep *, nni_pipe *); +extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); #endif // CORE_ENDPT_H diff --git a/src/core/pipe.c b/src/core/pipe.c index 80b39171..5d774e6d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -31,6 +31,7 @@ nni_pipe_ctor(uint32_t id) p->p_tran_data = NULL; p->p_proto_data = NULL; + p->p_proto_dtor = NULL; p->p_id = id; NNI_LIST_NODE_INIT(&p->p_sock_node); @@ -45,8 +46,9 @@ nni_pipe_dtor(void *ptr) { nni_pipe *p = ptr; - nni_sock_pipe_rem(p->p_sock, p); - + if (p->p_proto_dtor != NULL) { + p->p_proto_dtor(p->p_proto_data); + } if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -140,11 +142,23 @@ nni_pipe_close(nni_pipe *p) } nni_mtx_unlock(&p->p_mtx); +} - // Let the socket (and endpoint) know we have closed. - nni_sock_pipe_closed(sock, p); - nni_objhash_unref(nni_pipes, p->p_id); +// nni_pipe_remove is called by protocol implementations to indicate that +// they are finished using the pipe (it should be closed already), and the +// owning socket and endpoint should de-register it. +void +nni_pipe_remove(nni_pipe *p) +{ + // Make sure the pipe is closed, in case it wasn't already done. + nni_pipe_close(p); + + nni_ep_pipe_remove(p->p_ep, p); + nni_sock_pipe_remove(p->p_sock, p); + + // XXX: would be simpler to just do a destroy here + nni_pipe_rele(p); } @@ -167,6 +181,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) return (rv); } p->p_sock = sock; + p->p_ep = ep; // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. @@ -178,8 +193,12 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) return (rv); } + if ((rv = nni_ep_pipe_add(ep, p)) != 0) { + nni_pipe_remove(p); + } if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_objhash_unref(nni_pipes, p->p_id); + nni_pipe_remove(p); + //nni_objhash_unref(nni_pipes, p->p_id); return (rv); } @@ -188,15 +207,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) } -void -nni_pipe_destroy(nni_pipe *p) -{ - NNI_ASSERT(p->p_refcnt == 0); - - nni_objhash_unref(nni_pipes, p->p_id); -} - - int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { @@ -221,7 +231,7 @@ nni_pipe_start(nni_pipe *p) NNI_ASSERT(p == scratch); if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { - nni_pipe_close(p); + nni_pipe_remove(p); return (rv); } @@ -232,9 +242,10 @@ nni_pipe_start(nni_pipe *p) void -nni_pipe_set_proto_data(nni_pipe *p, void *data) +nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor) { p->p_proto_data = data; + p->p_proto_dtor = dtor; } diff --git a/src/core/pipe.h b/src/core/pipe.h index 379b1d8a..1f911480 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -23,13 +23,13 @@ struct nni_pipe { nni_tran_pipe p_tran_ops; void * p_tran_data; void * p_proto_data; + nni_cb p_proto_dtor; nni_list_node p_sock_node; nni_list_node p_ep_node; nni_sock * p_sock; nni_ep * p_ep; int p_reap; nni_mtx p_mtx; - int p_refcnt; }; extern int nni_pipe_sys_init(void); @@ -41,16 +41,24 @@ extern int nni_pipe_aio_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); + +// nni_pipe_close closes the underlying transport for the pipe. Further +// operations against will return NNG_ECLOSED. extern void nni_pipe_close(nni_pipe *); + extern void nni_pipe_hold(nni_pipe *); extern void nni_pipe_rele(nni_pipe *); +// nni_pipe_remove is called by the protocol when it is done with the socket. +// The pipe should already be closed; it will be unregistered and it's +// resources released back to the system. The protocol MUST not reference +// the pipe after this. +extern void nni_pipe_remove(nni_pipe *); + // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. extern int nni_pipe_create(nni_pipe **, nni_ep *, nni_sock *, nni_tran *); -extern void nni_pipe_destroy(nni_pipe *); - extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); extern int nni_pipe_start(nni_pipe *); @@ -58,8 +66,9 @@ extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); // nni_pipe_set_proto_data sets the protocol private data. No locking is // performed, and this routine should only be called once per pipe at -// initialization. -extern void nni_pipe_set_proto_data(nni_pipe *, void *); +// initialization. The third argument is called to destroy the data, +// at termination. +extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb); // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. diff --git a/src/core/socket.c b/src/core/socket.c index 257f7fa3..67b3f978 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -101,7 +101,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) sock->s_pipe_ops.pipe_fini(pdata); return (NNG_ECLOSED); } - nni_pipe_set_proto_data(pipe, pdata); + nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini); nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); @@ -137,58 +137,18 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void -nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); - - nni_mtx_lock(&sock->s_mx); - - // NB: nni_list_remove doesn't really care *which* list the pipe - // is on, and so if the pipe is already on the idle list these - // two statements are effectively a no-op. - nni_list_remove(&sock->s_pipes, pipe); - if (nni_list_first(&sock->s_pipes) == NULL) { - nni_cv_wake(&sock->s_cv); - } - - sock->s_pipe_ops.pipe_stop(pdata); - - // Notify the endpoint that the pipe has closed. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - - -void -nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) -{ - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); + void *pdata; nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); + if (sock->s_closing) { + nni_cv_wake(&sock->s_cv); + } } - - if (pdata != NULL) { - sock->s_pipe_ops.pipe_fini(pdata); - } - - // XXX: Move this to a seperate ep-specific API. - // Notify the endpoint that the pipe has closed - if not already done. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - - // XXX release the hold on the pipe } @@ -556,12 +516,13 @@ nni_sock_shutdown(nni_sock *sock) linger = nni_clock() + sock->s_linger; } + // Stop the EPs. This prevents new connections from forming but + // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_stop(ep); } nni_mtx_unlock(&sock->s_mx); - // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* @@ -588,23 +549,36 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Stop all EPS. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); + // For each ep, close it; this will also tell it to force any + // of its pipes to close. + NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_close(ep); - nni_ep_rele(ep); - nni_mtx_lock(&sock->s_mx); } // For each pipe, close the underlying transport. Also move it // to the idle list so we won't keep looping. - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_mtx_unlock(&sock->s_mx); + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_close(pipe); + } + + // Wait for the eps to be reaped. + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + + // This has to be done without the lock held, as the remove + // operation requires shutting down a thread which might be + // trying to acquire the socket lock. + nni_mtx_unlock(&sock->s_mx); + nni_ep_remove(ep); nni_mtx_lock(&sock->s_mx); } + // Wait for the pipes to be reaped (there should not be any because + // we have already reaped the EPs.) + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_cv_wait(&sock->s_cv); + } + sock->s_sock_ops.sock_close(sock->s_data); nni_cv_wake(&sock->s_cv); @@ -620,14 +594,14 @@ nni_sock_shutdown(nni_sock *sock) } -// nni_sock_add_ep adds a newly created endpoint to the socket. The +// nni_sock_ep_add adds a newly created endpoint to the socket. The // caller must hold references on the sock and the ep, and not be holding // the socket lock. The ep acquires a reference against the sock, // which will be dropped later by nni_sock_rem_ep. The endpoint must not // already be associated with a socket. (Note, the ep holds the reference // on the socket, not the other way around.) int -nni_sock_add_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_add(nni_sock *sock, nni_ep *ep) { int rv; @@ -643,7 +617,7 @@ nni_sock_add_ep(nni_sock *sock, nni_ep *ep) void -nni_sock_rem_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { // If we're not on the list, then nothing to do. Be idempotent. // Note that if the ep is not on a list, then we assume that we have @@ -832,8 +806,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } @@ -853,8 +826,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } diff --git a/src/core/socket.h b/src/core/socket.h index 68f05705..7d5e0f20 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -85,18 +85,19 @@ extern void nni_sock_unlock(nni_sock *); extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); -extern int nni_sock_add_ep(nni_sock *, nni_ep *); -extern void nni_sock_rem_ep(nni_sock *, nni_ep *); +extern int nni_sock_ep_add(nni_sock *, nni_ep *); +extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // nni_sock_pipe_add is called by the pipe to register the pipe with // with the socket. The pipe is added to the idle list. The protocol // private pipe data is initialized as well. extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); -// nni_sock_pipe_rem deregisters the pipe from the socket. The socket -// will block during close if there are registered pipes outstanding. -// This also frees any protocol private pipe data. -extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *); +// nni_sock_pipe_remove is called by the pipe when the protocol is +// done with it. This is the sockets indication that it should be +// removed, and freed. The protocol MUST guarantee that the pipe is +// no longer in use when this function is called. +extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for // business. This also calls the socket/protocol specific add function, @@ -104,12 +105,6 @@ extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *); // on success. The reference count should be dropped by nni_sock_pipe_closed. extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *); -// nni_sock_pipe_closed lets the socket know that the pipe is closed. -// This keeps the socket from trying to schedule traffic to it. It -// also lets the endpoint know about it, to possibly restart a dial -// operation. -extern void nni_sock_pipe_closed(nni_sock *, nni_pipe *); - // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus // already have the socket lock held. -- cgit v1.2.3-70-g09d2