aboutsummaryrefslogtreecommitdiff
path: root/tests/pipeline.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/pipeline.c')
-rw-r--r--tests/pipeline.c287
1 files changed, 135 insertions, 152 deletions
diff --git a/tests/pipeline.c b/tests/pipeline.c
index 70b52350..78002fc6 100644
--- a/tests/pipeline.c
+++ b/tests/pipeline.c
@@ -16,172 +16,155 @@
So(nng_msg_len(m) == strlen(s)); \
So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
-Main({
+TestMain("PIPELINE (PUSH/PULL) pattern", {
+ atexit(nng_fini);
const char *addr = "inproc://test";
+ Convey("We can create a PUSH socket", {
+ nng_socket push;
- Test("PIPELINE (PUSH/PULL) pattern", {
- Convey("We can create a PUSH socket", {
- nng_socket push;
+ So(nng_push_open(&push) == 0);
- So(nng_push_open(&push) == 0);
+ Reset({ nng_close(push); });
- Reset({ nng_close(push); });
-
- Convey("Protocols match", {
- So(nng_protocol(push) == NNG_PROTO_PUSH);
- So(nng_peer(push) == NNG_PROTO_PULL);
- });
+ Convey("Protocols match", {
+ So(nng_protocol(push) == NNG_PROTO_PUSH);
+ So(nng_peer(push) == NNG_PROTO_PULL);
+ });
- Convey("Recv fails", {
- nng_msg *msg;
- So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP);
- });
+ Convey("Recv fails", {
+ nng_msg *msg;
+ So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP);
});
+ });
- Convey("We can create a PULL socket", {
- nng_socket pull;
- So(nng_pull_open(&pull) == 0);
+ Convey("We can create a PULL socket", {
+ nng_socket pull;
+ So(nng_pull_open(&pull) == 0);
- Reset({ nng_close(pull); });
+ Reset({ nng_close(pull); });
- Convey("Protocols match", {
- So(nng_protocol(pull) == NNG_PROTO_PULL);
- So(nng_peer(pull) == NNG_PROTO_PUSH);
- });
+ Convey("Protocols match", {
+ So(nng_protocol(pull) == NNG_PROTO_PULL);
+ So(nng_peer(pull) == NNG_PROTO_PUSH);
+ });
- Convey("Send fails", {
- nng_msg *msg;
- So(nng_msg_alloc(&msg, 0) == 0);
- So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP);
- nng_msg_free(msg);
- });
+ Convey("Send fails", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP);
+ nng_msg_free(msg);
});
+ });
- Convey("We can create a linked PUSH/PULL pair", {
- nng_socket push;
- nng_socket pull;
- nng_socket what;
-
- So(nng_push_open(&push) == 0);
- So(nng_pull_open(&pull) == 0);
- So(nng_push_open(&what) == 0);
-
- Reset({
- nng_close(push);
- nng_close(pull);
- nng_close(what);
- });
-
- // Its important to avoid a startup race that the
- // sender be the dialer. Otherwise you need a delay
- // since the server accept is really asynchronous.
- So(nng_listen(pull, addr, NULL, 0) == 0);
- So(nng_dial(push, addr, NULL, 0) == 0);
- So(nng_dial(what, addr, NULL, 0) == 0);
- So(nng_shutdown(what) == 0);
-
- Convey("Push can send messages, and pull can recv", {
- nng_msg *msg;
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "hello");
- So(nng_sendmsg(push, msg, 0) == 0);
- msg = NULL;
- So(nng_recvmsg(pull, &msg, 0) == 0);
- So(msg != NULL);
- CHECKSTR(msg, "hello");
- nng_msg_free(msg);
- });
+ Convey("We can create a linked PUSH/PULL pair", {
+ nng_socket push;
+ nng_socket pull;
+ nng_socket what;
+
+ So(nng_push_open(&push) == 0);
+ So(nng_pull_open(&pull) == 0);
+ So(nng_push_open(&what) == 0);
+
+ Reset({
+ nng_close(push);
+ nng_close(pull);
+ nng_close(what);
});
- Convey("Load balancing", {
- nng_msg * abc;
- nng_msg * def;
- uint64_t usecs;
- int len;
- nng_socket push;
- nng_socket pull1;
- nng_socket pull2;
- nng_socket pull3;
-
- So(nng_push_open(&push) == 0);
- So(nng_pull_open(&pull1) == 0);
- So(nng_pull_open(&pull2) == 0);
- So(nng_pull_open(&pull3) == 0);
-
- Reset({
- nng_close(push);
- nng_close(pull1);
- nng_close(pull2);
- nng_close(pull3);
- });
-
- // We need to increase the buffer from zero, because
- // there is no guarantee that the various listeners
- // will be present, which means that they will push
- // back during load balancing. Adding a small buffer
- // ensures that we can write to each stream, even if
- // the listeners are not running yet.
- len = 4;
- So(nng_setopt(
- push, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- push, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull1, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull1, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull2, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull2, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull3, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
- So(nng_setopt(
- pull3, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
-
- So(nng_msg_alloc(&abc, 0) == 0);
- APPENDSTR(abc, "abc");
- So(nng_msg_alloc(&def, 0) == 0);
- APPENDSTR(def, "def");
-
- usecs = 100000;
- So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs,
- sizeof(usecs)) == 0);
- So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs,
- sizeof(usecs)) == 0);
- So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs,
- sizeof(usecs)) == 0);
- So(nng_listen(push, addr, NULL, 0) == 0);
- So(nng_dial(pull1, addr, NULL, 0) == 0);
- So(nng_dial(pull2, addr, NULL, 0) == 0);
- So(nng_dial(pull3, addr, NULL, 0) == 0);
- So(nng_shutdown(pull3) == 0);
-
- // So pull3 might not be done accepting yet, but pull1
- // and pull2 definitely are, because otherwise the
- // server couldn't have gotten to the accept. (The
- // accept logic is single threaded.) Let's wait a bit
- // though, to ensure that stuff has settled.
- nng_usleep(100000);
-
- So(nng_sendmsg(push, abc, 0) == 0);
- So(nng_sendmsg(push, def, 0) == 0);
-
- abc = NULL;
- def = NULL;
-
- So(nng_recvmsg(pull1, &abc, 0) == 0);
- CHECKSTR(abc, "abc");
- So(nng_recvmsg(pull2, &def, 0) == 0);
- CHECKSTR(def, "def");
- nng_msg_free(abc);
- nng_msg_free(def);
-
- So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT);
- So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT);
+ // Its important to avoid a startup race that the
+ // sender be the dialer. Otherwise you need a delay
+ // since the server accept is really asynchronous.
+ So(nng_listen(pull, addr, NULL, 0) == 0);
+ So(nng_dial(push, addr, NULL, 0) == 0);
+ So(nng_dial(what, addr, NULL, 0) == 0);
+ So(nng_shutdown(what) == 0);
+
+ Convey("Push can send messages, and pull can recv", {
+ nng_msg *msg;
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "hello");
+ So(nng_sendmsg(push, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(pull, &msg, 0) == 0);
+ So(msg != NULL);
+ CHECKSTR(msg, "hello");
+ nng_msg_free(msg);
});
});
- nng_fini();
-})
+ Convey("Load balancing", {
+ nng_msg * abc;
+ nng_msg * def;
+ uint64_t usecs;
+ nng_socket push;
+ nng_socket pull1;
+ nng_socket pull2;
+ nng_socket pull3;
+
+ So(nng_push_open(&push) == 0);
+ So(nng_pull_open(&pull1) == 0);
+ So(nng_pull_open(&pull2) == 0);
+ So(nng_pull_open(&pull3) == 0);
+
+ Reset({
+ nng_close(push);
+ nng_close(pull1);
+ nng_close(pull2);
+ nng_close(pull3);
+ });
+
+ // We need to increase the buffer from zero, because
+ // there is no guarantee that the various listeners
+ // will be present, which means that they will push
+ // back during load balancing. Adding a small buffer
+ // ensures that we can write to each stream, even if
+ // the listeners are not running yet.
+ So(nng_setopt_int(push, nng_optid_recvbuf, 4) == 0);
+ So(nng_setopt_int(push, nng_optid_sendbuf, 4) == 0);
+ So(nng_setopt_int(pull1, nng_optid_recvbuf, 4) == 0);
+ So(nng_setopt_int(pull1, nng_optid_sendbuf, 4) == 0);
+ So(nng_setopt_int(pull2, nng_optid_recvbuf, 4) == 0);
+ So(nng_setopt_int(pull2, nng_optid_sendbuf, 4) == 0);
+ So(nng_setopt_int(pull3, nng_optid_recvbuf, 4) == 0);
+ So(nng_setopt_int(pull3, nng_optid_sendbuf, 4) == 0);
+
+ So(nng_msg_alloc(&abc, 0) == 0);
+ APPENDSTR(abc, "abc");
+ So(nng_msg_alloc(&def, 0) == 0);
+ APPENDSTR(def, "def");
+
+ usecs = 100000;
+ So(nng_setopt_usec(pull1, nng_optid_recvtimeo, usecs) == 0);
+ So(nng_setopt_usec(pull2, nng_optid_recvtimeo, usecs) == 0);
+ So(nng_setopt_usec(pull3, nng_optid_recvtimeo, usecs) == 0);
+ So(nng_listen(push, addr, NULL, 0) == 0);
+ So(nng_dial(pull1, addr, NULL, 0) == 0);
+ So(nng_dial(pull2, addr, NULL, 0) == 0);
+ So(nng_dial(pull3, addr, NULL, 0) == 0);
+ So(nng_shutdown(pull3) == 0);
+
+ // So pull3 might not be done accepting yet, but pull1
+ // and pull2 definitely are, because otherwise the
+ // server couldn't have gotten to the accept. (The
+ // accept logic is single threaded.) Let's wait a bit
+ // though, to ensure that stuff has settled.
+ nng_usleep(100000);
+
+ So(nng_sendmsg(push, abc, 0) == 0);
+ So(nng_sendmsg(push, def, 0) == 0);
+
+ abc = NULL;
+ def = NULL;
+
+ So(nng_recvmsg(pull1, &abc, 0) == 0);
+ CHECKSTR(abc, "abc");
+ So(nng_recvmsg(pull2, &def, 0) == 0);
+ CHECKSTR(def, "def");
+ nng_msg_free(abc);
+ nng_msg_free(def);
+
+ So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT);
+ So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT);
+ });
+});