summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-17 12:54:01 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-17 19:29:37 -0700
commit70d478f5d185e147ca8d3dcba4cbd8bb6da3719a (patch)
tree443e3b0e81138d7c195660d45eca7d4d497af8ac /src/protocol
parente490aa3353f05e158a0f1f485f371cd49e70b4f5 (diff)
downloadnng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.gz
nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.bz2
nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.zip
fixes #449 Want more flexible pipe events
This changes the signature of nng_pipe_notify(), and the associated events. The documentation is updated to reflect this. We have also broken the lock up so that we don't hold the master socket lock for some of these things, which may have beneficial impact on performance.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/bus.c5
-rw-r--r--src/protocol/pair0/pair.c5
-rw-r--r--src/protocol/pair1/pair.c5
-rw-r--r--src/protocol/pipeline0/pull.c5
-rw-r--r--src/protocol/pubsub0/sub.c5
-rw-r--r--src/protocol/reqrep0/rep.c5
-rw-r--r--src/protocol/reqrep0/xrep.c5
-rw-r--r--src/protocol/survey0/respond.c4
-rw-r--r--src/protocol/survey0/survey.c4
-rw-r--r--src/protocol/survey0/xrespond.c4
-rw-r--r--src/protocol/survey0/xsurvey.c4
11 files changed, 51 insertions, 0 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index 852de7c8..2427e836 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -193,6 +193,11 @@ bus0_pipe_start(void *arg)
bus0_pipe *p = arg;
bus0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 2fb42df5..87900793 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -129,6 +129,11 @@ pair0_pipe_start(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
if (s->ppipe != NULL) {
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index ab01e451..584c147d 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -173,6 +173,11 @@ pair1_pipe_start(void *arg)
uint32_t id;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
id = nni_pipe_id(p->npipe);
nni_mtx_lock(&s->mtx);
if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 81f6c137..f7d5d9a2 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -112,6 +112,11 @@ pull0_pipe_start(void *arg)
{
pull0_pipe *p = arg;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUSH_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
// Start the pending pull...
nni_pipe_recv(p->pipe, p->recv_aio);
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index c244b0ad..b4fa2e2a 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -143,6 +143,11 @@ sub0_pipe_start(void *arg)
{
sub0_pipe *p = arg;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index c483b777..24fc7335 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -346,6 +346,11 @@ rep0_pipe_start(void *arg)
rep0_sock *s = p->rep;
int rv;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 6dcfe6be..f6cd29eb 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -178,6 +178,11 @@ xrep0_pipe_start(void *arg)
xrep0_sock *s = p->rep;
int rv;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 7738a8b7..5f833ca6 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -341,6 +341,10 @@ resp0_pipe_start(void *arg)
resp0_sock *s = p->psock;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
rv = nni_idhash_insert(s->pipes, p->id, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 51bce0c8..24a35003 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -338,6 +338,10 @@ surv0_pipe_start(void *arg)
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index bcbbcbc7..13a3d759 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -164,6 +164,10 @@ xresp0_pipe_start(void *arg)
xresp0_sock *s = p->psock;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
+ return (NNG_EPROTO);
+ }
+
p->id = nni_pipe_id(p->npipe);
nni_mtx_lock(&s->mtx);
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 47ebef3c..db21e688 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -166,6 +166,10 @@ xsurv0_pipe_start(void *arg)
xsurv0_pipe *p = arg;
xsurv0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);