aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/CMakeLists.txt6
-rw-r--r--src/platform/posix/posix_peerid.c122
-rw-r--r--src/platform/posix/posix_peerid.h25
-rw-r--r--src/platform/posix/posix_socketpair.c43
-rw-r--r--src/platform/posix/posix_sockfd.c493
-rw-r--r--src/platform/windows/CMakeLists.txt1
-rw-r--r--src/platform/windows/win_socketpair.c57
7 files changed, 746 insertions, 1 deletions
diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt
index dcfea221..b8e3782e 100644
--- a/src/platform/posix/CMakeLists.txt
+++ b/src/platform/posix/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2020 Staysail Systems, Inc. <info@staystail.tech>
+# Copyright 2023 Staysail Systems, Inc. <info@staystail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -61,6 +61,7 @@ if (NNG_PLATFORM_POSIX)
nng_check_sym(LOCAL_PEERPID sys/un.h NNG_HAVE_LOCALPEERPID)
nng_check_sym(getpeerucred ucred.h NNG_HAVE_GETPEERUCRED)
nng_check_sym(atomic_flag_test_and_set stdatomic.h NNG_HAVE_STDATOMIC)
+ nng_check_sym(socketpair sys/socket.h NNG_HAVE_SOCKETPAIR)
nng_sources(
posix_impl.h
@@ -78,9 +79,12 @@ if (NNG_PLATFORM_POSIX)
posix_ipcconn.c
posix_ipcdial.c
posix_ipclisten.c
+ posix_peerid.c
posix_pipe.c
posix_resolv_gai.c
posix_sockaddr.c
+ posix_socketpair.c
+ posix_sockfd.c
posix_tcpconn.c
posix_tcpdial.c
posix_tcplisten.c
diff --git a/src/platform/posix/posix_peerid.c b/src/platform/posix/posix_peerid.c
new file mode 100644
index 00000000..e0020150
--- /dev/null
+++ b/src/platform/posix/posix_peerid.c
@@ -0,0 +1,122 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Devolutions <info@devolutions.net>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+
+#if defined(NNG_HAVE_GETPEERUCRED)
+#include <ucred.h>
+#elif defined(NNG_HAVE_LOCALPEERCRED) || defined(NNG_HAVE_SOCKPEERCRED)
+#include <sys/ucred.h>
+#endif
+#if defined(NNG_HAVE_GETPEEREID)
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#ifndef SOL_LOCAL
+#define SOL_LOCAL 0
+#endif
+
+
+int
+nni_posix_peerid(int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid,
+ uint64_t *znid)
+{
+#if defined(NNG_HAVE_GETPEEREID) && !defined(NNG_HAVE_LOCALPEERCRED)
+ uid_t uid;
+ gid_t gid;
+
+ if (getpeereid(fd, &uid, &gid) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uid;
+ *egid = gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_GETPEERUCRED)
+ ucred_t *ucp = NULL;
+ if (getpeerucred(fd, &ucp) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = ucred_geteuid(ucp);
+ *egid = ucred_getegid(ucp);
+ *prid = ucred_getpid(ucp);
+ *znid = ucred_getzoneid(ucp);
+ ucred_free(ucp);
+ return (0);
+#elif defined(NNG_HAVE_SOCKPEERCRED)
+ struct sockpeercred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_SOPEERCRED)
+ struct ucred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+ struct xucred xu;
+ socklen_t len = sizeof(xu);
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = xu.cr_uid;
+ *egid = xu.cr_gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+#if defined(NNG_HAVE_LOCALPEERPID) // documented on macOS since 10.8
+ {
+ pid_t pid;
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) ==
+ 0) {
+ *prid = (uint64_t) pid;
+ }
+ }
+#endif // NNG_HAVE_LOCALPEERPID
+ return (0);
+#else
+ if (fd < 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ARG_UNUSED(euid);
+ NNI_ARG_UNUSED(egid);
+ NNI_ARG_UNUSED(prid);
+ NNI_ARG_UNUSED(znid);
+ return (NNG_ENOTSUP);
+#endif
+}
+
diff --git a/src/platform/posix/posix_peerid.h b/src/platform/posix/posix_peerid.h
new file mode 100644
index 00000000..57e9abff
--- /dev/null
+++ b/src/platform/posix/posix_peerid.h
@@ -0,0 +1,25 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_POSIX_PEERID_H
+#define PLATFORM_POSIX_PEERID_H
+
+// This file defines structures we will use for emulating asynchronous I/O
+// on POSIX. POSIX lacks the support for callback based asynchronous I/O
+// that we have on Windows, although it has a non-widely support aio layer
+// that is not very performant on many systems. So we emulate this using
+// one of several possible different backends.
+
+#include "core/nng_impl.h"
+#include <sys/types.h>
+
+int nni_posix_peerid(
+ int fd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid);
+
+#endif // PLATFORM_POSIX_PEERID_H \ No newline at end of file
diff --git a/src/platform/posix/posix_socketpair.c b/src/platform/posix/posix_socketpair.c
new file mode 100644
index 00000000..3a01ad2b
--- /dev/null
+++ b/src/platform/posix/posix_socketpair.c
@@ -0,0 +1,43 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_HAVE_SOCKETPAIR
+// This provides an implementation of socketpair(), which is supposed
+// to be present on XPG6 and newer. This trivial implementation
+// only supports SOCK_STREAM over AF_UNIX. Which is sufficient for
+// most purposes. The fds array should point to an int[2].
+#include <errno.h>
+#include <sys/socket.h>
+
+int
+nni_socket_pair(int fds[2])
+{
+ int rv;
+ rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds);
+ if (rv != 0) {
+ return (nni_plat_errno(errno));
+ }
+
+#ifdef SO_NOSIGPIPE
+ int set = 1;
+ setsockopt(fds[0], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
+ setsockopt(fds[1], SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
+#endif
+
+ return (0);
+}
+#else
+int
+nni_socket_pair(int *fds)
+{
+ return (NNG_ENOTSUP);
+}
+#endif \ No newline at end of file
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
new file mode 100644
index 00000000..b0d88a31
--- /dev/null
+++ b/src/platform/posix/posix_sockfd.c
@@ -0,0 +1,493 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Devolutions <info@devolutions.net>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include "core/sockfd.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_peerid.h"
+
+struct nni_sfd_conn {
+ nng_stream stream;
+ nni_posix_pfd *pfd;
+ int fd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_reap_node reap;
+};
+
+static void
+sfd_dowrite(nni_sfd_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+ struct iovec iovec[16];
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = writev(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ // If we didn't send all the data, the caller will
+ // resubmit. As a corollary, callers should probably
+ // only have one message on the write queue at a time.
+ nni_aio_bump_count(aio, n);
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+sfd_doread(nni_sfd_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+ struct iovec iovec[16];
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // Zero indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECONNSHUT);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+sfd_error(void *arg, int err)
+{
+ nni_sfd_conn *c = arg;
+ nni_aio *aio;
+
+ nni_mtx_lock(&c->mtx);
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, err);
+ }
+ if (c->pfd != NULL) {
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_close(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if (c->pfd != NULL) {
+ nni_posix_pfd_close(c->pfd);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+// sfd_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+sfd_fini(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ sfd_close(c);
+ if (c->pfd != NULL) {
+ nni_posix_pfd_fini(c->pfd);
+ }
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+static nni_reap_list sfd_reap_list = {
+ .rl_offset = offsetof(nni_sfd_conn, reap),
+ .rl_func = sfd_fini,
+};
+static void
+sfd_free(void *arg)
+{
+ struct nni_sfd_conn *c = arg;
+ nni_reap(&sfd_reap_list, c);
+}
+
+static void
+sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+{
+ struct nni_sfd_conn *c = arg;
+
+ if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) {
+ sfd_error(c, NNG_ECONNSHUT);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if ((events & NNI_POLL_IN) != 0) {
+ sfd_doread(c);
+ }
+ if ((events & NNI_POLL_OUT) != 0) {
+ sfd_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= NNI_POLL_OUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= NNI_POLL_IN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_cancel(nni_aio *aio, void *arg, int rv)
+{
+ nni_sfd_conn *c = arg;
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_send(void *arg, nni_aio *aio)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ sfd_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_recv(void *arg, nni_aio *aio)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ sfd_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static int
+sfd_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ NNI_ARG_UNUSED(arg);
+ nng_sockaddr sa;
+ sa.s_family = NNG_AF_UNSPEC;
+ return (nni_copyout_sockaddr(&sa, buf, szp, t));
+}
+
+static int
+sfd_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &id, &ignore, &ignore, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &id, &ignore, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &ignore, &ignore, &id);
+ if (rv != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &ignore, &id, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static const nni_option sfd_options[] = {
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_get = sfd_get_addr,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = sfd_get_addr,
+ },
+ {
+ .o_name = NNG_OPT_PEER_PID,
+ .o_get = sfd_get_peer_pid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_UID,
+ .o_get = sfd_get_peer_uid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_GID,
+ .o_get = sfd_get_peer_gid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_ZONEID,
+ .o_get = sfd_get_peer_zoneid,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+sfd_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ return (nni_getopt(sfd_options, name, c, buf, szp, t));
+}
+
+static int
+sfd_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ return (nni_setopt(sfd_options, name, c, buf, sz, t));
+}
+
+int
+nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
+{
+ nni_sfd_conn *c;
+ int rv;
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) {
+ NNI_FREE_STRUCT(c);
+ return (rv);
+ }
+
+ c->closed = false;
+ c->fd = fd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ c->stream.s_free = sfd_free;
+ c->stream.s_close = sfd_close;
+ c->stream.s_recv = sfd_recv;
+ c->stream.s_send = sfd_send;
+ c->stream.s_get = sfd_get;
+ c->stream.s_set = sfd_set;
+
+ nni_posix_pfd_set_cb(c->pfd, sfd_cb, c);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_sfd_close_fd(int fd)
+{
+ close(fd);
+} \ No newline at end of file
diff --git a/src/platform/windows/CMakeLists.txt b/src/platform/windows/CMakeLists.txt
index d1d158e0..adf67ebd 100644
--- a/src/platform/windows/CMakeLists.txt
+++ b/src/platform/windows/CMakeLists.txt
@@ -39,6 +39,7 @@ if (NNG_PLATFORM_WINDOWS)
win_rand.c
win_resolv.c
win_sockaddr.c
+ win_socketpair.c
win_tcp.c
win_tcpconn.c
win_tcpdial.c
diff --git a/src/platform/windows/win_socketpair.c b/src/platform/windows/win_socketpair.c
new file mode 100644
index 00000000..0ed0443a
--- /dev/null
+++ b/src/platform/windows/win_socketpair.c
@@ -0,0 +1,57 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+
+#ifdef NNG_HAVE_SOCKETPAIR_TODO
+// TODO: Windows lacks socketpair. We can emulate it with an explcit
+// implementation based on AF_UNIX.
+
+#include <errno.h>
+#include <sys/socket.h>
+
+int
+nni_socket_pair(int *fds)
+{
+ int rv;
+ rv = socketpair(PF_UNIX, SOCK_STREAM, 0, fds);
+ if (rv != 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ return (0);
+}
+#else
+int
+nni_socket_pair(int fds[2])
+{
+ NNI_ARG_UNUSED(fds);
+ return (NNG_ENOTSUP);
+}
+
+// This is also the fdc transport.
+
+typedef struct nni_sfd_conn nni_sfd_conn;
+
+void
+nni_sfd_close_fd(int fd)
+{
+ NNI_ARG_UNUSED(fd);
+}
+
+int
+nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
+{
+ NNI_ARG_UNUSED(cp);
+ NNI_ARG_UNUSED(fd);
+ return (NNG_ENOTSUP);
+}
+
+#endif