aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/CMakeLists.txt2
-rw-r--r--src/core/dialer.c2
-rw-r--r--src/core/platform.h17
-rw-r--r--src/core/sockfd.c231
-rw-r--r--src/core/sockfd.h28
-rw-r--r--src/core/stream.c10
6 files changed, 286 insertions, 4 deletions
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
index f003d756..9e5a6bec 100644
--- a/src/core/CMakeLists.txt
+++ b/src/core/CMakeLists.txt
@@ -24,6 +24,8 @@ nng_sources(
device.h
dialer.c
dialer.h
+ sockfd.c
+ sockfd.h
file.c
file.h
idhash.c
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 55a46efb..91d18dc8 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -411,7 +411,7 @@ int
nni_dialer_start(nni_dialer *d, unsigned flags)
{
int rv = 0;
- nni_aio *aio;
+ nni_aio *aio = NULL;
if (nni_atomic_flag_test_and_set(&d->d_started)) {
return (NNG_ESTATE);
diff --git a/src/core/platform.h b/src/core/platform.h
index 89759921..0b5ec634 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -386,9 +386,9 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *);
typedef struct nni_plat_udp nni_plat_udp;
// nni_plat_udp_open initializes a UDP socket, binding to the local
-// address specified specified in the AIO. The remote address is
+// address specified in the AIO. The remote address is
// not used. The resulting nni_plat_udp structure is returned in the
-// the aio's a_pipe.
+// aio's a_pipe.
extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *);
// nni_plat_udp_close closes the underlying UDP socket.
@@ -434,6 +434,19 @@ extern void nni_plat_pipe_close(int, int);
extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *);
+// nni_socket_pair is used to create a socket pair using socketpair()
+// on POSIX systems. (Windows might provide a similar solution, using
+// AF_UNIX at some point, in which case the arguments will actually be
+// an array of HANDLEs.) If not supported, this returns NNG_ENOTSUP.
+//
+// This API can only create a pair of open file descriptors, suitable for use
+// with the socket transport, each bound to the other. The transport must be
+// a bidirectional reliable byte stream. This should be suitable for use
+// in APIs to transport file descriptors, or across a fork/exec boundary (so
+// that child processes may use these with socket to inherit a socket that is
+// connected to the parent.)
+extern int nni_socket_pair(int *);
+
//
// File/Store Support
//
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
new file mode 100644
index 00000000..1b4dbc1d
--- /dev/null
+++ b/src/core/sockfd.c
@@ -0,0 +1,231 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdint.h>
+#include <string.h>
+
+#include <nng/nng.h>
+
+#include "core/nng_impl.h"
+#include "core/sockfd.h"
+
+// We will accept up to this many FDs to be queued up for
+// accept, before we start rejecting with NNG_ENOSPC. Once
+// accept is performed, then another slot is available.
+#define NNG_SFD_LISTEN_QUEUE 16
+
+int
+nni_sfd_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
+{
+ NNI_ARG_UNUSED(dp);
+ NNI_ARG_UNUSED(url);
+ // No dialer support for this.
+ return (NNG_ENOTSUP);
+}
+
+typedef struct {
+ nng_stream_listener ops;
+ int listen_cnt; // how many FDs are waiting
+ int listen_q[NNG_SFD_LISTEN_QUEUE];
+ bool closed;
+ nni_list accept_q;
+ nni_mtx mtx;
+} sfd_listener;
+
+static void
+sfd_listener_free(void *arg)
+{
+ sfd_listener *l = arg;
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+static void
+sfd_listener_close(void *arg)
+{
+ nni_aio *aio;
+ sfd_listener *l = arg;
+ nni_mtx_lock(&l->mtx);
+ l->closed = true;
+ while ((aio = nni_list_first(&l->accept_q)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ for (int i = 0; i < l->listen_cnt; i++) {
+ nni_sfd_close_fd(l->listen_q[i]);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+sfd_listener_listen(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+ // nothing really for us to do
+ return (0);
+}
+
+static void
+sfd_start_conn(sfd_listener *l, nni_aio *aio)
+{
+ int fd;
+ int rv;
+ nni_sfd_conn *c;
+ NNI_ASSERT(l->listen_cnt > 0);
+ fd = l->listen_q[0];
+ for (int i = 1; i < l->listen_cnt; i++) {
+ l->listen_q[i] = l->listen_q[i + 1];
+ }
+ l->listen_cnt--;
+ if ((rv = nni_sfd_conn_alloc(&c, fd)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_sfd_close_fd(fd);
+ } else {
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+sfd_cancel_accept(nni_aio *aio, void *arg, int rv)
+{
+ sfd_listener *l = arg;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+sfd_listener_accept(void *arg, nng_aio *aio)
+{
+ sfd_listener *l = arg;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ if (l->listen_cnt) {
+ sfd_start_conn(l, aio);
+ } else {
+ nni_aio_schedule(aio, sfd_cancel_accept, l);
+ nni_aio_list_append(&l->accept_q, aio);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+sfd_listener_set_fd(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sfd_listener *l = arg;
+ nni_aio *aio;
+ int fd;
+ int rv;
+
+ if ((rv = nni_copyin_int(&fd, buf, sz, NNI_MININT, NNI_MAXINT, t)) !=
+ 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if (l->listen_cnt == NNG_SFD_LISTEN_QUEUE) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ENOSPC);
+ }
+
+ l->listen_q[l->listen_cnt++] = fd;
+
+ // if someone was waiting in accept, give it to them now
+ if ((aio = nni_list_first(&l->accept_q)) != NULL) {
+ nni_aio_list_remove(aio);
+ sfd_start_conn(l, aio);
+ }
+
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static int
+sfd_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ NNI_ARG_UNUSED(arg);
+ nng_sockaddr sa;
+ sa.s_family = NNG_AF_UNSPEC;
+ return (nni_copyout_sockaddr(&sa, buf, szp, t));
+}
+
+static const nni_option sfd_listener_options[] = {
+ {
+ .o_name = NNG_OPT_SOCKET_FD,
+ .o_set = sfd_listener_set_fd,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_get = sfd_listener_get_addr,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+sfd_listener_get(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ sfd_listener *l = arg;
+ return (nni_getopt(sfd_listener_options, name, l, buf, szp, t));
+}
+
+static int
+sfd_listener_set(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ sfd_listener *l = arg;
+ return (nni_setopt(sfd_listener_options, name, l, buf, sz, t));
+}
+
+int
+nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ sfd_listener *l;
+
+ NNI_ARG_UNUSED(url);
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ memset(l->listen_q, 0, sizeof(l->listen_q));
+ l->listen_cnt = 0;
+ nni_aio_list_init(&l->accept_q);
+ nni_mtx_init(&l->mtx);
+
+ l->ops.sl_free = sfd_listener_free;
+ l->ops.sl_close = sfd_listener_close;
+ l->ops.sl_listen = sfd_listener_listen;
+ l->ops.sl_accept = sfd_listener_accept;
+ l->ops.sl_get = sfd_listener_get;
+ l->ops.sl_set = sfd_listener_set;
+
+ *lp = (void *) l;
+ return (0);
+}
diff --git a/src/core/sockfd.h b/src/core/sockfd.h
new file mode 100644
index 00000000..ca37f0e1
--- /dev/null
+++ b/src/core/sockfd.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_FDC_H
+#define CORE_FDC_H
+
+#include "core/nng_impl.h"
+
+// the nni_sfd_conn struct is provided by platform code to wrap
+// an arbitrary byte stream file descriptor (UNIX) or handle (Windows)
+// with a nng_stream.
+typedef struct nni_sfd_conn nni_sfd_conn;
+extern int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd);
+extern int nni_sfd_dialer_alloc(nng_stream_dialer **, const nng_url *);
+extern int nni_sfd_listener_alloc(nng_stream_listener **, const nng_url *);
+
+// this is used to close a file descriptor, in case we cannot
+// create a connection (or if the listener is closed before the
+// connection is accepted.)
+extern void nni_sfd_close_fd(int fd);
+
+#endif // CORE_FDC_H
diff --git a/src/core/stream.c b/src/core/stream.c
index 418bfb15..99002fcd 100644
--- a/src/core/stream.c
+++ b/src/core/stream.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -15,6 +15,7 @@
#include "core/nng_impl.h"
#include <nng/supplemental/tls/tls.h>
+#include "core/sockfd.h"
#include "core/tcp.h"
#include "supplemental/tls/tls_api.h"
#include "supplemental/websocket/websocket.h"
@@ -94,6 +95,13 @@ static struct {
.dialer_alloc = nni_ws_dialer_alloc,
.listener_alloc = nni_ws_listener_alloc,
},
+#ifdef NNG_TRANSPORT_FDC
+ {
+ .scheme = "socket",
+ .dialer_alloc = nni_sfd_dialer_alloc,
+ .listener_alloc = nni_sfd_listener_alloc,
+ },
+#endif
{
.scheme = NULL,
},