diff options
| author | Garrett D'Amore <garrett@damore.org> | 2023-12-18 01:12:01 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2023-12-29 15:20:21 -0800 |
| commit | 9caabf76621ba81e7fed5df42971f355b648ff59 (patch) | |
| tree | 2f243965e202862f36c9d57c3053f57806bf70cf | |
| parent | e5261536d4f72dccbf1a424bfe426f9635b9d1c3 (diff) | |
| download | nng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.gz nng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.bz2 nng-9caabf76621ba81e7fed5df42971f355b648ff59.zip | |
fixes #1746 Create a new socket:// transport for socketpair() based connections
This transport only listens, and creates connections when
the application calls setopt on the listener with NNG_OPT_SOCKET_FD,
to pass a file descriptor. The FD is turned into an nng_stream,
and utilized for SP. The protocol over the descriptor is identical
to the TCP protocol (not the IPC protocol).
The options for peer information are borrowed from the IPC transport,
as they may be useful for these purposes.
This includes a test suite and full documentation.
30 files changed, 2776 insertions, 82 deletions
diff --git a/cmake/NNGOptions.cmake b/cmake/NNGOptions.cmake index b8067bca..12075f4c 100644 --- a/cmake/NNGOptions.cmake +++ b/cmake/NNGOptions.cmake @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -127,6 +127,9 @@ CMAKE_DEPENDENT_OPTION(NNG_TRANSPORT_WSS "Enable WSS transport." ON "NNG_ENABLE_TLS" OFF) mark_as_advanced(NNG_TRANSPORT_WSS) +option (NNG_TRANSPORT_FDC "Enable File Descriptor transport (EXPERIMENTAL)" ON) +mark_as_advanced(NNG_TRANSPORT_FDC) + # ZeroTier option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) mark_as_advanced(NNG_TRANSPORT_ZEROTIER) diff --git a/docs/man/CMakeLists.txt b/docs/man/CMakeLists.txt index a33d4ee8..17d43c8e 100644 --- a/docs/man/CMakeLists.txt +++ b/docs/man/CMakeLists.txt @@ -302,6 +302,7 @@ if (NNG_ENABLE_DOC) nng_mtx_unlock nng_opts_parse nng_random + nng_socket_pair nng_thread_create nng_thread_destroy nng_thread_set_name @@ -388,6 +389,7 @@ if (NNG_ENABLE_DOC) nng_rep nng_req nng_respondent + nng_socket nng_sub nng_surveyor nng_tcp diff --git a/docs/man/libnng.3.adoc b/docs/man/libnng.3.adoc index 6f64de41..c9df9c0d 100644 --- a/docs/man/libnng.3.adoc +++ b/docs/man/libnng.3.adoc @@ -305,6 +305,7 @@ as a convenience to aid in creating portable applications. 
|xref:nng_mtx_unlock.3supp.adoc[nng_mtx_unlock()]|unlock mutex |xref:nng_opts_parse.3supp.adoc[nng_opts_parse()]|parse command line options |xref:nng_random.3supp.adoc[nng_random()]|get random number +|xref:nng_socket_pair.3supp.adoc[nng_socket_pair()]|create connected pair of BSD sockets |xref:nng_thread_create.3supp.adoc[nng_thread_create()]|create thread |xref:nng_thread_destroy.3supp.adoc[nng_thread_destroy()]|reap thread |xref:nng_thread_set_name.3supp.adoc[nng_thread_set_name()]|set thread name diff --git a/docs/man/nng.7.adoc b/docs/man/nng.7.adoc index 1097a5f8..c0e1a640 100644 --- a/docs/man/nng.7.adoc +++ b/docs/man/nng.7.adoc @@ -69,6 +69,7 @@ xref:nng_surveyor.7.adoc[nng_surveyor(7)]:: Surveyor side of survey protocol [horizontal] xref:nng_inproc.7.adoc[nng_inproc(7)]:: Intra-process transport xref:nng_ipc.7.adoc[nng_ipc(7)]:: Inter-process transport +xref:nng_socket.7.adoc[nng_socket(7)]:: BSD socket transport xref:nng_tls.7.adoc[nng_tls(7)]:: TLSv1.2 over TCP transport xref:nng_tcp.7.adoc[nng_tcp(7)]:: TCP (and TCPv6) transport xref:nng_ws.7.adoc[nng_ws(7)]:: WebSocket transport diff --git a/docs/man/nng_ipc.7.adoc b/docs/man/nng_ipc.7.adoc index 856a00ac..c085e4d2 100644 --- a/docs/man/nng_ipc.7.adoc +++ b/docs/man/nng_ipc.7.adoc @@ -1,6 +1,6 @@ = nng_ipc(7) // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This document is supplied under the terms of the MIT License, a @@ -107,14 +107,14 @@ except for abstract sockets, which use xref:nng_sockaddr_abstract.5.adoc[`nng_so The following transport options are supported by this transport, where supported by the underlying platform. 
-* xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_PEER_GID[`NNG_OPT_IPC_PEER_GID`] -* xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_PEER_PID[`NNG_OPT_IPC_PEER_PID`] -* xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_PEER_UID[`NNG_OPT_IPC_PEER_UID`] -* xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_PEER_ZONEID[`NNG_OPT_IPC_PEER_ZONEID`] * xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_PERMISSIONS[`NNG_OPT_IPC_PERMISSIONS`] * xref:nng_ipc_options.5.adoc#NNG_OPT_IPC_SECURITY_DESCRIPTOR[`NNG_OPT_IPC_SECURITY_DESCRIPTOR`] * xref:nng_options.5.adoc#NNG_OPT_LOCADDR[`NNG_OPT_LOCADDR`] * xref:nng_options.5.adoc#NNG_OPT_REMADDR[`NNG_OPT_REMADDR`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_GID[`NNG_OPT_PEER_GID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_PID[`NNG_OPT_PEER_PID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_UID[`NNG_OPT_PEER_UID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_ZONEID[`NNG_OPT_PEER_ZONEID`] * xref:nng_options.5.adoc#NNG_OPT_URL[`NNG_OPT_URL`] == SEE ALSO diff --git a/docs/man/nng_ipc_options.5.adoc b/docs/man/nng_ipc_options.5.adoc index 72f383ea..516fbe80 100644 --- a/docs/man/nng_ipc_options.5.adoc +++ b/docs/man/nng_ipc_options.5.adoc @@ -1,6 +1,6 @@ = nng_ipc_options(5) // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -43,43 +43,9 @@ have other access restrictions. An attempt has been made to include details about such restrictions in the description of the option. -NOTE: The availability of any of the following options is platform-specific, +NOTE: The availability of the following options is platform-specific, as the implementations of IPC are quite different on Windows and POSIX systems. -=== IPC Options - -[[NNG_OPT_IPC_PEER_GID]]((`NNG_OPT_IPC_PEER_GID`)):: -(`uint64_t`) -This read-only option provides a connected peer's primary -group id. 
-This is the effective group id of the peer when either the underlying -`listen()` or `connect()` calls were made, and is not forgeable. -This option is generally only available on POSIX systems. - -[[NNG_OPT_IPC_PEER_PID]]((`NNG_OPT_IPC_PEER_PID`)):: -(`uint64_t`) -This read-only option provides the the process id -of the connected peer. -This option is only available on Windows, Linux, and certain other systems. -+ -NOTE: Applications should not assume that the process ID does not change, -as it is possible (although unsupported!) for a nefarious process to pass a -file descriptor between processes. -However, it is not possible for a nefarious application to forge the identity -of a well-behaved one using this method. - -[[NNG_OPT_IPC_PEER_UID]]((`NNG_OPT_IPC_PEER_UID`)):: -(`uint64_t`) -This read-only option provides a connected peer's user id. -This is the effective user id of the peer when either the underlying -`listen()` or `connect()` calls were made, and is not forgeable. -This option is generally only available on POSIX systems. - -[[NNG_OPT_IPC_PEER_ZONEID]]((`NNG_OPT_IPC_PEER_ZONEID`)):: -(`uint64_t`) -This read-only option provides a connected peer's the zone id. -Zones (and this option) are only supported on Solaris and illumos systems. - [[NNG_OPT_IPC_PERMISSIONS]]((`NNG_OPT_IPC_PERMISSIONS`)):: (`int`) This write-only option may be applied to a listener to configure the @@ -115,10 +81,18 @@ named pipe. The value is a pointer, `PSECURITY_DESCRIPTOR`, and may only be applied to listeners that have not been started yet. 
+=== Common Platform Specific Options + +The following options are supported by this transport when the underlying platform supports them: + +* xref:nng_options.5.adoc#NNG_OPT_PEER_GID[`NNG_OPT_PEER_GID`] (also available as `NNG_OPT_IPC_PEER_GID`) +* xref:nng_options.5.adoc#NNG_OPT_PEER_PID[`NNG_OPT_PEER_PID`] (also available as `NNG_OPT_IPC_PEER_PID`) +* xref:nng_options.5.adoc#NNG_OPT_PEER_UID[`NNG_OPT_PEER_UID`] (also available as `NNG_OPT_IPC_PEER_UID`) +* xref:nng_options.5.adoc#NNG_OPT_PEER_ZONEID[`NNG_OPT_PEER_ZONEID`] (also available as `NNG_OPT_IPC_PEER_ZONEID`) + === Inherited Options -Generally, the following option values are also available for TLS objects, -when appropriate for the context: +Generally, the following option values are also available when appropriate for the context: * xref:nng_options.5.adoc#NNG_OPT_LOCADDR[`NNG_OPT_LOCADDR`] * xref:nng_options.5.adoc#NNG_OPT_REMADDR[`NNG_OPT_REMADDR`] @@ -126,11 +100,11 @@ when appropriate for the context: == SEE ALSO [.text-left] -xref:nng_ipc_dialer_getopt.3ipc.adoc[nng_ipc_dialer_getopt(3ipc)], -xref:nng_ipc_dialer_setopt.3ipc.adoc[nng_ipc_dialer_setopt(3ipc)], -xref:nng_ipc_getopt.3ipc.adoc[nng_ipc_getopt(3ipc)], -xref:nng_ipc_listener_getopt.3ipc.adoc[nng_ipc_listener_getopt(3ipc)], -xref:nng_ipc_listener_setopt.3ipc.adoc[nng_ipc_listener_setopt(3ipc)], -xref:nng_ipc_setopt.3ipc.adoc[nng_ipc_setopt(3ipc)], +xref:nng_dialer_get.3.adoc[nng_dialer_get(3)], +xref:nng_dialer_set.3.adoc[nng_dialer_set(3)], +xref:nng_listener_get.3.adoc[nng_listener_get(3)], +xref:nng_listener_set.3.adoc[nng_listener_set(3)], +xref:nng_pipe_get.3.adoc[nng_pipe_get(3)], xref:nng_options.5.adoc[nng_options(5)] -xref:nng.7.adoc[nng(7)] +xref:nng.7.adoc[nng(7)], +xref:nng_ipc.7.adoc[nng_ipc(7)] diff --git a/docs/man/nng_options.5.adoc b/docs/man/nng_options.5.adoc index e4e5124d..dd971f9f 100644 --- a/docs/man/nng_options.5.adoc +++ b/docs/man/nng_options.5.adoc @@ -1,6 +1,6 @@ = nng_options(5) // -// Copyright 2020 Staysail 
Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -39,6 +39,10 @@ nng_options - socket, dialer, listener, and pipe options #define NNG_OPT_RECVMAXSZ "recv-size-max" #define NNG_OPT_RECONNMINT "reconnect-time-min" #define NNG_OPT_RECONNMAXT "reconnect-time-max" +#define NNG_OPT_PEER_GID "ipc:peer-gid" +#define NNG_OPT_PEER_PID "ipc:peer-pid" +#define NNG_OPT_PEER_UID "ipc:peer-uid" +#define NNG_OPT_PEER_ZONEID "ipc:peer-zoneid" ---- == DESCRIPTION @@ -328,27 +332,59 @@ This read-only option is used to obtain the 16-bit number for the socket's proto This read-only option is used to obtain the 16-bit number of the peer protocol for the socket. -[[NNG_OPT_PROTONAME]] -((`NNG_OPT_PROTONAME`)):: -(string) -This read-only option is used to obtain the name of the socket's protocol. +[[NNG_OPT_PEER_GID]] +((`NNG_OPT_PEER_GID`)):: +(`uint64_t`) +This read-only option provides a connected peer's primary group id, when known. +This is the effective group id of the peer when either the underlying +`listen()` or `connect()` calls were made, and is not forgeable. +This option is generally only available on POSIX systems, only on certain transports. + +[[NNG_OPT_PEER_PID]] +((`NNG_OPT_PEER_PID`)):: +(`uint64_t`) +This read-only option provides the process id of the connected peer, when known. +This option is only available on certain platforms and transports. ++ +NOTE: Applications should not assume that the process ID does not change, +as it may be possible for a process to pass a file descriptor between processes. +However, it is not possible for a nefarious application to forge the identity +of a well-behaved one using this method. + +[[NNG_OPT_PEER_UID]] +((`NNG_OPT_PEER_UID`)):: +(`uint64_t`) +This read-only option provides a connected peer's user id. 
+This is the effective user id of the peer when either the underlying +`listen()` or `connect()` calls were made, and cannot be forged. +This option is generally only available on POSIX systems, on certain transports. + +[[NNG_OPT_PEER_ZONEID]] +((`NNG_OPT_PEER_ZONEID`)):: +(`uint64_t`) +This read-only option provides a connected peer's the zone id. +Zones (and this option) are only supported on Solaris and illumos systems, on select transports. [[NNG_OPT_PEERNAME]] ((`NNG_OPT_PEERNAME`)):: (string) -This read-only option is used to obtain the name of the peer protocol for -the socket. +This read-only option is used to obtain the name of the peer protocol for the socket. + +[[NNG_OPT_PROTONAME]] +((`NNG_OPT_PROTONAME`)):: +(string) +This read-only option is used to obtain the name of the socket's protocol. == SEE ALSO [.text-left] -xref:nng_dialer_getopt.3.adoc[nng_dialer_getopt(3)], -xref:nng_dialer_setopt.3.adoc[nng_dialer_setopt(3)], -xref:nng_getopt.3.adoc[nng_getopt(3)], -xref:nng_listener_getopt.3.adoc[nng_listener_getopt(3)], -xref:nng_listener_setopt.3.adoc[nng_listener_setopt(3)], -xref:nng_pipe_getopt.3.adoc[nng_pipe_getopt(3)], -xref:nng_setopt.3.adoc[nng_setopt(3)], +xref:nng_dialer_get.3.adoc[nng_dialer_get(3)], +xref:nng_dialer_set.3.adoc[nng_dialer_set(3)], +xref:nng_listener_get.3.adoc[nng_listener_get(3)], +xref:nng_listener_set.3.adoc[nng_listener_set(3)], +xref:nng_pipe_get.3.adoc[nng_pipe_get(3)], +xref:nng_socket_get.3.adoc[nng_socket_get(3)], +xref:nng_socket_set.3.adoc[nng_socket_set(3)], xref:nng_ipc_options.5.adoc[nng_ipc_options(5)], xref:nng_tcp_options.5.adoc[nng_tcp_options(5)], xref:nng_tls_options.5.adoc[nng_tls_options(5)], diff --git a/docs/man/nng_socket.7.adoc b/docs/man/nng_socket.7.adoc new file mode 100644 index 00000000..f575eed7 --- /dev/null +++ b/docs/man/nng_socket.7.adoc @@ -0,0 +1,74 @@ += nng_socket(7) +// +// Copyright 2023 Staysail Systems, Inc. 
<info@staysail.tech> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_socket - BSD Socket transport (experimental) + +== DESCRIPTION + +(((BSD Socket)))(((transport, _socket_))) +The ((_socket_ transport)) provides communication support between +peers across arbitrary BSD sockets, such as those that are +created with xref:nng_socket_pair.3supp.adoc[`nng_socket_pair()`]. + +This transport only supports xref:nng_listener.5.adoc[listeners], using xref:nng_listener_create.3.adoc[`nng_listener_create()`]. + +NOTE: Attempts to create a xref:nng_dialer.5.adoc[dialer] using this transport will result in `NNG_ENOTSUP`. + +The socket file descriptor is passed to the listener using the `NNG_OPT_SOCKET_FD` option (as an integer). +Setting this option (which is write-only and can be set multiple times) will cause the listener +to create a xref:nng_pipe.5.adoc[pipe] backed by the file descriptor. + +The protocol between peers using this pipe is at present compatible with the protocol used for the +xref:nng_tcp.7.adoc[TCP] transport, but this is an implementation detail and subject to change without notice. + +NOTE: This transport is *experimental*, and at present is only supported on POSIX platforms. + +=== Registration + +No special action is necessary to register this transport. + +=== URI Format + +(((URI, `socket://`))) +This transport uses the URL `socket://`, without further qualification. + +=== Socket Address + +The socket address will be of family `NNG_AF_UNSPEC`. +There are no further socket details available. + +=== Transport Options + +The following transport option is available: + +((`NNG_OPT_SOCKET_FD`)):: + +(int) This is a write-only option, that may be set multiple times on a listener. 
+The listener will create a pipe backed by the given file descriptor passed as an argument. + +Additionally, the following options may be supported on pipes when the platform supports them: + +* xref:nng_options.5.adoc#NNG_OPT_PEER_GID[`NNG_OPT_PEER_GID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_PID[`NNG_OPT_PEER_PID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_UID[`NNG_OPT_PEER_UID`] +* xref:nng_options.5.adoc#NNG_OPT_PEER_ZONEID[`NNG_OPT_PEER_ZONEID`] + +== SEE ALSO + +[.text-left] +xref:nng_socket_pair.3supp.adoc[nng_socket_pair(3)], +xref:nng_dialer.5.adoc[nng_dialer(5)], +xref:nng_listener.5.adoc[nng_listener(5)], +xref:nng_options.5.adoc[nng_options(5)], +xref:nng_pipe.5.adoc[nng_pipe(5)], +xref:nng_sockaddr.5.adoc[nng_sockaddr(5)], +xref:nng.7.adoc[nng(7)] diff --git a/docs/man/nng_socket_pair.3supp.adoc b/docs/man/nng_socket_pair.3supp.adoc new file mode 100644 index 00000000..063c9f0b --- /dev/null +++ b/docs/man/nng_socket_pair.3supp.adoc @@ -0,0 +1,53 @@ += nng_socket_pair(3) +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_socket_pair - create a connected pair of BSD sockets + +== SYNOPSIS + +[source, c] +---- +#include <nng/nng.h> +#include <nng/supplemental/util/platform.h> + +int nng_socket_pair(int fds[2]); +---- + +== DESCRIPTION + +The `nng_socket_pair()` function creates a pair of connected BSD sockets. +These sockets, which are returned in the _fds_ array, are suitable for +use with the xref:nng_socket.7.adoc[_socket_] transport. + +On POSIX platforms, this is a thin wrapper around the standard `socketpair()` function, +using the `AF_UNIX` family and the `SOCK_STREAM` socket type. 
+ +NOTE: At present only POSIX platforms implementing `socketpair()` are supported with this function. + +TIP: This function may be useful for creating a shared connection between a parent process and +a child process on UNIX platforms, without requiring the processes use a shared filesystem or TCP connection. + +== RETURN VALUES + +This function returns 0 on success, and non-zero otherwise. + +== ERRORS + +[horizontal] +`NNG_ENOMEM`:: Insufficient memory exists. +`NNG_ENOTSUP`:: This platform does not support socket pairs. + +== SEE ALSO + +[.text-left] +xref:nng_socket.7.adoc[nng_socket(7)], +xref:nng.7.adoc[nng(7)] diff --git a/include/nng/nng.h b/include/nng/nng.h index ce58fb3a..06733d77 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -709,8 +709,8 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // TLS options are only used when the underlying transport supports TLS. -// NNG_OPT_TLS_CONFIG is a pointer to an nng_tls_config object. Generally -// this can used with endpoints, although once an endpoint is started, or +// NNG_OPT_TLS_CONFIG is a pointer to a nng_tls_config object. Generally +// this can be used with endpoints, although once an endpoint is started, or // once a configuration is used, the value becomes read-only. Note that // when configuring the object, a hold is placed on the TLS configuration, // using a reference count. When retrieving the object, no such hold is @@ -730,7 +730,7 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // NNG_OPT_TLS_CERT_KEY_FILE names a single file that contains a certificate // and key identifying the endpoint. This is a write-only value. This can be -// set multiple times for times for different keys/certs corresponding to +// set multiple times for different keys/certs corresponding to // different algorithms on listeners, whereas dialers only support one. The // file must contain both cert and key as PEM blocks, and the key must // not be encrypted. 
(If more flexibility is needed, use the TLS configuration @@ -750,13 +750,13 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); #define NNG_OPT_TLS_SERVER_NAME "tls-server-name" // NNG_OPT_TLS_VERIFIED returns a boolean indicating whether the peer has -// been verified (true) or not (false). Typically this is read-only, and +// been verified (true) or not (false). Typically, this is read-only, and // only available for pipes. This option may return incorrect results if // peer authentication is disabled with `NNG_TLS_AUTH_MODE_NONE`. #define NNG_OPT_TLS_VERIFIED "tls-verified" // NNG_OPT_TLS_PEER_CN returns the string with the common name -// of the peer certificate. Typically this is read-only and +// of the peer certificate. Typically, this is read-only and // only available for pipes. This option may return incorrect results if // peer authentication is disabled with `NNG_TLS_AUTH_MODE_NONE`. #define NNG_OPT_TLS_PEER_CN "tls-peer-cn" @@ -811,24 +811,30 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // this for security. #define NNG_OPT_IPC_PERMISSIONS "ipc:permissions" +// IPC peer options may also be used in some cases with other socket types. + // Peer UID. This is only available on POSIX style systems. -#define NNG_OPT_IPC_PEER_UID "ipc:peer-uid" +#define NNG_OPT_PEER_UID "ipc:peer-uid" +#define NNG_OPT_IPC_PEER_UID NNG_OPT_PEER_UID // Peer GID (primary group). This is only available on POSIX style systems. -#define NNG_OPT_IPC_PEER_GID "ipc:peer-gid" +#define NNG_OPT_PEER_GID "ipc:peer-gid" +#define NNG_OPT_IPC_PEER_GID NNG_OPT_PEER_GID // Peer process ID. Available on Windows, Linux, and SunOS. -// In theory we could obtain this with the first message sent, +// In theory, we could obtain this with the first message sent, // but we have elected not to do this for now. (Nice RFE for a FreeBSD // guru though.) 
-#define NNG_OPT_IPC_PEER_PID "ipc:peer-pid" +#define NNG_OPT_PEER_PID "ipc:peer-pid" +#define NNG_OPT_IPC_PEER_PID NNG_OPT_PEER_PID // Peer Zone ID. Only on SunOS systems. (Linux containers have no // definable kernel identity; they are a user-land fabrication made up // from various pieces of different namespaces. FreeBSD does have // something called JailIDs, but it isn't obvious how to determine this, // or even if processes can use IPC across jail boundaries.) -#define NNG_OPT_IPC_PEER_ZONEID "ipc:peer-zoneid" +#define NNG_OPT_PEER_ZONEID "ipc:peer-zoneid" +#define NNG_OPT_IPC_PEER_ZONEID NNG_OPT_PEER_ZONEID // WebSocket Options. @@ -893,6 +899,16 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // peers that cannot be coerced into sending binary frames. #define NNG_OPT_WS_RECV_TEXT "ws:recv-text" +// NNG_OPT_SOCKET_FD is a write-only integer property that is used to +// pass file descriptors (or FILE HANDLE objects on Windows) to a +// socket:// based listener. This file descriptor will be taken +// over and used as a stream connection. The protocol is compatible +// with SP over TCP. This facility is experimental, and intended to +// allow use with descriptors created via socketpair() or similar. +// Note that unidirectional pipes (such as those from pipe(2) or mkfifo) +// are not supported. +#define NNG_OPT_SOCKET_FD "socket:fd" + // XXX: TBD: priorities, ipv4only // Statistics. These are for informational purposes only, and subject diff --git a/include/nng/supplemental/util/platform.h b/include/nng/supplemental/util/platform.h index feca858a..dc3c47dd 100644 --- a/include/nng/supplemental/util/platform.h +++ b/include/nng/supplemental/util/platform.h @@ -104,6 +104,13 @@ NNG_DECL void nng_cv_wake1(nng_cv *); // nng_random returns a "strong" (cryptographic sense) random number. NNG_DECL uint32_t nng_random(void); +// nng_socket_pair is used to create a bound pair of file descriptors +// typically using the socketpair() call. 
The descriptors are backed +// by reliable, bidirectional, byte streams. This will return NNG_ENOTSUP +// if the platform lacks support for this. The argument is a pointer +// to an array of file descriptors (or HANDLES or similar). +NNG_DECL int nng_socket_pair(int [2]); + #ifdef __cplusplus } #endif diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index f003d756..9e5a6bec 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -24,6 +24,8 @@ nng_sources( device.h dialer.c dialer.h + sockfd.c + sockfd.h file.c file.h idhash.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 55a46efb..91d18dc8 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -411,7 +411,7 @@ int nni_dialer_start(nni_dialer *d, unsigned flags) { int rv = 0; - nni_aio *aio; + nni_aio *aio = NULL; if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); diff --git a/src/core/platform.h b/src/core/platform.h index 89759921..0b5ec634 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -386,9 +386,9 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *); typedef struct nni_plat_udp nni_plat_udp; // nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified specified in the AIO. The remote address is +// address specified in the AIO. The remote address is // not used. The resulting nni_plat_udp structure is returned in the -// the aio's a_pipe. +// aio's a_pipe. extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); // nni_plat_udp_close closes the underlying UDP socket. @@ -434,6 +434,19 @@ extern void nni_plat_pipe_close(int, int); extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *); +// nni_socket_pair is used to create a socket pair using socketpair() +// on POSIX systems. (Windows might provide a similar solution, using +// AF_UNIX at some point, in which case the arguments will actually be +// an array of HANDLEs.) If not supported, this returns NNG_ENOTSUP. 
+// +// This API can only create a pair of open file descriptors, suitable for use +// with the socket transport, each bound to the other. The transport must be +// a bidirectional reliable byte stream. This should be suitable for use +// in APIs to transport file descriptors, or across a fork/exec boundary (so +// that child processes may use these with socket to inherit a socket that is +// connected to the parent.) +extern int nni_socket_pair(int *); + // // File/Store Support // diff --git a/src/core/sockfd.c b/src/core/sockfd.c new file mode 100644 index 00000000..1b4dbc1d --- /dev/null +++ b/src/core/sockfd.c @@ -0,0 +1,231 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdint.h> +#include <string.h> + +#include <nng/nng.h> + +#include "core/nng_impl.h" +#include "core/sockfd.h" + +// We will accept up to this many FDs to be queued up for +// accept, before we start rejecting with NNG_ENOSPC. Once +// accept is performed, then another slot is available. +#define NNG_SFD_LISTEN_QUEUE 16 + +int +nni_sfd_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + // No dialer support for this. 
+ return (NNG_ENOTSUP); +} + +typedef struct { + nng_stream_listener ops; + int listen_cnt; // how many FDs are waiting + int listen_q[NNG_SFD_LISTEN_QUEUE]; + bool closed; + nni_list accept_q; + nni_mtx mtx; +} sfd_listener; + +static void +sfd_listener_free(void *arg) +{ + sfd_listener *l = arg; + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static void +sfd_listener_close(void *arg) +{ + nni_aio *aio; + sfd_listener *l = arg; + nni_mtx_lock(&l->mtx); + l->closed = true; + while ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + for (int i = 0; i < l->listen_cnt; i++) { + nni_sfd_close_fd(l->listen_q[i]); + } + nni_mtx_unlock(&l->mtx); +} + +static int +sfd_listener_listen(void *arg) +{ + NNI_ARG_UNUSED(arg); + // nothing really for us to do + return (0); +} + +static void +sfd_start_conn(sfd_listener *l, nni_aio *aio) +{ + int fd; + int rv; + nni_sfd_conn *c; + NNI_ASSERT(l->listen_cnt > 0); + fd = l->listen_q[0]; + for (int i = 1; i < l->listen_cnt; i++) { + l->listen_q[i] = l->listen_q[i + 1]; + } + l->listen_cnt--; + if ((rv = nni_sfd_conn_alloc(&c, fd)) != 0) { + nni_aio_finish_error(aio, rv); + nni_sfd_close_fd(fd); + } else { + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +sfd_cancel_accept(nni_aio *aio, void *arg, int rv) +{ + sfd_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static void +sfd_listener_accept(void *arg, nng_aio *aio) +{ + sfd_listener *l = arg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + if (l->listen_cnt) { + sfd_start_conn(l, aio); + } else { + nni_aio_schedule(aio, sfd_cancel_accept, l); + nni_aio_list_append(&l->accept_q, aio); + } + nni_mtx_unlock(&l->mtx); 
+} + +static int +sfd_listener_set_fd(void *arg, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + nni_aio *aio; + int fd; + int rv; + + if ((rv = nni_copyin_int(&fd, buf, sz, NNI_MININT, NNI_MAXINT, t)) != + 0) { + return (rv); + } + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if (l->listen_cnt == NNG_SFD_LISTEN_QUEUE) { + nni_mtx_unlock(&l->mtx); + return (NNG_ENOSPC); + } + + l->listen_q[l->listen_cnt++] = fd; + + // if someone was waiting in accept, give it to them now + if ((aio = nni_list_first(&l->accept_q)) != NULL) { + nni_aio_list_remove(aio); + sfd_start_conn(l, aio); + } + + nni_mtx_unlock(&l->mtx); + return (0); +} + +static int +sfd_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static const nni_option sfd_listener_options[] = { + { + .o_name = NNG_OPT_SOCKET_FD, + .o_set = sfd_listener_set_fd, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_listener_get_addr, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_listener_get( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_listener *l = arg; + return (nni_getopt(sfd_listener_options, name, l, buf, szp, t)); +} + +static int +sfd_listener_set( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_listener *l = arg; + return (nni_setopt(sfd_listener_options, name, l, buf, sz, t)); +} + +int +nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + sfd_listener *l; + + NNI_ARG_UNUSED(url); + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + memset(l->listen_q, 0, sizeof(l->listen_q)); + l->listen_cnt = 0; + nni_aio_list_init(&l->accept_q); + nni_mtx_init(&l->mtx); + + l->ops.sl_free = sfd_listener_free; + l->ops.sl_close = sfd_listener_close; + l->ops.sl_listen = 
sfd_listener_listen; + l->ops.sl_accept = sfd_listener_accept; + l->ops.sl_get = sfd_listener_get; + l->ops.sl_set = sfd_listener_set; + + *lp = (void *) l; + return (0); +} diff --git a/src/core/sockfd.h b/src/core/sockfd.h new file mode 100644 index 00000000..ca37f0e1 --- /dev/null +++ b/src/core/sockfd.h @@ -0,0 +1,28 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_FDC_H +#define CORE_FDC_H + +#include "core/nng_impl.h" + +// the nni_sfd_conn struct is provided by platform code to wrap +// an arbitrary byte stream file descriptor (UNIX) or handle (Windows) +// with a nng_stream. +typedef struct nni_sfd_conn nni_sfd_conn; +extern int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd); +extern int nni_sfd_dialer_alloc(nng_stream_dialer **, const nng_url *); +extern int nni_sfd_listener_alloc(nng_stream_listener **, const nng_url *); + +// this is used to close a file descriptor, in case we cannot +// create a connection (or if the listener is closed before the +// connection is accepted.) +extern void nni_sfd_close_fd(int fd); + +#endif // CORE_FDC_H diff --git a/src/core/stream.c b/src/core/stream.c index 418bfb15..99002fcd 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. 
<info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -15,6 +15,7 @@ #include "core/nng_impl.h" #include <nng/supplemental/tls/tls.h> +#include "core/sockfd.h" #include "core/tcp.h" #include "supplemental/tls/tls_api.h" #include "supplemental/websocket/websocket.h" @@ -94,6 +95,13 @@ static struct { .dialer_alloc = nni_ws_dialer_alloc, .listener_alloc = nni_ws_listener_alloc, }, +#ifdef NNG_TRANSPORT_FDC + { + .scheme = "socket", + .dialer_alloc = nni_sfd_dialer_alloc, + .listener_alloc = nni_sfd_listener_alloc, + }, +#endif { .scheme = NULL, }, diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index dcfea221..b8e3782e 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. <info@staystail.tech> +# Copyright 2023 Staysail Systems, Inc. <info@staystail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -61,6 +61,7 @@ if (NNG_PLATFORM_POSIX) nng_check_sym(LOCAL_PEERPID sys/un.h NNG_HAVE_LOCALPEERPID) nng_check_sym(getpeerucred ucred.h NNG_HAVE_GETPEERUCRED) nng_check_sym(atomic_flag_test_and_set stdatomic.h NNG_HAVE_STDATOMIC) + nng_check_sym(socketpair sys/socket.h NNG_HAVE_SOCKETPAIR) nng_sources( posix_impl.h @@ -78,9 +79,12 @@ if (NNG_PLATFORM_POSIX) posix_ipcconn.c posix_ipcdial.c posix_ipclisten.c + posix_peerid.c posix_pipe.c posix_resolv_gai.c posix_sockaddr.c + posix_socketpair.c + posix_sockfd.c posix_tcpconn.c posix_tcpdial.c posix_tcplisten.c diff --git a/src/platform/posix/posix_peerid.c b/src/platform/posix/posix_peerid.c new file mode 100644 index 00000000..e0020150 --- /dev/null +++ b/src/platform/posix/posix_peerid.c @@ -0,0 +1,122 @@ +// +// Copyright 2023 Staysail Systems, Inc. 
<info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <stdbool.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <sys/un.h> + +#if defined(NNG_HAVE_GETPEERUCRED) +#include <ucred.h> +#elif defined(NNG_HAVE_LOCALPEERCRED) || defined(NNG_HAVE_SOCKPEERCRED) +#include <sys/ucred.h> +#endif +#if defined(NNG_HAVE_GETPEEREID) +#include <sys/types.h> +#include <unistd.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#ifndef SOL_LOCAL +#define SOL_LOCAL 0 +#endif + + +int +nni_posix_peerid(int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid, + uint64_t *znid) +{ +#if defined(NNG_HAVE_GETPEEREID) && !defined(NNG_HAVE_LOCALPEERCRED) + uid_t uid; + gid_t gid; + + if (getpeereid(fd, &uid, &gid) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uid; + *egid = gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_GETPEERUCRED) + ucred_t *ucp = NULL; + if (getpeerucred(fd, &ucp) != 0) { + return (nni_plat_errno(errno)); + } + *euid = ucred_geteuid(ucp); + *egid = ucred_getegid(ucp); + *prid = ucred_getpid(ucp); + *znid = ucred_getzoneid(ucp); + ucred_free(ucp); + return (0); +#elif defined(NNG_HAVE_SOCKPEERCRED) + struct sockpeercred uc; + socklen_t len = sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_SOPEERCRED) + struct ucred uc; + socklen_t len = 
sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_LOCALPEERCRED) + struct xucred xu; + socklen_t len = sizeof(xu); + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = xu.cr_uid; + *egid = xu.cr_gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; +#if defined(NNG_HAVE_LOCALPEERPID) // documented on macOS since 10.8 + { + pid_t pid; + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) == + 0) { + *prid = (uint64_t) pid; + } + } +#endif // NNG_HAVE_LOCALPEERPID + return (0); +#else + if (fd < 0) { + return (NNG_ECLOSED); + } + NNI_ARG_UNUSED(euid); + NNI_ARG_UNUSED(egid); + NNI_ARG_UNUSED(prid); + NNI_ARG_UNUSED(znid); + return (NNG_ENOTSUP); +#endif +} + diff --git a/src/platform/posix/posix_peerid.h b/src/platform/posix/posix_peerid.h new file mode 100644 index 00000000..57e9abff --- /dev/null +++ b/src/platform/posix/posix_peerid.h @@ -0,0 +1,25 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_POSIX_PEERID_H +#define PLATFORM_POSIX_PEERID_H + +// This file declares nni_posix_peerid(), which reports the credentials of +// the peer connected to a socket file descriptor: effective user ID, +// effective group ID, process ID, and zone ID. Values the platform cannot +// supply are reported as (uint64_t) -1, and NNG_ENOTSUP is returned when +// no supported credential mechanism is available. 
+ +#include "core/nng_impl.h" +#include <sys/types.h> + +int nni_posix_peerid( + int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid); + +#endif // PLATFORM_POSIX_PEERID_H
\ No newline at end of file diff --git a/src/platform/posix/posix_socketpair.c b/src/platform/posix/posix_socketpair.c new file mode 100644 index 00000000..3a01ad2b --- /dev/null +++ b/src/platform/posix/posix_socketpair.c @@ -0,0 +1,43 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_HAVE_SOCKETPAIR +// This provides nni_socket_pair(), a thin wrapper around socketpair(), which +// is supposed to be present on XPG6 and newer. This trivial implementation +// only supports SOCK_STREAM over AF_UNIX, which is sufficient for +// most purposes. The fds array should point to an int[2]. +#include <errno.h> +#include <sys/socket.h> + +int +nni_socket_pair(int fds[2]) +{ + int rv; + rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds); + if (rv != 0) { + return (nni_plat_errno(errno)); + } + +#ifdef SO_NOSIGPIPE + int set = 1; + setsockopt(fds[0], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); + setsockopt(fds[1], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); +#endif + + return (0); +} +#else +int +nni_socket_pair(int *fds) +{ + return (NNG_ENOTSUP); +} +#endif
\ No newline at end of file diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c new file mode 100644 index 00000000..b0d88a31 --- /dev/null +++ b/src/platform/posix/posix_sockfd.c @@ -0,0 +1,493 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <errno.h> +#include <poll.h> +#include <stdbool.h> +#include <sys/uio.h> +#include <unistd.h> + +#include "core/sockfd.h" +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_peerid.h" + +struct nni_sfd_conn { + nng_stream stream; + nni_posix_pfd *pfd; + int fd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_reap_node reap; +}; + +static void +sfd_dowrite(nni_sfd_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; + struct iovec iovec[16]; + + nni_aio_get_iov(aio, &naiov, &aiov); + + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = writev(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + 
nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + // If we didn't send all the data, the caller will + // resubmit. As a corollary, callers should probably + // only have one message on the write queue at a time. + nni_aio_bump_count(aio, n); + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +sfd_doread(nni_sfd_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; + struct iovec iovec[16]; + + nni_aio_get_iov(aio, &naiov, &aiov); + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // Zero indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECONNSHUT); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. 
+ } +} + +static void +sfd_error(void *arg, int err) +{ + nni_sfd_conn *c = arg; + nni_aio *aio; + + nni_mtx_lock(&c->mtx); + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + if (c->pfd != NULL) { + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_close(void *arg) +{ + nni_sfd_conn *c = arg; + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if (c->pfd != NULL) { + nni_posix_pfd_close(c->pfd); + } + } + nni_mtx_unlock(&c->mtx); +} + +// sfd_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +sfd_fini(void *arg) +{ + nni_sfd_conn *c = arg; + sfd_close(c); + if (c->pfd != NULL) { + nni_posix_pfd_fini(c->pfd); + } + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +static nni_reap_list sfd_reap_list = { + .rl_offset = offsetof(nni_sfd_conn, reap), + .rl_func = sfd_fini, +}; +static void +sfd_free(void *arg) +{ + struct nni_sfd_conn *c = arg; + nni_reap(&sfd_reap_list, c); +} + +static void +sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +{ + struct nni_sfd_conn *c = arg; + + if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) { + sfd_error(c, NNG_ECONNSHUT); + return; + } + nni_mtx_lock(&c->mtx); + if ((events & NNI_POLL_IN) != 0) { + sfd_doread(c); + } + if ((events & NNI_POLL_OUT) != 0) { + sfd_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= NNI_POLL_OUT; + } + if (!nni_list_empty(&c->readq)) { + events |= NNI_POLL_IN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_cancel(nni_aio *aio, void *arg, int 
rv) +{ + nni_sfd_conn *c = arg; + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_send(void *arg, nni_aio *aio) +{ + nni_sfd_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + sfd_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +static void +sfd_recv(void *arg, nni_aio *aio) +{ + nni_sfd_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + sfd_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. 
+ if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +static int +sfd_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + nng_sockaddr sa; + sa.s_family = NNG_AF_UNSPEC; + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static int +sfd_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &id, &ignore, &ignore, &ignore); + if (rv != 0) { + return (rv); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &id, &ignore, &ignore); + if (rv != 0) { + return (rv); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &ignore, &ignore, &id); + if (rv != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal zone id (illumos/Solaris) + return (NNG_ENOTSUP); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static int +sfd_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id = 0; + + rv = nni_posix_peerid(c->fd, &ignore, &ignore, &id, &ignore); + if (rv != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal process id + return (NNG_ENOTSUP); + } + return (nni_copyout_u64(id, buf, szp, t)); +} + +static const nni_option sfd_options[] = { + { + .o_name = NNG_OPT_LOCADDR, + .o_get = sfd_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = sfd_get_addr, + }, + { + .o_name = NNG_OPT_PEER_PID, + .o_get = sfd_get_peer_pid, + }, + { + 
.o_name = NNG_OPT_PEER_UID, + .o_get = sfd_get_peer_uid, + }, + { + .o_name = NNG_OPT_PEER_GID, + .o_get = sfd_get_peer_gid, + }, + { + .o_name = NNG_OPT_PEER_ZONEID, + .o_get = sfd_get_peer_zoneid, + }, + { + .o_name = NULL, + }, +}; + +static int +sfd_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_sfd_conn *c = arg; + return (nni_getopt(sfd_options, name, c, buf, szp, t)); +} + +static int +sfd_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_sfd_conn *c = arg; + return (nni_setopt(sfd_options, name, c, buf, sz, t)); +} + +int +nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) +{ + nni_sfd_conn *c; + int rv; + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) { + NNI_FREE_STRUCT(c); + return (rv); + } + + c->closed = false; + c->fd = fd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + c->stream.s_free = sfd_free; + c->stream.s_close = sfd_close; + c->stream.s_recv = sfd_recv; + c->stream.s_send = sfd_send; + c->stream.s_get = sfd_get; + c->stream.s_set = sfd_set; + + nni_posix_pfd_set_cb(c->pfd, sfd_cb, c); + + *cp = c; + return (0); +} + +void +nni_sfd_close_fd(int fd) +{ + close(fd); +}
\ No newline at end of file diff --git a/src/platform/windows/CMakeLists.txt b/src/platform/windows/CMakeLists.txt index d1d158e0..adf67ebd 100644 --- a/src/platform/windows/CMakeLists.txt +++ b/src/platform/windows/CMakeLists.txt @@ -39,6 +39,7 @@ if (NNG_PLATFORM_WINDOWS) win_rand.c win_resolv.c win_sockaddr.c + win_socketpair.c win_tcp.c win_tcpconn.c win_tcpdial.c diff --git a/src/platform/windows/win_socketpair.c b/src/platform/windows/win_socketpair.c new file mode 100644 index 00000000..0ed0443a --- /dev/null +++ b/src/platform/windows/win_socketpair.c @@ -0,0 +1,57 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + + +#ifdef NNG_HAVE_SOCKETPAIR_TODO +// TODO: Windows lacks socketpair. We can emulate it with an explicit +// implementation based on AF_UNIX. + +#include <errno.h> +#include <sys/socket.h> + +int +nni_socket_pair(int *fds) +{ + int rv; + rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds); + if (rv != 0) { + return (nni_plat_errno(errno)); + } + + return (0); +} +#else +int +nni_socket_pair(int fds[2]) +{ + NNI_ARG_UNUSED(fds); + return (NNG_ENOTSUP); +} + +// This is also the fdc transport. + +typedef struct nni_sfd_conn nni_sfd_conn; + +void +nni_sfd_close_fd(int fd) +{ + NNI_ARG_UNUSED(fd); +} + +int +nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) +{ + NNI_ARG_UNUSED(cp); + NNI_ARG_UNUSED(fd); + return (NNG_ENOTSUP); +} + +#endif diff --git a/src/sp/transport.c b/src/sp/transport.c index 9f4c6522..d0a75b22 100644 --- a/src/sp/transport.c +++ b/src/sp/transport.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. 
<info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -70,6 +70,9 @@ extern void nni_sp_wss_register(void); #ifdef NNG_TRANSPORT_ZEROTIER extern void nni_sp_zt_register(void); #endif +#ifdef NNG_TRANSPORT_FDC +extern void nni_sp_sfd_register(void); +#endif void nni_sp_tran_sys_init(void) @@ -95,6 +98,9 @@ nni_sp_tran_sys_init(void) #ifdef NNG_TRANSPORT_ZEROTIER nni_sp_zt_register(); #endif +#ifdef NNG_TRANSPORT_FDC + nni_sp_sfd_register(); +#endif } // nni_sp_tran_sys_fini finalizes the entire transport system, including all diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt index add8a9c9..0de80015 100644 --- a/src/sp/transport/CMakeLists.txt +++ b/src/sp/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. <info@staystail.tech> +# Copyright 2023 Staysail Systems, Inc. <info@staystail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -10,6 +10,7 @@ # Transports. nng_directory(transport) +add_subdirectory(socket) add_subdirectory(inproc) add_subdirectory(ipc) add_subdirectory(tcp) diff --git a/src/sp/transport/socket/CMakeLists.txt b/src/sp/transport/socket/CMakeLists.txt new file mode 100644 index 00000000..d79b261e --- /dev/null +++ b/src/sp/transport/socket/CMakeLists.txt @@ -0,0 +1,15 @@ +# +# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# File Descriptor (or Handle) based connections +nng_directory(socket) + +nng_sources_if(NNG_TRANSPORT_FDC sockfd.c) +nng_defines_if(NNG_TRANSPORT_FDC NNG_TRANSPORT_FDC) +nng_test(sockfd_test)
\ No newline at end of file diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c new file mode 100644 index 00000000..2db052ac --- /dev/null +++ b/src/sp/transport/socket/sockfd.c @@ -0,0 +1,1011 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Socket transport. This takes sockets that may have been +// created via another mechanism (usually socketpair) and +// builds a connection using them. The socket is passed in to the listener +// using the NNG_OPT_SOCKET_FD option. + +typedef struct sfd_tran_pipe sfd_tran_pipe; +typedef struct sfd_tran_ep sfd_tran_ep; + +// sfd_tran_pipe wraps an open file descriptor +struct sfd_tran_pipe { + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + nni_atomic_flag reaped; + nni_reap_node reap; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; +}; + +struct sfd_tran_ep { + nni_mtx mtx; + uint16_t proto; + size_t rcvmax; + bool fini; + bool started; + bool closed; + nng_sockaddr src; + int refcnt; // active pipes + nni_aio *useraio; + nni_aio *connaio; + nni_aio *timeaio; + nni_list busypipes; // busy pipes -- ones passed to socket + nni_list waitpipes; // pipes waiting to match to socket + nni_list negopipes; // 
pipes busy negotiating + nni_reap_node reap; + nng_stream_listener *listener; + +#ifdef NNG_ENABLE_STATS + nni_stat_item st_rcv_max; +#endif +}; + +static void sfd_tran_pipe_send_start(sfd_tran_pipe *); +static void sfd_tran_pipe_recv_start(sfd_tran_pipe *); +static void sfd_tran_pipe_send_cb(void *); +static void sfd_tran_pipe_recv_cb(void *); +static void sfd_tran_pipe_nego_cb(void *); +static void sfd_tran_ep_fini(void *); +static void sfd_tran_pipe_fini(void *); + +static nni_reap_list sfd_tran_ep_reap_list = { + .rl_offset = offsetof(sfd_tran_ep, reap), + .rl_func = sfd_tran_ep_fini, +}; + +static nni_reap_list sfd_tran_pipe_reap_list = { + .rl_offset = offsetof(sfd_tran_pipe, reap), + .rl_func = sfd_tran_pipe_fini, +}; + +static void +sfd_tran_init(void) +{ +} + +static void +sfd_tran_fini(void) +{ +} + +static void +sfd_tran_pipe_close(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + p->closed = true; + nni_mtx_unlock(&p->mtx); + + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); + + nng_stream_close(p->conn); +} + +static void +sfd_tran_pipe_stop(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); +} + +static int +sfd_tran_pipe_init(void *arg, nni_pipe *npipe) +{ + sfd_tran_pipe *p = arg; + p->npipe = npipe; + + return (0); +} + +static void +sfd_tran_pipe_fini(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep; + + sfd_tran_pipe_stop(p); + if ((ep = p->ep) != NULL) { + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + ep->refcnt--; + if (ep->fini && (ep->refcnt == 0)) { + nni_reap(&sfd_tran_ep_reap_list, ep); + } + nni_mtx_unlock(&ep->mtx); + } + + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); + nng_stream_free(p->conn); + nni_msg_free(p->rxmsg); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static void +sfd_tran_pipe_reap(sfd_tran_pipe *p) +{ + if 
(!nni_atomic_flag_test_and_set(&p->reaped)) { + if (p->conn != NULL) { + nng_stream_close(p->conn); + } + nni_reap(&sfd_tran_pipe_reap_list, p); + } +} + +static int +sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) +{ + sfd_tran_pipe *p; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); + nni_atomic_flag_reset(&p->reaped); + + *pipep = p; + + return (0); +} + +static void +sfd_tran_ep_match(sfd_tran_ep *ep) +{ + nni_aio *aio; + sfd_tran_pipe *p; + + if (((aio = ep->useraio) == NULL) || + ((p = nni_list_first(&ep->waitpipes)) == NULL)) { + return; + } + nni_list_remove(&ep->waitpipes, p); + nni_list_append(&ep->busypipes, p); + ep->useraio = NULL; + p->rcvmax = ep->rcvmax; + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +sfd_tran_pipe_nego_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep = p->ep; + nni_aio *aio = &p->negoaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&ep->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + // We start transmitting before we receive. + if (p->gottxhead < p->wanttxhead) { + p->gottxhead += nni_aio_count(aio); + } else if (p->gotrxhead < p->wantrxhead) { + p->gotrxhead += nni_aio_count(aio); + } + + if (p->gottxhead < p->wanttxhead) { + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; + // send it down... 
+ nni_aio_set_iov(aio, 1, &iov); + nng_stream_send(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + if (p->gotrxhead < p->wantrxhead) { + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); + nng_stream_recv(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + // We have both sent and received the headers. Let's check the + // receiver. + if ((p->rxlen[0] != 0) || (p->rxlen[1] != 'S') || + (p->rxlen[2] != 'P') || (p->rxlen[3] != 0) || (p->rxlen[6] != 0) || + (p->rxlen[7] != 0)) { + rv = NNG_EPROTO; + goto error; + } + + NNI_GET16(&p->rxlen[4], p->peer); + + // We are ready now. We put this in the wait list, and + // then try to run the matcher. + nni_list_remove(&ep->negopipes, p); + nni_list_append(&ep->waitpipes, p); + + sfd_tran_ep_match(ep); + nni_mtx_unlock(&ep->mtx); + + return; + +error: + // If the connection is closed, we need to pass back a different + // error code. This is necessary to avoid a problem where the + // closed status is confused with the accept file descriptor + // being closed. + if (rv == NNG_ECLOSED) { + rv = NNG_ECONNSHUT; + } + nng_stream_close(p->conn); + + if ((uaio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&ep->mtx); + sfd_tran_pipe_reap(p); +} + +static void +sfd_tran_pipe_send_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + int rv; + nni_aio *aio; + size_t n; + nni_msg *msg; + nni_aio *txaio = &p->txaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->sendq); + + if ((rv = nni_aio_result(txaio)) != 0) { + nni_pipe_bump_error(p->npipe, rv); + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. 
+ nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { + nng_stream_send(p->conn, txaio); + nni_mtx_unlock(&p->mtx); + return; + } + + nni_aio_list_remove(aio); + sfd_tran_pipe_send_start(p); + + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_pipe_bump_tx(p->npipe, n); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_sync(aio, 0, n); +} + +static void +sfd_tran_pipe_recv_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + nni_aio *aio; + int rv; + size_t n; + nni_msg *msg; + nni_aio *rxaio = &p->rxaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->recvq); + + if ((rv = nni_aio_result(rxaio)) != 0) { + goto recv_error; + } + + if (p->closed) { + rv = NNG_ECLOSED; + goto recv_error; + } + + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + + // If we don't have a message yet, we were reading the message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (p->rxmsg == NULL) { + uint64_t len; + // We should have gotten a message header. + NNI_GET64(p->rxlen, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if ((len > p->rcvmax) && (p->rcvmax > 0)) { + rv = NNG_EMSGSIZE; + goto recv_error; + } + + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + goto recv_error; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. 
+ if (len != 0) { + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + + nni_aio_set_iov(rxaio, 1, &iov); + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + } + + // We read a message completely. Let the user know the good news. + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + n = nni_msg_len(msg); + + nni_pipe_bump_rx(p->npipe, n); + sfd_tran_pipe_recv_start(p); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_sync(aio, 0, n); + return; + +recv_error: + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + nni_pipe_bump_error(p->npipe, rv); + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. + nni_mtx_unlock(&p->mtx); + + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(&p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_start(sfd_tran_pipe *p) +{ + nni_aio *aio; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + + if (p->closed) { + while ((aio = nni_list_first(&p->sendq)) != NULL) { + nni_list_remove(&p->sendq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + + // This runs to send the message. 
+ msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + NNI_PUT64(p->txlen, len); + + txaio = &p->txaio; + niov = 0; + iov[0].iov_buf = p->txlen; + iov[0].iov_len = sizeof(p->txlen); + niov++; + if (nni_msg_header_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + nni_aio_set_iov(txaio, niov, iov); + nng_stream_send(p->conn, txaio); +} + +static void +sfd_tran_pipe_send(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + // No way to give the message back to the protocol, so + // we just discard it silently to prevent it from leaking. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + sfd_tran_pipe_send_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static void +sfd_tran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. 
+ if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(&p->rxaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_recv_start(sfd_tran_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; + NNI_ASSERT(p->rxmsg == NULL); + + if (p->closed) { + nni_aio *aio; + while ((aio = nni_list_first(&p->recvq)) != NULL) { + nni_list_remove(&p->recvq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + if (nni_list_empty(&p->recvq)) { + return; + } + + // Schedule a read of the header. + rxaio = &p->rxaio; + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); + + nng_stream_recv(p->conn, rxaio); +} + +static void +sfd_tran_pipe_recv(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + sfd_tran_pipe_recv_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static uint16_t +sfd_tran_pipe_peer(void *arg) +{ + sfd_tran_pipe *p = arg; + + return (p->peer); +} + +static int +sfd_tran_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_pipe *p = arg; + return (nni_stream_get(p->conn, name, buf, szp, t)); +} + +static void +sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) +{ + nni_iov iov; + + ep->refcnt++; + + p->conn = conn; + p->ep = ep; + p->proto = ep->proto; + + p->txlen[0] = 0; + p->txlen[1] = 'S'; + p->txlen[2] = 'P'; + p->txlen[3] = 0; + NNI_PUT16(&p->txlen[4], p->proto); + NNI_PUT16(&p->txlen[6], 0); + + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + 
nni_aio_set_iov(&p->negoaio, 1, &iov); + nni_list_append(&ep->negopipes, p); + + // No timeouts here -- the purpose of timeouts was to guard against + // untrusted callers forcing us to burn files. In this case the + // application is providing us with a file, and should be reasonably + // trusted not to be doing a DoS against itself! :-) Part of the + // rationale is that it may take a while for a child process to reach + // the point where it is ready to negotiate the other side of a + // connection. + nni_aio_set_timeout(&p->negoaio, NNG_DURATION_INFINITE); + nng_stream_send(p->conn, &p->negoaio); +} + +static void +sfd_tran_ep_fini(void *arg) +{ + sfd_tran_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + ep->fini = true; + if (ep->refcnt != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_mtx_unlock(&ep->mtx); + nni_aio_stop(ep->timeaio); + nni_aio_stop(ep->connaio); + nng_stream_listener_free(ep->listener); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); + + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +sfd_tran_ep_close(void *arg) +{ + sfd_tran_ep *ep = arg; + sfd_tran_pipe *p; + + nni_mtx_lock(&ep->mtx); + + ep->closed = true; + nni_aio_close(ep->timeaio); + if (ep->listener != NULL) { + nng_stream_listener_close(ep->listener); + } + NNI_LIST_FOREACH (&ep->negopipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->waitpipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->busypipes, p) { + sfd_tran_pipe_close(p); + } + if (ep->useraio != NULL) { + nni_aio_finish_error(ep->useraio, NNG_ECLOSED); + ep->useraio = NULL; + } + + nni_mtx_unlock(&ep->mtx); +} + +static void +sfd_tran_timer_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + if (nni_aio_result(ep->timeaio) == 0) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } +} + +static void +sfd_tran_accept_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + nni_aio *aio = ep->connaio; + sfd_tran_pipe *p; + int rv; + nng_stream *conn; + + nni_mtx_lock(&ep->mtx); 
+ + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + conn = nni_aio_get_output(aio, 0); + if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + nng_stream_free(conn); + goto error; + } + + if (ep->closed) { + sfd_tran_pipe_fini(p); + nng_stream_free(conn); + rv = NNG_ECLOSED; + goto error; + } + sfd_tran_pipe_start(p, conn, ep); + nng_stream_listener_accept(ep->listener, ep->connaio); + nni_mtx_unlock(&ep->mtx); + return; + +error: + // When an error here occurs, let's send a notice up to the consumer. + // That way it can be reported properly. + if ((aio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + switch (rv) { + + case NNG_ENOMEM: + case NNG_ENOFILES: + nng_sleep_aio(10, ep->timeaio); + break; + + default: + if (!ep->closed) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } + break; + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +{ + sfd_tran_ep *ep; + NNI_ARG_UNUSED(url); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); + + ep->proto = nni_sock_proto_id(sock); + +#ifdef NNG_ENABLE_STATS + static const nni_stat_info rcv_max_info = { + .si_name = "rcv_max", + .si_desc = "maximum receive size", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); +#endif + + *epp = ep; + return (0); +} + +static int +sfd_tran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + NNI_ARG_UNUSED(ndialer); + return (NNG_ENOTSUP); +} + +static int +sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) +{ + sfd_tran_ep *ep; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); + + // Check for 
invalid URL components -- we only accept a bare scheme + if ((strlen(url->u_host) != 0) || (strlen(url->u_path) != 0) || + (url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + + if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + + if (((rv = nni_aio_alloc(&ep->connaio, sfd_tran_accept_cb, ep)) != + 0) || + ((rv = nni_aio_alloc(&ep->timeaio, sfd_tran_timer_cb, ep)) != 0) || + ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { + sfd_tran_ep_fini(ep); + return (rv); + } +#ifdef NNG_ENABLE_STATS + nni_listener_add_stat(nlistener, &ep->st_rcv_max); +#endif + + *lp = ep; + return (0); +} + +static void +sfd_tran_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (ep->useraio == aio) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +sfd_tran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&ep->st_rcv_max, val); +#endif + } + return (rv); +} + +static int +sfd_tran_ep_bind(void *arg) +{ + sfd_tran_ep *ep = arg; + return (nng_stream_listener_listen(ep->listener)); +} + +static void +sfd_tran_ep_accept(void *arg, nni_aio *aio) +{ + sfd_tran_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (ep->useraio != NULL) 
{ + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + return; + } + if ((rv = nni_aio_schedule(aio, sfd_tran_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + ep->useraio = aio; + if (!ep->started) { + ep->started = true; + nng_stream_listener_accept(ep->listener, ep->connaio); + } else { + sfd_tran_ep_match(ep); + } + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_init = sfd_tran_pipe_init, + .p_fini = sfd_tran_pipe_fini, + .p_stop = sfd_tran_pipe_stop, + .p_send = sfd_tran_pipe_send, + .p_recv = sfd_tran_pipe_recv, + .p_close = sfd_tran_pipe_close, + .p_peer = sfd_tran_pipe_peer, + .p_getopt = sfd_tran_pipe_getopt, +}; + +static const nni_option sfd_tran_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = sfd_tran_ep_get_recvmaxsz, + .o_set = sfd_tran_ep_set_recvmaxsz, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +sfd_tran_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_get(ep->listener, name, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(sfd_tran_ep_opts, name, ep, buf, szp, t); + } + return (rv); +} + +static int +sfd_tran_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_set(ep->listener, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(sfd_tran_ep_opts, name, ep, buf, sz, t); + } + return (rv); +} + +static nni_sp_dialer_ops sfd_tran_dialer_ops = { + .d_init = sfd_tran_dialer_init, + .d_fini = NULL, + .d_connect = NULL, + .d_close = NULL, + .d_getopt = NULL, + .d_setopt = NULL, +}; + +static nni_sp_listener_ops sfd_tran_listener_ops = { + .l_init = sfd_tran_listener_init, + .l_fini = sfd_tran_ep_fini, + .l_bind = sfd_tran_ep_bind, + .l_accept = sfd_tran_ep_accept, + .l_close = 
sfd_tran_ep_close, + .l_getopt = sfd_tran_listener_getopt, + .l_setopt = sfd_tran_listener_setopt, +}; + +static nni_sp_tran sfd_tran = { + .tran_scheme = "socket", + .tran_dialer = &sfd_tran_dialer_ops, + .tran_listener = &sfd_tran_listener_ops, + .tran_pipe = &sfd_tran_pipe_ops, + .tran_init = sfd_tran_init, + .tran_fini = sfd_tran_fini, +}; + +void +nni_sp_sfd_register(void) +{ + nni_sp_tran_register(&sfd_tran); +} diff --git a/src/sp/transport/socket/sockfd_test.c b/src/sp/transport/socket/sockfd_test.c new file mode 100644 index 00000000..c12d4466 --- /dev/null +++ b/src/sp/transport/socket/sockfd_test.c @@ -0,0 +1,461 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2018 Cody Piersall <cody.piersall@gmail.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +#ifdef NNG_PLATFORM_POSIX +#include <unistd.h> +#include <fcntl.h> +#ifdef NNG_PLATFORM_SUNOS +#include <zone.h> +#endif +#endif + +// FDC tests. 
+static void +test_sfd_connect_fail(void) +{ + nng_socket s; + + NUTS_OPEN(s); + NUTS_FAIL(nng_dial(s, "socket://", NULL, 0), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +void +test_sfd_malformed_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_listen(s1, "socket://junk", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s1, "socket://", NULL, 0)); + NUTS_CLOSE(s1); +} + +void +test_sfd_accept(void) +{ + nng_socket s1, s2; + nng_listener l; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + // make sure we won't have to deal with SIGPIPE - EPIPE is better + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l, s1, "socket://")); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_SLEEP(10); + NUTS_CLOSE(s1); + close(fds[1]); +} + +void +test_sfd_exchange(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_exchange_late(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + 
NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_recv_max(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + size_t scratch; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 200); + NUTS_PASS(nng_listener_set_size(l0, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_get_size(l0, NNG_OPT_RECVMAXSZ, &scratch)); + NUTS_ASSERT(scratch == 100); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_large(void) +{ + char *buf; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + nng_msg *msg; + int fds[2]; + + sz = 1U << 20; + buf = nng_alloc(sz); // a MB + buf[sz - 1] = 0; + memset(buf, 'A', sz - 1); + NUTS_PASS(nng_socket_pair(fds)); + NUTS_PASS(nng_msg_alloc(&msg, sz)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, 
NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s1, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + nng_msleep(100); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + memset(nng_msg_body(msg), 'B', sz - 1); + memset(buf, 'B', sz - 1); + + NUTS_PASS(nng_sendmsg(s1, msg, 0)); + NUTS_PASS(nng_recvmsg(s0, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); + nng_free(buf, sz); +} + +void +test_sockfd_close_pending(void) +{ + // this test verifies that closing a socket pair that has not + // started negotiation with the other side still works. + int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + nng_listen(s0, "socket://", &l, 0); + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0]); + nng_msleep(10); + NUTS_CLOSE(s0); + close(fds[1]); +} + +void +test_sockfd_close_peer(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. 
+ int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + close(fds[1]); + nng_msleep(100); + NUTS_CLOSE(s0); +} + +void +test_sockfd_listener_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0; + nng_listener l; + nng_sockaddr sa; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + close(fds[1]); + NUTS_CLOSE(s0); +} + +void +test_sockfd_pipe_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_sockaddr sa; + nng_msg *msg; + nng_pipe p; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); +} + +void +test_sockfd_pipe_peer(void) +{ + // this test verifies that closing a 
socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_msg *msg; + nng_pipe p; + uint64_t id; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_ASSERT(nng_pipe_id(p) != -1); +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_PID, &id)); + NUTS_ASSERT(id == (uint64_t) getpid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_UID, &id)); + NUTS_ASSERT(id == (uint64_t) getuid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_GID, &id)); + NUTS_ASSERT(id == (uint64_t) getgid()); +#endif +#if defined(NNG_PLATFORM_SUNOS) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id)); + NUTS_ASSERT(id == (uint64_t) getzoneid()); +#else + NUTS_FAIL( + nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id), NNG_ENOTSUP); +#endif + + nng_msg_free(msg); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen_full(void) +{ +#ifndef NNG_SFD_LISTEN_QUEUE +#define NNG_SFD_LISTEN_QUEUE 16 +#endif + + int fds[NNG_SFD_LISTEN_QUEUE * 2]; + nng_socket s; + int i; + nng_listener l; + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i += 2) { + int pair[2]; + NUTS_PASS(nng_socket_pair(pair)); + fds[i] = pair[0]; + fds[i+1] = pair[1]; + } + 
NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + if (i < NNG_SFD_LISTEN_QUEUE) { + NUTS_PASS(nng_listener_set_int( + l, NNG_OPT_SOCKET_FD, fds[i])); + } else { + NUTS_FAIL( + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[i]), + NNG_ENOSPC); + } + } + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + close(fds[i]); + } + NUTS_CLOSE(s); +} + +void +test_sfd_fd_option_type(void) +{ + nng_socket s; + nng_listener l; + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_FAIL(nng_listener_set_bool(l, NNG_OPT_SOCKET_FD, false), NNG_EBADTYPE); + NUTS_CLOSE(s); +} + +void +test_sfd_fd_dev_zero(void) +{ +#ifdef NNG_PLATFORM_POSIX + nng_socket s; + nng_listener l; + int fd; + + // dev/zero produces a stream of zero bytes leading to protocol error + NUTS_ASSERT((fd = open("/dev/zero", O_RDONLY, 0777)) >= 0); + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fd)); + nng_msleep(100); + NUTS_CLOSE(s); +#endif +} + +NUTS_TESTS = { + { "socket connect fail", test_sfd_connect_fail }, + { "socket malformed address", test_sfd_malformed_address }, +#ifdef NNG_HAVE_SOCKETPAIR + { "socket listen", test_sfd_listen }, + { "socket accept", test_sfd_accept }, + { "socket exchange", test_sfd_exchange }, + { "socket exchange late", test_sfd_exchange_late }, + { "socket recv max", test_sfd_recv_max }, + { "socket exchange large", test_sfd_large }, + { "socket close pending", test_sockfd_close_pending }, + { "socket close peer", test_sockfd_close_peer }, + { "socket listener address", test_sockfd_listener_sockaddr }, + { "socket pipe address", test_sockfd_pipe_sockaddr }, + { "socket pipe peer id", test_sockfd_pipe_peer }, + { "socket listen full", test_sfd_listen_full }, + { "socket bad fd type", test_sfd_fd_option_type }, + { "socket dev zero", test_sfd_fd_dev_zero }, +#endif + + { NULL, NULL }, +};
\ No newline at end of file diff --git a/src/supplemental/util/platform.c b/src/supplemental/util/platform.c index ddc2d088..99daaef6 100644 --- a/src/supplemental/util/platform.c +++ b/src/supplemental/util/platform.c @@ -164,3 +164,9 @@ nng_random(void) (void) nni_init(); return (nni_random()); } + +int +nng_socket_pair(int *fds) +{ + return (nni_socket_pair(fds)); +}
\ No newline at end of file |
