diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-06 10:07:19 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-06 10:07:19 -0800 |
| commit | 59a4b69c7989ac4fef1645f929abfa91b884215b (patch) | |
| tree | 5ac708af2420fbc67b17ae25bfe253878012bea6 | |
| parent | a2801adffebb6a3679e41789b38ba925ed32832b (diff) | |
| download | nng-59a4b69c7989ac4fef1645f929abfa91b884215b.tar.gz nng-59a4b69c7989ac4fef1645f929abfa91b884215b.tar.bz2 nng-59a4b69c7989ac4fef1645f929abfa91b884215b.zip | |
Fixes for PUB/SUB.
This fixes several issues, and brings PUB/SUB to operational
correctness. Included is test code to verify that.
| -rw-r--r-- | src/protocol/pubsub/pub.c | 5 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 4 | ||||
| -rw-r--r-- | tests/pubsub.c | 140 |
3 files changed, 63 insertions, 86 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index b1f91fe2..860c7c7d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -121,7 +121,6 @@ nni_pub_pipe_add(void *arg) { nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; - int rv; if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) { return (NNG_EPROTO); @@ -130,7 +129,7 @@ nni_pub_pipe_add(void *arg) nni_list_append(&pub->pipes, pp); nni_mtx_unlock(&pub->mx); - return (rv); + return (0); } @@ -173,7 +172,7 @@ nni_pub_broadcast(void *arg) } else { dup = msg; } - if ((rv = nni_msgq_tryput(pp->sendq, msg)) != 0) { + if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) { nni_msg_free(dup); } } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 1243ca47..19c06aa0 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -200,6 +200,7 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz) } NNI_LIST_NODE_INIT(&newtopic->node); newtopic->len = sz; + memcpy(newtopic->buf, buf, sz); if (topic != NULL) { nni_list_insert_before(&sub->topics, newtopic, topic); } else { @@ -306,6 +307,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) body = nni_msg_body(msg); len = nni_msg_len(msg); + match = 0; // Check to see if the message matches one of our subscriptions. NNI_LIST_FOREACH (&sub->topics, topic) { if (len >= topic->len) { @@ -329,7 +331,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) nni_msg_free(msg); return (NULL); } - return (0); + return (msg); } diff --git a/tests/pubsub.c b/tests/pubsub.c index c693cf52..df828241 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -12,6 +12,10 @@ #include <string.h> +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) + Main({ int rv; const char *addr = "inproc://test"; @@ -20,8 +24,7 @@ Main({ Convey("We can create a PUB socket", { nng_socket *pub; - rv = nng_open(&pub, NNG_PROTO_PUB); - So(rv == 0); + So(nng_open(&pub, NNG_PROTO_PUB) == 0); So(pub != NULL); Reset({ @@ -35,15 +38,13 @@ Main({ Convey("Recv fails", { nng_msg *msg; - rv = nng_recvmsg(pub, &msg, 0); - So(rv == NNG_ENOTSUP); + So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP); }) }) Convey("We can create a SUB socket", { nng_socket *sub; - rv = nng_open(&sub, NNG_PROTO_SUB); - So(rv == 0); + So(nng_open(&sub, NNG_PROTO_SUB) == 0); So(sub != NULL); Reset({ @@ -57,10 +58,8 @@ Main({ Convey("Send fails", { nng_msg *msg; - rv = nng_msg_alloc(&msg, 0); - So(rv == 0); - rv = nng_sendmsg(sub, msg, 0); - So(rv == NNG_ENOTSUP); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP); nng_msg_free(msg); }) }) @@ -98,92 +97,69 @@ Main({ Convey("Pub cannot subscribe", { So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == NNG_ENOTSUP); }) -#if 0 - Convey("They can REQ/REP exchange", { - nng_msg *ping; - nng_msg *pong; - char *body; - size_t len; - - So(nng_msg_alloc(&ping, 0) == 0); - So(nng_msg_append(ping, "ping", 5) == 0); - body = nng_msg_body(ping, &len); - So(len == 5); - So(memcmp(body, "ping", 5) == 0); - So(nng_sendmsg(req, ping, 0) == 0); - pong = NULL; - So(nng_recvmsg(rep, &pong, 0) == 0); - So(pong != NULL); - body = nng_msg_body(pong, &len); - So(len == 5); - So(memcmp(body, "ping", 5) == 0); - nng_msg_trim(pong, 5); - So(nng_msg_append(pong, "pong", 5) == 0); - So(nng_sendmsg(rep, pong, 0) == 0); - ping = 0; - So(nng_recvmsg(req, &ping, 0) == 0); - So(ping != NULL); - body = nng_msg_body(ping, &len); - So(len == 5); - So(memcmp(body, "pong", 5) == 0); - nng_msg_free(ping); - }) -#endif - }) -#if 0 - Convey("Request cancellation works", { - nng_msg *abc; - nng_msg *def; - nng_msg *cmd; - nng_msg *nvm; - char *body; - size_t len; - uint64_t retry = 100000; // 100 ms + Convey("Subs can receive from pubs", { + nng_msg *msg; + uint64_t rtimeo; + + + So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", strlen("/some/")) == 0); + rtimeo = 50000; // 50ms + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); - nng_socket *req; - nng_socket *rep; + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/some/like/it/hot"); + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == 0); + CHECKSTR(msg, "/some/like/it/hot"); + nng_msg_free(msg); - So(nng_open(&rep, NNG_PROTO_REP) == 0); - So(rep != NULL); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/somewhere/over/the/rainbow"); + CHECKSTR(msg, "/somewhere/over/the/rainbow"); - So(nng_open(&req, NNG_PROTO_REQ) == 0); - So(req != NULL); + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT); - Reset({ - nng_close(rep); - nng_close(req); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/some/day/some/how"); + CHECKSTR(msg, "/some/day/some/how"); + + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == 0); + CHECKSTR(msg, "/some/day/some/how"); + nng_msg_free(msg); }) - So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0); - len = 16; - So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); + Convey("Subs without subsciptions don't receive", { - So(nng_msg_alloc(&abc, 0) == 0); - So(nng_msg_append(abc, "abc", 4) == 0); - So(nng_msg_alloc(&def, 0) == 0); - So(nng_msg_append(def, "def", 4) == 0); + uint64_t rtimeo = 50000; // 50ms + nng_msg *msg; + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); - So(nng_dial(req, addr, NULL, 0) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/some/don't/like/it"); + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT); + }) - So(nng_sendmsg(req, abc, 0) == 0); - So(nng_sendmsg(req, def, 0) == 0); + Convey("Subs in raw receive", { - So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); + uint64_t rtimeo = 50000; // 50ms + int raw = 1; + nng_msg *msg; - So(nng_recvmsg(rep, &cmd, 0) == 0); - So(cmd != NULL); - So(nng_sendmsg(rep, cmd, 0) == 0); - So(nng_recvmsg(rep, &cmd, 0) == 0); - So(nng_sendmsg(rep, cmd, 0) == 0); + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(sub, NNG_OPT_RAW, &raw, sizeof (raw)) == 0); - So(nng_recvmsg(req, &cmd, 0) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/some/like/it/raw"); + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == 0); + CHECKSTR(msg, "/some/like/it/raw"); + nng_msg_free(msg); + }) - body = nng_msg_body(cmd, &len); - So(len == 4); - So(memcmp(body, "def", 4) == 0); - nng_msg_free(cmd); }) -#endif }) }) |
