aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-16 17:10:47 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-18 13:25:33 -0700
commitb310f712828962bf3187caf3bfe064c3531c5628 (patch)
treef99ceb5851f601c93b0305617d692722f2978dd5 /src/platform/windows
parent3f40a08eab60df77dc61ae0350e59f36e8d0ed16 (diff)
downloadnng-b310f712828962bf3187caf3bfe064c3531c5628.tar.gz
nng-b310f712828962bf3187caf3bfe064c3531c5628.tar.bz2
nng-b310f712828962bf3187caf3bfe064c3531c5628.zip
fixes #595 mutex leak and other minor errors in TCP
fixes #596 POSIX IPC should move away from pipedesc/epdesc fixes #598 TLS and TCP listeners could support NNG_OPT_LOCADDR fixes #594 Windows IPC should use "new style" win_io code. fixes #597 macOS could support PEER PID This large change set cleans up the IPC support on Windows and POSIX. This has the beneficial impact of significantly reducing the complexity of the code, reducing locking, increasing concurrency (multiple dial and accepts can be outstanding now), reducing context switches (we complete thins synchronously now). While here we have added some missing option support, and fixed a few more bugs that we found in the TCP code changes from last week.
Diffstat (limited to 'src/platform/windows')
-rw-r--r--src/platform/windows/win_impl.h3
-rw-r--r--src/platform/windows/win_io.c11
-rw-r--r--src/platform/windows/win_ipc.c673
-rw-r--r--src/platform/windows/win_ipc.h62
-rw-r--r--src/platform/windows/win_ipcconn.c388
-rw-r--r--src/platform/windows/win_ipcdial.c265
-rw-r--r--src/platform/windows/win_ipclisten.c296
-rw-r--r--src/platform/windows/win_tcpconn.c15
-rw-r--r--src/platform/windows/win_tcpdial.c7
-rw-r--r--src/platform/windows/win_tcplisten.c12
10 files changed, 1030 insertions, 702 deletions
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 93e45423..b8741b52 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
-extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
+extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *);
extern void nni_win_io_fini(nni_win_io *);
-extern void nni_win_io_cancel(nni_win_io *);
extern int nni_win_io_register(HANDLE);
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 1179b603..50b22343 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h)
}
int
-nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
+nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
{
ZeroMemory(&io->olpd, sizeof(io->olpd));
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
- io->f = f;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
@@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
}
void
-nni_win_io_cancel(nni_win_io *io)
-{
- if (io->f != INVALID_HANDLE_VALUE) {
- CancelIoEx(io->f, &io->olpd);
- }
-}
-
-void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
deleted file mode 100644
index 169a2e00..00000000
--- a/src/platform/windows/win_ipc.c
+++ /dev/null
@@ -1,673 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_WINDOWS
-
-#include <stdio.h>
-
-struct nni_plat_ipc_pipe {
- HANDLE p;
- int mode;
- nni_win_event rcv_ev;
- nni_win_event snd_ev;
-};
-
-struct nni_plat_ipc_ep {
- char path[NNG_MAXADDRLEN + 16];
- nni_sockaddr addr;
- int mode;
- bool started;
- HANDLE p; // accept side only
- nni_win_event acc_ev; // accept side only
- nni_aio * con_aio; // conn side only
- nni_list_node node; // conn side uses this
- SECURITY_ATTRIBUTES sec_attr;
-};
-
-static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_pipe_ops = {
- .wev_start = nni_win_ipc_pipe_start,
- .wev_finish = nni_win_ipc_pipe_finish,
- .wev_cancel = nni_win_ipc_pipe_cancel,
-};
-
-static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_acc_ops = {
- .wev_start = nni_win_ipc_acc_start,
- .wev_finish = nni_win_ipc_acc_finish,
- .wev_cancel = nni_win_ipc_acc_cancel,
-};
-
-static int
-nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
-{
- void * buf;
- DWORD len;
- BOOL ok;
- int rv;
- nni_plat_ipc_pipe *pipe = evt->ptr;
- unsigned idx;
- unsigned naiov;
- nni_iov * aiov;
-
- NNI_ASSERT(aio != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
- }
-
- nni_aio_get_iov(aio, &naiov, &aiov);
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
-
- evt->count = 0;
- if (evt == &pipe->snd_ev) {
- ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd);
- } else {
- ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd);
- }
- if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
-}
-
-static void
-nni_win_ipc_pipe_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_pipe *pipe = evt->ptr;
-
- CancelIoEx(pipe->p, &evt->olpd);
-}
-
-static void
-nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_aio_finish(aio, evt->status, evt->count);
-}
-
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- pipe->mode = mode;
- rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- *pipep = pipe;
- return (0);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->snd_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->rcv_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
-{
- HANDLE p;
-
- nni_win_event_close(&pipe->snd_ev);
- nni_win_event_close(&pipe->rcv_ev);
-
- if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
- pipe->p = INVALID_HANDLE_VALUE;
- CloseHandle(p);
- }
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
-{
- nni_plat_ipc_pipe_close(pipe);
-
- nni_win_event_fini(&pipe->snd_ev);
- nni_win_event_fini(&pipe->rcv_ev);
- NNI_FREE_STRUCT(pipe);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible.
-// NB: Only POSIX systems support group IDs.
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid)
-{
- ULONG id;
- switch (pipe->mode) {
- case NNI_EP_MODE_DIAL:
- if (!GetNamedPipeServerProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- case NNI_EP_MODE_LISTEN:
- if (!GetNamedPipeClientProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- default:
- // Should never occur!
- return (NNG_EINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- const char * path;
- nni_plat_ipc_ep *ep;
-
- path = sa->s_ipc.sa_path;
- if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) {
- return (NNG_EINVAL);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ZeroMemory(ep, sizeof(*ep));
-
- ep->mode = mode;
- ep->sec_attr.nLength = sizeof(ep->sec_attr);
- ep->sec_attr.lpSecurityDescriptor = NULL;
- ep->sec_attr.bInheritHandle = FALSE;
- NNI_LIST_NODE_INIT(&ep->node);
-
- ep->addr = *sa;
- (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(bits);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc)
-{
- if (ep->started) {
- return (NNG_EBUSY);
- }
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_ENOTSUP);
- }
- if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
- return (NNG_EINVAL);
- }
- ep->sec_attr.lpSecurityDescriptor = desc;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- int rv;
- HANDLE p;
-
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_EINVAL);
- }
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
- // We create the first named pipe, and we make sure that it is
- // properly ours.
- p = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
- FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (p == INVALID_HANDLE_VALUE) {
- if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
- rv = NNG_EADDRINUSE;
- } else {
- rv = nni_win_error(rv);
- }
- goto failed;
- }
- rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep);
- if (rv != 0) {
- goto failed;
- }
-
- if ((rv = nni_win_iocp_register(p)) != 0) {
- goto failed;
- }
-
- ep->p = p;
- ep->started = true;
- return (0);
-
-failed:
-
- if (p != INVALID_HANDLE_VALUE) {
- (void) CloseHandle(p);
- }
-
- return (rv);
-}
-
-static void
-nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep * ep = evt->ptr;
- nni_plat_ipc_pipe *pipe;
- int rv;
- HANDLE newp, oldp;
-
- if ((rv = evt->status) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- newp = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (newp == INVALID_HANDLE_VALUE) {
- rv = nni_win_error(GetLastError());
- // We connected, but as we cannot get a new pipe,
- // we have to disconnect the old one.
- DisconnectNamedPipe(ep->p);
- nni_aio_finish_error(aio, rv);
- return;
- }
- if ((rv = nni_win_iocp_register(newp)) != 0) {
- // Disconnect the old pipe.
- DisconnectNamedPipe(ep->p);
- // And discard the half-baked new one.
- DisconnectNamedPipe(newp);
- (void) CloseHandle(newp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- oldp = ep->p;
- ep->p = newp;
-
- if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) !=
- 0) {
- // The new pipe is already fine for us. Discard
- // the old one, since failed to be able to use it.
- DisconnectNamedPipe(oldp);
- (void) CloseHandle(oldp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_win_ipc_acc_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
-
- (void) CancelIoEx(ep->p, &evt->olpd);
- // Just to be sure.
- (void) DisconnectNamedPipe(ep->p);
-}
-
-static int
-nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
- NNI_ARG_UNUSED(aio);
-
- if (!ConnectNamedPipe(ep->p, &evt->olpd)) {
- int rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_CONNECTED:
- // Kind of like success, but as this is technically
- // an "error", we have to complete it ourself.
- evt->status = 0;
- evt->count = 0;
- return (1);
-
- case ERROR_IO_PENDING:
- // Normal asynchronous operation. Wait for
- // completion.
- return (0);
-
- default:
- // Fast-fail (synchronous).
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
-
- // Synchronous success - the I/O completion packet should still
- // be delivered.
- return (0);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->acc_ev, aio);
-}
-
-// So Windows IPC is a bit different on the client side. There is no
-// support for asynchronous connection, but we can fake it with a
-// single thread that runs to establish the connection. That thread
-// will run keep looping, sleeping for 10 ms between attempts. It
-// performs non-blocking attempts to connect.
-typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work;
-struct nni_win_ipc_conn_work {
- nni_list waiters;
- nni_list workers;
- nni_mtx mtx;
- nni_cv cv;
- nni_thr thr;
- int exit;
-};
-
-static nni_win_ipc_conn_work nni_win_ipc_connecter;
-
-static void
-nni_win_ipc_conn_thr(void *arg)
-{
- nni_win_ipc_conn_work *w = arg;
- nni_plat_ipc_ep * ep;
- nni_plat_ipc_pipe * pipe;
- nni_aio * aio;
- HANDLE p;
- int rv;
-
- nni_mtx_lock(&w->mtx);
- for (;;) {
- if (w->exit) {
- break;
- }
- while ((ep = nni_list_first(&w->waiters)) != NULL) {
- nni_list_remove(&w->waiters, ep);
- nni_list_append(&w->workers, ep);
- }
-
- while ((ep = nni_list_first(&w->workers)) != NULL) {
-
- nni_list_remove(&w->workers, ep);
-
- if ((aio = ep->con_aio) == NULL) {
- continue;
- }
- ep->con_aio = NULL;
-
- pipe = NULL;
-
- p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (p == INVALID_HANDLE_VALUE) {
- switch ((rv = GetLastError())) {
- case ERROR_PIPE_BUSY:
- // Still in progress. This
- // shouldn't happen unless the
- // other side aborts the
- // connection.
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- continue;
-
- case ERROR_FILE_NOT_FOUND:
- rv = NNG_ECONNREFUSED;
- break;
- default:
- rv = nni_win_error(rv);
- break;
- }
- goto fail;
- }
- if (((rv = nni_win_ipc_pipe_init(
- &pipe, p, NNI_EP_MODE_DIAL)) != 0) ||
- ((rv = nni_win_iocp_register(p)) != 0)) {
- goto fail;
- }
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- continue;
-
- fail:
- if (p != INVALID_HANDLE_VALUE) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- }
- if (pipe != NULL) {
- nni_plat_ipc_pipe_fini(pipe);
- }
- nni_aio_finish_error(aio, rv);
- }
-
- if (nni_list_empty(&w->waiters)) {
- // Wait until an endpoint is added.
- nni_cv_wait(&w->cv);
- } else {
- // Wait 10 ms, unless woken earlier.
- nni_cv_until(&w->cv, nni_clock() + 10);
- }
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-static void
-nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&w->mtx);
- if (aio == ep->con_aio) {
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- }
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&w->mtx);
- if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
- nni_mtx_unlock(&w->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(!nni_list_active(&w->waiters, ep));
-
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_plat_ipc_ep_close(ep);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- nni_win_event_close(&ep->acc_ev);
- nni_win_event_fini(&ep->acc_ev);
- NNI_FREE_STRUCT(ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_aio * aio;
-
- switch (ep->mode) {
- case NNI_EP_MODE_DIAL:
- nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- }
- if ((aio = ep->con_aio) != NULL) {
- ep->con_aio = NULL;
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- nni_mtx_unlock(&w->mtx);
- break;
-
- case NNI_EP_MODE_LISTEN:
- nni_win_event_close(&ep->acc_ev);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- break;
- }
-}
-
-int
-nni_win_ipc_sysinit(void)
-{
- int rv;
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
- NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
-
- nni_mtx_init(&worker->mtx);
- nni_cv_init(&worker->cv, &worker->mtx);
-
- rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
- if (rv != 0) {
- return (rv);
- }
-
- nni_thr_run(&worker->thr);
-
- return (0);
-}
-
-void
-nni_win_ipc_sysfini(void)
-{
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- nni_mtx_lock(&worker->mtx);
- worker->exit = 1;
- nni_cv_wake(&worker->cv);
- nni_mtx_unlock(&worker->mtx);
- nni_thr_fini(&worker->thr);
- nni_cv_fini(&worker->cv);
- nni_mtx_fini(&worker->mtx);
-}
-
-#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h
new file mode 100644
index 00000000..51ce5548
--- /dev/null
+++ b/src/platform/windows/win_ipc.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_WIN_WINIPC_H
+#define PLATFORM_WIN_WINIPC_H
+
+// This header file is private to the IPC (named pipes) support for Windows.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+struct nni_ipc_conn {
+ HANDLE f;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ nni_ipc_dialer * dialer;
+ nni_ipc_listener *listener;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ bool closed; // dialers are locked by the worker lock
+ nni_list aios;
+ nni_list_node node; // node on worker list
+};
+
+struct nni_ipc_listener {
+ char * path;
+ bool started;
+ bool closed;
+ HANDLE f;
+ SECURITY_ATTRIBUTES sec_attr;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_win_io io;
+ int rv;
+};
+
+extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE);
+
+#endif // NNG_PLATFORM_WINDOWS
+
+#endif // NNG_PLATFORM_WIN_WINIPC_H
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
new file mode 100644
index 00000000..d8ef4e4e
--- /dev/null
+++ b/src/platform/windows/win_ipcconn.c
@@ -0,0 +1,388 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_recv_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_list_remove(&c->recv_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->recv_rv != 0) {
+ rv = c->recv_rv;
+ c->recv_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_recv_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+static void
+ipc_recv_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ c->recv_rv = rv;
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->recv_aios, aio);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ ipc_recv_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_send_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_list_remove(&c->send_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_send_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->send_rv != 0) {
+ rv = c->send_rv;
+ c->send_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_send_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+ipc_send_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->send_aios)) {
+ c->send_rv = rv;
+ CancelIoEx(c->f, &c->send_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->send_aios, aio);
+ if (aio == nni_list_first(&c->send_aios)) {
+ ipc_send_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p)
+{
+ nni_ipc_conn *c;
+ int rv;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->f = INVALID_HANDLE_VALUE;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+
+ if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
+ nni_ipc_conn_fini(c);
+ return (rv);
+ }
+
+ c->f = p;
+ *connp = c;
+ return (0);
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ c->closed = true;
+ if (!nni_list_empty(&c->recv_aios)) {
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ }
+ if (!nni_list_empty(&c->send_aios)) {
+ CancelIoEx(c->f, &c->send_io.olpd);
+ }
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ // NB: closing the pipe is dangerous at this point.
+ DisconnectNamedPipe(c->f);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_reap(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ while ((!nni_list_empty(&c->recv_aios)) ||
+ (!nni_list_empty(&c->send_aios))) {
+ nni_cv_wait(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_win_io_fini(&c->recv_io);
+ nni_win_io_fini(&c->send_io);
+ nni_win_io_fini(&c->conn_io);
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ CloseHandle(c->f);
+ }
+ nni_cv_fini(&c->cv);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+
+ nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c);
+}
+
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ ULONG id;
+ if (c->dialer) {
+ if (!GetNamedPipeServerProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ } else {
+ if (!GetNamedPipeClientProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ }
+ *pid = id;
+ return (0);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
new file mode 100644
index 00000000..429bcedf
--- /dev/null
+++ b/src/platform/windows/win_ipcdial.c
@@ -0,0 +1,265 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ d->closed = false;
+ nni_aio_list_init(&d->aios);
+ *dp = d;
+ return (0);
+}
+
+// Windows IPC is a bit different on the client side. There is no
+// support for asynchronous connection, but we can fake it with a
+// single thread that runs to establish the connection. That thread
+// will run keep looping, sleeping for 10 ms between attempts. It
+// performs non-blocking attempts to connect.
+typedef struct ipc_dial_work {
+ nni_list waiters;
+ nni_list workers;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_thr thr;
+ int exit;
+} ipc_dial_work;
+
+static ipc_dial_work ipc_connecter;
+
+static void
+ipc_dial_thr(void *arg)
+{
+ ipc_dial_work *w = arg;
+
+ nni_mtx_lock(&w->mtx);
+ for (;;) {
+ nni_ipc_dialer *d;
+
+ if (w->exit) {
+ break;
+ }
+ while ((d = nni_list_first(&w->waiters)) != NULL) {
+ nni_list_remove(&w->waiters, d);
+ nni_list_append(&w->workers, d);
+ }
+
+ while ((d = nni_list_first(&w->workers)) != NULL) {
+ nni_ipc_conn *c;
+ nni_aio * aio;
+ HANDLE f;
+ int rv;
+ char * path;
+
+ if ((aio = nni_list_first(&d->aios)) == NULL) {
+ nni_list_remove(&w->workers, d);
+ continue;
+ }
+
+ path = nni_aio_get_prov_extra(aio, 0);
+
+ f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0,
+ NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+
+ if (f == INVALID_HANDLE_VALUE) {
+ switch ((rv = GetLastError())) {
+ case ERROR_PIPE_BUSY:
+ // Still in progress. This
+ // shouldn't happen unless the
+ // other side aborts the
+ // connection.
+ // back at the head of the list
+ nni_list_remove(&w->workers, d);
+ nni_list_prepend(&w->waiters, d);
+ continue;
+
+ case ERROR_FILE_NOT_FOUND:
+ rv = NNG_ECONNREFUSED;
+ break;
+ default:
+ rv = nni_win_error(rv);
+ break;
+ }
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ c->dialer = d;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+
+ if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
+ nni_cv_wait(&w->cv);
+ } else {
+ // Wait 10 ms, unless woken earlier.
+ nni_cv_until(&w->cv, nni_clock() + 10);
+ }
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+static void
+ipc_dial_cancel(nni_aio *aio, int rv)
+{
+ ipc_dial_work * w = &ipc_connecter;
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&w->mtx);
+ if (nni_aio_list_active(aio)) {
+ char *path;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_list_remove(aio);
+ path = nni_aio_get_prov_extra(aio, 0);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ char * path;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if (sa->s_family != NNG_AF_IPC) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+ if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) !=
+ 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (d->closed) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ nni_aio_set_prov_extra(aio, 0, path);
+ nni_list_append(&d->aios, aio);
+ if (nni_list_first(&d->aios) == aio) {
+ nni_list_append(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ NNI_FREE_STRUCT(d);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ nni_aio * aio;
+
+ nni_mtx_lock(&w->mtx);
+ d->closed = true;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ }
+ while ((aio = nni_list_first(&d->aios)) != NULL) {
+ nni_list_remove(&d->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+int
+nni_win_ipc_sysinit(void)
+{
+ int rv;
+ ipc_dial_work *worker = &ipc_connecter;
+
+ NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node);
+ NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node);
+
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
+ rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_thr_run(&worker->thr);
+
+ return (0);
+}
+
+void
+nni_win_ipc_sysfini(void)
+{
+ ipc_dial_work *worker = &ipc_connecter;
+
+ nni_reap_drain(); // so that listeners get cleaned up.
+
+ nni_mtx_lock(&worker->mtx);
+ worker->exit = 1;
+ nni_cv_wake(&worker->cv);
+ nni_mtx_unlock(&worker->mtx);
+ nni_thr_fini(&worker->thr);
+ nni_cv_fini(&worker->cv);
+ nni_mtx_fini(&worker->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
new file mode 100644
index 00000000..20bb8548
--- /dev/null
+++ b/src/platform/windows/win_ipclisten.c
@@ -0,0 +1,296 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_accept_done(nni_ipc_listener *l, int rv)
+{
+ nni_aio * aio;
+ HANDLE f;
+ nni_ipc_conn *c;
+
+ aio = nni_list_first(&l->aios);
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+
+ if (l->closed) {
+ // Closed, so bail.
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ // Create a replacement pipe.
+ f = CreateNamedPipeA(l->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ // We couldn't create a replacement pipe, so we have to
+ // abort the client from our side, so that we can keep
+ // our server pipe available.
+ rv = nni_win_error(GetLastError());
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) {
+ DisconnectNamedPipe(l->f);
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ l->f = f;
+ c->listener = l;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+ipc_accept_start(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ if (l->closed) {
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&l->cv);
+ }
+
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ int rv;
+
+ if ((ConnectNamedPipe(l->f, &l->io.olpd)) ||
+ ((rv = GetLastError()) == ERROR_IO_PENDING)) {
+ // Success, or pending, handled via completion pkt.
+ return;
+ }
+ if (rv == ERROR_PIPE_CONNECTED) {
+ // Kind of like success, but as this is technically
+ // an "error", we have to complete it ourself.
+ // Fake a completion.
+ ipc_accept_done(l, 0);
+ } else {
+ // Fast-fail (synchronous).
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ }
+ }
+}
+
+static void
+ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_ipc_listener *l = io->ptr;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_list_empty(&l->aios)) {
+ // We canceled this somehow. We no longer care.
+ DisconnectNamedPipe(l->f);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->rv != 0) {
+ rv = l->rv;
+ l->rv = 0;
+ }
+ ipc_accept_done(l, rv);
+ ipc_accept_start(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->started = false;
+ l->closed = false;
+ l->sec_attr.nLength = sizeof(l->sec_attr);
+ l->sec_attr.lpSecurityDescriptor = NULL;
+ l->sec_attr.bInheritHandle = FALSE;
+ nni_aio_list_init(&l->aios);
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ *lp = l;
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(bits);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc)
+{
+ if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ int rv;
+ HANDLE f;
+ char * path;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ f = CreateNamedPipeA(path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
+ rv = NNG_EADDRINUSE;
+ } else {
+ rv = nni_win_error(rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+ if ((rv = nni_win_io_register(f)) != 0) {
+ CloseHandle(f);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ l->f = f;
+ l->path = path;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static void
+ipc_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ nni_mtx_unlock(&l->mtx);
+ if (aio == nni_list_first(&l->aios)) {
+ l->rv = rv;
+ CancelIoEx(l->f, &l->io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&l->aios, aio);
+ if (nni_list_first(&l->aios) == aio) {
+ ipc_accept_start(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+
+ nni_mtx_lock(&l->mtx);
+ if (!l->closed) {
+ l->closed = true;
+ if (!nni_list_empty(&l->aios)) {
+ CancelIoEx(l->f, &l->io.olpd);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ while (!nni_list_empty(&l->aios)) {
+ nni_cv_wait(&l->cv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_win_io_fini(&l->io);
+ nni_strfree(l->path);
+ nni_cv_fini(&l->cv);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index c3a4a5d8..08b759ca 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->recv_aios)) {
c->recv_rv = rv;
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->send_aios)) {
c->send_rv = rv;
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
- if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
- 0) ||
- ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
- 0) ||
+ if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
((rv = nni_win_io_register((HANDLE) s)) != 0)) {
nni_tcp_conn_fini(c);
return (rv);
@@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c)
if (!c->closed) {
c->closed = true;
if (!nni_list_empty(&c->recv_aios)) {
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
}
if (!nni_list_empty(&c->send_aios)) {
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
}
if (c->s != INVALID_SOCKET) {
shutdown(c->s, SD_BOTH);
@@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c)
while ((!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
- nni_mtx_unlock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 4a3e9f2f..5283ea81 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
}
@@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&d->mtx);
}
@@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
- 0) {
+ if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
nni_tcp_conn_fini(c);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 5055c32d..1f197246 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
closesocket(l->s);
@@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&l->mtx);
}
@@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (l->closed) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_extra(aio, 0, c);
- if (((rv = nni_win_io_init(
- &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) ||
+ if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&l->mtx);