diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 4 | ||||
| -rw-r--r-- | src/core/pipe.c | 34 |
2 files changed, 30 insertions, 8 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 5f695d41..554d9a36 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -107,7 +107,7 @@ nni_ep_connect(nni_ep *ep, nni_pipe **pp) } rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_tran_data); if (rv != 0) { - nni_pipe_close(pipe); + nni_pipe_destroy(pipe); return (rv); } ep->ep_pipe = pipe; @@ -243,7 +243,7 @@ nni_ep_accept(nni_ep *ep, nni_pipe **pp) } rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_tran_data); if (rv != 0) { - nni_pipe_close(pipe); + nni_pipe_destroy(pipe); return (rv); } *pp = pipe; diff --git a/src/core/pipe.c b/src/core/pipe.c index b3a107ff..93b08b08 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -60,6 +60,28 @@ nni_pipe_close(nni_pipe *p) } +// nni_pipe_bail is a special version of close, that is used to abort +// from nni_pipe_start, when it fails. It requires the lock to be held, +// and this prevents us from dropping the lock, possibly leading to race +// conditions. +static void +nni_pipe_bail(nni_pipe *p) +{ + nni_sock *sock = p->p_sock; + if (p->p_tran_data != NULL) { + p->p_tran_ops.pipe_close(p->p_tran_data); + } + + if (!p->p_reap) { + // schedule deferred reap/close + p->p_reap = 1; + nni_list_remove(&sock->s_pipes, p); + nni_list_append(&sock->s_reaps, p); + nni_cv_wake(&sock->s_cv); + } +} + + uint16_t nni_pipe_peer(nni_pipe *p) { @@ -121,9 +143,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) NNI_FREE_STRUCT(p); return (rv); } - nni_mtx_lock(&sock->s_mx); - nni_list_append(&sock->s_pipes, p); - nni_mtx_unlock(&sock->s_mx); *pp = p; return (0); @@ -150,14 +169,14 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { + nni_pipe_bail(pipe); nni_mtx_unlock(&sock->s_mx); - nni_pipe_close(pipe); return (NNG_ECLOSED); } if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_pipe_bail(pipe); nni_mtx_unlock(&sock->s_mx); - nni_pipe_close(pipe); return (NNG_EPROTO); } @@ -178,10 +197,13 @@ nni_pipe_start(nni_pipe *pipe) } while (collide); if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { + nni_pipe_bail(pipe); nni_mtx_unlock(&sock->s_mx); - nni_pipe_close(pipe); return (rv); } + + nni_list_append(&sock->s_pipes, pipe); + nni_thr_run(&pipe->p_send_thr); nni_thr_run(&pipe->p_recv_thr); pipe->p_active = 1; |
