summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-08 21:19:09 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-09 02:38:55 -0700
commitd64f12553eb6ceb67ed6f6a5b2ceb6c061d375ba (patch)
treef6bdac79578176f0d00528d191f862009e761eac
parent5f0398de8edd1ed4ddbf6455c66273a6608aad9a (diff)
downloadnng-d64f12553eb6ceb67ed6f6a5b2ceb6c061d375ba.tar.gz
nng-d64f12553eb6ceb67ed6f6a5b2ceb6c061d375ba.tar.bz2
nng-d64f12553eb6ceb67ed6f6a5b2ceb6c061d375ba.zip
fixes #44 open protocol by "name" (symbol) instead number
fixes #38 Make protocols "pluggable", or at least optional This is a breaking change, as we've done away with the central registered list of protocols, and instead demand the user call nng_xxx_open() where xxx is a protocol name. (We did keep a table around in the compat framework though.) There is a nice way for protocols to plug in via an nni_proto_open(), where they can use a generic constructor that they use to build a protocol specific constructor (passing their ops vector in.)
-rw-r--r--perf/perf.c118
-rw-r--r--src/core/device.c4
-rw-r--r--src/core/protocol.c83
-rw-r--r--src/core/protocol.h36
-rw-r--r--src/core/socket.c134
-rw-r--r--src/core/socket.h7
-rw-r--r--src/nng.c16
-rw-r--r--src/nng.h67
-rw-r--r--src/nng_compat.c35
-rw-r--r--src/protocol/bus/bus.c21
-rw-r--r--src/protocol/pair/pair.c29
-rw-r--r--src/protocol/pipeline/pull.c32
-rw-r--r--src/protocol/pipeline/push.c32
-rw-r--r--src/protocol/pubsub/pub.c21
-rw-r--r--src/protocol/pubsub/sub.c26
-rw-r--r--src/protocol/reqrep/rep.c12
-rw-r--r--src/protocol/reqrep/req.c19
-rw-r--r--src/protocol/survey/respond.c12
-rw-r--r--src/protocol/survey/survey.c14
-rw-r--r--tests/bus.c61
-rw-r--r--tests/event.c68
-rw-r--r--tests/pipeline.c100
-rw-r--r--tests/pollfd.c174
-rw-r--r--tests/pubsub.c113
-rw-r--r--tests/reqrep.c58
-rw-r--r--tests/scalability.c4
-rw-r--r--tests/sock.c330
-rw-r--r--tests/survey.c240
-rw-r--r--tests/tcp.c24
-rw-r--r--tests/trantest.h4
30 files changed, 1004 insertions, 890 deletions
diff --git a/perf/perf.c b/perf/perf.c
index e0526282..4827d4a2 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -1,5 +1,6 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -9,11 +10,11 @@
#include "nng.h"
+#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <stdarg.h>
// We steal access to the clock and thread functions so that we can
// work on Windows too. These functions are *not* part of nng's public
@@ -108,22 +109,20 @@ die(const char *fmt, ...)
exit(2);
}
-
static int
parse_int(const char *arg, const char *what)
{
- long val;
+ long val;
char *eptr;
val = strtol(arg, &eptr, 10);
// Must be a postive number less than around a billion.
- if ((val < 0) || (val > (1<<30)) || (*eptr != 0) || (eptr == arg)) {
+ if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
die("Invalid %s", what);
}
return ((int) val);
}
-
void
do_local_lat(int argc, char **argv)
{
@@ -135,12 +134,11 @@ do_local_lat(int argc, char **argv)
}
msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
+ trips = parse_int(argv[2], "round-trips");
latency_server(argv[0], msgsize, trips);
}
-
void
do_remote_lat(int argc, char **argv)
{
@@ -152,12 +150,11 @@ do_remote_lat(int argc, char **argv)
}
msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
+ trips = parse_int(argv[2], "round-trips");
latency_client(argv[0], msgsize, trips);
}
-
void
do_local_thr(int argc, char **argv)
{
@@ -169,12 +166,11 @@ do_local_thr(int argc, char **argv)
}
msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
+ trips = parse_int(argv[2], "count");
throughput_server(argv[0], msgsize, trips);
}
-
void
do_remote_thr(int argc, char **argv)
{
@@ -186,17 +182,16 @@ do_remote_thr(int argc, char **argv)
}
msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
+ trips = parse_int(argv[2], "count");
throughput_client(argv[0], msgsize, trips);
}
-
struct inproc_args {
- int count;
- int msgsize;
- const char * addr;
- void (*func)(const char *, int, int);
+ int count;
+ int msgsize;
+ const char *addr;
+ void (*func)(const char *, int, int);
};
static void
@@ -207,23 +202,22 @@ do_inproc(void *args)
ia->func(ia->addr, ia->msgsize, ia->count);
}
-
void
do_inproc_lat(int argc, char **argv)
{
- nni_thr thr;
+ nni_thr thr;
struct inproc_args ia;
- int rv;
+ int rv;
nni_init();
if (argc != 2) {
die("Usage: inproc_lat <msg-size> <count>");
}
- ia.addr = "inproc://latency_test";
+ ia.addr = "inproc://latency_test";
ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = latency_server;
+ ia.count = parse_int(argv[1], "count");
+ ia.func = latency_server;
// Sleep a bit.
nng_usleep(100000);
@@ -236,24 +230,22 @@ do_inproc_lat(int argc, char **argv)
nni_thr_fini(&thr);
}
-
void
do_inproc_thr(int argc, char **argv)
{
- nni_thr thr;
+ nni_thr thr;
struct inproc_args ia;
- int rv;
+ int rv;
nni_init();
if (argc != 2) {
die("Usage: inproc_thr <msg-size> <count>");
}
- ia.addr = "inproc://tput_test";
+ ia.addr = "inproc://tput_test";
ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = throughput_client;
-
+ ia.count = parse_int(argv[1], "count");
+ ia.func = throughput_client;
if ((rv = nni_thr_init(&thr, do_inproc, &ia)) != 0) {
die("Cannot create thread: %s", nng_strerror(rv));
@@ -263,19 +255,18 @@ do_inproc_thr(int argc, char **argv)
nni_thr_fini(&thr);
}
-
void
latency_client(const char *addr, int msgsize, int trips)
{
nng_socket s;
- nng_msg *msg;
- nni_time start, end;
- int rv;
- int i;
- float total;
- float latency;
-
- if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+ nng_msg * msg;
+ nni_time start, end;
+ int rv;
+ int i;
+ float total;
+ float latency;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
@@ -305,7 +296,7 @@ latency_client(const char *addr, int msgsize, int trips)
nni_msg_free(msg);
nng_close(s);
- total = (float)(end - start);
+ total = (float) (end - start);
latency = (total / (trips * 2));
printf("total time: %.3f [s]\n", total / 1000000.0);
printf("message size: %d [B]\n", msgsize);
@@ -313,16 +304,15 @@ latency_client(const char *addr, int msgsize, int trips)
printf("average latency: %.3f [us]\n", latency);
}
-
void
latency_server(const char *addr, int msgsize, int trips)
{
nng_socket s;
- nng_msg *msg;
- int rv;
- int i;
+ nng_msg * msg;
+ int rv;
+ int i;
- if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+ if ((rv = nng_pair_open(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
@@ -352,7 +342,6 @@ latency_server(const char *addr, int msgsize, int trips)
nng_close(s);
}
-
// Our throughput story is quite a mess. Mostly I think because of the poor
// caching and message reuse. We should probably implement a message pooling
// API somewhere.
@@ -361,18 +350,18 @@ void
throughput_server(const char *addr, int msgsize, int count)
{
nng_socket s;
- nng_msg *msg;
- int rv;
- int i;
- size_t len;
- uint64_t start, end;
- double msgpersec, mbps, total;
-
- if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+ nng_msg * msg;
+ int rv;
+ int i;
+ size_t len;
+ uint64_t start, end;
+ double msgpersec, mbps, total;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
len = 128;
- rv = nng_setopt(s, NNG_OPT_RCVBUF, &len, sizeof (len));
+ rv = nng_setopt(s, NNG_OPT_RCVBUF, &len, sizeof(len));
if (rv != 0) {
die("nng_setopt(NNG_OPT_RCVBUF): %s", nng_strerror(rv));
}
@@ -403,9 +392,9 @@ throughput_server(const char *addr, int msgsize, int count)
}
end = nni_clock();
nng_close(s);
- total = (end - start) / 1.0;
+ total = (end - start) / 1.0;
msgpersec = (count * 1000000.0) / total;
- mbps = (count * 8.0 * msgsize);
+ mbps = (count * 8.0 * msgsize);
mbps /= total;
printf("total time: %.3f [s]\n", total / 1000000.0);
printf("message size: %d [B]\n", msgsize);
@@ -414,20 +403,19 @@ throughput_server(const char *addr, int msgsize, int count)
printf("throughput: %.3f [Mb/s]\n", mbps);
}
-
void
throughput_client(const char *addr, int msgsize, int count)
{
nng_socket s;
- nng_msg *msg;
- int rv;
- int i;
- int len;
+ nng_msg * msg;
+ int rv;
+ int i;
+ int len;
// We send one extra zero length message to start the timer.
count++;
- if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+ if ((rv = nng_pair_open(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
@@ -435,7 +423,7 @@ throughput_client(const char *addr, int msgsize, int count)
// XXX: other options (TLS in the future?, Linger?)
len = 128;
- rv = nng_setopt(s, NNG_OPT_SNDBUF, &len, sizeof (len));
+ rv = nng_setopt(s, NNG_OPT_SNDBUF, &len, sizeof(len));
if (rv != 0) {
die("nng_setopt(NNG_OPT_SNDBUF): %s", nng_strerror(rv));
}
diff --git a/src/core/device.c b/src/core/device.c
index cd0c8dfc..81aafc3d 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -80,8 +80,8 @@ nni_device(nni_sock *sock1, nni_sock *sock2)
rv = NNG_EINVAL;
goto out;
}
- if ((sock1->s_peer != sock2->s_protocol) ||
- (sock2->s_peer != sock1->s_protocol)) {
+ if ((sock1->s_peer_id.p_id != sock2->s_self_id.p_id) ||
+ (sock2->s_peer_id.p_id != sock1->s_self_id.p_id)) {
rv = NNG_EINVAL;
goto out;
}
diff --git a/src/core/protocol.c b/src/core/protocol.c
index a60454de..7faf9068 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -13,82 +14,14 @@
// Protocol related stuff - generically.
-// The list of protocols is hardwired. This is reasonably unlikely to
-// change, as adding new protocols is not something intended to be done
-// outside of the core.
-extern nni_proto nni_bus_proto;
-extern nni_proto nni_pair_proto;
-extern nni_proto nni_rep_proto;
-extern nni_proto nni_req_proto;
-extern nni_proto nni_pub_proto;
-extern nni_proto nni_sub_proto;
-extern nni_proto nni_push_proto;
-extern nni_proto nni_pull_proto;
-extern nni_proto nni_surveyor_proto;
-extern nni_proto nni_respondent_proto;
-
-static nni_proto *protocols[] = {
- // clang-format off
- &nni_bus_proto,
- &nni_pair_proto,
- &nni_rep_proto,
- &nni_req_proto,
- &nni_pub_proto,
- &nni_sub_proto,
- &nni_push_proto,
- &nni_pull_proto,
- &nni_surveyor_proto,
- &nni_respondent_proto,
- NULL
- // clang-format on
-};
-
-nni_proto *
-nni_proto_find(uint16_t num)
-{
- int i;
- nni_proto *p;
-
- for (i = 0; (p = protocols[i]) != NULL; i++) {
- if (p->proto_self == num) {
- break;
- }
- }
- return (p);
-}
-
-const char *
-nni_proto_name(uint16_t num)
-{
- nni_proto *p;
-
- if ((p = nni_proto_find(num)) == NULL) {
- return (NULL);
- }
- return (p->proto_name);
-}
-
-uint16_t
-nni_proto_number(const char *name)
-{
- nni_proto *p;
- int i;
-
- for (i = 0; (p = protocols[i]) != NULL; i++) {
- if (strcmp(p->proto_name, name) == 0) {
- return (p->proto_self);
- }
- }
- return (NNG_PROTO_NONE);
-}
-
-uint16_t
-nni_proto_peer(uint16_t num)
+int
+nni_proto_open(nng_socket *sockidp, const nni_proto *proto)
{
- nni_proto *p;
+ int rv;
+ nni_sock *sock;
- if ((p = nni_proto_find(num)) == NULL) {
- return (NNG_PROTO_NONE);
+ if ((rv = nni_sock_open(&sock, proto)) == 0) {
+ *sockidp = nni_sock_id(sock); // Keep socket held open.
}
- return (p->proto_peer);
+ return (rv);
}
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 3a133469..14954b49 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -47,7 +48,7 @@ struct nni_proto_pipe_ops {
};
struct nni_proto_sock_ops {
- // sock_initf creates the protocol instance, which will be stored on
+ // sock_init creates the protocol instance, which will be stored on
// the socket. This is run without the sock lock held, and allocates
// storage or other resources for the socket.
int (*sock_init)(void **, nni_sock *);
@@ -82,15 +83,31 @@ struct nni_proto_sock_ops {
nni_msg *(*sock_sfilter)(void *, nni_msg *);
};
+typedef struct nni_proto_id {
+ uint16_t p_id;
+ const char *p_name;
+} nni_proto_id;
+
struct nni_proto {
- uint16_t proto_self; // our 16-bit D
- uint16_t proto_peer; // who we peer with (ID)
- const char * proto_name; // Our name
+ uint32_t proto_version; // Ops vector version
+ nni_proto_id proto_self; // Our identity
+ nni_proto_id proto_peer; // Peer identity
uint32_t proto_flags; // Protocol flags
const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations
const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations.
};
+// We quite intentionally use a signature where the upper word is nonzero,
+// which ensures that if we get garbage we will reject it. This is more
+// likely to mismatch than all zero bytes would. The actual version is
+// stored in the lower word; this is not semver -- the numbers are just
+// increasing - we doubt it will increase more than a handful of times
+// during the life of the project. If we add a new version, please keep
+// the old version around -- it may be possible to automatically convert
+// older versions in the future.
+#define NNI_PROTOCOL_V0 0x50520000 // "pr\0\0"
+#define NNI_PROTOCOL_VERSION NNI_PROTOCOL_V0
+
// These flags determine which operations make sense. We use them so that
// we can reject attempts to create notification fds for operations that make
// no sense.
@@ -98,11 +115,10 @@ struct nni_proto {
#define NNI_PROTO_FLAG_SND 2 // Protocol can send
#define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv
-// These functions are not used by protocols, but rather by the socket
-// core implementation. The lookups can be used by transports as well.
-extern nni_proto * nni_proto_find(uint16_t);
-extern const char *nni_proto_name(uint16_t);
-extern uint16_t nni_proto_number(const char *);
-extern uint16_t nni_proto_peer(uint16_t);
+// nni_proto_open is called by the protocol to create a socket instance
+// with its ops vector. The intent is that applications will only see
+// the single protocol-specific constructure, like nng_pair_v0_open(),
+// which should just be a thin wrapper around this.
+extern int nni_proto_open(nng_socket *, const nni_proto *);
#endif // CORE_PROTOCOL_H
diff --git a/src/core/socket.c b/src/core/socket.c
index f2dbb573..f01379a8 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -83,7 +83,7 @@ nni_sock_pipe_start(nni_pipe *pipe)
// We're closing, bail out.
return (NNG_ECLOSED);
}
- if (nni_pipe_peer(pipe) != sock->s_peer) {
+ if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) {
// Peer protocol mismatch.
return (NNG_EPROTO);
}
@@ -175,7 +175,7 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- if (nni_pipe_peer(pipe) != sock->s_peer) {
+ if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) {
nni_mtx_unlock(&sock->s_mx);
return (NNG_EPROTO);
}
@@ -305,47 +305,6 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
NNI_FREE_STRUCT(notify);
}
-static nni_msg *
-nni_sock_nullfilter(void *arg, nni_msg *mp)
-{
- NNI_ARG_UNUSED(arg);
- return (mp);
-}
-
-static int
-nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp)
-{
- NNI_ARG_UNUSED(arg);
- NNI_ARG_UNUSED(num);
- NNI_ARG_UNUSED(data);
- NNI_ARG_UNUSED(szp);
- return (NNG_ENOTSUP);
-}
-
-static int
-nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz)
-{
- NNI_ARG_UNUSED(arg);
- NNI_ARG_UNUSED(num);
- NNI_ARG_UNUSED(data);
- NNI_ARG_UNUSED(sz);
- return (NNG_ENOTSUP);
-}
-
-static void
-nni_sock_nullop(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-static int
-nni_sock_nullstartpipe(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-
- return (0);
-}
-
static void *
nni_sock_ctor(uint32_t id)
{
@@ -456,23 +415,23 @@ nni_sock_sys_fini(void)
nni_mtx_fini(&nni_sock_lk);
}
-// nn_sock_open creates the underlying socket.
int
-nni_sock_open(nni_sock **sockp, uint16_t pnum)
+nni_sock_open(nni_sock **sockp, const nni_proto *proto)
{
nni_sock * sock;
- nni_proto * proto;
int rv;
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
uint32_t sockid;
+ if (proto->proto_version != NNI_PROTOCOL_VERSION) {
+ // unsupported protocol version
+ return (NNG_ENOTSUP);
+ }
+
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((proto = nni_proto_find(pnum)) == NULL) {
- return (NNG_ENOTSUP);
- }
rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock);
if (rv != 0) {
@@ -480,40 +439,20 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
// We make a copy of the protocol operations.
- sock->s_protocol = proto->proto_self;
- sock->s_peer = proto->proto_peer;
+ sock->s_self_id = proto->proto_self;
+ sock->s_peer_id = proto->proto_peer;
sock->s_flags = proto->proto_flags;
sock->s_sock_ops = *proto->proto_sock_ops;
+ sock->s_pipe_ops = *proto->proto_pipe_ops;
sops = &sock->s_sock_ops;
- if (sops->sock_sfilter == NULL) {
- sops->sock_sfilter = nni_sock_nullfilter;
- }
- if (sops->sock_rfilter == NULL) {
- sops->sock_rfilter = nni_sock_nullfilter;
- }
- if (sops->sock_getopt == NULL) {
- sops->sock_getopt = nni_sock_nullgetopt;
- }
- if (sops->sock_setopt == NULL) {
- sops->sock_setopt = nni_sock_nullsetopt;
- }
- if (sops->sock_close == NULL) {
- sops->sock_close = nni_sock_nullop;
- }
- if (sops->sock_open == NULL) {
- sops->sock_open = nni_sock_nullop;
- }
- sock->s_pipe_ops = *proto->proto_pipe_ops;
- pops = &sock->s_pipe_ops;
- if (pops->pipe_start == NULL) {
- pops->pipe_start = nni_sock_nullstartpipe;
- }
- if (pops->pipe_stop == NULL) {
- pops->pipe_stop = nni_sock_nullop;
- }
+ NNI_ASSERT(sock->s_sock_ops.sock_open != NULL);
+ NNI_ASSERT(sock->s_sock_ops.sock_close != NULL);
+
+ NNI_ASSERT(sock->s_pipe_ops.pipe_start != NULL);
+ NNI_ASSERT(sock->s_pipe_ops.pipe_stop != NULL);
- if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
+ if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) {
nni_objhash_unref(nni_socks, sockid);
return (rv);
}
@@ -730,7 +669,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
}
besteffort = sock->s_besteffort;
- msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
+ if (sock->s_sock_ops.sock_sfilter != NULL) {
+ msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
+ }
nni_mtx_unlock(&sock->s_mx);
if (msg == NULL) {
@@ -780,9 +721,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- nni_mtx_lock(&sock->s_mx);
- msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
- nni_mtx_unlock(&sock->s_mx);
+ if (sock->s_sock_ops.sock_rfilter != NULL) {
+ nni_mtx_lock(&sock->s_mx);
+ msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
+ nni_mtx_unlock(&sock->s_mx);
+ }
if (msg != NULL) {
break;
}
@@ -797,13 +740,13 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
uint16_t
nni_sock_proto(nni_sock *sock)
{
- return (sock->s_protocol);
+ return (sock->s_self_id.p_id);
}
uint16_t
nni_sock_peer(nni_sock *sock)
{
- return (sock->s_peer);
+ return (sock->s_peer_id.p_id);
}
nni_duration
@@ -924,10 +867,13 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ if (sock->s_sock_ops.sock_setopt != NULL) {
+ rv =
+ sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
}
switch (opt) {
case NNG_OPT_LINGER:
@@ -970,11 +916,15 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ if (sock->s_sock_ops.sock_getopt != NULL) {
+ rv = sock->s_sock_ops.sock_getopt(
+ sock->s_data, opt, val, sizep);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
}
+
switch (opt) {
case NNG_OPT_LINGER:
rv = nni_getopt_duration(&sock->s_linger, val, sizep);
diff --git a/src/core/socket.h b/src/core/socket.h
index 54c1b4da..6824b04b 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -25,8 +25,9 @@ struct nni_socket {
nni_list_node s_node;
- uint16_t s_protocol;
- uint16_t s_peer;
+ nni_proto_id s_self_id;
+ nni_proto_id s_peer_id;
+
uint32_t s_flags;
nni_proto_pipe_ops s_pipe_ops;
@@ -67,7 +68,7 @@ extern void nni_sock_sys_fini(void);
extern int nni_sock_find(nni_sock **, uint32_t);
extern void nni_sock_rele(nni_sock *);
-extern int nni_sock_open(nni_sock **, uint16_t);
+extern int nni_sock_open(nni_sock **, const nni_proto *);
extern void nni_sock_close(nni_sock *);
extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index d5cc4f2b..815fb2b8 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -21,22 +21,6 @@
#include <string.h>
-int
-nng_open(nng_socket *sidp, uint16_t proto)
-{
- int rv;
- nni_sock *sock;
-
- if ((rv = nni_sock_open(&sock, proto)) != 0) {
- return (rv);
- }
- *sidp = nni_sock_id(sock);
-
- // Keep the socket "held" until it is explicitly closed.
-
- return (0);
-}
-
void
nng_fini(void)
{
diff --git a/src/nng.h b/src/nng.h
index aac7ec89..99c95b92 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -52,11 +52,6 @@ typedef struct nng_snapshot nng_snapshot;
typedef struct nng_stat nng_stat;
typedef uint32_t nng_endpoint; // XXX: REMOVE ME.
-// nng_open simply creates a socket of the given class. It returns an
-// error code on failure, or zero on success. The socket starts in cooked
-// mode.
-NNG_DECL int nng_open(nng_socket *, uint16_t proto);
-
// nng_fini is used to terminate the library, freeing certain global resources.
// Its a good idea to call this with atexit() or during application shutdown.
// For most cases, this call is optional, but failure to do so may cause
@@ -293,20 +288,58 @@ enum nng_flag_enum {
// a valid protocol numbered 0 (NNG_PROTO_NONE).
#define NNG_PROTO(major, minor) (((major) *16) + (minor))
enum nng_proto_enum {
- NNG_PROTO_NONE = NNG_PROTO(0, 0),
- NNG_PROTO_PAIR = NNG_PROTO(1, 0),
- NNG_PROTO_PUB = NNG_PROTO(2, 0),
- NNG_PROTO_SUB = NNG_PROTO(2, 1),
- NNG_PROTO_REQ = NNG_PROTO(3, 0),
- NNG_PROTO_REP = NNG_PROTO(3, 1),
- NNG_PROTO_PUSH = NNG_PROTO(5, 0),
- NNG_PROTO_PULL = NNG_PROTO(5, 1),
- NNG_PROTO_SURVEYOR = NNG_PROTO(6, 2),
- NNG_PROTO_RESPONDENT = NNG_PROTO(6, 3),
- NNG_PROTO_BUS = NNG_PROTO(7, 0),
- NNG_PROTO_STAR = NNG_PROTO(100, 0),
+ NNG_PROTO_NONE = NNG_PROTO(0, 0),
+ NNG_PROTO_PAIR_V0 = NNG_PROTO(1, 0),
+ NNG_PROTO_PUB_V0 = NNG_PROTO(2, 0),
+ NNG_PROTO_SUB_V0 = NNG_PROTO(2, 1),
+ NNG_PROTO_REQ_V0 = NNG_PROTO(3, 0),
+ NNG_PROTO_REP_V0 = NNG_PROTO(3, 1),
+ NNG_PROTO_PUSH_V0 = NNG_PROTO(5, 0),
+ NNG_PROTO_PULL_V0 = NNG_PROTO(5, 1),
+ NNG_PROTO_SURVEYOR_V0 = NNG_PROTO(6, 2),
+ NNG_PROTO_RESPONDENT_V0 = NNG_PROTO(6, 3),
+ NNG_PROTO_BUS_V0 = NNG_PROTO(7, 0),
+ NNG_PROTO_STAR_V0 = NNG_PROTO(100, 0),
+
+ // "Legacy" names. Please use explicit versioned names above.
+ NNG_PROTO_BUS = NNG_PROTO_BUS_V0,
+ NNG_PROTO_PAIR = NNG_PROTO_PAIR_V0,
+ NNG_PROTO_SUB = NNG_PROTO_SUB_V0,
+ NNG_PROTO_PUB = NNG_PROTO_PUB_V0,
+ NNG_PROTO_REQ = NNG_PROTO_REQ_V0,
+ NNG_PROTO_REP = NNG_PROTO_REP_V0,
+ NNG_PROTO_PUSH = NNG_PROTO_PUSH_V0,
+ NNG_PROTO_PULL = NNG_PROTO_PULL_V0,
+ NNG_PROTO_SURVEYOR = NNG_PROTO_SURVEYOR_V0,
+ NNG_PROTO_RESPONDENT = NNG_PROTO_RESPONDENT_V0,
};
+// Builtin protocol socket constructors.
+extern int nng_bus0_open(nng_socket *);
+extern int nng_pair0_open(nng_socket *);
+extern int nng_pub0_open(nng_socket *);
+extern int nng_sub0_open(nng_socket *);
+extern int nng_push0_open(nng_socket *);
+extern int nng_pull0_open(nng_socket *);
+extern int nng_req0_open(nng_socket *);
+extern int nng_rep0_open(nng_socket *);
+extern int nng_surveyor0_open(nng_socket *);
+extern int nng_respondent0_open(nng_socket *);
+
+// Default versions. These provide compile time defaults; note that
+// the actual protocols are baked into the binary; this should avoid
+// suprising. Choosing a new protocol should be done explicitly.
+#define nng_bus_open nng_bus0_open
+#define nng_pair_open nng_pair0_open
+#define nng_pub_open nng_pub0_open
+#define nng_sub_open nng_sub0_open
+#define nng_push_open nng_push0_open
+#define nng_pull_open nng_pull0_open
+#define nng_req_open nng_req0_open
+#define nng_rep_open nng_rep0_open
+#define nng_surveyor_open nng_surveyor0_open
+#define nng_respondent_open nng_respondent0_open
+
// Options. We encode option numbers as follows:
//
// <level> - 0: socket, 1: transport
diff --git a/src/nng_compat.c b/src/nng_compat.c
index cb8d29ab..34fc6553 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -87,17 +87,48 @@ nn_errno(void)
return (errno);
}
+static const struct {
+ uint16_t p_id;
+ int (*p_open)(nng_socket *);
+} nn_protocols[] = {
+ // clang-format off
+ { NNG_PROTO_BUS_V0, nng_bus_open },
+ { NNG_PROTO_PAIR_V0, nng_pair_open },
+ { NNG_PROTO_PUSH_V0, nng_push_open },
+ { NNG_PROTO_PULL_V0, nng_pull_open },
+ { NNG_PROTO_PUB_V0, nng_pub_open },
+ { NNG_PROTO_SUB_V0, nng_sub_open },
+ { NNG_PROTO_REQ_V0, nng_req_open },
+ { NNG_PROTO_REP_V0, nng_rep_open },
+ { NNG_PROTO_SURVEYOR_V0, nng_surveyor_open },
+ { NNG_PROTO_RESPONDENT_V0, nng_respondent_open },
+ { NNG_PROTO_NONE, NULL },
+ // clang-format on
+};
+
int
nn_socket(int domain, int protocol)
{
nng_socket sock;
int rv;
+ int i;
if ((domain != AF_SP) && (domain != AF_SP_RAW)) {
- errno = EAFNOSUPPORT;
+ nn_seterror(EAFNOSUPPORT);
return (-1);
}
- if ((rv = nng_open(&sock, protocol)) != 0) {
+
+ for (i = 0; nn_protocols[i].p_id != NNG_PROTO_NONE; i++) {
+ if (nn_protocols[i].p_id == protocol) {
+ break;
+ }
+ }
+ if (nn_protocols[i].p_open == NULL) {
+ nn_seterror(ENOTSUP);
+ return (-1);
+ }
+
+ if ((rv = nn_protocols[i].p_open(&sock)) != 0) {
nn_seterror(rv);
return (-1);
}
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 3070b90d..c5a6ce06 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -104,6 +104,14 @@ nni_bus_sock_open(void *arg)
}
static void
+nni_bus_sock_close(void *arg)
+{
+ nni_bus_sock *psock = arg;
+
+ nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+}
+
+static void
nni_bus_pipe_fini(void *arg)
{
nni_bus_pipe *ppipe = arg;
@@ -383,6 +391,7 @@ static nni_proto_sock_ops nni_bus_sock_ops = {
.sock_init = nni_bus_sock_init,
.sock_fini = nni_bus_sock_fini,
.sock_open = nni_bus_sock_open,
+ .sock_close = nni_bus_sock_close,
.sock_setopt = nni_bus_sock_setopt,
.sock_getopt = nni_bus_sock_getopt,
};
@@ -390,10 +399,16 @@ static nni_proto_sock_ops nni_bus_sock_ops = {
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
nni_proto nni_bus_proto = {
- .proto_self = NNG_PROTO_BUS,
- .proto_peer = NNG_PROTO_BUS,
- .proto_name = "bus",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_BUS_V0, "bus" },
+ .proto_peer = { NNG_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_bus_sock_ops,
.proto_pipe_ops = &nni_bus_pipe_ops,
};
+
+int
+nng_bus0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_bus_proto));
+}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 55ce5aa9..59465293 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -231,6 +231,18 @@ nni_pair_send_cb(void *arg)
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
}
+static void
+nni_pair_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+nni_pair_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
static int
nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
@@ -263,9 +275,6 @@ nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-
static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_init = nni_pair_pipe_init,
.pipe_fini = nni_pair_pipe_fini,
@@ -276,15 +285,23 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = {
static nni_proto_sock_ops nni_pair_sock_ops = {
.sock_init = nni_pair_sock_init,
.sock_fini = nni_pair_sock_fini,
+ .sock_open = nni_pair_sock_open,
+ .sock_close = nni_pair_sock_close,
.sock_setopt = nni_pair_sock_setopt,
.sock_getopt = nni_pair_sock_getopt,
};
nni_proto nni_pair_proto = {
- .proto_self = NNG_PROTO_PAIR,
- .proto_peer = NNG_PROTO_PAIR,
- .proto_name = "pair",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PAIR_V0, "pair" },
+ .proto_peer = { NNG_PROTO_PAIR_V0, "pair" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_pair_sock_ops,
.proto_pipe_ops = &nni_pair_pipe_ops,
};
+
+int
+nng_pair0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_pair_proto));
+}
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index cde79824..e3c73342 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -56,9 +56,7 @@ nni_pull_sock_fini(void *arg)
{
nni_pull_sock *pull = arg;
- if (pull != NULL) {
- NNI_FREE_STRUCT(pull);
- }
+ NNI_FREE_STRUCT(pull);
}
static int
@@ -163,6 +161,18 @@ nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
nni_msgq_aio_put(pull->urq, &pp->putq_aio);
}
+static void
+nni_pull_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+nni_pull_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
static int
nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
@@ -195,8 +205,6 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
@@ -207,15 +215,23 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = {
static nni_proto_sock_ops nni_pull_sock_ops = {
.sock_init = nni_pull_sock_init,
.sock_fini = nni_pull_sock_fini,
+ .sock_open = nni_pull_sock_open,
+ .sock_close = nni_pull_sock_close,
.sock_setopt = nni_pull_sock_setopt,
.sock_getopt = nni_pull_sock_getopt,
};
nni_proto nni_pull_proto = {
- .proto_self = NNG_PROTO_PULL,
- .proto_peer = NNG_PROTO_PUSH,
- .proto_name = "pull",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PULL_V0, "pull" },
+ .proto_peer = { NNG_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
.proto_pipe_ops = &nni_pull_pipe_ops,
.proto_sock_ops = &nni_pull_sock_ops,
};
+
+int
+nng_pull0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_pull_proto));
+}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index b7d4322c..14b3b191 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -63,9 +63,19 @@ nni_push_sock_fini(void *arg)
{
nni_push_sock *push = arg;
- if (push != NULL) {
- NNI_FREE_STRUCT(push);
- }
+ NNI_FREE_STRUCT(push);
+}
+
+static void
+nni_push_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+nni_push_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -221,8 +231,6 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_init = nni_push_pipe_init,
.pipe_fini = nni_push_pipe_fini,
@@ -233,15 +241,23 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
static nni_proto_sock_ops nni_push_sock_ops = {
.sock_init = nni_push_sock_init,
.sock_fini = nni_push_sock_fini,
+ .sock_open = nni_push_sock_open,
+ .sock_close = nni_push_sock_close,
.sock_setopt = nni_push_sock_setopt,
.sock_getopt = nni_push_sock_getopt,
};
nni_proto nni_push_proto = {
- .proto_self = NNG_PROTO_PUSH,
- .proto_peer = NNG_PROTO_PULL,
- .proto_name = "push",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PUSH_V0, "push" },
+ .proto_peer = { NNG_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
.proto_pipe_ops = &nni_push_pipe_ops,
.proto_sock_ops = &nni_push_sock_ops,
};
+
+int
+nng_push0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_push_proto));
+}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index e32f179a..161a5d79 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -98,6 +98,14 @@ nni_pub_sock_open(void *arg)
}
static void
+nni_pub_sock_close(void *arg)
+{
+ nni_pub_sock *pub = arg;
+
+ nni_aio_cancel(&pub->aio_getq, NNG_ECLOSED);
+}
+
+static void
nni_pub_pipe_fini(void *arg)
{
nni_pub_pipe *pp = arg;
@@ -319,15 +327,22 @@ nni_proto_sock_ops nni_pub_sock_ops = {
.sock_init = nni_pub_sock_init,
.sock_fini = nni_pub_sock_fini,
.sock_open = nni_pub_sock_open,
+ .sock_close = nni_pub_sock_close,
.sock_setopt = nni_pub_sock_setopt,
.sock_getopt = nni_pub_sock_getopt,
};
nni_proto nni_pub_proto = {
- .proto_self = NNG_PROTO_PUB,
- .proto_peer = NNG_PROTO_SUB,
- .proto_name = "pub",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PUB_V0, "pub" },
+ .proto_peer = { NNG_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
.proto_sock_ops = &nni_pub_sock_ops,
.proto_pipe_ops = &nni_pub_pipe_ops,
};
+
+int
+nng_pub0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_pub_proto));
+}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 03d76e2d..53f01e0f 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -79,6 +79,18 @@ nni_sub_sock_fini(void *arg)
NNI_FREE_STRUCT(sub);
}
+static void
+nni_sub_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+nni_sub_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
@@ -330,16 +342,24 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = {
static nni_proto_sock_ops nni_sub_sock_ops = {
.sock_init = nni_sub_sock_init,
.sock_fini = nni_sub_sock_fini,
+ .sock_open = nni_sub_sock_open,
+ .sock_close = nni_sub_sock_close,
.sock_setopt = nni_sub_sock_setopt,
.sock_getopt = nni_sub_sock_getopt,
.sock_rfilter = nni_sub_sock_rfilter,
};
nni_proto nni_sub_proto = {
- .proto_self = NNG_PROTO_SUB,
- .proto_peer = NNG_PROTO_PUB,
- .proto_name = "sub",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_SUB_V0, "sub" },
+ .proto_peer = { NNG_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
.proto_sock_ops = &nni_sub_sock_ops,
.proto_pipe_ops = &nni_sub_pipe_ops,
};
+
+int
+nng_sub0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_sub_proto));
+}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index c7546182..1adfd0f8 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -488,10 +488,16 @@ static nni_proto_sock_ops nni_rep_sock_ops = {
};
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_REP_V0, "rep" },
+ .proto_peer = { NNG_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_rep_sock_ops,
.proto_pipe_ops = &nni_rep_pipe_ops,
};
+
+int
+nng_rep0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_rep_proto));
+}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 7ec53c90..d0dd3887 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -112,6 +112,12 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
}
static void
+nni_req_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
@@ -626,6 +632,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
static nni_proto_sock_ops nni_req_sock_ops = {
.sock_init = nni_req_sock_init,
.sock_fini = nni_req_sock_fini,
+ .sock_open = nni_req_sock_open,
.sock_close = nni_req_sock_close,
.sock_setopt = nni_req_sock_setopt,
.sock_getopt = nni_req_sock_getopt,
@@ -634,10 +641,16 @@ static nni_proto_sock_ops nni_req_sock_ops = {
};
nni_proto nni_req_proto = {
- .proto_self = NNG_PROTO_REQ,
- .proto_peer = NNG_PROTO_REP,
- .proto_name = "req",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_REQ_V0, "req" },
+ .proto_peer = { NNG_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_req_sock_ops,
.proto_pipe_ops = &nni_req_pipe_ops,
};
+
+int
+nng_req0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_req_proto));
+}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 089e730e..4db79b86 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -504,10 +504,16 @@ static nni_proto_sock_ops nni_resp_sock_ops = {
};
nni_proto nni_respondent_proto = {
- .proto_self = NNG_PROTO_RESPONDENT,
- .proto_peer = NNG_PROTO_SURVEYOR,
- .proto_name = "respondent",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_peer = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_resp_sock_ops,
.proto_pipe_ops = &nni_resp_pipe_ops,
};
+
+int
+nng_respondent0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_respondent_proto));
+}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 633e1491..91fe4ad3 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -470,13 +470,17 @@ static nni_proto_sock_ops nni_surv_sock_ops = {
.sock_sfilter = nni_surv_sock_sfilter,
};
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
nni_proto nni_surveyor_proto = {
- .proto_self = NNG_PROTO_SURVEYOR,
- .proto_peer = NNG_PROTO_RESPONDENT,
- .proto_name = "surveyor",
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
+ .proto_peer = { NNG_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_surv_sock_ops,
.proto_pipe_ops = &nni_surv_pipe_ops,
};
+
+int
+nng_surveyor0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &nni_surveyor_proto));
+}
diff --git a/tests/bus.c b/tests/bus.c
index 78740a37..e96a0a53 100644
--- a/tests/bus.c
+++ b/tests/bus.c
@@ -8,14 +8,15 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
Main({
const char *addr = "inproc://test";
@@ -27,54 +28,57 @@ Main({
Convey("We can create a BUS socket", {
nng_socket bus;
- So(nng_open(&bus, NNG_PROTO_BUS) == 0);
+ So(nng_bus_open(&bus) == 0);
- Reset({
- nng_close(bus);
- })
+ Reset({ nng_close(bus); });
Convey("Protocols match", {
So(nng_protocol(bus) == NNG_PROTO_BUS);
So(nng_peer(bus) == NNG_PROTO_BUS);
- })
- })
+ });
+ });
Convey("We can create a linked BUS topology", {
nng_socket bus1;
nng_socket bus2;
nng_socket bus3;
- uint64_t rtimeo;
+ uint64_t rtimeo;
+
+ So(nng_bus_open(&bus1) == 0);
+ So(nng_bus_open(&bus2) == 0);
+ So(nng_bus_open(&bus3) == 0);
- So(nng_open(&bus1, NNG_PROTO_BUS) == 0);
- So(nng_open(&bus2, NNG_PROTO_BUS) == 0);
- So(nng_open(&bus3, NNG_PROTO_BUS) == 0);
-
Reset({
nng_close(bus1);
nng_close(bus2);
nng_close(bus3);
- })
+ });
So(nng_listen(bus1, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(bus2, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(bus3, addr, NULL, NNG_FLAG_SYNCH) == 0);
rtimeo = 50000;
- So(nng_setopt(bus1, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(bus1, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
rtimeo = 50000;
- So(nng_setopt(bus2, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(bus2, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
rtimeo = 50000;
- So(nng_setopt(bus3, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(bus3, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
Convey("Messages delivered", {
nng_msg *msg;
-
// This is just a poor man's sleep.
- So(nng_recvmsg(bus1, &msg, 0) == NNG_ETIMEDOUT);
- So(nng_recvmsg(bus2, &msg, 0) == NNG_ETIMEDOUT);
- So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT);
-
+ So(nng_recvmsg(bus1, &msg, 0) ==
+ NNG_ETIMEDOUT);
+ So(nng_recvmsg(bus2, &msg, 0) ==
+ NNG_ETIMEDOUT);
+ So(nng_recvmsg(bus3, &msg, 0) ==
+ NNG_ETIMEDOUT);
+
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "99bits");
So(nng_sendmsg(bus2, msg, 0) == 0);
@@ -82,7 +86,8 @@ Main({
So(nng_recvmsg(bus1, &msg, 0) == 0);
CHECKSTR(msg, "99bits");
nng_msg_free(msg);
- So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT);
+ So(nng_recvmsg(bus3, &msg, 0) ==
+ NNG_ETIMEDOUT);
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "onthe");
@@ -95,7 +100,7 @@ Main({
So(nng_recvmsg(bus3, &msg, 0) == 0);
CHECKSTR(msg, "onthe");
nng_msg_free(msg);
- })
- })
- })
+ });
+ });
+ });
})
diff --git a/tests/event.c b/tests/event.c
index 4c7014e9..b2d15780 100644
--- a/tests/event.c
+++ b/tests/event.c
@@ -9,21 +9,22 @@
#include "convey.h"
#include "nng.h"
-#include <string.h>
#include <assert.h>
+#include <string.h>
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
struct evcnt {
nng_socket sock;
- int readable;
- int writeable;
- int pipeadd;
- int piperem;
- int epadd;
- int eprem;
- int err;
+ int readable;
+ int writeable;
+ int pipeadd;
+ int piperem;
+ int epadd;
+ int eprem;
+ int err;
};
void
@@ -68,25 +69,25 @@ Main({
Test("Event Handling", {
Convey("Given a connected pair of pair sockets", {
- nng_socket sock1;
- nng_socket sock2;
+ nng_socket sock1;
+ nng_socket sock2;
struct evcnt evcnt1;
struct evcnt evcnt2;
- nng_notify *notify1;
- nng_notify *notify2;
+ nng_notify * notify1;
+ nng_notify * notify2;
- So(nng_open(&sock1, NNG_PROTO_PAIR) == 0);
- So(nng_open(&sock2, NNG_PROTO_PAIR) == 0);
+ So(nng_pair0_open(&sock1) == 0);
+ So(nng_pair0_open(&sock2) == 0);
- memset(&evcnt1, 0, sizeof (evcnt1));
- memset(&evcnt2, 0, sizeof (evcnt2));
+ memset(&evcnt1, 0, sizeof(evcnt1));
+ memset(&evcnt2, 0, sizeof(evcnt2));
evcnt1.sock = sock1;
evcnt2.sock = sock2;
Reset({
nng_close(sock1);
nng_close(sock2);
- })
+ });
So(nng_listen(sock1, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(sock2, addr, NULL, NNG_FLAG_SYNCH) == 0);
@@ -95,8 +96,12 @@ Main({
nng_usleep(100000);
Convey("We can register callbacks", {
- So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SND, bump, &evcnt1)) != NULL);
- So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RCV, bump, &evcnt2)) != NULL);
+ So((notify1 = nng_setnotify(sock1,
+ NNG_EV_CAN_SND, bump, &evcnt1)) !=
+ NULL);
+ So((notify2 = nng_setnotify(sock2,
+ NNG_EV_CAN_RCV, bump, &evcnt2)) !=
+ NULL);
Convey("They are called", {
nng_msg *msg;
@@ -112,27 +117,28 @@ Main({
// this. Probably the msgq needs to
// toggle on reads.
- //nng_usleep(20000);
+ // nng_usleep(20000);
- //So(nng_recvmsg(sock2, &msg, 0) == 0);
+ // So(nng_recvmsg(sock2, &msg, 0) ==
+ // 0);
- //CHECKSTR(msg, "abc");
- //nng_msg_free(msg);
+ // CHECKSTR(msg, "abc");
+ // nng_msg_free(msg);
// The notify runs async...
nng_usleep(100000);
So(evcnt1.writeable == 1);
So(evcnt2.readable == 1);
- })
+ });
Convey("We can unregister them", {
nng_unsetnotify(sock1, notify1);
So(1);
nng_unsetnotify(sock2, notify2);
So(1);
- })
- })
- })
- })
+ });
+ });
+ });
+ });
})
diff --git a/tests/pipeline.c b/tests/pipeline.c
index aa01c2d1..510b7f77 100644
--- a/tests/pipeline.c
+++ b/tests/pipeline.c
@@ -8,13 +8,14 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
Main({
const char *addr = "inproc://test";
@@ -25,58 +26,54 @@ Main({
Convey("We can create a PUSH socket", {
nng_socket push;
- So(nng_open(&push, NNG_PROTO_PUSH) == 0);
+ So(nng_push_open(&push) == 0);
- Reset({
- nng_close(push);
- })
+ Reset({ nng_close(push); });
Convey("Protocols match", {
So(nng_protocol(push) == NNG_PROTO_PUSH);
So(nng_peer(push) == NNG_PROTO_PULL);
- })
+ });
Convey("Recv fails", {
nng_msg *msg;
So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP);
- })
- })
+ });
+ });
Convey("We can create a PULL socket", {
nng_socket pull;
- So(nng_open(&pull, NNG_PROTO_PULL) == 0);
+ So(nng_pull_open(&pull) == 0);
- Reset({
- nng_close(pull);
- })
+ Reset({ nng_close(pull); });
Convey("Protocols match", {
So(nng_protocol(pull) == NNG_PROTO_PULL);
So(nng_peer(pull) == NNG_PROTO_PUSH);
- })
+ });
Convey("Send fails", {
nng_msg *msg;
So(nng_msg_alloc(&msg, 0) == 0);
So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP);
nng_msg_free(msg);
- })
- })
+ });
+ });
Convey("We can create a linked PUSH/PULL pair", {
nng_socket push;
nng_socket pull;
nng_socket what;
- So(nng_open(&push, NNG_PROTO_PUSH) == 0);
- So(nng_open(&pull, NNG_PROTO_PULL) == 0);
- So(nng_open(&what, NNG_PROTO_PUSH) == 0);
+ So(nng_push_open(&push) == 0);
+ So(nng_pull_open(&pull) == 0);
+ So(nng_push_open(&what) == 0);
Reset({
nng_close(push);
nng_close(pull);
nng_close(what);
- })
+ });
// Its important to avoid a startup race that the
// sender be the dialer. Otherwise you need a delay
@@ -97,30 +94,30 @@ Main({
So(msg != NULL);
CHECKSTR(msg, "hello");
nng_msg_free(msg);
- })
- })
+ });
+ });
Convey("Load balancing", {
- nng_msg *abc;
- nng_msg *def;
- uint64_t usecs;
- int len;
+ nng_msg * abc;
+ nng_msg * def;
+ uint64_t usecs;
+ int len;
nng_socket push;
nng_socket pull1;
nng_socket pull2;
nng_socket pull3;
- So(nng_open(&push, NNG_PROTO_PUSH) == 0);
- So(nng_open(&pull1, NNG_PROTO_PULL) == 0);
- So(nng_open(&pull2, NNG_PROTO_PULL) == 0);
- So(nng_open(&pull3, NNG_PROTO_PULL) == 0);
+ So(nng_push_open(&push) == 0);
+ So(nng_pull_open(&pull1) == 0);
+ So(nng_pull_open(&pull2) == 0);
+ So(nng_pull_open(&pull3) == 0);
Reset({
nng_close(push);
nng_close(pull1);
nng_close(pull2);
nng_close(pull3);
- })
+ });
// We need to increase the buffer from zero, because
// there is no guarantee that the various listeners
@@ -129,14 +126,22 @@ Main({
// ensures that we can write to each stream, even if
// the listeners are not running yet.
len = 4;
- So(nng_setopt(push, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(push, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull1, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull1, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull2, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull2, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull3, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(pull3, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
+ So(nng_setopt(
+ push, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ push, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull1, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull1, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull2, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull2, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull3, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0);
+ So(nng_setopt(
+ pull3, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
So(nng_msg_alloc(&abc, 0) == 0);
APPENDSTR(abc, "abc");
@@ -144,9 +149,12 @@ Main({
APPENDSTR(def, "def");
usecs = 100000;
- So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0);
- So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0);
- So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs, sizeof (usecs)) == 0);
+ So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs,
+ sizeof(usecs)) == 0);
+ So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs,
+ sizeof(usecs)) == 0);
+ So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs,
+ sizeof(usecs)) == 0);
So(nng_listen(push, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(pull1, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(pull2, addr, NULL, NNG_FLAG_SYNCH) == 0);
@@ -175,8 +183,8 @@ Main({
So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT);
So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT);
- })
- })
+ });
+ });
nni_fini();
})
diff --git a/tests/pollfd.c b/tests/pollfd.c
index f6b9cf30..e2cd2fb6 100644
--- a/tests/pollfd.c
+++ b/tests/pollfd.c
@@ -11,9 +11,9 @@
#include "nng.h"
#ifndef _WIN32
-#include <unistd.h>
#include <poll.h>
-#define INVALID_SOCKET -1
+#include <unistd.h>
+#define INVALID_SOCKET -1
#else
#define poll WSAPoll
@@ -22,94 +22,92 @@
#endif
#include <windows.h>
-#include <winsock2.h>
+
#include <mswsock.h>
+
+#include <winsock2.h>
#include <ws2tcpip.h>
#endif
-// Inproc tests.
-
-TestMain("Poll FDs", {
-
- Convey("Given a connected pair of sockets", {
- nng_socket s1;
- nng_socket s2;
-
- So(nng_open(&s1, NNG_PROTO_PAIR) == 0);
- So(nng_open(&s2, NNG_PROTO_PAIR) == 0);
- Reset({
- nng_close(s1);
- nng_close(s2);
- })
- So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0);
- nng_usleep(50000);
-
- So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0);
- nng_usleep(50000);
-
- Convey("We can get a recv FD", {
- int fd;
- size_t sz;
-
- sz = sizeof (fd);
- So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0);
- So(fd != INVALID_SOCKET);
-
- Convey("And they start non pollable", {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- So(poll(&pfd, 1, 0) == 0);
- So(pfd.revents == 0);
- })
-
- Convey("But if we write they are pollable", {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- So(nng_send(s2, "kick", 5, 0) == 0);
- So(poll(&pfd, 1, 1000) == 1);
- So((pfd.revents & POLLIN) != 0);
- })
- })
-
- Convey("We can get a send FD", {
- int fd;
- size_t sz;
-
- sz = sizeof (fd);
- So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0);
- So(fd != INVALID_SOCKET);
- So(nng_send(s1, "oops", 4, 0) == 0);
- })
-
- Convey("We cannot get a send FD for PULL", {
- nng_socket s3;
- int fd;
- size_t sz;
- So(nng_open(&s3, NNG_PROTO_PULL) == 0);
- Reset({
- nng_close(s3);
- })
- sz = sizeof (fd);
- So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) == NNG_ENOTSUP);
- })
-
- Convey("We cannot get a recv FD for PUSH", {
- nng_socket s3;
- int fd;
- size_t sz;
- So(nng_open(&s3, NNG_PROTO_PUSH) == 0);
- Reset({
- nng_close(s3);
- })
- sz = sizeof (fd);
- So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) == NNG_ENOTSUP);
- })
- })
-})
+TestMain("Poll FDs",
+ {
+
+ Convey("Given a connected pair of sockets", {
+ nng_socket s1;
+ nng_socket s2;
+
+ So(nng_pair_open(&s1) == 0);
+ So(nng_pair_open(&s2) == 0);
+ Reset({
+ nng_close(s1);
+ nng_close(s2);
+ });
+ So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0);
+ nng_usleep(50000);
+
+ So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0);
+ nng_usleep(50000);
+
+ Convey("We can get a recv FD", {
+ int fd;
+ size_t sz;
+
+ sz = sizeof(fd);
+ So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0);
+ So(fd != INVALID_SOCKET);
+
+ Convey("And they start non pollable", {
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ So(poll(&pfd, 1, 0) == 0);
+ So(pfd.revents == 0);
+ });
+
+ Convey("But if we write they are pollable", {
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ So(nng_send(s2, "kick", 5, 0) == 0);
+ So(poll(&pfd, 1, 1000) == 1);
+ So((pfd.revents & POLLIN) != 0);
+ });
+ });
+
+ Convey("We can get a send FD", {
+ int fd;
+ size_t sz;
+
+ sz = sizeof(fd);
+ So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0);
+ So(fd != INVALID_SOCKET);
+ So(nng_send(s1, "oops", 4, 0) == 0);
+ });
+
+ Convey("We cannot get a send FD for PULL", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_pull_open(&s3) == 0);
+ Reset({ nng_close(s3); });
+ sz = sizeof(fd);
+ So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("We cannot get a recv FD for PUSH", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_push_open(&s3) == 0);
+ Reset({ nng_close(s3); });
+ sz = sizeof(fd);
+ So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) ==
+ NNG_ENOTSUP);
+ });
+ }) })
diff --git a/tests/pubsub.c b/tests/pubsub.c
index 94558f39..19d85848 100644
--- a/tests/pubsub.c
+++ b/tests/pubsub.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -8,14 +9,15 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
Main({
const char *addr = "inproc://test";
@@ -25,90 +27,95 @@ Main({
Convey("We can create a PUB socket", {
nng_socket pub;
- So(nng_open(&pub, NNG_PROTO_PUB) == 0);
+ So(nng_pub_open(&pub) == 0);
- Reset({
- nng_close(pub);
- })
+ Reset({ nng_close(pub); });
Convey("Protocols match", {
So(nng_protocol(pub) == NNG_PROTO_PUB);
So(nng_peer(pub) == NNG_PROTO_SUB);
- })
+ });
Convey("Recv fails", {
nng_msg *msg;
So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP);
- })
- })
+ });
+ });
Convey("We can create a SUB socket", {
nng_socket sub;
- So(nng_open(&sub, NNG_PROTO_SUB) == 0);
+ So(nng_sub_open(&sub) == 0);
- Reset({
- nng_close(sub);
- })
+ Reset({ nng_close(sub); });
Convey("Protocols match", {
So(nng_protocol(sub) == NNG_PROTO_SUB);
So(nng_peer(sub) == NNG_PROTO_PUB);
- })
+ });
Convey("Send fails", {
nng_msg *msg;
So(nng_msg_alloc(&msg, 0) == 0);
So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP);
nng_msg_free(msg);
- })
- })
+ });
+ });
Convey("We can create a linked PUB/SUB pair", {
nng_socket pub;
nng_socket sub;
- So(nng_open(&pub, NNG_PROTO_PUB) == 0);
+ So(nng_pub_open(&pub) == 0);
- So(nng_open(&sub, NNG_PROTO_SUB) == 0);
+ So(nng_sub_open(&sub) == 0);
Reset({
nng_close(pub);
nng_close(sub);
- })
-
- // Most consumers will usually have the pub listen,
- // and the sub dial. However, this creates a problem
- // for our tests, since we can wind up trying to push
- // data before the pipe is fully registered (the
- // accept runs in an asynch thread.) Doing the reverse
- // here ensures that we won't lose data.
+ });
+
+ // Most consumers will usually have the pub
+ // listen, and the sub dial. However, this
+ // creates a problem for our tests, since we
+ // can wind up trying to push data before the
+ // pipe is fully registered (the accept runs in
+ // an asynch thread.) Doing the reverse here
+ // ensures that we won't lose data.
So(nng_listen(sub, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(pub, addr, NULL, NNG_FLAG_SYNCH) == 0);
Convey("Sub can subscribe", {
- So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "ABC", 3) == 0);
- So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "", 0) == 0);
+ So(nng_setopt(
+ sub, NNG_OPT_SUBSCRIBE, "ABC", 3) == 0);
+ So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "", 0) ==
+ 0);
Convey("Unsubscribe works", {
- So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "ABC", 3) == 0);
- So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "", 0) == 0);
-
- So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "", 0) == NNG_ENOENT);
- So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "HELLO", 0) == NNG_ENOENT);
- })
- })
+ So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE,
+ "ABC", 3) == 0);
+ So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE,
+ "", 0) == 0);
+
+ So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE,
+ "", 0) == NNG_ENOENT);
+ So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE,
+ "HELLO", 0) == NNG_ENOENT);
+ });
+ });
Convey("Pub cannot subscribe", {
- So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == NNG_ENOTSUP);
- })
+ So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) ==
+ NNG_ENOTSUP);
+ });
Convey("Subs can receive from pubs", {
nng_msg *msg;
uint64_t rtimeo;
-
- So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", strlen("/some/")) == 0);
+ So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/",
+ strlen("/some/")) == 0);
rtimeo = 50000; // 50ms
- So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "/some/like/it/hot");
@@ -132,28 +139,31 @@ Main({
So(nng_recvmsg(sub, &msg, 0) == 0);
CHECKSTR(msg, "/some/day/some/how");
nng_msg_free(msg);
- })
+ });
Convey("Subs without subsciptions don't receive", {
uint64_t rtimeo = 50000; // 50ms
nng_msg *msg;
- So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "/some/don't/like/it");
So(nng_sendmsg(pub, msg, 0) == 0);
So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT);
- })
+ });
Convey("Subs in raw receive", {
uint64_t rtimeo = 50000; // 500ms
- int raw = 1;
+ int raw = 1;
nng_msg *msg;
- So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
- So(nng_setopt(sub, NNG_OPT_RAW, &raw, sizeof (raw)) == 0);
+ So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo,
+ sizeof(rtimeo)) == 0);
+ So(nng_setopt(sub, NNG_OPT_RAW, &raw,
+ sizeof(raw)) == 0);
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "/some/like/it/raw");
@@ -161,10 +171,9 @@ Main({
So(nng_recvmsg(sub, &msg, 0) == 0);
CHECKSTR(msg, "/some/like/it/raw");
nng_msg_free(msg);
- })
-
- })
- })
+ });
+ });
+ });
nni_fini();
})
diff --git a/tests/reqrep.c b/tests/reqrep.c
index d4040bdb..cadd0a5f 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -8,13 +8,13 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
Main({
- int rv;
+ int rv;
const char *addr = "inproc://test";
nni_init();
@@ -22,36 +22,32 @@ Main({
Convey("We can create a REQ socket", {
nng_socket req;
- So(nng_open(&req, NNG_PROTO_REQ) == 0);
+ So(nng_req_open(&req) == 0);
- Reset({
- nng_close(req);
- })
+ Reset({ nng_close(req); });
Convey("Protocols match", {
So(nng_protocol(req) == NNG_PROTO_REQ);
So(nng_peer(req) == NNG_PROTO_REP);
- })
+ });
Convey("Recv with no send fails", {
nng_msg *msg;
rv = nng_recvmsg(req, &msg, 0);
So(rv == NNG_ESTATE);
- })
- })
+ });
+ });
Convey("We can create a REP socket", {
nng_socket rep;
- So(nng_open(&rep, NNG_PROTO_REP) == 0);
+ So(nng_rep_open(&rep) == 0);
- Reset({
- nng_close(rep);
- })
+ Reset({ nng_close(rep); });
Convey("Protocols match", {
So(nng_protocol(rep) == NNG_PROTO_REP);
So(nng_peer(rep) == NNG_PROTO_REQ);
- })
+ });
Convey("Send with no recv fails", {
nng_msg *msg;
@@ -60,21 +56,21 @@ Main({
rv = nng_sendmsg(rep, msg, 0);
So(rv == NNG_ESTATE);
nng_msg_free(msg);
- })
- })
+ });
+ });
Convey("We can create a linked REQ/REP pair", {
nng_socket req;
nng_socket rep;
- So(nng_open(&rep, NNG_PROTO_REP) == 0);
+ So(nng_rep_open(&rep) == 0);
- So(nng_open(&req, NNG_PROTO_REQ) == 0);
+ So(nng_req_open(&req) == 0);
Reset({
nng_close(rep);
nng_close(req);
- })
+ });
So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(req, addr, NULL, NNG_FLAG_SYNCH) == 0);
@@ -102,31 +98,33 @@ Main({
So(nng_msg_len(ping) == 5);
So(memcmp(nng_msg_body(ping), "pong", 5) == 0);
nng_msg_free(ping);
- })
- })
+ });
+ });
Convey("Request cancellation works", {
nng_msg *abc;
nng_msg *def;
nng_msg *cmd;
- uint64_t retry = 100000; // 100 ms
- size_t len;
+ uint64_t retry = 100000; // 100 ms
+ size_t len;
nng_socket req;
nng_socket rep;
- So(nng_open(&rep, NNG_PROTO_REP) == 0);
+ So(nng_rep_open(&rep) == 0);
- So(nng_open(&req, NNG_PROTO_REQ) == 0);
+ So(nng_req_open(&req) == 0);
Reset({
nng_close(rep);
nng_close(req);
- })
+ });
- So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0);
+ So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry,
+ sizeof(retry)) == 0);
len = 16;
- So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
+ So(nng_setopt(
+ req, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0);
So(nng_msg_alloc(&abc, 0) == 0);
So(nng_msg_append(abc, "abc", 4) == 0);
@@ -149,8 +147,8 @@ Main({
So(nng_msg_len(cmd) == 4);
So(memcmp(nng_msg_body(cmd), "def", 4) == 0);
nng_msg_free(cmd);
- })
- })
+ });
+ });
nni_fini();
})
diff --git a/tests/scalability.c b/tests/scalability.c
index 1e0ea2a9..e9422f8e 100644
--- a/tests/scalability.c
+++ b/tests/scalability.c
@@ -43,7 +43,7 @@ openclients(nng_socket *clients, int num)
int i;
uint64_t t;
for (i = 0; i < num; i++) {
- if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) {
+ if ((rv = nng_req_open(&clients[i])) != 0) {
printf("open #%d: %s\n", i, nng_strerror(rv));
return (rv);
}
@@ -120,7 +120,7 @@ Main({
clients = calloc(nclients, sizeof(nng_socket));
results = calloc(nclients, sizeof(int));
- if ((nng_open(&rep, NNG_PROTO_REP) != 0) ||
+ if ((nng_rep_open(&rep) != 0) ||
(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof(depth)) != 0) ||
(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof(depth)) != 0) ||
(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) != 0) ||
diff --git a/tests/sock.c b/tests/sock.c
index 6c6d5b06..27599e47 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -8,175 +8,185 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
Main({
Test("Socket Operations", {
- Convey("We are able to open a PAIR socket", {
- int rv;
- nng_socket sock;
-
- So(nng_open(&sock, NNG_PROTO_PAIR) == 0);
-
- Reset({
- nng_close(sock);
- })
-
- Convey("And we can shut it down", {
- rv = nng_shutdown(sock);
- So(rv == 0);
- rv = nng_shutdown(sock);
- So(rv == NNG_ECLOSED);
- })
-
- Convey("It's type is still proto", {
- So(nng_protocol(sock) == NNG_PROTO_PAIR);
- })
-
- Convey("Recv with no pipes times out correctly", {
- nng_msg *msg = NULL;
- int64_t when = 100000;
- uint64_t now;
-
- now = nni_clock();
-
- rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &when,
- sizeof (when));
- So(rv == 0);
- rv = nng_recvmsg(sock, &msg, 0);
- So(rv == NNG_ETIMEDOUT);
- So(msg == NULL);
- So(nni_clock() >= (now + when));
- So(nni_clock() < (now + (when * 2)));
- })
-
- Convey("Recv nonblock with no pipes gives EAGAIN", {
- nng_msg *msg = NULL;
- rv = nng_recvmsg(sock, &msg, NNG_FLAG_NONBLOCK);
- So(rv == NNG_EAGAIN);
- So(msg == NULL);
- })
-
- Convey("Send with no pipes times out correctly", {
- nng_msg *msg = NULL;
- int64_t when = 100000;
- uint64_t now;
-
- // We cheat to get access to the core's clock.
- So(nng_msg_alloc(&msg, 0) == 0);
- So(msg != NULL);
- now = nni_clock();
-
- rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when,
- sizeof (when));
- So(rv == 0);
- rv = nng_sendmsg(sock, msg, 0);
- So(rv == NNG_ETIMEDOUT);
- So(nni_clock() >= (now + when));
- So(nni_clock() < (now + (when * 2)));
- nng_msg_free(msg);
- })
-
- Convey("We can set and get options", {
- int64_t when = 1234;
- int64_t check = 0;
- size_t sz;
- rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when,
- sizeof (when));
- So(rv == 0);
- sz = sizeof (check);
- Convey("Short size is not copied", {
- sz = 0;
- rv = nng_getopt(sock, NNG_OPT_SNDTIMEO,
- &check, &sz);
+ Convey("We are able to open a PAIR socket", {
+ int rv;
+ nng_socket sock;
+
+ So(nng_pair_open(&sock) == 0);
+
+ Reset({ nng_close(sock); });
+
+ Convey("And we can shut it down", {
+ rv = nng_shutdown(sock);
So(rv == 0);
- So(sz == sizeof (check));
- So(check == 0);
- })
- Convey("Correct size is copied", {
- sz = sizeof (check);
- rv = nng_getopt(sock, NNG_OPT_SNDTIMEO, &check,
- &sz);
+ rv = nng_shutdown(sock);
+ So(rv == NNG_ECLOSED);
+ });
+
+ Convey("It's type is still proto",
+ { So(nng_protocol(sock) == NNG_PROTO_PAIR); });
+
+ Convey("Recv with no pipes times out correctly", {
+ nng_msg *msg = NULL;
+ int64_t when = 100000;
+ uint64_t now;
+
+ now = nni_clock();
+
+ rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &when,
+ sizeof(when));
So(rv == 0);
- So(sz == sizeof (check));
- So(check == 1234);
- })
- })
-
- Convey("Bogus URLs not supported", {
- Convey("Dialing fails properly", {
- rv = nng_dial(sock, "bogus://somewhere", NULL, 0);
- So(rv == NNG_ENOTSUP);
- })
- Convey("Listening fails properly", {
- rv = nng_listen(sock, "bogus://elsewhere", NULL, 0);
- So(rv == NNG_ENOTSUP);
- })
- })
-
- Convey("Dialing synch can get refused", {
- rv = nng_dial(sock, "inproc://notthere", NULL, NNG_FLAG_SYNCH);
- So(rv == NNG_ECONNREFUSED);
- })
-
- Convey("Listening works", {
- rv = nng_listen(sock, "inproc://here", NULL, NNG_FLAG_SYNCH);
- So(rv == 0);
-
- Convey("Second listen fails ADDRINUSE", {
- rv = nng_listen(sock, "inproc://here", NULL, NNG_FLAG_SYNCH);
- So(rv == NNG_EADDRINUSE);
- })
-
- Convey("We can connect to it", {
- nng_socket sock2;
- So(nng_open(&sock2, NNG_PROTO_PAIR) == 0);
- Reset({
- nng_close(sock2);
+ rv = nng_recvmsg(sock, &msg, 0);
+ So(rv == NNG_ETIMEDOUT);
+ So(msg == NULL);
+ So(nni_clock() >= (now + when));
+ So(nni_clock() < (now + (when * 2)));
+ });
+
+ Convey("Recv nonblock with no pipes gives EAGAIN", {
+ nng_msg *msg = NULL;
+ rv =
+ nng_recvmsg(sock, &msg, NNG_FLAG_NONBLOCK);
+ So(rv == NNG_EAGAIN);
+ So(msg == NULL);
+ });
+
+ Convey("Send with no pipes times out correctly", {
+ nng_msg *msg = NULL;
+ int64_t when = 100000;
+ uint64_t now;
+
+ // We cheat to get access to the core's clock.
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(msg != NULL);
+ now = nni_clock();
+
+ rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when,
+ sizeof(when));
+ So(rv == 0);
+ rv = nng_sendmsg(sock, msg, 0);
+ So(rv == NNG_ETIMEDOUT);
+ So(nni_clock() >= (now + when));
+ So(nni_clock() < (now + (when * 2)));
+ nng_msg_free(msg);
+ });
+
+ Convey("We can set and get options", {
+ int64_t when = 1234;
+ int64_t check = 0;
+ size_t sz;
+ rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &when,
+ sizeof(when));
+ So(rv == 0);
+ sz = sizeof(check);
+ Convey("Short size is not copied", {
+ sz = 0;
+ rv = nng_getopt(sock, NNG_OPT_SNDTIMEO,
+ &check, &sz);
+ So(rv == 0);
+ So(sz == sizeof(check));
+ So(check == 0);
+ }) Convey("Correct size is copied", {
+ sz = sizeof(check);
+ rv = nng_getopt(sock, NNG_OPT_SNDTIMEO,
+ &check, &sz);
+ So(rv == 0);
+ So(sz == sizeof(check));
+ So(check == 1234);
})
- rv = nng_dial(sock2, "inproc://here", NULL, NNG_FLAG_SYNCH);
+ });
+
+ Convey("Bogus URLs not supported", {
+ Convey("Dialing fails properly", {
+ rv = nng_dial(sock,
+ "bogus://somewhere", NULL, 0);
+ So(rv == NNG_ENOTSUP);
+ });
+ Convey("Listening fails properly", {
+ rv = nng_listen(sock,
+ "bogus://elsewhere", NULL, 0);
+ So(rv == NNG_ENOTSUP);
+ });
+ });
+
+ Convey("Dialing synch can get refused", {
+ rv = nng_dial(sock, "inproc://notthere", NULL,
+ NNG_FLAG_SYNCH);
+ So(rv == NNG_ECONNREFUSED);
+ });
+
+ Convey("Listening works", {
+ rv = nng_listen(sock, "inproc://here", NULL,
+ NNG_FLAG_SYNCH);
So(rv == 0);
- nng_close(sock2);
- })
- })
-
- Convey("We can send and receive messages", {
- nng_socket sock2;
- int len = 1;
- size_t sz;
- uint64_t second = 3000000;
- char *buf;
-
- So(nng_open(&sock2, NNG_PROTO_PAIR) == 0);
- Reset({
- nng_close(sock2);
- })
-
- So(nng_setopt(sock, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(sock, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
-
- So(nng_setopt(sock2, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0);
- So(nng_setopt(sock2, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
-
- So(nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0);
- So(nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0);
- So(nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0);
- So(nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0);
-
- So(nng_listen(sock, "inproc://test1", NULL, NNG_FLAG_SYNCH) == 0);
- So(nng_dial(sock2, "inproc://test1", NULL, NNG_FLAG_SYNCH) == 0);
-
- So(nng_send(sock, "abc", 4, 0) == 0);
- So(nng_recv(sock2 , &buf, &sz, NNG_FLAG_ALLOC) == 0);
- So(buf != NULL);
- So(sz == 4);
- So(memcmp(buf, "abc", 4) == 0);
- nng_free(buf, sz);
- })
- })
- })
+
+ Convey("Second listen fails ADDRINUSE", {
+ rv = nng_listen(sock, "inproc://here",
+ NULL, NNG_FLAG_SYNCH);
+ So(rv == NNG_EADDRINUSE);
+ });
+
+ Convey("We can connect to it", {
+ nng_socket sock2;
+ So(nng_pair_open(&sock2) == 0);
+ Reset({ nng_close(sock2); });
+ rv = nng_dial(sock2, "inproc://here",
+ NULL, NNG_FLAG_SYNCH);
+ So(rv == 0);
+ nng_close(sock2);
+ });
+ });
+
+ Convey("We can send and receive messages", {
+ nng_socket sock2;
+ int len = 1;
+ size_t sz;
+ uint64_t second = 3000000;
+ char * buf;
+
+ So(nng_pair_open(&sock2) == 0);
+ Reset({ nng_close(sock2); });
+
+ So(nng_setopt(sock, NNG_OPT_RCVBUF, &len,
+ sizeof(len)) == 0);
+ So(nng_setopt(sock, NNG_OPT_SNDBUF, &len,
+ sizeof(len)) == 0);
+
+ So(nng_setopt(sock2, NNG_OPT_RCVBUF, &len,
+ sizeof(len)) == 0);
+ So(nng_setopt(sock2, NNG_OPT_SNDBUF, &len,
+ sizeof(len)) == 0);
+
+ So(nng_setopt(sock, NNG_OPT_SNDTIMEO, &second,
+ sizeof(second)) == 0);
+ So(nng_setopt(sock, NNG_OPT_RCVTIMEO, &second,
+ sizeof(second)) == 0);
+ So(nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second,
+ sizeof(second)) == 0);
+ So(nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second,
+ sizeof(second)) == 0);
+
+ So(nng_listen(sock, "inproc://test1", NULL,
+ NNG_FLAG_SYNCH) == 0);
+ So(nng_dial(sock2, "inproc://test1", NULL,
+ NNG_FLAG_SYNCH) == 0);
+
+ So(nng_send(sock, "abc", 4, 0) == 0);
+ So(nng_recv(
+ sock2, &buf, &sz, NNG_FLAG_ALLOC) == 0);
+ So(buf != NULL);
+ So(sz == 4);
+ So(memcmp(buf, "abc", 4) == 0);
+ nng_free(buf, sz);
+ });
+ });
+ });
})
diff --git a/tests/survey.c b/tests/survey.c
index a7dd3cca..6f85850f 100644
--- a/tests/survey.c
+++ b/tests/survey.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -8,14 +9,15 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
Main({
const char *addr = "inproc://test";
@@ -23,113 +25,129 @@ Main({
nni_init();
Test("SURVEY pattern", {
- Convey("We can create a SURVEYOR socket", {
- nng_socket surv;
-
- So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0);
-
- Reset({
- nng_close(surv);
- })
-
- Convey("Protocols match", {
- So(nng_protocol(surv) == NNG_PROTO_SURVEYOR);
- So(nng_peer(surv) == NNG_PROTO_RESPONDENT);
- })
-
- Convey("Recv with no survey fails", {
- nng_msg *msg;
- So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE);
- })
-
- Convey("Survey without responder times out", {
- uint64_t expire = 50000;
- nng_msg *msg;
-
- So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0);
- So(nng_msg_alloc(&msg, 0) == 0);
- So(nng_sendmsg(surv, msg, 0) == 0);
- So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
- })
- })
-
- Convey("We can create a RESPONDENT socket", {
- nng_socket resp;
- So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0);
-
- Reset({
- nng_close(resp);
- })
-
- Convey("Protocols match", {
- So(nng_protocol(resp) == NNG_PROTO_RESPONDENT);
- So(nng_peer(resp) == NNG_PROTO_SURVEYOR);
- })
-
- Convey("Send fails with no suvey", {
- nng_msg *msg;
- So(nng_msg_alloc(&msg, 0) == 0);
- So(nng_sendmsg(resp, msg, 0) == NNG_ESTATE);
- nng_msg_free(msg);
- })
- })
-
- Convey("We can create a linked survey pair", {
- nng_socket surv;
- nng_socket resp;
- nng_socket sock;
- uint64_t expire;
-
- So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0);
- So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0);
-
- Reset({
- nng_close(surv);
- nng_close(resp);
- })
-
- expire = 50000;
- So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0);
-
- So(nng_listen(surv, addr, NULL, NNG_FLAG_SYNCH) == 0);
- So(nng_dial(resp, addr, NULL, NNG_FLAG_SYNCH) == 0);
-
- // We dial another socket as that will force the
- // earlier dial to have completed *fully*. This is a
- // hack that only works because our listen logic is
- // single threaded.
- So(nng_open(&sock, NNG_PROTO_RESPONDENT) == 0);
- So(nng_dial(sock, addr, NULL, NNG_FLAG_SYNCH) == 0);
- nng_close(sock);
-
- Convey("Survey works", {
- nng_msg *msg;
- uint64_t rtimeo;
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "abc");
- So(nng_sendmsg(surv, msg, 0) == 0);
- msg = NULL;
- So(nng_recvmsg(resp, &msg, 0) == 0);
- CHECKSTR(msg, "abc");
- nng_msg_trunc(msg, 3);
- APPENDSTR(msg, "def");
- So(nng_sendmsg(resp, msg, 0) == 0);
- msg = NULL;
- So(nng_recvmsg(surv, &msg, 0) == 0);
- CHECKSTR(msg, "def");
- nng_msg_free(msg);
-
- So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
-
- Convey("And goes to non-survey state", {
- rtimeo = 200000;
- So(nng_setopt(surv, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0);
- So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE);
- })
- })
- })
- })
+ Convey("We can create a SURVEYOR socket",
+ {
+ nng_socket surv;
+
+ So(nng_surveyor_open(&surv) == 0);
+
+ Reset({ nng_close(surv); });
+
+ Convey("Protocols match", {
+ So(nng_protocol(surv) ==
+ NNG_PROTO_SURVEYOR);
+ So(nng_peer(surv) == NNG_PROTO_RESPONDENT);
+ });
+
+ Convey("Recv with no survey fails", {
+ nng_msg *msg;
+ So(nng_recvmsg(surv, &msg, 0) ==
+ NNG_ESTATE);
+ });
+
+ Convey("Survey without responder times out", {
+ uint64_t expire = 50000;
+ nng_msg *msg;
+
+ So(nng_setopt(surv, NNG_OPT_SURVEYTIME,
+ &expire, sizeof(expire)) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) ==
+ NNG_ETIMEDOUT);
+ });
+ })
+
+ Convey("We can create a RESPONDENT socket",
+ {
+ nng_socket resp;
+ So(nng_respondent_open(&resp) == 0);
+
+ Reset({ nng_close(resp); });
+
+ Convey("Protocols match", {
+ So(nng_protocol(resp) ==
+ NNG_PROTO_RESPONDENT);
+ So(nng_peer(resp) ==
+ NNG_PROTO_SURVEYOR);
+ });
+
+ Convey("Send fails with no suvey", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(resp, msg, 0) ==
+ NNG_ESTATE);
+ nng_msg_free(msg);
+ });
+ })
+
+ Convey("We can create a linked survey pair", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_socket sock;
+ uint64_t expire;
+
+ So(nng_surveyor_open(&surv) == 0);
+ So(nng_respondent_open(&resp) == 0);
+
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+
+ expire = 50000;
+ So(nng_setopt(surv, NNG_OPT_SURVEYTIME,
+ &expire, sizeof(expire)) == 0);
+
+ So(nng_listen(
+ surv, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ So(nng_dial(
+ resp, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ // We dial another socket as that will force
+ // the earlier dial to have completed *fully*.
+ // This is a hack that only works because our
+ // listen logic is single threaded.
+ So(nng_respondent_open(&sock) == 0);
+ So(nng_dial(
+ sock, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ nng_close(sock);
+
+ Convey("Survey works", {
+ nng_msg *msg;
+ uint64_t rtimeo;
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "abc");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ CHECKSTR(msg, "abc");
+ nng_msg_trunc(msg, 3);
+ APPENDSTR(msg, "def");
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(surv, &msg, 0) == 0);
+ CHECKSTR(msg, "def");
+ nng_msg_free(msg);
+
+ So(nng_recvmsg(surv, &msg, 0) ==
+ NNG_ETIMEDOUT);
+
+ Convey(
+ "And goes to non-survey state", {
+ rtimeo = 200000;
+ So(nng_setopt(surv,
+ NNG_OPT_RCVTIMEO,
+ &rtimeo,
+ sizeof(rtimeo)) ==
+ 0);
+ So(nng_recvmsg(surv, &msg,
+ 0) == NNG_ESTATE);
+ });
+ });
+ });
+ });
nni_fini();
})
diff --git a/tests/tcp.c b/tests/tcp.c
index b37e239c..3910cb7f 100644
--- a/tests/tcp.c
+++ b/tests/tcp.c
@@ -10,36 +10,34 @@
#include "convey.h"
#include "trantest.h"
-
// Inproc tests.
TestMain("TCP Transport", {
trantest_test_all("tcp://127.0.0.1:4450");
-
Convey("We cannot connect to wild cards", {
nng_socket s;
- So(nng_open(&s, NNG_PROTO_PAIR) == 0);
- Reset({
- nng_close(s);
- })
- So(nng_dial(s, "tcp://*:5555", NULL, NNG_FLAG_SYNCH) == NNG_EADDRINVAL);
- })
+ So(nng_pair_open(&s) == 0);
+ Reset({ nng_close(s); });
+ So(nng_dial(s, "tcp://*:5555", NULL, NNG_FLAG_SYNCH) ==
+ NNG_EADDRINVAL);
+ });
Convey("We can bind to wild card", {
nng_socket s1;
nng_socket s2;
- So(nng_open(&s1, NNG_PROTO_PAIR) == 0);
- So(nng_open(&s2, NNG_PROTO_PAIR) == 0);
+ So(nng_pair_open(&s1) == 0);
+ So(nng_pair_open(&s2) == 0);
Reset({
nng_close(s2);
nng_close(s1);
- })
+ });
So(nng_listen(s1, "tcp://*:5771", NULL, NNG_FLAG_SYNCH) == 0);
- So(nng_dial(s2, "tcp://127.0.0.1:5771", NULL, NNG_FLAG_SYNCH) == 0);
- })
+ So(nng_dial(
+ s2, "tcp://127.0.0.1:5771", NULL, NNG_FLAG_SYNCH) == 0);
+ });
nng_fini();
})
diff --git a/tests/trantest.h b/tests/trantest.h
index fe6e5431..173c6b45 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -29,8 +29,8 @@ void
trantest_init(trantest *tt, const char *addr)
{
(void) snprintf(tt->addr, sizeof(tt->addr), "%s", addr);
- So(nng_open(&tt->reqsock, NNG_PROTO_REQ) == 0);
- So(nng_open(&tt->repsock, NNG_PROTO_REP) == 0);
+ So(nng_req_open(&tt->reqsock) == 0);
+ So(nng_rep_open(&tt->repsock) == 0);
tt->tran = nni_tran_find(addr);
So(tt->tran != NULL);