aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/pipe.c34
2 files changed, 30 insertions, 8 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 5f695d41..554d9a36 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -107,7 +107,7 @@ nni_ep_connect(nni_ep *ep, nni_pipe **pp)
}
rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_destroy(pipe);
return (rv);
}
ep->ep_pipe = pipe;
@@ -243,7 +243,7 @@ nni_ep_accept(nni_ep *ep, nni_pipe **pp)
}
rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_destroy(pipe);
return (rv);
}
*pp = pipe;
diff --git a/src/core/pipe.c b/src/core/pipe.c
index b3a107ff..93b08b08 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -60,6 +60,28 @@ nni_pipe_close(nni_pipe *p)
}
+// nni_pipe_bail is a special version of close, that is used to abort
+// from nni_pipe_start, when it fails. It requires the lock to be held,
+// and this prevents us from dropping the lock, possibly leading to race
+// conditions.
+static void
+nni_pipe_bail(nni_pipe *p)
+{
+ nni_sock *sock = p->p_sock;
+ if (p->p_tran_data != NULL) {
+ p->p_tran_ops.pipe_close(p->p_tran_data);
+ }
+
+ if (!p->p_reap) {
+ // schedule deferred reap/close
+ p->p_reap = 1;
+ nni_list_remove(&sock->s_pipes, p);
+ nni_list_append(&sock->s_reaps, p);
+ nni_cv_wake(&sock->s_cv);
+ }
+}
+
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -121,9 +143,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
NNI_FREE_STRUCT(p);
return (rv);
}
- nni_mtx_lock(&sock->s_mx);
- nni_list_append(&sock->s_pipes, p);
- nni_mtx_unlock(&sock->s_mx);
*pp = p;
return (0);
@@ -150,14 +169,14 @@ nni_pipe_start(nni_pipe *pipe)
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
+ nni_pipe_bail(pipe);
nni_mtx_unlock(&sock->s_mx);
- nni_pipe_close(pipe);
return (NNG_ECLOSED);
}
if (nni_pipe_peer(pipe) != sock->s_peer) {
+ nni_pipe_bail(pipe);
nni_mtx_unlock(&sock->s_mx);
- nni_pipe_close(pipe);
return (NNG_EPROTO);
}
@@ -178,10 +197,13 @@ nni_pipe_start(nni_pipe *pipe)
} while (collide);
if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
+ nni_pipe_bail(pipe);
nni_mtx_unlock(&sock->s_mx);
- nni_pipe_close(pipe);
return (rv);
}
+
+ nni_list_append(&sock->s_pipes, pipe);
+
nni_thr_run(&pipe->p_send_thr);
nni_thr_run(&pipe->p_recv_thr);
pipe->p_active = 1;