aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-06 10:07:19 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-06 10:07:19 -0800
commit59a4b69c7989ac4fef1645f929abfa91b884215b (patch)
tree5ac708af2420fbc67b17ae25bfe253878012bea6
parenta2801adffebb6a3679e41789b38ba925ed32832b (diff)
downloadnng-59a4b69c7989ac4fef1645f929abfa91b884215b.tar.gz
nng-59a4b69c7989ac4fef1645f929abfa91b884215b.tar.bz2
nng-59a4b69c7989ac4fef1645f929abfa91b884215b.zip
Fixes for PUB/SUB.
This fixes several issues, and brings PUB/SUB to operational correctness. Included is test code to verify that.
-rw-r--r--src/protocol/pubsub/pub.c5
-rw-r--r--src/protocol/pubsub/sub.c4
-rw-r--r--tests/pubsub.c140
3 files changed, 63 insertions, 86 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index b1f91fe2..860c7c7d 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -121,7 +121,6 @@ nni_pub_pipe_add(void *arg)
{
nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
- int rv;
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
@@ -130,7 +129,7 @@ nni_pub_pipe_add(void *arg)
nni_list_append(&pub->pipes, pp);
nni_mtx_unlock(&pub->mx);
- return (rv);
+ return (0);
}
@@ -173,7 +172,7 @@ nni_pub_broadcast(void *arg)
} else {
dup = msg;
}
- if ((rv = nni_msgq_tryput(pp->sendq, msg)) != 0) {
+ if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
nni_msg_free(dup);
}
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 1243ca47..19c06aa0 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -200,6 +200,7 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
NNI_LIST_NODE_INIT(&newtopic->node);
newtopic->len = sz;
+ memcpy(newtopic->buf, buf, sz);
if (topic != NULL) {
nni_list_insert_before(&sub->topics, newtopic, topic);
} else {
@@ -306,6 +307,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
body = nni_msg_body(msg);
len = nni_msg_len(msg);
+ match = 0;
// Check to see if the message matches one of our subscriptions.
NNI_LIST_FOREACH (&sub->topics, topic) {
if (len >= topic->len) {
@@ -329,7 +331,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
nni_msg_free(msg);
return (NULL);
}
- return (0);
+ return (msg);
}
diff --git a/tests/pubsub.c b/tests/pubsub.c
index c693cf52..df828241 100644
--- a/tests/pubsub.c
+++ b/tests/pubsub.c
@@ -12,6 +12,10 @@
#include <string.h>
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+
Main({
int rv;
const char *addr = "inproc://test";
@@ -20,8 +24,7 @@ Main({
Convey("We can create a PUB socket", {
nng_socket *pub;
- rv = nng_open(&pub, NNG_PROTO_PUB);
- So(rv == 0);
+ So(nng_open(&pub, NNG_PROTO_PUB) == 0);
So(pub != NULL);
Reset({
@@ -35,15 +38,13 @@ Main({
Convey("Recv fails", {
nng_msg *msg;
- rv = nng_recvmsg(pub, &msg, 0);
- So(rv == NNG_ENOTSUP);
+ So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP);
})
})
Convey("We can create a SUB socket", {
nng_socket *sub;
- rv = nng_open(&sub, NNG_PROTO_SUB);
- So(rv == 0);
+ So(nng_open(&sub, NNG_PROTO_SUB) == 0);
So(sub != NULL);
Reset({
@@ -57,10 +58,8 @@ Main({
Convey("Send fails", {
nng_msg *msg;
- rv = nng_msg_alloc(&msg, 0);
- So(rv == 0);
- rv = nng_sendmsg(sub, msg, 0);
- So(rv == NNG_ENOTSUP);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP);
nng_msg_free(msg);
})
})
@@ -98,92 +97,69 @@ Main({
Convey("Pub cannot subscribe", {
So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == NNG_ENOTSUP);
})
-#if 0
- Convey("They can REQ/REP exchange", {
- nng_msg *ping;
- nng_msg *pong;
- char *body;
- size_t len;
-
- So(nng_msg_alloc(&ping, 0) == 0);
- So(nng_msg_append(ping, "ping", 5) == 0);
- body = nng_msg_body(ping, &len);
- So(len == 5);
- So(memcmp(body, "ping", 5) == 0);
- So(nng_sendmsg(req, ping, 0) == 0);
- pong = NULL;
- So(nng_recvmsg(rep, &pong, 0) == 0);
- So(pong != NULL);
- body = nng_msg_body(pong, &len);
- So(len == 5);
- So(memcmp(body, "ping", 5) == 0);
- nng_msg_trim(pong, 5);
- So(nng_msg_append(pong, "pong", 5) == 0);
- So(nng_sendmsg(rep, pong, 0) == 0);
- ping = 0;
- So(nng_recvmsg(req, &ping, 0) == 0);
- So(ping != NULL);
- body = nng_msg_body(ping, &len);
- So(len == 5);
- So(memcmp(body, "pong", 5) == 0);
- nng_msg_free(ping);
- })
-#endif
- })
-#if 0
- Convey("Request cancellation works", {
- nng_msg *abc;
- nng_msg *def;
- nng_msg *cmd;
- nng_msg *nvm;
- char *body;
- size_t len;
- uint64_t retry = 100000; // 100 ms
+ Convey("Subs can receive from pubs", {
+ nng_msg *msg;
+ uint64_t rtimeo;
+
+
+ So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", strlen("/some/")) == 0);
+ rtimeo = 50000; // 50ms
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
- nng_socket *req;
- nng_socket *rep;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "/some/like/it/hot");
+ So(nng_sendmsg(pub, msg, 0) == 0);
+ So(nng_recvmsg(sub, &msg, 0) == 0);
+ CHECKSTR(msg, "/some/like/it/hot");
+ nng_msg_free(msg);
- So(nng_open(&rep, NNG_PROTO_REP) == 0);
- So(rep != NULL);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "/somewhere/over/the/rainbow");
+ CHECKSTR(msg, "/somewhere/over/the/rainbow");
- So(nng_open(&req, NNG_PROTO_REQ) == 0);
- So(req != NULL);
+ So(nng_sendmsg(pub, msg, 0) == 0);
+ So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT);
- Reset({
- nng_close(rep);
- nng_close(req);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "/some/day/some/how");
+ CHECKSTR(msg, "/some/day/some/how");
+
+ So(nng_sendmsg(pub, msg, 0) == 0);
+ So(nng_recvmsg(sub, &msg, 0) == 0);
+ CHECKSTR(msg, "/some/day/some/how");
+ nng_msg_free(msg);
})
- So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0);
- len = 16;
- So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
+ Convey("Subs without subsciptions don't receive", {
- So(nng_msg_alloc(&abc, 0) == 0);
- So(nng_msg_append(abc, "abc", 4) == 0);
- So(nng_msg_alloc(&def, 0) == 0);
- So(nng_msg_append(def, "def", 4) == 0);
+ uint64_t rtimeo = 50000; // 50ms
+ nng_msg *msg;
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
- So(nng_dial(req, addr, NULL, 0) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "/some/don't/like/it");
+ So(nng_sendmsg(pub, msg, 0) == 0);
+ So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT);
+ })
- So(nng_sendmsg(req, abc, 0) == 0);
- So(nng_sendmsg(req, def, 0) == 0);
+ Convey("Subs in raw receive", {
- So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ uint64_t rtimeo = 50000; // 50ms
+ int raw = 1;
+ nng_msg *msg;
- So(nng_recvmsg(rep, &cmd, 0) == 0);
- So(cmd != NULL);
- So(nng_sendmsg(rep, cmd, 0) == 0);
- So(nng_recvmsg(rep, &cmd, 0) == 0);
- So(nng_sendmsg(rep, cmd, 0) == 0);
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(sub, NNG_OPT_RAW, &raw, sizeof (raw)) == 0);
- So(nng_recvmsg(req, &cmd, 0) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "/some/like/it/raw");
+ So(nng_sendmsg(pub, msg, 0) == 0);
+ So(nng_recvmsg(sub, &msg, 0) == 0);
+ CHECKSTR(msg, "/some/like/it/raw");
+ nng_msg_free(msg);
+ })
- body = nng_msg_body(cmd, &len);
- So(len == 4);
- So(memcmp(body, "def", 4) == 0);
- nng_msg_free(cmd);
})
-#endif
})
})