diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/device.c | 137 | ||||
| -rw-r--r-- | src/core/device.h | 19 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 36 | ||||
| -rw-r--r-- | src/core/socket.h | 1 |
5 files changed, 190 insertions, 4 deletions
diff --git a/src/core/device.c b/src/core/device.c new file mode 100644 index 00000000..7b2b5b87 --- /dev/null +++ b/src/core/device.c @@ -0,0 +1,137 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <string.h> + +struct nni_device_pair { + nni_thr thrs[2]; + nni_sock * socks[2]; + int err[2]; +}; + +typedef struct nni_device_pair nni_device_pair; + +static int +nni_device_loop(nni_sock *from, nni_sock *to) +{ + nni_msg *msg; + int rv = 0; + + for (;;) { + // Take messages sock[0], and send to sock[1]. + // If an error occurs, we close both sockets. + if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) { + break; + } + if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) { + nni_msg_free(msg); + break; + } + } + + return (rv); +} + + +static void +nni_device_fwd(void *p) +{ + nni_device_pair *pair = p; + + pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]); + nni_sock_shutdown(pair->socks[0]); + nni_sock_shutdown(pair->socks[1]); +} + + +static void +nni_device_rev(void *p) +{ + nni_device_pair *pair = p; + int rv; + + pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]); + nni_sock_shutdown(pair->socks[0]); + nni_sock_shutdown(pair->socks[1]); +} + + +int +nni_device(nni_sock *sock1, nni_sock *sock2) +{ + nni_device_pair pair; + int rv; + + memset(&pair, 0, sizeof (pair)); + pair.socks[0] = sock1; + pair.socks[1] = sock2; + + if (sock1 == NULL) { + sock1 = sock2; + } + if (sock2 == NULL) { + sock2 = sock1; + } + if ((sock1 == NULL) || (sock2 == NULL)) { + rv = NNG_EINVAL; + goto out; + } + if ((sock1->s_peer != sock2->s_protocol) || + (sock2->s_peer != sock1->s_protocol)) { + rv = NNG_EINVAL; + goto out; + } + + pair.socks[0] = sock1; + pair.socks[1] = sock2; + + if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) { + goto out; + } + if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) { + nni_thr_fini(&pair.thrs[0]); + goto out; + } + if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) && + ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + nni_thr_run(&pair.thrs[0]); + } + // If the sockets are the same, then its a simple one way forwarder, + // and we don't need two workers (but would be harmless if we did it). + if ((sock1 != sock2) && + ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) && + ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + nni_thr_run(&pair.thrs[1]); + } + + // This blocks on both threads (though if we didn't start one, that + // will return immediately.) + nni_thr_fini(&pair.thrs[0]); + nni_thr_fini(&pair.thrs[1]); + + nni_sock_rele(sock1); + if (sock1 != sock2) { + nni_sock_rele(sock2); + } + + rv = pair.err[0]; + if (rv == 0) { + rv = pair.err[1]; + } + if (rv == 0) { + // This can happen if neither thread ran. Shouldn't happen + // really. + rv = NNG_EINVAL; + } + +out: + return (rv); +} diff --git a/src/core/device.h b/src/core/device.h new file mode 100644 index 00000000..973a4cb7 --- /dev/null +++ b/src/core/device.h @@ -0,0 +1,19 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_DEVICE_H +#define CORE_DEVICE_H + +// Device takes messages from one side, and forwards them to the other. +// It works in both directions. Arguably we should build versions of this +// that are unidirectional, and we could extend this API with user-defined +// filtering functions. +extern int nni_device(nni_sock *, nni_sock *); + +#endif // CORE_DEVICE_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 007e20a6..871fcf67 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -24,6 +24,7 @@ #include "core/defs.h" #include "core/clock.h" +#include "core/device.h" #include "core/idhash.h" #include "core/init.h" #include "core/list.h" diff --git a/src/core/socket.c b/src/core/socket.c index 6ccf3025..52a4d6d9 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -64,7 +64,7 @@ nni_sock_rele(nni_sock *sock) { nni_mtx_lock(nni_idlock); sock->s_refcnt--; - if ((sock->s_closed) && (sock->s_refcnt == 1)) { + if ((sock->s_closed) && (sock->s_refcnt == 0)) { nni_cv_wake(&sock->s_refcv); } nni_mtx_unlock(nni_idlock); @@ -90,9 +90,13 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id) nni_mtx_unlock(nni_idlock); return (NNG_ECLOSED); } + nni_idhash_remove(nni_sockets, id); + sock->s_id = 0; sock->s_closed = 1; - sock->s_refcnt++; - while (sock->s_refcnt != 1) { + nni_mtx_unlock(nni_idlock); + nni_sock_shutdown(sock); + nni_mtx_lock(nni_idlock); + while (sock->s_refcnt != 0) { nni_cv_wait(&sock->s_refcv); } nni_mtx_unlock(nni_idlock); @@ -102,6 +106,28 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id) } +// nni_sock_held_close uses an existing hold on the socket, but is +// otherwise pretty much the same as nni_sock_hold_close. When this +// returns there will be no other user-land references to the socket. +void +nni_sock_held_close(nni_sock *sock) +{ + nni_mtx_lock(nni_idlock); + sock->s_closed = 1; + if (sock->s_id != 0) { + nni_idhash_remove(nni_sockets, sock->s_id); + sock->s_id = 0; + } + nni_mtx_unlock(nni_idlock); + nni_sock_shutdown(sock); + nni_mtx_lock(nni_idlock); + while (sock->s_refcnt != 0) { + nni_cv_wait(&sock->s_refcv); + } + nni_mtx_unlock(nni_idlock); +} + + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -537,7 +563,9 @@ nni_sock_close(nni_sock *sock) // the results may be tragic. nni_mtx_lock(nni_idlock); - nni_idhash_remove(nni_sockets, sock->s_id); + if (sock->s_id != 0) { + nni_idhash_remove(nni_sockets, sock->s_id); + } if (nni_idhash_count(nni_sockets) == 0) { nni_idhash_reclaim(nni_pipes); nni_idhash_reclaim(nni_endpoints); diff --git a/src/core/socket.h b/src/core/socket.h index df11ded6..d7a7eb5e 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -74,6 +74,7 @@ struct nni_socket { extern int nni_sock_hold(nni_sock **, uint32_t); extern int nni_sock_hold_close(nni_sock **, uint32_t); +extern void nni_sock_held_close(nni_sock *); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); |
