aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_epdesc.c597
-rw-r--r--src/platform/posix/posix_ipc.c250
-rw-r--r--src/platform/posix/posix_ipc.h48
-rw-r--r--src/platform/posix/posix_ipcconn.c494
-rw-r--r--src/platform/posix/posix_ipcdial.c238
-rw-r--r--src/platform/posix/posix_ipclisten.c373
-rw-r--r--src/platform/posix/posix_pipedesc.c503
-rw-r--r--src/platform/posix/posix_resolv_gai.c1
-rw-r--r--src/platform/posix/posix_tcpconn.c1
-rw-r--r--src/platform/posix/posix_tcpdial.c1
-rw-r--r--src/platform/posix/posix_tcplisten.c5
-rw-r--r--src/platform/posix/posix_udp.c3
-rw-r--r--src/platform/windows/win_impl.h3
-rw-r--r--src/platform/windows/win_io.c11
-rw-r--r--src/platform/windows/win_ipc.c673
-rw-r--r--src/platform/windows/win_ipc.h62
-rw-r--r--src/platform/windows/win_ipcconn.c388
-rw-r--r--src/platform/windows/win_ipcdial.c265
-rw-r--r--src/platform/windows/win_ipclisten.c296
-rw-r--r--src/platform/windows/win_tcpconn.c15
-rw-r--r--src/platform/windows/win_tcpdial.c7
-rw-r--r--src/platform/windows/win_tcplisten.c12
22 files changed, 2186 insertions, 2060 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
deleted file mode 100644
index d796765a..00000000
--- a/src/platform/posix/posix_epdesc.c
+++ /dev/null
@@ -1,597 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef sun
-#undef sun
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_posix_epdesc {
- nni_posix_pfd * pfd;
- nni_list connectq;
- nni_list acceptq;
- bool closed;
- bool started;
- bool ipcbound; // if true unlink socket on exit
- struct sockaddr_storage locaddr;
- struct sockaddr_storage remaddr;
- socklen_t loclen;
- socklen_t remlen;
- mode_t perms; // UNIX sockets only
- int mode; // end point mode (dialer/listener)
- nni_mtx mtx;
-};
-
-static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
-static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
-
-static void
-nni_epdesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
- nni_posix_pfd * pfd = NULL;
-
- NNI_ASSERT(rv != 0);
- nni_mtx_lock(&ed->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
- ((pfd = ed->pfd) != NULL)) {
- nni_posix_pfd_close(pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
-{
- nni_posix_pipedesc *pd = NULL;
-
- // acceptq or connectq.
- nni_aio_list_remove(aio);
-
- if (rv != 0) {
- NNI_ASSERT(newpfd == NULL);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(newpfd != NULL);
- if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
- nni_posix_pfd_fini(newpfd);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_epdesc_doaccept(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
-
- fd = nni_posix_pfd_fd(ed->pfd);
-
-#ifdef NNG_USE_ACCEPT4
- newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
- if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(fd, NULL, NULL);
- }
-#else
- newfd = accept(fd, NULL, NULL);
-#endif
- if (newfd < 0) {
- switch (errno) {
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
- if (rv != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- // Come back later...
- return;
- case ECONNABORTED:
- case ECONNRESET:
- // Eat them, they aren't interesting.
- continue;
- default:
- // Error this one, but keep moving to the next.
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
-
- nni_epdesc_finish(aio, 0, pfd);
- }
-}
-
-static void
-nni_epdesc_doclose(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- ed->closed = true;
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
-
- if (ed->pfd != NULL) {
-
- nni_posix_pfd_close(ed->pfd);
- }
-
- // clean up stale UNIX socket when closing the server.
- if (ed->ipcbound) {
- struct sockaddr_un *sun = (void *) &ed->locaddr;
- (void) unlink(sun->sun_path);
- }
-}
-
-static void
-nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
-
- NNI_ARG_UNUSED(pfd);
-
- nni_mtx_lock(&ed->mtx);
- if (events & POLLNVAL) {
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- // Anything else will turn up in accept.
- nni_epdesc_doaccept(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_close(nni_posix_epdesc *ed)
-{
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed)
-{
- int len;
- struct sockaddr_storage *ss;
- int rv;
- int fd;
- nni_posix_pfd * pfd;
-
- nni_mtx_lock(&ed->mtx);
-
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ESTATE);
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ECLOSED);
- }
- if ((len = ed->loclen) == 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EADDRINVAL);
- }
-
- ss = &ed->locaddr;
- len = ed->loclen;
-
- if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- nni_mtx_unlock(&ed->mtx);
- return (nni_plat_errno(errno));
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
-#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
- if (ss->ss_family != AF_UNIX) {
- int on = 1;
- // If for some reason this doesn't work, it's probably ok.
- // Second bind will fail.
- (void) setsockopt(
- fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- }
-#endif
-
- if (bind(fd, (struct sockaddr *) ss, len) < 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- // For UNIX domain sockets, optionally set the permission bits.
- // This is done after the bind and before listen, and on the file
- // rather than the file descriptor.
- // Experiments have shown that chmod() works correctly, provided that
- // it is done *before* the listen() operation, whereas fchmod seems to
- // have no impact. This behavior was observed on both macOS and Linux.
- // YMMV on other platforms.
- if (ss->ss_family == AF_UNIX) {
- ed->ipcbound = true;
- if (ed->perms != 0) {
- struct sockaddr_un *sun = (void *) ss;
- mode_t perms = ed->perms & ~(S_IFMT);
- if ((rv = chmod(sun->sun_path, perms)) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
- }
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
-
- ed->pfd = pfd;
- ed->started = true;
- nni_mtx_unlock(&ed->mtx);
-
- return (0);
-}
-
-void
-nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- // Accept is simpler than the connect case. With accept we just
- // need to wait for the socket to be readable to indicate an incoming
- // connection is ready for us. There isn't anything else for us to
- // do really, as that will have been done in listen.
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
-
- if (!ed->started) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&ed->acceptq, aio);
- if (nni_list_first(&ed->acceptq) == aio) {
- nni_epdesc_doaccept(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = -1;
-
- nni_mtx_lock(&ed->mtx);
- if (ed->pfd != NULL) {
- fd = nni_posix_pfd_fd(ed->pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-static void
-nni_epdesc_connect_start(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
- int fd;
- int rv;
- nni_aio * aio;
-
-loop:
- if ((aio = nni_list_first(&ed->connectq)) == NULL) {
- return;
- }
-
- NNI_ASSERT(ed->pfd == NULL);
- if (ed->closed) {
- nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
- goto loop;
- }
- ed->started = true;
-
- if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
- // Possibly bind.
- if ((ed->loclen != 0) &&
- (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
-
- if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
- // Immediate connect, cool! This probably only happens on
- // loopback, and probably not on every platform.
- nni_epdesc_finish(aio, 0, pfd);
- goto loop;
- }
-
- if (errno != EINPROGRESS) {
- // Some immediate failure occurred.
- if (errno == ENOENT) { // For UNIX domain sockets
- rv = NNG_ECONNREFUSED;
- } else {
- rv = nni_plat_errno(errno);
- }
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
- if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- ed->pfd = pfd;
- // all done... wait for this to signal via callback
-}
-
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- ed->started = true;
- nni_list_append(&ed->connectq, aio);
- if (nni_list_first(&ed->connectq) == aio) {
- // If there was a stale pfd (probably from an aborted or
- // canceled connect attempt), discard it so we start fresh.
- if (ed->pfd != NULL) {
- nni_posix_pfd_fini(ed->pfd);
- ed->pfd = NULL;
- }
- nni_epdesc_connect_start(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
- nni_aio * aio;
- socklen_t sz;
- int rv;
- int fd;
-
- nni_mtx_lock(&ed->mtx);
- if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
- (pfd != ed->pfd)) {
- // Spurious completion. Just ignore it.
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- fd = nni_posix_pfd_fd(pfd);
- sz = sizeof(rv);
-
- if ((events & POLLNVAL) != 0) {
- rv = EBADF;
-
- } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
-
- switch (rv) {
- case 0:
- // Good connect!
- ed->pfd = NULL;
- nni_epdesc_finish(aio, 0, pfd);
- break;
- case EINPROGRESS: // still connecting... come back later
- nni_mtx_unlock(&ed->mtx);
- return;
- default:
- ed->pfd = NULL;
- nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
- nni_posix_pfd_fini(pfd);
- break;
- }
-
- // Start another connect running, if any is waiting.
- nni_epdesc_connect_start(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
-{
- nni_posix_epdesc *ed;
-
- if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&ed->mtx);
-
- ed->pfd = NULL;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
-
- nni_aio_list_init(&ed->connectq);
- nni_aio_list_init(&ed->acceptq);
- *edp = ed;
- return (0);
-}
-
-void
-nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->locaddr, sa, len);
- ed->loclen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->remaddr, sa, len);
- ed->remlen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
-{
- nni_mtx_lock(&ed->mtx);
- if (ed->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ENOTSUP);
- }
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EBUSY);
- }
- if ((mode & S_IFMT) != 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EINVAL);
- }
- ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero
- nni_mtx_unlock(&ed->mtx);
- return (0);
-}
-
-void
-nni_posix_epdesc_fini(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
-
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- pfd = ed->pfd;
- nni_mtx_unlock(&ed->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
deleted file mode 100644
index 5ba3c8fb..00000000
--- a/src/platform/posix/posix_ipc.c
+++ /dev/null
@@ -1,250 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-// Solaris/SunOS systems define this, which collides with our symbol
-// names. Just undefine it now.
-#ifdef sun
-#undef sun
-#endif
-
-static int nni_plat_ipc_remove_stale(const char *path);
-
-// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
-// We alias nni_posix_epdesc to nni_plat_ipc_ep.
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_un sun;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun));
- break;
- case NNI_EP_MODE_LISTEN:
-
- if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) {
- return (rv);
- }
-
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_local(ed, &sun, sizeof(sun));
- break;
- default:
- nni_posix_epdesc_fini(ed);
- return (NNG_EINVAL);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits));
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(attr);
- return (NNG_ENOTSUP);
-}
-
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_plat_ipc_remove_stale(const char *path)
-{
- int fd;
- struct sockaddr_un sun;
- size_t sz;
-
- sun.sun_family = AF_UNIX;
- sz = sizeof(sun.sun_path);
-
- if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc *ed = (void *) ep;
-
- return (nni_posix_epdesc_listen(ed));
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, uid, &ignore, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, gid, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &ignore, &id)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal zone id (illumos/Solaris)
- return (NNG_ENOTSUP);
- }
- *zid = id;
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &id, &ignore)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal process id
- return (NNG_ENOTSUP);
- }
- *pid = id;
- return (0);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
new file mode 100644
index 00000000..caf7ed7b
--- /dev/null
+++ b/src/platform/posix/posix_ipc.h
@@ -0,0 +1,48 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <sys/types.h> // For mode_t
+
+struct nni_ipc_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_ipc_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_ipc_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
new file mode 100644
index 00000000..2b46fb12
--- /dev/null
+++ b/src/platform/posix/posix_ipcconn.c
@@ -0,0 +1,494 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+#if defined(NNG_HAVE_GETPEERUCRED)
+#include <ucred.h>
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+#include <sys/ucred.h>
+#endif
+
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_ipc.h"
+
+static void
+ipc_conn_dowrite(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+ipc_conn_doread(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_ipc_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ ipc_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ ipc_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ ipc_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ ipc_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
+ uint64_t *prid, uint64_t *znid)
+{
+ int fd = nni_posix_pfd_fd(c->pfd);
+#if defined(NNG_HAVE_GETPEEREID)
+ uid_t uid;
+ gid_t gid;
+
+ if (getpeereid(fd, &uid, &gid) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uid;
+ *egid = gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_GETPEERUCRED)
+ ucred_t *ucp = NULL;
+ if (getpeerucred(fd, &ucp) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = ucred_geteuid(ucp);
+ *egid = ucred_getegid(ucp);
+ *prid = ucred_getpid(ucp);
+ *znid = ucred_getzoneid(ucp);
+ ucred_free(ucp);
+ return (0);
+#elif defined(NNG_HAVE_SOPEERCRED)
+ struct ucred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+ struct xucred xu;
+ socklen_t len = sizeof(xu);
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = xu.cr_uid;
+ *egid = xu.cr_gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+#if defined(LOCAL_PEERPID) // present (undocumented) on macOS
+ {
+ pid_t pid;
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) ==
+ 0) {
+ *prid = (uint64_t) pid;
+ }
+ }
+#endif // LOCAL_PEERPID
+ return (0);
+#else
+ if (fd < 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ARG_UNUSED(euid);
+ NNI_ARG_UNUSED(egid);
+ NNI_ARG_UNUSED(prid);
+ NNI_ARG_UNUSED(znid);
+ return (NNG_ENOTSUP);
+#endif
+}
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ *zid = id;
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ *pid = id;
+ return (0);
+}
+
+int
+nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_ipc_conn_start(nni_ipc_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
new file mode 100644
index 00000000..13732911
--- /dev/null
+++ b/src/platform/posix/posix_ipcdial.c
@@ -0,0 +1,238 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+// Dialer stuff.
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_ipc_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_ipc_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+ipc_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+ nni_ipc_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_ipc_conn_fini(c);
+}
+
+static void
+ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_ipc_conn * c = arg;
+ nni_ipc_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_ipc_conn_close(c);
+ nni_ipc_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_ipc_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ if (errno == ENOENT) {
+ // No socket present means nobody listening.
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
+ }
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
new file mode 100644
index 00000000..4e33a08f
--- /dev/null
+++ b/src/platform/posix/posix_ipclisten.c
@@ -0,0 +1,373 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+ipc_listener_doclose(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+ char * path;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+ if (l->started && ((path = l->path) != NULL)) {
+ l->path = NULL;
+ (void) unlink(path);
+ nni_strfree(path);
+ }
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_doaccept(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_ipc_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+
+ // Anything else will turn up in accept.
+ ipc_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ipc_remove_stale(const char *path)
+{
+ int fd;
+ struct sockaddr_un sun;
+ size_t sz;
+
+ sun.sun_family = AF_UNIX;
+ sz = sizeof(sun.sun_path);
+
+ if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(path);
+ }
+ }
+ (void) close(fd);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode)
+{
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(sd);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+ char * path;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ path = nni_strdup(sa->s_ipc.sa_path);
+ if (path == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ if (((rv = ipc_remove_stale(path)) != 0) ||
+ ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ (void) close(fd);
+ return (rv);
+ }
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) ||
+ (listen(fd, 128) != 0)) {
+ rv = nni_plat_errno(errno);
+ (void) unlink(path);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ l->path = path;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ ipc_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
deleted file mode 100644
index b11036ea..00000000
--- a/src/platform/posix/posix_pipedesc.c
+++ /dev/null
@@ -1,503 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#if defined(NNG_HAVE_GETPEERUCRED)
-#include <ucred.h>
-#elif defined(NNG_HAVE_LOCALPEERCRED)
-#include <sys/ucred.h>
-#include <sys/un.h>
-#endif
-#ifdef NNG_HAVE_ALLOCA
-#include <alloca.h>
-#endif
-
-// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
-// file descriptor for TCP socket, etc.) This contains the list of pending
-// aios for that underlying socket, as well as the socket itself.
-struct nni_posix_pipedesc {
- nni_posix_pfd *pfd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
-};
-
-static void
-nni_posix_pipedesc_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, nni_aio_count(aio));
-}
-
-static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- pd->closed = true;
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- nni_posix_pfd_close(pd->pfd);
-}
-
-static void
-nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov * aiov;
- struct msghdr hdr;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- memset(&hdr, 0, sizeof(hdr));
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
-
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len > 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
-#ifndef MSG_NOSIGNAL
-#define MSG_NOSIGNAL 0
-#endif
-
- hdr.msg_iovlen = niov;
- hdr.msg_iov = iovec;
-
- if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- nni_aio_bump_count(aio, n);
- // We completed the entire operation on this aioq.
- // (Sendmsg never returns a partial result.)
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov *aiov;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len != 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
- if ((n = readv(fd, iovec, niov)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- if (n == 0) {
- // No bytes indicates a closed descriptor.
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- nni_posix_pipedesc_doclose(pd);
- return;
- }
-
- nni_aio_bump_count(aio, n);
-
- // We completed the entire operation on this aioq.
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_pipedesc *pd = arg;
-
- nni_mtx_lock(&pd->mtx);
- if (events & POLLIN) {
- nni_posix_pipedesc_doread(pd);
- }
- if (events & POLLOUT) {
- nni_posix_pipedesc_dowrite(pd);
- }
- if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_posix_pipedesc_doclose(pd);
- } else {
- events = 0;
- if (!nni_list_empty(&pd->writeq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- events |= POLLIN;
- }
- if ((!pd->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
-{
- // NB: Events may still occur.
- nni_mtx_lock(&pd->mtx);
- nni_posix_pipedesc_doclose(pd);
- nni_mtx_unlock(&pd->mtx);
-}
-
-static void
-nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&pd->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->readq, aio);
-
- // If we are only job on the list, go ahead and try to do an
- // immediate transfer. This allows for faster completions in
- // many cases. We also need not arm a list if it was already
- // armed.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLIN);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->writeq, aio);
-
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLOUT);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-int
-nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
-{
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
- uint64_t *egid, uint64_t *prid, uint64_t *znid)
-{
- int fd = nni_posix_pfd_fd(pd->pfd);
-#if defined(NNG_HAVE_GETPEEREID)
- uid_t uid;
- gid_t gid;
-
- if (getpeereid(fd, &uid, &gid) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uid;
- *egid = gid;
- *prid = (uint64_t) -1;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_GETPEERUCRED)
- ucred_t *ucp = NULL;
- if (getpeerucred(fd, &ucp) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = ucred_geteuid(ucp);
- *egid = ucred_getegid(ucp);
- *prid = ucred_getpid(ucp);
- *znid = ucred_getzoneid(ucp);
- ucred_free(ucp);
- return (0);
-#elif defined(NNG_HAVE_SOPEERCRED)
- struct ucred uc;
- socklen_t len = sizeof(uc);
- if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uc.uid;
- *egid = uc.gid;
- *prid = uc.pid;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_LOCALPEERCRED)
- struct xucred xu;
- socklen_t len = sizeof(xu);
- if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = xu.cr_uid;
- *egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented
- // LOCAL_PEERPID...
- *znid = (uint64_t) -1;
- return (0);
-#else
- if (fd < 0) {
- return (NNG_ECLOSED);
- }
- NNI_ARG_UNUSED(euid);
- NNI_ARG_UNUSED(egid);
- NNI_ARG_UNUSED(prid);
- NNI_ARG_UNUSED(znid);
- return (NNG_ENOTSUP);
-#endif
-}
-
-int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
-{
- nni_posix_pipedesc *pd;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- pd->closed = false;
- pd->pfd = pfd;
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
-
- *pdp = pd;
- return (0);
-}
-
-void
-nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
-{
- nni_posix_pipedesc_close(pd);
- nni_posix_pfd_fini(pd->pfd);
- pd->pfd = NULL;
- nni_mtx_fini(&pd->mtx);
-
- NNI_FREE_STRUCT(pd);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 63d858c2..f78d0b6b 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_USE_POSIX_RESOLV_GAI
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <ctype.h>
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 5cd1887a..c0352c55 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 7f627361..21f6ecfe 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index cdcbecd9..8c186885 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -151,6 +150,7 @@ static void
tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if (events & POLLNVAL) {
@@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
nni_mtx_unlock(&l->mtx);
return;
}
- NNI_ASSERT(pfd == l->pfd);
// Anything else will turn up in accept.
tcp_listener_doaccept(l);
@@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
+ (void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 759bdb96..873a02a1 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include "platform/posix/posix_pollq.h"
#include <errno.h>
@@ -177,7 +176,7 @@ static void
nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- NNI_ASSERT(pfd == udp->udp_pfd);
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & POLLIN) {
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 93e45423..b8741b52 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
-extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
+extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *);
extern void nni_win_io_fini(nni_win_io *);
-extern void nni_win_io_cancel(nni_win_io *);
extern int nni_win_io_register(HANDLE);
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 1179b603..50b22343 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h)
}
int
-nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
+nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
{
ZeroMemory(&io->olpd, sizeof(io->olpd));
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
- io->f = f;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
@@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
}
void
-nni_win_io_cancel(nni_win_io *io)
-{
- if (io->f != INVALID_HANDLE_VALUE) {
- CancelIoEx(io->f, &io->olpd);
- }
-}
-
-void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
deleted file mode 100644
index 169a2e00..00000000
--- a/src/platform/windows/win_ipc.c
+++ /dev/null
@@ -1,673 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_WINDOWS
-
-#include <stdio.h>
-
-struct nni_plat_ipc_pipe {
- HANDLE p;
- int mode;
- nni_win_event rcv_ev;
- nni_win_event snd_ev;
-};
-
-struct nni_plat_ipc_ep {
- char path[NNG_MAXADDRLEN + 16];
- nni_sockaddr addr;
- int mode;
- bool started;
- HANDLE p; // accept side only
- nni_win_event acc_ev; // accept side only
- nni_aio * con_aio; // conn side only
- nni_list_node node; // conn side uses this
- SECURITY_ATTRIBUTES sec_attr;
-};
-
-static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_pipe_ops = {
- .wev_start = nni_win_ipc_pipe_start,
- .wev_finish = nni_win_ipc_pipe_finish,
- .wev_cancel = nni_win_ipc_pipe_cancel,
-};
-
-static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_acc_ops = {
- .wev_start = nni_win_ipc_acc_start,
- .wev_finish = nni_win_ipc_acc_finish,
- .wev_cancel = nni_win_ipc_acc_cancel,
-};
-
-static int
-nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
-{
- void * buf;
- DWORD len;
- BOOL ok;
- int rv;
- nni_plat_ipc_pipe *pipe = evt->ptr;
- unsigned idx;
- unsigned naiov;
- nni_iov * aiov;
-
- NNI_ASSERT(aio != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
- }
-
- nni_aio_get_iov(aio, &naiov, &aiov);
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
-
- evt->count = 0;
- if (evt == &pipe->snd_ev) {
- ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd);
- } else {
- ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd);
- }
- if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
-}
-
-static void
-nni_win_ipc_pipe_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_pipe *pipe = evt->ptr;
-
- CancelIoEx(pipe->p, &evt->olpd);
-}
-
-static void
-nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_aio_finish(aio, evt->status, evt->count);
-}
-
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- pipe->mode = mode;
- rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- *pipep = pipe;
- return (0);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->snd_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->rcv_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
-{
- HANDLE p;
-
- nni_win_event_close(&pipe->snd_ev);
- nni_win_event_close(&pipe->rcv_ev);
-
- if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
- pipe->p = INVALID_HANDLE_VALUE;
- CloseHandle(p);
- }
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
-{
- nni_plat_ipc_pipe_close(pipe);
-
- nni_win_event_fini(&pipe->snd_ev);
- nni_win_event_fini(&pipe->rcv_ev);
- NNI_FREE_STRUCT(pipe);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible.
-// NB: Only POSIX systems support group IDs.
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid)
-{
- ULONG id;
- switch (pipe->mode) {
- case NNI_EP_MODE_DIAL:
- if (!GetNamedPipeServerProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- case NNI_EP_MODE_LISTEN:
- if (!GetNamedPipeClientProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- default:
- // Should never occur!
- return (NNG_EINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- const char * path;
- nni_plat_ipc_ep *ep;
-
- path = sa->s_ipc.sa_path;
- if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) {
- return (NNG_EINVAL);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ZeroMemory(ep, sizeof(*ep));
-
- ep->mode = mode;
- ep->sec_attr.nLength = sizeof(ep->sec_attr);
- ep->sec_attr.lpSecurityDescriptor = NULL;
- ep->sec_attr.bInheritHandle = FALSE;
- NNI_LIST_NODE_INIT(&ep->node);
-
- ep->addr = *sa;
- (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(bits);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc)
-{
- if (ep->started) {
- return (NNG_EBUSY);
- }
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_ENOTSUP);
- }
- if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
- return (NNG_EINVAL);
- }
- ep->sec_attr.lpSecurityDescriptor = desc;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- int rv;
- HANDLE p;
-
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_EINVAL);
- }
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
- // We create the first named pipe, and we make sure that it is
- // properly ours.
- p = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
- FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (p == INVALID_HANDLE_VALUE) {
- if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
- rv = NNG_EADDRINUSE;
- } else {
- rv = nni_win_error(rv);
- }
- goto failed;
- }
- rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep);
- if (rv != 0) {
- goto failed;
- }
-
- if ((rv = nni_win_iocp_register(p)) != 0) {
- goto failed;
- }
-
- ep->p = p;
- ep->started = true;
- return (0);
-
-failed:
-
- if (p != INVALID_HANDLE_VALUE) {
- (void) CloseHandle(p);
- }
-
- return (rv);
-}
-
-static void
-nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep * ep = evt->ptr;
- nni_plat_ipc_pipe *pipe;
- int rv;
- HANDLE newp, oldp;
-
- if ((rv = evt->status) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- newp = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (newp == INVALID_HANDLE_VALUE) {
- rv = nni_win_error(GetLastError());
- // We connected, but as we cannot get a new pipe,
- // we have to disconnect the old one.
- DisconnectNamedPipe(ep->p);
- nni_aio_finish_error(aio, rv);
- return;
- }
- if ((rv = nni_win_iocp_register(newp)) != 0) {
- // Disconnect the old pipe.
- DisconnectNamedPipe(ep->p);
- // And discard the half-baked new one.
- DisconnectNamedPipe(newp);
- (void) CloseHandle(newp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- oldp = ep->p;
- ep->p = newp;
-
- if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) !=
- 0) {
- // The new pipe is already fine for us. Discard
- // the old one, since failed to be able to use it.
- DisconnectNamedPipe(oldp);
- (void) CloseHandle(oldp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_win_ipc_acc_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
-
- (void) CancelIoEx(ep->p, &evt->olpd);
- // Just to be sure.
- (void) DisconnectNamedPipe(ep->p);
-}
-
-static int
-nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
- NNI_ARG_UNUSED(aio);
-
- if (!ConnectNamedPipe(ep->p, &evt->olpd)) {
- int rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_CONNECTED:
- // Kind of like success, but as this is technically
- // an "error", we have to complete it ourself.
- evt->status = 0;
- evt->count = 0;
- return (1);
-
- case ERROR_IO_PENDING:
- // Normal asynchronous operation. Wait for
- // completion.
- return (0);
-
- default:
- // Fast-fail (synchronous).
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
-
- // Synchronous success - the I/O completion packet should still
- // be delivered.
- return (0);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->acc_ev, aio);
-}
-
-// So Windows IPC is a bit different on the client side. There is no
-// support for asynchronous connection, but we can fake it with a
-// single thread that runs to establish the connection. That thread
-// will run keep looping, sleeping for 10 ms between attempts. It
-// performs non-blocking attempts to connect.
-typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work;
-struct nni_win_ipc_conn_work {
- nni_list waiters;
- nni_list workers;
- nni_mtx mtx;
- nni_cv cv;
- nni_thr thr;
- int exit;
-};
-
-static nni_win_ipc_conn_work nni_win_ipc_connecter;
-
-static void
-nni_win_ipc_conn_thr(void *arg)
-{
- nni_win_ipc_conn_work *w = arg;
- nni_plat_ipc_ep * ep;
- nni_plat_ipc_pipe * pipe;
- nni_aio * aio;
- HANDLE p;
- int rv;
-
- nni_mtx_lock(&w->mtx);
- for (;;) {
- if (w->exit) {
- break;
- }
- while ((ep = nni_list_first(&w->waiters)) != NULL) {
- nni_list_remove(&w->waiters, ep);
- nni_list_append(&w->workers, ep);
- }
-
- while ((ep = nni_list_first(&w->workers)) != NULL) {
-
- nni_list_remove(&w->workers, ep);
-
- if ((aio = ep->con_aio) == NULL) {
- continue;
- }
- ep->con_aio = NULL;
-
- pipe = NULL;
-
- p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (p == INVALID_HANDLE_VALUE) {
- switch ((rv = GetLastError())) {
- case ERROR_PIPE_BUSY:
- // Still in progress. This
- // shouldn't happen unless the
- // other side aborts the
- // connection.
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- continue;
-
- case ERROR_FILE_NOT_FOUND:
- rv = NNG_ECONNREFUSED;
- break;
- default:
- rv = nni_win_error(rv);
- break;
- }
- goto fail;
- }
- if (((rv = nni_win_ipc_pipe_init(
- &pipe, p, NNI_EP_MODE_DIAL)) != 0) ||
- ((rv = nni_win_iocp_register(p)) != 0)) {
- goto fail;
- }
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- continue;
-
- fail:
- if (p != INVALID_HANDLE_VALUE) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- }
- if (pipe != NULL) {
- nni_plat_ipc_pipe_fini(pipe);
- }
- nni_aio_finish_error(aio, rv);
- }
-
- if (nni_list_empty(&w->waiters)) {
- // Wait until an endpoint is added.
- nni_cv_wait(&w->cv);
- } else {
- // Wait 10 ms, unless woken earlier.
- nni_cv_until(&w->cv, nni_clock() + 10);
- }
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-static void
-nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&w->mtx);
- if (aio == ep->con_aio) {
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- }
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&w->mtx);
- if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
- nni_mtx_unlock(&w->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(!nni_list_active(&w->waiters, ep));
-
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_plat_ipc_ep_close(ep);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- nni_win_event_close(&ep->acc_ev);
- nni_win_event_fini(&ep->acc_ev);
- NNI_FREE_STRUCT(ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_aio * aio;
-
- switch (ep->mode) {
- case NNI_EP_MODE_DIAL:
- nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- }
- if ((aio = ep->con_aio) != NULL) {
- ep->con_aio = NULL;
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- nni_mtx_unlock(&w->mtx);
- break;
-
- case NNI_EP_MODE_LISTEN:
- nni_win_event_close(&ep->acc_ev);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- break;
- }
-}
-
-int
-nni_win_ipc_sysinit(void)
-{
- int rv;
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
- NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
-
- nni_mtx_init(&worker->mtx);
- nni_cv_init(&worker->cv, &worker->mtx);
-
- rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
- if (rv != 0) {
- return (rv);
- }
-
- nni_thr_run(&worker->thr);
-
- return (0);
-}
-
-void
-nni_win_ipc_sysfini(void)
-{
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- nni_mtx_lock(&worker->mtx);
- worker->exit = 1;
- nni_cv_wake(&worker->cv);
- nni_mtx_unlock(&worker->mtx);
- nni_thr_fini(&worker->thr);
- nni_cv_fini(&worker->cv);
- nni_mtx_fini(&worker->mtx);
-}
-
-#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h
new file mode 100644
index 00000000..51ce5548
--- /dev/null
+++ b/src/platform/windows/win_ipc.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_WIN_WINIPC_H
+#define PLATFORM_WIN_WINIPC_H
+
+// This header file is private to the IPC (named pipes) support for Windows.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+struct nni_ipc_conn {
+ HANDLE f;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ nni_ipc_dialer * dialer;
+ nni_ipc_listener *listener;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ bool closed; // dialers are locked by the worker lock
+ nni_list aios;
+ nni_list_node node; // node on worker list
+};
+
+struct nni_ipc_listener {
+ char * path;
+ bool started;
+ bool closed;
+ HANDLE f;
+ SECURITY_ATTRIBUTES sec_attr;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_win_io io;
+ int rv;
+};
+
+extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE);
+
+#endif // NNG_PLATFORM_WINDOWS
+
+#endif // NNG_PLATFORM_WIN_WINIPC_H
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
new file mode 100644
index 00000000..d8ef4e4e
--- /dev/null
+++ b/src/platform/windows/win_ipcconn.c
@@ -0,0 +1,388 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_recv_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_list_remove(&c->recv_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->recv_rv != 0) {
+ rv = c->recv_rv;
+ c->recv_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_recv_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+static void
+ipc_recv_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ c->recv_rv = rv;
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->recv_aios, aio);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ ipc_recv_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_send_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_list_remove(&c->send_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_send_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->send_rv != 0) {
+ rv = c->send_rv;
+ c->send_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_send_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+ipc_send_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->send_aios)) {
+ c->send_rv = rv;
+ CancelIoEx(c->f, &c->send_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->send_aios, aio);
+ if (aio == nni_list_first(&c->send_aios)) {
+ ipc_send_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p)
+{
+ nni_ipc_conn *c;
+ int rv;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->f = INVALID_HANDLE_VALUE;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+
+ if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
+ nni_ipc_conn_fini(c);
+ return (rv);
+ }
+
+ c->f = p;
+ *connp = c;
+ return (0);
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ c->closed = true;
+ if (!nni_list_empty(&c->recv_aios)) {
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ }
+ if (!nni_list_empty(&c->send_aios)) {
+ CancelIoEx(c->f, &c->send_io.olpd);
+ }
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ // NB: closing the pipe is dangerous at this point.
+ DisconnectNamedPipe(c->f);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_reap(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ while ((!nni_list_empty(&c->recv_aios)) ||
+ (!nni_list_empty(&c->send_aios))) {
+ nni_cv_wait(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_win_io_fini(&c->recv_io);
+ nni_win_io_fini(&c->send_io);
+ nni_win_io_fini(&c->conn_io);
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ CloseHandle(c->f);
+ }
+ nni_cv_fini(&c->cv);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+
+ nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c);
+}
+
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ ULONG id;
+ if (c->dialer) {
+ if (!GetNamedPipeServerProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ } else {
+ if (!GetNamedPipeClientProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ }
+ *pid = id;
+ return (0);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
new file mode 100644
index 00000000..429bcedf
--- /dev/null
+++ b/src/platform/windows/win_ipcdial.c
@@ -0,0 +1,265 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ d->closed = false;
+ nni_aio_list_init(&d->aios);
+ *dp = d;
+ return (0);
+}
+
+// Windows IPC is a bit different on the client side. There is no
+// support for asynchronous connection, but we can fake it with a
+// single thread that runs to establish the connection. That thread
+// will run keep looping, sleeping for 10 ms between attempts. It
+// performs non-blocking attempts to connect.
+typedef struct ipc_dial_work {
+ nni_list waiters;
+ nni_list workers;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_thr thr;
+ int exit;
+} ipc_dial_work;
+
+static ipc_dial_work ipc_connecter;
+
+static void
+ipc_dial_thr(void *arg)
+{
+ ipc_dial_work *w = arg;
+
+ nni_mtx_lock(&w->mtx);
+ for (;;) {
+ nni_ipc_dialer *d;
+
+ if (w->exit) {
+ break;
+ }
+ while ((d = nni_list_first(&w->waiters)) != NULL) {
+ nni_list_remove(&w->waiters, d);
+ nni_list_append(&w->workers, d);
+ }
+
+ while ((d = nni_list_first(&w->workers)) != NULL) {
+ nni_ipc_conn *c;
+ nni_aio * aio;
+ HANDLE f;
+ int rv;
+ char * path;
+
+ if ((aio = nni_list_first(&d->aios)) == NULL) {
+ nni_list_remove(&w->workers, d);
+ continue;
+ }
+
+ path = nni_aio_get_prov_extra(aio, 0);
+
+ f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0,
+ NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+
+ if (f == INVALID_HANDLE_VALUE) {
+ switch ((rv = GetLastError())) {
+ case ERROR_PIPE_BUSY:
+ // Still in progress. This
+ // shouldn't happen unless the
+ // other side aborts the
+ // connection.
+ // back at the head of the list
+ nni_list_remove(&w->workers, d);
+ nni_list_prepend(&w->waiters, d);
+ continue;
+
+ case ERROR_FILE_NOT_FOUND:
+ rv = NNG_ECONNREFUSED;
+ break;
+ default:
+ rv = nni_win_error(rv);
+ break;
+ }
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ c->dialer = d;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+
+ if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
+ nni_cv_wait(&w->cv);
+ } else {
+ // Wait 10 ms, unless woken earlier.
+ nni_cv_until(&w->cv, nni_clock() + 10);
+ }
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+static void
+ipc_dial_cancel(nni_aio *aio, int rv)
+{
+ ipc_dial_work * w = &ipc_connecter;
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&w->mtx);
+ if (nni_aio_list_active(aio)) {
+ char *path;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_list_remove(aio);
+ path = nni_aio_get_prov_extra(aio, 0);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ char * path;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if (sa->s_family != NNG_AF_IPC) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+ if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) !=
+ 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (d->closed) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ nni_aio_set_prov_extra(aio, 0, path);
+ nni_list_append(&d->aios, aio);
+ if (nni_list_first(&d->aios) == aio) {
+ nni_list_append(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ NNI_FREE_STRUCT(d);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ nni_aio * aio;
+
+ nni_mtx_lock(&w->mtx);
+ d->closed = true;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ }
+ while ((aio = nni_list_first(&d->aios)) != NULL) {
+ nni_list_remove(&d->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+int
+nni_win_ipc_sysinit(void)
+{
+ int rv;
+ ipc_dial_work *worker = &ipc_connecter;
+
+ NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node);
+ NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node);
+
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
+ rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_thr_run(&worker->thr);
+
+ return (0);
+}
+
+void
+nni_win_ipc_sysfini(void)
+{
+ ipc_dial_work *worker = &ipc_connecter;
+
+ nni_reap_drain(); // so that listeners get cleaned up.
+
+ nni_mtx_lock(&worker->mtx);
+ worker->exit = 1;
+ nni_cv_wake(&worker->cv);
+ nni_mtx_unlock(&worker->mtx);
+ nni_thr_fini(&worker->thr);
+ nni_cv_fini(&worker->cv);
+ nni_mtx_fini(&worker->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
new file mode 100644
index 00000000..20bb8548
--- /dev/null
+++ b/src/platform/windows/win_ipclisten.c
@@ -0,0 +1,296 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_accept_done(nni_ipc_listener *l, int rv)
+{
+ nni_aio * aio;
+ HANDLE f;
+ nni_ipc_conn *c;
+
+ aio = nni_list_first(&l->aios);
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+
+ if (l->closed) {
+ // Closed, so bail.
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ // Create a replacement pipe.
+ f = CreateNamedPipeA(l->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ // We couldn't create a replacement pipe, so we have to
+ // abort the client from our side, so that we can keep
+ // our server pipe available.
+ rv = nni_win_error(GetLastError());
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) {
+ DisconnectNamedPipe(l->f);
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ l->f = f;
+ c->listener = l;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+ipc_accept_start(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ if (l->closed) {
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&l->cv);
+ }
+
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ int rv;
+
+ if ((ConnectNamedPipe(l->f, &l->io.olpd)) ||
+ ((rv = GetLastError()) == ERROR_IO_PENDING)) {
+ // Success, or pending, handled via completion pkt.
+ return;
+ }
+ if (rv == ERROR_PIPE_CONNECTED) {
+ // Kind of like success, but as this is technically
+ // an "error", we have to complete it ourself.
+ // Fake a completion.
+ ipc_accept_done(l, 0);
+ } else {
+ // Fast-fail (synchronous).
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ }
+ }
+}
+
+static void
+ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_ipc_listener *l = io->ptr;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_list_empty(&l->aios)) {
+ // We canceled this somehow. We no longer care.
+ DisconnectNamedPipe(l->f);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->rv != 0) {
+ rv = l->rv;
+ l->rv = 0;
+ }
+ ipc_accept_done(l, rv);
+ ipc_accept_start(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->started = false;
+ l->closed = false;
+ l->sec_attr.nLength = sizeof(l->sec_attr);
+ l->sec_attr.lpSecurityDescriptor = NULL;
+ l->sec_attr.bInheritHandle = FALSE;
+ nni_aio_list_init(&l->aios);
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ *lp = l;
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(bits);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc)
+{
+ if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ int rv;
+ HANDLE f;
+ char * path;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ f = CreateNamedPipeA(path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
+ rv = NNG_EADDRINUSE;
+ } else {
+ rv = nni_win_error(rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+ if ((rv = nni_win_io_register(f)) != 0) {
+ CloseHandle(f);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ l->f = f;
+ l->path = path;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static void
+ipc_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ nni_mtx_unlock(&l->mtx);
+ if (aio == nni_list_first(&l->aios)) {
+ l->rv = rv;
+ CancelIoEx(l->f, &l->io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&l->aios, aio);
+ if (nni_list_first(&l->aios) == aio) {
+ ipc_accept_start(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+
+ nni_mtx_lock(&l->mtx);
+ if (!l->closed) {
+ l->closed = true;
+ if (!nni_list_empty(&l->aios)) {
+ CancelIoEx(l->f, &l->io.olpd);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ while (!nni_list_empty(&l->aios)) {
+ nni_cv_wait(&l->cv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_win_io_fini(&l->io);
+ nni_strfree(l->path);
+ nni_cv_fini(&l->cv);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index c3a4a5d8..08b759ca 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->recv_aios)) {
c->recv_rv = rv;
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->send_aios)) {
c->send_rv = rv;
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
- if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
- 0) ||
- ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
- 0) ||
+ if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
((rv = nni_win_io_register((HANDLE) s)) != 0)) {
nni_tcp_conn_fini(c);
return (rv);
@@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c)
if (!c->closed) {
c->closed = true;
if (!nni_list_empty(&c->recv_aios)) {
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
}
if (!nni_list_empty(&c->send_aios)) {
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
}
if (c->s != INVALID_SOCKET) {
shutdown(c->s, SD_BOTH);
@@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c)
while ((!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
- nni_mtx_unlock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 4a3e9f2f..5283ea81 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
}
@@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&d->mtx);
}
@@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
- 0) {
+ if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
nni_tcp_conn_fini(c);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 5055c32d..1f197246 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
closesocket(l->s);
@@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&l->mtx);
}
@@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (l->closed) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_extra(aio, 0, c);
- if (((rv = nni_win_io_init(
- &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) ||
+ if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&l->mtx);