aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-15 01:47:12 -0700
committerGitHub <noreply@github.com>2018-05-15 01:47:12 -0700
commit1d033484ee1a2ec26d3eead073e7bc0f889ffdf4 (patch)
tree15d3897d405cb0beb1ada6270ecf70241451ca70 /src/core/pipe.c
parent16b4c4019c7b7904de171c588ed8c72ca732d2cf (diff)
downloadnng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.gz
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.bz2
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.zip
fixes #419 want to nni_aio_stop without blocking (#428)
* fixes #419 want to nni_aio_stop without blocking This actually introduces an nni_aio_close() API that causes nni_aio_begin to return NNG_ECLOSED, while scheduling a callback on the AIO to do an NNG_ECLOSED as well. This should be called in non-blocking close() contexts instead of nni_aio_stop(), and the cases where we call nni_aio_fini() multiple times are updated updated to add nni_aio_stop() calls on all "interlinked" aios before finalizing them. Furthermore, we call nni_aio_close() as soon as practical in the close path. This closes an annoying race condition where the callback from a lower subsystem could wind up rescheduling an operation that we wanted to abort.
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c93
1 files changed, 54 insertions, 39 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 010d306d..ba3027cf 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,21 +18,22 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint32_t p_id;
- nni_tran_pipe p_tran_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_ep * p_ep;
- bool p_closed;
- bool p_stop;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio * p_start_aio;
+ uint32_t p_id;
+ nni_tran_pipe_ops p_tran_ops;
+ nni_proto_pipe_ops p_proto_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_ep * p_ep;
+ bool p_closed;
+ bool p_stop;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_list_node p_reap_node;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -106,6 +107,10 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_stop(p->p_proto_data);
+ }
+
// We have exclusive access at this point, so we can check if
// we are still on any lists.
if (nni_list_node_active(&p->p_ep_node)) {
@@ -126,6 +131,9 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_fini(p->p_proto_data);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -189,6 +197,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
+ // abort any pending negotiation/start process.
+ nni_aio_close(p->p_start_aio);
+
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -196,16 +207,16 @@ nni_pipe_close(nni_pipe *p)
return;
}
p->p_closed = true;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+ }
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
}
-
- nni_mtx_unlock(&p->p_mtx);
-
- // abort any pending negotiation/start process.
- nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
bool
@@ -222,7 +233,6 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
- nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -231,6 +241,8 @@ nni_pipe_stop(nni_pipe *p)
p->p_stop = true;
nni_mtx_unlock(&p->p_mtx);
+ nni_pipe_close(p);
+
// Put it on the reaplist for async cleanup
nni_mtx_lock(&nni_pipe_reap_lk);
nni_list_append(&nni_pipe_reap_list, p);
@@ -250,12 +262,9 @@ nni_pipe_start_cb(void *arg)
nni_pipe *p = arg;
nni_aio * aio = p->p_start_aio;
- if (nni_aio_result(aio) != 0) {
- nni_pipe_stop(p);
- return;
- }
-
- if (nni_sock_pipe_start(p->p_sock, p) != 0) {
+ if ((nni_aio_result(aio) != 0) ||
+ (nni_sock_pipe_start(p->p_sock, p) != 0) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
nni_pipe_stop(p);
}
}
@@ -263,10 +272,12 @@ nni_pipe_start_cb(void *arg)
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
- nni_pipe *p;
- int rv;
- nni_tran *tran = nni_ep_tran(ep);
- nni_sock *sock = nni_ep_sock(ep);
+ nni_pipe * p;
+ int rv;
+ nni_tran * tran = nni_ep_tran(ep);
+ nni_sock * sock = nni_ep_sock(ep);
+ void * sdata = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -277,6 +288,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
// Make a private copy of the transport ops.
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
+ p->p_proto_ops = *pops;
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
@@ -290,6 +302,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
+
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
@@ -299,11 +312,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_unlock(&nni_pipe_lk);
}
- if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
- ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
+ if ((rv != 0) ||
+ ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
+ ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
nni_pipe_destroy(p);
+ return (rv);
}
+ // At this point the endpoint knows about it, and the protocol
+ // might too, so on failure we have to tear it down fully as if done
+ // after a successful result.
+ if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
+ nni_pipe_stop(p);
+ }
return (rv);
}
@@ -339,12 +360,6 @@ nni_pipe_get_proto_data(nni_pipe *p)
}
void
-nni_pipe_set_proto_data(nni_pipe *p, void *data)
-{
- p->p_proto_data = data;
-}
-
-void
nni_pipe_sock_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_sock_node);