From d64f12553eb6ceb67ed6f6a5b2ceb6c061d375ba Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 8 Aug 2017 21:19:09 -0700 Subject: fixes #44 open protocol by "name" (symbol) instead number fixes #38 Make protocols "pluggable", or at least optional This is a breaking change, as we've done away with the central registered list of protocols, and instead demand the user call nng_xxx_open() where xxx is a protocol name. (We did keep a table around in the compat framework though.) There is a nice way for protocols to plug in via an nni_proto_open(), where they can use a generic constructor that they use to build a protocol specific constructor (passing their ops vector in.) --- perf/perf.c | 118 +++++++-------- src/core/device.c | 4 +- src/core/protocol.c | 83 +---------- src/core/protocol.h | 36 +++-- src/core/socket.c | 134 ++++++----------- src/core/socket.h | 7 +- src/nng.c | 16 -- src/nng.h | 67 ++++++--- src/nng_compat.c | 35 ++++- src/protocol/bus/bus.c | 21 ++- src/protocol/pair/pair.c | 29 +++- src/protocol/pipeline/pull.c | 32 +++- src/protocol/pipeline/push.c | 32 +++- src/protocol/pubsub/pub.c | 21 ++- src/protocol/pubsub/sub.c | 26 +++- src/protocol/reqrep/rep.c | 12 +- src/protocol/reqrep/req.c | 19 ++- src/protocol/survey/respond.c | 12 +- src/protocol/survey/survey.c | 14 +- tests/bus.c | 61 ++++---- tests/event.c | 68 +++++---- tests/pipeline.c | 100 +++++++------ tests/pollfd.c | 174 +++++++++++----------- tests/pubsub.c | 113 ++++++++------- tests/reqrep.c | 58 ++++---- tests/scalability.c | 4 +- tests/sock.c | 330 ++++++++++++++++++++++-------------------- tests/survey.c | 240 ++++++++++++++++-------------- tests/tcp.c | 24 ++- tests/trantest.h | 4 +- 30 files changed, 1004 insertions(+), 890 deletions(-) diff --git a/perf/perf.c b/perf/perf.c index e0526282..4827d4a2 100644 --- a/perf/perf.c +++ b/perf/perf.c @@ -1,5 +1,6 @@ // -// Copyright 2016 Garrett D'Amore +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -9,11 +10,11 @@ #include "nng.h" +#include #include #include #include #include -#include // We steal access to the clock and thread functions so that we can // work on Windows too. These functions are *not* part of nng's public @@ -108,22 +109,20 @@ die(const char *fmt, ...) exit(2); } - static int parse_int(const char *arg, const char *what) { - long val; + long val; char *eptr; val = strtol(arg, &eptr, 10); // Must be a postive number less than around a billion. - if ((val < 0) || (val > (1<<30)) || (*eptr != 0) || (eptr == arg)) { + if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { die("Invalid %s", what); } return ((int) val); } - void do_local_lat(int argc, char **argv) { @@ -135,12 +134,11 @@ do_local_lat(int argc, char **argv) } msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "round-trips"); + trips = parse_int(argv[2], "round-trips"); latency_server(argv[0], msgsize, trips); } - void do_remote_lat(int argc, char **argv) { @@ -152,12 +150,11 @@ do_remote_lat(int argc, char **argv) } msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "round-trips"); + trips = parse_int(argv[2], "round-trips"); latency_client(argv[0], msgsize, trips); } - void do_local_thr(int argc, char **argv) { @@ -169,12 +166,11 @@ do_local_thr(int argc, char **argv) } msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "count"); + trips = parse_int(argv[2], "count"); throughput_server(argv[0], msgsize, trips); } - void do_remote_thr(int argc, char **argv) { @@ -186,17 +182,16 @@ do_remote_thr(int argc, char **argv) } msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "count"); + trips = parse_int(argv[2], "count"); throughput_client(argv[0], msgsize, trips); } - struct inproc_args { - int count; - int msgsize; - const char * addr; - void (*func)(const char *, int, int); + int count; + int msgsize; + const char *addr; + void (*func)(const char *, int, int); }; static void @@ -207,23 +202,22 @@ do_inproc(void *args) ia->func(ia->addr, ia->msgsize, ia->count); } - void do_inproc_lat(int argc, char **argv) { - nni_thr thr; + nni_thr thr; struct inproc_args ia; - int rv; + int rv; nni_init(); if (argc != 2) { die("Usage: inproc_lat "); } - ia.addr = "inproc://latency_test"; + ia.addr = "inproc://latency_test"; ia.msgsize = parse_int(argv[0], "message size"); - ia.count = parse_int(argv[1], "count"); - ia.func = latency_server; + ia.count = parse_int(argv[1], "count"); + ia.func = latency_server; // Sleep a bit. nng_usleep(100000); @@ -236,24 +230,22 @@ do_inproc_lat(int argc, char **argv) nni_thr_fini(&thr); } - void do_inproc_thr(int argc, char **argv) { - nni_thr thr; + nni_thr thr; struct inproc_args ia; - int rv; + int rv; nni_init(); if (argc != 2) { die("Usage: inproc_thr "); } - ia.addr = "inproc://tput_test"; + ia.addr = "inproc://tput_test"; ia.msgsize = parse_int(argv[0], "message size"); - ia.count = parse_int(argv[1], "count"); - ia.func = throughput_client; - + ia.count = parse_int(argv[1], "count"); + ia.func = throughput_client; if ((rv = nni_thr_init(&thr, do_inproc, &ia)) != 0) { die("Cannot create thread: %s", nng_strerror(rv)); @@ -263,19 +255,18 @@ do_inproc_thr(int argc, char **argv) nni_thr_fini(&thr); } - void latency_client(const char *addr, int msgsize, int trips) { nng_socket s; - nng_msg *msg; - nni_time start, end; - int rv; - int i; - float total; - float latency; - - if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) { + nng_msg * msg; + nni_time start, end; + int rv; + int i; + float total; + float latency; + + if ((rv = nng_pair_open(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } @@ -305,7 +296,7 @@ latency_client(const char *addr, int msgsize, int trips) nni_msg_free(msg); nng_close(s); - total = (float)(end - start); + total = (float) (end - start); latency = (total / (trips * 2)); printf("total time: %.3f [s]\n", total / 1000000.0); printf("message size: %d [B]\n", msgsize); @@ -313,16 +304,15 @@ latency_client(const char *addr, int msgsize, int trips) printf("average latency: %.3f [us]\n", latency); } - void latency_server(const char *addr, int msgsize, int trips) { nng_socket s; - nng_msg *msg; - int rv; - int i; + nng_msg * msg; + int rv; + int i; - if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) { + if ((rv = nng_pair_open(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } @@ -352,7 +342,6 @@ latency_server(const char *addr, int msgsize, int trips) nng_close(s); } - // Our throughput story is quite a mess. Mostly I think because of the poor // caching and message reuse. We should probably implement a message pooling // API somewhere. @@ -361,18 +350,18 @@ void throughput_server(const char *addr, int msgsize, int count) { nng_socket s; - nng_msg *msg; - int rv; - int i; - size_t len; - uint64_t start, end; - double msgpersec, mbps, total; - - if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) { + nng_msg * msg; + int rv; + int i; + size_t len; + uint64_t start, end; + double msgpersec, mbps, total; + + if ((rv = nng_pair_open(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } len = 128; - rv = nng_setopt(s, NNG_OPT_RCVBUF, &len, sizeof (len)); + rv = nng_setopt(s, NNG_OPT_RCVBUF, &len, sizeof(len)); if (rv != 0) { die("nng_setopt(NNG_OPT_RCVBUF): %s", nng_strerror(rv)); } @@ -403,9 +392,9 @@ throughput_server(const char *addr, int msgsize, int count) } end = nni_clock(); nng_close(s); - total = (end - start) / 1.0; + total = (end - start) / 1.0; msgpersec = (count * 1000000.0) / total; - mbps = (count * 8.0 * msgsize); + mbps = (count * 8.0 * msgsize); mbps /= total; printf("total time: %.3f [s]\n", total / 1000000.0); printf("message size: %d [B]\n", msgsize); @@ -414,20 +403,19 @@ throughput_server(const char *addr, int msgsize, int count) printf("throughput: %.3f [Mb/s]\n", mbps); } - void throughput_client(const char *addr, int msgsize, int count) { nng_socket s; - nng_msg *msg; - int rv; - int i; - int len; + nng_msg * msg; + int rv; + int i; + int len; // We send one extra zero length message to start the timer. count++; - if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) { + if ((rv = nng_pair_open(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } @@ -435,7 +423,7 @@ throughput_client(const char *addr, int msgsize, int count) // XXX: other options (TLS in the future?, Linger?) len = 128; - rv = nng_setopt(s, NNG_OPT_SNDBUF, &len, sizeof (len)); + rv = nng_setopt(s, NNG_OPT_SNDBUF, &len, sizeof(len)); if (rv != 0) { die("nng_setopt(NNG_OPT_SNDBUF): %s", nng_strerror(rv)); } diff --git a/src/core/device.c b/src/core/device.c index cd0c8dfc..81aafc3d 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -80,8 +80,8 @@ nni_device(nni_sock *sock1, nni_sock *sock2) rv = NNG_EINVAL; goto out; } - if ((sock1->s_peer != sock2->s_protocol) || - (sock2->s_peer != sock1->s_protocol)) { + if ((sock1->s_peer_id.p_id != sock2->s_self_id.p_id) || + (sock2->s_peer_id.p_id != sock1->s_self_id.p_id)) { rv = NNG_EINVAL; goto out; } diff --git a/src/core/protocol.c b/src/core/protocol.c index a60454de..7faf9068 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -13,82 +14,14 @@ // Protocol related stuff - generically. -// The list of protocols is hardwired. This is reasonably unlikely to -// change, as adding new protocols is not something intended to be done -// outside of the core. -extern nni_proto nni_bus_proto; -extern nni_proto nni_pair_proto; -extern nni_proto nni_rep_proto; -extern nni_proto nni_req_proto; -extern nni_proto nni_pub_proto; -extern nni_proto nni_sub_proto; -extern nni_proto nni_push_proto; -extern nni_proto nni_pull_proto; -extern nni_proto nni_surveyor_proto; -extern nni_proto nni_respondent_proto; - -static nni_proto *protocols[] = { - // clang-format off - &nni_bus_proto, - &nni_pair_proto, - &nni_rep_proto, - &nni_req_proto, - &nni_pub_proto, - &nni_sub_proto, - &nni_push_proto, - &nni_pull_proto, - &nni_surveyor_proto, - &nni_respondent_proto, - NULL - // clang-format on -}; - -nni_proto * -nni_proto_find(uint16_t num) -{ - int i; - nni_proto *p; - - for (i = 0; (p = protocols[i]) != NULL; i++) { - if (p->proto_self == num) { - break; - } - } - return (p); -} - -const char * -nni_proto_name(uint16_t num) -{ - nni_proto *p; - - if ((p = nni_proto_find(num)) == NULL) { - return (NULL); - } - return (p->proto_name); -} - -uint16_t -nni_proto_number(const char *name) -{ - nni_proto *p; - int i; - - for (i = 0; (p = protocols[i]) != NULL; i++) { - if (strcmp(p->proto_name, name) == 0) { - return (p->proto_self); - } - } - return (NNG_PROTO_NONE); -} - -uint16_t -nni_proto_peer(uint16_t num) +int +nni_proto_open(nng_socket *sockidp, const nni_proto *proto) { - nni_proto *p; + int rv; + nni_sock *sock; - if ((p = nni_proto_find(num)) == NULL) { - return (NNG_PROTO_NONE); + if ((rv = nni_sock_open(&sock, proto)) == 0) { + *sockidp = nni_sock_id(sock); // Keep socket held open. } - return (p->proto_peer); + return (rv); } diff --git a/src/core/protocol.h b/src/core/protocol.h index 3a133469..14954b49 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -47,7 +48,7 @@ struct nni_proto_pipe_ops { }; struct nni_proto_sock_ops { - // sock_initf creates the protocol instance, which will be stored on + // sock_init creates the protocol instance, which will be stored on // the socket. This is run without the sock lock held, and allocates // storage or other resources for the socket. int (*sock_init)(void **, nni_sock *); @@ -82,15 +83,31 @@ struct nni_proto_sock_ops { nni_msg *(*sock_sfilter)(void *, nni_msg *); }; +typedef struct nni_proto_id { + uint16_t p_id; + const char *p_name; +} nni_proto_id; + struct nni_proto { - uint16_t proto_self; // our 16-bit D - uint16_t proto_peer; // who we peer with (ID) - const char * proto_name; // Our name + uint32_t proto_version; // Ops vector version + nni_proto_id proto_self; // Our identity + nni_proto_id proto_peer; // Peer identity uint32_t proto_flags; // Protocol flags const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations. }; +// We quite intentionally use a signature where the upper word is nonzero, +// which ensures that if we get garbage we will reject it. This is more +// likely to mismatch than all zero bytes would. The actual version is +// stored in the lower word; this is not semver -- the numbers are just +// increasing - we doubt it will increase more than a handful of times +// during the life of the project. If we add a new version, please keep +// the old version around -- it may be possible to automatically convert +// older versions in the future. +#define NNI_PROTOCOL_V0 0x50520000 // "pr\0\0" +#define NNI_PROTOCOL_VERSION NNI_PROTOCOL_V0 + // These flags determine which operations make sense. We use them so that // we can reject attempts to create notification fds for operations that make // no sense. @@ -98,11 +115,10 @@ struct nni_proto { #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv -// These functions are not used by protocols, but rather by the socket -// core implementation. The lookups can be used by transports as well. -extern nni_proto * nni_proto_find(uint16_t); -extern const char *nni_proto_name(uint16_t); -extern uint16_t nni_proto_number(const char *); -extern uint16_t nni_proto_peer(uint16_t); +// nni_proto_open is called by the protocol to create a socket instance +// with its ops vector. The intent is that applications will only see +// the single protocol-specific constructure, like nng_pair_v0_open(), +// which should just be a thin wrapper around this. +extern int nni_proto_open(nng_socket *, const nni_proto *); #endif // CORE_PROTOCOL_H diff --git a/src/core/socket.c b/src/core/socket.c index f2dbb573..f01379a8 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -83,7 +83,7 @@ nni_sock_pipe_start(nni_pipe *pipe) // We're closing, bail out. return (NNG_ECLOSED); } - if (nni_pipe_peer(pipe) != sock->s_peer) { + if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -175,7 +175,7 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - if (nni_pipe_peer(pipe) != sock->s_peer) { + if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { nni_mtx_unlock(&sock->s_mx); return (NNG_EPROTO); } @@ -305,47 +305,6 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify) NNI_FREE_STRUCT(notify); } -static nni_msg * -nni_sock_nullfilter(void *arg, nni_msg *mp) -{ - NNI_ARG_UNUSED(arg); - return (mp); -} - -static int -nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp) -{ - NNI_ARG_UNUSED(arg); - NNI_ARG_UNUSED(num); - NNI_ARG_UNUSED(data); - NNI_ARG_UNUSED(szp); - return (NNG_ENOTSUP); -} - -static int -nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) -{ - NNI_ARG_UNUSED(arg); - NNI_ARG_UNUSED(num); - NNI_ARG_UNUSED(data); - NNI_ARG_UNUSED(sz); - return (NNG_ENOTSUP); -} - -static void -nni_sock_nullop(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static int -nni_sock_nullstartpipe(void *arg) -{ - NNI_ARG_UNUSED(arg); - - return (0); -} - static void * nni_sock_ctor(uint32_t id) { @@ -456,23 +415,23 @@ nni_sock_sys_fini(void) nni_mtx_fini(&nni_sock_lk); } -// nn_sock_open creates the underlying socket. int -nni_sock_open(nni_sock **sockp, uint16_t pnum) +nni_sock_open(nni_sock **sockp, const nni_proto *proto) { nni_sock * sock; - nni_proto * proto; int rv; nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; uint32_t sockid; + if (proto->proto_version != NNI_PROTOCOL_VERSION) { + // unsupported protocol version + return (NNG_ENOTSUP); + } + if ((rv = nni_init()) != 0) { return (rv); } - if ((proto = nni_proto_find(pnum)) == NULL) { - return (NNG_ENOTSUP); - } rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock); if (rv != 0) { @@ -480,40 +439,20 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } // We make a copy of the protocol operations. - sock->s_protocol = proto->proto_self; - sock->s_peer = proto->proto_peer; + sock->s_self_id = proto->proto_self; + sock->s_peer_id = proto->proto_peer; sock->s_flags = proto->proto_flags; sock->s_sock_ops = *proto->proto_sock_ops; + sock->s_pipe_ops = *proto->proto_pipe_ops; sops = &sock->s_sock_ops; - if (sops->sock_sfilter == NULL) { - sops->sock_sfilter = nni_sock_nullfilter; - } - if (sops->sock_rfilter == NULL) { - sops->sock_rfilter = nni_sock_nullfilter; - } - if (sops->sock_getopt == NULL) { - sops->sock_getopt = nni_sock_nullgetopt; - } - if (sops->sock_setopt == NULL) { - sops->sock_setopt = nni_sock_nullsetopt; - } - if (sops->sock_close == NULL) { - sops->sock_close = nni_sock_nullop; - } - if (sops->sock_open == NULL) { - sops->sock_open = nni_sock_nullop; - } - sock->s_pipe_ops = *proto->proto_pipe_ops; - pops = &sock->s_pipe_ops; - if (pops->pipe_start == NULL) { - pops->pipe_start = nni_sock_nullstartpipe; - } - if (pops->pipe_stop == NULL) { - pops->pipe_stop = nni_sock_nullop; - } + NNI_ASSERT(sock->s_sock_ops.sock_open != NULL); + NNI_ASSERT(sock->s_sock_ops.sock_close != NULL); + + NNI_ASSERT(sock->s_pipe_ops.pipe_start != NULL); + NNI_ASSERT(sock->s_pipe_ops.pipe_stop != NULL); - if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { nni_objhash_unref(nni_socks, sockid); return (rv); } @@ -730,7 +669,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) } besteffort = sock->s_besteffort; - msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); + if (sock->s_sock_ops.sock_sfilter != NULL) { + msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); + } nni_mtx_unlock(&sock->s_mx); if (msg == NULL) { @@ -780,9 +721,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - nni_mtx_lock(&sock->s_mx); - msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); - nni_mtx_unlock(&sock->s_mx); + if (sock->s_sock_ops.sock_rfilter != NULL) { + nni_mtx_lock(&sock->s_mx); + msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); + nni_mtx_unlock(&sock->s_mx); + } if (msg != NULL) { break; } @@ -797,13 +740,13 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_protocol); + return (sock->s_self_id.p_id); } uint16_t nni_sock_peer(nni_sock *sock) { - return (sock->s_peer); + return (sock->s_peer_id.p_id); } nni_duration @@ -924,10 +867,13 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); + if (sock->s_sock_ops.sock_setopt != NULL) { + rv = + sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); + } } switch (opt) { case NNG_OPT_LINGER: @@ -970,11 +916,15 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); + if (sock->s_sock_ops.sock_getopt != NULL) { + rv = sock->s_sock_ops.sock_getopt( + sock->s_data, opt, val, sizep); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); + } } + switch (opt) { case NNG_OPT_LINGER: rv = nni_getopt_duration(&sock->s_linger, val, sizep); diff --git a/src/core/socket.h b/src/core/socket.h index 54c1b4da..6824b04b 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -25,8 +25,9 @@ struct nni_socket { nni_list_node s_node; - uint16_t s_protocol; - uint16_t s_peer; + nni_proto_id s_self_id; + nni_proto_id s_peer_id; + uint32_t s_flags; nni_proto_pipe_ops s_pipe_ops; @@ -67,7 +68,7 @@ extern void nni_sock_sys_fini(void); extern int nni_sock_find(nni_sock **, uint32_t); extern void nni_sock_rele(nni_sock *); -extern int nni_sock_open(nni_sock **, uint16_t); +extern int nni_sock_open(nni_sock **, const nni_proto *); extern void nni_sock_close(nni_sock *); extern void nni_sock_closeall(void); extern int nni_sock_shutdown(nni_sock *); diff --git a/src/nng.c b/src/nng.c index d5cc4f2b..815fb2b8 100644 --- a/src/nng.c +++ b/src/nng.c @@ -21,22 +21,6 @@ #include -int -nng_open(nng_socket *sidp, uint16_t proto) -{ - int rv; - nni_sock *sock; - - if ((rv = nni_sock_open(&sock, proto)) != 0) { - return (rv); - } - *sidp = nni_sock_id(sock); - - // Keep the socket "held" until it is explicitly closed. - - return (0); -} - void nng_fini(void) { diff --git a/src/nng.h b/src/nng.h index aac7ec89..99c95b92 100644 --- a/src/nng.h +++ b/src/nng.h @@ -52,11 +52,6 @@ typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; typedef uint32_t nng_endpoint; // XXX: REMOVE ME. -// nng_open simply creates a socket of the given class. It returns an -// error code on failure, or zero on success. The socket starts in cooked -// mode. -NNG_DECL int nng_open(nng_socket *, uint16_t proto); - // nng_fini is used to terminate the library, freeing certain global resources. // Its a good idea to call this with atexit() or during application shutdown. // For most cases, this call is optional, but failure to do so may cause @@ -293,20 +288,58 @@ enum nng_flag_enum { // a valid protocol numbered 0 (NNG_PROTO_NONE). #define NNG_PROTO(major, minor) (((major) *16) + (minor)) enum nng_proto_enum { - NNG_PROTO_NONE = NNG_PROTO(0, 0), - NNG_PROTO_PAIR = NNG_PROTO(1, 0), - NNG_PROTO_PUB = NNG_PROTO(2, 0), - NNG_PROTO_SUB = NNG_PROTO(2, 1), - NNG_PROTO_REQ = NNG_PROTO(3, 0), - NNG_PROTO_REP = NNG_PROTO(3, 1), - NNG_PROTO_PUSH = NNG_PROTO(5, 0), - NNG_PROTO_PULL = NNG_PROTO(5, 1), - NNG_PROTO_SURVEYOR = NNG_PROTO(6, 2), - NNG_PROTO_RESPONDENT = NNG_PROTO(6, 3), - NNG_PROTO_BUS = NNG_PROTO(7, 0), - NNG_PROTO_STAR = NNG_PROTO(100, 0), + NNG_PROTO_NONE = NNG_PROTO(0, 0), + NNG_PROTO_PAIR_V0 = NNG_PROTO(1, 0), + NNG_PROTO_PUB_V0 = NNG_PROTO(2, 0), + NNG_PROTO_SUB_V0 = NNG_PROTO(2, 1), + NNG_PROTO_REQ_V0 = NNG_PROTO(3, 0), + NNG_PROTO_REP_V0 = NNG_PROTO(3, 1), + NNG_PROTO_PUSH_V0 = NNG_PROTO(5, 0), + NNG_PROTO_PULL_V0 = NNG_PROTO(5, 1), + NNG_PROTO_SURVEYOR_V0 = NNG_PROTO(6, 2), + NNG_PROTO_RESPONDENT_V0 = NNG_PROTO(6, 3), + NNG_PROTO_BUS_V0 = NNG_PROTO(7, 0), + NNG_PROTO_STAR_V0 = NNG_PROTO(100, 0), + + // "Legacy" names. Please use explicit versioned names above. + NNG_PROTO_BUS = NNG_PROTO_BUS_V0, + NNG_PROTO_PAIR = NNG_PROTO_PAIR_V0, + NNG_PROTO_SUB = NNG_PROTO_SUB_V0, + NNG_PROTO_PUB = NNG_PROTO_PUB_V0, + NNG_PROTO_REQ = NNG_PROTO_REQ_V0, + NNG_PROTO_REP = NNG_PROTO_REP_V0, + NNG_PROTO_PUSH = NNG_PROTO_PUSH_V0, + NNG_PROTO_PULL = NNG_PROTO_PULL_V0, + NNG_PROTO_SURVEYOR = NNG_PROTO_SURVEYOR_V0, + NNG_PROTO_RESPONDENT = NNG_PROTO_RESPONDENT_V0, }; +// Builtin protocol socket constructors. +extern int nng_bus0_open(nng_socket *); +extern int nng_pair0_open(nng_socket *); +extern int nng_pub0_open(nng_socket *); +extern int nng_sub0_open(nng_socket *); +extern int nng_push0_open(nng_socket *); +extern int nng_pull0_open(nng_socket *); +extern int nng_req0_open(nng_socket *); +extern int nng_rep0_open(nng_socket *); +extern int nng_surveyor0_open(nng_socket *); +extern int nng_respondent0_open(nng_socket *); + +// Default versions. These provide compile time defaults; note that +// the actual protocols are baked into the binary; this should avoid +// suprising. Choosing a new protocol should be done explicitly. +#define nng_bus_open nng_bus0_open +#define nng_pair_open nng_pair0_open +#define nng_pub_open nng_pub0_open +#define nng_sub_open nng_sub0_open +#define nng_push_open nng_push0_open +#define nng_pull_open nng_pull0_open +#define nng_req_open nng_req0_open +#define nng_rep_open nng_rep0_open +#define nng_surveyor_open nng_surveyor0_open +#define nng_respondent_open nng_respondent0_open + // Options. We encode option numbers as follows: // // - 0: socket, 1: transport diff --git a/src/nng_compat.c b/src/nng_compat.c index cb8d29ab..34fc6553 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -87,17 +87,48 @@ nn_errno(void) return (errno); } +static const struct { + uint16_t p_id; + int (*p_open)(nng_socket *); +} nn_protocols[] = { + // clang-format off + { NNG_PROTO_BUS_V0, nng_bus_open }, + { NNG_PROTO_PAIR_V0, nng_pair_open }, + { NNG_PROTO_PUSH_V0, nng_push_open }, + { NNG_PROTO_PULL_V0, nng_pull_open }, + { NNG_PROTO_PUB_V0, nng_pub_open }, + { NNG_PROTO_SUB_V0, nng_sub_open }, + { NNG_PROTO_REQ_V0, nng_req_open }, + { NNG_PROTO_REP_V0, nng_rep_open }, + { NNG_PROTO_SURVEYOR_V0, nng_surveyor_open }, + { NNG_PROTO_RESPONDENT_V0, nng_respondent_open }, + { NNG_PROTO_NONE, NULL }, + // clang-format on +}; + int nn_socket(int domain, int protocol) { nng_socket sock; int rv; + int i; if ((domain != AF_SP) && (domain != AF_SP_RAW)) { - errno = EAFNOSUPPORT; + nn_seterror(EAFNOSUPPORT); return (-1); } - if ((rv = nng_open(&sock, protocol)) != 0) { + + for (i = 0; nn_protocols[i].p_id != NNG_PROTO_NONE; i++) { + if (nn_protocols[i].p_id == protocol) { + break; + } + } + if (nn_protocols[i].p_open == NULL) { + nn_seterror(ENOTSUP); + return (-1); + } + + if ((rv = nn_protocols[i].p_open(&sock)) != 0) { nn_seterror(rv); return (-1); } diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 3070b90d..c5a6ce06 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -103,6 +103,14 @@ nni_bus_sock_open(void *arg) nni_bus_sock_getq(psock); } +static void +nni_bus_sock_close(void *arg) +{ + nni_bus_sock *psock = arg; + + nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED); +} + static void nni_bus_pipe_fini(void *arg) { @@ -383,6 +391,7 @@ static nni_proto_sock_ops nni_bus_sock_ops = { .sock_init = nni_bus_sock_init, .sock_fini = nni_bus_sock_fini, .sock_open = nni_bus_sock_open, + .sock_close = nni_bus_sock_close, .sock_setopt = nni_bus_sock_setopt, .sock_getopt = nni_bus_sock_getopt, }; @@ -390,10 +399,16 @@ static nni_proto_sock_ops nni_bus_sock_ops = { // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. nni_proto nni_bus_proto = { - .proto_self = NNG_PROTO_BUS, - .proto_peer = NNG_PROTO_BUS, - .proto_name = "bus", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_BUS_V0, "bus" }, + .proto_peer = { NNG_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_bus_sock_ops, .proto_pipe_ops = &nni_bus_pipe_ops, }; + +int +nng_bus0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_bus_proto)); +} diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 55ce5aa9..59465293 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -231,6 +231,18 @@ nni_pair_send_cb(void *arg) nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); } +static void +nni_pair_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +nni_pair_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + static int nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { @@ -263,9 +275,6 @@ nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. - static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_init = nni_pair_pipe_init, .pipe_fini = nni_pair_pipe_fini, @@ -276,15 +285,23 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = { static nni_proto_sock_ops nni_pair_sock_ops = { .sock_init = nni_pair_sock_init, .sock_fini = nni_pair_sock_fini, + .sock_open = nni_pair_sock_open, + .sock_close = nni_pair_sock_close, .sock_setopt = nni_pair_sock_setopt, .sock_getopt = nni_pair_sock_getopt, }; nni_proto nni_pair_proto = { - .proto_self = NNG_PROTO_PAIR, - .proto_peer = NNG_PROTO_PAIR, - .proto_name = "pair", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_PAIR_V0, "pair" }, + .proto_peer = { NNG_PROTO_PAIR_V0, "pair" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_pair_sock_ops, .proto_pipe_ops = &nni_pair_pipe_ops, }; + +int +nng_pair0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_pair_proto)); +} diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index cde79824..e3c73342 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -56,9 +56,7 @@ nni_pull_sock_fini(void *arg) { nni_pull_sock *pull = arg; - if (pull != NULL) { - NNI_FREE_STRUCT(pull); - } + NNI_FREE_STRUCT(pull); } static int @@ -163,6 +161,18 @@ nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg) nni_msgq_aio_put(pull->urq, &pp->putq_aio); } +static void +nni_pull_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +nni_pull_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + static int nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { @@ -195,8 +205,6 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, @@ -207,15 +215,23 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = { static nni_proto_sock_ops nni_pull_sock_ops = { .sock_init = nni_pull_sock_init, .sock_fini = nni_pull_sock_fini, + .sock_open = nni_pull_sock_open, + .sock_close = nni_pull_sock_close, .sock_setopt = nni_pull_sock_setopt, .sock_getopt = nni_pull_sock_getopt, }; nni_proto nni_pull_proto = { - .proto_self = NNG_PROTO_PULL, - .proto_peer = NNG_PROTO_PUSH, - .proto_name = "pull", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_PULL_V0, "pull" }, + .proto_peer = { NNG_PROTO_PUSH_V0, "push" }, .proto_flags = NNI_PROTO_FLAG_RCV, .proto_pipe_ops = &nni_pull_pipe_ops, .proto_sock_ops = &nni_pull_sock_ops, }; + +int +nng_pull0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_pull_proto)); +} diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index b7d4322c..14b3b191 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -63,9 +63,19 @@ nni_push_sock_fini(void *arg) { nni_push_sock *push = arg; - if (push != NULL) { - NNI_FREE_STRUCT(push); - } + NNI_FREE_STRUCT(push); +} + +static void +nni_push_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +nni_push_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); } static void @@ -221,8 +231,6 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, @@ -233,15 +241,23 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { static nni_proto_sock_ops nni_push_sock_ops = { .sock_init = nni_push_sock_init, .sock_fini = nni_push_sock_fini, + .sock_open = nni_push_sock_open, + .sock_close = nni_push_sock_close, .sock_setopt = nni_push_sock_setopt, .sock_getopt = nni_push_sock_getopt, }; nni_proto nni_push_proto = { - .proto_self = NNG_PROTO_PUSH, - .proto_peer = NNG_PROTO_PULL, - .proto_name = "push", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNG_PROTO_PULL_V0, "pull" }, .proto_flags = NNI_PROTO_FLAG_SND, .proto_pipe_ops = &nni_push_pipe_ops, .proto_sock_ops = &nni_push_sock_ops, }; + +int +nng_push0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_push_proto)); +} diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index e32f179a..161a5d79 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -97,6 +97,14 @@ nni_pub_sock_open(void *arg) nni_msgq_aio_get(pub->uwq, &pub->aio_getq); } +static void +nni_pub_sock_close(void *arg) +{ + nni_pub_sock *pub = arg; + + nni_aio_cancel(&pub->aio_getq, NNG_ECLOSED); +} + static void nni_pub_pipe_fini(void *arg) { @@ -319,15 +327,22 @@ nni_proto_sock_ops nni_pub_sock_ops = { .sock_init = nni_pub_sock_init, .sock_fini = nni_pub_sock_fini, .sock_open = nni_pub_sock_open, + .sock_close = nni_pub_sock_close, .sock_setopt = nni_pub_sock_setopt, .sock_getopt = nni_pub_sock_getopt, }; nni_proto nni_pub_proto = { - .proto_self = NNG_PROTO_PUB, - .proto_peer = NNG_PROTO_SUB, - .proto_name = "pub", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNG_PROTO_SUB_V0, "sub" }, .proto_flags = NNI_PROTO_FLAG_SND, .proto_sock_ops = &nni_pub_sock_ops, .proto_pipe_ops = &nni_pub_pipe_ops, }; + +int +nng_pub0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_pub_proto)); +} diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 03d76e2d..53f01e0f 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -79,6 +79,18 @@ nni_sub_sock_fini(void *arg) NNI_FREE_STRUCT(sub); } +static void +nni_sub_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +nni_sub_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + static int nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) { @@ -330,16 +342,24 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = { static nni_proto_sock_ops nni_sub_sock_ops = { .sock_init = nni_sub_sock_init, .sock_fini = nni_sub_sock_fini, + .sock_open = nni_sub_sock_open, + .sock_close = nni_sub_sock_close, .sock_setopt = nni_sub_sock_setopt, .sock_getopt = nni_sub_sock_getopt, .sock_rfilter = nni_sub_sock_rfilter, }; nni_proto nni_sub_proto = { - .proto_self = NNG_PROTO_SUB, - .proto_peer = NNG_PROTO_PUB, - .proto_name = "sub", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNG_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, .proto_sock_ops = &nni_sub_sock_ops, .proto_pipe_ops = &nni_sub_pipe_ops, }; + +int +nng_sub0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_sub_proto)); +} diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index c7546182..1adfd0f8 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -488,10 +488,16 @@ static nni_proto_sock_ops nni_rep_sock_ops = { }; nni_proto nni_rep_proto = { - .proto_self = NNG_PROTO_REP, - .proto_peer = NNG_PROTO_REQ, - .proto_name = "rep", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_REP_V0, "rep" }, + .proto_peer = { NNG_PROTO_REQ_V0, "req" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_rep_sock_ops, .proto_pipe_ops = &nni_rep_pipe_ops, }; + +int +nng_rep0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_rep_proto)); +} diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 7ec53c90..d0dd3887 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -111,6 +111,12 @@ nni_req_sock_init(void **reqp, nni_sock *sock) return (0); } +static void +nni_req_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + static void nni_req_sock_close(void *arg) { @@ -626,6 +632,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { static nni_proto_sock_ops nni_req_sock_ops = { .sock_init = nni_req_sock_init, .sock_fini = nni_req_sock_fini, + .sock_open = nni_req_sock_open, .sock_close = nni_req_sock_close, .sock_setopt = nni_req_sock_setopt, .sock_getopt = nni_req_sock_getopt, @@ -634,10 +641,16 @@ static nni_proto_sock_ops nni_req_sock_ops = { }; nni_proto nni_req_proto = { - .proto_self = NNG_PROTO_REQ, - .proto_peer = NNG_PROTO_REP, - .proto_name = "req", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_REQ_V0, "req" }, + .proto_peer = { NNG_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_req_sock_ops, .proto_pipe_ops = &nni_req_pipe_ops, }; + +int +nng_req0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_req_proto)); +} diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 089e730e..4db79b86 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -504,10 +504,16 @@ static nni_proto_sock_ops nni_resp_sock_ops = { }; nni_proto nni_respondent_proto = { - .proto_self = NNG_PROTO_RESPONDENT, - .proto_peer = NNG_PROTO_SURVEYOR, - .proto_name = "respondent", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNG_PROTO_SURVEYOR_V0, "surveyor" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_resp_sock_ops, .proto_pipe_ops = &nni_resp_pipe_ops, }; + +int +nng_respondent0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_respondent_proto)); +} diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 633e1491..91fe4ad3 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -470,13 +470,17 @@ static nni_proto_sock_ops nni_surv_sock_ops = { .sock_sfilter = nni_surv_sock_sfilter, }; -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. nni_proto nni_surveyor_proto = { - .proto_self = NNG_PROTO_SURVEYOR, - .proto_peer = NNG_PROTO_RESPONDENT, - .proto_name = "surveyor", + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNG_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_surv_sock_ops, .proto_pipe_ops = &nni_surv_pipe_ops, }; + +int +nng_surveyor0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &nni_surveyor_proto)); +} diff --git a/tests/bus.c b/tests/bus.c index 78740a37..e96a0a53 100644 --- a/tests/bus.c +++ b/tests/bus.c @@ -8,14 +8,15 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) Main({ const char *addr = "inproc://test"; @@ -27,54 +28,57 @@ Main({ Convey("We can create a BUS socket", { nng_socket bus; - So(nng_open(&bus, NNG_PROTO_BUS) == 0); + So(nng_bus_open(&bus) == 0); - Reset({ - nng_close(bus); - }) + Reset({ nng_close(bus); }); Convey("Protocols match", { So(nng_protocol(bus) == NNG_PROTO_BUS); So(nng_peer(bus) == NNG_PROTO_BUS); - }) - }) + }); + }); Convey("We can create a linked BUS topology", { nng_socket bus1; nng_socket bus2; nng_socket bus3; - uint64_t rtimeo; + uint64_t rtimeo; + + So(nng_bus_open(&bus1) == 0); + So(nng_bus_open(&bus2) == 0); + So(nng_bus_open(&bus3) == 0); - So(nng_open(&bus1, NNG_PROTO_BUS) == 0); - So(nng_open(&bus2, NNG_PROTO_BUS) == 0); - So(nng_open(&bus3, NNG_PROTO_BUS) == 0); - Reset({ nng_close(bus1); nng_close(bus2); nng_close(bus3); - }) + }); So(nng_listen(bus1, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(bus2, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(bus3, addr, NULL, NNG_FLAG_SYNCH) == 0); rtimeo = 50000; - So(nng_setopt(bus1, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(bus1, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); rtimeo = 50000; - So(nng_setopt(bus2, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(bus2, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); rtimeo = 50000; - So(nng_setopt(bus3, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(bus3, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); Convey("Messages delivered", { nng_msg *msg; - // This is just a poor man's sleep. - So(nng_recvmsg(bus1, &msg, 0) == NNG_ETIMEDOUT); - So(nng_recvmsg(bus2, &msg, 0) == NNG_ETIMEDOUT); - So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); - + So(nng_recvmsg(bus1, &msg, 0) == + NNG_ETIMEDOUT); + So(nng_recvmsg(bus2, &msg, 0) == + NNG_ETIMEDOUT); + So(nng_recvmsg(bus3, &msg, 0) == + NNG_ETIMEDOUT); + So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "99bits"); So(nng_sendmsg(bus2, msg, 0) == 0); @@ -82,7 +86,8 @@ Main({ So(nng_recvmsg(bus1, &msg, 0) == 0); CHECKSTR(msg, "99bits"); nng_msg_free(msg); - So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); + So(nng_recvmsg(bus3, &msg, 0) == + NNG_ETIMEDOUT); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "onthe"); @@ -95,7 +100,7 @@ Main({ So(nng_recvmsg(bus3, &msg, 0) == 0); CHECKSTR(msg, "onthe"); nng_msg_free(msg); - }) - }) - }) + }); + }); + }); }) diff --git a/tests/event.c b/tests/event.c index 4c7014e9..b2d15780 100644 --- a/tests/event.c +++ b/tests/event.c @@ -9,21 +9,22 @@ #include "convey.h" #include "nng.h" -#include #include +#include -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) struct evcnt { nng_socket sock; - int readable; - int writeable; - int pipeadd; - int piperem; - int epadd; - int eprem; - int err; + int readable; + int writeable; + int pipeadd; + int piperem; + int epadd; + int eprem; + int err; }; void @@ -68,25 +69,25 @@ Main({ Test("Event Handling", { Convey("Given a connected pair of pair sockets", { - nng_socket sock1; - nng_socket sock2; + nng_socket sock1; + nng_socket sock2; struct evcnt evcnt1; struct evcnt evcnt2; - nng_notify *notify1; - nng_notify *notify2; + nng_notify * notify1; + nng_notify * notify2; - So(nng_open(&sock1, NNG_PROTO_PAIR) == 0); - So(nng_open(&sock2, NNG_PROTO_PAIR) == 0); + So(nng_pair0_open(&sock1) == 0); + So(nng_pair0_open(&sock2) == 0); - memset(&evcnt1, 0, sizeof (evcnt1)); - memset(&evcnt2, 0, sizeof (evcnt2)); + memset(&evcnt1, 0, sizeof(evcnt1)); + memset(&evcnt2, 0, sizeof(evcnt2)); evcnt1.sock = sock1; evcnt2.sock = sock2; Reset({ nng_close(sock1); nng_close(sock2); - }) + }); So(nng_listen(sock1, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(sock2, addr, NULL, NNG_FLAG_SYNCH) == 0); @@ -95,8 +96,12 @@ Main({ nng_usleep(100000); Convey("We can register callbacks", { - So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SND, bump, &evcnt1)) != NULL); - So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RCV, bump, &evcnt2)) != NULL); + So((notify1 = nng_setnotify(sock1, + NNG_EV_CAN_SND, bump, &evcnt1)) != + NULL); + So((notify2 = nng_setnotify(sock2, + NNG_EV_CAN_RCV, bump, &evcnt2)) != + NULL); Convey("They are called", { nng_msg *msg; @@ -112,27 +117,28 @@ Main({ // this. Probably the msgq needs to // toggle on reads. - //nng_usleep(20000); + // nng_usleep(20000); - //So(nng_recvmsg(sock2, &msg, 0) == 0); + // So(nng_recvmsg(sock2, &msg, 0) == + // 0); - //CHECKSTR(msg, "abc"); - //nng_msg_free(msg); + // CHECKSTR(msg, "abc"); + // nng_msg_free(msg); // The notify runs async... nng_usleep(100000); So(evcnt1.writeable == 1); So(evcnt2.readable == 1); - }) + }); Convey("We can unregister them", { nng_unsetnotify(sock1, notify1); So(1); nng_unsetnotify(sock2, notify2); So(1); - }) - }) - }) - }) + }); + }); + }); + }); }) diff --git a/tests/pipeline.c b/tests/pipeline.c index aa01c2d1..510b7f77 100644 --- a/tests/pipeline.c +++ b/tests/pipeline.c @@ -8,13 +8,14 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) Main({ const char *addr = "inproc://test"; @@ -25,58 +26,54 @@ Main({ Convey("We can create a PUSH socket", { nng_socket push; - So(nng_open(&push, NNG_PROTO_PUSH) == 0); + So(nng_push_open(&push) == 0); - Reset({ - nng_close(push); - }) + Reset({ nng_close(push); }); Convey("Protocols match", { So(nng_protocol(push) == NNG_PROTO_PUSH); So(nng_peer(push) == NNG_PROTO_PULL); - }) + }); Convey("Recv fails", { nng_msg *msg; So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP); - }) - }) + }); + }); Convey("We can create a PULL socket", { nng_socket pull; - So(nng_open(&pull, NNG_PROTO_PULL) == 0); + So(nng_pull_open(&pull) == 0); - Reset({ - nng_close(pull); - }) + Reset({ nng_close(pull); }); Convey("Protocols match", { So(nng_protocol(pull) == NNG_PROTO_PULL); So(nng_peer(pull) == NNG_PROTO_PUSH); - }) + }); Convey("Send fails", { nng_msg *msg; So(nng_msg_alloc(&msg, 0) == 0); So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP); nng_msg_free(msg); - }) - }) + }); + }); Convey("We can create a linked PUSH/PULL pair", { nng_socket push; nng_socket pull; nng_socket what; - So(nng_open(&push, NNG_PROTO_PUSH) == 0); - So(nng_open(&pull, NNG_PROTO_PULL) == 0); - So(nng_open(&what, NNG_PROTO_PUSH) == 0); + So(nng_push_open(&push) == 0); + So(nng_pull_open(&pull) == 0); + So(nng_push_open(&what) == 0); Reset({ nng_close(push); nng_close(pull); nng_close(what); - }) + }); // Its important to avoid a startup race that the // sender be the dialer. Otherwise you need a delay @@ -97,30 +94,30 @@ Main({ So(msg != NULL); CHECKSTR(msg, "hello"); nng_msg_free(msg); - }) - }) + }); + }); Convey("Load balancing", { - nng_msg *abc; - nng_msg *def; - uint64_t usecs; - int len; + nng_msg * abc; + nng_msg * def; + uint64_t usecs; + int len; nng_socket push; nng_socket pull1; nng_socket pull2; nng_socket pull3; - So(nng_open(&push, NNG_PROTO_PUSH) == 0); - So(nng_open(&pull1, NNG_PROTO_PULL) == 0); - So(nng_open(&pull2, NNG_PROTO_PULL) == 0); - So(nng_open(&pull3, NNG_PROTO_PULL) == 0); + So(nng_push_open(&push) == 0); + So(nng_pull_open(&pull1) == 0); + So(nng_pull_open(&pull2) == 0); + So(nng_pull_open(&pull3) == 0); Reset({ nng_close(push); nng_close(pull1); nng_close(pull2); nng_close(pull3); - }) + }); // We need to increase the buffer from zero, because // there is no guarantee that the various listeners @@ -129,14 +126,22 @@ Main({ // ensures that we can write to each stream, even if // the listeners are not running yet. len = 4; - So(nng_setopt(push, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(push, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull1, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull1, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull2, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull2, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull3, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(pull3, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); + So(nng_setopt( + push, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + push, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull1, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull1, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull2, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull2, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull3, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); + So(nng_setopt( + pull3, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); So(nng_msg_alloc(&abc, 0) == 0); APPENDSTR(abc, "abc"); @@ -144,9 +149,12 @@ Main({ APPENDSTR(def, "def"); usecs = 100000; - So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0); - So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0); - So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0); + So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs, + sizeof(usecs)) == 0); + So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs, + sizeof(usecs)) == 0); + So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs, + sizeof(usecs)) == 0); So(nng_listen(push, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(pull1, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(pull2, addr, NULL, NNG_FLAG_SYNCH) == 0); @@ -175,8 +183,8 @@ Main({ So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT); So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT); - }) - }) + }); + }); nni_fini(); }) diff --git a/tests/pollfd.c b/tests/pollfd.c index f6b9cf30..e2cd2fb6 100644 --- a/tests/pollfd.c +++ b/tests/pollfd.c @@ -11,9 +11,9 @@ #include "nng.h" #ifndef _WIN32 -#include #include -#define INVALID_SOCKET -1 +#include +#define INVALID_SOCKET -1 #else #define poll WSAPoll @@ -22,94 +22,92 @@ #endif #include -#include + #include + +#include #include #endif -// Inproc tests. - -TestMain("Poll FDs", { - - Convey("Given a connected pair of sockets", { - nng_socket s1; - nng_socket s2; - - So(nng_open(&s1, NNG_PROTO_PAIR) == 0); - So(nng_open(&s2, NNG_PROTO_PAIR) == 0); - Reset({ - nng_close(s1); - nng_close(s2); - }) - So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0); - nng_usleep(50000); - - So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0); - nng_usleep(50000); - - Convey("We can get a recv FD", { - int fd; - size_t sz; - - sz = sizeof (fd); - So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0); - So(fd != INVALID_SOCKET); - - Convey("And they start non pollable", { - struct pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - - So(poll(&pfd, 1, 0) == 0); - So(pfd.revents == 0); - }) - - Convey("But if we write they are pollable", { - struct pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - - So(nng_send(s2, "kick", 5, 0) == 0); - So(poll(&pfd, 1, 1000) == 1); - So((pfd.revents & POLLIN) != 0); - }) - }) - - Convey("We can get a send FD", { - int fd; - size_t sz; - - sz = sizeof (fd); - So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0); - So(fd != INVALID_SOCKET); - So(nng_send(s1, "oops", 4, 0) == 0); - }) - - Convey("We cannot get a send FD for PULL", { - nng_socket s3; - int fd; - size_t sz; - So(nng_open(&s3, NNG_PROTO_PULL) == 0); - Reset({ - nng_close(s3); - }) - sz = sizeof (fd); - So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) == NNG_ENOTSUP); - }) - - Convey("We cannot get a recv FD for PUSH", { - nng_socket s3; - int fd; - size_t sz; - So(nng_open(&s3, NNG_PROTO_PUSH) == 0); - Reset({ - nng_close(s3); - }) - sz = sizeof (fd); - So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) == NNG_ENOTSUP); - }) - }) -}) +TestMain("Poll FDs", + { + + Convey("Given a connected pair of sockets", { + nng_socket s1; + nng_socket s2; + + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); + Reset({ + nng_close(s1); + nng_close(s2); + }); + So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0); + nng_usleep(50000); + + So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0); + nng_usleep(50000); + + Convey("We can get a recv FD", { + int fd; + size_t sz; + + sz = sizeof(fd); + So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0); + So(fd != INVALID_SOCKET); + + Convey("And they start non pollable", { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + So(poll(&pfd, 1, 0) == 0); + So(pfd.revents == 0); + }); + + Convey("But if we write they are pollable", { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + So(nng_send(s2, "kick", 5, 0) == 0); + So(poll(&pfd, 1, 1000) == 1); + So((pfd.revents & POLLIN) != 0); + }); + }); + + Convey("We can get a send FD", { + int fd; + size_t sz; + + sz = sizeof(fd); + So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0); + So(fd != INVALID_SOCKET); + So(nng_send(s1, "oops", 4, 0) == 0); + }); + + Convey("We cannot get a send FD for PULL", { + nng_socket s3; + int fd; + size_t sz; + So(nng_pull_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) == + NNG_ENOTSUP); + }); + + Convey("We cannot get a recv FD for PUSH", { + nng_socket s3; + int fd; + size_t sz; + So(nng_push_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) == + NNG_ENOTSUP); + }); + }) }) diff --git a/tests/pubsub.c b/tests/pubsub.c index 94558f39..19d85848 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -8,14 +9,15 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) Main({ const char *addr = "inproc://test"; @@ -25,90 +27,95 @@ Main({ Convey("We can create a PUB socket", { nng_socket pub; - So(nng_open(&pub, NNG_PROTO_PUB) == 0); + So(nng_pub_open(&pub) == 0); - Reset({ - nng_close(pub); - }) + Reset({ nng_close(pub); }); Convey("Protocols match", { So(nng_protocol(pub) == NNG_PROTO_PUB); So(nng_peer(pub) == NNG_PROTO_SUB); - }) + }); Convey("Recv fails", { nng_msg *msg; So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP); - }) - }) + }); + }); Convey("We can create a SUB socket", { nng_socket sub; - So(nng_open(&sub, NNG_PROTO_SUB) == 0); + So(nng_sub_open(&sub) == 0); - Reset({ - nng_close(sub); - }) + Reset({ nng_close(sub); }); Convey("Protocols match", { So(nng_protocol(sub) == NNG_PROTO_SUB); So(nng_peer(sub) == NNG_PROTO_PUB); - }) + }); Convey("Send fails", { nng_msg *msg; So(nng_msg_alloc(&msg, 0) == 0); So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP); nng_msg_free(msg); - }) - }) + }); + }); Convey("We can create a linked PUB/SUB pair", { nng_socket pub; nng_socket sub; - So(nng_open(&pub, NNG_PROTO_PUB) == 0); + So(nng_pub_open(&pub) == 0); - So(nng_open(&sub, NNG_PROTO_SUB) == 0); + So(nng_sub_open(&sub) == 0); Reset({ nng_close(pub); nng_close(sub); - }) - - // Most consumers will usually have the pub listen, - // and the sub dial. However, this creates a problem - // for our tests, since we can wind up trying to push - // data before the pipe is fully registered (the - // accept runs in an asynch thread.) Doing the reverse - // here ensures that we won't lose data. + }); + + // Most consumers will usually have the pub + // listen, and the sub dial. However, this + // creates a problem for our tests, since we + // can wind up trying to push data before the + // pipe is fully registered (the accept runs in + // an asynch thread.) Doing the reverse here + // ensures that we won't lose data. So(nng_listen(sub, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(pub, addr, NULL, NNG_FLAG_SYNCH) == 0); Convey("Sub can subscribe", { - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "ABC", 3) == 0); - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "", 0) == 0); + So(nng_setopt( + sub, NNG_OPT_SUBSCRIBE, "ABC", 3) == 0); + So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "", 0) == + 0); Convey("Unsubscribe works", { - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "ABC", 3) == 0); - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "", 0) == 0); - - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "", 0) == NNG_ENOENT); - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "HELLO", 0) == NNG_ENOENT); - }) - }) + So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, + "ABC", 3) == 0); + So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, + "", 0) == 0); + + So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, + "", 0) == NNG_ENOENT); + So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, + "HELLO", 0) == NNG_ENOENT); + }); + }); Convey("Pub cannot subscribe", { - So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == NNG_ENOTSUP); - }) + So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == + NNG_ENOTSUP); + }); Convey("Subs can receive from pubs", { nng_msg *msg; uint64_t rtimeo; - - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", strlen("/some/")) == 0); + So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", + strlen("/some/")) == 0); rtimeo = 50000; // 50ms - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/like/it/hot"); @@ -132,28 +139,31 @@ Main({ So(nng_recvmsg(sub, &msg, 0) == 0); CHECKSTR(msg, "/some/day/some/how"); nng_msg_free(msg); - }) + }); Convey("Subs without subsciptions don't receive", { uint64_t rtimeo = 50000; // 50ms nng_msg *msg; - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/don't/like/it"); So(nng_sendmsg(pub, msg, 0) == 0); So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT); - }) + }); Convey("Subs in raw receive", { uint64_t rtimeo = 50000; // 500ms - int raw = 1; + int raw = 1; nng_msg *msg; - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); - So(nng_setopt(sub, NNG_OPT_RAW, &raw, sizeof (raw)) == 0); + So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, + sizeof(rtimeo)) == 0); + So(nng_setopt(sub, NNG_OPT_RAW, &raw, + sizeof(raw)) == 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/like/it/raw"); @@ -161,10 +171,9 @@ Main({ So(nng_recvmsg(sub, &msg, 0) == 0); CHECKSTR(msg, "/some/like/it/raw"); nng_msg_free(msg); - }) - - }) - }) + }); + }); + }); nni_fini(); }) diff --git a/tests/reqrep.c b/tests/reqrep.c index d4040bdb..cadd0a5f 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -8,13 +8,13 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include Main({ - int rv; + int rv; const char *addr = "inproc://test"; nni_init(); @@ -22,36 +22,32 @@ Main({ Convey("We can create a REQ socket", { nng_socket req; - So(nng_open(&req, NNG_PROTO_REQ) == 0); + So(nng_req_open(&req) == 0); - Reset({ - nng_close(req); - }) + Reset({ nng_close(req); }); Convey("Protocols match", { So(nng_protocol(req) == NNG_PROTO_REQ); So(nng_peer(req) == NNG_PROTO_REP); - }) + }); Convey("Recv with no send fails", { nng_msg *msg; rv = nng_recvmsg(req, &msg, 0); So(rv == NNG_ESTATE); - }) - }) + }); + }); Convey("We can create a REP socket", { nng_socket rep; - So(nng_open(&rep, NNG_PROTO_REP) == 0); + So(nng_rep_open(&rep) == 0); - Reset({ - nng_close(rep); - }) + Reset({ nng_close(rep); }); Convey("Protocols match", { So(nng_protocol(rep) == NNG_PROTO_REP); So(nng_peer(rep) == NNG_PROTO_REQ); - }) + }); Convey("Send with no recv fails", { nng_msg *msg; @@ -60,21 +56,21 @@ Main({ rv = nng_sendmsg(rep, msg, 0); So(rv == NNG_ESTATE); nng_msg_free(msg); - }) - }) + }); + }); Convey("We can create a linked REQ/REP pair", { nng_socket req; nng_socket rep; - So(nng_open(&rep, NNG_PROTO_REP) == 0); + So(nng_rep_open(&rep) == 0); - So(nng_open(&req, NNG_PROTO_REQ) == 0); + So(nng_req_open(&req) == 0); Reset({ nng_close(rep); nng_close(req); - }) + }); So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_dial(req, addr, NULL, NNG_FLAG_SYNCH) == 0); @@ -102,31 +98,33 @@ Main({ So(nng_msg_len(ping) == 5); So(memcmp(nng_msg_body(ping), "pong", 5) == 0); nng_msg_free(ping); - }) - }) + }); + }); Convey("Request cancellation works", { nng_msg *abc; nng_msg *def; nng_msg *cmd; - uint64_t retry = 100000; // 100 ms - size_t len; + uint64_t retry = 100000; // 100 ms + size_t len; nng_socket req; nng_socket rep; - So(nng_open(&rep, NNG_PROTO_REP) == 0); + So(nng_rep_open(&rep) == 0); - So(nng_open(&req, NNG_PROTO_REQ) == 0); + So(nng_req_open(&req) == 0); Reset({ nng_close(rep); nng_close(req); - }) + }); - So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0); + So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, + sizeof(retry)) == 0); len = 16; - So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); + So(nng_setopt( + req, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); So(nng_msg_alloc(&abc, 0) == 0); So(nng_msg_append(abc, "abc", 4) == 0); @@ -149,8 +147,8 @@ Main({ So(nng_msg_len(cmd) == 4); So(memcmp(nng_msg_body(cmd), "def", 4) == 0); nng_msg_free(cmd); - }) - }) + }); + }); nni_fini(); }) diff --git a/tests/scalability.c b/tests/scalability.c index 1e0ea2a9..e9422f8e 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -43,7 +43,7 @@ openclients(nng_socket *clients, int num) int i; uint64_t t; for (i = 0; i < num; i++) { - if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) { + if ((rv = nng_req_open(&clients[i])) != 0) { printf("open #%d: %s\n", i, nng_strerror(rv)); return (rv); } @@ -120,7 +120,7 @@ Main({ clients = calloc(nclients, sizeof(nng_socket)); results = calloc(nclients, sizeof(int)); - if ((nng_open(&rep, NNG_PROTO_REP) != 0) || + if ((nng_rep_open(&rep) != 0) || (nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof(depth)) != 0) || (nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof(depth)) != 0) || (nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) != 0) || diff --git a/tests/sock.c b/tests/sock.c index 6c6d5b06..27599e47 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -8,175 +8,185 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include Main({ Test("Socket Operations", { - Convey("We are able to open a PAIR socket", { - int rv; - nng_socket sock; - - So(nng_open(&sock, NNG_PROTO_PAIR) == 0); - - Reset({ - nng_close(sock); - }) - - Convey("And we can shut it down", { - rv = nng_shutdown(sock); - So(rv == 0); - rv = nng_shutdown(sock); - So(rv == NNG_ECLOSED); - }) - - Convey("It's type is still proto", { - So(nng_protocol(sock) == NNG_PROTO_PAIR); - }) - - Convey("Recv with no pipes times out correctly", { - nng_msg *msg = NULL; - int64_t when = 100000; - uint64_t now; - - now = nni_clock(); - - rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &when, - sizeof (when)); - So(rv == 0); - rv = nng_recvmsg(sock, &msg, 0); - So(rv == NNG_ETIMEDOUT); - So(msg == NULL); - So(nni_clock() >= (now + when)); - So(nni_clock() < (now + (when * 2))); - }) - - Convey("Recv nonblock with no pipes gives EAGAIN", { - nng_msg *msg = NULL; - rv = nng_recvmsg(sock, &msg, NNG_FLAG_NONBLOCK); - So(rv == NNG_EAGAIN); - So(msg == NULL); - }) - - Convey("Send with no pipes times out correctly", { - nng_msg *msg = NULL; - int64_t when = 100000; - uint64_t now; - - // We cheat to get access to the core's clock. - So(nng_msg_alloc(&msg, 0) == 0); - So(msg != NULL); - now = nni_clock(); - - rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when, - sizeof (when)); - So(rv == 0); - rv = nng_sendmsg(sock, msg, 0); - So(rv == NNG_ETIMEDOUT); - So(nni_clock() >= (now + when)); - So(nni_clock() < (now + (when * 2))); - nng_msg_free(msg); - }) - - Convey("We can set and get options", { - int64_t when = 1234; - int64_t check = 0; - size_t sz; - rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when, - sizeof (when)); - So(rv == 0); - sz = sizeof (check); - Convey("Short size is not copied", { - sz = 0; - rv = nng_getopt(sock, NNG_OPT_SNDTIMEO, - &check, &sz); + Convey("We are able to open a PAIR socket", { + int rv; + nng_socket sock; + + So(nng_pair_open(&sock) == 0); + + Reset({ nng_close(sock); }); + + Convey("And we can shut it down", { + rv = nng_shutdown(sock); So(rv == 0); - So(sz == sizeof (check)); - So(check == 0); - }) - Convey("Correct size is copied", { - sz = sizeof (check); - rv = nng_getopt(sock, NNG_OPT_SNDTIMEO, &check, - &sz); + rv = nng_shutdown(sock); + So(rv == NNG_ECLOSED); + }); + + Convey("It's type is still proto", + { So(nng_protocol(sock) == NNG_PROTO_PAIR); }); + + Convey("Recv with no pipes times out correctly", { + nng_msg *msg = NULL; + int64_t when = 100000; + uint64_t now; + + now = nni_clock(); + + rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &when, + sizeof(when)); So(rv == 0); - So(sz == sizeof (check)); - So(check == 1234); - }) - }) - - Convey("Bogus URLs not supported", { - Convey("Dialing fails properly", { - rv = nng_dial(sock, "bogus://somewhere", NULL, 0); - So(rv == NNG_ENOTSUP); - }) - Convey("Listening fails properly", { - rv = nng_listen(sock, "bogus://elsewhere", NULL, 0); - So(rv == NNG_ENOTSUP); - }) - }) - - Convey("Dialing synch can get refused", { - rv = nng_dial(sock, "inproc://notthere", NULL, NNG_FLAG_SYNCH); - So(rv == NNG_ECONNREFUSED); - }) - - Convey("Listening works", { - rv = nng_listen(sock, "inproc://here", NULL, NNG_FLAG_SYNCH); - So(rv == 0); - - Convey("Second listen fails ADDRINUSE", { - rv = nng_listen(sock, "inproc://here", NULL, NNG_FLAG_SYNCH); - So(rv == NNG_EADDRINUSE); - }) - - Convey("We can connect to it", { - nng_socket sock2; - So(nng_open(&sock2, NNG_PROTO_PAIR) == 0); - Reset({ - nng_close(sock2); + rv = nng_recvmsg(sock, &msg, 0); + So(rv == NNG_ETIMEDOUT); + So(msg == NULL); + So(nni_clock() >= (now + when)); + So(nni_clock() < (now + (when * 2))); + }); + + Convey("Recv nonblock with no pipes gives EAGAIN", { + nng_msg *msg = NULL; + rv = + nng_recvmsg(sock, &msg, NNG_FLAG_NONBLOCK); + So(rv == NNG_EAGAIN); + So(msg == NULL); + }); + + Convey("Send with no pipes times out correctly", { + nng_msg *msg = NULL; + int64_t when = 100000; + uint64_t now; + + // We cheat to get access to the core's clock. + So(nng_msg_alloc(&msg, 0) == 0); + So(msg != NULL); + now = nni_clock(); + + rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when, + sizeof(when)); + So(rv == 0); + rv = nng_sendmsg(sock, msg, 0); + So(rv == NNG_ETIMEDOUT); + So(nni_clock() >= (now + when)); + So(nni_clock() < (now + (when * 2))); + nng_msg_free(msg); + }); + + Convey("We can set and get options", { + int64_t when = 1234; + int64_t check = 0; + size_t sz; + rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when, + sizeof(when)); + So(rv == 0); + sz = sizeof(check); + Convey("Short size is not copied", { + sz = 0; + rv = nng_getopt(sock, NNG_OPT_SNDTIMEO, + &check, &sz); + So(rv == 0); + So(sz == sizeof(check)); + So(check == 0); + }) Convey("Correct size is copied", { + sz = sizeof(check); + rv = nng_getopt(sock, NNG_OPT_SNDTIMEO, + &check, &sz); + So(rv == 0); + So(sz == sizeof(check)); + So(check == 1234); }) - rv = nng_dial(sock2, "inproc://here", NULL, NNG_FLAG_SYNCH); + }); + + Convey("Bogus URLs not supported", { + Convey("Dialing fails properly", { + rv = nng_dial(sock, + "bogus://somewhere", NULL, 0); + So(rv == NNG_ENOTSUP); + }); + Convey("Listening fails properly", { + rv = nng_listen(sock, + "bogus://elsewhere", NULL, 0); + So(rv == NNG_ENOTSUP); + }); + }); + + Convey("Dialing synch can get refused", { + rv = nng_dial(sock, "inproc://notthere", NULL, + NNG_FLAG_SYNCH); + So(rv == NNG_ECONNREFUSED); + }); + + Convey("Listening works", { + rv = nng_listen(sock, "inproc://here", NULL, + NNG_FLAG_SYNCH); So(rv == 0); - nng_close(sock2); - }) - }) - - Convey("We can send and receive messages", { - nng_socket sock2; - int len = 1; - size_t sz; - uint64_t second = 3000000; - char *buf; - - So(nng_open(&sock2, NNG_PROTO_PAIR) == 0); - Reset({ - nng_close(sock2); - }) - - So(nng_setopt(sock, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(sock, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - - So(nng_setopt(sock2, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); - So(nng_setopt(sock2, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - - So(nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0); - So(nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0); - So(nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0); - So(nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0); - - So(nng_listen(sock, "inproc://test1", NULL, NNG_FLAG_SYNCH) == 0); - So(nng_dial(sock2, "inproc://test1", NULL, NNG_FLAG_SYNCH) == 0); - - So(nng_send(sock, "abc", 4, 0) == 0); - So(nng_recv(sock2 , &buf, &sz, NNG_FLAG_ALLOC) == 0); - So(buf != NULL); - So(sz == 4); - So(memcmp(buf, "abc", 4) == 0); - nng_free(buf, sz); - }) - }) - }) + + Convey("Second listen fails ADDRINUSE", { + rv = nng_listen(sock, "inproc://here", + NULL, NNG_FLAG_SYNCH); + So(rv == NNG_EADDRINUSE); + }); + + Convey("We can connect to it", { + nng_socket sock2; + So(nng_pair_open(&sock2) == 0); + Reset({ nng_close(sock2); }); + rv = nng_dial(sock2, "inproc://here", + NULL, NNG_FLAG_SYNCH); + So(rv == 0); + nng_close(sock2); + }); + }); + + Convey("We can send and receive messages", { + nng_socket sock2; + int len = 1; + size_t sz; + uint64_t second = 3000000; + char * buf; + + So(nng_pair_open(&sock2) == 0); + Reset({ nng_close(sock2); }); + + So(nng_setopt(sock, NNG_OPT_RCVBUF, &len, + sizeof(len)) == 0); + So(nng_setopt(sock, NNG_OPT_SNDBUF, &len, + sizeof(len)) == 0); + + So(nng_setopt(sock2, NNG_OPT_RCVBUF, &len, + sizeof(len)) == 0); + So(nng_setopt(sock2, NNG_OPT_SNDBUF, &len, + sizeof(len)) == 0); + + So(nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, + sizeof(second)) == 0); + So(nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, + sizeof(second)) == 0); + So(nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, + sizeof(second)) == 0); + So(nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, + sizeof(second)) == 0); + + So(nng_listen(sock, "inproc://test1", NULL, + NNG_FLAG_SYNCH) == 0); + So(nng_dial(sock2, "inproc://test1", NULL, + NNG_FLAG_SYNCH) == 0); + + So(nng_send(sock, "abc", 4, 0) == 0); + So(nng_recv( + sock2, &buf, &sz, NNG_FLAG_ALLOC) == 0); + So(buf != NULL); + So(sz == 4); + So(memcmp(buf, "abc", 4) == 0); + nng_free(buf, sz); + }); + }); + }); }) diff --git a/tests/survey.c b/tests/survey.c index a7dd3cca..6f85850f 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -8,14 +9,15 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) Main({ const char *addr = "inproc://test"; @@ -23,113 +25,129 @@ Main({ nni_init(); Test("SURVEY pattern", { - Convey("We can create a SURVEYOR socket", { - nng_socket surv; - - So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0); - - Reset({ - nng_close(surv); - }) - - Convey("Protocols match", { - So(nng_protocol(surv) == NNG_PROTO_SURVEYOR); - So(nng_peer(surv) == NNG_PROTO_RESPONDENT); - }) - - Convey("Recv with no survey fails", { - nng_msg *msg; - So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); - }) - - Convey("Survey without responder times out", { - uint64_t expire = 50000; - nng_msg *msg; - - So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0); - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(surv, msg, 0) == 0); - So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); - }) - }) - - Convey("We can create a RESPONDENT socket", { - nng_socket resp; - So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0); - - Reset({ - nng_close(resp); - }) - - Convey("Protocols match", { - So(nng_protocol(resp) == NNG_PROTO_RESPONDENT); - So(nng_peer(resp) == NNG_PROTO_SURVEYOR); - }) - - Convey("Send fails with no suvey", { - nng_msg *msg; - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(resp, msg, 0) == NNG_ESTATE); - nng_msg_free(msg); - }) - }) - - Convey("We can create a linked survey pair", { - nng_socket surv; - nng_socket resp; - nng_socket sock; - uint64_t expire; - - So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0); - So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0); - - Reset({ - nng_close(surv); - nng_close(resp); - }) - - expire = 50000; - So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0); - - So(nng_listen(surv, addr, NULL, NNG_FLAG_SYNCH) == 0); - So(nng_dial(resp, addr, NULL, NNG_FLAG_SYNCH) == 0); - - // We dial another socket as that will force the - // earlier dial to have completed *fully*. This is a - // hack that only works because our listen logic is - // single threaded. - So(nng_open(&sock, NNG_PROTO_RESPONDENT) == 0); - So(nng_dial(sock, addr, NULL, NNG_FLAG_SYNCH) == 0); - nng_close(sock); - - Convey("Survey works", { - nng_msg *msg; - uint64_t rtimeo; - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "abc"); - So(nng_sendmsg(surv, msg, 0) == 0); - msg = NULL; - So(nng_recvmsg(resp, &msg, 0) == 0); - CHECKSTR(msg, "abc"); - nng_msg_trunc(msg, 3); - APPENDSTR(msg, "def"); - So(nng_sendmsg(resp, msg, 0) == 0); - msg = NULL; - So(nng_recvmsg(surv, &msg, 0) == 0); - CHECKSTR(msg, "def"); - nng_msg_free(msg); - - So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); - - Convey("And goes to non-survey state", { - rtimeo = 200000; - So(nng_setopt(surv, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); - So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); - }) - }) - }) - }) + Convey("We can create a SURVEYOR socket", + { + nng_socket surv; + + So(nng_surveyor_open(&surv) == 0); + + Reset({ nng_close(surv); }); + + Convey("Protocols match", { + So(nng_protocol(surv) == + NNG_PROTO_SURVEYOR); + So(nng_peer(surv) == NNG_PROTO_RESPONDENT); + }); + + Convey("Recv with no survey fails", { + nng_msg *msg; + So(nng_recvmsg(surv, &msg, 0) == + NNG_ESTATE); + }); + + Convey("Survey without responder times out", { + uint64_t expire = 50000; + nng_msg *msg; + + So(nng_setopt(surv, NNG_OPT_SURVEYTIME, + &expire, sizeof(expire)) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == + NNG_ETIMEDOUT); + }); + }) + + Convey("We can create a RESPONDENT socket", + { + nng_socket resp; + So(nng_respondent_open(&resp) == 0); + + Reset({ nng_close(resp); }); + + Convey("Protocols match", { + So(nng_protocol(resp) == + NNG_PROTO_RESPONDENT); + So(nng_peer(resp) == + NNG_PROTO_SURVEYOR); + }); + + Convey("Send fails with no suvey", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(resp, msg, 0) == + NNG_ESTATE); + nng_msg_free(msg); + }); + }) + + Convey("We can create a linked survey pair", { + nng_socket surv; + nng_socket resp; + nng_socket sock; + uint64_t expire; + + So(nng_surveyor_open(&surv) == 0); + So(nng_respondent_open(&resp) == 0); + + Reset({ + nng_close(surv); + nng_close(resp); + }); + + expire = 50000; + So(nng_setopt(surv, NNG_OPT_SURVEYTIME, + &expire, sizeof(expire)) == 0); + + So(nng_listen( + surv, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial( + resp, addr, NULL, NNG_FLAG_SYNCH) == 0); + + // We dial another socket as that will force + // the earlier dial to have completed *fully*. + // This is a hack that only works because our + // listen logic is single threaded. + So(nng_respondent_open(&sock) == 0); + So(nng_dial( + sock, addr, NULL, NNG_FLAG_SYNCH) == 0); + nng_close(sock); + + Convey("Survey works", { + nng_msg *msg; + uint64_t rtimeo; + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "abc"); + So(nng_sendmsg(surv, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(resp, &msg, 0) == 0); + CHECKSTR(msg, "abc"); + nng_msg_trunc(msg, 3); + APPENDSTR(msg, "def"); + So(nng_sendmsg(resp, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(surv, &msg, 0) == 0); + CHECKSTR(msg, "def"); + nng_msg_free(msg); + + So(nng_recvmsg(surv, &msg, 0) == + NNG_ETIMEDOUT); + + Convey( + "And goes to non-survey state", { + rtimeo = 200000; + So(nng_setopt(surv, + NNG_OPT_RCVTIMEO, + &rtimeo, + sizeof(rtimeo)) == + 0); + So(nng_recvmsg(surv, &msg, + 0) == NNG_ESTATE); + }); + }); + }); + }); nni_fini(); }) diff --git a/tests/tcp.c b/tests/tcp.c index b37e239c..3910cb7f 100644 --- a/tests/tcp.c +++ b/tests/tcp.c @@ -10,36 +10,34 @@ #include "convey.h" #include "trantest.h" - // Inproc tests. TestMain("TCP Transport", { trantest_test_all("tcp://127.0.0.1:4450"); - Convey("We cannot connect to wild cards", { nng_socket s; - So(nng_open(&s, NNG_PROTO_PAIR) == 0); - Reset({ - nng_close(s); - }) - So(nng_dial(s, "tcp://*:5555", NULL, NNG_FLAG_SYNCH) == NNG_EADDRINVAL); - }) + So(nng_pair_open(&s) == 0); + Reset({ nng_close(s); }); + So(nng_dial(s, "tcp://*:5555", NULL, NNG_FLAG_SYNCH) == + NNG_EADDRINVAL); + }); Convey("We can bind to wild card", { nng_socket s1; nng_socket s2; - So(nng_open(&s1, NNG_PROTO_PAIR) == 0); - So(nng_open(&s2, NNG_PROTO_PAIR) == 0); + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); Reset({ nng_close(s2); nng_close(s1); - }) + }); So(nng_listen(s1, "tcp://*:5771", NULL, NNG_FLAG_SYNCH) == 0); - So(nng_dial(s2, "tcp://127.0.0.1:5771", NULL, NNG_FLAG_SYNCH) == 0); - }) + So(nng_dial( + s2, "tcp://127.0.0.1:5771", NULL, NNG_FLAG_SYNCH) == 0); + }); nng_fini(); }) diff --git a/tests/trantest.h b/tests/trantest.h index fe6e5431..173c6b45 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -29,8 +29,8 @@ void trantest_init(trantest *tt, const char *addr) { (void) snprintf(tt->addr, sizeof(tt->addr), "%s", addr); - So(nng_open(&tt->reqsock, NNG_PROTO_REQ) == 0); - So(nng_open(&tt->repsock, NNG_PROTO_REP) == 0); + So(nng_req_open(&tt->reqsock) == 0); + So(nng_rep_open(&tt->repsock) == 0); tt->tran = nni_tran_find(addr); So(tt->tran != NULL); -- cgit v1.2.3-70-g09d2