aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c114
1 files changed, 48 insertions, 66 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 1658aabc..75c4c8d6 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -15,6 +15,24 @@
// Operations on pipes (to the transport) are generally blocking operations,
// performed in the context of the protocol.
+struct nni_pipe {
+ uint32_t p_id;
+ nni_tran_pipe p_tran_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_ep * p_ep;
+ int p_reap;
+ int p_stop;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_list_node p_reap_node;
+ nni_aio p_start_aio;
+};
+
static nni_idhash *nni_pipes;
static nni_list nni_pipe_reap_list;
@@ -78,7 +96,10 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
- NNI_ASSERT(p->p_refcnt != 0xDEAD);
+
+ // Stop any pending negotiation.
+ nni_aio_stop(&p->p_start_aio);
+
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -86,14 +107,17 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
- p->p_refcnt = 0xDEAD;
- nni_aio_stop(&p->p_start_aio);
+ // We have exclusive access at this point, so we can check if
+ // we are still on any lists.
+
nni_aio_fini(&p->p_start_aio);
- if (p->p_proto_data != NULL) {
- p->p_proto_dtor(p->p_proto_data);
+ if (nni_list_node_active(&p->p_ep_node)) {
+ nni_ep_pipe_remove(p->p_ep, p);
}
+ nni_sock_pipe_remove(p->p_sock, p);
+
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -148,31 +172,6 @@ nni_pipe_close(nni_pipe *p)
nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
-// Pipe reap is called on a taskq when the pipe should be closed. No
-// locks are held. This routine must take care to synchronously ensure
-// that no further references to the pipe are possible, then it may
-// destroy the pipe.
-static void
-nni_pipe_reap(nni_pipe *p)
-{
- // Transport close...
- nni_pipe_close(p);
-
- nni_aio_stop(&p->p_start_aio);
-
- // Remove the pipe from the socket and the endpoint. Note
- // that it is in theory possible for either of these to be null
- // if the pipe is being torn down before it is fully initialized.
- if (p->p_ep != NULL) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (p->p_sock != NULL) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
-
- nni_pipe_destroy(p);
-}
-
void
nni_pipe_stop(nni_pipe *p)
{
@@ -206,16 +205,12 @@ nni_pipe_start_cb(void *arg)
nni_aio * aio = &p->p_start_aio;
int rv;
- nni_mtx_lock(&p->p_mtx);
if ((rv = nni_aio_result(aio)) != 0) {
- nni_mtx_unlock(&p->p_mtx);
nni_pipe_stop(p);
return;
}
- nni_mtx_unlock(&p->p_mtx);
-
- if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ if ((rv = nni_sock_pipe_start(p->p_sock, p)) != 0) {
nni_pipe_stop(p);
}
}
@@ -225,8 +220,8 @@ nni_pipe_create(nni_ep *ep, void *tdata)
{
nni_pipe *p;
int rv;
- nni_tran *tran = ep->ep_tran;
- nni_sock *sock = ep->ep_sock;
+ nni_tran *tran = nni_ep_tran(ep);
+ nni_sock *sock = nni_ep_sock(ep);
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -238,31 +233,24 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_data = NULL;
+ p->p_ep = ep;
+ p->p_sock = sock;
NNI_LIST_NODE_INIT(&p->p_reap_node);
-
- if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
- ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
- tran->tran_pipe->p_fini(p);
- nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- nni_pipe_destroy(p);
- return (rv);
- }
-
- if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
+ ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) ||
+ ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) !=
+ 0) ||
+ ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
- return (rv);
}
- return (0);
+ return (rv);
}
int
@@ -292,6 +280,12 @@ nni_pipe_get_proto_data(nni_pipe *p)
}
void
+nni_pipe_set_proto_data(nni_pipe *p, void *data)
+{
+ p->p_proto_data = data;
+}
+
+void
nni_pipe_sock_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_sock_node);
@@ -318,18 +312,6 @@ nni_pipe_reaper(void *notused)
// Transport close...
nni_pipe_close(p);
- nni_aio_stop(&p->p_start_aio);
-
- // Remove the pipe from the socket and the endpoint.
- // Note that it is in theory possible for either of
- // these to be null if the pipe is being torn down
- // before it is fully initialized.
- if (p->p_ep != NULL) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (p->p_sock != NULL) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
nni_pipe_destroy(p);
nni_mtx_lock(&nni_pipe_reap_lk);
continue;