diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 87 |
1 files changed, 50 insertions, 37 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 4be5a856..3a4f36e5 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -109,7 +109,8 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe) { - void *pdata; + void * pdata; + nni_ep *ep; pdata = nni_pipe_get_proto_data(pipe); @@ -119,10 +120,24 @@ nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); return; } + + // Break up the relationship between the EP and the pipe. + if ((ep = pipe->p_ep) != NULL) { + nni_mtx_lock(&ep->ep_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&ep->ep_pipes, pipe)) { + nni_list_remove(&ep->ep_pipes, pipe); + } + pipe->p_ep = NULL; + ep->ep_pipe = NULL; // XXX: remove this soon + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + } + sock->s_pipe_ops.pipe_stop(pdata); if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); - if (sock->s_closing) { + if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { nni_cv_wake(&sock->s_cv); } } @@ -459,10 +474,10 @@ nni_sock_shutdown(nni_sock *sock) linger = nni_clock() + sock->s_linger; } - // Stop the EPs. This prevents new connections from forming but + // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_stop(ep); + nni_ep_close(ep); } nni_mtx_unlock(&sock->s_mx); @@ -492,15 +507,14 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, close the underlying transport. + // For each pipe, arrange for it to teardown hard. (Close, etc.). NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_stop(pipe); } - // For each ep, close it; this will also tell it to force any - // of its pipes to close. + // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_stop(ep); } // Wait for the pipes to be reaped (there should not be any because @@ -511,14 +525,7 @@ nni_sock_shutdown(nni_sock *sock) // Wait for the eps to be reaped. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - - // This has to be done without the lock held, as the remove - // operation requires shutting down a thread which might be - // trying to acquire the socket lock. - nni_mtx_unlock(&sock->s_mx); - nni_ep_remove(ep); - nni_mtx_lock(&sock->s_mx); + nni_cv_wait(&sock->s_cv); } sock->s_sock_ops.sock_close(sock->s_data); @@ -535,28 +542,10 @@ nni_sock_shutdown(nni_sock *sock) return (0); } -// nni_sock_ep_add adds a newly created endpoint to the socket. The -// caller must hold references on the sock and the ep, and not be holding -// the socket lock. The ep acquires a reference against the sock, -// which will be dropped later by nni_sock_rem_ep. The endpoint must not -// already be associated with a socket. (Note, the ep holds the reference -// on the socket, not the other way around.) -int -nni_sock_ep_add(nni_sock *sock, nni_ep *ep) -{ - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); - return (0); -} - void nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { + nni_pipe *pipe; // If we're not on the list, then nothing to do. Be idempotent. // Note that if the ep is not on a list, then we assume that we have // exclusive access. Therefore the check for being active need not @@ -564,8 +553,24 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) if ((sock == NULL) || (!nni_list_active(&sock->s_eps, ep))) { return; } + + // This is done under the endpoints lock, although the remove + // is done under that as well, we also make sure that we hold + // the socket lock in the remove step. + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_stop(pipe); + } + while (!nni_list_empty(&ep->ep_pipes)) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_lock(&sock->s_mx); nni_list_remove(&sock->s_eps, ep); + if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { + nni_cv_wake(&sock->s_cv); + } nni_mtx_unlock(&sock->s_mx); } @@ -727,12 +732,16 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_remove(ep); + nni_ep_stop(ep); } else if (epp != NULL) { *epp = ep; } @@ -746,12 +755,16 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_remove(ep); + nni_ep_stop(ep); } else if (epp != NULL) { *epp = ep; } |
