diff options
33 files changed, 1003 insertions, 1140 deletions
diff --git a/perf/perf.c b/perf/perf.c index 1319fffc..628bbec7 100644 --- a/perf/perf.c +++ b/perf/perf.c @@ -354,17 +354,15 @@ throughput_server(const char *addr, int msgsize, int count) nng_msg * msg; int rv; int i; - size_t len; uint64_t start, end; double msgpersec, mbps, total; if ((rv = nng_pair_open(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } - len = 128; - rv = nng_setopt(s, NNG_OPT_RCVBUF, &len, sizeof(len)); + rv = nng_setopt_int(s, nng_optid_recvbuf, 128); if (rv != 0) { - die("nng_setopt(NNG_OPT_RCVBUF): %s", nng_strerror(rv)); + die("nng_setopt(nng_optid_recvbuf): %s", nng_strerror(rv)); } // XXX: set no delay @@ -411,7 +409,6 @@ throughput_client(const char *addr, int msgsize, int count) nng_msg * msg; int rv; int i; - int len; // We send one extra zero length message to start the timer. count++; @@ -423,10 +420,9 @@ throughput_client(const char *addr, int msgsize, int count) // XXX: set no delay // XXX: other options (TLS in the future?, Linger?) - len = 128; - rv = nng_setopt(s, NNG_OPT_SNDBUF, &len, sizeof(len)); + rv = nng_setopt_int(s, nng_optid_sendbuf, 128); if (rv != 0) { - die("nng_setopt(NNG_OPT_SNDBUF): %s", nng_strerror(rv)); + die("nng_setopt(nng_optid_sendbuf): %s", nng_strerror(rv)); } if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { diff --git a/src/core/aio.c b/src/core/aio.c index 5b8c7970..ee5724a6 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -171,7 +171,8 @@ nni_aio_finish_impl( { nni_mtx_lock(&nni_aio_lk); - NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once* + // provider only calls us *once*, but expiration may be in flight + NNI_ASSERT(aio->a_expiring || aio->a_pend == 0); nni_list_node_remove(&aio->a_expire_node); aio->a_pend = 1; diff --git a/src/core/device.c b/src/core/device.c index bdfcf0a6..e7140664 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -90,10 +91,10 @@ nni_device(nni_sock *sock1, nni_sock *sock2) // No timeouts. sz = sizeof(never); - if ((nni_sock_setopt(sock1, NNG_OPT_RCVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_RCVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock1, NNG_OPT_SNDTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_SNDTIMEO, &never, sz) != 0)) { + if ((nni_sock_setopt(sock1, nng_optid_recvtimeo, &never, sz) != 0) || + (nni_sock_setopt(sock2, nng_optid_recvtimeo, &never, sz) != 0) || + (nni_sock_setopt(sock1, nng_optid_sendtimeo, &never, sz) != 0) || + (nni_sock_setopt(sock2, nng_optid_sendtimeo, &never, sz) != 0)) { // This should never happen. rv = NNG_EINVAL; goto out; diff --git a/src/core/endpt.c b/src/core/endpt.c index 5b56b784..10608575 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -320,7 +320,8 @@ nni_ep_tmo_start(nni_ep *ep) // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - ep->ep_tmo_aio.a_expire = nni_clock() + (nni_random() % backoff); + ep->ep_tmo_aio.a_expire = + nni_clock() + (backoff ? nni_random() % backoff : 0); nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); } diff --git a/src/core/options.c b/src/core/options.c index bdbcaaba..7a5de33d 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -340,6 +340,20 @@ nni_option_register(const char *name, int *idp) return (0); } +void +nni_option_sys_fini(void) +{ + if (nni_option_nextid != 0) { + nni_option *opt; + while ((opt = nni_list_first(&nni_options)) != NULL) { + nni_list_remove(&nni_options, opt); + nni_free(opt->o_name, strlen(opt->o_name) + 1); + NNI_FREE_STRUCT(opt); + } + } + nni_option_nextid = 0; +} + int nni_option_sys_init(void) { @@ -348,52 +362,32 @@ nni_option_sys_init(void) nni_option_nextid = 0x10000; int rv; +#define OPT_REGISTER(o) nni_option_register(nng_opt_##o, &nng_optid_##o) // Register our well-known options. - if (((rv = nni_option_set_id("raw", NNG_OPT_RAW)) != 0) || - ((rv = nni_option_set_id("linger", NNG_OPT_LINGER)) != 0) || - ((rv = nni_option_set_id("recv-buf", NNG_OPT_RCVBUF)) != 0) || - ((rv = nni_option_set_id("send-buf", NNG_OPT_SNDBUF)) != 0) || - ((rv = nni_option_set_id("recv-timeout", NNG_OPT_RCVTIMEO)) != - 0) || - ((rv = nni_option_set_id("send-timeout", NNG_OPT_SNDTIMEO)) != - 0) || - ((rv = nni_option_set_id("reconnect-time", NNG_OPT_RECONN_TIME)) != - 0) || - ((rv = nni_option_set_id( - "reconnect-max-time", NNG_OPT_RECONN_MAXTIME)) != 0) || - ((rv = nni_option_set_id("recv-max-size", NNG_OPT_RCVMAXSZ)) != - 0) || - ((rv = nni_option_set_id("max-ttl", NNG_OPT_MAXTTL)) != 0) || - ((rv = nni_option_set_id("protocol", NNG_OPT_PROTOCOL)) != 0) || - ((rv = nni_option_set_id("subscribe", NNG_OPT_SUBSCRIBE)) != 0) || - ((rv = nni_option_set_id("unsubscribe", NNG_OPT_UNSUBSCRIBE)) != - 0) || - ((rv = nni_option_set_id("survey-time", NNG_OPT_SURVEYTIME)) != - 0) || - ((rv = nni_option_set_id("resend-time", NNG_OPT_RESENDTIME)) != - 0) || - ((rv = nni_option_set_id("transport", NNG_OPT_TRANSPORT)) != 0) || - ((rv = nni_option_set_id("local-addr", NNG_OPT_LOCALADDR)) != 0) || - ((rv = nni_option_set_id("remote-addr", NNG_OPT_REMOTEADDR)) != - 0) || - ((rv = nni_option_set_id("recv-fd", NNG_OPT_RCVFD)) != 0) || - ((rv = nni_option_set_id("send-fd", NNG_OPT_SNDFD)) != 0)) { + if (((rv = OPT_REGISTER(raw)) != 0) || + ((rv = OPT_REGISTER(linger)) != 0) || + ((rv = OPT_REGISTER(recvbuf)) != 0) || + ((rv = OPT_REGISTER(sendbuf)) != 0) || + ((rv = OPT_REGISTER(recvtimeo)) != 0) || + ((rv = OPT_REGISTER(sendtimeo)) != 0) || + ((rv = OPT_REGISTER(reconnmint)) != 0) || + ((rv = OPT_REGISTER(reconnmaxt)) != 0) || + ((rv = OPT_REGISTER(recvmaxsz)) != 0) || + ((rv = OPT_REGISTER(maxttl)) != 0) || + ((rv = OPT_REGISTER(protocol)) != 0) || + ((rv = OPT_REGISTER(transport)) != 0) || + ((rv = OPT_REGISTER(locaddr)) != 0) || + ((rv = OPT_REGISTER(remaddr)) != 0) || + ((rv = OPT_REGISTER(recvfd)) != 0) || + ((rv = OPT_REGISTER(sendfd)) != 0) || + ((rv = OPT_REGISTER(req_resendtime)) != 0) || + ((rv = OPT_REGISTER(sub_subscribe)) != 0) || + ((rv = OPT_REGISTER(sub_unsubscribe)) != 0) || + ((rv = OPT_REGISTER(surveyor_surveytime)) != 0)) { nni_option_sys_fini(); return (rv); } +#undef OPT_REGISTER + return (0); } - -void -nni_option_sys_fini(void) -{ - if (nni_option_nextid != 0) { - nni_option *opt; - while ((opt = nni_list_first(&nni_options)) != NULL) { - nni_list_remove(&nni_options, opt); - nni_free(opt->o_name, strlen(opt->o_name) + 1); - NNI_FREE_STRUCT(opt); - } - } - nni_option_nextid = 0; -}
\ No newline at end of file diff --git a/src/core/socket.c b/src/core/socket.c index 765a6d0c..be1c2d96 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -362,17 +362,17 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger, + ((rv = nni_sock_setopt(s, nng_optid_linger, &s->s_linger, sizeof(nni_duration))) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_SNDTIMEO, &s->s_sndtimeo, + ((rv = nni_sock_setopt(s, nng_optid_sendtimeo, &s->s_sndtimeo, sizeof(nni_duration))) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_RCVTIMEO, &s->s_rcvtimeo, + ((rv = nni_sock_setopt(s, nng_optid_recvtimeo, &s->s_rcvtimeo, sizeof(nni_duration))) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_TIME, &s->s_reconn, + ((rv = nni_sock_setopt(s, nng_optid_reconnmint, &s->s_reconn, sizeof(nni_duration))) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_MAXTIME, &s->s_reconnmax, + ((rv = nni_sock_setopt(s, nng_optid_reconnmaxt, &s->s_reconnmax, sizeof(nni_duration))) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_RCVMAXSZ, &s->s_rcvmaxsz, + ((rv = nni_sock_setopt(s, nng_optid_recvmaxsz, &s->s_rcvmaxsz, sizeof(size_t))) != 0)) { nni_sock_destroy(s); return (rv); @@ -807,26 +807,18 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size) // Some options do not go down to transports. Handle them // directly. - switch (opt) { - case NNG_OPT_RECONN_TIME: + if (opt == nng_optid_reconnmint) { rv = nni_setopt_usec(&s->s_reconn, val, size); - break; - case NNG_OPT_RECONN_MAXTIME: + } else if (opt == nng_optid_reconnmaxt) { rv = nni_setopt_usec(&s->s_reconnmax, val, size); - break; - case NNG_OPT_SNDBUF: + } else if (opt == nng_optid_sendbuf) { rv = nni_setopt_buf(s->s_uwq, val, size); - break; - case NNG_OPT_RCVBUF: + } else if (opt == nng_optid_recvbuf) { rv = nni_setopt_buf(s->s_urq, val, size); - break; - case NNG_OPT_SNDFD: - case NNG_OPT_RCVFD: - case NNG_OPT_LOCALADDR: - case NNG_OPT_REMOTEADDR: + } else if ((opt == nng_optid_sendfd) || (opt == nng_optid_recvfd) || + (opt == nng_optid_locaddr) || (opt == nng_optid_remaddr)) { // these options can be read, but cannot be set rv = NNG_EINVAL; - break; } nni_mtx_unlock(&s->s_mx); @@ -845,21 +837,14 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size) // check was found, or even if a transport rejected one of the // settings. if ((rv == NNG_ENOTSUP) || (rv == 0)) { - switch (opt) { - case NNG_OPT_LINGER: - rv = nni_chkopt_usec(val, size); - break; - case NNG_OPT_SNDTIMEO: + if ((opt == nng_optid_linger) || + (opt == nng_optid_sendtimeo) || + (opt == nng_optid_recvtimeo)) { rv = nni_chkopt_usec(val, size); - break; - case NNG_OPT_RCVTIMEO: - rv = nni_chkopt_usec(val, size); - break; - case NNG_OPT_RCVMAXSZ: + } else if (opt == nng_optid_recvmaxsz) { // just a sanity test on the size; it also ensures that // a size can be set even with no transport configured. rv = nni_chkopt_size(val, size, 0, NNI_MAXSZ); - break; } } @@ -913,16 +898,13 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size) // For some options, which also have an impact on the socket // behavior, we save a local value. Note that the transport // will already have had a chance to veto this. - switch (opt) { - case NNG_OPT_LINGER: + + if (opt == nng_optid_linger) { rv = nni_setopt_usec(&s->s_linger, val, size); - break; - case NNG_OPT_SNDTIMEO: + } else if (opt == nng_optid_sendtimeo) { rv = nni_setopt_usec(&s->s_sndtimeo, val, size); - break; - case NNG_OPT_RCVTIMEO: + } else if (opt == nng_optid_recvtimeo) { rv = nni_setopt_usec(&s->s_rcvtimeo, val, size); - break; } if (rv == 0) { @@ -965,26 +947,19 @@ nni_sock_getopt(nni_sock *s, int opt, void *val, size_t *szp) // Options that are handled by socket core, and never // passed down. - switch (opt) { - case NNG_OPT_RECONN_TIME: - rv = nni_getopt_usec(&s->s_reconn, val, szp); - break; - case NNG_OPT_RECONN_MAXTIME: - rv = nni_getopt_usec(&s->s_reconnmax, val, szp); - break; - case NNG_OPT_SNDBUF: + if (opt == nng_optid_sendbuf) { rv = nni_getopt_buf(s->s_uwq, val, szp); - break; - case NNG_OPT_RCVBUF: + } else if (opt == nng_optid_recvbuf) { rv = nni_getopt_buf(s->s_urq, val, szp); - break; - case NNG_OPT_SNDFD: + } else if (opt == nng_optid_sendfd) { rv = nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, val, szp); - break; - case NNG_OPT_RCVFD: + } else if (opt == nng_optid_recvfd) { rv = nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, val, szp); - break; - default: + } else if (opt == nng_optid_reconnmint) { + rv = nni_getopt_usec(&s->s_reconn, val, szp); + } else if (opt == nng_optid_reconnmaxt) { + rv = nni_getopt_usec(&s->s_reconnmax, val, szp); + } else { NNI_LIST_FOREACH (&s->s_options, sopt) { if (sopt->opt == opt) { size_t sz = sopt->sz; @@ -997,7 +972,6 @@ nni_sock_getopt(nni_sock *s, int opt, void *val, size_t *szp) break; } } - break; } nni_mtx_unlock(&s->s_mx); return (rv); @@ -881,12 +881,14 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) int nng_option_lookup(const char *name) { + (void) nni_init(); return (nni_option_lookup(name)); } const char * nng_option_name(int id) { + (void) nni_init(); return (nni_option_name(id)); } @@ -990,3 +992,49 @@ nng_thread_destroy(void *arg) NNI_FREE_STRUCT(thr); } + +// Constant option definitions. These are for well-known options, +// so that the vast majority of consumers don't have to look these up. + +const char *nng_opt_raw = "raw"; +const char *nng_opt_linger = "linger"; +const char *nng_opt_recvbuf = "recv-buffer"; +const char *nng_opt_sendbuf = "send-buffer"; +const char *nng_opt_recvtimeo = "recv-timeout"; +const char *nng_opt_sendtimeo = "send-timeout"; +const char *nng_opt_recvmaxsz = "recv-size-max"; +const char *nng_opt_reconnmint = "reconnect-time-min"; +const char *nng_opt_reconnmaxt = "reconnect-time-min"; +const char *nng_opt_maxttl = "ttl-max"; +const char *nng_opt_protocol = "protocol"; +const char *nng_opt_transport = "transport"; +const char *nng_opt_recvfd = "recv-fd"; +const char *nng_opt_sendfd = "send-fd"; +const char *nng_opt_locaddr = "local-address"; +const char *nng_opt_remaddr = "remote-address"; +// Well known protocol options. +const char *nng_opt_req_resendtime = "req:resend-time"; +const char *nng_opt_sub_subscribe = "sub:subscribe"; +const char *nng_opt_sub_unsubscribe = "sub:unsubscribe"; +const char *nng_opt_surveyor_surveytime = "surveyor:survey-time"; + +int nng_optid_raw; +int nng_optid_linger; +int nng_optid_recvbuf; +int nng_optid_sendbuf; +int nng_optid_recvtimeo; +int nng_optid_sendtimeo; +int nng_optid_recvmaxsz; +int nng_optid_reconnmint; +int nng_optid_reconnmaxt; +int nng_optid_maxttl; +int nng_optid_protocol; +int nng_optid_transport; +int nng_optid_recvfd; +int nng_optid_sendfd; +int nng_optid_locaddr; +int nng_optid_remaddr; +int nng_optid_req_resendtime; +int nng_optid_sub_subscribe; +int nng_optid_sub_unsubscribe; +int nng_optid_surveyor_surveytime; @@ -403,29 +403,50 @@ NNG_DECL int nng_respondent0_open(nng_socket *); #define NNG_OPT_SOCKET(c) (c) #define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((t) << 16) | (c)) -enum nng_opt_enum { - NNG_OPT_RAW = NNG_OPT_SOCKET(0), - NNG_OPT_LINGER = NNG_OPT_SOCKET(1), - NNG_OPT_RCVBUF = NNG_OPT_SOCKET(2), - NNG_OPT_SNDBUF = NNG_OPT_SOCKET(3), - NNG_OPT_RCVTIMEO = NNG_OPT_SOCKET(4), - NNG_OPT_SNDTIMEO = NNG_OPT_SOCKET(5), - NNG_OPT_RECONN_TIME = NNG_OPT_SOCKET(6), - NNG_OPT_RECONN_MAXTIME = NNG_OPT_SOCKET(7), - NNG_OPT_RCVMAXSZ = NNG_OPT_SOCKET(8), - NNG_OPT_MAXTTL = NNG_OPT_SOCKET(9), - NNG_OPT_PROTOCOL = NNG_OPT_SOCKET(10), - NNG_OPT_SUBSCRIBE = NNG_OPT_SOCKET(11), - NNG_OPT_UNSUBSCRIBE = NNG_OPT_SOCKET(12), - NNG_OPT_SURVEYTIME = NNG_OPT_SOCKET(13), - NNG_OPT_RESENDTIME = NNG_OPT_SOCKET(14), - NNG_OPT_TRANSPORT = NNG_OPT_SOCKET(15), - NNG_OPT_LOCALADDR = NNG_OPT_SOCKET(16), - NNG_OPT_REMOTEADDR = NNG_OPT_SOCKET(17), - NNG_OPT_RCVFD = NNG_OPT_SOCKET(18), - NNG_OPT_SNDFD = NNG_OPT_SOCKET(19), -}; - +extern const char *nng_opt_raw; +extern const char *nng_opt_linger; +extern const char *nng_opt_recvbuf; +extern const char *nng_opt_sendbuf; +extern const char *nng_opt_recvtimeo; +extern const char *nng_opt_sendtimeo; +extern const char *nng_opt_recvmaxsz; +extern const char *nng_opt_reconnmint; +extern const char *nng_opt_reconnmaxt; +extern const char *nng_opt_maxttl; +extern const char *nng_opt_protocol; +extern const char *nng_opt_transport; +extern const char *nng_opt_recvfd; +extern const char *nng_opt_sendfd; +extern const char *nng_opt_locaddr; +extern const char *nng_opt_remaddr; +extern const char *nng_opt_req_resendtime; +extern const char *nng_opt_sub_subscribe; +extern const char *nng_opt_sub_unsubscribe; +extern const char *nng_opt_surveyor_surveytime; + +extern int nng_optid_raw; +extern int nng_optid_linger; +extern int nng_optid_recvbuf; +extern int nng_optid_sendbuf; +extern int nng_optid_recvtimeo; +extern int nng_optid_sendtimeo; +extern int nng_optid_recvmaxsz; +extern int nng_optid_reconnmint; +extern int nng_optid_reconnmaxt; +extern int nng_optid_maxttl; +extern int nng_optid_protocol; +extern int nng_optid_transport; +extern int nng_optid_recvfd; +extern int nng_optid_sendfd; +extern int nng_optid_locaddr; +extern int nng_optid_remaddr; + +// These protocol specific options may not be valid until a socket of +// the given protocol is opened! +extern int nng_optid_req_resendtime; +extern int nng_optid_sub_subscribe; +extern int nng_optid_sub_unsubscribe; +extern int nng_optid_surveyor_surveytime; // XXX: TBD: priorities, socket names, ipv4only // Statistics. These are for informational purposes only, and subject diff --git a/src/nng_compat.c b/src/nng_compat.c index bb0c9faa..a6a558f5 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -133,9 +133,7 @@ nn_socket(int domain, int protocol) return (-1); } if (domain == AF_SP_RAW) { - int raw = 1; - rv = nng_setopt(sock, NNG_OPT_RAW, &raw, sizeof(raw)); - if (rv != 0) { + if ((rv = nng_setopt_int(sock, nng_optid_raw, 1)) != 0) { nn_seterror(rv); nng_close(sock); return (-1); @@ -566,106 +564,136 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) return ((int) sz); } +// options which we convert -- most of the array is initialized at run time. +static struct { + int nnlevel; + int nnopt; + int opt; + int mscvt; +} options[] = { + // clang-format off + { NN_SOL_SOCKET, NN_LINGER }, // review + { NN_SOL_SOCKET, NN_SNDBUF }, + { NN_SOL_SOCKET, NN_RCVBUF } , + { NN_SOL_SOCKET, NN_RECONNECT_IVL }, + { NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX }, + { NN_SOL_SOCKET, NN_SNDFD }, + { NN_SOL_SOCKET, NN_RCVFD }, + { NN_SOL_SOCKET, NN_RCVMAXSIZE }, + { NN_SOL_SOCKET, NN_MAXTTL }, + { NN_SOL_SOCKET, NN_RCVTIMEO }, + { NN_SOL_SOCKET, NN_SNDTIMEO }, + { NN_REQ, NN_REQ_RESEND_IVL }, + { NN_SUB, NN_SUB_SUBSCRIBE }, + { NN_SUB, NN_SUB_UNSUBSCRIBE }, + { NN_SURVEYOR, NN_SURVEYOR_DEADLINE }, + // XXX: DOMAIN, IPV4ONLY, SOCKETNAME, SNDPRIO, RCVPRIO + // clang-format on +}; + +static void +init_opts(void) +{ + static int optsinited = 0; + if (optsinited) { + return; + } + for (int i = 0; i < sizeof(options) / sizeof(options[0]); i++) { + if (options[i].opt > 0) { + continue; + } +#define SETOPT(n, ms) \ + options[i].opt = n; \ + options[i].mscvt = ms + + switch (options[i].nnlevel) { + case NN_SOL_SOCKET: + switch (options[i].nnopt) { + case NN_LINGER: + SETOPT(nng_optid_linger, 1); + break; + case NN_SNDBUF: + SETOPT(nng_optid_sendbuf, 0); + break; + case NN_RCVBUF: + SETOPT(nng_optid_recvbuf, 0); + break; + case NN_RECONNECT_IVL: + SETOPT(nng_optid_reconnmint, 1); + break; + case NN_RECONNECT_IVL_MAX: + SETOPT(nng_optid_reconnmaxt, 1); + break; + case NN_SNDFD: + SETOPT(nng_optid_sendfd, 0); + break; + case NN_RCVFD: + SETOPT(nng_optid_recvfd, 0); + break; + case NN_RCVMAXSIZE: + SETOPT(nng_optid_recvmaxsz, 0); + break; + case NN_MAXTTL: + SETOPT(nng_optid_maxttl, 0); + break; + case NN_RCVTIMEO: + SETOPT(nng_optid_recvtimeo, 1); + break; + case NN_SNDTIMEO: + SETOPT(nng_optid_sendtimeo, 1); + break; + } + break; + case NN_REQ: + switch (options[i].nnopt) { + case NN_REQ_RESEND_IVL: + SETOPT(nng_optid_req_resendtime, 1); + break; + } + break; + case NN_SUB: + switch (options[i].nnopt) { + case NN_SUB_SUBSCRIBE: + SETOPT(nng_optid_sub_subscribe, 0); + break; + case NN_SUB_UNSUBSCRIBE: + SETOPT(nng_optid_sub_unsubscribe, 0); + break; + } + case NN_SURVEYOR: + switch (options[i].nnopt) { + case NN_SURVEYOR_DEADLINE: + SETOPT(nng_optid_surveyor_surveytime, 1); + break; + } + break; + } + } + optsinited = 1; +} + int nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp) { - int opt = 0; + int opt = -1; int mscvt = 0; uint64_t usec; int * msecp; int rv; - switch (nnlevel) { - case NN_SOL_SOCKET: - switch (nnopt) { - case NN_LINGER: - opt = NNG_OPT_LINGER; - break; - case NN_SNDBUF: - opt = NNG_OPT_SNDBUF; - break; - case NN_RCVBUF: - opt = NNG_OPT_RCVBUF; - break; - case NN_RECONNECT_IVL: - opt = NNG_OPT_RECONN_TIME; - mscvt = 1; - break; - case NN_RECONNECT_IVL_MAX: - opt = NNG_OPT_RECONN_MAXTIME; - mscvt = 1; - break; - case NN_SNDFD: - opt = NNG_OPT_SNDFD; - break; - case NN_RCVFD: - opt = NNG_OPT_RCVFD; - break; - case NN_RCVMAXSIZE: - opt = NNG_OPT_RCVMAXSZ; - break; - case NN_MAXTTL: - opt = NNG_OPT_MAXTTL; - break; - case NN_RCVTIMEO: - opt = NNG_OPT_RCVTIMEO; - mscvt = 1; - break; - case NN_SNDTIMEO: - opt = NNG_OPT_SNDTIMEO; - mscvt = 1; - break; - case NN_DOMAIN: - case NN_PROTOCOL: - case NN_IPV4ONLY: - case NN_SOCKET_NAME: - case NN_SNDPRIO: - case NN_RCVPRIO: - default: - errno = ENOPROTOOPT; - return (-1); + init_opts(); + for (int i = 0; i < sizeof(options) / sizeof(options[0]); i++) { + if ((options[i].nnlevel == nnlevel) && + (options[i].nnopt == nnopt)) { + mscvt = options[i].mscvt; + opt = options[i].opt; break; } - break; - case NN_REQ: - switch (nnopt) { - case NN_REQ_RESEND_IVL: - opt = NNG_OPT_RESENDTIME; - mscvt = 1; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - case NN_SUB: - switch (nnopt) { - case NN_SUB_SUBSCRIBE: - opt = NNG_OPT_SUBSCRIBE; - break; - case NN_SUB_UNSUBSCRIBE: - opt = NNG_OPT_UNSUBSCRIBE; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - case NN_SURVEYOR: - switch (nnopt) { - case NN_SURVEYOR_DEADLINE: - opt = NNG_OPT_SURVEYTIME; - mscvt = 1; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - default: - errno = ENOPROTOOPT; - return (-1); + } + + if (opt < 0) { + return (ENOPROTOOPT); } if (mscvt) { @@ -696,102 +724,23 @@ nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp) int nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz) { - int opt = 0; + int opt = -1; int mscvt = 0; uint64_t usec; int rv; - switch (nnlevel) { - case NN_SOL_SOCKET: - switch (nnopt) { - case NN_LINGER: - opt = NNG_OPT_LINGER; - break; - case NN_SNDBUF: - opt = NNG_OPT_SNDBUF; - break; - case NN_RCVBUF: - opt = NNG_OPT_RCVBUF; - break; - case NN_RECONNECT_IVL: - opt = NNG_OPT_RECONN_TIME; - mscvt = 1; - break; - case NN_RECONNECT_IVL_MAX: - opt = NNG_OPT_RECONN_MAXTIME; - mscvt = 1; - break; - case NN_SNDFD: - opt = NNG_OPT_SNDFD; - break; - case NN_RCVFD: - opt = NNG_OPT_RCVFD; - break; - case NN_RCVMAXSIZE: - opt = NNG_OPT_RCVMAXSZ; - break; - case NN_MAXTTL: - opt = NNG_OPT_MAXTTL; - break; - case NN_RCVTIMEO: - opt = NNG_OPT_RCVTIMEO; - mscvt = 1; - break; - case NN_SNDTIMEO: - opt = NNG_OPT_SNDTIMEO; - mscvt = 1; - break; - case NN_DOMAIN: - case NN_PROTOCOL: - case NN_IPV4ONLY: - case NN_SOCKET_NAME: - case NN_SNDPRIO: - case NN_RCVPRIO: - default: - errno = ENOPROTOOPT; - return (-1); + init_opts(); + for (int i = 0; i < sizeof(options) / sizeof(options[0]); i++) { + if ((options[i].nnlevel == nnlevel) && + (options[i].nnopt == nnopt)) { + mscvt = options[i].mscvt; + opt = options[i].opt; break; } - break; - case NN_REQ: - switch (nnopt) { - case NN_REQ_RESEND_IVL: - opt = NNG_OPT_RESENDTIME; - mscvt = 1; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - case NN_SUB: - switch (nnopt) { - case NN_SUB_SUBSCRIBE: - opt = NNG_OPT_SUBSCRIBE; - break; - case NN_SUB_UNSUBSCRIBE: - opt = NNG_OPT_UNSUBSCRIBE; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - case NN_SURVEYOR: - switch (nnopt) { - case NN_SURVEYOR_DEADLINE: - opt = NNG_OPT_SURVEYTIME; - mscvt = 1; - break; - default: - errno = ENOPROTOOPT; - return (-1); - } - break; - default: - errno = ENOPROTOOPT; - return (-1); + } + if (opt < 0) { + return (ENOPROTOOPT); } if (mscvt) { diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 88fd93e0..6f6def0a 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -322,14 +322,10 @@ static int nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_bus_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -338,14 +334,10 @@ static int nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_bus_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&psock->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index acf9ec25..ef420051 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -231,11 +231,9 @@ pair0_sock_setopt(void *arg, int opt, const void *buf, size_t sz) pair0_sock *s = arg; int rv; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - break; - default: + } else { rv = NNG_ENOTSUP; } return (rv); @@ -247,11 +245,9 @@ pair0_sock_getopt(void *arg, int opt, void *buf, size_t *szp) pair0_sock *s = arg; int rv; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&s->raw, buf, szp); - break; - default: + } else { rv = NNG_ENOTSUP; } return (rv); diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index 3b6770ba..a737402f 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -25,9 +25,9 @@ static void pair1_pipe_getq_cb(void *); static void pair1_pipe_putq_cb(void *); static void pair1_pipe_fini(void *); -static int pair1_opt_poly = -1; -static int pair1_opt_maxttl = -1; -static int pair1_opt_raw = -1; +// These are exposed as external names for external consumers. +int nng_optid_pair1_poly; +const char *nng_opt_pair1_poly = "pair1-polyamorous"; // pair1_sock is our per-socket protocol private structure. struct pair1_sock { @@ -400,10 +400,10 @@ pair1_sock_close(void *arg) static int pair1_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - pair1_sock *s = arg; - int rv; + pair1_sock *s = arg; + int rv = NNG_ENOTSUP; - if (opt == pair1_opt_raw) { + if (opt == nng_optid_raw) { nni_mtx_lock(&s->mtx); if (s->started) { rv = NNG_ESTATE; @@ -411,7 +411,11 @@ pair1_sock_setopt(void *arg, int opt, const void *buf, size_t sz) rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); } nni_mtx_unlock(&s->mtx); - } else if (opt == pair1_opt_poly) { + } else if (opt == nng_optid_maxttl) { + nni_mtx_lock(&s->mtx); + rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255); + nni_mtx_unlock(&s->mtx); + } else if (opt == nng_optid_pair1_poly) { nni_mtx_lock(&s->mtx); if (s->started) { rv = NNG_ESTATE; @@ -419,12 +423,6 @@ pair1_sock_setopt(void *arg, int opt, const void *buf, size_t sz) rv = nni_setopt_int(&s->poly, buf, sz, 0, 1); } nni_mtx_unlock(&s->mtx); - } else if (opt == pair1_opt_maxttl) { - nni_mtx_lock(&s->mtx); - rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255); - nni_mtx_unlock(&s->mtx); - } else { - rv = NNG_ENOTSUP; } return (rv); @@ -433,23 +431,21 @@ pair1_sock_setopt(void *arg, int opt, const void *buf, size_t sz) static int pair1_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - pair1_sock *s = arg; - int rv; + pair1_sock *s = arg; + int rv = NNG_ENOTSUP; - if (opt == pair1_opt_raw) { + if (opt == nng_optid_raw) { nni_mtx_lock(&s->mtx); rv = nni_getopt_int(&s->raw, buf, szp); nni_mtx_unlock(&s->mtx); - } else if (opt == pair1_opt_maxttl) { + } else if (opt == nng_optid_maxttl) { nni_mtx_lock(&s->mtx); rv = nni_getopt_int(&s->ttl, buf, szp); nni_mtx_unlock(&s->mtx); - } else if (opt == pair1_opt_poly) { + } else if (opt == nng_optid_pair1_poly) { nni_mtx_lock(&s->mtx); rv = nni_getopt_int(&s->poly, buf, szp); nni_mtx_unlock(&s->mtx); - } else { - rv = NNG_ENOTSUP; } return (rv); } @@ -457,19 +453,15 @@ pair1_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void pair1_fini(void) { - pair1_opt_poly = -1; - pair1_opt_raw = -1; - pair1_opt_maxttl = -1; + nng_optid_pair1_poly = -1; } static int pair1_init(void) { int rv; - if (((rv = nni_option_register("polyamorous", &pair1_opt_poly)) != - 0) || - ((rv = nni_option_register("raw", &pair1_opt_raw)) != 0) || - ((rv = nni_option_register("max-ttl", &pair1_opt_maxttl)) != 0)) { + if ((rv = nni_option_register( + nng_opt_pair1_poly, &nng_optid_pair1_poly)) != 0) { pair1_fini(); return (rv); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 1ebcc4a2..1d738ec2 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -170,14 +170,10 @@ static int nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pull_sock *pull = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -186,14 +182,10 @@ static int nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pull_sock *pull = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&pull->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 1bc1659c..10d04091 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -192,14 +192,10 @@ static int nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_push_sock *push = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&push->raw, buf, sz, 0, 1); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -208,14 +204,10 @@ static int nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_push_sock *push = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&push->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 940f2139..bbca1ecd 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -266,14 +266,10 @@ static int nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pub_sock *pub = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -282,14 +278,10 @@ static int nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pub_sock *pub = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&pub->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 78b9d157..0563b764 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -251,20 +251,14 @@ static int nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_sub_sock *sub = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); - break; - case NNG_OPT_SUBSCRIBE: + } else if (opt == nng_optid_sub_subscribe) { rv = nni_sub_subscribe(sub, buf, sz); - break; - case NNG_OPT_UNSUBSCRIBE: + } else if (opt == nng_optid_sub_unsubscribe) { rv = nni_sub_unsubscribe(sub, buf, sz); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -273,14 +267,10 @@ static int nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_sub_sock *sub = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RAW: + if (opt == nng_optid_raw) { rv = nni_getopt_int(&sub->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 09f2b285..cd33d019 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -348,18 +348,13 @@ static int nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_rep_sock *rep = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_MAXTTL: + if (opt == nng_optid_maxttl) { rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); - break; - case NNG_OPT_RAW: + } else if (opt == nng_optid_raw) { rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); nni_sock_senderr(rep->sock, rep->raw ? 0 : NNG_ESTATE); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } @@ -368,17 +363,12 @@ static int nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_rep_sock *rep = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_MAXTTL: + if (opt == nng_optid_maxttl) { rv = nni_getopt_int(&rep->ttl, buf, szp); - break; - case NNG_OPT_RAW: + } else if (opt == nng_optid_raw) { rv = nni_getopt_int(&rep->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index bab81331..2579417a 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -243,24 +243,21 @@ static int nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_req_sock *req = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RESENDTIME: + if (opt == nng_optid_req_resendtime) { rv = nni_setopt_usec(&req->retry, buf, sz); - break; - case NNG_OPT_RAW: + + } else if (opt == nng_optid_raw) { rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); if (rv == 0) { nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE); } - break; - case NNG_OPT_MAXTTL: + + } else if (opt == nng_optid_maxttl) { rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255); - break; - default: - rv = NNG_ENOTSUP; } + return (rv); } @@ -268,21 +265,18 @@ static int nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_req_sock *req = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_RESENDTIME: + if (opt == nng_optid_req_resendtime) { rv = nni_getopt_usec(&req->retry, buf, szp); - break; - case NNG_OPT_RAW: + + } else if (opt == nng_optid_raw) { rv = nni_getopt_int(&req->raw, buf, szp); - break; - case NNG_OPT_MAXTTL: + + } else if (opt == nng_optid_maxttl) { rv = nni_getopt_int(&req->ttl, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } + return (rv); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index a097f551..e695a987 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -348,14 +348,13 @@ static int nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_resp_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; int oldraw; - switch (opt) { - case NNG_OPT_MAXTTL: + if (opt == nng_optid_maxttl) { rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255); - break; - case NNG_OPT_RAW: + + } else if (opt == nng_optid_raw) { oldraw = psock->raw; rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); if (oldraw != psock->raw) { @@ -365,10 +364,8 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) nni_sock_senderr(psock->nsock, NNG_ESTATE); } } - break; - default: - rv = NNG_ENOTSUP; } + return (rv); } @@ -376,17 +373,12 @@ static int nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_resp_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_MAXTTL: + if (opt == nng_optid_maxttl) { rv = nni_getopt_int(&psock->ttl, buf, szp); - break; - case NNG_OPT_RAW: + } else if (opt == nng_optid_raw) { rv = nni_getopt_int(&psock->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 2a32f289..d7341025 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -267,14 +267,13 @@ static int nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_surv_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; int oldraw; - switch (opt) { - case NNG_OPT_SURVEYTIME: + if (opt == nng_optid_surveyor_surveytime) { rv = nni_setopt_usec(&psock->survtime, buf, sz); - break; - case NNG_OPT_RAW: + + } else if (opt == nng_optid_raw) { oldraw = psock->raw; rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); if (oldraw != psock->raw) { @@ -286,10 +285,8 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) psock->survid = 0; nni_timer_cancel(&psock->timer); } - break; - default: - rv = NNG_ENOTSUP; } + return (rv); } @@ -297,17 +294,12 @@ static int nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_surv_sock *psock = arg; - int rv; + int rv = NNG_ENOTSUP; - switch (opt) { - case NNG_OPT_SURVEYTIME: + if (opt == nng_optid_surveyor_surveytime) { rv = nni_getopt_usec(&psock->survtime, buf, szp); - break; - case NNG_OPT_RAW: + } else if (opt == nng_optid_raw) { rv = nni_getopt_int(&psock->raw, buf, szp); - break; - default: - rv = NNG_ENOTSUP; } return (rv); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 4c79ddfd..23a74998 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -182,6 +182,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) nni_inproc_pipe *pipe = arg; size_t len; +#if 0 switch (option) { case NNG_OPT_LOCALADDR: case NNG_OPT_REMOTEADDR: @@ -194,6 +195,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) *szp = len; return (0); } +#endif return (NNG_ENOTSUP); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 4324eedd..e5e19f36 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -64,8 +64,7 @@ static void nni_ipc_ep_cb(void *); static int nni_ipc_tran_chkopt(int o, const void *data, size_t sz) { - switch (o) { - case NNG_OPT_RCVMAXSZ: + if (o == nng_optid_recvmaxsz) { return (nni_chkopt_size(data, sz, 0, NNI_MAXSZ)); } return (NNG_ENOTSUP); @@ -648,37 +647,28 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) static int nni_ipc_ep_setopt(void *arg, int opt, const void *v, size_t sz) { - int rv; + int rv = NNG_ENOTSUP; nni_ipc_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - switch (opt) { - case NNG_OPT_RCVMAXSZ: + + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&ep->mtx); rv = nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ); - break; - default: - rv = NNG_ENOTSUP; - break; + nni_mtx_unlock(&ep->mtx); } - nni_mtx_unlock(&ep->mtx); return (rv); } static int nni_ipc_ep_getopt(void *arg, int opt, void *v, size_t *szp) { - int rv; + int rv = NNG_ENOTSUP; nni_ipc_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - switch (opt) { - case NNG_OPT_RCVMAXSZ: + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&ep->mtx); rv = nni_getopt_size(&ep->rcvmax, v, szp); - break; - default: - rv = NNG_ENOTSUP; - break; + nni_mtx_unlock(&ep->mtx); } - nni_mtx_unlock(&ep->mtx); return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index f5e5ad87..de1bfa66 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -65,10 +65,10 @@ static void nni_tcp_ep_cb(void *arg); static int nni_tcp_tran_chkopt(int o, const void *data, size_t sz) { - switch (o) { - case NNG_OPT_RCVMAXSZ: + if (o == nng_optid_recvmaxsz) { return (nni_chkopt_size(data, sz, 0, NNI_MAXSZ)); - case NNG_OPT_LINGER: + } + if (o == nng_optid_linger) { return (nni_chkopt_usec(data, sz)); } return (NNG_ENOTSUP); @@ -766,45 +766,39 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) static int nni_tcp_ep_setopt(void *arg, int opt, const void *v, size_t sz) { - int rv; + int rv = NNG_ENOTSUP; nni_tcp_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - switch (opt) { - case NNG_OPT_RCVMAXSZ: + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&ep->mtx); rv = nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ); - break; - case NNG_OPT_LINGER: + nni_mtx_unlock(&ep->mtx); + + } else if (opt == nng_optid_linger) { + nni_mtx_lock(&ep->mtx); rv = nni_setopt_usec(&ep->linger, v, sz); - break; - default: - rv = NNG_ENOTSUP; - break; + nni_mtx_unlock(&ep->mtx); } - nni_mtx_unlock(&ep->mtx); return (rv); } static int nni_tcp_ep_getopt(void *arg, int opt, void *v, size_t *szp) { - int rv; + int rv = NNG_ENOTSUP; nni_tcp_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - switch (opt) { - case NNG_OPT_RCVMAXSZ: + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&ep->mtx); rv = nni_getopt_size(&ep->rcvmax, v, szp); - break; - case NNG_OPT_LINGER: + nni_mtx_unlock(&ep->mtx); + + } else if (opt == nng_optid_linger) { + nni_mtx_lock(&ep->mtx); rv = nni_getopt_usec(&ep->linger, v, szp); - break; - default: - // XXX: add address properties - rv = NNG_ENOTSUP; - break; + nni_mtx_unlock(&ep->mtx); } - nni_mtx_unlock(&ep->mtx); + // XXX: add address properties return (rv); } diff --git a/tests/bus.c b/tests/bus.c index d242a7dd..c38f8c5a 100644 --- a/tests/bus.c +++ b/tests/bus.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -56,14 +57,9 @@ TestMain("BUS pattern", { So(nng_dial(bus3, addr, NULL, 0) == 0); rtimeo = 50000; - So(nng_setopt( - bus1, NNG_OPT_RCVTIMEO, &rtimeo, sizeof(rtimeo)) == 0); - rtimeo = 50000; - So(nng_setopt( - bus2, NNG_OPT_RCVTIMEO, &rtimeo, sizeof(rtimeo)) == 0); - rtimeo = 50000; - So(nng_setopt( - bus3, NNG_OPT_RCVTIMEO, &rtimeo, sizeof(rtimeo)) == 0); + So(nng_setopt_usec(bus1, nng_optid_recvtimeo, rtimeo) == 0); + So(nng_setopt_usec(bus2, nng_optid_recvtimeo, rtimeo) == 0); + So(nng_setopt_usec(bus3, nng_optid_recvtimeo, rtimeo) == 0); Convey("Messages delivered", { nng_msg *msg; diff --git a/tests/device.c b/tests/device.c index 85a06fca..d3407eab 100644 --- a/tests/device.c +++ b/tests/device.c @@ -42,19 +42,15 @@ Main({ nng_socket dev2; nng_socket end1; nng_socket end2; - int raw; uint64_t tmo; nng_msg * msg; void * thr; So(nng_pair1_open(&dev1) == 0); So(nng_pair1_open(&dev2) == 0); - raw = 1; - So(nng_setopt(dev1, NNG_OPT_RAW, &raw, sizeof(raw)) == - 0); - raw = 1; - So(nng_setopt(dev2, NNG_OPT_RAW, &raw, sizeof(raw)) == - 0); + + So(nng_setopt_int(dev1, nng_optid_raw, 1) == 0); + So(nng_setopt_int(dev2, nng_optid_raw, 1) == 0); struct dev_data ddata; ddata.s1 = dev1; @@ -77,11 +73,10 @@ Main({ So(nng_dial(end2, addr2, NULL, 0) == 0); tmo = 1000000; - So(nng_setopt(end1, NNG_OPT_RCVTIMEO, &tmo, - sizeof(tmo)) == 0); - tmo = 1000000; - So(nng_setopt(end2, NNG_OPT_RCVTIMEO, &tmo, - sizeof(tmo)) == 0); + So(nng_setopt_usec(end1, nng_optid_recvtimeo, tmo) == + 0); + So(nng_setopt_usec(end2, nng_optid_recvtimeo, tmo) == + 0); nng_usleep(100000); Convey("Device can send and receive", { diff --git a/tests/pair1.c b/tests/pair1.c index adb50c88..6e7a22cb 100644 --- a/tests/pair1.c +++ b/tests/pair1.c @@ -14,6 +14,9 @@ #include <string.h> +extern int nng_optid_pair1_poly; +extern const char *nng_opt_pair1_poly; + #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) #define CHECKSTR(m, s) \ So(nng_msg_len(m) == strlen(s)); \ @@ -44,13 +47,25 @@ TestMain("PAIRv1 protocol", { So(nng_pair1_open(&c2) == 0); tmo = 300000; - So(nng_setopt_usec(s1, NNG_OPT_RCVTIMEO, tmo) == 0); - So(nng_setopt_usec(c1, NNG_OPT_RCVTIMEO, tmo) == 0); - So(nng_setopt_usec(c2, NNG_OPT_RCVTIMEO, tmo) == 0); + So(nng_setopt_usec(s1, nng_optid_recvtimeo, tmo) == 0); + So(nng_setopt_usec(c1, nng_optid_recvtimeo, tmo) == 0); + So(nng_setopt_usec(c2, nng_optid_recvtimeo, tmo) == 0); tmo = 0; - So(nng_getopt_usec(s1, NNG_OPT_RCVTIMEO, &tmo) == 0); + So(nng_getopt_usec(s1, nng_optid_recvtimeo, &tmo) == 0); So(tmo == 300000); + Convey("Polyamorous option id works", { + // This test has to be done after polyamorous mode + // is registered! + int poly; + poly = nng_option_lookup(nng_opt_pair1_poly); + So(poly >= 0); + So(poly == nng_optid_pair1_poly); + So(nng_option_name(poly) != 0); + So(strcmp(nng_option_name(poly), nng_opt_pair1_poly) == + 0); + }); + Convey("Monogamous cooked mode works", { nng_msg *msg; @@ -98,26 +113,22 @@ TestMain("PAIRv1 protocol", { So(nng_dial(c1, addr, NULL, 0) == 0); nng_usleep(100000); - So(nng_setopt_int(s1, NNG_OPT_RAW, 1) == NNG_ESTATE); - So(nng_setopt_int(c1, NNG_OPT_RAW, 1) == NNG_ESTATE); + So(nng_setopt_int(s1, nng_optid_raw, 1) == NNG_ESTATE); + So(nng_setopt_int(c1, nng_optid_raw, 1) == NNG_ESTATE); }); Convey("Polyamorous mode is best effort", { int rv; int i; nng_msg *msg; - int poly; - poly = nng_option_lookup("polyamorous"); - So(poly >= 0); - So(nng_option_name(poly) != NULL); - So(strcmp(nng_option_name(poly), "polyamorous") == 0); - So(nng_setopt_int(s1, poly, 1) == 0); + So(nng_setopt_int(s1, nng_optid_pair1_poly, 1) == 0); - So(nng_setopt_int(s1, NNG_OPT_RCVBUF, 1) == 0); - So(nng_setopt_int(s1, NNG_OPT_SNDBUF, 1) == 0); - So(nng_setopt_int(c1, NNG_OPT_RCVBUF, 1) == 0); - So(nng_setopt_usec(s1, NNG_OPT_SNDTIMEO, 100000) == 0); + So(nng_setopt_int(s1, nng_optid_recvbuf, 1) == 0); + So(nng_setopt_int(s1, nng_optid_sendbuf, 1) == 0); + So(nng_setopt_int(c1, nng_optid_recvbuf, 1) == 0); + So(nng_setopt_usec(s1, nng_optid_sendtimeo, 100000) == + 0); So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); @@ -138,10 +149,11 @@ TestMain("PAIRv1 protocol", { int rv; nng_msg *msg; - So(nng_setopt_int(s1, NNG_OPT_RCVBUF, 1) == 0); - So(nng_setopt_int(s1, NNG_OPT_SNDBUF, 1) == 0); - So(nng_setopt_int(c1, NNG_OPT_RCVBUF, 1) == 0); - So(nng_setopt_usec(s1, NNG_OPT_SNDTIMEO, 30000) == 0); + So(nng_setopt_int(s1, nng_optid_recvbuf, 1) == 0); + So(nng_setopt_int(s1, nng_optid_sendbuf, 1) == 0); + So(nng_setopt_int(c1, nng_optid_recvbuf, 1) == 0); + So(nng_setopt_usec(s1, nng_optid_sendtimeo, 30000) == + 0); So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); @@ -164,21 +176,18 @@ TestMain("PAIRv1 protocol", { So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); nng_usleep(100000); - poly = nng_option_lookup("polyamorous"); - So(poly >= 0); - So(nng_option_name(poly) != NULL); - So(strcmp(nng_option_name(poly), "polyamorous") == 0); - So(nng_setopt_int(s1, poly, 1) == NNG_ESTATE); + So(nng_setopt_int(s1, nng_optid_pair1_poly, 1) == + NNG_ESTATE); }); Convey("Monogamous raw mode works", { nng_msg *msg; uint32_t hops; - So(nng_setopt_int(s1, NNG_OPT_RAW, 1) == 0); - So(nng_setopt_int(c1, NNG_OPT_RAW, 1) == 0); - So(nng_setopt_int(c2, NNG_OPT_RAW, 1) == 0); + So(nng_setopt_int(s1, nng_optid_raw, 1) == 0); + So(nng_setopt_int(c1, nng_optid_raw, 1) == 0); + So(nng_setopt_int(c2, nng_optid_raw, 1) == 0); So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); @@ -255,9 +264,10 @@ TestMain("PAIRv1 protocol", { Convey("TTL is honored", { int ttl; - So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 4) == 0); - So(nng_getopt_int(s1, NNG_OPT_MAXTTL, &ttl) == + So(nng_setopt_int(s1, nng_optid_maxttl, 4) == 0); + So(nng_getopt_int( + s1, nng_optid_maxttl, &ttl) == 0); So(ttl == 4); Convey("Bad TTL bounces", { So(nng_msg_alloc(&msg, 0) == 0); @@ -285,8 +295,8 @@ TestMain("PAIRv1 protocol", { Convey("Large TTL passes", { ttl = 0xff; - So(nng_setopt_int( - s1, NNG_OPT_MAXTTL, 0xff) == 0); + So(nng_setopt_int(s1, nng_optid_maxttl, + 0xff) == 0); So(nng_msg_alloc(&msg, 0) == 0); So(nng_msg_append_u32(msg, 1234) == 0); So(nng_msg_header_append_u32( @@ -303,8 +313,8 @@ TestMain("PAIRv1 protocol", { Convey("Max TTL fails", { ttl = 0xff; - So(nng_setopt_int( - s1, NNG_OPT_MAXTTL, 0xff) == 0); + So(nng_setopt_int(s1, nng_optid_maxttl, + 0xff) == 0); So(nng_msg_alloc(&msg, 0) == 0); So(nng_msg_header_append_u32( msg, 0xff) == 0); @@ -319,15 +329,15 @@ TestMain("PAIRv1 protocol", { int ttl; ttl = 0; - So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0) == + So(nng_setopt_int(s1, nng_optid_maxttl, 0) == NNG_EINVAL); - So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 1000) == + So(nng_setopt_int(s1, nng_optid_maxttl, 1000) == NNG_EINVAL); sz = 1; ttl = 8; - So(nng_setopt(s1, NNG_OPT_MAXTTL, &ttl, sz) == + So(nng_setopt(s1, nng_optid_maxttl, &ttl, sz) == NNG_EINVAL); }); @@ -336,18 +346,12 @@ TestMain("PAIRv1 protocol", { int v; nng_pipe p1; nng_pipe p2; - int poly; - poly = nng_option_lookup("polyamorous"); - So(poly >= 0); - So(nng_option_name(poly) != NULL); - So(strcmp(nng_option_name(poly), "polyamorous") == 0); - - So(nng_getopt_int(s1, poly, &v) == 0); + So(nng_getopt_int(s1, nng_optid_pair1_poly, &v) == 0); So(v == 0); - So(nng_setopt_int(s1, poly, 1) == 0); - So(nng_getopt_int(s1, poly, &v) == 0); + So(nng_setopt_int(s1, nng_optid_pair1_poly, 1) == 0); + So(nng_getopt_int(s1, nng_optid_pair1_poly, &v) == 0); So(v == 1); So(nng_listen(s1, addr, NULL, 0) == 0); @@ -402,14 +406,8 @@ TestMain("PAIRv1 protocol", { Convey("Polyamorous default works", { nng_msg *msg; - int poly; - - poly = nng_option_lookup("polyamorous"); - So(poly >= 0); - So(nng_option_name(poly) != NULL); - So(strcmp(nng_option_name(poly), "polyamorous") == 0); - So(nng_setopt_int(s1, poly, 1) == 0); + So(nng_setopt_int(s1, nng_optid_pair1_poly, 1) == 0); So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); @@ -440,23 +438,17 @@ TestMain("PAIRv1 protocol", { uint32_t hops; nng_pipe p1; nng_pipe p2; - int poly; - - poly = nng_option_lookup("polyamorous"); - So(poly >= 0); - So(nng_option_name(poly) != NULL); - So(strcmp(nng_option_name(poly), "polyamorous") == 0); - So(nng_getopt_int(s1, poly, &v) == 0); + So(nng_getopt_int(s1, nng_optid_pair1_poly, &v) == 0); So(v == 0); - So(nng_setopt_int(s1, poly, 1) == 0); - So(nng_getopt_int(s1, poly, &v) == 0); + So(nng_setopt_int(s1, nng_optid_pair1_poly, 1) == 0); + So(nng_getopt_int(s1, nng_optid_pair1_poly, &v) == 0); So(v == 1); v = 0; - So(nng_setopt_int(s1, NNG_OPT_RAW, 1) == 0); - So(nng_getopt_int(s1, NNG_OPT_RAW, &v) == 0); + So(nng_setopt_int(s1, nng_optid_raw, 1) == 0); + So(nng_getopt_int(s1, nng_optid_raw, &v) == 0); So(v == 1); So(nng_listen(s1, addr, NULL, 0) == 0); diff --git a/tests/pipeline.c b/tests/pipeline.c index 70b52350..78002fc6 100644 --- a/tests/pipeline.c +++ b/tests/pipeline.c @@ -16,172 +16,155 @@ So(nng_msg_len(m) == strlen(s)); \ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) -Main({ +TestMain("PIPELINE (PUSH/PULL) pattern", { + atexit(nng_fini); const char *addr = "inproc://test"; + Convey("We can create a PUSH socket", { + nng_socket push; - Test("PIPELINE (PUSH/PULL) pattern", { - Convey("We can create a PUSH socket", { - nng_socket push; + So(nng_push_open(&push) == 0); - So(nng_push_open(&push) == 0); + Reset({ nng_close(push); }); - Reset({ nng_close(push); }); - - Convey("Protocols match", { - So(nng_protocol(push) == NNG_PROTO_PUSH); - So(nng_peer(push) == NNG_PROTO_PULL); - }); + Convey("Protocols match", { + So(nng_protocol(push) == NNG_PROTO_PUSH); + So(nng_peer(push) == NNG_PROTO_PULL); + }); - Convey("Recv fails", { - nng_msg *msg; - So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP); - }); + Convey("Recv fails", { + nng_msg *msg; + So(nng_recvmsg(push, &msg, 0) == NNG_ENOTSUP); }); + }); - Convey("We can create a PULL socket", { - nng_socket pull; - So(nng_pull_open(&pull) == 0); + Convey("We can create a PULL socket", { + nng_socket pull; + So(nng_pull_open(&pull) == 0); - Reset({ nng_close(pull); }); + Reset({ nng_close(pull); }); - Convey("Protocols match", { - So(nng_protocol(pull) == NNG_PROTO_PULL); - So(nng_peer(pull) == NNG_PROTO_PUSH); - }); + Convey("Protocols match", { + So(nng_protocol(pull) == NNG_PROTO_PULL); + So(nng_peer(pull) == NNG_PROTO_PUSH); + }); - Convey("Send fails", { - nng_msg *msg; - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP); - nng_msg_free(msg); - }); + Convey("Send fails", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(pull, msg, 0) == NNG_ENOTSUP); + nng_msg_free(msg); }); + }); - Convey("We can create a linked PUSH/PULL pair", { - nng_socket push; - nng_socket pull; - nng_socket what; - - So(nng_push_open(&push) == 0); - So(nng_pull_open(&pull) == 0); - So(nng_push_open(&what) == 0); - - Reset({ - nng_close(push); - nng_close(pull); - nng_close(what); - }); - - // Its important to avoid a startup race that the - // sender be the dialer. Otherwise you need a delay - // since the server accept is really asynchronous. - So(nng_listen(pull, addr, NULL, 0) == 0); - So(nng_dial(push, addr, NULL, 0) == 0); - So(nng_dial(what, addr, NULL, 0) == 0); - So(nng_shutdown(what) == 0); - - Convey("Push can send messages, and pull can recv", { - nng_msg *msg; - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "hello"); - So(nng_sendmsg(push, msg, 0) == 0); - msg = NULL; - So(nng_recvmsg(pull, &msg, 0) == 0); - So(msg != NULL); - CHECKSTR(msg, "hello"); - nng_msg_free(msg); - }); + Convey("We can create a linked PUSH/PULL pair", { + nng_socket push; + nng_socket pull; + nng_socket what; + + So(nng_push_open(&push) == 0); + So(nng_pull_open(&pull) == 0); + So(nng_push_open(&what) == 0); + + Reset({ + nng_close(push); + nng_close(pull); + nng_close(what); }); - Convey("Load balancing", { - nng_msg * abc; - nng_msg * def; - uint64_t usecs; - int len; - nng_socket push; - nng_socket pull1; - nng_socket pull2; - nng_socket pull3; - - So(nng_push_open(&push) == 0); - So(nng_pull_open(&pull1) == 0); - So(nng_pull_open(&pull2) == 0); - So(nng_pull_open(&pull3) == 0); - - Reset({ - nng_close(push); - nng_close(pull1); - nng_close(pull2); - nng_close(pull3); - }); - - // We need to increase the buffer from zero, because - // there is no guarantee that the various listeners - // will be present, which means that they will push - // back during load balancing. Adding a small buffer - // ensures that we can write to each stream, even if - // the listeners are not running yet. - len = 4; - So(nng_setopt( - push, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - push, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull1, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull1, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull2, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull2, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull3, NNG_OPT_RCVBUF, &len, sizeof(len)) == 0); - So(nng_setopt( - pull3, NNG_OPT_SNDBUF, &len, sizeof(len)) == 0); - - So(nng_msg_alloc(&abc, 0) == 0); - APPENDSTR(abc, "abc"); - So(nng_msg_alloc(&def, 0) == 0); - APPENDSTR(def, "def"); - - usecs = 100000; - So(nng_setopt(pull1, NNG_OPT_RCVTIMEO, &usecs, - sizeof(usecs)) == 0); - So(nng_setopt(pull2, NNG_OPT_RCVTIMEO, &usecs, - sizeof(usecs)) == 0); - So(nng_setopt(pull3, NNG_OPT_RCVTIMEO, &usecs, - sizeof(usecs)) == 0); - So(nng_listen(push, addr, NULL, 0) == 0); - So(nng_dial(pull1, addr, NULL, 0) == 0); - So(nng_dial(pull2, addr, NULL, 0) == 0); - So(nng_dial(pull3, addr, NULL, 0) == 0); - So(nng_shutdown(pull3) == 0); - - // So pull3 might not be done accepting yet, but pull1 - // and pull2 definitely are, because otherwise the - // server couldn't have gotten to the accept. (The - // accept logic is single threaded.) Let's wait a bit - // though, to ensure that stuff has settled. - nng_usleep(100000); - - So(nng_sendmsg(push, abc, 0) == 0); - So(nng_sendmsg(push, def, 0) == 0); - - abc = NULL; - def = NULL; - - So(nng_recvmsg(pull1, &abc, 0) == 0); - CHECKSTR(abc, "abc"); - So(nng_recvmsg(pull2, &def, 0) == 0); - CHECKSTR(def, "def"); - nng_msg_free(abc); - nng_msg_free(def); - - So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT); - So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT); + // Its important to avoid a startup race that the + // sender be the dialer. Otherwise you need a delay + // since the server accept is really asynchronous. + So(nng_listen(pull, addr, NULL, 0) == 0); + So(nng_dial(push, addr, NULL, 0) == 0); + So(nng_dial(what, addr, NULL, 0) == 0); + So(nng_shutdown(what) == 0); + + Convey("Push can send messages, and pull can recv", { + nng_msg *msg; + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "hello"); + So(nng_sendmsg(push, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(pull, &msg, 0) == 0); + So(msg != NULL); + CHECKSTR(msg, "hello"); + nng_msg_free(msg); }); }); - nng_fini(); -}) + Convey("Load balancing", { + nng_msg * abc; + nng_msg * def; + uint64_t usecs; + nng_socket push; + nng_socket pull1; + nng_socket pull2; + nng_socket pull3; + + So(nng_push_open(&push) == 0); + So(nng_pull_open(&pull1) == 0); + So(nng_pull_open(&pull2) == 0); + So(nng_pull_open(&pull3) == 0); + + Reset({ + nng_close(push); + nng_close(pull1); + nng_close(pull2); + nng_close(pull3); + }); + + // We need to increase the buffer from zero, because + // there is no guarantee that the various listeners + // will be present, which means that they will push + // back during load balancing. Adding a small buffer + // ensures that we can write to each stream, even if + // the listeners are not running yet. + So(nng_setopt_int(push, nng_optid_recvbuf, 4) == 0); + So(nng_setopt_int(push, nng_optid_sendbuf, 4) == 0); + So(nng_setopt_int(pull1, nng_optid_recvbuf, 4) == 0); + So(nng_setopt_int(pull1, nng_optid_sendbuf, 4) == 0); + So(nng_setopt_int(pull2, nng_optid_recvbuf, 4) == 0); + So(nng_setopt_int(pull2, nng_optid_sendbuf, 4) == 0); + So(nng_setopt_int(pull3, nng_optid_recvbuf, 4) == 0); + So(nng_setopt_int(pull3, nng_optid_sendbuf, 4) == 0); + + So(nng_msg_alloc(&abc, 0) == 0); + APPENDSTR(abc, "abc"); + So(nng_msg_alloc(&def, 0) == 0); + APPENDSTR(def, "def"); + + usecs = 100000; + So(nng_setopt_usec(pull1, nng_optid_recvtimeo, usecs) == 0); + So(nng_setopt_usec(pull2, nng_optid_recvtimeo, usecs) == 0); + So(nng_setopt_usec(pull3, nng_optid_recvtimeo, usecs) == 0); + So(nng_listen(push, addr, NULL, 0) == 0); + So(nng_dial(pull1, addr, NULL, 0) == 0); + So(nng_dial(pull2, addr, NULL, 0) == 0); + So(nng_dial(pull3, addr, NULL, 0) == 0); + So(nng_shutdown(pull3) == 0); + + // So pull3 might not be done accepting yet, but pull1 + // and pull2 definitely are, because otherwise the + // server couldn't have gotten to the accept. (The + // accept logic is single threaded.) Let's wait a bit + // though, to ensure that stuff has settled. + nng_usleep(100000); + + So(nng_sendmsg(push, abc, 0) == 0); + So(nng_sendmsg(push, def, 0) == 0); + + abc = NULL; + def = NULL; + + So(nng_recvmsg(pull1, &abc, 0) == 0); + CHECKSTR(abc, "abc"); + So(nng_recvmsg(pull2, &def, 0) == 0); + CHECKSTR(def, "def"); + nng_msg_free(abc); + nng_msg_free(def); + + So(nng_recvmsg(pull1, &abc, 0) == NNG_ETIMEDOUT); + So(nng_recvmsg(pull2, &abc, 0) == NNG_ETIMEDOUT); + }); +}); diff --git a/tests/pollfd.c b/tests/pollfd.c index 05ed2939..6cf712f1 100644 --- a/tests/pollfd.c +++ b/tests/pollfd.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -10,6 +11,8 @@ #include "convey.h" #include "nng.h" +#include <string.h> + #ifndef _WIN32 #include <poll.h> #include <unistd.h> @@ -29,102 +32,120 @@ #endif -TestMain("Poll FDs", - { - - Convey("Given a connected pair of sockets", { - nng_socket s1; - nng_socket s2; - - So(nng_pair_open(&s1) == 0); - So(nng_pair_open(&s2) == 0); - Reset({ - nng_close(s1); - nng_close(s2); - }); - So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0); - nng_usleep(50000); - - So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0); - nng_usleep(50000); - - Convey("We can get a recv FD", { - int fd; - size_t sz; - - sz = sizeof(fd); - So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0); - So(fd != INVALID_SOCKET); - - Convey("And it is always the same fd", { - int fd2; - sz = sizeof(fd2); - So(nng_getopt(s1, NNG_OPT_RCVFD, &fd2, &sz) == - 0); - So(fd2 == fd); - }); - - Convey("And they start non pollable", { - struct pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - - So(poll(&pfd, 1, 0) == 0); - So(pfd.revents == 0); - }); - - Convey("But if we write they are pollable", { - struct pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - - So(nng_send(s2, "kick", 5, 0) == 0); - So(poll(&pfd, 1, 1000) == 1); - So((pfd.revents & POLLIN) != 0); - }); - }); - - Convey("We can get a send FD", { - int fd; - size_t sz; - - sz = sizeof(fd); - So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0); - So(fd != INVALID_SOCKET); - So(nng_send(s1, "oops", 4, 0) == 0); - }); - - Convey("Must have a big enough size", { - int fd; - size_t sz; - sz = 1; - So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == - NNG_EINVAL); - sz = 128; - So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0); - So(sz == sizeof(fd)); - }); - Convey("We cannot get a send FD for PULL", { - nng_socket s3; - int fd; - size_t sz; - So(nng_pull_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) == - NNG_ENOTSUP); - }); - - Convey("We cannot get a recv FD for PUSH", { - nng_socket s3; - int fd; - size_t sz; - So(nng_push_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) == - NNG_ENOTSUP); - }); - }) }) +TestMain("Poll FDs", { + + Convey("Option values work", { + int opt; + const char *name; + + opt = nng_option_lookup(nng_opt_recvfd); + So(opt > 0); + So(opt == nng_optid_recvfd); + name = nng_option_name(opt); + So(name != NULL); + So(strcmp(name, nng_opt_recvfd) == 0); + opt = nng_option_lookup(nng_opt_sendfd); + So(opt > 0); + So(opt == nng_optid_sendfd); + name = nng_option_name(opt); + So(name != NULL); + So(strcmp(name, nng_opt_sendfd) == 0); + }); + + Convey("Given a connected pair of sockets", { + nng_socket s1; + nng_socket s2; + + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); + Reset({ + nng_close(s1); + nng_close(s2); + }); + So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0); + nng_usleep(50000); + + So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0); + nng_usleep(50000); + + Convey("We can get a recv FD", { + int fd; + size_t sz; + + sz = sizeof(fd); + So(nng_getopt(s1, nng_optid_recvfd, &fd, &sz) == 0); + So(fd != INVALID_SOCKET); + + Convey("And it is always the same fd", { + int fd2; + sz = sizeof(fd2); + So(nng_getopt( + s1, nng_optid_recvfd, &fd2, &sz) == 0); + So(fd2 == fd); + }); + + Convey("And they start non pollable", { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + So(poll(&pfd, 1, 0) == 0); + So(pfd.revents == 0); + }); + + Convey("But if we write they are pollable", { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + So(nng_send(s2, "kick", 5, 0) == 0); + So(poll(&pfd, 1, 1000) == 1); + So((pfd.revents & POLLIN) != 0); + }); + }); + + Convey("We can get a send FD", { + int fd; + size_t sz; + + sz = sizeof(fd); + So(nng_getopt(s1, nng_optid_sendfd, &fd, &sz) == 0); + So(fd != INVALID_SOCKET); + So(nng_send(s1, "oops", 4, 0) == 0); + }); + + Convey("Must have a big enough size", { + int fd; + size_t sz; + sz = 1; + So(nng_getopt(s1, nng_optid_recvfd, &fd, &sz) == + NNG_EINVAL); + sz = 128; + So(nng_getopt(s1, nng_optid_recvfd, &fd, &sz) == 0); + So(sz == sizeof(fd)); + }); + Convey("We cannot get a send FD for PULL", { + nng_socket s3; + int fd; + size_t sz; + So(nng_pull_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, nng_optid_sendfd, &fd, &sz) == + NNG_ENOTSUP); + }); + + Convey("We cannot get a recv FD for PUSH", { + nng_socket s3; + int fd; + size_t sz; + So(nng_push_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, nng_optid_recvfd, &fd, &sz) == + NNG_ENOTSUP); + }); + }); +}) diff --git a/tests/pubsub.c b/tests/pubsub.c index 2712181d..de2fcfc5 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -83,35 +83,35 @@ TestMain("PUB/SUB pattern", { So(nng_dial(pub, addr, NULL, 0) == 0); Convey("Sub can subscribe", { - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "ABC", 3) == 0); - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "", 0) == 0); + So(nng_setopt( + sub, nng_optid_sub_subscribe, "ABC", 3) == 0); + So(nng_setopt(sub, nng_optid_sub_subscribe, "", 0) == + 0); Convey("Unsubscribe works", { - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "ABC", - 3) == 0); - So(nng_setopt( - sub, NNG_OPT_UNSUBSCRIBE, "", 0) == 0); - - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, "", - 0) == NNG_ENOENT); - So(nng_setopt(sub, NNG_OPT_UNSUBSCRIBE, + So(nng_setopt(sub, nng_optid_sub_unsubscribe, + "ABC", 3) == 0); + So(nng_setopt(sub, nng_optid_sub_unsubscribe, + "", 0) == 0); + + So(nng_setopt(sub, nng_optid_sub_unsubscribe, + "", 0) == NNG_ENOENT); + So(nng_setopt(sub, nng_optid_sub_unsubscribe, "HELLO", 0) == NNG_ENOENT); }); }); Convey("Pub cannot subscribe", { - So(nng_setopt(pub, NNG_OPT_SUBSCRIBE, "", 0) == + So(nng_setopt(pub, nng_optid_sub_subscribe, "", 0) == NNG_ENOTSUP); }); Convey("Subs can receive from pubs", { nng_msg *msg; - uint64_t rtimeo; - So(nng_setopt(sub, NNG_OPT_SUBSCRIBE, "/some/", + So(nng_setopt(sub, nng_optid_sub_subscribe, "/some/", strlen("/some/")) == 0); - rtimeo = 50000; // 50ms - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, - sizeof(rtimeo)) == 0); + So(nng_setopt_usec(sub, nng_optid_recvtimeo, 90000) == + 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/like/it/hot"); @@ -139,10 +139,9 @@ TestMain("PUB/SUB pattern", { Convey("Subs without subsciptions don't receive", { - uint64_t rtimeo = 50000; // 50ms nng_msg *msg; - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, - sizeof(rtimeo)) == 0); + So(nng_setopt_usec(sub, nng_optid_recvtimeo, 90000) == + 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/don't/like/it"); @@ -152,14 +151,11 @@ TestMain("PUB/SUB pattern", { Convey("Subs in raw receive", { - uint64_t rtimeo = 50000; // 500ms - int raw = 1; nng_msg *msg; - So(nng_setopt(sub, NNG_OPT_RCVTIMEO, &rtimeo, - sizeof(rtimeo)) == 0); - So(nng_setopt(sub, NNG_OPT_RAW, &raw, sizeof(raw)) == + So(nng_setopt_usec(sub, nng_optid_recvtimeo, 90000) == 0); + So(nng_setopt_int(sub, nng_optid_raw, 1) == 0); So(nng_msg_alloc(&msg, 0) == 0); APPENDSTR(msg, "/some/like/it/raw"); diff --git a/tests/reqrep.c b/tests/reqrep.c index e36232b6..537d45c8 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -12,6 +13,9 @@ #include <string.h> +extern const char *nng_opt_req_resendtime; +extern int nng_optid_req_resendtime; + TestMain("REQ/REP pattern", { int rv; const char *addr = "inproc://test"; @@ -27,6 +31,22 @@ TestMain("REQ/REP pattern", { So(nng_peer(req) == NNG_PROTO_REP); }); + Convey("Resend time option id works", { + int opt; + const char *name; + opt = nng_option_lookup(nng_opt_req_resendtime); + So(opt >= 0); + So(opt == nng_optid_req_resendtime); + name = nng_option_name(opt); + So(name != NULL); + So(strcmp(name, nng_opt_req_resendtime) == 0); + + // Set timeout. + So(nng_setopt_usec(req, opt, 10000) == 0); + // Check invalid size + So(nng_setopt(req, opt, name, 1) == NNG_EINVAL); + }); + Convey("Recv with no send fails", { nng_msg *msg; rv = nng_recvmsg(req, &msg, 0); @@ -53,6 +73,11 @@ TestMain("REQ/REP pattern", { So(rv == NNG_ESTATE); nng_msg_free(msg); }); + + Convey("Cannot set resend time", { + So(nng_setopt_usec(rep, nng_optid_req_resendtime, + 100) == NNG_ENOTSUP); + }); }); Convey("We can create a linked REQ/REP pair", { @@ -115,8 +140,8 @@ TestMain("REQ/REP pattern", { nng_close(req); }); - So(nng_setopt_usec(req, NNG_OPT_RESENDTIME, retry) == 0); - So(nng_setopt_int(req, NNG_OPT_SNDBUF, 16) == 0); + So(nng_setopt_usec(req, nng_optid_req_resendtime, retry) == 0); + So(nng_setopt_int(req, nng_optid_sendbuf, 16) == 0); So(nng_msg_alloc(&abc, 0) == 0); So(nng_msg_append(abc, "abc", 4) == 0); diff --git a/tests/scalability.c b/tests/scalability.c index 0bb93401..f0fc6366 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -52,29 +52,15 @@ openclients(nng_socket *clients, int num) int i; uint64_t t; for (i = 0; i < num; i++) { - if ((rv = nng_req_open(&clients[i])) != 0) { - printf("open #%d: %s\n", i, nng_strerror(rv)); - return (rv); - } - t = 100000; // 100ms - rv = nng_setopt(clients[i], NNG_OPT_RCVTIMEO, &t, sizeof(t)); - if (rv != 0) { - printf( - "setopt(RCVTIMEO) #%d: %s\n", i, nng_strerror(rv)); - return (rv); - } - t = 100000; // 100ms - rv = nng_setopt(clients[i], NNG_OPT_SNDTIMEO, &t, sizeof(t)); - if (rv != 0) { - printf( - "setopt(SNDTIMEO) #%d: %s\n", i, nng_strerror(rv)); - return (rv); - } - rv = nng_dial(clients[i], addr, NULL, 0); - if (rv != 0) { - printf("dial #%d: %s\n", i, nng_strerror(rv)); + t = 100000; // 100ms + nng_socket c; + if (((rv = nng_req_open(&c)) != 0) || + ((rv = nng_setopt_usec(c, nng_optid_recvtimeo, t)) != 0) || + ((rv = nng_setopt_usec(c, nng_optid_sendtimeo, t)) != 0) || + ((rv = nng_dial(c, addr, NULL, 0)) != 0)) { return (rv); } + clients[i] = c; } return (0); } @@ -88,31 +74,22 @@ transact(nng_socket *clients, int num) for (i = 0; i < num; i++) { - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - break; - } - - if ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) { - break; - } - - msg = NULL; - if ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0) { + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) || + ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0)) { + // We may leak a message, but this is an + // error case anyway. break; } nng_msg_free(msg); msg = NULL; } - if (msg != NULL) { - nng_msg_free(msg); - } return (rv); } Main({ nng_socket *clients; int * results; - int depth = 256; atexit(stop); @@ -120,8 +97,8 @@ Main({ results = calloc(nclients, sizeof(int)); if ((nng_rep_open(&rep) != 0) || - (nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof(depth)) != 0) || - (nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof(depth)) != 0) || + (nng_setopt_int(rep, nng_optid_recvbuf, 256) != 0) || + (nng_setopt_int(rep, nng_optid_sendbuf, 256) != 0) || (nng_listen(rep, addr, NULL, 0) != 0) || (nng_thread_create(&server, serve, NULL) != 0)) { fprintf(stderr, "Unable to set up server!\n"); diff --git a/tests/sock.c b/tests/sock.c index 2e2345c9..81c0eb59 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -63,7 +64,7 @@ TestMain("Socket Operations", { now = nng_clock(); So(now > 0); - So(nng_setopt_usec(s1, NNG_OPT_RCVTIMEO, to) == 0); + So(nng_setopt_usec(s1, nng_optid_recvtimeo, to) == 0); So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); So(msg == NULL); So(nng_clock() >= (now + to)); @@ -87,7 +88,7 @@ TestMain("Socket Operations", { So(msg != NULL); now = nng_clock(); - So(nng_setopt_usec(s1, NNG_OPT_SNDTIMEO, to) == 0); + So(nng_setopt_usec(s1, nng_optid_sendtimeo, to) == 0); So(nng_sendmsg(s1, msg, 0) == NNG_ETIMEDOUT); So(nng_clock() >= (now + to)); So(nng_clock() < (now + (to * 2))); @@ -99,17 +100,17 @@ TestMain("Socket Operations", { int64_t v = 0; size_t sz; - So(nng_setopt_usec(s1, NNG_OPT_SNDTIMEO, to) == 0); + So(nng_setopt_usec(s1, nng_optid_sendtimeo, to) == 0); Convey("Read only options handled properly", { - So(nng_setopt_int(s1, NNG_OPT_RCVFD, 0) == + So(nng_setopt_int(s1, nng_optid_recvfd, 0) == NNG_EINVAL); - So(nng_setopt_int(s1, NNG_OPT_SNDFD, 0) == + So(nng_setopt_int(s1, nng_optid_sendfd, 0) == NNG_EINVAL); - So(nng_setopt(s1, NNG_OPT_LOCALADDR, "a", 1) == + So(nng_setopt(s1, nng_optid_locaddr, "a", 1) == + NNG_EINVAL); + So(nng_setopt(s1, nng_optid_remaddr, "a", 1) == NNG_EINVAL); - So(nng_setopt(s1, NNG_OPT_REMOTEADDR, "a", - 1) == NNG_EINVAL); }); Convey("We can apply options before endpoint", { @@ -119,51 +120,51 @@ TestMain("Socket Operations", { addr, "ipc:///tmp/lopt_%u"); So(nng_setopt_size( - s1, NNG_OPT_RCVMAXSZ, 543) == 0); + s1, nng_optid_recvmaxsz, 543) == 0); So(nng_listener_create(&l, s1, addr) == 0); So(nng_listener_getopt_size( - l, NNG_OPT_RCVMAXSZ, &sz) == 0); + l, nng_optid_recvmaxsz, &sz) == 0); So(sz == 543); Convey("Endpoint option can be overridden", { - So(nng_listener_setopt_size( - l, NNG_OPT_RCVMAXSZ, 678) == 0); - So(nng_listener_getopt_size( - l, NNG_OPT_RCVMAXSZ, &sz) == 0); + So(nng_listener_setopt_size(l, + nng_optid_recvmaxsz, 678) == 0); + So(nng_listener_getopt_size(l, + nng_optid_recvmaxsz, &sz) == 0); So(sz == 678); So(nng_getopt_size(s1, - NNG_OPT_RCVMAXSZ, &sz) == 0); + nng_optid_recvmaxsz, &sz) == 0); So(sz == 543); }); Convey("And socket overrides again", { So(nng_setopt_size(s1, - NNG_OPT_RCVMAXSZ, 911) == 0); - So(nng_listener_getopt_size( - l, NNG_OPT_RCVMAXSZ, &sz) == 0); + nng_optid_recvmaxsz, 911) == 0); + So(nng_listener_getopt_size(l, + nng_optid_recvmaxsz, &sz) == 0); So(sz == 911); }); }); Convey("Short size is not copied", { sz = 0; - So(nng_getopt(s1, NNG_OPT_SNDTIMEO, &v, &sz) == - 0); + So(nng_getopt( + s1, nng_optid_sendtimeo, &v, &sz) == 0); So(sz == sizeof(v)); So(v == 0); sz = 0; - So(nng_getopt( - s1, NNG_OPT_RECONN_TIME, &v, &sz) == 0); + So(nng_getopt(s1, nng_optid_reconnmint, &v, + &sz) == 0); So(v == 0); - So(nng_getopt(s1, NNG_OPT_RECONN_MAXTIME, &v, + So(nng_getopt(s1, nng_optid_reconnmaxt, &v, &sz) == 0); So(v == 0); }); Convey("Correct size is copied", { sz = sizeof(v); - So(nng_getopt(s1, NNG_OPT_SNDTIMEO, &v, &sz) == - 0); + So(nng_getopt( + s1, nng_optid_sendtimeo, &v, &sz) == 0); So(sz == sizeof(v)); So(v == 1234); }); @@ -171,48 +172,47 @@ TestMain("Socket Operations", { Convey("Short size buf is not copied", { int l = 5; sz = 0; - So(nng_getopt(s1, NNG_OPT_RCVBUF, &l, &sz) == - 0); + So(nng_getopt( + s1, nng_optid_recvbuf, &l, &sz) == 0); So(sz == sizeof(l)); So(l == 5); }); Convey("Insane buffer size fails", { - So(nng_setopt_int(s1, NNG_OPT_RCVBUF, + So(nng_setopt_int(s1, nng_optid_recvbuf, 0x100000) == NNG_EINVAL); - So(nng_setopt_int(s1, NNG_OPT_RCVBUF, -200) == - NNG_EINVAL); - + So(nng_setopt_int(s1, nng_optid_recvbuf, + -200) == NNG_EINVAL); }); Convey("Negative timeout fails", { - So(nng_setopt_usec(s1, NNG_OPT_RCVTIMEO, -5) == - NNG_EINVAL); + So(nng_setopt_usec(s1, nng_optid_recvtimeo, + -5) == NNG_EINVAL); }); Convey("Short timeout fails", { to = 0; sz = sizeof(to) - 1; - So(nng_setopt(s1, NNG_OPT_RCVTIMEO, &to, sz) == - NNG_EINVAL); - So(nng_setopt(s1, NNG_OPT_RECONN_TIME, &to, + So(nng_setopt(s1, nng_optid_recvtimeo, &to, + sz) == NNG_EINVAL); + So(nng_setopt(s1, nng_optid_reconnmint, &to, sz) == NNG_EINVAL); }); Convey("Bogus raw fails", { - So(nng_setopt_int(s1, NNG_OPT_RAW, 42) == + So(nng_setopt_int(s1, nng_optid_raw, 42) == NNG_EINVAL); - So(nng_setopt_int(s1, NNG_OPT_RAW, -42) == + So(nng_setopt_int(s1, nng_optid_raw, -42) == NNG_EINVAL); - So(nng_setopt_int(s1, NNG_OPT_RAW, 0) == 0); - So(nng_setopt(s1, NNG_OPT_RAW, "a", 1) == + So(nng_setopt_int(s1, nng_optid_raw, 0) == 0); + So(nng_setopt(s1, nng_optid_raw, "a", 1) == NNG_EINVAL); }); Convey("Unsupported options fail", { char *crap = "crap"; - So(nng_setopt(s1, NNG_OPT_SUBSCRIBE, crap, - strlen(crap)) == NNG_ENOTSUP); + So(nng_setopt(s1, nng_optid_sub_subscribe, + crap, strlen(crap)) == NNG_ENOTSUP); }); Convey("Bogus sizes fail", { @@ -220,30 +220,30 @@ TestMain("Socket Operations", { int i; So(nng_setopt_size( - s1, NNG_OPT_RCVMAXSZ, 6550) == 0); - So(nng_getopt_size(s1, NNG_OPT_RCVMAXSZ, &v) == - 0); + s1, nng_optid_recvmaxsz, 6550) == 0); + So(nng_getopt_size( + s1, nng_optid_recvmaxsz, &v) == 0); So(v == 6550); v = 102400; - So(nng_setopt(s1, NNG_OPT_RCVMAXSZ, &v, 1) == - NNG_EINVAL); - So(nng_getopt_size(s1, NNG_OPT_RCVMAXSZ, &v) == - 0); + So(nng_setopt(s1, nng_optid_recvmaxsz, &v, + 1) == NNG_EINVAL); + So(nng_getopt_size( + s1, nng_optid_recvmaxsz, &v) == 0); So(v == 6550); i = 42; - So(nng_setopt(s1, NNG_OPT_RCVBUF, &i, 1) == + So(nng_setopt(s1, nng_optid_recvbuf, &i, 1) == NNG_EINVAL); if (sizeof(size_t) == 8) { v = 0x10000; v <<= 30; So(nng_setopt_size(s1, - NNG_OPT_RCVMAXSZ, + nng_optid_recvmaxsz, v) == NNG_EINVAL); - So(nng_getopt_size( - s1, NNG_OPT_RCVMAXSZ, &v) == 0); + So(nng_getopt_size(s1, + nng_optid_recvmaxsz, &v) == 0); So(v == 6550); } }); @@ -316,21 +316,22 @@ TestMain("Socket Operations", { Convey("Options work", { size_t sz; So(nng_dialer_setopt_size( - ep, NNG_OPT_RCVMAXSZ, 4321) == 0); + ep, nng_optid_recvmaxsz, 4321) == 0); So(nng_dialer_getopt_size( - ep, NNG_OPT_RCVMAXSZ, &sz) == 0); + ep, nng_optid_recvmaxsz, &sz) == 0); So(sz == 4321); }); Convey("Socket opts not for dialer", { // Not appropriate for dialer. - So(nng_dialer_setopt_int(ep, NNG_OPT_RAW, 1) == - NNG_ENOTSUP); + So(nng_dialer_setopt_int( + ep, nng_optid_raw, 1) == NNG_ENOTSUP); So(nng_dialer_setopt_usec(ep, - NNG_OPT_RECONN_TIME, 1) == NNG_ENOTSUP); + nng_optid_reconnmint, + 1) == NNG_ENOTSUP); }); Convey("Bad size checks", { - So(nng_dialer_setopt(ep, NNG_OPT_RCVMAXSZ, "a", - 1) == NNG_EINVAL); + So(nng_dialer_setopt(ep, nng_optid_recvmaxsz, + "a", 1) == NNG_EINVAL); }); Convey("Cannot listen", { So(nng_listener_start(ep, 0) == NNG_ENOTSUP); }); @@ -344,20 +345,21 @@ TestMain("Socket Operations", { Convey("Options work", { size_t sz; So(nng_listener_setopt_size( - ep, NNG_OPT_RCVMAXSZ, 4321) == 0); + ep, nng_optid_recvmaxsz, 4321) == 0); So(nng_listener_getopt_size( - ep, NNG_OPT_RCVMAXSZ, &sz) == 0); + ep, nng_optid_recvmaxsz, &sz) == 0); So(sz == 4321); }); Convey("Socket opts not for dialer", { // Not appropriate for dialer. So(nng_listener_setopt_int( - ep, NNG_OPT_RAW, 1) == NNG_ENOTSUP); + ep, nng_optid_raw, 1) == NNG_ENOTSUP); So(nng_listener_setopt_usec(ep, - NNG_OPT_RECONN_TIME, 1) == NNG_ENOTSUP); + nng_optid_reconnmint, + 1) == NNG_ENOTSUP); }); Convey("Bad size checks", { - So(nng_listener_setopt(ep, NNG_OPT_RCVMAXSZ, + So(nng_listener_setopt(ep, nng_optid_recvmaxsz, "a", 1) == NNG_EINVAL); }); Convey("Cannot dial", @@ -370,30 +372,30 @@ TestMain("Socket Operations", { uint64_t t; So(nng_dialer_setopt_size( - 1999, NNG_OPT_RCVMAXSZ, 10) == NNG_ENOENT); + 1999, nng_optid_recvmaxsz, 10) == NNG_ENOENT); So(nng_listener_setopt_size( - 1999, NNG_OPT_RCVMAXSZ, 10) == NNG_ENOENT); + 1999, nng_optid_recvmaxsz, 10) == NNG_ENOENT); s = 1; - So(nng_dialer_getopt(1999, NNG_OPT_RAW, &i, &s) == + So(nng_dialer_getopt(1999, nng_optid_raw, &i, &s) == NNG_ENOENT); - So(nng_listener_getopt(1999, NNG_OPT_RAW, &i, &s) == + So(nng_listener_getopt(1999, nng_optid_raw, &i, &s) == NNG_ENOENT); So(nng_dialer_getopt_size( - 1999, NNG_OPT_RCVMAXSZ, &s) == NNG_ENOENT); + 1999, nng_optid_recvmaxsz, &s) == NNG_ENOENT); So(nng_listener_getopt_size( - 1999, NNG_OPT_RCVMAXSZ, &s) == NNG_ENOENT); + 1999, nng_optid_recvmaxsz, &s) == NNG_ENOENT); - So(nng_dialer_getopt_int(1999, NNG_OPT_RAW, &i) == + So(nng_dialer_getopt_int(1999, nng_optid_raw, &i) == NNG_ENOENT); - So(nng_listener_getopt_int(1999, NNG_OPT_RAW, &i) == + So(nng_listener_getopt_int(1999, nng_optid_raw, &i) == NNG_ENOENT); - So(nng_dialer_getopt_usec(1999, NNG_OPT_LINGER, &t) == - NNG_ENOENT); + So(nng_dialer_getopt_usec( + 1999, nng_optid_linger, &t) == NNG_ENOENT); So(nng_listener_getopt_usec( - 1999, NNG_OPT_LINGER, &t) == NNG_ENOENT); + 1999, nng_optid_linger, &t) == NNG_ENOENT); }); @@ -404,8 +406,8 @@ TestMain("Socket Operations", { trantest_next_address(addr, "ipc:///tmp/sock_test_%u"); So(nng_dialer_create(&ep, s1, addr) == 0); So(nng_dialer_start(ep, NNG_FLAG_NONBLOCK) == 0); - So(nng_dialer_setopt_size(ep, NNG_OPT_RCVMAXSZ, 10) == - NNG_ESTATE); + So(nng_dialer_setopt_size( + ep, nng_optid_recvmaxsz, 10) == NNG_ESTATE); So(nng_dialer_close(ep) == 0); So(nng_dialer_close(ep) == NNG_ENOENT); }); @@ -419,7 +421,7 @@ TestMain("Socket Operations", { So(nng_listener_create(&ep, s1, addr) == 0); So(nng_listener_start(ep, 0) == 0); So(nng_listener_setopt_size( - ep, NNG_OPT_RCVMAXSZ, 10) == NNG_ESTATE); + ep, nng_optid_recvmaxsz, 10) == NNG_ESTATE); So(nng_listener_close(ep) == 0); So(nng_listener_close(ep) == NNG_ENOENT); }); @@ -435,17 +437,17 @@ TestMain("Socket Operations", { So(nng_pair_open(&s2) == 0); Reset({ nng_close(s2); }); - So(nng_setopt_int(s1, NNG_OPT_RCVBUF, 1) == 0); - So(nng_getopt_int(s1, NNG_OPT_RCVBUF, &len) == 0); + So(nng_setopt_int(s1, nng_optid_recvbuf, 1) == 0); + So(nng_getopt_int(s1, nng_optid_recvbuf, &len) == 0); So(len == 1); - So(nng_setopt_int(s1, NNG_OPT_SNDBUF, 1) == 0); - So(nng_setopt_int(s2, NNG_OPT_SNDBUF, 1) == 0); + So(nng_setopt_int(s1, nng_optid_sendbuf, 1) == 0); + So(nng_setopt_int(s2, nng_optid_sendbuf, 1) == 0); - So(nng_setopt_usec(s1, NNG_OPT_SNDTIMEO, to) == 0); - So(nng_setopt_usec(s1, NNG_OPT_RCVTIMEO, to) == 0); - So(nng_setopt_usec(s2, NNG_OPT_SNDTIMEO, to) == 0); - So(nng_setopt_usec(s2, NNG_OPT_RCVTIMEO, to) == 0); + So(nng_setopt_usec(s1, nng_optid_sendtimeo, to) == 0); + So(nng_setopt_usec(s1, nng_optid_recvtimeo, to) == 0); + So(nng_setopt_usec(s2, nng_optid_sendtimeo, to) == 0); + So(nng_setopt_usec(s2, nng_optid_recvtimeo, to) == 0); So(nng_listen(s1, a, NULL, 0) == 0); So(nng_dial(s2, a, NULL, 0) == 0); diff --git a/tests/survey.c b/tests/survey.c index 2bdd2930..ae252cd5 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -9,142 +9,133 @@ // #include "convey.h" -#include "core/nng_impl.h" #include "nng.h" #include <string.h> +extern const char *nng_opt_surveyor_surveytime; +extern int nng_optid_surveyor_surveytime; + #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) #define CHECKSTR(m, s) \ So(nng_msg_len(m) == strlen(s)); \ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) -Main({ +TestMain("SURVEY pattern", { const char *addr = "inproc://test"; - nni_init(); - - Test("SURVEY pattern", { - Convey("We can create a SURVEYOR socket", - { - nng_socket surv; - - So(nng_surveyor_open(&surv) == 0); - - Reset({ nng_close(surv); }); - - Convey("Protocols match", { - So(nng_protocol(surv) == - NNG_PROTO_SURVEYOR); - So(nng_peer(surv) == NNG_PROTO_RESPONDENT); - }); - - Convey("Recv with no survey fails", { - nng_msg *msg; - So(nng_recvmsg(surv, &msg, 0) == - NNG_ESTATE); - }); - - Convey("Survey without responder times out", { - uint64_t expire = 50000; - nng_msg *msg; - - So(nng_setopt(surv, NNG_OPT_SURVEYTIME, - &expire, sizeof(expire)) == 0); - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(surv, msg, 0) == 0); - So(nng_recvmsg(surv, &msg, 0) == - NNG_ETIMEDOUT); - }); - }) - - Convey("We can create a RESPONDENT socket", - { - nng_socket resp; - So(nng_respondent_open(&resp) == 0); - - Reset({ nng_close(resp); }); - - Convey("Protocols match", { - So(nng_protocol(resp) == - NNG_PROTO_RESPONDENT); - So(nng_peer(resp) == - NNG_PROTO_SURVEYOR); - }); - - Convey("Send fails with no suvey", { - nng_msg *msg; - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(resp, msg, 0) == - NNG_ESTATE); - nng_msg_free(msg); - }); - }) - - Convey("We can create a linked survey pair", { - nng_socket surv; - nng_socket resp; - nng_socket sock; - uint64_t expire; - - So(nng_surveyor_open(&surv) == 0); - So(nng_respondent_open(&resp) == 0); - - Reset({ - nng_close(surv); - nng_close(resp); - }); - - expire = 50000; - So(nng_setopt(surv, NNG_OPT_SURVEYTIME, - &expire, sizeof(expire)) == 0); - - So(nng_listen(surv, addr, NULL, 0) == 0); - So(nng_dial(resp, addr, NULL, 0) == 0); - - // We dial another socket as that will force - // the earlier dial to have completed *fully*. - // This is a hack that only works because our - // listen logic is single threaded. - So(nng_respondent_open(&sock) == 0); - So(nng_dial(sock, addr, NULL, 0) == 0); - nng_close(sock); - - Convey("Survey works", { - nng_msg *msg; - uint64_t rtimeo; - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "abc"); - So(nng_sendmsg(surv, msg, 0) == 0); - msg = NULL; - So(nng_recvmsg(resp, &msg, 0) == 0); - CHECKSTR(msg, "abc"); - nng_msg_chop(msg, 3); - APPENDSTR(msg, "def"); - So(nng_sendmsg(resp, msg, 0) == 0); - msg = NULL; - So(nng_recvmsg(surv, &msg, 0) == 0); - CHECKSTR(msg, "def"); - nng_msg_free(msg); - - So(nng_recvmsg(surv, &msg, 0) == - NNG_ETIMEDOUT); - - Convey( - "And goes to non-survey state", { - rtimeo = 200000; - So(nng_setopt(surv, - NNG_OPT_RCVTIMEO, - &rtimeo, - sizeof(rtimeo)) == - 0); - So(nng_recvmsg(surv, &msg, - 0) == NNG_ESTATE); - }); - }); - }); + atexit(nng_fini); + + Convey("We can create a SURVEYOR socket", { + nng_socket surv; + + So(nng_surveyor_open(&surv) == 0); + + Reset({ nng_close(surv); }); + + Convey("Surveytime option id works", { + int opt; + const char *name; + opt = nng_option_lookup(nng_opt_surveyor_surveytime); + So(opt >= 0); + So(opt == nng_optid_surveyor_surveytime); + name = nng_option_name(opt); + So(name != NULL); + So(strcmp(name, nng_opt_surveyor_surveytime) == 0); + }); + + Convey("Protocols match", { + So(nng_protocol(surv) == NNG_PROTO_SURVEYOR); + So(nng_peer(surv) == NNG_PROTO_RESPONDENT); + }); + + Convey("Recv with no survey fails", { + nng_msg *msg; + So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); + }); + + Convey("Survey without responder times out", { + nng_msg *msg; + + So(nng_setopt_usec(surv, nng_optid_surveyor_surveytime, + 50000) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + }); }); - nni_fini(); -}) + Convey("We can create a RESPONDENT socket", { + nng_socket resp; + So(nng_respondent_open(&resp) == 0); + + Reset({ nng_close(resp); }); + + Convey("Protocols match", { + So(nng_protocol(resp) == NNG_PROTO_RESPONDENT); + So(nng_peer(resp) == NNG_PROTO_SURVEYOR); + }); + + Convey("Send fails with no suvey", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(resp, msg, 0) == NNG_ESTATE); + nng_msg_free(msg); + }); + }); + + Convey("We can create a linked survey pair", { + nng_socket surv; + nng_socket resp; + nng_socket sock; + + So(nng_surveyor_open(&surv) == 0); + So(nng_respondent_open(&resp) == 0); + + Reset({ + nng_close(surv); + nng_close(resp); + }); + + So(nng_setopt_usec( + surv, nng_optid_surveyor_surveytime, 50000) == 0); + So(nng_listen(surv, addr, NULL, 0) == 0); + So(nng_dial(resp, addr, NULL, 0) == 0); + + // We dial another socket as that will force + // the earlier dial to have completed *fully*. + // This is a hack that only works because our + // listen logic is single threaded. + So(nng_respondent_open(&sock) == 0); + So(nng_dial(sock, addr, NULL, 0) == 0); + nng_close(sock); + + Convey("Survey works", { + nng_msg *msg; + uint64_t rtimeo; + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "abc"); + So(nng_sendmsg(surv, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(resp, &msg, 0) == 0); + CHECKSTR(msg, "abc"); + nng_msg_chop(msg, 3); + APPENDSTR(msg, "def"); + So(nng_sendmsg(resp, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(surv, &msg, 0) == 0); + CHECKSTR(msg, "def"); + nng_msg_free(msg); + + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + + Convey("And goes to non-survey state", { + rtimeo = 200000; + So(nng_setopt_usec(surv, nng_optid_recvtimeo, + 200000) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); + }); + }); + }); +}); |
