aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-16 17:10:47 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-18 13:25:33 -0700
commitb310f712828962bf3187caf3bfe064c3531c5628 (patch)
treef99ceb5851f601c93b0305617d692722f2978dd5 /src/platform
parent3f40a08eab60df77dc61ae0350e59f36e8d0ed16 (diff)
downloadnng-b310f712828962bf3187caf3bfe064c3531c5628.tar.gz
nng-b310f712828962bf3187caf3bfe064c3531c5628.tar.bz2
nng-b310f712828962bf3187caf3bfe064c3531c5628.zip
fixes #595 mutex leak and other minor errors in TCP
fixes #596 POSIX IPC should move away from pipedesc/epdesc fixes #598 TLS and TCP listeners could support NNG_OPT_LOCADDR fixes #594 Windows IPC should use "new style" win_io code. fixes #597 macOS could support PEER PID This large change set cleans up the IPC support on Windows and POSIX. This has the beneficial impact of significantly reducing the complexity of the code, reducing locking, increasing concurrency (multiple dial and accepts can be outstanding now), reducing context switches (we complete thins synchronously now). While here we have added some missing option support, and fixed a few more bugs that we found in the TCP code changes from last week.
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_epdesc.c597
-rw-r--r--src/platform/posix/posix_ipc.c250
-rw-r--r--src/platform/posix/posix_ipc.h48
-rw-r--r--src/platform/posix/posix_ipcconn.c494
-rw-r--r--src/platform/posix/posix_ipcdial.c238
-rw-r--r--src/platform/posix/posix_ipclisten.c373
-rw-r--r--src/platform/posix/posix_pipedesc.c503
-rw-r--r--src/platform/posix/posix_resolv_gai.c1
-rw-r--r--src/platform/posix/posix_tcpconn.c1
-rw-r--r--src/platform/posix/posix_tcpdial.c1
-rw-r--r--src/platform/posix/posix_tcplisten.c5
-rw-r--r--src/platform/posix/posix_udp.c3
-rw-r--r--src/platform/windows/win_impl.h3
-rw-r--r--src/platform/windows/win_io.c11
-rw-r--r--src/platform/windows/win_ipc.c673
-rw-r--r--src/platform/windows/win_ipc.h62
-rw-r--r--src/platform/windows/win_ipcconn.c388
-rw-r--r--src/platform/windows/win_ipcdial.c265
-rw-r--r--src/platform/windows/win_ipclisten.c296
-rw-r--r--src/platform/windows/win_tcpconn.c15
-rw-r--r--src/platform/windows/win_tcpdial.c7
-rw-r--r--src/platform/windows/win_tcplisten.c12
22 files changed, 2186 insertions, 2060 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
deleted file mode 100644
index d796765a..00000000
--- a/src/platform/posix/posix_epdesc.c
+++ /dev/null
@@ -1,597 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef sun
-#undef sun
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_posix_epdesc {
- nni_posix_pfd * pfd;
- nni_list connectq;
- nni_list acceptq;
- bool closed;
- bool started;
- bool ipcbound; // if true unlink socket on exit
- struct sockaddr_storage locaddr;
- struct sockaddr_storage remaddr;
- socklen_t loclen;
- socklen_t remlen;
- mode_t perms; // UNIX sockets only
- int mode; // end point mode (dialer/listener)
- nni_mtx mtx;
-};
-
-static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
-static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
-
-static void
-nni_epdesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
- nni_posix_pfd * pfd = NULL;
-
- NNI_ASSERT(rv != 0);
- nni_mtx_lock(&ed->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
- ((pfd = ed->pfd) != NULL)) {
- nni_posix_pfd_close(pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
-{
- nni_posix_pipedesc *pd = NULL;
-
- // acceptq or connectq.
- nni_aio_list_remove(aio);
-
- if (rv != 0) {
- NNI_ASSERT(newpfd == NULL);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(newpfd != NULL);
- if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
- nni_posix_pfd_fini(newpfd);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_epdesc_doaccept(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
-
- fd = nni_posix_pfd_fd(ed->pfd);
-
-#ifdef NNG_USE_ACCEPT4
- newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
- if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(fd, NULL, NULL);
- }
-#else
- newfd = accept(fd, NULL, NULL);
-#endif
- if (newfd < 0) {
- switch (errno) {
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
- if (rv != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- // Come back later...
- return;
- case ECONNABORTED:
- case ECONNRESET:
- // Eat them, they aren't interesting.
- continue;
- default:
- // Error this one, but keep moving to the next.
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
-
- nni_epdesc_finish(aio, 0, pfd);
- }
-}
-
-static void
-nni_epdesc_doclose(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- ed->closed = true;
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
-
- if (ed->pfd != NULL) {
-
- nni_posix_pfd_close(ed->pfd);
- }
-
- // clean up stale UNIX socket when closing the server.
- if (ed->ipcbound) {
- struct sockaddr_un *sun = (void *) &ed->locaddr;
- (void) unlink(sun->sun_path);
- }
-}
-
-static void
-nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
-
- NNI_ARG_UNUSED(pfd);
-
- nni_mtx_lock(&ed->mtx);
- if (events & POLLNVAL) {
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- // Anything else will turn up in accept.
- nni_epdesc_doaccept(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_close(nni_posix_epdesc *ed)
-{
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed)
-{
- int len;
- struct sockaddr_storage *ss;
- int rv;
- int fd;
- nni_posix_pfd * pfd;
-
- nni_mtx_lock(&ed->mtx);
-
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ESTATE);
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ECLOSED);
- }
- if ((len = ed->loclen) == 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EADDRINVAL);
- }
-
- ss = &ed->locaddr;
- len = ed->loclen;
-
- if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- nni_mtx_unlock(&ed->mtx);
- return (nni_plat_errno(errno));
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
-#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
- if (ss->ss_family != AF_UNIX) {
- int on = 1;
- // If for some reason this doesn't work, it's probably ok.
- // Second bind will fail.
- (void) setsockopt(
- fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- }
-#endif
-
- if (bind(fd, (struct sockaddr *) ss, len) < 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- // For UNIX domain sockets, optionally set the permission bits.
- // This is done after the bind and before listen, and on the file
- // rather than the file descriptor.
- // Experiments have shown that chmod() works correctly, provided that
- // it is done *before* the listen() operation, whereas fchmod seems to
- // have no impact. This behavior was observed on both macOS and Linux.
- // YMMV on other platforms.
- if (ss->ss_family == AF_UNIX) {
- ed->ipcbound = true;
- if (ed->perms != 0) {
- struct sockaddr_un *sun = (void *) ss;
- mode_t perms = ed->perms & ~(S_IFMT);
- if ((rv = chmod(sun->sun_path, perms)) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
- }
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
-
- ed->pfd = pfd;
- ed->started = true;
- nni_mtx_unlock(&ed->mtx);
-
- return (0);
-}
-
-void
-nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- // Accept is simpler than the connect case. With accept we just
- // need to wait for the socket to be readable to indicate an incoming
- // connection is ready for us. There isn't anything else for us to
- // do really, as that will have been done in listen.
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
-
- if (!ed->started) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&ed->acceptq, aio);
- if (nni_list_first(&ed->acceptq) == aio) {
- nni_epdesc_doaccept(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = -1;
-
- nni_mtx_lock(&ed->mtx);
- if (ed->pfd != NULL) {
- fd = nni_posix_pfd_fd(ed->pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-static void
-nni_epdesc_connect_start(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
- int fd;
- int rv;
- nni_aio * aio;
-
-loop:
- if ((aio = nni_list_first(&ed->connectq)) == NULL) {
- return;
- }
-
- NNI_ASSERT(ed->pfd == NULL);
- if (ed->closed) {
- nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
- goto loop;
- }
- ed->started = true;
-
- if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
- // Possibly bind.
- if ((ed->loclen != 0) &&
- (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
-
- if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
- // Immediate connect, cool! This probably only happens on
- // loopback, and probably not on every platform.
- nni_epdesc_finish(aio, 0, pfd);
- goto loop;
- }
-
- if (errno != EINPROGRESS) {
- // Some immediate failure occurred.
- if (errno == ENOENT) { // For UNIX domain sockets
- rv = NNG_ECONNREFUSED;
- } else {
- rv = nni_plat_errno(errno);
- }
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
- if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- ed->pfd = pfd;
- // all done... wait for this to signal via callback
-}
-
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- ed->started = true;
- nni_list_append(&ed->connectq, aio);
- if (nni_list_first(&ed->connectq) == aio) {
- // If there was a stale pfd (probably from an aborted or
- // canceled connect attempt), discard it so we start fresh.
- if (ed->pfd != NULL) {
- nni_posix_pfd_fini(ed->pfd);
- ed->pfd = NULL;
- }
- nni_epdesc_connect_start(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
- nni_aio * aio;
- socklen_t sz;
- int rv;
- int fd;
-
- nni_mtx_lock(&ed->mtx);
- if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
- (pfd != ed->pfd)) {
- // Spurious completion. Just ignore it.
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- fd = nni_posix_pfd_fd(pfd);
- sz = sizeof(rv);
-
- if ((events & POLLNVAL) != 0) {
- rv = EBADF;
-
- } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
-
- switch (rv) {
- case 0:
- // Good connect!
- ed->pfd = NULL;
- nni_epdesc_finish(aio, 0, pfd);
- break;
- case EINPROGRESS: // still connecting... come back later
- nni_mtx_unlock(&ed->mtx);
- return;
- default:
- ed->pfd = NULL;
- nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
- nni_posix_pfd_fini(pfd);
- break;
- }
-
- // Start another connect running, if any is waiting.
- nni_epdesc_connect_start(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
-{
- nni_posix_epdesc *ed;
-
- if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&ed->mtx);
-
- ed->pfd = NULL;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
-
- nni_aio_list_init(&ed->connectq);
- nni_aio_list_init(&ed->acceptq);
- *edp = ed;
- return (0);
-}
-
-void
-nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->locaddr, sa, len);
- ed->loclen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->remaddr, sa, len);
- ed->remlen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
-{
- nni_mtx_lock(&ed->mtx);
- if (ed->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ENOTSUP);
- }
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EBUSY);
- }
- if ((mode & S_IFMT) != 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EINVAL);
- }
- ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero
- nni_mtx_unlock(&ed->mtx);
- return (0);
-}
-
-void
-nni_posix_epdesc_fini(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
-
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- pfd = ed->pfd;
- nni_mtx_unlock(&ed->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
deleted file mode 100644
index 5ba3c8fb..00000000
--- a/src/platform/posix/posix_ipc.c
+++ /dev/null
@@ -1,250 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-// Solaris/SunOS systems define this, which collides with our symbol
-// names. Just undefine it now.
-#ifdef sun
-#undef sun
-#endif
-
-static int nni_plat_ipc_remove_stale(const char *path);
-
-// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
-// We alias nni_posix_epdesc to nni_plat_ipc_ep.
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_un sun;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun));
- break;
- case NNI_EP_MODE_LISTEN:
-
- if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) {
- return (rv);
- }
-
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_local(ed, &sun, sizeof(sun));
- break;
- default:
- nni_posix_epdesc_fini(ed);
- return (NNG_EINVAL);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits));
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(attr);
- return (NNG_ENOTSUP);
-}
-
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_plat_ipc_remove_stale(const char *path)
-{
- int fd;
- struct sockaddr_un sun;
- size_t sz;
-
- sun.sun_family = AF_UNIX;
- sz = sizeof(sun.sun_path);
-
- if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc *ed = (void *) ep;
-
- return (nni_posix_epdesc_listen(ed));
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, uid, &ignore, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, gid, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &ignore, &id)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal zone id (illumos/Solaris)
- return (NNG_ENOTSUP);
- }
- *zid = id;
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &id, &ignore)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal process id
- return (NNG_ENOTSUP);
- }
- *pid = id;
- return (0);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
new file mode 100644
index 00000000..caf7ed7b
--- /dev/null
+++ b/src/platform/posix/posix_ipc.h
@@ -0,0 +1,48 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <sys/types.h> // For mode_t
+
+struct nni_ipc_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_ipc_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_ipc_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
new file mode 100644
index 00000000..2b46fb12
--- /dev/null
+++ b/src/platform/posix/posix_ipcconn.c
@@ -0,0 +1,494 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+#if defined(NNG_HAVE_GETPEERUCRED)
+#include <ucred.h>
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+#include <sys/ucred.h>
+#endif
+
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_ipc.h"
+
+static void
+ipc_conn_dowrite(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+ipc_conn_doread(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_ipc_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ ipc_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ ipc_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ ipc_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ ipc_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
+ uint64_t *prid, uint64_t *znid)
+{
+ int fd = nni_posix_pfd_fd(c->pfd);
+#if defined(NNG_HAVE_GETPEEREID)
+ uid_t uid;
+ gid_t gid;
+
+ if (getpeereid(fd, &uid, &gid) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uid;
+ *egid = gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_GETPEERUCRED)
+ ucred_t *ucp = NULL;
+ if (getpeerucred(fd, &ucp) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = ucred_geteuid(ucp);
+ *egid = ucred_getegid(ucp);
+ *prid = ucred_getpid(ucp);
+ *znid = ucred_getzoneid(ucp);
+ ucred_free(ucp);
+ return (0);
+#elif defined(NNG_HAVE_SOPEERCRED)
+ struct ucred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+ struct xucred xu;
+ socklen_t len = sizeof(xu);
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = xu.cr_uid;
+ *egid = xu.cr_gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+#if defined(LOCAL_PEERPID) // present (undocumented) on macOS
+ {
+ pid_t pid;
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) ==
+ 0) {
+ *prid = (uint64_t) pid;
+ }
+ }
+#endif // LOCAL_PEERPID
+ return (0);
+#else
+ if (fd < 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ARG_UNUSED(euid);
+ NNI_ARG_UNUSED(egid);
+ NNI_ARG_UNUSED(prid);
+ NNI_ARG_UNUSED(znid);
+ return (NNG_ENOTSUP);
+#endif
+}
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ *zid = id;
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ *pid = id;
+ return (0);
+}
+
+int
+nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_ipc_conn_start(nni_ipc_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
new file mode 100644
index 00000000..13732911
--- /dev/null
+++ b/src/platform/posix/posix_ipcdial.c
@@ -0,0 +1,238 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+// Dialer stuff.
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_ipc_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_ipc_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+ipc_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+ nni_ipc_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_ipc_conn_fini(c);
+}
+
+static void
+ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_ipc_conn * c = arg;
+ nni_ipc_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_ipc_conn_close(c);
+ nni_ipc_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_ipc_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ if (errno == ENOENT) {
+ // No socket present means nobody listening.
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
+ }
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
new file mode 100644
index 00000000..4e33a08f
--- /dev/null
+++ b/src/platform/posix/posix_ipclisten.c
@@ -0,0 +1,373 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+ipc_listener_doclose(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+ char * path;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+ if (l->started && ((path = l->path) != NULL)) {
+ l->path = NULL;
+ (void) unlink(path);
+ nni_strfree(path);
+ }
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_doaccept(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_ipc_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+
+ // Anything else will turn up in accept.
+ ipc_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ipc_remove_stale(const char *path)
+{
+ int fd;
+ struct sockaddr_un sun;
+ size_t sz;
+
+ sun.sun_family = AF_UNIX;
+ sz = sizeof(sun.sun_path);
+
+ if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(path);
+ }
+ }
+ (void) close(fd);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode)
+{
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(sd);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+ char * path;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ path = nni_strdup(sa->s_ipc.sa_path);
+ if (path == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ if (((rv = ipc_remove_stale(path)) != 0) ||
+ ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ (void) close(fd);
+ return (rv);
+ }
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) ||
+ (listen(fd, 128) != 0)) {
+ rv = nni_plat_errno(errno);
+ (void) unlink(path);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ l->path = path;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ ipc_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
deleted file mode 100644
index b11036ea..00000000
--- a/src/platform/posix/posix_pipedesc.c
+++ /dev/null
@@ -1,503 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#if defined(NNG_HAVE_GETPEERUCRED)
-#include <ucred.h>
-#elif defined(NNG_HAVE_LOCALPEERCRED)
-#include <sys/ucred.h>
-#include <sys/un.h>
-#endif
-#ifdef NNG_HAVE_ALLOCA
-#include <alloca.h>
-#endif
-
-// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
-// file descriptor for TCP socket, etc.) This contains the list of pending
-// aios for that underlying socket, as well as the socket itself.
-struct nni_posix_pipedesc {
- nni_posix_pfd *pfd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
-};
-
-static void
-nni_posix_pipedesc_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, nni_aio_count(aio));
-}
-
-static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- pd->closed = true;
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- nni_posix_pfd_close(pd->pfd);
-}
-
-static void
-nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov * aiov;
- struct msghdr hdr;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- memset(&hdr, 0, sizeof(hdr));
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
-
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len > 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
-#ifndef MSG_NOSIGNAL
-#define MSG_NOSIGNAL 0
-#endif
-
- hdr.msg_iovlen = niov;
- hdr.msg_iov = iovec;
-
- if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- nni_aio_bump_count(aio, n);
- // We completed the entire operation on this aioq.
- // (Sendmsg never returns a partial result.)
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov *aiov;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len != 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
- if ((n = readv(fd, iovec, niov)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- if (n == 0) {
- // No bytes indicates a closed descriptor.
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- nni_posix_pipedesc_doclose(pd);
- return;
- }
-
- nni_aio_bump_count(aio, n);
-
- // We completed the entire operation on this aioq.
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_pipedesc *pd = arg;
-
- nni_mtx_lock(&pd->mtx);
- if (events & POLLIN) {
- nni_posix_pipedesc_doread(pd);
- }
- if (events & POLLOUT) {
- nni_posix_pipedesc_dowrite(pd);
- }
- if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_posix_pipedesc_doclose(pd);
- } else {
- events = 0;
- if (!nni_list_empty(&pd->writeq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- events |= POLLIN;
- }
- if ((!pd->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
-{
- // NB: Events may still occur.
- nni_mtx_lock(&pd->mtx);
- nni_posix_pipedesc_doclose(pd);
- nni_mtx_unlock(&pd->mtx);
-}
-
-static void
-nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&pd->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->readq, aio);
-
- // If we are only job on the list, go ahead and try to do an
- // immediate transfer. This allows for faster completions in
- // many cases. We also need not arm a list if it was already
- // armed.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLIN);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->writeq, aio);
-
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLOUT);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-int
-nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
-{
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
- uint64_t *egid, uint64_t *prid, uint64_t *znid)
-{
- int fd = nni_posix_pfd_fd(pd->pfd);
-#if defined(NNG_HAVE_GETPEEREID)
- uid_t uid;
- gid_t gid;
-
- if (getpeereid(fd, &uid, &gid) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uid;
- *egid = gid;
- *prid = (uint64_t) -1;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_GETPEERUCRED)
- ucred_t *ucp = NULL;
- if (getpeerucred(fd, &ucp) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = ucred_geteuid(ucp);
- *egid = ucred_getegid(ucp);
- *prid = ucred_getpid(ucp);
- *znid = ucred_getzoneid(ucp);
- ucred_free(ucp);
- return (0);
-#elif defined(NNG_HAVE_SOPEERCRED)
- struct ucred uc;
- socklen_t len = sizeof(uc);
- if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uc.uid;
- *egid = uc.gid;
- *prid = uc.pid;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_LOCALPEERCRED)
- struct xucred xu;
- socklen_t len = sizeof(xu);
- if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = xu.cr_uid;
- *egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented
- // LOCAL_PEERPID...
- *znid = (uint64_t) -1;
- return (0);
-#else
- if (fd < 0) {
- return (NNG_ECLOSED);
- }
- NNI_ARG_UNUSED(euid);
- NNI_ARG_UNUSED(egid);
- NNI_ARG_UNUSED(prid);
- NNI_ARG_UNUSED(znid);
- return (NNG_ENOTSUP);
-#endif
-}
-
-int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
-{
- nni_posix_pipedesc *pd;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- pd->closed = false;
- pd->pfd = pfd;
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
-
- *pdp = pd;
- return (0);
-}
-
-void
-nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
-{
- nni_posix_pipedesc_close(pd);
- nni_posix_pfd_fini(pd->pfd);
- pd->pfd = NULL;
- nni_mtx_fini(&pd->mtx);
-
- NNI_FREE_STRUCT(pd);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 63d858c2..f78d0b6b 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_USE_POSIX_RESOLV_GAI
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <ctype.h>
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 5cd1887a..c0352c55 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 7f627361..21f6ecfe 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index cdcbecd9..8c186885 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -151,6 +150,7 @@ static void
tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if (events & POLLNVAL) {
@@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
nni_mtx_unlock(&l->mtx);
return;
}
- NNI_ASSERT(pfd == l->pfd);
// Anything else will turn up in accept.
tcp_listener_doaccept(l);
@@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
+ (void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 759bdb96..873a02a1 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include "platform/posix/posix_pollq.h"
#include <errno.h>
@@ -177,7 +176,7 @@ static void
nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- NNI_ASSERT(pfd == udp->udp_pfd);
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & POLLIN) {
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 93e45423..b8741b52 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
-extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
+extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *);
extern void nni_win_io_fini(nni_win_io *);
-extern void nni_win_io_cancel(nni_win_io *);
extern int nni_win_io_register(HANDLE);
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 1179b603..50b22343 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h)
}
int
-nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
+nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
{
ZeroMemory(&io->olpd, sizeof(io->olpd));
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
- io->f = f;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
@@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
}
void
-nni_win_io_cancel(nni_win_io *io)
-{
- if (io->f != INVALID_HANDLE_VALUE) {
- CancelIoEx(io->f, &io->olpd);
- }
-}
-
-void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
deleted file mode 100644
index 169a2e00..00000000
--- a/src/platform/windows/win_ipc.c
+++ /dev/null
@@ -1,673 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_WINDOWS
-
-#include <stdio.h>
-
-struct nni_plat_ipc_pipe {
- HANDLE p;
- int mode;
- nni_win_event rcv_ev;
- nni_win_event snd_ev;
-};
-
-struct nni_plat_ipc_ep {
- char path[NNG_MAXADDRLEN + 16];
- nni_sockaddr addr;
- int mode;
- bool started;
- HANDLE p; // accept side only
- nni_win_event acc_ev; // accept side only
- nni_aio * con_aio; // conn side only
- nni_list_node node; // conn side uses this
- SECURITY_ATTRIBUTES sec_attr;
-};
-
-static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_pipe_ops = {
- .wev_start = nni_win_ipc_pipe_start,
- .wev_finish = nni_win_ipc_pipe_finish,
- .wev_cancel = nni_win_ipc_pipe_cancel,
-};
-
-static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_acc_ops = {
- .wev_start = nni_win_ipc_acc_start,
- .wev_finish = nni_win_ipc_acc_finish,
- .wev_cancel = nni_win_ipc_acc_cancel,
-};
-
-static int
-nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
-{
- void * buf;
- DWORD len;
- BOOL ok;
- int rv;
- nni_plat_ipc_pipe *pipe = evt->ptr;
- unsigned idx;
- unsigned naiov;
- nni_iov * aiov;
-
- NNI_ASSERT(aio != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
- }
-
- nni_aio_get_iov(aio, &naiov, &aiov);
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
-
- evt->count = 0;
- if (evt == &pipe->snd_ev) {
- ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd);
- } else {
- ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd);
- }
- if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
-}
-
-static void
-nni_win_ipc_pipe_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_pipe *pipe = evt->ptr;
-
- CancelIoEx(pipe->p, &evt->olpd);
-}
-
-static void
-nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_aio_finish(aio, evt->status, evt->count);
-}
-
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- pipe->mode = mode;
- rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- *pipep = pipe;
- return (0);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->snd_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->rcv_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
-{
- HANDLE p;
-
- nni_win_event_close(&pipe->snd_ev);
- nni_win_event_close(&pipe->rcv_ev);
-
- if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
- pipe->p = INVALID_HANDLE_VALUE;
- CloseHandle(p);
- }
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
-{
- nni_plat_ipc_pipe_close(pipe);
-
- nni_win_event_fini(&pipe->snd_ev);
- nni_win_event_fini(&pipe->rcv_ev);
- NNI_FREE_STRUCT(pipe);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible.
-// NB: Only POSIX systems support group IDs.
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid)
-{
- ULONG id;
- switch (pipe->mode) {
- case NNI_EP_MODE_DIAL:
- if (!GetNamedPipeServerProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- case NNI_EP_MODE_LISTEN:
- if (!GetNamedPipeClientProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- default:
- // Should never occur!
- return (NNG_EINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- const char * path;
- nni_plat_ipc_ep *ep;
-
- path = sa->s_ipc.sa_path;
- if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) {
- return (NNG_EINVAL);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ZeroMemory(ep, sizeof(*ep));
-
- ep->mode = mode;
- ep->sec_attr.nLength = sizeof(ep->sec_attr);
- ep->sec_attr.lpSecurityDescriptor = NULL;
- ep->sec_attr.bInheritHandle = FALSE;
- NNI_LIST_NODE_INIT(&ep->node);
-
- ep->addr = *sa;
- (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(bits);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc)
-{
- if (ep->started) {
- return (NNG_EBUSY);
- }
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_ENOTSUP);
- }
- if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
- return (NNG_EINVAL);
- }
- ep->sec_attr.lpSecurityDescriptor = desc;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- int rv;
- HANDLE p;
-
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_EINVAL);
- }
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
- // We create the first named pipe, and we make sure that it is
- // properly ours.
- p = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
- FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (p == INVALID_HANDLE_VALUE) {
- if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
- rv = NNG_EADDRINUSE;
- } else {
- rv = nni_win_error(rv);
- }
- goto failed;
- }
- rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep);
- if (rv != 0) {
- goto failed;
- }
-
- if ((rv = nni_win_iocp_register(p)) != 0) {
- goto failed;
- }
-
- ep->p = p;
- ep->started = true;
- return (0);
-
-failed:
-
- if (p != INVALID_HANDLE_VALUE) {
- (void) CloseHandle(p);
- }
-
- return (rv);
-}
-
-static void
-nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep * ep = evt->ptr;
- nni_plat_ipc_pipe *pipe;
- int rv;
- HANDLE newp, oldp;
-
- if ((rv = evt->status) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- newp = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (newp == INVALID_HANDLE_VALUE) {
- rv = nni_win_error(GetLastError());
- // We connected, but as we cannot get a new pipe,
- // we have to disconnect the old one.
- DisconnectNamedPipe(ep->p);
- nni_aio_finish_error(aio, rv);
- return;
- }
- if ((rv = nni_win_iocp_register(newp)) != 0) {
- // Disconnect the old pipe.
- DisconnectNamedPipe(ep->p);
- // And discard the half-baked new one.
- DisconnectNamedPipe(newp);
- (void) CloseHandle(newp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- oldp = ep->p;
- ep->p = newp;
-
- if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) !=
- 0) {
- // The new pipe is already fine for us. Discard
- // the old one, since failed to be able to use it.
- DisconnectNamedPipe(oldp);
- (void) CloseHandle(oldp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_win_ipc_acc_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
-
- (void) CancelIoEx(ep->p, &evt->olpd);
- // Just to be sure.
- (void) DisconnectNamedPipe(ep->p);
-}
-
-static int
-nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
- NNI_ARG_UNUSED(aio);
-
- if (!ConnectNamedPipe(ep->p, &evt->olpd)) {
- int rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_CONNECTED:
- // Kind of like success, but as this is technically
- // an "error", we have to complete it ourself.
- evt->status = 0;
- evt->count = 0;
- return (1);
-
- case ERROR_IO_PENDING:
- // Normal asynchronous operation. Wait for
- // completion.
- return (0);
-
- default:
- // Fast-fail (synchronous).
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
-
- // Synchronous success - the I/O completion packet should still
- // be delivered.
- return (0);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->acc_ev, aio);
-}
-
-// So Windows IPC is a bit different on the client side. There is no
-// support for asynchronous connection, but we can fake it with a
-// single thread that runs to establish the connection. That thread
-// will run keep looping, sleeping for 10 ms between attempts. It
-// performs non-blocking attempts to connect.
-typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work;
-struct nni_win_ipc_conn_work {
- nni_list waiters;
- nni_list workers;
- nni_mtx mtx;
- nni_cv cv;
- nni_thr thr;
- int exit;
-};
-
-static nni_win_ipc_conn_work nni_win_ipc_connecter;
-
-static void
-nni_win_ipc_conn_thr(void *arg)
-{
- nni_win_ipc_conn_work *w = arg;
- nni_plat_ipc_ep * ep;
- nni_plat_ipc_pipe * pipe;
- nni_aio * aio;
- HANDLE p;
- int rv;
-
- nni_mtx_lock(&w->mtx);
- for (;;) {
- if (w->exit) {
- break;
- }
- while ((ep = nni_list_first(&w->waiters)) != NULL) {
- nni_list_remove(&w->waiters, ep);
- nni_list_append(&w->workers, ep);
- }
-
- while ((ep = nni_list_first(&w->workers)) != NULL) {
-
- nni_list_remove(&w->workers, ep);
-
- if ((aio = ep->con_aio) == NULL) {
- continue;
- }
- ep->con_aio = NULL;
-
- pipe = NULL;
-
- p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (p == INVALID_HANDLE_VALUE) {
- switch ((rv = GetLastError())) {
- case ERROR_PIPE_BUSY:
- // Still in progress. This
- // shouldn't happen unless the
- // other side aborts the
- // connection.
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- continue;
-
- case ERROR_FILE_NOT_FOUND:
- rv = NNG_ECONNREFUSED;
- break;
- default:
- rv = nni_win_error(rv);
- break;
- }
- goto fail;
- }
- if (((rv = nni_win_ipc_pipe_init(
- &pipe, p, NNI_EP_MODE_DIAL)) != 0) ||
- ((rv = nni_win_iocp_register(p)) != 0)) {
- goto fail;
- }
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- continue;
-
- fail:
- if (p != INVALID_HANDLE_VALUE) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- }
- if (pipe != NULL) {
- nni_plat_ipc_pipe_fini(pipe);
- }
- nni_aio_finish_error(aio, rv);
- }
-
- if (nni_list_empty(&w->waiters)) {
- // Wait until an endpoint is added.
- nni_cv_wait(&w->cv);
- } else {
- // Wait 10 ms, unless woken earlier.
- nni_cv_until(&w->cv, nni_clock() + 10);
- }
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-static void
-nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&w->mtx);
- if (aio == ep->con_aio) {
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- }
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&w->mtx);
- if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
- nni_mtx_unlock(&w->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(!nni_list_active(&w->waiters, ep));
-
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_plat_ipc_ep_close(ep);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- nni_win_event_close(&ep->acc_ev);
- nni_win_event_fini(&ep->acc_ev);
- NNI_FREE_STRUCT(ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_aio * aio;
-
- switch (ep->mode) {
- case NNI_EP_MODE_DIAL:
- nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- }
- if ((aio = ep->con_aio) != NULL) {
- ep->con_aio = NULL;
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- nni_mtx_unlock(&w->mtx);
- break;
-
- case NNI_EP_MODE_LISTEN:
- nni_win_event_close(&ep->acc_ev);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- break;
- }
-}
-
-int
-nni_win_ipc_sysinit(void)
-{
- int rv;
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
- NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
-
- nni_mtx_init(&worker->mtx);
- nni_cv_init(&worker->cv, &worker->mtx);
-
- rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
- if (rv != 0) {
- return (rv);
- }
-
- nni_thr_run(&worker->thr);
-
- return (0);
-}
-
-void
-nni_win_ipc_sysfini(void)
-{
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- nni_mtx_lock(&worker->mtx);
- worker->exit = 1;
- nni_cv_wake(&worker->cv);
- nni_mtx_unlock(&worker->mtx);
- nni_thr_fini(&worker->thr);
- nni_cv_fini(&worker->cv);
- nni_mtx_fini(&worker->mtx);
-}
-
-#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h
new file mode 100644
index 00000000..51ce5548
--- /dev/null
+++ b/src/platform/windows/win_ipc.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_WIN_WINIPC_H
+#define PLATFORM_WIN_WINIPC_H
+
+// This header file is private to the IPC (named pipes) support for Windows.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+struct nni_ipc_conn {
+ HANDLE f;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ nni_ipc_dialer * dialer;
+ nni_ipc_listener *listener;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ bool closed; // dialers are locked by the worker lock
+ nni_list aios;
+ nni_list_node node; // node on worker list
+};
+
+struct nni_ipc_listener {
+ char * path;
+ bool started;
+ bool closed;
+ HANDLE f;
+ SECURITY_ATTRIBUTES sec_attr;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_win_io io;
+ int rv;
+};
+
+extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE);
+
+#endif // NNG_PLATFORM_WINDOWS
+
+#endif // NNG_PLATFORM_WIN_WINIPC_H
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
new file mode 100644
index 00000000..d8ef4e4e
--- /dev/null
+++ b/src/platform/windows/win_ipcconn.c
@@ -0,0 +1,388 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_recv_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_list_remove(&c->recv_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->recv_rv != 0) {
+ rv = c->recv_rv;
+ c->recv_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_recv_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+static void
+ipc_recv_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ c->recv_rv = rv;
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->recv_aios, aio);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ ipc_recv_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_send_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_list_remove(&c->send_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_send_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->send_rv != 0) {
+ rv = c->send_rv;
+ c->send_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_send_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+ipc_send_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->send_aios)) {
+ c->send_rv = rv;
+ CancelIoEx(c->f, &c->send_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->send_aios, aio);
+ if (aio == nni_list_first(&c->send_aios)) {
+ ipc_send_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p)
+{
+ nni_ipc_conn *c;
+ int rv;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->f = INVALID_HANDLE_VALUE;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+
+ if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
+ nni_ipc_conn_fini(c);
+ return (rv);
+ }
+
+ c->f = p;
+ *connp = c;
+ return (0);
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ c->closed = true;
+ if (!nni_list_empty(&c->recv_aios)) {
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ }
+ if (!nni_list_empty(&c->send_aios)) {
+ CancelIoEx(c->f, &c->send_io.olpd);
+ }
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ // NB: closing the pipe is dangerous at this point.
+ DisconnectNamedPipe(c->f);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_reap(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ while ((!nni_list_empty(&c->recv_aios)) ||
+ (!nni_list_empty(&c->send_aios))) {
+ nni_cv_wait(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_win_io_fini(&c->recv_io);
+ nni_win_io_fini(&c->send_io);
+ nni_win_io_fini(&c->conn_io);
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ CloseHandle(c->f);
+ }
+ nni_cv_fini(&c->cv);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+
+ nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c);
+}
+
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ ULONG id;
+ if (c->dialer) {
+ if (!GetNamedPipeServerProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ } else {
+ if (!GetNamedPipeClientProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ }
+ *pid = id;
+ return (0);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
new file mode 100644
index 00000000..429bcedf
--- /dev/null
+++ b/src/platform/windows/win_ipcdial.c
@@ -0,0 +1,265 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ d->closed = false;
+ nni_aio_list_init(&d->aios);
+ *dp = d;
+ return (0);
+}
+
+// Windows IPC is a bit different on the client side. There is no
+// support for asynchronous connection, but we can fake it with a
+// single thread that runs to establish the connection. That thread
+// will run keep looping, sleeping for 10 ms between attempts. It
+// performs non-blocking attempts to connect.
+typedef struct ipc_dial_work {
+ nni_list waiters;
+ nni_list workers;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_thr thr;
+ int exit;
+} ipc_dial_work;
+
+static ipc_dial_work ipc_connecter;
+
+static void
+ipc_dial_thr(void *arg)
+{
+ ipc_dial_work *w = arg;
+
+ nni_mtx_lock(&w->mtx);
+ for (;;) {
+ nni_ipc_dialer *d;
+
+ if (w->exit) {
+ break;
+ }
+ while ((d = nni_list_first(&w->waiters)) != NULL) {
+ nni_list_remove(&w->waiters, d);
+ nni_list_append(&w->workers, d);
+ }
+
+ while ((d = nni_list_first(&w->workers)) != NULL) {
+ nni_ipc_conn *c;
+ nni_aio * aio;
+ HANDLE f;
+ int rv;
+ char * path;
+
+ if ((aio = nni_list_first(&d->aios)) == NULL) {
+ nni_list_remove(&w->workers, d);
+ continue;
+ }
+
+ path = nni_aio_get_prov_extra(aio, 0);
+
+ f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0,
+ NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+
+ if (f == INVALID_HANDLE_VALUE) {
+ switch ((rv = GetLastError())) {
+ case ERROR_PIPE_BUSY:
+ // Still in progress. This
+ // shouldn't happen unless the
+ // other side aborts the
+ // connection.
+ // back at the head of the list
+ nni_list_remove(&w->workers, d);
+ nni_list_prepend(&w->waiters, d);
+ continue;
+
+ case ERROR_FILE_NOT_FOUND:
+ rv = NNG_ECONNREFUSED;
+ break;
+ default:
+ rv = nni_win_error(rv);
+ break;
+ }
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ c->dialer = d;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+
+ if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
+ nni_cv_wait(&w->cv);
+ } else {
+ // Wait 10 ms, unless woken earlier.
+ nni_cv_until(&w->cv, nni_clock() + 10);
+ }
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+static void
+ipc_dial_cancel(nni_aio *aio, int rv)
+{
+ ipc_dial_work * w = &ipc_connecter;
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&w->mtx);
+ if (nni_aio_list_active(aio)) {
+ char *path;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_list_remove(aio);
+ path = nni_aio_get_prov_extra(aio, 0);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ char * path;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if (sa->s_family != NNG_AF_IPC) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+ if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) !=
+ 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (d->closed) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ nni_aio_set_prov_extra(aio, 0, path);
+ nni_list_append(&d->aios, aio);
+ if (nni_list_first(&d->aios) == aio) {
+ nni_list_append(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ NNI_FREE_STRUCT(d);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ nni_aio * aio;
+
+ nni_mtx_lock(&w->mtx);
+ d->closed = true;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ }
+ while ((aio = nni_list_first(&d->aios)) != NULL) {
+ nni_list_remove(&d->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+int
+nni_win_ipc_sysinit(void)
+{
+ int rv;
+ ipc_dial_work *worker = &ipc_connecter;
+
+ NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node);
+ NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node);
+
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
+ rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_thr_run(&worker->thr);
+
+ return (0);
+}
+
+void
+nni_win_ipc_sysfini(void)
+{
+ ipc_dial_work *worker = &ipc_connecter;
+
+ nni_reap_drain(); // so that listeners get cleaned up.
+
+ nni_mtx_lock(&worker->mtx);
+ worker->exit = 1;
+ nni_cv_wake(&worker->cv);
+ nni_mtx_unlock(&worker->mtx);
+ nni_thr_fini(&worker->thr);
+ nni_cv_fini(&worker->cv);
+ nni_mtx_fini(&worker->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
new file mode 100644
index 00000000..20bb8548
--- /dev/null
+++ b/src/platform/windows/win_ipclisten.c
@@ -0,0 +1,296 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_accept_done(nni_ipc_listener *l, int rv)
+{
+ nni_aio * aio;
+ HANDLE f;
+ nni_ipc_conn *c;
+
+ aio = nni_list_first(&l->aios);
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+
+ if (l->closed) {
+ // Closed, so bail.
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ // Create a replacement pipe.
+ f = CreateNamedPipeA(l->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ // We couldn't create a replacement pipe, so we have to
+ // abort the client from our side, so that we can keep
+ // our server pipe available.
+ rv = nni_win_error(GetLastError());
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) {
+ DisconnectNamedPipe(l->f);
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ l->f = f;
+ c->listener = l;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+ipc_accept_start(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ if (l->closed) {
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&l->cv);
+ }
+
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ int rv;
+
+ if ((ConnectNamedPipe(l->f, &l->io.olpd)) ||
+ ((rv = GetLastError()) == ERROR_IO_PENDING)) {
+ // Success, or pending, handled via completion pkt.
+ return;
+ }
+ if (rv == ERROR_PIPE_CONNECTED) {
+ // Kind of like success, but as this is technically
+ // an "error", we have to complete it ourself.
+ // Fake a completion.
+ ipc_accept_done(l, 0);
+ } else {
+ // Fast-fail (synchronous).
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ }
+ }
+}
+
+static void
+ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_ipc_listener *l = io->ptr;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_list_empty(&l->aios)) {
+ // We canceled this somehow. We no longer care.
+ DisconnectNamedPipe(l->f);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->rv != 0) {
+ rv = l->rv;
+ l->rv = 0;
+ }
+ ipc_accept_done(l, rv);
+ ipc_accept_start(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->started = false;
+ l->closed = false;
+ l->sec_attr.nLength = sizeof(l->sec_attr);
+ l->sec_attr.lpSecurityDescriptor = NULL;
+ l->sec_attr.bInheritHandle = FALSE;
+ nni_aio_list_init(&l->aios);
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ *lp = l;
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(bits);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc)
+{
+ if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ int rv;
+ HANDLE f;
+ char * path;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ f = CreateNamedPipeA(path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
+ rv = NNG_EADDRINUSE;
+ } else {
+ rv = nni_win_error(rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+ if ((rv = nni_win_io_register(f)) != 0) {
+ CloseHandle(f);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ l->f = f;
+ l->path = path;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static void
+ipc_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ nni_mtx_unlock(&l->mtx);
+ if (aio == nni_list_first(&l->aios)) {
+ l->rv = rv;
+ CancelIoEx(l->f, &l->io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&l->aios, aio);
+ if (nni_list_first(&l->aios) == aio) {
+ ipc_accept_start(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+
+ nni_mtx_lock(&l->mtx);
+ if (!l->closed) {
+ l->closed = true;
+ if (!nni_list_empty(&l->aios)) {
+ CancelIoEx(l->f, &l->io.olpd);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ while (!nni_list_empty(&l->aios)) {
+ nni_cv_wait(&l->cv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_win_io_fini(&l->io);
+ nni_strfree(l->path);
+ nni_cv_fini(&l->cv);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index c3a4a5d8..08b759ca 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->recv_aios)) {
c->recv_rv = rv;
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->send_aios)) {
c->send_rv = rv;
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
- if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
- 0) ||
- ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
- 0) ||
+ if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
((rv = nni_win_io_register((HANDLE) s)) != 0)) {
nni_tcp_conn_fini(c);
return (rv);
@@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c)
if (!c->closed) {
c->closed = true;
if (!nni_list_empty(&c->recv_aios)) {
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
}
if (!nni_list_empty(&c->send_aios)) {
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
}
if (c->s != INVALID_SOCKET) {
shutdown(c->s, SD_BOTH);
@@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c)
while ((!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
- nni_mtx_unlock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 4a3e9f2f..5283ea81 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
}
@@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&d->mtx);
}
@@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
- 0) {
+ if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
nni_tcp_conn_fini(c);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 5055c32d..1f197246 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
closesocket(l->s);
@@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&l->mtx);
}
@@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (l->closed) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_extra(aio, 0, c);
- if (((rv = nni_win_io_init(
- &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) ||
+ if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&l->mtx);