aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_epdesc.c597
-rw-r--r--src/platform/posix/posix_ipc.c250
-rw-r--r--src/platform/posix/posix_ipc.h48
-rw-r--r--src/platform/posix/posix_ipcconn.c494
-rw-r--r--src/platform/posix/posix_ipcdial.c238
-rw-r--r--src/platform/posix/posix_ipclisten.c373
-rw-r--r--src/platform/posix/posix_pipedesc.c503
-rw-r--r--src/platform/posix/posix_resolv_gai.c1
-rw-r--r--src/platform/posix/posix_tcpconn.c1
-rw-r--r--src/platform/posix/posix_tcpdial.c1
-rw-r--r--src/platform/posix/posix_tcplisten.c5
-rw-r--r--src/platform/posix/posix_udp.c3
12 files changed, 1156 insertions, 1358 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
deleted file mode 100644
index d796765a..00000000
--- a/src/platform/posix/posix_epdesc.c
+++ /dev/null
@@ -1,597 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef sun
-#undef sun
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_posix_epdesc {
- nni_posix_pfd * pfd;
- nni_list connectq;
- nni_list acceptq;
- bool closed;
- bool started;
- bool ipcbound; // if true unlink socket on exit
- struct sockaddr_storage locaddr;
- struct sockaddr_storage remaddr;
- socklen_t loclen;
- socklen_t remlen;
- mode_t perms; // UNIX sockets only
- int mode; // end point mode (dialer/listener)
- nni_mtx mtx;
-};
-
-static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
-static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
-
-static void
-nni_epdesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
- nni_posix_pfd * pfd = NULL;
-
- NNI_ASSERT(rv != 0);
- nni_mtx_lock(&ed->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
- ((pfd = ed->pfd) != NULL)) {
- nni_posix_pfd_close(pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
-{
- nni_posix_pipedesc *pd = NULL;
-
- // acceptq or connectq.
- nni_aio_list_remove(aio);
-
- if (rv != 0) {
- NNI_ASSERT(newpfd == NULL);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(newpfd != NULL);
- if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
- nni_posix_pfd_fini(newpfd);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_epdesc_doaccept(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
-
- fd = nni_posix_pfd_fd(ed->pfd);
-
-#ifdef NNG_USE_ACCEPT4
- newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
- if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(fd, NULL, NULL);
- }
-#else
- newfd = accept(fd, NULL, NULL);
-#endif
- if (newfd < 0) {
- switch (errno) {
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
- if (rv != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- // Come back later...
- return;
- case ECONNABORTED:
- case ECONNRESET:
- // Eat them, they aren't interesting.
- continue;
- default:
- // Error this one, but keep moving to the next.
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
-
- nni_epdesc_finish(aio, 0, pfd);
- }
-}
-
-static void
-nni_epdesc_doclose(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- ed->closed = true;
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
-
- if (ed->pfd != NULL) {
-
- nni_posix_pfd_close(ed->pfd);
- }
-
- // clean up stale UNIX socket when closing the server.
- if (ed->ipcbound) {
- struct sockaddr_un *sun = (void *) &ed->locaddr;
- (void) unlink(sun->sun_path);
- }
-}
-
-static void
-nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
-
- NNI_ARG_UNUSED(pfd);
-
- nni_mtx_lock(&ed->mtx);
- if (events & POLLNVAL) {
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- // Anything else will turn up in accept.
- nni_epdesc_doaccept(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_close(nni_posix_epdesc *ed)
-{
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed)
-{
- int len;
- struct sockaddr_storage *ss;
- int rv;
- int fd;
- nni_posix_pfd * pfd;
-
- nni_mtx_lock(&ed->mtx);
-
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ESTATE);
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ECLOSED);
- }
- if ((len = ed->loclen) == 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EADDRINVAL);
- }
-
- ss = &ed->locaddr;
- len = ed->loclen;
-
- if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- nni_mtx_unlock(&ed->mtx);
- return (nni_plat_errno(errno));
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
-#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
- if (ss->ss_family != AF_UNIX) {
- int on = 1;
- // If for some reason this doesn't work, it's probably ok.
- // Second bind will fail.
- (void) setsockopt(
- fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- }
-#endif
-
- if (bind(fd, (struct sockaddr *) ss, len) < 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- // For UNIX domain sockets, optionally set the permission bits.
- // This is done after the bind and before listen, and on the file
- // rather than the file descriptor.
- // Experiments have shown that chmod() works correctly, provided that
- // it is done *before* the listen() operation, whereas fchmod seems to
- // have no impact. This behavior was observed on both macOS and Linux.
- // YMMV on other platforms.
- if (ss->ss_family == AF_UNIX) {
- ed->ipcbound = true;
- if (ed->perms != 0) {
- struct sockaddr_un *sun = (void *) ss;
- mode_t perms = ed->perms & ~(S_IFMT);
- if ((rv = chmod(sun->sun_path, perms)) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
- }
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
-
- ed->pfd = pfd;
- ed->started = true;
- nni_mtx_unlock(&ed->mtx);
-
- return (0);
-}
-
-void
-nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- // Accept is simpler than the connect case. With accept we just
- // need to wait for the socket to be readable to indicate an incoming
- // connection is ready for us. There isn't anything else for us to
- // do really, as that will have been done in listen.
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
-
- if (!ed->started) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&ed->acceptq, aio);
- if (nni_list_first(&ed->acceptq) == aio) {
- nni_epdesc_doaccept(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = -1;
-
- nni_mtx_lock(&ed->mtx);
- if (ed->pfd != NULL) {
- fd = nni_posix_pfd_fd(ed->pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-static void
-nni_epdesc_connect_start(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
- int fd;
- int rv;
- nni_aio * aio;
-
-loop:
- if ((aio = nni_list_first(&ed->connectq)) == NULL) {
- return;
- }
-
- NNI_ASSERT(ed->pfd == NULL);
- if (ed->closed) {
- nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
- goto loop;
- }
- ed->started = true;
-
- if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
- // Possibly bind.
- if ((ed->loclen != 0) &&
- (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
-
- if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
- // Immediate connect, cool! This probably only happens on
- // loopback, and probably not on every platform.
- nni_epdesc_finish(aio, 0, pfd);
- goto loop;
- }
-
- if (errno != EINPROGRESS) {
- // Some immediate failure occurred.
- if (errno == ENOENT) { // For UNIX domain sockets
- rv = NNG_ECONNREFUSED;
- } else {
- rv = nni_plat_errno(errno);
- }
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
- if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- ed->pfd = pfd;
- // all done... wait for this to signal via callback
-}
-
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- ed->started = true;
- nni_list_append(&ed->connectq, aio);
- if (nni_list_first(&ed->connectq) == aio) {
- // If there was a stale pfd (probably from an aborted or
- // canceled connect attempt), discard it so we start fresh.
- if (ed->pfd != NULL) {
- nni_posix_pfd_fini(ed->pfd);
- ed->pfd = NULL;
- }
- nni_epdesc_connect_start(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
- nni_aio * aio;
- socklen_t sz;
- int rv;
- int fd;
-
- nni_mtx_lock(&ed->mtx);
- if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
- (pfd != ed->pfd)) {
- // Spurious completion. Just ignore it.
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- fd = nni_posix_pfd_fd(pfd);
- sz = sizeof(rv);
-
- if ((events & POLLNVAL) != 0) {
- rv = EBADF;
-
- } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
-
- switch (rv) {
- case 0:
- // Good connect!
- ed->pfd = NULL;
- nni_epdesc_finish(aio, 0, pfd);
- break;
- case EINPROGRESS: // still connecting... come back later
- nni_mtx_unlock(&ed->mtx);
- return;
- default:
- ed->pfd = NULL;
- nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
- nni_posix_pfd_fini(pfd);
- break;
- }
-
- // Start another connect running, if any is waiting.
- nni_epdesc_connect_start(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
-{
- nni_posix_epdesc *ed;
-
- if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&ed->mtx);
-
- ed->pfd = NULL;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
-
- nni_aio_list_init(&ed->connectq);
- nni_aio_list_init(&ed->acceptq);
- *edp = ed;
- return (0);
-}
-
-void
-nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->locaddr, sa, len);
- ed->loclen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->remaddr, sa, len);
- ed->remlen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
-{
- nni_mtx_lock(&ed->mtx);
- if (ed->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ENOTSUP);
- }
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EBUSY);
- }
- if ((mode & S_IFMT) != 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EINVAL);
- }
- ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero
- nni_mtx_unlock(&ed->mtx);
- return (0);
-}
-
-void
-nni_posix_epdesc_fini(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
-
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- pfd = ed->pfd;
- nni_mtx_unlock(&ed->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
deleted file mode 100644
index 5ba3c8fb..00000000
--- a/src/platform/posix/posix_ipc.c
+++ /dev/null
@@ -1,250 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-// Solaris/SunOS systems define this, which collides with our symbol
-// names. Just undefine it now.
-#ifdef sun
-#undef sun
-#endif
-
-static int nni_plat_ipc_remove_stale(const char *path);
-
-// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
-// We alias nni_posix_epdesc to nni_plat_ipc_ep.
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_un sun;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun));
- break;
- case NNI_EP_MODE_LISTEN:
-
- if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) {
- return (rv);
- }
-
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_local(ed, &sun, sizeof(sun));
- break;
- default:
- nni_posix_epdesc_fini(ed);
- return (NNG_EINVAL);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits));
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(attr);
- return (NNG_ENOTSUP);
-}
-
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_plat_ipc_remove_stale(const char *path)
-{
- int fd;
- struct sockaddr_un sun;
- size_t sz;
-
- sun.sun_family = AF_UNIX;
- sz = sizeof(sun.sun_path);
-
- if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc *ed = (void *) ep;
-
- return (nni_posix_epdesc_listen(ed));
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, uid, &ignore, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, gid, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &ignore, &id)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal zone id (illumos/Solaris)
- return (NNG_ENOTSUP);
- }
- *zid = id;
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &id, &ignore)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal process id
- return (NNG_ENOTSUP);
- }
- *pid = id;
- return (0);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
new file mode 100644
index 00000000..caf7ed7b
--- /dev/null
+++ b/src/platform/posix/posix_ipc.h
@@ -0,0 +1,48 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <sys/types.h> // For mode_t
+
+struct nni_ipc_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_ipc_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_ipc_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
new file mode 100644
index 00000000..2b46fb12
--- /dev/null
+++ b/src/platform/posix/posix_ipcconn.c
@@ -0,0 +1,494 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+#if defined(NNG_HAVE_GETPEERUCRED)
+#include <ucred.h>
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+#include <sys/ucred.h>
+#endif
+
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_ipc.h"
+
+static void
+ipc_conn_dowrite(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+ipc_conn_doread(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_ipc_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ ipc_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ ipc_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ ipc_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ ipc_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
+ uint64_t *prid, uint64_t *znid)
+{
+ int fd = nni_posix_pfd_fd(c->pfd);
+#if defined(NNG_HAVE_GETPEEREID)
+ uid_t uid;
+ gid_t gid;
+
+ if (getpeereid(fd, &uid, &gid) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uid;
+ *egid = gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_GETPEERUCRED)
+ ucred_t *ucp = NULL;
+ if (getpeerucred(fd, &ucp) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = ucred_geteuid(ucp);
+ *egid = ucred_getegid(ucp);
+ *prid = ucred_getpid(ucp);
+ *znid = ucred_getzoneid(ucp);
+ ucred_free(ucp);
+ return (0);
+#elif defined(NNG_HAVE_SOPEERCRED)
+ struct ucred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+ struct xucred xu;
+ socklen_t len = sizeof(xu);
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = xu.cr_uid;
+ *egid = xu.cr_gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+#if defined(LOCAL_PEERPID) // present (undocumented) on macOS
+ {
+ pid_t pid;
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) ==
+ 0) {
+ *prid = (uint64_t) pid;
+ }
+ }
+#endif // LOCAL_PEERPID
+ return (0);
+#else
+ if (fd < 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ARG_UNUSED(euid);
+ NNI_ARG_UNUSED(egid);
+ NNI_ARG_UNUSED(prid);
+ NNI_ARG_UNUSED(znid);
+ return (NNG_ENOTSUP);
+#endif
+}
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ *zid = id;
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ *pid = id;
+ return (0);
+}
+
+int
+nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_ipc_conn_start(nni_ipc_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
new file mode 100644
index 00000000..13732911
--- /dev/null
+++ b/src/platform/posix/posix_ipcdial.c
@@ -0,0 +1,238 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+// Dialer stuff.
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_ipc_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_ipc_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+ipc_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+ nni_ipc_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_ipc_conn_fini(c);
+}
+
+static void
+ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_ipc_conn * c = arg;
+ nni_ipc_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_ipc_conn_close(c);
+ nni_ipc_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_ipc_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ if (errno == ENOENT) {
+ // No socket present means nobody listening.
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
+ }
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
new file mode 100644
index 00000000..4e33a08f
--- /dev/null
+++ b/src/platform/posix/posix_ipclisten.c
@@ -0,0 +1,373 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+ipc_listener_doclose(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+ char * path;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+ if (l->started && ((path = l->path) != NULL)) {
+ l->path = NULL;
+ (void) unlink(path);
+ nni_strfree(path);
+ }
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_doaccept(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_ipc_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+
+ // Anything else will turn up in accept.
+ ipc_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ipc_remove_stale(const char *path)
+{
+ int fd;
+ struct sockaddr_un sun;
+ size_t sz;
+
+ sun.sun_family = AF_UNIX;
+ sz = sizeof(sun.sun_path);
+
+ if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(path);
+ }
+ }
+ (void) close(fd);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode)
+{
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(sd);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+ char * path;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ path = nni_strdup(sa->s_ipc.sa_path);
+ if (path == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ if (((rv = ipc_remove_stale(path)) != 0) ||
+ ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ (void) close(fd);
+ return (rv);
+ }
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) ||
+ (listen(fd, 128) != 0)) {
+ rv = nni_plat_errno(errno);
+ (void) unlink(path);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ l->path = path;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ ipc_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
deleted file mode 100644
index b11036ea..00000000
--- a/src/platform/posix/posix_pipedesc.c
+++ /dev/null
@@ -1,503 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#if defined(NNG_HAVE_GETPEERUCRED)
-#include <ucred.h>
-#elif defined(NNG_HAVE_LOCALPEERCRED)
-#include <sys/ucred.h>
-#include <sys/un.h>
-#endif
-#ifdef NNG_HAVE_ALLOCA
-#include <alloca.h>
-#endif
-
-// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
-// file descriptor for TCP socket, etc.) This contains the list of pending
-// aios for that underlying socket, as well as the socket itself.
-struct nni_posix_pipedesc {
- nni_posix_pfd *pfd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
-};
-
-static void
-nni_posix_pipedesc_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, nni_aio_count(aio));
-}
-
-static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- pd->closed = true;
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- nni_posix_pfd_close(pd->pfd);
-}
-
-static void
-nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov * aiov;
- struct msghdr hdr;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- memset(&hdr, 0, sizeof(hdr));
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
-
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len > 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
-#ifndef MSG_NOSIGNAL
-#define MSG_NOSIGNAL 0
-#endif
-
- hdr.msg_iovlen = niov;
- hdr.msg_iov = iovec;
-
- if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- nni_aio_bump_count(aio, n);
- // We completed the entire operation on this aioq.
- // (Sendmsg never returns a partial result.)
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov *aiov;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len != 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
- if ((n = readv(fd, iovec, niov)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- if (n == 0) {
- // No bytes indicates a closed descriptor.
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- nni_posix_pipedesc_doclose(pd);
- return;
- }
-
- nni_aio_bump_count(aio, n);
-
- // We completed the entire operation on this aioq.
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_pipedesc *pd = arg;
-
- nni_mtx_lock(&pd->mtx);
- if (events & POLLIN) {
- nni_posix_pipedesc_doread(pd);
- }
- if (events & POLLOUT) {
- nni_posix_pipedesc_dowrite(pd);
- }
- if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_posix_pipedesc_doclose(pd);
- } else {
- events = 0;
- if (!nni_list_empty(&pd->writeq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- events |= POLLIN;
- }
- if ((!pd->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
-{
- // NB: Events may still occur.
- nni_mtx_lock(&pd->mtx);
- nni_posix_pipedesc_doclose(pd);
- nni_mtx_unlock(&pd->mtx);
-}
-
-static void
-nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&pd->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->readq, aio);
-
- // If we are only job on the list, go ahead and try to do an
- // immediate transfer. This allows for faster completions in
- // many cases. We also need not arm a list if it was already
- // armed.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLIN);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->writeq, aio);
-
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLOUT);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-int
-nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
-{
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
- uint64_t *egid, uint64_t *prid, uint64_t *znid)
-{
- int fd = nni_posix_pfd_fd(pd->pfd);
-#if defined(NNG_HAVE_GETPEEREID)
- uid_t uid;
- gid_t gid;
-
- if (getpeereid(fd, &uid, &gid) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uid;
- *egid = gid;
- *prid = (uint64_t) -1;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_GETPEERUCRED)
- ucred_t *ucp = NULL;
- if (getpeerucred(fd, &ucp) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = ucred_geteuid(ucp);
- *egid = ucred_getegid(ucp);
- *prid = ucred_getpid(ucp);
- *znid = ucred_getzoneid(ucp);
- ucred_free(ucp);
- return (0);
-#elif defined(NNG_HAVE_SOPEERCRED)
- struct ucred uc;
- socklen_t len = sizeof(uc);
- if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uc.uid;
- *egid = uc.gid;
- *prid = uc.pid;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_LOCALPEERCRED)
- struct xucred xu;
- socklen_t len = sizeof(xu);
- if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = xu.cr_uid;
- *egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented
- // LOCAL_PEERPID...
- *znid = (uint64_t) -1;
- return (0);
-#else
- if (fd < 0) {
- return (NNG_ECLOSED);
- }
- NNI_ARG_UNUSED(euid);
- NNI_ARG_UNUSED(egid);
- NNI_ARG_UNUSED(prid);
- NNI_ARG_UNUSED(znid);
- return (NNG_ENOTSUP);
-#endif
-}
-
-int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
-{
- nni_posix_pipedesc *pd;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- pd->closed = false;
- pd->pfd = pfd;
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
-
- *pdp = pd;
- return (0);
-}
-
-void
-nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
-{
- nni_posix_pipedesc_close(pd);
- nni_posix_pfd_fini(pd->pfd);
- pd->pfd = NULL;
- nni_mtx_fini(&pd->mtx);
-
- NNI_FREE_STRUCT(pd);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 63d858c2..f78d0b6b 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_USE_POSIX_RESOLV_GAI
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <ctype.h>
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 5cd1887a..c0352c55 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 7f627361..21f6ecfe 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index cdcbecd9..8c186885 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -151,6 +150,7 @@ static void
tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if (events & POLLNVAL) {
@@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
nni_mtx_unlock(&l->mtx);
return;
}
- NNI_ASSERT(pfd == l->pfd);
// Anything else will turn up in accept.
tcp_listener_doaccept(l);
@@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
+ (void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 759bdb96..873a02a1 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include "platform/posix/posix_pollq.h"
#include <errno.h>
@@ -177,7 +176,7 @@ static void
nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- NNI_ASSERT(pfd == udp->udp_pfd);
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & POLLIN) {