aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/device.c137
-rw-r--r--src/core/device.h19
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/socket.c36
-rw-r--r--src/core/socket.h1
-rw-r--r--src/nng.c34
-rw-r--r--src/nng_compat.c24
-rw-r--r--src/nng_compat.h1
-rw-r--r--src/protocol/bus/bus.c21
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/compat_device.c184
12 files changed, 449 insertions, 12 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e9e8a31b..c81286c1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -34,6 +34,8 @@ set (NNG_SOURCES
core/clock.c
core/clock.h
+ core/device.c
+ core/device.h
core/endpt.c
core/endpt.h
core/event.c
diff --git a/src/core/device.c b/src/core/device.c
new file mode 100644
index 00000000..7b2b5b87
--- /dev/null
+++ b/src/core/device.c
@@ -0,0 +1,137 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <string.h>
+
+struct nni_device_pair {
+ nni_thr thrs[2];
+ nni_sock * socks[2];
+ int err[2];
+};
+
+typedef struct nni_device_pair nni_device_pair;
+
+static int
+nni_device_loop(nni_sock *from, nni_sock *to)
+{
+ nni_msg *msg;
+ int rv = 0;
+
+ for (;;) {
+ // Take messages sock[0], and send to sock[1].
+ // If an error occurs, we close both sockets.
+ if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) {
+ break;
+ }
+ if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+
+ return (rv);
+}
+
+
+static void
+nni_device_fwd(void *p)
+{
+ nni_device_pair *pair = p;
+
+ pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]);
+ nni_sock_shutdown(pair->socks[0]);
+ nni_sock_shutdown(pair->socks[1]);
+}
+
+
+static void
+nni_device_rev(void *p)
+{
+ nni_device_pair *pair = p;
+ int rv;
+
+ pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]);
+ nni_sock_shutdown(pair->socks[0]);
+ nni_sock_shutdown(pair->socks[1]);
+}
+
+
+int
+nni_device(nni_sock *sock1, nni_sock *sock2)
+{
+ nni_device_pair pair;
+ int rv;
+
+ memset(&pair, 0, sizeof (pair));
+ pair.socks[0] = sock1;
+ pair.socks[1] = sock2;
+
+ if (sock1 == NULL) {
+ sock1 = sock2;
+ }
+ if (sock2 == NULL) {
+ sock2 = sock1;
+ }
+ if ((sock1 == NULL) || (sock2 == NULL)) {
+ rv = NNG_EINVAL;
+ goto out;
+ }
+ if ((sock1->s_peer != sock2->s_protocol) ||
+ (sock2->s_peer != sock1->s_protocol)) {
+ rv = NNG_EINVAL;
+ goto out;
+ }
+
+ pair.socks[0] = sock1;
+ pair.socks[1] = sock2;
+
+ if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) {
+ goto out;
+ }
+ if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) {
+ nni_thr_fini(&pair.thrs[0]);
+ goto out;
+ }
+ if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ nni_thr_run(&pair.thrs[0]);
+ }
+ // If the sockets are the same, then its a simple one way forwarder,
+ // and we don't need two workers (but would be harmless if we did it).
+ if ((sock1 != sock2) &&
+ ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ nni_thr_run(&pair.thrs[1]);
+ }
+
+ // This blocks on both threads (though if we didn't start one, that
+ // will return immediately.)
+ nni_thr_fini(&pair.thrs[0]);
+ nni_thr_fini(&pair.thrs[1]);
+
+ nni_sock_rele(sock1);
+ if (sock1 != sock2) {
+ nni_sock_rele(sock2);
+ }
+
+ rv = pair.err[0];
+ if (rv == 0) {
+ rv = pair.err[1];
+ }
+ if (rv == 0) {
+ // This can happen if neither thread ran. Shouldn't happen
+ // really.
+ rv = NNG_EINVAL;
+ }
+
+out:
+ return (rv);
+}
diff --git a/src/core/device.h b/src/core/device.h
new file mode 100644
index 00000000..973a4cb7
--- /dev/null
+++ b/src/core/device.h
@@ -0,0 +1,19 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_DEVICE_H
+#define CORE_DEVICE_H
+
+// Device takes messages from one side, and forwards them to the other.
+// It works in both directions. Arguably we should build versions of this
+// that are unidirectional, and we could extend this API with user-defined
+// filtering functions.
+extern int nni_device(nni_sock *, nni_sock *);
+
+#endif // CORE_DEVICE_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 007e20a6..871fcf67 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -24,6 +24,7 @@
#include "core/defs.h"
#include "core/clock.h"
+#include "core/device.h"
#include "core/idhash.h"
#include "core/init.h"
#include "core/list.h"
diff --git a/src/core/socket.c b/src/core/socket.c
index 6ccf3025..52a4d6d9 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -64,7 +64,7 @@ nni_sock_rele(nni_sock *sock)
{
nni_mtx_lock(nni_idlock);
sock->s_refcnt--;
- if ((sock->s_closed) && (sock->s_refcnt == 1)) {
+ if ((sock->s_closed) && (sock->s_refcnt == 0)) {
nni_cv_wake(&sock->s_refcv);
}
nni_mtx_unlock(nni_idlock);
@@ -90,9 +90,13 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
nni_mtx_unlock(nni_idlock);
return (NNG_ECLOSED);
}
+ nni_idhash_remove(nni_sockets, id);
+ sock->s_id = 0;
sock->s_closed = 1;
- sock->s_refcnt++;
- while (sock->s_refcnt != 1) {
+ nni_mtx_unlock(nni_idlock);
+ nni_sock_shutdown(sock);
+ nni_mtx_lock(nni_idlock);
+ while (sock->s_refcnt != 0) {
nni_cv_wait(&sock->s_refcv);
}
nni_mtx_unlock(nni_idlock);
@@ -102,6 +106,28 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
}
+// nni_sock_held_close uses an existing hold on the socket, but is
+// otherwise pretty much the same as nni_sock_hold_close. When this
+// returns there will be no other user-land references to the socket.
+void
+nni_sock_held_close(nni_sock *sock)
+{
+ nni_mtx_lock(nni_idlock);
+ sock->s_closed = 1;
+ if (sock->s_id != 0) {
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ sock->s_id = 0;
+ }
+ nni_mtx_unlock(nni_idlock);
+ nni_sock_shutdown(sock);
+ nni_mtx_lock(nni_idlock);
+ while (sock->s_refcnt != 0) {
+ nni_cv_wait(&sock->s_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+}
+
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -537,7 +563,9 @@ nni_sock_close(nni_sock *sock)
// the results may be tragic.
nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_sockets, sock->s_id);
+ if (sock->s_id != 0) {
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ }
if (nni_idhash_count(nni_sockets) == 0) {
nni_idhash_reclaim(nni_pipes);
nni_idhash_reclaim(nni_endpoints);
diff --git a/src/core/socket.h b/src/core/socket.h
index df11ded6..d7a7eb5e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -74,6 +74,7 @@ struct nni_socket {
extern int nni_sock_hold(nni_sock **, uint32_t);
extern int nni_sock_hold_close(nni_sock **, uint32_t);
+extern void nni_sock_held_close(nni_sock *);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index d77a05c7..38d259fd 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -348,6 +348,30 @@ nng_event_type(nng_event *ev)
}
+int
+nng_device(nng_socket s1, nng_socket s2)
+{
+ int rv;
+ nni_sock *sock1 = NULL;
+ nni_sock *sock2 = NULL;
+
+ if ((s1 > 0) && (s1 != (nng_socket)-1)) {
+ if ((rv = nni_sock_hold(&sock1, s1)) != 0) {
+ return (rv);
+ }
+ }
+ if (((s2 > 0) && (s2 != (nng_socket)-1)) && (s2 != s1)) {
+ if ((rv = nni_sock_hold(&sock2, s2)) != 0) {
+ nni_sock_rele(sock1);
+ return (rv);
+ }
+ }
+
+ rv = nni_device(sock1, sock2);
+ return (rv);
+}
+
+
// Misc.
const char *
nng_strerror(int num)
@@ -617,14 +641,6 @@ nng_stat_value(nng_stat *stat)
}
-int
-nng_device(nng_socket sock1, nng_socket sock2)
-{
- // Device TBD.
- return (NNG_ENOTSUP);
-}
-
-
// These routines exist as utility functions, exposing some of our "guts"
// to the external world for the purposes of test code and bundled utilities.
// They should not be considered part of our public API, and applications
@@ -653,6 +669,8 @@ nng_thread_create(void **thrp, void (*func)(void *), void *arg)
nni_thr *thr;
int rv;
+ nni_init();
+
if ((thr = NNI_ALLOC_STRUCT(thr)) == NULL) {
return (NNG_ENOMEM);
}
diff --git a/src/nng_compat.c b/src/nng_compat.c
index 24085f30..ee52a074 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -729,6 +729,30 @@ nn_cmsg_next(struct nn_msghdr *mh, struct nn_cmsghdr *first)
}
+int
+nn_device(int s1, int s2)
+{
+ int rv;
+
+ rv = nng_device((nng_socket) s1, (nng_socket) s2);
+ // rv must always be nonzero
+ nn_seterror(rv);
+ return (-1);
+}
+
+
+// You should not use this function if you can help it.
+void
+nn_term(void)
+{
+ // XXX: Implement something to do something. Probably we
+ // should go through the nni_sockets idhash and clobber all
+ // of the sockets. This function is relatively toxic, since
+ // it can affect all sockets in the process, including those
+ // in use by libraries, etc.
+}
+
+
// Internal test support routines.
void
diff --git a/src/nng_compat.h b/src/nng_compat.h
index edf4bd6a..1896f6f7 100644
--- a/src/nng_compat.h
+++ b/src/nng_compat.h
@@ -298,6 +298,7 @@ NN_DECL void *nn_reallocmsg(void *, size_t);
NN_DECL int nn_freemsg(void *);
NN_DECL int nn_errno(void);
NN_DECL const char *nn_strerror(int);
+NN_DECL void nn_term(void);
// This stuff is intended to be exposed only for test programs and our
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index c9a1b42b..c6ecd42a 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -155,6 +155,7 @@ nni_bus_pipe_receiver(void *arg)
nni_msgq *urq = nni_sock_recvq(psock->nsock);
nni_msgq *uwq = nni_sock_sendq(psock->nsock);
nni_pipe *npipe = ppipe->npipe;
+ uint32_t id = nni_pipe_id(npipe);
nni_msg *msg;
int rv;
@@ -163,6 +164,11 @@ nni_bus_pipe_receiver(void *arg)
if (rv != 0) {
break;
}
+ if ((rv = nni_msg_prepend_header(msg, &id, 4)) != 0) {
+ // XXX: bump a nomemory stat
+ nni_msg_free(msg);
+ continue;
+ }
rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose);
if (rv != 0) {
nni_msg_free(msg);
@@ -216,6 +222,7 @@ nni_bus_sock_sender(void *arg)
nni_msgq *uwq = nni_sock_sendq(psock->nsock);
nni_mtx *mx = nni_sock_mtx(psock->nsock);
nni_msg *msg, *dup;
+ uint32_t sender;
for (;;) {
nni_bus_pipe *ppipe;
@@ -226,9 +233,23 @@ nni_bus_sock_sender(void *arg)
break;
}
+ // The header being present indicates that the message
+ // was received locally and we are rebroadcasting. (Device
+ // is doing this probably.) In this case grab the pipe
+ // ID from the header, so we can exclude it.
+ if (nni_msg_header_len(msg) >= 4) {
+ memcpy(&sender, nni_msg_header(msg), 4);
+ nni_msg_trim_header(msg, 4);
+ } else {
+ sender = 0;
+ }
+
nni_mtx_lock(mx);
last = nni_list_last(&psock->pipes);
NNI_LIST_FOREACH (&psock->pipes, ppipe) {
+ if (nni_pipe_id(ppipe->npipe) == sender) {
+ continue;
+ }
if (ppipe != last) {
rv = nni_msg_dup(&dup, msg);
if (rv != 0) {
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 10e7dcca..ae6401ce 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -90,4 +90,5 @@ add_nng_compat_test(compat_block 5)
add_nng_compat_test(compat_bug777 5)
add_nng_compat_test(compat_bus 5)
add_nng_compat_test(compat_cmsg 5)
+add_nng_compat_test(compat_device 5)
add_nng_compat_test(compat_reqrep 5)
diff --git a/tests/compat_device.c b/tests/compat_device.c
new file mode 100644
index 00000000..b83e4f82
--- /dev/null
+++ b/tests/compat_device.c
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2012 Martin Sustrik All rights reserved.
+ * Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom
+ * the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "nng_compat.h"
+#include "compat_testutil.h"
+
+#define SOCKET_ADDRESS_A "inproc://a"
+#define SOCKET_ADDRESS_B "inproc://b"
+#define SOCKET_ADDRESS_C "inproc://c"
+#define SOCKET_ADDRESS_D "inproc://d"
+#define SOCKET_ADDRESS_E "inproc://e"
+
+int deva = -1;
+int devb = -1;
+int devc = -1;
+int devd = -1;
+int deve = -1;
+
+void device1(NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Intialise the device sockets. */
+ deva = test_socket(AF_SP_RAW, NN_PAIR);
+ test_bind(deva, SOCKET_ADDRESS_A);
+ devb = test_socket(AF_SP_RAW, NN_PAIR);
+ test_bind(devb, SOCKET_ADDRESS_B);
+
+ /* Run the device. */
+ rc = nn_device(deva, devb);
+ nn_assert(rc < 0 && (nn_errno() == EBADF));
+}
+
+
+void device2(NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Intialise the device sockets. */
+ devc = test_socket(AF_SP_RAW, NN_PULL);
+ test_bind(devc, SOCKET_ADDRESS_C);
+ devd = test_socket(AF_SP_RAW, NN_PUSH);
+ test_bind(devd, SOCKET_ADDRESS_D);
+
+ /* Run the device. */
+ rc = nn_device(devc, devd);
+ nn_assert(rc < 0 && nn_errno() == EBADF);
+}
+
+
+void device3(NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Intialise the device socket. */
+ deve = test_socket(AF_SP_RAW, NN_BUS);
+ test_bind(deve, SOCKET_ADDRESS_E);
+
+ /* Run the device. */
+ rc = nn_device(deve, -1);
+ nn_assert(rc < 0 && nn_errno() == EBADF);
+}
+
+
+int main()
+{
+ int enda;
+ int endb;
+ int endc;
+ int endd;
+ int ende1;
+ int ende2;
+ struct nn_thread thread1;
+ struct nn_thread thread2;
+ struct nn_thread thread3;
+ int timeo;
+
+ /* Test the bi-directional device. */
+
+ /* Start the device. */
+ nn_thread_init(&thread1, device1, NULL);
+
+ /* Create two sockets to connect to the device. */
+ enda = test_socket(AF_SP, NN_PAIR);
+ test_connect(enda, SOCKET_ADDRESS_A);
+ endb = test_socket(AF_SP, NN_PAIR);
+ test_connect(endb, SOCKET_ADDRESS_B);
+
+ nn_sleep(200);
+
+ /* Pass a pair of messages between endpoints. */
+ test_send(enda, "ABC");
+ test_recv(endb, "ABC");
+ test_send(endb, "ABC");
+ test_recv(enda, "ABC");
+
+ /* Clean up. */
+ test_close(endb);
+ test_close(enda);
+ test_close(deva);
+ test_close(devb);
+
+ /* Test the uni-directional device. */
+
+ /* Start the device. */
+ nn_thread_init(&thread2, device2, NULL);
+
+ /* Create two sockets to connect to the device. */
+ endc = test_socket(AF_SP, NN_PUSH);
+ test_connect(endc, SOCKET_ADDRESS_C);
+ endd = test_socket(AF_SP, NN_PULL);
+ test_connect(endd, SOCKET_ADDRESS_D);
+
+ nn_sleep(100);
+
+ /* Pass a message between endpoints. */
+ test_send(endc, "XYZ");
+ test_recv(endd, "XYZ");
+
+ /* Clean up. */
+ test_close(endd);
+ test_close(endc);
+ test_close(devc);
+ test_close(devd);
+
+ /* Test the loopback device. */
+
+ /* Start the device. */
+ nn_thread_init(&thread3, device3, NULL);
+ nn_sleep(100);
+
+ /* Create two sockets to connect to the device. */
+ ende1 = test_socket(AF_SP, NN_BUS);
+ test_connect(ende1, SOCKET_ADDRESS_E);
+ ende2 = test_socket(AF_SP, NN_BUS);
+ test_connect(ende2, SOCKET_ADDRESS_E);
+
+ /* BUS is unreliable so wait a bit for connections to be established. */
+ nn_sleep(200);
+
+ /* Pass a message to the bus. */
+ test_send(ende1, "KLM");
+ test_recv(ende2, "KLM");
+
+ /* Make sure that the message doesn't arrive at the socket it was
+ * originally sent to. */
+ timeo = 100;
+ test_setsockopt(ende1, NN_SOL_SOCKET, NN_RCVTIMEO,
+ &timeo, sizeof (timeo));
+ test_drop(ende1, ETIMEDOUT);
+
+ /* Clean up. */
+ test_close(ende2);
+ test_close(ende1);
+ test_close(deve);
+
+ /* Shut down the devices. */
+ nn_term();
+ nn_thread_term(&thread1);
+ nn_thread_term(&thread2);
+ nn_thread_term(&thread3);
+
+ return (0);
+}