diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-08 00:33:19 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-08 00:33:19 -0700 |
| commit | 50532054c0bee3a1ff3324db10f3cdf7b44041e4 (patch) | |
| tree | 6e924d21e7eb188865d98e9daa203a045cc928c5 | |
| parent | 702fe1d0af4b08a8b53172aaca57394b181d58b2 (diff) | |
| download | nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.gz nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.bz2 nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.zip | |
Added nn_compat code for option handling, fixed other bugs.
Hop counts for REQ were busted (bad TTL), and imported the
compat_reqtll test. At the same time, added code to nn_term
to shut down completely, discarding sockets. (Note that some
things, such as globals, may still be left around; that's ok.)
| -rw-r--r-- | src/core/socket.c | 42 | ||||
| -rw-r--r-- | src/core/socket.h | 3 | ||||
| -rw-r--r-- | src/nng.c | 6 | ||||
| -rw-r--r-- | src/nng.h | 4 | ||||
| -rw-r--r-- | src/nng_compat.c | 139 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 11 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/compat_reqttl.c | 151 |
9 files changed, 350 insertions, 9 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index e59f6042..f2dbb573 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -15,6 +15,8 @@ // Socket implementation. static nni_objhash *nni_socks = NULL; +static nni_list nni_sock_list; +static nni_mtx nni_sock_lk; uint32_t nni_sock_id(nni_sock *s) @@ -362,6 +364,7 @@ nni_sock_ctor(uint32_t id) sock->s_reconnmax = 0; sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default sock->s_id = id; + NNI_LIST_NODE_INIT(&sock->s_node); nni_pipe_sock_list_init(&sock->s_pipes); @@ -436,8 +439,12 @@ nni_sock_sys_init(void) { int rv; - rv = nni_objhash_init(&nni_socks, nni_sock_ctor, nni_sock_dtor); - + NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); + if (((rv = nni_objhash_init( + &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) || + ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) { + nni_sock_sys_fini(); + } return (rv); } @@ -446,6 +453,7 @@ nni_sock_sys_fini(void) { nni_objhash_fini(nni_socks); nni_socks = NULL; + nni_mtx_fini(&nni_sock_lk); } // nn_sock_open creates the underlying socket. @@ -512,6 +520,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) sops->sock_open(sock->s_data); + nni_mtx_lock(&nni_sock_lk); + nni_list_append(&nni_sock_list, sock); + nni_mtx_unlock(&nni_sock_lk); + *sockp = sock; return (0); } @@ -657,6 +669,10 @@ nni_sock_close(nni_sock *sock) sock->s_closed = 1; nni_mtx_unlock(&sock->s_mx); + nni_mtx_lock(&nni_sock_lk); + nni_list_node_remove(&sock->s_node); + nni_mtx_unlock(&nni_sock_lk); + // At this point nothing else should be referencing us. // As with UNIX close, it is a gross error for the caller // to have concurrent threads using this. We've taken care to @@ -672,6 +688,28 @@ nni_sock_close(nni_sock *sock) nni_objhash_unref_wait(nni_socks, sock->s_id); } +void +nni_sock_closeall(void) +{ + nni_sock *s; + uint32_t id; + + for (;;) { + nni_mtx_lock(&nni_sock_lk); + if ((s = nni_list_first(&nni_sock_list)) == NULL) { + nni_mtx_unlock(&nni_sock_lk); + return; + } + id = s->s_id; + nni_list_node_remove(&s->s_node); + nni_mtx_unlock(&nni_sock_lk); + + if (nni_sock_find(&s, id) == 0) { + nni_sock_close(s); + } + } +} + int nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) { diff --git a/src/core/socket.h b/src/core/socket.h index 41dfbc33..54c1b4da 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -23,6 +23,8 @@ struct nni_socket { nni_msgq *s_uwq; // Upper write queue nni_msgq *s_urq; // Upper read queue + nni_list_node s_node; + uint16_t s_protocol; uint16_t s_peer; uint32_t s_flags; @@ -67,6 +69,7 @@ extern int nni_sock_find(nni_sock **, uint32_t); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); +extern void nni_sock_closeall(void); extern int nni_sock_shutdown(nni_sock *); extern uint16_t nni_sock_proto(nni_sock *); extern uint16_t nni_sock_peer(nni_sock *); @@ -73,6 +73,12 @@ nng_close(nng_socket sid) return (rv); } +void +nng_closeall(void) +{ + nni_sock_closeall(); +} + uint16_t nng_protocol(nng_socket sid) { @@ -75,6 +75,10 @@ NNG_DECL void nng_fini(void); // pipes associated with the socket. NNG_DECL int nng_close(nng_socket); +// nng_closeall closes all open sockets. Do not call this from +// a library; it will affect all sockets. +NNG_DECL void nng_closeall(void); + // nng_shutdown shuts down the socket. This causes any threads doing // work for the socket or blocked in socket functions to be woken (and // return NNG_ECLOSED). The socket resources are still present, so it diff --git a/src/nng_compat.c b/src/nng_compat.c index 160fd33d..88e2f69c 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -60,7 +60,7 @@ nn_strerror(int err) return ("Unknown I/O error"); } - // Arguablye we could use strerror() here, but we should only + // Arguably we could use strerror() here, but we should only // be getting errnos we understand at this point. (void) snprintf(msgbuf, sizeof(msgbuf), "Unknown error %d", err); return (msgbuf); @@ -536,6 +536,133 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) } int +nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp) +{ + int opt = 0; + int mscvt = 0; + uint64_t usec; + int * msecp; + int rv; + + switch (nnlevel) { + case NN_SOL_SOCKET: + switch (nnopt) { + case NN_LINGER: + opt = NNG_OPT_LINGER; + break; + case NN_SNDBUF: + opt = NNG_OPT_SNDBUF; + break; + case NN_RCVBUF: + opt = NNG_OPT_RCVBUF; + break; + case NN_RECONNECT_IVL: + opt = NNG_OPT_RECONN_TIME; + mscvt = 1; + break; + case NN_RECONNECT_IVL_MAX: + opt = NNG_OPT_RECONN_MAXTIME; + mscvt = 1; + break; + case NN_SNDFD: + opt = NNG_OPT_SNDFD; + break; + case NN_RCVFD: + opt = NNG_OPT_RCVFD; + break; + case NN_RCVMAXSIZE: + opt = NNG_OPT_RCVMAXSZ; + break; + case NN_MAXTTL: + opt = NNG_OPT_MAXTTL; + break; + case NN_RCVTIMEO: + opt = NNG_OPT_RCVTIMEO; + mscvt = 1; + break; + case NN_SNDTIMEO: + opt = NNG_OPT_SNDTIMEO; + mscvt = 1; + break; + case NN_DOMAIN: + case NN_PROTOCOL: + case NN_IPV4ONLY: + case NN_SOCKET_NAME: + case NN_SNDPRIO: + case NN_RCVPRIO: + default: + errno = ENOPROTOOPT; + return (-1); + + break; + } + break; + case NN_REQ: + switch (nnopt) { + case NN_REQ_RESEND_IVL: + opt = NNG_OPT_RESENDTIME; + mscvt = 1; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + break; + case NN_SUB: + switch (nnopt) { + case NN_SUB_SUBSCRIBE: + opt = NNG_OPT_SUBSCRIBE; + break; + case NN_SUB_UNSUBSCRIBE: + opt = NNG_OPT_UNSUBSCRIBE; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + break; + case NN_SURVEYOR: + switch (nnopt) { + case NN_SURVEYOR_DEADLINE: + opt = NNG_OPT_SURVEYTIME; + mscvt = 1; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + + if (mscvt) { + if (*szp != sizeof(int)) { + errno = EINVAL; + return (-1); + } + + msecp = valp; + valp = &usec; + *szp = sizeof(uint64_t); + } + + if ((rv = nng_getopt((nng_socket) s, opt, valp, szp)) != 0) { + nn_seterror(rv); + return (-1); + } + + if (mscvt) { + // We have to convert value to ms... + *msecp = (usec / 1000); + *szp = sizeof(int); + } + + return (0); +} + +int nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz) { int opt = 0; @@ -703,11 +830,11 @@ nn_device(int s1, int s2) void nn_term(void) { - // XXX: Implement something to do something. Probably we - // should go through the nni_sockets idhash and clobber all - // of the sockets. This function is relatively toxic, since - // it can affect all sockets in the process, including those - // in use by libraries, etc. + // This function is relatively toxic, since it can affect + // all sockets in the process, including those + // in use by libraries, etc. Accordingly, do not use this + // in a library -- only e.g. atexit() and similar. + nng_closeall(); } // Internal test support routines. diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 049b1422..c7546182 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -312,7 +312,7 @@ nni_rep_pipe_recv_cb(void *arg) } // Move backtrace from body to header - hops = 0; + hops = 1; for (;;) { int end = 0; if (hops >= rep->ttl) { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 8e7056f5..7ec53c90 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -35,6 +35,7 @@ struct nni_req_sock { int raw; int wantw; int closed; + int ttl; nni_msg * reqmsg; nni_req_pipe *pendpipe; @@ -101,6 +102,7 @@ nni_req_sock_init(void **reqp, nni_sock *sock) req->raw = 0; req->wantw = 0; req->resend = NNI_TIME_ZERO; + req->ttl = 8; req->uwq = nni_sock_sendq(sock); req->urq = nni_sock_recvq(sock); @@ -269,6 +271,12 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) break; case NNG_OPT_RAW: rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); + if (rv == 0) { + nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE); + } + break; + case NNG_OPT_MAXTTL: + rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255); break; default: rv = NNG_ENOTSUP; @@ -289,6 +297,9 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) case NNG_OPT_RAW: rv = nni_getopt_int(&req->raw, buf, szp); break; + case NNG_OPT_MAXTTL: + rv = nni_getopt_int(&req->ttl, buf, szp); + break; default: rv = NNG_ENOTSUP; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7b8768dc..fe58c602 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -100,3 +100,4 @@ add_nng_compat_test(compat_pair 5) add_nng_compat_test(compat_pipeline 5) add_nng_compat_test(compat_reqrep 5) add_nng_compat_test(compat_survey 5) +add_nng_compat_test(compat_reqttl 5) diff --git a/tests/compat_reqttl.c b/tests/compat_reqttl.c new file mode 100644 index 00000000..2a0113df --- /dev/null +++ b/tests/compat_reqttl.c @@ -0,0 +1,151 @@ +/* + Copyright (c) 2012 Martin Sustrik All rights reserved. + Copyright (c) 2013 GoPivotal, Inc. All rights reserved. + Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> + Copyright 2017 Garrett D'Amore <garrett@damore.org> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "nng_compat.h" + +#include "compat_testutil.h" + +static char socket_address_a[128]; +static char socket_address_b[128]; +int dev0; +int dev1; + +void device (NN_UNUSED void *arg) +{ + int rc; + + /* Run the device. */ + rc = nn_device (dev0, dev1); + nn_assert (rc < 0 && nn_errno () == EBADF); + + /* Clean up. */ + test_close (dev0); + test_close (dev1); +} + +int main (int argc, const char *argv[]) +{ + int end0; + int end1; + struct nn_thread thread1; + int timeo; + int maxttl; + size_t sz; + int rc; + + int port = get_test_port(argc, argv); + + test_addr_from(socket_address_a, "tcp", "127.0.0.1", port); + test_addr_from(socket_address_b, "tcp", "127.0.0.1", port + 1); + + /* Intialise the device sockets. */ + dev0 = test_socket (AF_SP_RAW, NN_REP); + dev1 = test_socket (AF_SP_RAW, NN_REQ); + + test_bind (dev0, socket_address_a); + test_bind (dev1, socket_address_b); + + /* Start the device. */ + nn_thread_init (&thread1, device, NULL); + + end0 = test_socket (AF_SP, NN_REQ); + end1 = test_socket (AF_SP, NN_REP); + + /* Test the bi-directional device TTL */ + test_connect (end0, socket_address_a); + test_connect (end1, socket_address_b); + + /* Wait for TCP to establish. */ + nn_sleep (100); + + /* Pass a message between endpoints. */ + /* Set up max receive timeout. */ + timeo = 100; + test_setsockopt (end0, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo)); + timeo = 100; + test_setsockopt (end1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo)); + + /* Test default TTL is 8. */ + sz = sizeof (maxttl); + maxttl = -1; + rc = nn_getsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, &sz); + nn_assert (rc == 0); + nn_assert (sz == sizeof (maxttl)); + nn_assert (maxttl == 8); + + /* Test to make sure option TTL cannot be set below 1. */ + maxttl = -1; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == -1); + maxttl = 0; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == 0); + + /* Test to set non-integer size */ + maxttl = 8; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, 1); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == 8); + + test_send (end0, "XYZ"); + + test_recv (end1, "XYZ"); + + /* Now send a reply. */ + test_send (end1, "REPLYXYZ\n"); + test_recv (end0, "REPLYXYZ\n"); + + /* Now set the max TTL. */ + maxttl = 1; + test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + + test_send (end0, "DROPTHIS"); + test_drop (end1, ETIMEDOUT); + + /* Now set the max TTL up. */ + maxttl = 2; + test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + + test_send (end0, "DONTDROP"); + test_recv (end1, "DONTDROP"); + + test_send (end1, "GOTIT"); + test_recv (end0, "GOTIT"); + + /* Clean up. */ + test_close (end0); + test_close (end1); + + /* Shut down the devices. */ + nn_term (); + + nn_thread_term (&thread1); + + return 0; +} |
