aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/device.c137
-rw-r--r--src/core/device.h19
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/socket.c36
-rw-r--r--src/core/socket.h1
5 files changed, 190 insertions, 4 deletions
diff --git a/src/core/device.c b/src/core/device.c
new file mode 100644
index 00000000..7b2b5b87
--- /dev/null
+++ b/src/core/device.c
@@ -0,0 +1,137 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <string.h>
+
+struct nni_device_pair {
+ nni_thr thrs[2];
+ nni_sock * socks[2];
+ int err[2];
+};
+
+typedef struct nni_device_pair nni_device_pair;
+
+static int
+nni_device_loop(nni_sock *from, nni_sock *to)
+{
+ nni_msg *msg;
+ int rv = 0;
+
+ for (;;) {
+ // Take messages sock[0], and send to sock[1].
+ // If an error occurs, we close both sockets.
+ if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) {
+ break;
+ }
+ if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+
+ return (rv);
+}
+
+
+static void
+nni_device_fwd(void *p)
+{
+ nni_device_pair *pair = p;
+
+ pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]);
+ nni_sock_shutdown(pair->socks[0]);
+ nni_sock_shutdown(pair->socks[1]);
+}
+
+
+static void
+nni_device_rev(void *p)
+{
+ nni_device_pair *pair = p;
+ int rv;
+
+ pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]);
+ nni_sock_shutdown(pair->socks[0]);
+ nni_sock_shutdown(pair->socks[1]);
+}
+
+
+int
+nni_device(nni_sock *sock1, nni_sock *sock2)
+{
+ nni_device_pair pair;
+ int rv;
+
+ memset(&pair, 0, sizeof (pair));
+ pair.socks[0] = sock1;
+ pair.socks[1] = sock2;
+
+ if (sock1 == NULL) {
+ sock1 = sock2;
+ }
+ if (sock2 == NULL) {
+ sock2 = sock1;
+ }
+ if ((sock1 == NULL) || (sock2 == NULL)) {
+ rv = NNG_EINVAL;
+ goto out;
+ }
+ if ((sock1->s_peer != sock2->s_protocol) ||
+ (sock2->s_peer != sock1->s_protocol)) {
+ rv = NNG_EINVAL;
+ goto out;
+ }
+
+ pair.socks[0] = sock1;
+ pair.socks[1] = sock2;
+
+ if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) {
+ goto out;
+ }
+ if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) {
+ nni_thr_fini(&pair.thrs[0]);
+ goto out;
+ }
+ if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ nni_thr_run(&pair.thrs[0]);
+ }
+ // If the sockets are the same, then its a simple one way forwarder,
+ // and we don't need two workers (but would be harmless if we did it).
+ if ((sock1 != sock2) &&
+ ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ nni_thr_run(&pair.thrs[1]);
+ }
+
+ // This blocks on both threads (though if we didn't start one, that
+ // will return immediately.)
+ nni_thr_fini(&pair.thrs[0]);
+ nni_thr_fini(&pair.thrs[1]);
+
+ nni_sock_rele(sock1);
+ if (sock1 != sock2) {
+ nni_sock_rele(sock2);
+ }
+
+ rv = pair.err[0];
+ if (rv == 0) {
+ rv = pair.err[1];
+ }
+ if (rv == 0) {
+ // This can happen if neither thread ran. Shouldn't happen
+ // really.
+ rv = NNG_EINVAL;
+ }
+
+out:
+ return (rv);
+}
diff --git a/src/core/device.h b/src/core/device.h
new file mode 100644
index 00000000..973a4cb7
--- /dev/null
+++ b/src/core/device.h
@@ -0,0 +1,19 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_DEVICE_H
+#define CORE_DEVICE_H
+
+// Device takes messages from one side, and forwards them to the other.
+// It works in both directions. Arguably we should build versions of this
+// that are unidirectional, and we could extend this API with user-defined
+// filtering functions.
+extern int nni_device(nni_sock *, nni_sock *);
+
+#endif // CORE_DEVICE_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 007e20a6..871fcf67 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -24,6 +24,7 @@
#include "core/defs.h"
#include "core/clock.h"
+#include "core/device.h"
#include "core/idhash.h"
#include "core/init.h"
#include "core/list.h"
diff --git a/src/core/socket.c b/src/core/socket.c
index 6ccf3025..52a4d6d9 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -64,7 +64,7 @@ nni_sock_rele(nni_sock *sock)
{
nni_mtx_lock(nni_idlock);
sock->s_refcnt--;
- if ((sock->s_closed) && (sock->s_refcnt == 1)) {
+ if ((sock->s_closed) && (sock->s_refcnt == 0)) {
nni_cv_wake(&sock->s_refcv);
}
nni_mtx_unlock(nni_idlock);
@@ -90,9 +90,13 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
nni_mtx_unlock(nni_idlock);
return (NNG_ECLOSED);
}
+ nni_idhash_remove(nni_sockets, id);
+ sock->s_id = 0;
sock->s_closed = 1;
- sock->s_refcnt++;
- while (sock->s_refcnt != 1) {
+ nni_mtx_unlock(nni_idlock);
+ nni_sock_shutdown(sock);
+ nni_mtx_lock(nni_idlock);
+ while (sock->s_refcnt != 0) {
nni_cv_wait(&sock->s_refcv);
}
nni_mtx_unlock(nni_idlock);
@@ -102,6 +106,28 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
}
+// nni_sock_held_close uses an existing hold on the socket, but is
+// otherwise pretty much the same as nni_sock_hold_close. When this
+// returns there will be no other user-land references to the socket.
+void
+nni_sock_held_close(nni_sock *sock)
+{
+ nni_mtx_lock(nni_idlock);
+ sock->s_closed = 1;
+ if (sock->s_id != 0) {
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ sock->s_id = 0;
+ }
+ nni_mtx_unlock(nni_idlock);
+ nni_sock_shutdown(sock);
+ nni_mtx_lock(nni_idlock);
+ while (sock->s_refcnt != 0) {
+ nni_cv_wait(&sock->s_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+}
+
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -537,7 +563,9 @@ nni_sock_close(nni_sock *sock)
// the results may be tragic.
nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_sockets, sock->s_id);
+ if (sock->s_id != 0) {
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ }
if (nni_idhash_count(nni_sockets) == 0) {
nni_idhash_reclaim(nni_pipes);
nni_idhash_reclaim(nni_endpoints);
diff --git a/src/core/socket.h b/src/core/socket.h
index df11ded6..d7a7eb5e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -74,6 +74,7 @@ struct nni_socket {
extern int nni_sock_hold(nni_sock **, uint32_t);
extern int nni_sock_hold_close(nni_sock **, uint32_t);
+extern void nni_sock_held_close(nni_sock *);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);