From 0a51aa7bfc88d55b98fdde0d497b072e6911457d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 24 Jun 2017 14:11:35 -0700 Subject: Protocols keep their own reference counts. --- src/core/socket.c | 98 ++++++++++++++++++++----------------------------------- 1 file changed, 35 insertions(+), 63 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 257f7fa3..67b3f978 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -101,7 +101,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) sock->s_pipe_ops.pipe_fini(pdata); return (NNG_ECLOSED); } - nni_pipe_set_proto_data(pipe, pdata); + nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini); nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); @@ -137,58 +137,18 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void -nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); - - nni_mtx_lock(&sock->s_mx); - - // NB: nni_list_remove doesn't really care *which* list the pipe - // is on, and so if the pipe is already on the idle list these - // two statements are effectively a no-op. - nni_list_remove(&sock->s_pipes, pipe); - if (nni_list_first(&sock->s_pipes) == NULL) { - nni_cv_wake(&sock->s_cv); - } - - sock->s_pipe_ops.pipe_stop(pdata); - - // Notify the endpoint that the pipe has closed. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - - -void -nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) -{ - nni_ep *ep; - void *pdata = nni_pipe_get_proto_data(pipe); + void *pdata; nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); + if (sock->s_closing) { + nni_cv_wake(&sock->s_cv); + } } - - if (pdata != NULL) { - sock->s_pipe_ops.pipe_fini(pdata); - } - - // XXX: Move this to a seperate ep-specific API. - // Notify the endpoint that the pipe has closed - if not already done. - if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - nni_cv_wake(&ep->ep_cv); - } - nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - - // XXX release the hold on the pipe } @@ -556,12 +516,13 @@ nni_sock_shutdown(nni_sock *sock) linger = nni_clock() + sock->s_linger; } + // Stop the EPs. This prevents new connections from forming but + // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_stop(ep); } nni_mtx_unlock(&sock->s_mx); - // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* @@ -588,23 +549,36 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Stop all EPS. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); + // For each ep, close it; this will also tell it to force any + // of its pipes to close. + NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_close(ep); - nni_ep_rele(ep); - nni_mtx_lock(&sock->s_mx); } // For each pipe, close the underlying transport. Also move it // to the idle list so we won't keep looping. - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_mtx_unlock(&sock->s_mx); + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_close(pipe); + } + + // Wait for the eps to be reaped. + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + + // This has to be done without the lock held, as the remove + // operation requires shutting down a thread which might be + // trying to acquire the socket lock. + nni_mtx_unlock(&sock->s_mx); + nni_ep_remove(ep); nni_mtx_lock(&sock->s_mx); } + // Wait for the pipes to be reaped (there should not be any because + // we have already reaped the EPs.) + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_cv_wait(&sock->s_cv); + } + sock->s_sock_ops.sock_close(sock->s_data); nni_cv_wake(&sock->s_cv); @@ -620,14 +594,14 @@ nni_sock_shutdown(nni_sock *sock) } -// nni_sock_add_ep adds a newly created endpoint to the socket. The +// nni_sock_ep_add adds a newly created endpoint to the socket. The // caller must hold references on the sock and the ep, and not be holding // the socket lock. The ep acquires a reference against the sock, // which will be dropped later by nni_sock_rem_ep. The endpoint must not // already be associated with a socket. (Note, the ep holds the reference // on the socket, not the other way around.) int -nni_sock_add_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_add(nni_sock *sock, nni_ep *ep) { int rv; @@ -643,7 +617,7 @@ nni_sock_add_ep(nni_sock *sock, nni_ep *ep) void -nni_sock_rem_ep(nni_sock *sock, nni_ep *ep) +nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { // If we're not on the list, then nothing to do. Be idempotent. // Note that if the ep is not on a list, then we assume that we have @@ -832,8 +806,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } @@ -853,8 +826,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) } if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_close(ep); - nni_ep_rele(ep); + nni_ep_remove(ep); } else if (epp != NULL) { *epp = ep; } -- cgit v1.2.3-70-g09d2