diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 93 |
1 files changed, 78 insertions, 15 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 10bc1c80..23a1793a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock) nni_objhash_unref(nni_socks, sock->s_id); } +static int +nni_sock_pipe_start(nni_pipe *pipe) +{ + nni_sock *sock = pipe->p_sock; + void * pdata = nni_pipe_get_proto_data(pipe); + int rv; + + NNI_ASSERT(sock != NULL); + if (sock->s_closing) { + // We're closing, bail out. + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + // Protocol rejection for other reasons. + // E.g. pair and already have active connected partner. + return (rv); + } + return (0); +} + +static void +nni_sock_pipe_start_cb(void *arg) +{ + nni_pipe *pipe = arg; + nni_aio * aio = &pipe->p_start_aio; + + if (nni_aio_result(aio) != 0) { + // Failed I/O during start, abort everything. + nni_pipe_stop(pipe); + return; + } + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + return; + } +} + int -nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { + nni_mtx_lock(&ep->ep_mtx); + + if ((sock->s_closing) || (ep->ep_closed)) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } + rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); + if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + rv = sock->s_pipe_ops.pipe_init( &pipe->p_proto_data, pipe, sock->s_data); if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_lock(&sock->s_mx); return (rv); } // Save the protocol destructor. pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; pipe->p_sock = sock; + pipe->p_ep = ep; + nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&ep->ep_pipes, pipe); + + // Start the initial negotiation I/O... + if (pipe->p_tran_ops.p_start == NULL) { + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + } + } else { + pipe->p_tran_ops.p_start( + pipe->p_tran_data, &pipe->p_start_aio); + } + + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (0); } @@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - nni_mtx_lock(&sock->s_mx); + // Stop any pending negotiation. + nni_aio_stop(&pipe->p_start_aio); + nni_mtx_lock(&sock->s_mx); if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { nni_mtx_unlock(&sock->s_mx); return; @@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, arrange for it to teardown hard. (Close, etc.). - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_stop(ep); } - - // Wait for the pipes to be reaped (there should not be any because - // we have already reaped the EPs.) - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_cv_wait(&sock->s_cv); + // For each pipe, arrange for it to teardown hard. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_stop(pipe); } - // Wait for the eps to be reaped. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + // We have to wait for *both* endpoints and pipes to be removed. + while ((!nni_list_empty(&sock->s_pipes)) || + (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); } |
