diff options
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/device.c | 137 | ||||
| -rw-r--r-- | src/core/device.h | 19 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 36 | ||||
| -rw-r--r-- | src/core/socket.h | 1 | ||||
| -rw-r--r-- | src/nng.c | 34 | ||||
| -rw-r--r-- | src/nng_compat.c | 24 | ||||
| -rw-r--r-- | src/nng_compat.h | 1 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 21 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/compat_device.c | 184 |
12 files changed, 449 insertions, 12 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e9e8a31b..c81286c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,6 +34,8 @@ set (NNG_SOURCES core/clock.c core/clock.h + core/device.c + core/device.h core/endpt.c core/endpt.h core/event.c diff --git a/src/core/device.c b/src/core/device.c new file mode 100644 index 00000000..7b2b5b87 --- /dev/null +++ b/src/core/device.c @@ -0,0 +1,137 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <string.h> + +struct nni_device_pair { + nni_thr thrs[2]; + nni_sock * socks[2]; + int err[2]; +}; + +typedef struct nni_device_pair nni_device_pair; + +static int +nni_device_loop(nni_sock *from, nni_sock *to) +{ + nni_msg *msg; + int rv = 0; + + for (;;) { + // Take messages sock[0], and send to sock[1]. + // If an error occurs, we close both sockets. + if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) { + break; + } + if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) { + nni_msg_free(msg); + break; + } + } + + return (rv); +} + + +static void +nni_device_fwd(void *p) +{ + nni_device_pair *pair = p; + + pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]); + nni_sock_shutdown(pair->socks[0]); + nni_sock_shutdown(pair->socks[1]); +} + + +static void +nni_device_rev(void *p) +{ + nni_device_pair *pair = p; + int rv; + + pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]); + nni_sock_shutdown(pair->socks[0]); + nni_sock_shutdown(pair->socks[1]); +} + + +int +nni_device(nni_sock *sock1, nni_sock *sock2) +{ + nni_device_pair pair; + int rv; + + memset(&pair, 0, sizeof (pair)); + pair.socks[0] = sock1; + pair.socks[1] = sock2; + + if (sock1 == NULL) { + sock1 = sock2; + } + if (sock2 == NULL) { + sock2 = sock1; + } + if ((sock1 == NULL) || (sock2 == NULL)) { + rv = NNG_EINVAL; + goto out; + } + if ((sock1->s_peer != sock2->s_protocol) || + (sock2->s_peer != sock1->s_protocol)) { + rv = NNG_EINVAL; + goto out; + } + + pair.socks[0] = sock1; + pair.socks[1] = sock2; + + if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) { + goto out; + } + if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) { + nni_thr_fini(&pair.thrs[0]); + goto out; + } + if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) && + ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + nni_thr_run(&pair.thrs[0]); + } + // If the sockets are the same, then its a simple one way forwarder, + // and we don't need two workers (but would be harmless if we did it). + if ((sock1 != sock2) && + ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) && + ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + nni_thr_run(&pair.thrs[1]); + } + + // This blocks on both threads (though if we didn't start one, that + // will return immediately.) + nni_thr_fini(&pair.thrs[0]); + nni_thr_fini(&pair.thrs[1]); + + nni_sock_rele(sock1); + if (sock1 != sock2) { + nni_sock_rele(sock2); + } + + rv = pair.err[0]; + if (rv == 0) { + rv = pair.err[1]; + } + if (rv == 0) { + // This can happen if neither thread ran. Shouldn't happen + // really. + rv = NNG_EINVAL; + } + +out: + return (rv); +} diff --git a/src/core/device.h b/src/core/device.h new file mode 100644 index 00000000..973a4cb7 --- /dev/null +++ b/src/core/device.h @@ -0,0 +1,19 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_DEVICE_H +#define CORE_DEVICE_H + +// Device takes messages from one side, and forwards them to the other. +// It works in both directions. Arguably we should build versions of this +// that are unidirectional, and we could extend this API with user-defined +// filtering functions. +extern int nni_device(nni_sock *, nni_sock *); + +#endif // CORE_DEVICE_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 007e20a6..871fcf67 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -24,6 +24,7 @@ #include "core/defs.h" #include "core/clock.h" +#include "core/device.h" #include "core/idhash.h" #include "core/init.h" #include "core/list.h" diff --git a/src/core/socket.c b/src/core/socket.c index 6ccf3025..52a4d6d9 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -64,7 +64,7 @@ nni_sock_rele(nni_sock *sock) { nni_mtx_lock(nni_idlock); sock->s_refcnt--; - if ((sock->s_closed) && (sock->s_refcnt == 1)) { + if ((sock->s_closed) && (sock->s_refcnt == 0)) { nni_cv_wake(&sock->s_refcv); } nni_mtx_unlock(nni_idlock); @@ -90,9 +90,13 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id) nni_mtx_unlock(nni_idlock); return (NNG_ECLOSED); } + nni_idhash_remove(nni_sockets, id); + sock->s_id = 0; sock->s_closed = 1; - sock->s_refcnt++; - while (sock->s_refcnt != 1) { + nni_mtx_unlock(nni_idlock); + nni_sock_shutdown(sock); + nni_mtx_lock(nni_idlock); + while (sock->s_refcnt != 0) { nni_cv_wait(&sock->s_refcv); } nni_mtx_unlock(nni_idlock); @@ -102,6 +106,28 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id) } +// nni_sock_held_close uses an existing hold on the socket, but is +// otherwise pretty much the same as nni_sock_hold_close. When this +// returns there will be no other user-land references to the socket. +void +nni_sock_held_close(nni_sock *sock) +{ + nni_mtx_lock(nni_idlock); + sock->s_closed = 1; + if (sock->s_id != 0) { + nni_idhash_remove(nni_sockets, sock->s_id); + sock->s_id = 0; + } + nni_mtx_unlock(nni_idlock); + nni_sock_shutdown(sock); + nni_mtx_lock(nni_idlock); + while (sock->s_refcnt != 0) { + nni_cv_wait(&sock->s_refcv); + } + nni_mtx_unlock(nni_idlock); +} + + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -537,7 +563,9 @@ nni_sock_close(nni_sock *sock) // the results may be tragic. nni_mtx_lock(nni_idlock); - nni_idhash_remove(nni_sockets, sock->s_id); + if (sock->s_id != 0) { + nni_idhash_remove(nni_sockets, sock->s_id); + } if (nni_idhash_count(nni_sockets) == 0) { nni_idhash_reclaim(nni_pipes); nni_idhash_reclaim(nni_endpoints); diff --git a/src/core/socket.h b/src/core/socket.h index df11ded6..d7a7eb5e 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -74,6 +74,7 @@ struct nni_socket { extern int nni_sock_hold(nni_sock **, uint32_t); extern int nni_sock_hold_close(nni_sock **, uint32_t); +extern void nni_sock_held_close(nni_sock *); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); @@ -348,6 +348,30 @@ nng_event_type(nng_event *ev) } +int +nng_device(nng_socket s1, nng_socket s2) +{ + int rv; + nni_sock *sock1 = NULL; + nni_sock *sock2 = NULL; + + if ((s1 > 0) && (s1 != (nng_socket)-1)) { + if ((rv = nni_sock_hold(&sock1, s1)) != 0) { + return (rv); + } + } + if (((s2 > 0) && (s2 != (nng_socket)-1)) && (s2 != s1)) { + if ((rv = nni_sock_hold(&sock2, s2)) != 0) { + nni_sock_rele(sock1); + return (rv); + } + } + + rv = nni_device(sock1, sock2); + return (rv); +} + + // Misc. const char * nng_strerror(int num) @@ -617,14 +641,6 @@ nng_stat_value(nng_stat *stat) } -int -nng_device(nng_socket sock1, nng_socket sock2) -{ - // Device TBD. - return (NNG_ENOTSUP); -} - - // These routines exist as utility functions, exposing some of our "guts" // to the external world for the purposes of test code and bundled utilities. // They should not be considered part of our public API, and applications @@ -653,6 +669,8 @@ nng_thread_create(void **thrp, void (*func)(void *), void *arg) nni_thr *thr; int rv; + nni_init(); + if ((thr = NNI_ALLOC_STRUCT(thr)) == NULL) { return (NNG_ENOMEM); } diff --git a/src/nng_compat.c b/src/nng_compat.c index 24085f30..ee52a074 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -729,6 +729,30 @@ nn_cmsg_next(struct nn_msghdr *mh, struct nn_cmsghdr *first) } +int +nn_device(int s1, int s2) +{ + int rv; + + rv = nng_device((nng_socket) s1, (nng_socket) s2); + // rv must always be nonzero + nn_seterror(rv); + return (-1); +} + + +// You should not use this function if you can help it. +void +nn_term(void) +{ + // XXX: Implement something to do something. Probably we + // should go through the nni_sockets idhash and clobber all + // of the sockets. This function is relatively toxic, since + // it can affect all sockets in the process, including those + // in use by libraries, etc. +} + + // Internal test support routines. void diff --git a/src/nng_compat.h b/src/nng_compat.h index edf4bd6a..1896f6f7 100644 --- a/src/nng_compat.h +++ b/src/nng_compat.h @@ -298,6 +298,7 @@ NN_DECL void *nn_reallocmsg(void *, size_t); NN_DECL int nn_freemsg(void *); NN_DECL int nn_errno(void); NN_DECL const char *nn_strerror(int); +NN_DECL void nn_term(void); // This stuff is intended to be exposed only for test programs and our diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index c9a1b42b..c6ecd42a 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -155,6 +155,7 @@ nni_bus_pipe_receiver(void *arg) nni_msgq *urq = nni_sock_recvq(psock->nsock); nni_msgq *uwq = nni_sock_sendq(psock->nsock); nni_pipe *npipe = ppipe->npipe; + uint32_t id = nni_pipe_id(npipe); nni_msg *msg; int rv; @@ -163,6 +164,11 @@ nni_bus_pipe_receiver(void *arg) if (rv != 0) { break; } + if ((rv = nni_msg_prepend_header(msg, &id, 4)) != 0) { + // XXX: bump a nomemory stat + nni_msg_free(msg); + continue; + } rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); if (rv != 0) { nni_msg_free(msg); @@ -216,6 +222,7 @@ nni_bus_sock_sender(void *arg) nni_msgq *uwq = nni_sock_sendq(psock->nsock); nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg, *dup; + uint32_t sender; for (;;) { nni_bus_pipe *ppipe; @@ -226,9 +233,23 @@ nni_bus_sock_sender(void *arg) break; } + // The header being present indicates that the message + // was received locally and we are rebroadcasting. (Device + // is doing this probably.) In this case grab the pipe + // ID from the header, so we can exclude it. + if (nni_msg_header_len(msg) >= 4) { + memcpy(&sender, nni_msg_header(msg), 4); + nni_msg_trim_header(msg, 4); + } else { + sender = 0; + } + nni_mtx_lock(mx); last = nni_list_last(&psock->pipes); NNI_LIST_FOREACH (&psock->pipes, ppipe) { + if (nni_pipe_id(ppipe->npipe) == sender) { + continue; + } if (ppipe != last) { rv = nni_msg_dup(&dup, msg); if (rv != 0) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 10e7dcca..ae6401ce 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -90,4 +90,5 @@ add_nng_compat_test(compat_block 5) add_nng_compat_test(compat_bug777 5) add_nng_compat_test(compat_bus 5) add_nng_compat_test(compat_cmsg 5) +add_nng_compat_test(compat_device 5) add_nng_compat_test(compat_reqrep 5) diff --git a/tests/compat_device.c b/tests/compat_device.c new file mode 100644 index 00000000..b83e4f82 --- /dev/null +++ b/tests/compat_device.c @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2012 Martin Sustrik All rights reserved. + * Copyright (c) 2013 GoPivotal, Inc. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "nng_compat.h" +#include "compat_testutil.h" + +#define SOCKET_ADDRESS_A "inproc://a" +#define SOCKET_ADDRESS_B "inproc://b" +#define SOCKET_ADDRESS_C "inproc://c" +#define SOCKET_ADDRESS_D "inproc://d" +#define SOCKET_ADDRESS_E "inproc://e" + +int deva = -1; +int devb = -1; +int devc = -1; +int devd = -1; +int deve = -1; + +void device1(NN_UNUSED void *arg) +{ + int rc; + + /* Intialise the device sockets. */ + deva = test_socket(AF_SP_RAW, NN_PAIR); + test_bind(deva, SOCKET_ADDRESS_A); + devb = test_socket(AF_SP_RAW, NN_PAIR); + test_bind(devb, SOCKET_ADDRESS_B); + + /* Run the device. */ + rc = nn_device(deva, devb); + nn_assert(rc < 0 && (nn_errno() == EBADF)); +} + + +void device2(NN_UNUSED void *arg) +{ + int rc; + + /* Intialise the device sockets. */ + devc = test_socket(AF_SP_RAW, NN_PULL); + test_bind(devc, SOCKET_ADDRESS_C); + devd = test_socket(AF_SP_RAW, NN_PUSH); + test_bind(devd, SOCKET_ADDRESS_D); + + /* Run the device. */ + rc = nn_device(devc, devd); + nn_assert(rc < 0 && nn_errno() == EBADF); +} + + +void device3(NN_UNUSED void *arg) +{ + int rc; + + /* Intialise the device socket. */ + deve = test_socket(AF_SP_RAW, NN_BUS); + test_bind(deve, SOCKET_ADDRESS_E); + + /* Run the device. */ + rc = nn_device(deve, -1); + nn_assert(rc < 0 && nn_errno() == EBADF); +} + + +int main() +{ + int enda; + int endb; + int endc; + int endd; + int ende1; + int ende2; + struct nn_thread thread1; + struct nn_thread thread2; + struct nn_thread thread3; + int timeo; + + /* Test the bi-directional device. */ + + /* Start the device. */ + nn_thread_init(&thread1, device1, NULL); + + /* Create two sockets to connect to the device. */ + enda = test_socket(AF_SP, NN_PAIR); + test_connect(enda, SOCKET_ADDRESS_A); + endb = test_socket(AF_SP, NN_PAIR); + test_connect(endb, SOCKET_ADDRESS_B); + + nn_sleep(200); + + /* Pass a pair of messages between endpoints. */ + test_send(enda, "ABC"); + test_recv(endb, "ABC"); + test_send(endb, "ABC"); + test_recv(enda, "ABC"); + + /* Clean up. */ + test_close(endb); + test_close(enda); + test_close(deva); + test_close(devb); + + /* Test the uni-directional device. */ + + /* Start the device. */ + nn_thread_init(&thread2, device2, NULL); + + /* Create two sockets to connect to the device. */ + endc = test_socket(AF_SP, NN_PUSH); + test_connect(endc, SOCKET_ADDRESS_C); + endd = test_socket(AF_SP, NN_PULL); + test_connect(endd, SOCKET_ADDRESS_D); + + nn_sleep(100); + + /* Pass a message between endpoints. */ + test_send(endc, "XYZ"); + test_recv(endd, "XYZ"); + + /* Clean up. */ + test_close(endd); + test_close(endc); + test_close(devc); + test_close(devd); + + /* Test the loopback device. */ + + /* Start the device. */ + nn_thread_init(&thread3, device3, NULL); + nn_sleep(100); + + /* Create two sockets to connect to the device. */ + ende1 = test_socket(AF_SP, NN_BUS); + test_connect(ende1, SOCKET_ADDRESS_E); + ende2 = test_socket(AF_SP, NN_BUS); + test_connect(ende2, SOCKET_ADDRESS_E); + + /* BUS is unreliable so wait a bit for connections to be established. */ + nn_sleep(200); + + /* Pass a message to the bus. */ + test_send(ende1, "KLM"); + test_recv(ende2, "KLM"); + + /* Make sure that the message doesn't arrive at the socket it was + * originally sent to. */ + timeo = 100; + test_setsockopt(ende1, NN_SOL_SOCKET, NN_RCVTIMEO, + &timeo, sizeof (timeo)); + test_drop(ende1, ETIMEDOUT); + + /* Clean up. */ + test_close(ende2); + test_close(ende1); + test_close(deve); + + /* Shut down the devices. */ + nn_term(); + nn_thread_term(&thread1); + nn_thread_term(&thread2); + nn_thread_term(&thread3); + + return (0); +} |
