From 9caabf76621ba81e7fed5df42971f355b648ff59 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 18 Dec 2023 01:12:01 -0800 Subject: fixes #1746 Create a new socket:// transport for socketpair() based connections This transport only listens, and creates connections when the application calls setopt on the lister with NNG_OPT_SOCKET_FD, to pass a file descriptor. The FD is turned into an nng_stream, and utilized for SP. The protocol over the descriptor is identical to the TCP protocol (not the IPC protocol). The options for peer information are borrowed from the IPC transport, as they may be useful for these purposes. This includes a test suite and full documentation. --- src/core/CMakeLists.txt | 2 + src/core/dialer.c | 2 +- src/core/platform.h | 17 +- src/core/sockfd.c | 231 ++++++++ src/core/sockfd.h | 28 + src/core/stream.c | 10 +- src/platform/posix/CMakeLists.txt | 6 +- src/platform/posix/posix_peerid.c | 122 ++++ src/platform/posix/posix_peerid.h | 25 + src/platform/posix/posix_socketpair.c | 43 ++ src/platform/posix/posix_sockfd.c | 493 ++++++++++++++++ src/platform/windows/CMakeLists.txt | 1 + src/platform/windows/win_socketpair.c | 57 ++ src/sp/transport.c | 8 +- src/sp/transport/CMakeLists.txt | 3 +- src/sp/transport/socket/CMakeLists.txt | 15 + src/sp/transport/socket/sockfd.c | 1011 ++++++++++++++++++++++++++++++++ src/sp/transport/socket/sockfd_test.c | 461 +++++++++++++++ src/supplemental/util/platform.c | 6 + 19 files changed, 2534 insertions(+), 7 deletions(-) create mode 100644 src/core/sockfd.c create mode 100644 src/core/sockfd.h create mode 100644 src/platform/posix/posix_peerid.c create mode 100644 src/platform/posix/posix_peerid.h create mode 100644 src/platform/posix/posix_socketpair.c create mode 100644 src/platform/posix/posix_sockfd.c create mode 100644 src/platform/windows/win_socketpair.c create mode 100644 src/sp/transport/socket/CMakeLists.txt create mode 100644 src/sp/transport/socket/sockfd.c create mode 100644 src/sp/transport/socket/sockfd_test.c (limited to 'src') diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index f003d756..9e5a6bec 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -24,6 +24,8 @@ nng_sources( device.h dialer.c dialer.h + sockfd.c + sockfd.h file.c file.h idhash.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 55a46efb..91d18dc8 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -411,7 +411,7 @@ int nni_dialer_start(nni_dialer *d, unsigned flags) { int rv = 0; - nni_aio *aio; + nni_aio *aio = NULL; if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); diff --git a/src/core/platform.h b/src/core/platform.h index 89759921..0b5ec634 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -386,9 +386,9 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *); typedef struct nni_plat_udp nni_plat_udp; // nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified specified in the AIO. The remote address is +// address specified in the AIO. The remote address is // not used. The resulting nni_plat_udp structure is returned in the -// the aio's a_pipe. +// aio's a_pipe. extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); // nni_plat_udp_close closes the underlying UDP socket. @@ -434,6 +434,19 @@ extern void nni_plat_pipe_close(int, int); extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *); +// nni_socket_pair is used to create a socket pair using socketpair() +// on POSIX systems. (Windows might provide a similar solution, using +// AF_UNIX at some point, in which case the arguments will actually be +// an array of HANDLEs.) If not supported, this returns NNG_ENOTSUP. +// +// This API can only create a pair of open file descriptors, suitable for use +// with the socket transport, each bound to the other. The transport must be +// a bidirectional reliable byte stream. This should be suitable for use +// in APIs to transport file descriptors, or across a fork/exec boundary (so +// that child processes may use these with socket to inherit a socket that is +// connected to the parent.) +extern int nni_socket_pair(int *); + // // File/Store Support // diff --git a/src/core/sockfd.c b/src/core/sockfd.c new file mode 100644 index 00000000..1b4dbc1d --- /dev/null +++ b/src/core/sockfd.c @@ -0,0 +1,231 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include + +#include "core/nng_impl.h" +#include "core/sockfd.h" + +// We will accept up to this many FDs to be queued up for +// accept, before we start rejecting with NNG_ENOSPC. Once +// accept is performed, then another slot is available. +#define NNG_SFD_LISTEN_QUEUE 16 + +int +nni_sfd_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + // No dialer support for this. + return (NNG_ENOTSUP); +} + +typedef struct { + nng_stream_listener ops; + int listen_cnt; // how many FDs are waiting + int listen_q[NNG_SFD_LISTEN_QUEUE]; + bool closed; + nni_list accept_q; + nni_mtx mtx; +} sfd_listener; + +static void +sfd_listener_free(void *arg) +{ + sfd_listener *l = arg; + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static void +sfd_listener_close(void *arg) +{ + nni_aio *aio; + sfd_listener *l = arg; + nni_mtx_lock(&l->mtx); + l->closed = true; + while ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + for (int i = 0; i < l->listen_cnt; i++) { + nni_sfd_close_fd(l->listen_q[i]); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_listen(void *arg) +{ + NNI_ARG_UNUSED(arg); + // nothing really for us to do + return (0); +} + +static void +sfd_start_conn(sfd_listener *l, nni_aio *aio) +{ + int fd; + int rv; + nni_sfd_conn *c; + NNI_ASSERT(l->listen_cnt > 0); + fd = l->listen_q[0]; + for (int i = 1; i < l->listen_cnt; i++) { + l->listen_q[i] = l->listen_q[i + 1]; + } + l->listen_cnt--; + if ((rv = nni_sfd_conn_alloc(&c, fd)) != 0) { + nni_aio_finish_error(aio, rv); + nni_sfd_close_fd(fd); + } else { + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +sfd_cancel_accept(nni_aio *aio, void *arg, int rv) +{ + sfd_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static void +sfd_listener_accept(void *arg, nng_aio *aio) +{ + sfd_listener *l = arg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + if (l->listen_cnt) { + sfd_start_conn(l, aio); + } else { + nni_aio_schedule(aio, sfd_cancel_accept, l); + nni_aio_list_append(&l->accept_q, aio); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_set_fd(void *arg, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + nni_aio *aio; + int fd; + int rv; + + if ((rv = nni_copyin_int(&fd, buf, sz, NNI_MININT, NNI_MAXINT, t)) != + 0) { + return (rv); + } + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if (l->listen_cnt == NNG_SFD_LISTEN_QUEUE) { + nni_mtx_unlock(&l->mtx); + return (NNG_ENOSPC); + } + + l->listen_q[l->listen_cnt++] = fd; + + // if someone was waiting in accept, give it to them now + if ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + sfd_start_conn(l, aio); + } + + nni_mtx_unlock(&l->mtx); + return (0); +} + +static int +sfd_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static const nni_option sfd_listener_options[] = { + { + .o_name = NNG_OPT_SOCKET_FD, + .o_set = sfd_listener_set_fd, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_listener_get_addr, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_listener_get( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_listener *l = arg; + return (nni_getopt(sfd_listener_options, name, l, buf, szp, t)); +} + +static int +sfd_listener_set( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + return (nni_setopt(sfd_listener_options, name, l, buf, sz, t)); +} + +int +nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + sfd_listener *l; + + NNI_ARG_UNUSED(url); + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + memset(l->listen_q, 0, sizeof(l->listen_q)); + l->listen_cnt = 0; + nni_aio_list_init(&l->accept_q); + nni_mtx_init(&l->mtx); + + l->ops.sl_free = sfd_listener_free; + l->ops.sl_close = sfd_listener_close; + l->ops.sl_listen = sfd_listener_listen; + l->ops.sl_accept = sfd_listener_accept; + l->ops.sl_get = sfd_listener_get; + l->ops.sl_set = sfd_listener_set; + + *lp = (void *) l; + return (0); +} diff --git a/src/core/sockfd.h b/src/core/sockfd.h new file mode 100644 index 00000000..ca37f0e1 --- /dev/null +++ b/src/core/sockfd.h @@ -0,0 +1,28 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_FDC_H +#define CORE_FDC_H + +#include "core/nng_impl.h" + +// the nni_sfd_conn struct is provided by platform code to wrap +// an arbitrary byte stream file descriptor (UNIX) or handle (Windows) +// with a nng_stream. +typedef struct nni_sfd_conn nni_sfd_conn; +extern int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd); +extern int nni_sfd_dialer_alloc(nng_stream_dialer **, const nng_url *); +extern int nni_sfd_listener_alloc(nng_stream_listener **, const nng_url *); + +// this is used to close a file descriptor, in case we cannot +// create a connection (or if the listener is closed before the +// connection is accepted.) +extern void nni_sfd_close_fd(int fd); + +#endif // CORE_FDC_H diff --git a/src/core/stream.c b/src/core/stream.c index 418bfb15..99002fcd 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -15,6 +15,7 @@ #include "core/nng_impl.h" #include +#include "core/sockfd.h" #include "core/tcp.h" #include "supplemental/tls/tls_api.h" #include "supplemental/websocket/websocket.h" @@ -94,6 +95,13 @@ static struct { .dialer_alloc = nni_ws_dialer_alloc, .listener_alloc = nni_ws_listener_alloc, }, +#ifdef NNG_TRANSPORT_FDC + { + .scheme = "socket", + .dialer_alloc = nni_sfd_dialer_alloc, + .listener_alloc = nni_sfd_listener_alloc, + }, +#endif { .scheme = NULL, }, diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index dcfea221..b8e3782e 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. +# Copyright 2023 Staysail Systems, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -61,6 +61,7 @@ if (NNG_PLATFORM_POSIX) nng_check_sym(LOCAL_PEERPID sys/un.h NNG_HAVE_LOCALPEERPID) nng_check_sym(getpeerucred ucred.h NNG_HAVE_GETPEERUCRED) nng_check_sym(atomic_flag_test_and_set stdatomic.h NNG_HAVE_STDATOMIC) + nng_check_sym(socketpair sys/socket.h NNG_HAVE_SOCKETPAIR) nng_sources( posix_impl.h @@ -78,9 +79,12 @@ if (NNG_PLATFORM_POSIX) posix_ipcconn.c posix_ipcdial.c posix_ipclisten.c + posix_peerid.c posix_pipe.c posix_resolv_gai.c posix_sockaddr.c + posix_socketpair.c + posix_sockfd.c posix_tcpconn.c posix_tcpdial.c posix_tcplisten.c diff --git a/src/platform/posix/posix_peerid.c b/src/platform/posix/posix_peerid.c new file mode 100644 index 00000000..e0020150 --- /dev/null +++ b/src/platform/posix/posix_peerid.c @@ -0,0 +1,122 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2019 Devolutions +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(NNG_HAVE_GETPEERUCRED) +#include +#elif defined(NNG_HAVE_LOCALPEERCRED) || defined(NNG_HAVE_SOCKPEERCRED) +#include +#endif +#if defined(NNG_HAVE_GETPEEREID) +#include +#include +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#ifndef SOL_LOCAL +#define SOL_LOCAL 0 +#endif + + +int +nni_posix_peerid(int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid, + uint64_t *znid) +{ +#if defined(NNG_HAVE_GETPEEREID) && !defined(NNG_HAVE_LOCALPEERCRED) + uid_t uid; + gid_t gid; + + if (getpeereid(fd, &uid, &gid) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uid; + *egid = gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_GETPEERUCRED) + ucred_t *ucp = NULL; + if (getpeerucred(fd, &ucp) != 0) { + return (nni_plat_errno(errno)); + } + *euid = ucred_geteuid(ucp); + *egid = ucred_getegid(ucp); + *prid = ucred_getpid(ucp); + *znid = ucred_getzoneid(ucp); + ucred_free(ucp); + return (0); +#elif defined(NNG_HAVE_SOCKPEERCRED) + struct sockpeercred uc; + socklen_t len = sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_SOPEERCRED) + struct ucred uc; + socklen_t len = sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_LOCALPEERCRED) + struct xucred xu; + socklen_t len = sizeof(xu); + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = xu.cr_uid; + *egid = xu.cr_gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; +#if defined(NNG_HAVE_LOCALPEERPID) // documented on macOS since 10.8 + { + pid_t pid; + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) == + 0) { + *prid = (uint64_t) pid; + } + } +#endif // NNG_HAVE_LOCALPEERPID + return (0); +#else + if (fd < 0) { + return (NNG_ECLOSED); + } + NNI_ARG_UNUSED(euid); + NNI_ARG_UNUSED(egid); + NNI_ARG_UNUSED(prid); + NNI_ARG_UNUSED(znid); + return (NNG_ENOTSUP); +#endif +} + diff --git a/src/platform/posix/posix_peerid.h b/src/platform/posix/posix_peerid.h new file mode 100644 index 00000000..57e9abff --- /dev/null +++ b/src/platform/posix/posix_peerid.h @@ -0,0 +1,25 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_POSIX_PEERID_H +#define PLATFORM_POSIX_PEERID_H + +// This file defines structures we will use for emulating asynchronous I/O +// on POSIX. POSIX lacks the support for callback based asynchronous I/O +// that we have on Windows, although it has a non-widely support aio layer +// that is not very performant on many systems. So we emulate this using +// one of several possible different backends. + +#include "core/nng_impl.h" +#include + +int nni_posix_peerid( + int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid); + +#endif // PLATFORM_POSIX_PEERID_H \ No newline at end of file diff --git a/src/platform/posix/posix_socketpair.c b/src/platform/posix/posix_socketpair.c new file mode 100644 index 00000000..3a01ad2b --- /dev/null +++ b/src/platform/posix/posix_socketpair.c @@ -0,0 +1,43 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_HAVE_SOCKETPAIR +// This provides an implementation of socketpair(), which is supposed +// to be present on XPG6 and newer. This trivial implementation +// only supports SOCK_STREAM over AF_UNIX. Which is sufficient for +// most purposes. The fds array should point to an int[2]. +#include +#include + +int +nni_socket_pair(int fds[2]) +{ + int rv; + rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds); + if (rv != 0) { + return (nni_plat_errno(errno)); + } + +#ifdef SO_NOSIGPIPE + int set = 1; + setsockopt(fds[0], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); + setsockopt(fds[1], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); +#endif + + return (0); +} +#else +int +nni_socket_pair(int *fds) +{ + return (NNG_ENOTSUP); +} +#endif \ No newline at end of file diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c new file mode 100644 index 00000000..b0d88a31 --- /dev/null +++ b/src/platform/posix/posix_sockfd.c @@ -0,0 +1,493 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2019 Devolutions +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include +#include +#include +#include +#include + +#include "core/sockfd.h" +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_peerid.h" + +struct nni_sfd_conn { + nng_stream stream; + nni_posix_pfd *pfd; + int fd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_reap_node reap; +}; + +static void +sfd_dowrite(nni_sfd_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; + struct iovec iovec[16]; + + nni_aio_get_iov(aio, &naiov, &aiov); + + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = writev(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + // If we didn't send all the data, the caller will + // resubmit. As a corollary, callers should probably + // only have one message on the write queue at a time. + nni_aio_bump_count(aio, n); + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +sfd_doread(nni_sfd_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; + struct iovec iovec[16]; + + nni_aio_get_iov(aio, &naiov, &aiov); + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // Zero indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECONNSHUT); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +sfd_error(void *arg, int err) +{ + nni_sfd_conn *c = arg; + nni_aio *aio; + + nni_mtx_lock(&c->mtx); + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + if (c->pfd != NULL) { + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_close(void *arg) +{ + nni_sfd_conn *c = arg; + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if (c->pfd != NULL) { + nni_posix_pfd_close(c->pfd); + } + } + nni_mtx_unlock(&c->mtx); +} + +// sfd_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +sfd_fini(void *arg) +{ + nni_sfd_conn *c = arg; + sfd_close(c); + if (c->pfd != NULL) { + nni_posix_pfd_fini(c->pfd); + } + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +static nni_reap_list sfd_reap_list = { + .rl_offset = offsetof(nni_sfd_conn, reap), + .rl_func = sfd_fini, +}; +static void +sfd_free(void *arg) +{ + struct nni_sfd_conn *c = arg; + nni_reap(&sfd_reap_list, c); +} + +static void +sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +{ + struct nni_sfd_conn *c = arg; + + if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) { + sfd_error(c, NNG_ECONNSHUT); + return; + } + nni_mtx_lock(&c->mtx); + if ((events & NNI_POLL_IN) != 0) { + sfd_doread(c); + } + if ((events & NNI_POLL_OUT) != 0) { + sfd_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= NNI_POLL_OUT; + } + if (!nni_list_empty(&c->readq)) { + events |= NNI_POLL_IN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_cancel(nni_aio *aio, void *arg, int rv) +{ + nni_sfd_conn *c = arg; + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_send(void *arg, nni_aio *aio) +{ + nni_sfd_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + sfd_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_recv(void *arg, nni_aio *aio) +{ + nni_sfd_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + sfd_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +static int +sfd_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static int +sfd_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &id, &ignore, &ignore, &ignore); + if (rv != 0) { + return (rv); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &id, &ignore, &ignore); + if (rv != 0) { + return (rv); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &ignore, &ignore, &id); + if (rv != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal zone id (illumos/Solaris) + return (NNG_ENOTSUP); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &ignore, &id, &ignore); + if (rv != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal process id + return (NNG_ENOTSUP); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static const nni_option sfd_options[] = { + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = sfd_get_addr, + }, + { + .o_name = NNG_OPT_PEER_PID, + .o_get = sfd_get_peer_pid, + }, + { + .o_name = NNG_OPT_PEER_UID, + .o_get = sfd_get_peer_uid, + }, + { + .o_name = NNG_OPT_PEER_GID, + .o_get = sfd_get_peer_gid, + }, + { + .o_name = NNG_OPT_PEER_ZONEID, + .o_get = sfd_get_peer_zoneid, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + return (nni_getopt(sfd_options, name, c, buf, szp, t)); +} + +static int +sfd_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_sfd_conn *c = arg; + return (nni_setopt(sfd_options, name, c, buf, sz, t)); +} + +int +nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) +{ + nni_sfd_conn *c; + int rv; + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) { + NNI_FREE_STRUCT(c); + return (rv); + } + + c->closed = false; + c->fd = fd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + c->stream.s_free = sfd_free; + c->stream.s_close = sfd_close; + c->stream.s_recv = sfd_recv; + c->stream.s_send = sfd_send; + c->stream.s_get = sfd_get; + c->stream.s_set = sfd_set; + + nni_posix_pfd_set_cb(c->pfd, sfd_cb, c); + + *cp = c; + return (0); +} + +void +nni_sfd_close_fd(int fd) +{ + close(fd); +} \ No newline at end of file diff --git a/src/platform/windows/CMakeLists.txt b/src/platform/windows/CMakeLists.txt index d1d158e0..adf67ebd 100644 --- a/src/platform/windows/CMakeLists.txt +++ b/src/platform/windows/CMakeLists.txt @@ -39,6 +39,7 @@ if (NNG_PLATFORM_WINDOWS) win_rand.c win_resolv.c win_sockaddr.c + win_socketpair.c win_tcp.c win_tcpconn.c win_tcpdial.c diff --git a/src/platform/windows/win_socketpair.c b/src/platform/windows/win_socketpair.c new file mode 100644 index 00000000..0ed0443a --- /dev/null +++ b/src/platform/windows/win_socketpair.c @@ -0,0 +1,57 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + + +#ifdef NNG_HAVE_SOCKETPAIR_TODO +// TODO: Windows lacks socketpair. We can emulate it with an explcit +// implementation based on AF_UNIX. + +#include +#include + +int +nni_socket_pair(int *fds) +{ + int rv; + rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds); + if (rv != 0) { + return (nni_plat_errno(errno)); + } + + return (0); +} +#else +int +nni_socket_pair(int fds[2]) +{ + NNI_ARG_UNUSED(fds); + return (NNG_ENOTSUP); +} + +// This is also the fdc transport. + +typedef struct nni_sfd_conn nni_sfd_conn; + +void +nni_sfd_close_fd(int fd) +{ + NNI_ARG_UNUSED(fd); +} + +int +nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) +{ + NNI_ARG_UNUSED(cp); + NNI_ARG_UNUSED(fd); + return (NNG_ENOTSUP); +} + +#endif diff --git a/src/sp/transport.c b/src/sp/transport.c index 9f4c6522..d0a75b22 100644 --- a/src/sp/transport.c +++ b/src/sp/transport.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Devolutions // @@ -70,6 +70,9 @@ extern void nni_sp_wss_register(void); #ifdef NNG_TRANSPORT_ZEROTIER extern void nni_sp_zt_register(void); #endif +#ifdef NNG_TRANSPORT_FDC +extern void nni_sp_sfd_register(void); +#endif void nni_sp_tran_sys_init(void) @@ -95,6 +98,9 @@ nni_sp_tran_sys_init(void) #ifdef NNG_TRANSPORT_ZEROTIER nni_sp_zt_register(); #endif +#ifdef NNG_TRANSPORT_FDC + nni_sp_sfd_register(); +#endif } // nni_sp_tran_sys_fini finalizes the entire transport system, including all diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt index add8a9c9..0de80015 100644 --- a/src/sp/transport/CMakeLists.txt +++ b/src/sp/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. +# Copyright 2023 Staysail Systems, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -10,6 +10,7 @@ # Transports. nng_directory(transport) +add_subdirectory(socket) add_subdirectory(inproc) add_subdirectory(ipc) add_subdirectory(tcp) diff --git a/src/sp/transport/socket/CMakeLists.txt b/src/sp/transport/socket/CMakeLists.txt new file mode 100644 index 00000000..d79b261e --- /dev/null +++ b/src/sp/transport/socket/CMakeLists.txt @@ -0,0 +1,15 @@ +# +# Copyright 2023 Staysail Systems, Inc. +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# File Descriptor (or Handle) based connections +nng_directory(socket) + +nng_sources_if(NNG_TRANSPORT_FDC sockfd.c) +nng_defines_if(NNG_TRANSPORT_FDC NNG_TRANSPORT_FDC) +nng_test(sockfd_test) \ No newline at end of file diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c new file mode 100644 index 00000000..2db052ac --- /dev/null +++ b/src/sp/transport/socket/sockfd.c @@ -0,0 +1,1011 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2019 Devolutions +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Socket transport. This takes sockets that may have been +// created via another mechanism (usually socketpair) and +// builds a connection using them. The socket is passed in to the listener +// using the NNG_OPT_SOCKFD_FD option. + +typedef struct sfd_tran_pipe sfd_tran_pipe; +typedef struct sfd_tran_ep sfd_tran_ep; + +// sfd_tran_pipe wraps an open file descriptor +struct sfd_tran_pipe { + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + nni_atomic_flag reaped; + nni_reap_node reap; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; +}; + +struct sfd_tran_ep { + nni_mtx mtx; + uint16_t proto; + size_t rcvmax; + bool fini; + bool started; + bool closed; + nng_sockaddr src; + int refcnt; // active pipes + nni_aio *useraio; + nni_aio *connaio; + nni_aio *timeaio; + nni_list busypipes; // busy pipes -- ones passed to socket + nni_list waitpipes; // pipes waiting to match to socket + nni_list negopipes; // pipes busy negotiating + nni_reap_node reap; + nng_stream_listener *listener; + +#ifdef NNG_ENABLE_STATS + nni_stat_item st_rcv_max; +#endif +}; + +static void sfd_tran_pipe_send_start(sfd_tran_pipe *); +static void sfd_tran_pipe_recv_start(sfd_tran_pipe *); +static void sfd_tran_pipe_send_cb(void *); +static void sfd_tran_pipe_recv_cb(void *); +static void sfd_tran_pipe_nego_cb(void *); +static void sfd_tran_ep_fini(void *); +static void sfd_tran_pipe_fini(void *); + +static nni_reap_list sfd_tran_ep_reap_list = { + .rl_offset = offsetof(sfd_tran_ep, reap), + .rl_func = sfd_tran_ep_fini, +}; + +static nni_reap_list sfd_tran_pipe_reap_list = { + .rl_offset = offsetof(sfd_tran_pipe, reap), + .rl_func = sfd_tran_pipe_fini, +}; + +static void +sfd_tran_init(void) +{ +} + +static void +sfd_tran_fini(void) +{ +} + +static void +sfd_tran_pipe_close(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + p->closed = true; + nni_mtx_unlock(&p->mtx); + + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); + + nng_stream_close(p->conn); +} + +static void +sfd_tran_pipe_stop(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); +} + +static int +sfd_tran_pipe_init(void *arg, nni_pipe *npipe) +{ + sfd_tran_pipe *p = arg; + p->npipe = npipe; + + return (0); +} + +static void +sfd_tran_pipe_fini(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep; + + sfd_tran_pipe_stop(p); + if ((ep = p->ep) != NULL) { + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + ep->refcnt--; + if (ep->fini && (ep->refcnt == 0)) { + nni_reap(&sfd_tran_ep_reap_list, ep); + } + nni_mtx_unlock(&ep->mtx); + } + + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); + nng_stream_free(p->conn); + nni_msg_free(p->rxmsg); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static void +sfd_tran_pipe_reap(sfd_tran_pipe *p) +{ + if (!nni_atomic_flag_test_and_set(&p->reaped)) { + if (p->conn != NULL) { + nng_stream_close(p->conn); + } + nni_reap(&sfd_tran_pipe_reap_list, p); + } +} + +static int +sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) +{ + sfd_tran_pipe *p; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); + nni_atomic_flag_reset(&p->reaped); + + *pipep = p; + + return (0); +} + +static void +sfd_tran_ep_match(sfd_tran_ep *ep) +{ + nni_aio *aio; + sfd_tran_pipe *p; + + if (((aio = ep->useraio) == NULL) || + ((p = nni_list_first(&ep->waitpipes)) == NULL)) { + return; + } + nni_list_remove(&ep->waitpipes, p); + nni_list_append(&ep->busypipes, p); + ep->useraio = NULL; + p->rcvmax = ep->rcvmax; + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +sfd_tran_pipe_nego_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep = p->ep; + nni_aio *aio = &p->negoaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&ep->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + // We start transmitting before we receive. + if (p->gottxhead < p->wanttxhead) { + p->gottxhead += nni_aio_count(aio); + } else if (p->gotrxhead < p->wantrxhead) { + p->gotrxhead += nni_aio_count(aio); + } + + if (p->gottxhead < p->wanttxhead) { + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; + // send it down... + nni_aio_set_iov(aio, 1, &iov); + nng_stream_send(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + if (p->gotrxhead < p->wantrxhead) { + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); + nng_stream_recv(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + // We have both sent and received the headers. Let's check the + // receiver. + if ((p->rxlen[0] != 0) || (p->rxlen[1] != 'S') || + (p->rxlen[2] != 'P') || (p->rxlen[3] != 0) || (p->rxlen[6] != 0) || + (p->rxlen[7] != 0)) { + rv = NNG_EPROTO; + goto error; + } + + NNI_GET16(&p->rxlen[4], p->peer); + + // We are ready now. We put this in the wait list, and + // then try to run the matcher. + nni_list_remove(&ep->negopipes, p); + nni_list_append(&ep->waitpipes, p); + + sfd_tran_ep_match(ep); + nni_mtx_unlock(&ep->mtx); + + return; + +error: + // If the connection is closed, we need to pass back a different + // error code. This is necessary to avoid a problem where the + // closed status is confused with the accept file descriptor + // being closed. + if (rv == NNG_ECLOSED) { + rv = NNG_ECONNSHUT; + } + nng_stream_close(p->conn); + + if ((uaio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&ep->mtx); + sfd_tran_pipe_reap(p); +} + +static void +sfd_tran_pipe_send_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + int rv; + nni_aio *aio; + size_t n; + nni_msg *msg; + nni_aio *txaio = &p->txaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->sendq); + + if ((rv = nni_aio_result(txaio)) != 0) { + nni_pipe_bump_error(p->npipe, rv); + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { + nng_stream_send(p->conn, txaio); + nni_mtx_unlock(&p->mtx); + return; + } + + nni_aio_list_remove(aio); + sfd_tran_pipe_send_start(p); + + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_pipe_bump_tx(p->npipe, n); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_sync(aio, 0, n); +} + +static void +sfd_tran_pipe_recv_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + nni_aio *aio; + int rv; + size_t n; + nni_msg *msg; + nni_aio *rxaio = &p->rxaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->recvq); + + if ((rv = nni_aio_result(rxaio)) != 0) { + goto recv_error; + } + + if (p->closed) { + rv = NNG_ECLOSED; + goto recv_error; + } + + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + + // If we don't have a message yet, we were reading the message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (p->rxmsg == NULL) { + uint64_t len; + // We should have gotten a message header. + NNI_GET64(p->rxlen, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if ((len > p->rcvmax) && (p->rcvmax > 0)) { + rv = NNG_EMSGSIZE; + goto recv_error; + } + + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + goto recv_error; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. + if (len != 0) { + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + + nni_aio_set_iov(rxaio, 1, &iov); + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + } + + // We read a message completely. Let the user know the good news. + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + n = nni_msg_len(msg); + + nni_pipe_bump_rx(p->npipe, n); + sfd_tran_pipe_recv_start(p); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_sync(aio, 0, n); + return; + +recv_error: + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + nni_pipe_bump_error(p->npipe, rv); + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. + nni_mtx_unlock(&p->mtx); + + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(&p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_start(sfd_tran_pipe *p) +{ + nni_aio *aio; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + + if (p->closed) { + while ((aio = nni_list_first(&p->sendq)) != NULL) { + nni_list_remove(&p->sendq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + + // This runs to send the message. + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + NNI_PUT64(p->txlen, len); + + txaio = &p->txaio; + niov = 0; + iov[0].iov_buf = p->txlen; + iov[0].iov_len = sizeof(p->txlen); + niov++; + if (nni_msg_header_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + nni_aio_set_iov(txaio, niov, iov); + nng_stream_send(p->conn, txaio); +} + +static void +sfd_tran_pipe_send(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + // No way to give the message back to the protocol, so + // we just discard it silently to prevent it from leaking. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + sfd_tran_pipe_send_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static void +sfd_tran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(&p->rxaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_recv_start(sfd_tran_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; + NNI_ASSERT(p->rxmsg == NULL); + + if (p->closed) { + nni_aio *aio; + while ((aio = nni_list_first(&p->recvq)) != NULL) { + nni_list_remove(&p->recvq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + if (nni_list_empty(&p->recvq)) { + return; + } + + // Schedule a read of the header. + rxaio = &p->rxaio; + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); + + nng_stream_recv(p->conn, rxaio); +} + +static void +sfd_tran_pipe_recv(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + sfd_tran_pipe_recv_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static uint16_t +sfd_tran_pipe_peer(void *arg) +{ + sfd_tran_pipe *p = arg; + + return (p->peer); +} + +static int +sfd_tran_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_pipe *p = arg; + return (nni_stream_get(p->conn, name, buf, szp, t)); +} + +static void +sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) +{ + nni_iov iov; + + ep->refcnt++; + + p->conn = conn; + p->ep = ep; + p->proto = ep->proto; + + p->txlen[0] = 0; + p->txlen[1] = 'S'; + p->txlen[2] = 'P'; + p->txlen[3] = 0; + NNI_PUT16(&p->txlen[4], p->proto); + NNI_PUT16(&p->txlen[6], 0); + + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + nni_aio_set_iov(&p->negoaio, 1, &iov); + nni_list_append(&ep->negopipes, p); + + // No timeouts here -- the purpose of timeouts was to guard against + // untrusted callers forcing us to burn files. In this case the + // application is providing us with a file, and should be reasonably + // trusted not to be doing a DoS against itself! :-) Part of the + // rationale is that it may take a while for a child process to reach + // the point where it is ready to negotiate the other side of a + // connection. + nni_aio_set_timeout(&p->negoaio, NNG_DURATION_INFINITE); + nng_stream_send(p->conn, &p->negoaio); +} + +static void +sfd_tran_ep_fini(void *arg) +{ + sfd_tran_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + ep->fini = true; + if (ep->refcnt != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_mtx_unlock(&ep->mtx); + nni_aio_stop(ep->timeaio); + nni_aio_stop(ep->connaio); + nng_stream_listener_free(ep->listener); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); + + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +sfd_tran_ep_close(void *arg) +{ + sfd_tran_ep *ep = arg; + sfd_tran_pipe *p; + + nni_mtx_lock(&ep->mtx); + + ep->closed = true; + nni_aio_close(ep->timeaio); + if (ep->listener != NULL) { + nng_stream_listener_close(ep->listener); + } + NNI_LIST_FOREACH (&ep->negopipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->waitpipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->busypipes, p) { + sfd_tran_pipe_close(p); + } + if (ep->useraio != NULL) { + nni_aio_finish_error(ep->useraio, NNG_ECLOSED); + ep->useraio = NULL; + } + + nni_mtx_unlock(&ep->mtx); +} + +static void +sfd_tran_timer_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + if (nni_aio_result(ep->timeaio) == 0) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } +} + +static void +sfd_tran_accept_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + nni_aio *aio = ep->connaio; + sfd_tran_pipe *p; + int rv; + nng_stream *conn; + + nni_mtx_lock(&ep->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + conn = nni_aio_get_output(aio, 0); + if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + nng_stream_free(conn); + goto error; + } + + if (ep->closed) { + sfd_tran_pipe_fini(p); + nng_stream_free(conn); + rv = NNG_ECLOSED; + goto error; + } + sfd_tran_pipe_start(p, conn, ep); + nng_stream_listener_accept(ep->listener, ep->connaio); + nni_mtx_unlock(&ep->mtx); + return; + +error: + // When an error here occurs, let's send a notice up to the consumer. + // That way it can be reported properly. + if ((aio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + switch (rv) { + + case NNG_ENOMEM: + case NNG_ENOFILES: + nng_sleep_aio(10, ep->timeaio); + break; + + default: + if (!ep->closed) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } + break; + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +{ + sfd_tran_ep *ep; + NNI_ARG_UNUSED(url); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); + + ep->proto = nni_sock_proto_id(sock); + +#ifdef NNG_ENABLE_STATS + static const nni_stat_info rcv_max_info = { + .si_name = "rcv_max", + .si_desc = "maximum receive size", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); +#endif + + *epp = ep; + return (0); +} + +static int +sfd_tran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + NNI_ARG_UNUSED(ndialer); + return (NNG_ENOTSUP); +} + +static int +sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) +{ + sfd_tran_ep *ep; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); + + // Check for invalid URL components -- we only accept a bare scheme + if ((strlen(url->u_host) != 0) || (strlen(url->u_path) != 0) || + (url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + + if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + + if (((rv = nni_aio_alloc(&ep->connaio, sfd_tran_accept_cb, ep)) != + 0) || + ((rv = nni_aio_alloc(&ep->timeaio, sfd_tran_timer_cb, ep)) != 0) || + ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { + sfd_tran_ep_fini(ep); + return (rv); + } +#ifdef NNG_ENABLE_STATS + nni_listener_add_stat(nlistener, &ep->st_rcv_max); +#endif + + *lp = ep; + return (0); +} + +static void +sfd_tran_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (ep->useraio == aio) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +sfd_tran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&ep->st_rcv_max, val); +#endif + } + return (rv); +} + +static int +sfd_tran_ep_bind(void *arg) +{ + sfd_tran_ep *ep = arg; + return (nng_stream_listener_listen(ep->listener)); +} + +static void +sfd_tran_ep_accept(void *arg, nni_aio *aio) +{ + sfd_tran_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (ep->useraio != NULL) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + return; + } + if ((rv = nni_aio_schedule(aio, sfd_tran_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + ep->useraio = aio; + if (!ep->started) { + ep->started = true; + nng_stream_listener_accept(ep->listener, ep->connaio); + } else { + sfd_tran_ep_match(ep); + } + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_init = sfd_tran_pipe_init, + .p_fini = sfd_tran_pipe_fini, + .p_stop = sfd_tran_pipe_stop, + .p_send = sfd_tran_pipe_send, + .p_recv = sfd_tran_pipe_recv, + .p_close = sfd_tran_pipe_close, + .p_peer = sfd_tran_pipe_peer, + .p_getopt = sfd_tran_pipe_getopt, +}; + +static const nni_option sfd_tran_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = sfd_tran_ep_get_recvmaxsz, + .o_set = sfd_tran_ep_set_recvmaxsz, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +sfd_tran_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_get(ep->listener, name, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(sfd_tran_ep_opts, name, ep, buf, szp, t); + } + return (rv); +} + +static int +sfd_tran_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_set(ep->listener, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(sfd_tran_ep_opts, name, ep, buf, sz, t); + } + return (rv); +} + +static nni_sp_dialer_ops sfd_tran_dialer_ops = { + .d_init = sfd_tran_dialer_init, + .d_fini = NULL, + .d_connect = NULL, + .d_close = NULL, + .d_getopt = NULL, + .d_setopt = NULL, +}; + +static nni_sp_listener_ops sfd_tran_listener_ops = { + .l_init = sfd_tran_listener_init, + .l_fini = sfd_tran_ep_fini, + .l_bind = sfd_tran_ep_bind, + .l_accept = sfd_tran_ep_accept, + .l_close = sfd_tran_ep_close, + .l_getopt = sfd_tran_listener_getopt, + .l_setopt = sfd_tran_listener_setopt, +}; + +static nni_sp_tran sfd_tran = { + .tran_scheme = "socket", + .tran_dialer = &sfd_tran_dialer_ops, + .tran_listener = &sfd_tran_listener_ops, + .tran_pipe = &sfd_tran_pipe_ops, + .tran_init = sfd_tran_init, + .tran_fini = sfd_tran_fini, +}; + +void +nni_sp_sfd_register(void) +{ + nni_sp_tran_register(&sfd_tran); +} diff --git a/src/sp/transport/socket/sockfd_test.c b/src/sp/transport/socket/sockfd_test.c new file mode 100644 index 00000000..c12d4466 --- /dev/null +++ b/src/sp/transport/socket/sockfd_test.c @@ -0,0 +1,461 @@ +// +// Copyright 2023 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2018 Devolutions +// Copyright 2018 Cody Piersall +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +#ifdef NNG_PLATFORM_POSIX +#include +#include +#ifdef NNG_PLATFORM_SUNOS +#include +#endif +#endif + +// FDC tests. +static void +test_sfd_connect_fail(void) +{ + nng_socket s; + + NUTS_OPEN(s); + NUTS_FAIL(nng_dial(s, "socket://", NULL, 0), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +void +test_sfd_malformed_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_listen(s1, "socket://junk", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s1, "socket://", NULL, 0)); + NUTS_CLOSE(s1); +} + +void +test_sfd_accept(void) +{ + nng_socket s1, s2; + nng_listener l; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + // make sure we won't have to deal with SIGPIPE - EPIPE is better + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l, s1, "socket://")); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_SLEEP(10); + NUTS_CLOSE(s1); + close(fds[1]); +} + +void +test_sfd_exchange(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_exchange_late(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_recv_max(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + size_t scratch; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 200); + NUTS_PASS(nng_listener_set_size(l0, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_get_size(l0, NNG_OPT_RECVMAXSZ, &scratch)); + NUTS_ASSERT(scratch == 100); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_large(void) +{ + char *buf; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + nng_msg *msg; + int fds[2]; + + sz = 1U << 20; + buf = nng_alloc(sz); // a MB + buf[sz - 1] = 0; + memset(buf, 'A', sz - 1); + NUTS_PASS(nng_socket_pair(fds)); + NUTS_PASS(nng_msg_alloc(&msg, sz)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s1, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + nng_msleep(100); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + memset(nng_msg_body(msg), 'B', sz - 1); + memset(buf, 'B', sz - 1); + + NUTS_PASS(nng_sendmsg(s1, msg, 0)); + NUTS_PASS(nng_recvmsg(s0, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); + nng_free(buf, sz); +} + +void +test_sockfd_close_pending(void) +{ + // this test verifies that closing a socket pair that has not + // started negotiation with the other side still works. + int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + nng_listen(s0, "socket://", &l, 0); + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0]); + nng_msleep(10); + NUTS_CLOSE(s0); + close(fds[1]); +} + +void +test_sockfd_close_peer(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + close(fds[1]); + nng_msleep(100); + NUTS_CLOSE(s0); +} + +void +test_sockfd_listener_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0; + nng_listener l; + nng_sockaddr sa; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + close(fds[1]); + NUTS_CLOSE(s0); +} + +void +test_sockfd_pipe_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_sockaddr sa; + nng_msg *msg; + nng_pipe p; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); +} + +void +test_sockfd_pipe_peer(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_msg *msg; + nng_pipe p; + uint64_t id; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_ASSERT(nng_pipe_id(p) != -1); +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_PID, &id)); + NUTS_ASSERT(id == (uint64_t) getpid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_UID, &id)); + NUTS_ASSERT(id == (uint64_t) getuid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_GID, &id)); + NUTS_ASSERT(id == (uint64_t) getgid()); +#endif +#if defined(NNG_PLATFORM_SUNOS) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id)); + NUTS_ASSERT(id == (uint64_t) getzoneid()); +#else + NUTS_FAIL( + nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id), NNG_ENOTSUP); +#endif + + nng_msg_free(msg); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen_full(void) +{ +#ifndef NNG_SFD_LISTEN_QUEUE +#define NNG_SFD_LISTEN_QUEUE 16 +#endif + + int fds[NNG_SFD_LISTEN_QUEUE * 2]; + nng_socket s; + int i; + nng_listener l; + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i += 2) { + int pair[2]; + NUTS_PASS(nng_socket_pair(pair)); + fds[i] = pair[0]; + fds[i+1] = pair[1]; + } + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + if (i < NNG_SFD_LISTEN_QUEUE) { + NUTS_PASS(nng_listener_set_int( + l, NNG_OPT_SOCKET_FD, fds[i])); + } else { + NUTS_FAIL( + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[i]), + NNG_ENOSPC); + } + } + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + close(fds[i]); + } + NUTS_CLOSE(s); +} + +void +test_sfd_fd_option_type(void) +{ + nng_socket s; + nng_listener l; + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_FAIL(nng_listener_set_bool(l, NNG_OPT_SOCKET_FD, false), NNG_EBADTYPE); + NUTS_CLOSE(s); +} + +void +test_sfd_fd_dev_zero(void) +{ +#ifdef NNG_PLATFORM_POSIX + nng_socket s; + nng_listener l; + int fd; + + // dev/zero produces a stream of zero bytes leading to protocol error + NUTS_ASSERT((fd = open("/dev/zero", O_RDONLY, 0777)) >= 0); + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fd)); + nng_msleep(100); + NUTS_CLOSE(s); +#endif +} + +NUTS_TESTS = { + { "socket connect fail", test_sfd_connect_fail }, + { "socket malformed address", test_sfd_malformed_address }, +#ifdef NNG_HAVE_SOCKETPAIR + { "socket listen", test_sfd_listen }, + { "socket accept", test_sfd_accept }, + { "socket exchange", test_sfd_exchange }, + { "socket exchange late", test_sfd_exchange_late }, + { "socket recv max", test_sfd_recv_max }, + { "socket exchange large", test_sfd_large }, + { "socket close pending", test_sockfd_close_pending }, + { "socket close peer", test_sockfd_close_peer }, + { "socket listener address", test_sockfd_listener_sockaddr }, + { "socket pipe address", test_sockfd_pipe_sockaddr }, + { "socket pipe peer id", test_sockfd_pipe_peer }, + { "socket listen full", test_sfd_listen_full }, + { "socket bad fd type", test_sfd_fd_option_type }, + { "socket dev zero", test_sfd_fd_dev_zero }, +#endif + + { NULL, NULL }, +}; \ No newline at end of file diff --git a/src/supplemental/util/platform.c b/src/supplemental/util/platform.c index ddc2d088..99daaef6 100644 --- a/src/supplemental/util/platform.c +++ b/src/supplemental/util/platform.c @@ -164,3 +164,9 @@ nng_random(void) (void) nni_init(); return (nni_random()); } + +int +nng_socket_pair(int *fds) +{ + return (nni_socket_pair(fds)); +} \ No newline at end of file -- cgit v1.2.3-70-g09d2