aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-08 00:33:19 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-08 00:33:19 -0700
commit50532054c0bee3a1ff3324db10f3cdf7b44041e4 (patch)
tree6e924d21e7eb188865d98e9daa203a045cc928c5
parent702fe1d0af4b08a8b53172aaca57394b181d58b2 (diff)
downloadnng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.gz
nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.bz2
nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.zip
Added nn_compat code for option handling, fixed other bugs.
Hop counts for REQ were busted (bad TTL), and imported the compat_reqtll test. At the same time, added code to nn_term to shut down completely, discarding sockets. (Note that some things, such as globals, may still be left around; that's ok.)
-rw-r--r--src/core/socket.c42
-rw-r--r--src/core/socket.h3
-rw-r--r--src/nng.c6
-rw-r--r--src/nng.h4
-rw-r--r--src/nng_compat.c139
-rw-r--r--src/protocol/reqrep/rep.c2
-rw-r--r--src/protocol/reqrep/req.c11
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/compat_reqttl.c151
9 files changed, 350 insertions, 9 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index e59f6042..f2dbb573 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -15,6 +15,8 @@
// Socket implementation.
static nni_objhash *nni_socks = NULL;
+static nni_list nni_sock_list;
+static nni_mtx nni_sock_lk;
uint32_t
nni_sock_id(nni_sock *s)
@@ -362,6 +364,7 @@ nni_sock_ctor(uint32_t id)
sock->s_reconnmax = 0;
sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
sock->s_id = id;
+ NNI_LIST_NODE_INIT(&sock->s_node);
nni_pipe_sock_list_init(&sock->s_pipes);
@@ -436,8 +439,12 @@ nni_sock_sys_init(void)
{
int rv;
- rv = nni_objhash_init(&nni_socks, nni_sock_ctor, nni_sock_dtor);
-
+ NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
+ if (((rv = nni_objhash_init(
+ &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) ||
+ ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) {
+ nni_sock_sys_fini();
+ }
return (rv);
}
@@ -446,6 +453,7 @@ nni_sock_sys_fini(void)
{
nni_objhash_fini(nni_socks);
nni_socks = NULL;
+ nni_mtx_fini(&nni_sock_lk);
}
// nn_sock_open creates the underlying socket.
@@ -512,6 +520,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sops->sock_open(sock->s_data);
+ nni_mtx_lock(&nni_sock_lk);
+ nni_list_append(&nni_sock_list, sock);
+ nni_mtx_unlock(&nni_sock_lk);
+
*sockp = sock;
return (0);
}
@@ -657,6 +669,10 @@ nni_sock_close(nni_sock *sock)
sock->s_closed = 1;
nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&nni_sock_lk);
+ nni_list_node_remove(&sock->s_node);
+ nni_mtx_unlock(&nni_sock_lk);
+
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
// to have concurrent threads using this. We've taken care to
@@ -672,6 +688,28 @@ nni_sock_close(nni_sock *sock)
nni_objhash_unref_wait(nni_socks, sock->s_id);
}
+void
+nni_sock_closeall(void)
+{
+ nni_sock *s;
+ uint32_t id;
+
+ for (;;) {
+ nni_mtx_lock(&nni_sock_lk);
+ if ((s = nni_list_first(&nni_sock_list)) == NULL) {
+ nni_mtx_unlock(&nni_sock_lk);
+ return;
+ }
+ id = s->s_id;
+ nni_list_node_remove(&s->s_node);
+ nni_mtx_unlock(&nni_sock_lk);
+
+ if (nni_sock_find(&s, id) == 0) {
+ nni_sock_close(s);
+ }
+ }
+}
+
int
nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 41dfbc33..54c1b4da 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -23,6 +23,8 @@ struct nni_socket {
nni_msgq *s_uwq; // Upper write queue
nni_msgq *s_urq; // Upper read queue
+ nni_list_node s_node;
+
uint16_t s_protocol;
uint16_t s_peer;
uint32_t s_flags;
@@ -67,6 +69,7 @@ extern int nni_sock_find(nni_sock **, uint32_t);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
+extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index ac84012a..d5cc4f2b 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -73,6 +73,12 @@ nng_close(nng_socket sid)
return (rv);
}
+void
+nng_closeall(void)
+{
+ nni_sock_closeall();
+}
+
uint16_t
nng_protocol(nng_socket sid)
{
diff --git a/src/nng.h b/src/nng.h
index 48a04ae1..aac7ec89 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -75,6 +75,10 @@ NNG_DECL void nng_fini(void);
// pipes associated with the socket.
NNG_DECL int nng_close(nng_socket);
+// nng_closeall closes all open sockets. Do not call this from
+// a library; it will affect all sockets.
+NNG_DECL void nng_closeall(void);
+
// nng_shutdown shuts down the socket. This causes any threads doing
// work for the socket or blocked in socket functions to be woken (and
// return NNG_ECLOSED). The socket resources are still present, so it
diff --git a/src/nng_compat.c b/src/nng_compat.c
index 160fd33d..88e2f69c 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -60,7 +60,7 @@ nn_strerror(int err)
return ("Unknown I/O error");
}
- // Arguablye we could use strerror() here, but we should only
+ // Arguably we could use strerror() here, but we should only
// be getting errnos we understand at this point.
(void) snprintf(msgbuf, sizeof(msgbuf), "Unknown error %d", err);
return (msgbuf);
@@ -536,6 +536,133 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
}
int
+nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp)
+{
+ int opt = 0;
+ int mscvt = 0;
+ uint64_t usec;
+ int * msecp;
+ int rv;
+
+ switch (nnlevel) {
+ case NN_SOL_SOCKET:
+ switch (nnopt) {
+ case NN_LINGER:
+ opt = NNG_OPT_LINGER;
+ break;
+ case NN_SNDBUF:
+ opt = NNG_OPT_SNDBUF;
+ break;
+ case NN_RCVBUF:
+ opt = NNG_OPT_RCVBUF;
+ break;
+ case NN_RECONNECT_IVL:
+ opt = NNG_OPT_RECONN_TIME;
+ mscvt = 1;
+ break;
+ case NN_RECONNECT_IVL_MAX:
+ opt = NNG_OPT_RECONN_MAXTIME;
+ mscvt = 1;
+ break;
+ case NN_SNDFD:
+ opt = NNG_OPT_SNDFD;
+ break;
+ case NN_RCVFD:
+ opt = NNG_OPT_RCVFD;
+ break;
+ case NN_RCVMAXSIZE:
+ opt = NNG_OPT_RCVMAXSZ;
+ break;
+ case NN_MAXTTL:
+ opt = NNG_OPT_MAXTTL;
+ break;
+ case NN_RCVTIMEO:
+ opt = NNG_OPT_RCVTIMEO;
+ mscvt = 1;
+ break;
+ case NN_SNDTIMEO:
+ opt = NNG_OPT_SNDTIMEO;
+ mscvt = 1;
+ break;
+ case NN_DOMAIN:
+ case NN_PROTOCOL:
+ case NN_IPV4ONLY:
+ case NN_SOCKET_NAME:
+ case NN_SNDPRIO:
+ case NN_RCVPRIO:
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+
+ break;
+ }
+ break;
+ case NN_REQ:
+ switch (nnopt) {
+ case NN_REQ_RESEND_IVL:
+ opt = NNG_OPT_RESENDTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SUB:
+ switch (nnopt) {
+ case NN_SUB_SUBSCRIBE:
+ opt = NNG_OPT_SUBSCRIBE;
+ break;
+ case NN_SUB_UNSUBSCRIBE:
+ opt = NNG_OPT_UNSUBSCRIBE;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SURVEYOR:
+ switch (nnopt) {
+ case NN_SURVEYOR_DEADLINE:
+ opt = NNG_OPT_SURVEYTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+
+ if (mscvt) {
+ if (*szp != sizeof(int)) {
+ errno = EINVAL;
+ return (-1);
+ }
+
+ msecp = valp;
+ valp = &usec;
+ *szp = sizeof(uint64_t);
+ }
+
+ if ((rv = nng_getopt((nng_socket) s, opt, valp, szp)) != 0) {
+ nn_seterror(rv);
+ return (-1);
+ }
+
+ if (mscvt) {
+ // We have to convert value to ms...
+ *msecp = (usec / 1000);
+ *szp = sizeof(int);
+ }
+
+ return (0);
+}
+
+int
nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
{
int opt = 0;
@@ -703,11 +830,11 @@ nn_device(int s1, int s2)
void
nn_term(void)
{
- // XXX: Implement something to do something. Probably we
- // should go through the nni_sockets idhash and clobber all
- // of the sockets. This function is relatively toxic, since
- // it can affect all sockets in the process, including those
- // in use by libraries, etc.
+ // This function is relatively toxic, since it can affect
+ // all sockets in the process, including those
+ // in use by libraries, etc. Accordingly, do not use this
+ // in a library -- only e.g. atexit() and similar.
+ nng_closeall();
}
// Internal test support routines.
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 049b1422..c7546182 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -312,7 +312,7 @@ nni_rep_pipe_recv_cb(void *arg)
}
// Move backtrace from body to header
- hops = 0;
+ hops = 1;
for (;;) {
int end = 0;
if (hops >= rep->ttl) {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 8e7056f5..7ec53c90 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -35,6 +35,7 @@ struct nni_req_sock {
int raw;
int wantw;
int closed;
+ int ttl;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -101,6 +102,7 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
req->raw = 0;
req->wantw = 0;
req->resend = NNI_TIME_ZERO;
+ req->ttl = 8;
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
@@ -269,6 +271,12 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
+ if (rv == 0) {
+ nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE);
+ }
+ break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255);
break;
default:
rv = NNG_ENOTSUP;
@@ -289,6 +297,9 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
case NNG_OPT_RAW:
rv = nni_getopt_int(&req->raw, buf, szp);
break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_getopt_int(&req->ttl, buf, szp);
+ break;
default:
rv = NNG_ENOTSUP;
}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 7b8768dc..fe58c602 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -100,3 +100,4 @@ add_nng_compat_test(compat_pair 5)
add_nng_compat_test(compat_pipeline 5)
add_nng_compat_test(compat_reqrep 5)
add_nng_compat_test(compat_survey 5)
+add_nng_compat_test(compat_reqttl 5)
diff --git a/tests/compat_reqttl.c b/tests/compat_reqttl.c
new file mode 100644
index 00000000..2a0113df
--- /dev/null
+++ b/tests/compat_reqttl.c
@@ -0,0 +1,151 @@
+/*
+ Copyright (c) 2012 Martin Sustrik All rights reserved.
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com>
+ Copyright 2017 Garrett D'Amore <garrett@damore.org>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "nng_compat.h"
+
+#include "compat_testutil.h"
+
+static char socket_address_a[128];
+static char socket_address_b[128];
+int dev0;
+int dev1;
+
+void device (NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Run the device. */
+ rc = nn_device (dev0, dev1);
+ nn_assert (rc < 0 && nn_errno () == EBADF);
+
+ /* Clean up. */
+ test_close (dev0);
+ test_close (dev1);
+}
+
+int main (int argc, const char *argv[])
+{
+ int end0;
+ int end1;
+ struct nn_thread thread1;
+ int timeo;
+ int maxttl;
+ size_t sz;
+ int rc;
+
+ int port = get_test_port(argc, argv);
+
+ test_addr_from(socket_address_a, "tcp", "127.0.0.1", port);
+ test_addr_from(socket_address_b, "tcp", "127.0.0.1", port + 1);
+
+ /* Intialise the device sockets. */
+ dev0 = test_socket (AF_SP_RAW, NN_REP);
+ dev1 = test_socket (AF_SP_RAW, NN_REQ);
+
+ test_bind (dev0, socket_address_a);
+ test_bind (dev1, socket_address_b);
+
+ /* Start the device. */
+ nn_thread_init (&thread1, device, NULL);
+
+ end0 = test_socket (AF_SP, NN_REQ);
+ end1 = test_socket (AF_SP, NN_REP);
+
+ /* Test the bi-directional device TTL */
+ test_connect (end0, socket_address_a);
+ test_connect (end1, socket_address_b);
+
+ /* Wait for TCP to establish. */
+ nn_sleep (100);
+
+ /* Pass a message between endpoints. */
+ /* Set up max receive timeout. */
+ timeo = 100;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ timeo = 100;
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+
+ /* Test default TTL is 8. */
+ sz = sizeof (maxttl);
+ maxttl = -1;
+ rc = nn_getsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, &sz);
+ nn_assert (rc == 0);
+ nn_assert (sz == sizeof (maxttl));
+ nn_assert (maxttl == 8);
+
+ /* Test to make sure option TTL cannot be set below 1. */
+ maxttl = -1;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == -1);
+ maxttl = 0;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 0);
+
+ /* Test to set non-integer size */
+ maxttl = 8;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, 1);
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 8);
+
+ test_send (end0, "XYZ");
+
+ test_recv (end1, "XYZ");
+
+ /* Now send a reply. */
+ test_send (end1, "REPLYXYZ\n");
+ test_recv (end0, "REPLYXYZ\n");
+
+ /* Now set the max TTL. */
+ maxttl = 1;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+
+ test_send (end0, "DROPTHIS");
+ test_drop (end1, ETIMEDOUT);
+
+ /* Now set the max TTL up. */
+ maxttl = 2;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+
+ test_send (end0, "DONTDROP");
+ test_recv (end1, "DONTDROP");
+
+ test_send (end1, "GOTIT");
+ test_recv (end0, "GOTIT");
+
+ /* Clean up. */
+ test_close (end0);
+ test_close (end1);
+
+ /* Shut down the devices. */
+ nn_term ();
+
+ nn_thread_term (&thread1);
+
+ return 0;
+}