aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_sockfd.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2023-12-18 01:12:01 -0800
committerGarrett D'Amore <garrett@damore.org>2023-12-29 15:20:21 -0800
commit9caabf76621ba81e7fed5df42971f355b648ff59 (patch)
tree2f243965e202862f36c9d57c3053f57806bf70cf /src/platform/posix/posix_sockfd.c
parente5261536d4f72dccbf1a424bfe426f9635b9d1c3 (diff)
downloadnng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.gz
nng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.bz2
nng-9caabf76621ba81e7fed5df42971f355b648ff59.zip
fixes #1746 Create a new socket:// transport for socketpair() based connections
This transport only listens, and creates connections when the application calls setopt on the lister with NNG_OPT_SOCKET_FD, to pass a file descriptor. The FD is turned into an nng_stream, and utilized for SP. The protocol over the descriptor is identical to the TCP protocol (not the IPC protocol). The options for peer information are borrowed from the IPC transport, as they may be useful for these purposes. This includes a test suite and full documentation.
Diffstat (limited to 'src/platform/posix/posix_sockfd.c')
-rw-r--r--src/platform/posix/posix_sockfd.c493
1 files changed, 493 insertions, 0 deletions
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
new file mode 100644
index 00000000..b0d88a31
--- /dev/null
+++ b/src/platform/posix/posix_sockfd.c
@@ -0,0 +1,493 @@
+//
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Devolutions <info@devolutions.net>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include "core/sockfd.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_peerid.h"
+
+struct nni_sfd_conn {
+ nng_stream stream;
+ nni_posix_pfd *pfd;
+ int fd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_reap_node reap;
+};
+
+static void
+sfd_dowrite(nni_sfd_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+ struct iovec iovec[16];
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = writev(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ // If we didn't send all the data, the caller will
+ // resubmit. As a corollary, callers should probably
+ // only have one message on the write queue at a time.
+ nni_aio_bump_count(aio, n);
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+sfd_doread(nni_sfd_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+ struct iovec iovec[16];
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // Zero indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECONNSHUT);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+sfd_error(void *arg, int err)
+{
+ nni_sfd_conn *c = arg;
+ nni_aio *aio;
+
+ nni_mtx_lock(&c->mtx);
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, err);
+ }
+ if (c->pfd != NULL) {
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_close(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if (c->pfd != NULL) {
+ nni_posix_pfd_close(c->pfd);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+// sfd_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+sfd_fini(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ sfd_close(c);
+ if (c->pfd != NULL) {
+ nni_posix_pfd_fini(c->pfd);
+ }
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+static nni_reap_list sfd_reap_list = {
+ .rl_offset = offsetof(nni_sfd_conn, reap),
+ .rl_func = sfd_fini,
+};
+static void
+sfd_free(void *arg)
+{
+ struct nni_sfd_conn *c = arg;
+ nni_reap(&sfd_reap_list, c);
+}
+
+static void
+sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+{
+ struct nni_sfd_conn *c = arg;
+
+ if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) {
+ sfd_error(c, NNG_ECONNSHUT);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if ((events & NNI_POLL_IN) != 0) {
+ sfd_doread(c);
+ }
+ if ((events & NNI_POLL_OUT) != 0) {
+ sfd_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= NNI_POLL_OUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= NNI_POLL_IN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_cancel(nni_aio *aio, void *arg, int rv)
+{
+ nni_sfd_conn *c = arg;
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_send(void *arg, nni_aio *aio)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ sfd_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+sfd_recv(void *arg, nni_aio *aio)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, sfd_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ sfd_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static int
+sfd_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ NNI_ARG_UNUSED(arg);
+ nng_sockaddr sa;
+ sa.s_family = NNG_AF_UNSPEC;
+ return (nni_copyout_sockaddr(&sa, buf, szp, t));
+}
+
+static int
+sfd_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &id, &ignore, &ignore, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &id, &ignore, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &ignore, &ignore, &id);
+ if (rv != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static int
+sfd_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id = 0;
+
+ rv = nni_posix_peerid(c->fd, &ignore, &ignore, &id, &ignore);
+ if (rv != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ return (nni_copyout_u64(id, buf, szp, t));
+}
+
+static const nni_option sfd_options[] = {
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_get = sfd_get_addr,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = sfd_get_addr,
+ },
+ {
+ .o_name = NNG_OPT_PEER_PID,
+ .o_get = sfd_get_peer_pid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_UID,
+ .o_get = sfd_get_peer_uid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_GID,
+ .o_get = sfd_get_peer_gid,
+ },
+ {
+ .o_name = NNG_OPT_PEER_ZONEID,
+ .o_get = sfd_get_peer_zoneid,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+sfd_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ return (nni_getopt(sfd_options, name, c, buf, szp, t));
+}
+
+static int
+sfd_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_sfd_conn *c = arg;
+ return (nni_setopt(sfd_options, name, c, buf, sz, t));
+}
+
+int
+nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
+{
+ nni_sfd_conn *c;
+ int rv;
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) {
+ NNI_FREE_STRUCT(c);
+ return (rv);
+ }
+
+ c->closed = false;
+ c->fd = fd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ c->stream.s_free = sfd_free;
+ c->stream.s_close = sfd_close;
+ c->stream.s_recv = sfd_recv;
+ c->stream.s_send = sfd_send;
+ c->stream.s_get = sfd_get;
+ c->stream.s_set = sfd_set;
+
+ nni_posix_pfd_set_cb(c->pfd, sfd_cb, c);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_sfd_close_fd(int fd)
+{
+ close(fd);
+} \ No newline at end of file