aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-17 12:54:01 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-17 19:29:37 -0700
commit70d478f5d185e147ca8d3dcba4cbd8bb6da3719a (patch)
tree443e3b0e81138d7c195660d45eca7d4d497af8ac /src/core/pipe.c
parente490aa3353f05e158a0f1f485f371cd49e70b4f5 (diff)
downloadnng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.gz
nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.bz2
nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.zip
fixes #449 Want more flexible pipe events
This changes the signature of nng_pipe_notify(), and the associated events. The documentation is updated to reflect this. We have also broken the lock up so that we don't hold the master socket lock for some of these things, which may have beneficial impact on performance.
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c49
1 files changed, 38 insertions, 11 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 321e1a09..ccd0dbe7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,6 +29,7 @@ struct nni_pipe {
nni_ep * p_ep;
bool p_closed;
bool p_stop;
+ bool p_cbs;
int p_refcnt;
nni_mtx p_mtx;
nni_cv p_cv;
@@ -100,10 +101,19 @@ nni_pipe_sys_fini(void)
static void
nni_pipe_destroy(nni_pipe *p)
{
+ bool cbs;
if (p == NULL) {
return;
}
+ nni_mtx_lock(&p->p_mtx);
+ cbs = p->p_cbs;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (cbs) {
+ nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
+ }
+
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
@@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p)
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
}
+
if (nni_list_node_active(&p->p_sock_node)) {
nni_sock_pipe_remove(p->p_sock, p);
}
@@ -263,13 +274,32 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
+ nni_sock *s = p->p_sock;
nni_aio * aio = p->p_start_aio;
+ uint32_t id = nni_pipe_id(p);
- if ((nni_aio_result(aio) != 0) ||
- (nni_sock_pipe_start(p->p_sock, p) != 0) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (nni_aio_result(aio) != 0) {
nni_pipe_stop(p);
+ return;
}
+
+ nni_mtx_lock(&p->p_mtx);
+ p->p_cbs = true; // We're running all cbs going forward
+ nni_mtx_unlock(&p->p_mtx);
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
+ if (nni_pipe_closed(p)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
+ nni_sock_closing(s)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id);
}
int
@@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
// Make a private copy of the transport ops.
+ p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_sock = sock;
p->p_closed = false;
p->p_stop = false;
+ p->p_cbs = false;
p->p_refcnt = 0;
NNI_LIST_NODE_INIT(&p->p_reap_node);
@@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
+ ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
return (rv);
}
- // At this point the endpoint knows about it, and the protocol
- // might too, so on failure we have to tear it down fully as if done
- // after a successful result.
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_stop(p);
- }
- return (rv);
+ return (0);
}
int