From 953ca274ae57f8edd12536a3dd15d134aa6e5576 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 6 Jul 2018 14:42:53 -0700 Subject: fixes #568 Want a single reader/write lock on socket child objects fixes #170 Make more use of reaper This is a complete restructure/rethink of how child objects interact with the socket. (This also backs out #576 as it turns out not to be needed.) While 568 says reader/writer lock, for now we have settled for a single writer lock. Its likely that this is sufficient. Essentially we use the single socket lock to guard lists of the socket children. We also use deferred deletion in the idhash to facilitate teardown, which means endpoint closes are no longer synchronous. We use the reaper to clean up objects when the reference count drops to zero. We make a special exception for pipes, since they really are not reference counted by their parents, and they are leaf objects anyway. We believe this addresses the main outstanding race conditions in a much more correct and holistic way. Note that endpoint shutdown is a little tricky, as it makes use of atomic flags to guard against double entry, and against recursive lock entry. This is something that would be nice to make a bit more obvious, but what we have is safe, and the complexity is at least confined to one place. --- src/protocol/bus0/bus.c | 10 +++++----- src/protocol/pair0/pair.c | 8 ++++---- src/protocol/pair1/pair.c | 12 ++++++------ src/protocol/pipeline0/pull.c | 4 ++-- src/protocol/pipeline0/push.c | 6 +++--- src/protocol/pubsub0/pub.c | 6 +++--- src/protocol/pubsub0/sub.c | 4 ++-- src/protocol/reqrep0/rep.c | 6 +++--- src/protocol/reqrep0/req.c | 6 +++--- src/protocol/reqrep0/xrep.c | 10 +++++----- src/protocol/reqrep0/xreq.c | 10 +++++----- src/protocol/survey0/respond.c | 6 +++--- src/protocol/survey0/survey.c | 8 ++++---- src/protocol/survey0/xrespond.c | 10 +++++----- src/protocol/survey0/xsurvey.c | 10 +++++----- 15 files changed, 58 insertions(+), 58 deletions(-) (limited to 'src/protocol') diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index ba719e49..2426abba 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -234,7 +234,7 @@ bus0_pipe_getq_cb(void *arg) if (nni_aio_result(p->aio_getq) != 0) { // closed? - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); @@ -252,7 +252,7 @@ bus0_pipe_send_cb(void *arg) // closed? nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -267,7 +267,7 @@ bus0_pipe_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } msg = nni_aio_get_msg(p->aio_recv); @@ -277,7 +277,7 @@ bus0_pipe_recv_cb(void *arg) // XXX: bump a nomemory stat nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -295,7 +295,7 @@ bus0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 4ab9f9e8..d663c5e2 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -176,7 +176,7 @@ pair0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -196,7 +196,7 @@ pair0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -208,7 +208,7 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -226,7 +226,7 @@ pair0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 0c6a4867..dc250943 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -244,7 +244,7 @@ pair1_pipe_recv_cb(void *arg) int rv; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -257,13 +257,13 @@ pair1_pipe_recv_cb(void *arg) // If the message is missing the hop count header, scrap it. if (nni_msg_len(msg) < sizeof(uint32_t)) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } hdr = nni_msg_trim_u32(msg); if (hdr & 0xffffff00) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } @@ -343,7 +343,7 @@ pair1_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -358,7 +358,7 @@ pair1_pipe_getq_cb(void *arg) uint32_t hops; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -405,7 +405,7 @@ pair1_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 63243cb5..a713bc80 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -141,7 +141,7 @@ pull0_recv_cb(void *arg) if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -163,7 +163,7 @@ pull0_putq_cb(void *arg) // we can do. Just close the pipe. nni_msg_free(nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 9585b86c..00e9212c 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -163,7 +163,7 @@ push0_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_msg_free(nni_aio_get_msg(p->aio_recv)); @@ -180,7 +180,7 @@ push0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -195,7 +195,7 @@ push0_getq_cb(void *arg) if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 72cd1daa..cbc8acea 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -240,7 +240,7 @@ pub0_pipe_recv_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -255,7 +255,7 @@ pub0_pipe_getq_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -273,7 +273,7 @@ pub0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 2e8be4be..cb6d781f 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -164,7 +164,7 @@ sub0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -182,7 +182,7 @@ sub0_recv_cb(void *arg) // Any other error we stop the pipe for. It's probably // NNG_ECLOSED anyway. nng_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_pipe_recv(p->pipe, p->aio_recv); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f725cadb..f9ce58fd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -416,7 +416,7 @@ rep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_mtx_lock(&s->lk); @@ -519,7 +519,7 @@ rep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -544,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) // Peer is speaking garbage. Kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 43751d14..018dd0e8 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -305,7 +305,7 @@ req0_send_cb(void *arg) // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -345,7 +345,7 @@ req0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -409,7 +409,7 @@ req0_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); } static void diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index e3b9b605..1fe81ac5 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -262,7 +262,7 @@ xrep0_pipe_getq_cb(void *arg) xrep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -280,7 +280,7 @@ xrep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -296,7 +296,7 @@ xrep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -327,7 +327,7 @@ xrep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); @@ -361,7 +361,7 @@ xrep0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 2f0a3652..a98c713e 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -173,7 +173,7 @@ xreq0_getq_cb(void *arg) xreq0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -191,7 +191,7 @@ xreq0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -207,7 +207,7 @@ xreq0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_aio_set_msg(p->aio_putq, NULL); @@ -224,7 +224,7 @@ xreq0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -236,7 +236,7 @@ xreq0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index fbdeb65a..4e0a5263 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -405,7 +405,7 @@ resp0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_mtx_lock(&s->mtx); @@ -511,7 +511,7 @@ resp0_pipe_recv_cb(void *arg) size_t len; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -532,7 +532,7 @@ resp0_pipe_recv_cb(void *arg) // Peer is speaking garbage, kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 42d88f13..58fa4aa6 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -376,7 +376,7 @@ surv0_pipe_getq_cb(void *arg) surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -394,7 +394,7 @@ surv0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -411,7 +411,7 @@ surv0_pipe_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -423,7 +423,7 @@ surv0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 03a47e92..334c5ca6 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -247,7 +247,7 @@ xresp0_getq_cb(void *arg) xresp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -265,7 +265,7 @@ xresp0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -282,7 +282,7 @@ xresp0_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -308,7 +308,7 @@ xresp0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); @@ -340,7 +340,7 @@ xresp0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index bad24042..fabcc766 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -205,7 +205,7 @@ xsurv0_getq_cb(void *arg) xsurv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -223,7 +223,7 @@ xsurv0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -238,7 +238,7 @@ xsurv0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -252,7 +252,7 @@ xsurv0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -264,7 +264,7 @@ xsurv0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { -- cgit v1.2.3-70-g09d2