From 9caabf76621ba81e7fed5df42971f355b648ff59 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 18 Dec 2023 01:12:01 -0800 Subject: fixes #1746 Create a new socket:// transport for socketpair() based connections This transport only listens, and creates connections when the application calls setopt on the lister with NNG_OPT_SOCKET_FD, to pass a file descriptor. The FD is turned into an nng_stream, and utilized for SP. The protocol over the descriptor is identical to the TCP protocol (not the IPC protocol). The options for peer information are borrowed from the IPC transport, as they may be useful for these purposes. This includes a test suite and full documentation. --- src/core/CMakeLists.txt | 2 + src/core/dialer.c | 2 +- src/core/platform.h | 17 +++- src/core/sockfd.c | 231 ++++++++++++++++++++++++++++++++++++++++++++++++ src/core/sockfd.h | 28 ++++++ src/core/stream.c | 10 ++- 6 files changed, 286 insertions(+), 4 deletions(-) create mode 100644 src/core/sockfd.c create mode 100644 src/core/sockfd.h (limited to 'src/core') diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index f003d756..9e5a6bec 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -24,6 +24,8 @@ nng_sources( device.h dialer.c dialer.h + sockfd.c + sockfd.h file.c file.h idhash.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 55a46efb..91d18dc8 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -411,7 +411,7 @@ int nni_dialer_start(nni_dialer *d, unsigned flags) { int rv = 0; - nni_aio *aio; + nni_aio *aio = NULL; if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); diff --git a/src/core/platform.h b/src/core/platform.h index 89759921..0b5ec634 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -386,9 +386,9 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *); typedef struct nni_plat_udp nni_plat_udp; // nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified specified in the AIO. The remote address is +// address specified in the AIO. The remote address is // not used. The resulting nni_plat_udp structure is returned in the -// the aio's a_pipe. +// aio's a_pipe. extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); // nni_plat_udp_close closes the underlying UDP socket. @@ -434,6 +434,19 @@ extern void nni_plat_pipe_close(int, int); extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *); +// nni_socket_pair is used to create a socket pair using socketpair() +// on POSIX systems. (Windows might provide a similar solution, using +// AF_UNIX at some point, in which case the arguments will actually be +// an array of HANDLEs.) If not supported, this returns NNG_ENOTSUP. +// +// This API can only create a pair of open file descriptors, suitable for use +// with the socket transport, each bound to the other. The transport must be +// a bidirectional reliable byte stream. This should be suitable for use +// in APIs to transport file descriptors, or across a fork/exec boundary (so +// that child processes may use these with socket to inherit a socket that is +// connected to the parent.) +extern int nni_socket_pair(int *); + // // File/Store Support // diff --git a/src/core/sockfd.c b/src/core/sockfd.c new file mode 100644 index 00000000..1b4dbc1d --- /dev/null +++ b/src/core/sockfd.c @@ -0,0 +1,231 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include + +#include "core/nng_impl.h" +#include "core/sockfd.h" + +// We will accept up to this many FDs to be queued up for +// accept, before we start rejecting with NNG_ENOSPC. Once +// accept is performed, then another slot is available. +#define NNG_SFD_LISTEN_QUEUE 16 + +int +nni_sfd_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + // No dialer support for this. + return (NNG_ENOTSUP); +} + +typedef struct { + nng_stream_listener ops; + int listen_cnt; // how many FDs are waiting + int listen_q[NNG_SFD_LISTEN_QUEUE]; + bool closed; + nni_list accept_q; + nni_mtx mtx; +} sfd_listener; + +static void +sfd_listener_free(void *arg) +{ + sfd_listener *l = arg; + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static void +sfd_listener_close(void *arg) +{ + nni_aio *aio; + sfd_listener *l = arg; + nni_mtx_lock(&l->mtx); + l->closed = true; + while ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + for (int i = 0; i < l->listen_cnt; i++) { + nni_sfd_close_fd(l->listen_q[i]); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_listen(void *arg) +{ + NNI_ARG_UNUSED(arg); + // nothing really for us to do + return (0); +} + +static void +sfd_start_conn(sfd_listener *l, nni_aio *aio) +{ + int fd; + int rv; + nni_sfd_conn *c; + NNI_ASSERT(l->listen_cnt > 0); + fd = l->listen_q[0]; + for (int i = 1; i < l->listen_cnt; i++) { + l->listen_q[i] = l->listen_q[i + 1]; + } + l->listen_cnt--; + if ((rv = nni_sfd_conn_alloc(&c, fd)) != 0) { + nni_aio_finish_error(aio, rv); + nni_sfd_close_fd(fd); + } else { + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +sfd_cancel_accept(nni_aio *aio, void *arg, int rv) +{ + sfd_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static void +sfd_listener_accept(void *arg, nng_aio *aio) +{ + sfd_listener *l = arg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + if (l->listen_cnt) { + sfd_start_conn(l, aio); + } else { + nni_aio_schedule(aio, sfd_cancel_accept, l); + nni_aio_list_append(&l->accept_q, aio); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_set_fd(void *arg, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + nni_aio *aio; + int fd; + int rv; + + if ((rv = nni_copyin_int(&fd, buf, sz, NNI_MININT, NNI_MAXINT, t)) != + 0) { + return (rv); + } + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if (l->listen_cnt == NNG_SFD_LISTEN_QUEUE) { + nni_mtx_unlock(&l->mtx); + return (NNG_ENOSPC); + } + + l->listen_q[l->listen_cnt++] = fd; + + // if someone was waiting in accept, give it to them now + if ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + sfd_start_conn(l, aio); + } + + nni_mtx_unlock(&l->mtx); + return (0); +} + +static int +sfd_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static const nni_option sfd_listener_options[] = { + { + .o_name = NNG_OPT_SOCKET_FD, + .o_set = sfd_listener_set_fd, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_listener_get_addr, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_listener_get( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_listener *l = arg; + return (nni_getopt(sfd_listener_options, name, l, buf, szp, t)); +} + +static int +sfd_listener_set( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + return (nni_setopt(sfd_listener_options, name, l, buf, sz, t)); +} + +int +nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + sfd_listener *l; + + NNI_ARG_UNUSED(url); + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + memset(l->listen_q, 0, sizeof(l->listen_q)); + l->listen_cnt = 0; + nni_aio_list_init(&l->accept_q); + nni_mtx_init(&l->mtx); + + l->ops.sl_free = sfd_listener_free; + l->ops.sl_close = sfd_listener_close; + l->ops.sl_listen = sfd_listener_listen; + l->ops.sl_accept = sfd_listener_accept; + l->ops.sl_get = sfd_listener_get; + l->ops.sl_set = sfd_listener_set; + + *lp = (void *) l; + return (0); +} diff --git a/src/core/sockfd.h b/src/core/sockfd.h new file mode 100644 index 00000000..ca37f0e1 --- /dev/null +++ b/src/core/sockfd.h @@ -0,0 +1,28 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_FDC_H +#define CORE_FDC_H + +#include "core/nng_impl.h" + +// the nni_sfd_conn struct is provided by platform code to wrap +// an arbitrary byte stream file descriptor (UNIX) or handle (Windows) +// with a nng_stream. +typedef struct nni_sfd_conn nni_sfd_conn; +extern int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd); +extern int nni_sfd_dialer_alloc(nng_stream_dialer **, const nng_url *); +extern int nni_sfd_listener_alloc(nng_stream_listener **, const nng_url *); + +// this is used to close a file descriptor, in case we cannot +// create a connection (or if the listener is closed before the +// connection is accepted.) +extern void nni_sfd_close_fd(int fd); + +#endif // CORE_FDC_H diff --git a/src/core/stream.c b/src/core/stream.c index 418bfb15..99002fcd 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -15,6 +15,7 @@ #include "core/nng_impl.h" #include +#include "core/sockfd.h" #include "core/tcp.h" #include "supplemental/tls/tls_api.h" #include "supplemental/websocket/websocket.h" @@ -94,6 +95,13 @@ static struct { .dialer_alloc = nni_ws_dialer_alloc, .listener_alloc = nni_ws_listener_alloc, }, +#ifdef NNG_TRANSPORT_FDC + { + .scheme = "socket", + .dialer_alloc = nni_sfd_dialer_alloc, + .listener_alloc = nni_sfd_listener_alloc, + }, +#endif { .scheme = NULL, }, -- cgit v1.2.3-70-g09d2