Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--  src/core/pipe.c | 167
1 file changed, 75 insertions, 92 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a401e4e3..18c47c60 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -50,6 +50,30 @@ nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
}
+void
+nni_pipe_incref(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt++;
+ nni_mtx_unlock(&p->p_mtx);
+}
+
+
+void
+nni_pipe_decref(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt--;
+ if (p->p_refcnt == 0) {
+ nni_mtx_unlock(&p->p_mtx);
+
+ nni_pipe_destroy(p);
+ return;
+ }
+ nni_mtx_unlock(&p->p_mtx);
+}
+
+
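nni_pipe_incref and nni_pipe_decref above implement a mutex-protected reference count in which whichever caller drops the count to zero releases the lock and then performs the single deferred destroy. As a stand-alone illustration of the same idiom (POSIX threads, illustrative names, not part of nng), it looks roughly like this:

#include <pthread.h>
#include <stdlib.h>

typedef struct {
        pthread_mutex_t mtx;
        int             refcnt;   // whoever creates the object holds a reference
} obj;

static void
obj_destroy(obj *o)
{
        pthread_mutex_destroy(&o->mtx);
        free(o);
}

static void
obj_incref(obj *o)
{
        pthread_mutex_lock(&o->mtx);
        o->refcnt++;
        pthread_mutex_unlock(&o->mtx);
}

static void
obj_decref(obj *o)
{
        int last;

        pthread_mutex_lock(&o->mtx);
        last = (--o->refcnt == 0);
        pthread_mutex_unlock(&o->mtx);
        if (last) {
                obj_destroy(o);   // last reference out performs the destroy
        }
}

The key point, mirrored in nni_pipe_decref, is that the lock is released before the destroy, so teardown never runs while the object's own mutex is held.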
// nni_pipe_close closes the underlying connection. It is expected that
// subsequent attempts to receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
@@ -58,37 +82,34 @@ nni_pipe_close(nni_pipe *p)
{
nni_sock *sock = p->p_sock;
+ nni_mtx_lock(&p->p_mtx);
+ if (p->p_reap == 1) {
+ // We already did a close.
+ nni_mtx_unlock(&p->p_mtx);
+ return;
+ }
+ p->p_reap = 1;
+
+ // Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_close(p->p_tran_data);
}
- nni_mtx_lock(&sock->s_mx);
- if (!p->p_reap) {
- // schedule deferred reap/close
- p->p_reap = 1;
- nni_list_remove(&sock->s_pipes, p);
- nni_list_append(&sock->s_reaps, p);
- nni_cv_wake(&sock->s_cv);
+ // Unregister our ID so nobody else can find it.
+ if (p->p_id != 0) {
+ nni_mtx_lock(nni_idlock);
+ nni_idhash_remove(nni_pipes, p->p_id);
+ nni_mtx_unlock(nni_idlock);
+ p->p_id = 0;
}
- nni_mtx_unlock(&sock->s_mx);
-}
+ nni_mtx_unlock(&p->p_mtx);
-// nni_pipe_bail is a special version of close, that is used to abort
-// from nni_pipe_start, when it fails. It requires the lock to be held,
-// and this prevents us from dropping the lock, possibly leading to race
-// conditions. It's critical that this not be called after the pipe is
-// started, or deadlock will occur.
-static void
-nni_pipe_bail(nni_pipe *p)
-{
- nni_sock *sock = p->p_sock;
-
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_close(p->p_tran_data);
- }
+ // Let the socket (and endpoint) know we have closed.
+ nni_sock_pipe_closed(sock, p);
- nni_pipe_destroy(p);
+ // Drop a reference count, possibly doing deferred destroy.
+ nni_pipe_decref(p);
}
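With that, nni_pipe_close carries the whole teardown sequence: mark the pipe reaped under its own lock (so a second close returns immediately), close the transport, unregister the pipe ID so lookups can no longer find it, notify the socket, and finally drop a reference, which triggers the deferred destroy once nothing else holds the pipe. Continuing the stand-alone sketch above (same hypothetical obj type, plus an assumed int closed field; not nng code), the idempotent-close shape is:

static void
obj_close(obj *o)
{
        pthread_mutex_lock(&o->mtx);
        if (o->closed) {
                pthread_mutex_unlock(&o->mtx);
                return;               // second and later closes are no-ops
        }
        o->closed = 1;
        pthread_mutex_unlock(&o->mtx);

        // ... release transport state, unregister IDs, notify owners ...

        obj_decref(o);                // drop the reference held while open
}

Because the flag is checked and set under the lock, the close body runs exactly once no matter how many error paths call it.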
@@ -99,25 +120,6 @@ nni_pipe_peer(nni_pipe *p)
}
-void
-nni_pipe_destroy(nni_pipe *p)
-{
- int i;
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_thr_fini(&p->p_worker_thr[i]);
- }
-
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_destroy(p->p_tran_data);
- }
- if (p->p_proto_data != NULL) {
- p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
- }
- NNI_FREE_STRUCT(p);
-}
-
-
int
nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
@@ -126,15 +128,17 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
void *pdata;
int rv;
- int i;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&p->p_mtx)) != 0) {
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
p->p_sock = sock;
p->p_tran_data = NULL;
p->p_proto_data = NULL;
- p->p_active = 0;
p->p_id = 0;
NNI_LIST_NODE_INIT(&p->p_node);
@@ -143,30 +147,37 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
p->p_tran_ops = *ep->ep_tran->tran_pipe;
if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) {
+ nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
return (rv);
}
p->p_proto_data = pdata;
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_worker fn = ops->pipe_worker[i];
- rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata);
- if (rv != 0) {
- while (i > 0) {
- i--;
- nni_thr_fini(&p->p_worker_thr[i]);
- }
- ops->pipe_fini(pdata);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
- }
+ nni_sock_pipe_add(sock, p);
*pp = p;
return (0);
}
+void
+nni_pipe_destroy(nni_pipe *p)
+{
+ NNI_ASSERT(p->p_refcnt == 0);
+
+ // The caller is responsible for ensuring that the pipe
+	// is not in use by any other consumers. It must not be started.
+ if (p->p_tran_data != NULL) {
+ p->p_tran_ops.pipe_destroy(p->p_tran_data);
+ }
+ if (p->p_proto_data != NULL) {
+ p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
+ }
+ nni_sock_pipe_rem(p->p_sock, p);
+ nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+
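nni_pipe_create and nni_pipe_destroy are now symmetric: each resource the constructor acquires (the mutex, the protocol private data, the socket registration) has a matching release in the destructor, and each failure path in the constructor unwinds only what has been set up so far. A minimal stand-alone sketch of that acquire/unwind symmetry (hypothetical names, not the nng API):

#include <pthread.h>
#include <stdlib.h>

typedef struct {
        pthread_mutex_t mtx;
        void           *pdata;
} widget;

static int
widget_create(widget **wp)
{
        widget *w;
        int     rv;

        if ((w = calloc(1, sizeof(*w))) == NULL) {
                return (-1);                      // cf. NNG_ENOMEM
        }
        if ((rv = pthread_mutex_init(&w->mtx, NULL)) != 0) {
                free(w);                          // nothing else to undo yet
                return (rv);
        }
        if ((w->pdata = malloc(64)) == NULL) {
                pthread_mutex_destroy(&w->mtx);   // unwind in reverse order
                free(w);
                return (-1);
        }
        *wp = w;
        return (0);
}

static void
widget_destroy(widget *w)
{
        free(w->pdata);
        pthread_mutex_destroy(&w->mtx);
        free(w);
}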
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
@@ -179,55 +190,27 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
int
-nni_pipe_start(nni_pipe *pipe)
+nni_pipe_start(nni_pipe *p)
{
int rv;
- int i;
- nni_sock *sock = pipe->p_sock;
-
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != sock->s_peer) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_EPROTO);
- }
+ nni_pipe_incref(p);
nni_mtx_lock(nni_idlock);
- rv = nni_idhash_alloc(nni_pipes, &pipe->p_id, pipe);
+ rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
nni_mtx_unlock(nni_idlock);
if (rv != 0) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(p);
return (rv);
}
- if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
- nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_pipes, pipe->p_id);
- pipe->p_id = 0;
- nni_mtx_unlock(nni_idlock);
-
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
+ if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ nni_pipe_close(p);
return (rv);
}
- pipe->p_active = 1;
- nni_list_append(&sock->s_pipes, pipe);
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_thr_run(&pipe->p_worker_thr[i]);
- }
-
// XXX: Publish event
- nni_mtx_unlock(&sock->s_mx);
return (0);
}
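With nni_pipe_bail removed, every failure inside nni_pipe_start is routed through nni_pipe_close, which already knows how to unwind a partially started pipe and drop its reference. A hedged sketch of how calling code might drive the lifecycle under this scheme; the demo_pipe_attach helper and the include are assumptions, and only the nni_pipe_* calls come from this file:

#include "core/nng_impl.h"        // assumed internal header; adjust to the tree

// Hypothetical caller-side flow using the signatures visible in this diff.
static int
demo_pipe_attach(nni_ep *ep)
{
        nni_pipe *p;
        int       rv;

        if ((rv = nni_pipe_create(&p, ep)) != 0) {
                return (rv);      // create cleans up after itself on failure
        }
        if ((rv = nni_pipe_start(p)) != 0) {
                // nni_pipe_start already called nni_pipe_close on this path,
                // so the pipe is being torn down (and destroyed) for us.
                return (rv);
        }
        // The pipe is now live; a later nni_pipe_close() ends it.
        return (0);
}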