diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-20 14:34:51 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-20 14:34:51 -0700 |
| commit | a37093079b492e966344416445aae354b147d30e (patch) | |
| tree | 2f21fc2bc716f2423ba02f4713b25038c429ec4e /src/core/socket.c | |
| parent | 88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff) | |
| download | nng-a37093079b492e966344416445aae354b147d30e.tar.gz nng-a37093079b492e966344416445aae354b147d30e.tar.bz2 nng-a37093079b492e966344416445aae354b147d30e.zip | |
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and
therefore we need to wait for the aio to complete. Further, we need
to break apart shutting down aio activity from deallocation, as we need
to shut down *all* async activity before deallocating *anything*.
Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 93 |
1 files changed, 78 insertions, 15 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 10bc1c80..23a1793a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock) nni_objhash_unref(nni_socks, sock->s_id); } +static int +nni_sock_pipe_start(nni_pipe *pipe) +{ + nni_sock *sock = pipe->p_sock; + void * pdata = nni_pipe_get_proto_data(pipe); + int rv; + + NNI_ASSERT(sock != NULL); + if (sock->s_closing) { + // We're closing, bail out. + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + // Protocol rejection for other reasons. + // E.g. pair and already have active connected partner. + return (rv); + } + return (0); +} + +static void +nni_sock_pipe_start_cb(void *arg) +{ + nni_pipe *pipe = arg; + nni_aio * aio = &pipe->p_start_aio; + + if (nni_aio_result(aio) != 0) { + // Failed I/O during start, abort everything. + nni_pipe_stop(pipe); + return; + } + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + return; + } +} + int -nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { + nni_mtx_lock(&ep->ep_mtx); + + if ((sock->s_closing) || (ep->ep_closed)) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } + rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); + if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + rv = sock->s_pipe_ops.pipe_init( &pipe->p_proto_data, pipe, sock->s_data); if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_lock(&sock->s_mx); return (rv); } // Save the protocol destructor. pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; pipe->p_sock = sock; + pipe->p_ep = ep; + nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&ep->ep_pipes, pipe); + + // Start the initial negotiation I/O... + if (pipe->p_tran_ops.p_start == NULL) { + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + } + } else { + pipe->p_tran_ops.p_start( + pipe->p_tran_data, &pipe->p_start_aio); + } + + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (0); } @@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - nni_mtx_lock(&sock->s_mx); + // Stop any pending negotiation. + nni_aio_stop(&pipe->p_start_aio); + nni_mtx_lock(&sock->s_mx); if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { nni_mtx_unlock(&sock->s_mx); return; @@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, arrange for it to teardown hard. (Close, etc.). - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_stop(ep); } - - // Wait for the pipes to be reaped (there should not be any because - // we have already reaped the EPs.) - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_cv_wait(&sock->s_cv); + // For each pipe, arrange for it to teardown hard. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_stop(pipe); } - // Wait for the eps to be reaped. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + // We have to wait for *both* endpoints and pipes to be removed. + while ((!nni_list_empty(&sock->s_pipes)) || + (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); } |
