aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
commitfe3c9705072ac8cafecdf2ea6bca4c26f9464824 (patch)
tree07aaea70cbf8bb6af369d5efede475ed03ffdd63 /src/core/pipe.c
parent10d748fa6444324878a77cc5749c93b75819ced2 (diff)
downloadnng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.gz
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.bz2
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.zip
Refactor stop again, closing numerous races (thanks valgrind!)
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c61
1 files changed, 35 insertions, 26 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f33f21a6..3dcfe9e0 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -46,7 +46,7 @@ nni_pipe_dtor(void *ptr)
{
nni_pipe *p = ptr;
- if (p->p_proto_dtor != NULL) {
+ if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
}
if (p->p_tran_data != NULL) {
@@ -145,23 +145,41 @@ nni_pipe_close(nni_pipe *p)
}
-// nni_pipe_remove is called by protocol implementations to indicate that
-// they are finished using the pipe (it should be closed already), and the
-// owning socket and endpoint should de-register it.
+// We have to stop asynchronously using a task, because otherwise we can
+// wind up having a callback from an AIO trying to cancel itself. That
+// simply will not work.
void
nni_pipe_remove(nni_pipe *p)
{
- // Make sure the pipe is closed, in case it wasn't already done.
+ // Transport close...
nni_pipe_close(p);
nni_ep_pipe_remove(p->p_ep, p);
- nni_sock_pipe_remove(p->p_sock, p);
+
+ // Tell the protocol to stop.
+ nni_sock_pipe_stop(p->p_sock, p);
// XXX: would be simpler to just do a destroy here
nni_pipe_rele(p);
}
+void
+nni_pipe_stop(nni_pipe *p)
+{
+ // Guard against recursive calls.
+ nni_mtx_lock(&p->p_mtx);
+ if (p->p_stop) {
+ nni_mtx_unlock(&p->p_mtx);
+ return;
+ }
+ p->p_stop = 1;
+ nni_mtx_unlock(&p->p_mtx);
+ nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_remove, p);
+ nni_taskq_dispatch(NULL, &p->p_reap_tqe);
+}
+
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -175,6 +193,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
nni_pipe *p;
int rv;
uint32_t id;
+ void *pdata;
rv = nni_objhash_alloc(nni_pipes, &id, (void **) &p);
if (rv != 0) {
@@ -187,18 +206,24 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
+ // Save the protocol destructor.
+ p->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
+
// Initialize the transport pipe data.
if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- nni_pipe_remove(p);
+ // Initialize protocol pipe data.
+ rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data);
+ if (rv != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_remove(p);
+
+ if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
@@ -222,16 +247,8 @@ int
nni_pipe_start(nni_pipe *p)
{
int rv;
- nni_pipe *scratch;
-
- rv = nni_objhash_find(nni_pipes, p->p_id, (void **) &scratch);
- if (rv != 0) {
- return (rv);
- }
- NNI_ASSERT(p == scratch);
if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
- nni_pipe_remove(p);
return (rv);
}
@@ -241,14 +258,6 @@ nni_pipe_start(nni_pipe *p)
}
-void
-nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor)
-{
- p->p_proto_data = data;
- p->p_proto_dtor = dtor;
-}
-
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{