diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/dialer.c | 2 | ||||
| -rw-r--r-- | src/core/platform.h | 17 | ||||
| -rw-r--r-- | src/core/sockfd.c | 231 | ||||
| -rw-r--r-- | src/core/sockfd.h | 28 | ||||
| -rw-r--r-- | src/core/stream.c | 10 |
6 files changed, 286 insertions, 4 deletions
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index f003d756..9e5a6bec 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -24,6 +24,8 @@ nng_sources( device.h dialer.c dialer.h + sockfd.c + sockfd.h file.c file.h idhash.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 55a46efb..91d18dc8 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -411,7 +411,7 @@ int nni_dialer_start(nni_dialer *d, unsigned flags) { int rv = 0; - nni_aio *aio; + nni_aio *aio = NULL; if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); diff --git a/src/core/platform.h b/src/core/platform.h index 89759921..0b5ec634 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -386,9 +386,9 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *); typedef struct nni_plat_udp nni_plat_udp; // nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified specified in the AIO. The remote address is +// address specified in the AIO. The remote address is // not used. The resulting nni_plat_udp structure is returned in the -// the aio's a_pipe. +// aio's a_pipe. extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); // nni_plat_udp_close closes the underlying UDP socket. @@ -434,6 +434,19 @@ extern void nni_plat_pipe_close(int, int); extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *); +// nni_socket_pair is used to create a socket pair using socketpair() +// on POSIX systems. (Windows might provide a similar solution, using +// AF_UNIX at some point, in which case the arguments will actually be +// an array of HANDLEs.) If not supported, this returns NNG_ENOTSUP. +// +// This API can only create a pair of open file descriptors, suitable for use +// with the socket transport, each bound to the other. The transport must be +// a bidirectional reliable byte stream. This should be suitable for use +// in APIs to transport file descriptors, or across a fork/exec boundary (so +// that child processes may use these with socket to inherit a socket that is +// connected to the parent.) +extern int nni_socket_pair(int *); + // // File/Store Support // diff --git a/src/core/sockfd.c b/src/core/sockfd.c new file mode 100644 index 00000000..1b4dbc1d --- /dev/null +++ b/src/core/sockfd.c @@ -0,0 +1,231 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdint.h> +#include <string.h> + +#include <nng/nng.h> + +#include "core/nng_impl.h" +#include "core/sockfd.h" + +// We will accept up to this many FDs to be queued up for +// accept, before we start rejecting with NNG_ENOSPC. Once +// accept is performed, then another slot is available. +#define NNG_SFD_LISTEN_QUEUE 16 + +int +nni_sfd_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + // No dialer support for this. + return (NNG_ENOTSUP); +} + +typedef struct { + nng_stream_listener ops; + int listen_cnt; // how many FDs are waiting + int listen_q[NNG_SFD_LISTEN_QUEUE]; + bool closed; + nni_list accept_q; + nni_mtx mtx; +} sfd_listener; + +static void +sfd_listener_free(void *arg) +{ + sfd_listener *l = arg; + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static void +sfd_listener_close(void *arg) +{ + nni_aio *aio; + sfd_listener *l = arg; + nni_mtx_lock(&l->mtx); + l->closed = true; + while ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + for (int i = 0; i < l->listen_cnt; i++) { + nni_sfd_close_fd(l->listen_q[i]); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_listen(void *arg) +{ + NNI_ARG_UNUSED(arg); + // nothing really for us to do + return (0); +} + +static void +sfd_start_conn(sfd_listener *l, nni_aio *aio) +{ + int fd; + int rv; + nni_sfd_conn *c; + NNI_ASSERT(l->listen_cnt > 0); + fd = l->listen_q[0]; + for (int i = 1; i < l->listen_cnt; i++) { + l->listen_q[i] = l->listen_q[i + 1]; + } + l->listen_cnt--; + if ((rv = nni_sfd_conn_alloc(&c, fd)) != 0) { + nni_aio_finish_error(aio, rv); + nni_sfd_close_fd(fd); + } else { + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +sfd_cancel_accept(nni_aio *aio, void *arg, int rv) +{ + sfd_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static void +sfd_listener_accept(void *arg, nng_aio *aio) +{ + sfd_listener *l = arg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + if (l->listen_cnt) { + sfd_start_conn(l, aio); + } else { + nni_aio_schedule(aio, sfd_cancel_accept, l); + nni_aio_list_append(&l->accept_q, aio); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_set_fd(void *arg, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + nni_aio *aio; + int fd; + int rv; + + if ((rv = nni_copyin_int(&fd, buf, sz, NNI_MININT, NNI_MAXINT, t)) != + 0) { + return (rv); + } + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if (l->listen_cnt == NNG_SFD_LISTEN_QUEUE) { + nni_mtx_unlock(&l->mtx); + return (NNG_ENOSPC); + } + + l->listen_q[l->listen_cnt++] = fd; + + // if someone was waiting in accept, give it to them now + if ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + sfd_start_conn(l, aio); + } + + nni_mtx_unlock(&l->mtx); + return (0); +} + +static int +sfd_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static const nni_option sfd_listener_options[] = { + { + .o_name = NNG_OPT_SOCKET_FD, + .o_set = sfd_listener_set_fd, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_listener_get_addr, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_listener_get( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_listener *l = arg; + return (nni_getopt(sfd_listener_options, name, l, buf, szp, t)); +} + +static int +sfd_listener_set( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + return (nni_setopt(sfd_listener_options, name, l, buf, sz, t)); +} + +int +nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + sfd_listener *l; + + NNI_ARG_UNUSED(url); + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + memset(l->listen_q, 0, sizeof(l->listen_q)); + l->listen_cnt = 0; + nni_aio_list_init(&l->accept_q); + nni_mtx_init(&l->mtx); + + l->ops.sl_free = sfd_listener_free; + l->ops.sl_close = sfd_listener_close; + l->ops.sl_listen = sfd_listener_listen; + l->ops.sl_accept = sfd_listener_accept; + l->ops.sl_get = sfd_listener_get; + l->ops.sl_set = sfd_listener_set; + + *lp = (void *) l; + return (0); +} diff --git a/src/core/sockfd.h b/src/core/sockfd.h new file mode 100644 index 00000000..ca37f0e1 --- /dev/null +++ b/src/core/sockfd.h @@ -0,0 +1,28 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_FDC_H +#define CORE_FDC_H + +#include "core/nng_impl.h" + +// the nni_sfd_conn struct is provided by platform code to wrap +// an arbitrary byte stream file descriptor (UNIX) or handle (Windows) +// with a nng_stream. +typedef struct nni_sfd_conn nni_sfd_conn; +extern int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd); +extern int nni_sfd_dialer_alloc(nng_stream_dialer **, const nng_url *); +extern int nni_sfd_listener_alloc(nng_stream_listener **, const nng_url *); + +// this is used to close a file descriptor, in case we cannot +// create a connection (or if the listener is closed before the +// connection is accepted.) +extern void nni_sfd_close_fd(int fd); + +#endif // CORE_FDC_H diff --git a/src/core/stream.c b/src/core/stream.c index 418bfb15..99002fcd 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -15,6 +15,7 @@ #include "core/nng_impl.h" #include <nng/supplemental/tls/tls.h> +#include "core/sockfd.h" #include "core/tcp.h" #include "supplemental/tls/tls_api.h" #include "supplemental/websocket/websocket.h" @@ -94,6 +95,13 @@ static struct { .dialer_alloc = nni_ws_dialer_alloc, .listener_alloc = nni_ws_listener_alloc, }, +#ifdef NNG_TRANSPORT_FDC + { + .scheme = "socket", + .dialer_alloc = nni_sfd_dialer_alloc, + .listener_alloc = nni_sfd_listener_alloc, + }, +#endif { .scheme = NULL, }, |
