summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-06 14:42:53 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-06 18:57:29 -0700
commit953ca274ae57f8edd12536a3dd15d134aa6e5576 (patch)
tree7a0e889fbae7b525befefedcb5cb8f10820e7a47 /src/protocol
parent89cba92d13fbc5e059336fd054be30e50d8a2621 (diff)
downloadnng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.gz
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.bz2
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.zip
fixes #568 Want a single reader/write lock on socket child objects
fixes #170 Make more use of reaper This is a complete restructure/rethink of how child objects interact with the socket. (This also backs out #576 as it turns out not to be needed.) While 568 says reader/writer lock, for now we have settled for a single writer lock. Its likely that this is sufficient. Essentially we use the single socket lock to guard lists of the socket children. We also use deferred deletion in the idhash to facilitate teardown, which means endpoint closes are no longer synchronous. We use the reaper to clean up objects when the reference count drops to zero. We make a special exception for pipes, since they really are not reference counted by their parents, and they are leaf objects anyway. We believe this addresses the main outstanding race conditions in a much more correct and holistic way. Note that endpoint shutdown is a little tricky, as it makes use of atomic flags to guard against double entry, and against recursive lock entry. This is something that would be nice to make a bit more obvious, but what we have is safe, and the complexity is at least confined to one place.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/bus.c10
-rw-r--r--src/protocol/pair0/pair.c8
-rw-r--r--src/protocol/pair1/pair.c12
-rw-r--r--src/protocol/pipeline0/pull.c4
-rw-r--r--src/protocol/pipeline0/push.c6
-rw-r--r--src/protocol/pubsub0/pub.c6
-rw-r--r--src/protocol/pubsub0/sub.c4
-rw-r--r--src/protocol/reqrep0/rep.c6
-rw-r--r--src/protocol/reqrep0/req.c6
-rw-r--r--src/protocol/reqrep0/xrep.c10
-rw-r--r--src/protocol/reqrep0/xreq.c10
-rw-r--r--src/protocol/survey0/respond.c6
-rw-r--r--src/protocol/survey0/survey.c8
-rw-r--r--src/protocol/survey0/xrespond.c10
-rw-r--r--src/protocol/survey0/xsurvey.c10
15 files changed, 58 insertions, 58 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index ba719e49..2426abba 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -234,7 +234,7 @@ bus0_pipe_getq_cb(void *arg)
if (nni_aio_result(p->aio_getq) != 0) {
// closed?
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
@@ -252,7 +252,7 @@ bus0_pipe_send_cb(void *arg)
// closed?
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -267,7 +267,7 @@ bus0_pipe_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
msg = nni_aio_get_msg(p->aio_recv);
@@ -277,7 +277,7 @@ bus0_pipe_recv_cb(void *arg)
// XXX: bump a nomemory stat
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -295,7 +295,7 @@ bus0_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 4ab9f9e8..d663c5e2 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -176,7 +176,7 @@ pair0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -196,7 +196,7 @@ pair0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_pipe_recv(p->npipe, p->aio_recv);
@@ -208,7 +208,7 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -226,7 +226,7 @@ pair0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 0c6a4867..dc250943 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -244,7 +244,7 @@ pair1_pipe_recv_cb(void *arg)
int rv;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -257,13 +257,13 @@ pair1_pipe_recv_cb(void *arg)
// If the message is missing the hop count header, scrap it.
if (nni_msg_len(msg) < sizeof(uint32_t)) {
nni_msg_free(msg);
- nni_pipe_stop(npipe);
+ nni_pipe_close(npipe);
return;
}
hdr = nni_msg_trim_u32(msg);
if (hdr & 0xffffff00) {
nni_msg_free(msg);
- nni_pipe_stop(npipe);
+ nni_pipe_close(npipe);
return;
}
@@ -343,7 +343,7 @@ pair1_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_pipe_recv(p->npipe, p->aio_recv);
@@ -358,7 +358,7 @@ pair1_pipe_getq_cb(void *arg)
uint32_t hops;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -405,7 +405,7 @@ pair1_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 63243cb5..a713bc80 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -141,7 +141,7 @@ pull0_recv_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -163,7 +163,7 @@ pull0_putq_cb(void *arg)
// we can do. Just close the pipe.
nni_msg_free(nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 9585b86c..00e9212c 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -163,7 +163,7 @@ push0_recv_cb(void *arg)
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_msg_free(nni_aio_get_msg(p->aio_recv));
@@ -180,7 +180,7 @@ push0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -195,7 +195,7 @@ push0_getq_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 72cd1daa..cbc8acea 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -240,7 +240,7 @@ pub0_pipe_recv_cb(void *arg)
pub0_pipe *p = arg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -255,7 +255,7 @@ pub0_pipe_getq_cb(void *arg)
pub0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -273,7 +273,7 @@ pub0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 2e8be4be..cb6d781f 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -164,7 +164,7 @@ sub0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -182,7 +182,7 @@ sub0_recv_cb(void *arg)
// Any other error we stop the pipe for. It's probably
// NNG_ECLOSED anyway.
nng_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_pipe_recv(p->pipe, p->aio_recv);
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index f725cadb..f9ce58fd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -416,7 +416,7 @@ rep0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_mtx_lock(&s->lk);
@@ -519,7 +519,7 @@ rep0_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -544,7 +544,7 @@ rep0_pipe_recv_cb(void *arg)
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
body = nni_msg_body(msg);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 43751d14..018dd0e8 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -305,7 +305,7 @@ req0_send_cb(void *arg)
// We failed to send... clean up and deal with it.
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -345,7 +345,7 @@ req0_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -409,7 +409,7 @@ req0_recv_cb(void *arg)
malformed:
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
}
static void
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index e3b9b605..1fe81ac5 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -262,7 +262,7 @@ xrep0_pipe_getq_cb(void *arg)
xrep0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -280,7 +280,7 @@ xrep0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -296,7 +296,7 @@ xrep0_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -327,7 +327,7 @@ xrep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
body = nni_msg_body(msg);
@@ -361,7 +361,7 @@ xrep0_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 2f0a3652..a98c713e 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -173,7 +173,7 @@ xreq0_getq_cb(void *arg)
xreq0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -191,7 +191,7 @@ xreq0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -207,7 +207,7 @@ xreq0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_aio_set_msg(p->aio_putq, NULL);
@@ -224,7 +224,7 @@ xreq0_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -236,7 +236,7 @@ xreq0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer gave us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
id = nni_msg_trim_u32(msg);
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index fbdeb65a..4e0a5263 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -405,7 +405,7 @@ resp0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_mtx_lock(&s->mtx);
@@ -511,7 +511,7 @@ resp0_pipe_recv_cb(void *arg)
size_t len;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -532,7 +532,7 @@ resp0_pipe_recv_cb(void *arg)
// Peer is speaking garbage, kick it.
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
body = nni_msg_body(msg);
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 42d88f13..58fa4aa6 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -376,7 +376,7 @@ surv0_pipe_getq_cb(void *arg)
surv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -394,7 +394,7 @@ surv0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -411,7 +411,7 @@ surv0_pipe_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -423,7 +423,7 @@ surv0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer sent us garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
id = nni_msg_trim_u32(msg);
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 03a47e92..334c5ca6 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -247,7 +247,7 @@ xresp0_getq_cb(void *arg)
xresp0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -265,7 +265,7 @@ xresp0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -282,7 +282,7 @@ xresp0_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -308,7 +308,7 @@ xresp0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer sent us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
body = nni_msg_body(msg);
@@ -340,7 +340,7 @@ xresp0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index bad24042..fabcc766 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -205,7 +205,7 @@ xsurv0_getq_cb(void *arg)
xsurv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -223,7 +223,7 @@ xsurv0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -238,7 +238,7 @@ xsurv0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -252,7 +252,7 @@ xsurv0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -264,7 +264,7 @@ xsurv0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer gave us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {