From a37093079b492e966344416445aae354b147d30e Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 20 Jul 2017 14:34:51 -0700 Subject: Yet more race condition fixes. We need to remember that protocol stops can run synchronously, and therefore we need to wait for the aio to complete. Further, we need to break apart shutting down aio activity from deallocation, as we need to shut down *all* async activity before deallocating *anything*. Noticed that we had a pipe race in the surveyor pattern too. --- src/core/socket.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 15 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 10bc1c80..23a1793a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock) nni_objhash_unref(nni_socks, sock->s_id); } +static int +nni_sock_pipe_start(nni_pipe *pipe) +{ + nni_sock *sock = pipe->p_sock; + void * pdata = nni_pipe_get_proto_data(pipe); + int rv; + + NNI_ASSERT(sock != NULL); + if (sock->s_closing) { + // We're closing, bail out. + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + // Protocol rejection for other reasons. + // E.g. pair and already have active connected partner. + return (rv); + } + return (0); +} + +static void +nni_sock_pipe_start_cb(void *arg) +{ + nni_pipe *pipe = arg; + nni_aio * aio = &pipe->p_start_aio; + + if (nni_aio_result(aio) != 0) { + // Failed I/O during start, abort everything. + nni_pipe_stop(pipe); + return; + } + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + return; + } +} + int -nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { + nni_mtx_lock(&ep->ep_mtx); + + if ((sock->s_closing) || (ep->ep_closed)) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } + rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); + if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + rv = sock->s_pipe_ops.pipe_init( &pipe->p_proto_data, pipe, sock->s_data); if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_lock(&sock->s_mx); return (rv); } // Save the protocol destructor. pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; pipe->p_sock = sock; + pipe->p_ep = ep; + nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&ep->ep_pipes, pipe); + + // Start the initial negotiation I/O... + if (pipe->p_tran_ops.p_start == NULL) { + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + } + } else { + pipe->p_tran_ops.p_start( + pipe->p_tran_data, &pipe->p_start_aio); + } + + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (0); } @@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - nni_mtx_lock(&sock->s_mx); + // Stop any pending negotiation. + nni_aio_stop(&pipe->p_start_aio); + nni_mtx_lock(&sock->s_mx); if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { nni_mtx_unlock(&sock->s_mx); return; @@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, arrange for it to teardown hard. (Close, etc.). - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_stop(ep); } - - // Wait for the pipes to be reaped (there should not be any because - // we have already reaped the EPs.) - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_cv_wait(&sock->s_cv); + // For each pipe, arrange for it to teardown hard. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_stop(pipe); } - // Wait for the eps to be reaped. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + // We have to wait for *both* endpoints and pipes to be removed. + while ((!nni_list_empty(&sock->s_pipes)) || + (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); } -- cgit v1.2.3-70-g09d2