aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/platform/windows/win_impl.h28
-rw-r--r--src/platform/windows/win_iocp.c105
-rw-r--r--src/platform/windows/win_ipc.c950
-rw-r--r--src/platform/windows/win_thread.c7
5 files changed, 817 insertions, 275 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7d2f8d2f..18d4932f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -98,8 +98,8 @@ set (NNG_SOURCES
platform/windows/win_impl.h
platform/windows/win_clock.c
platform/windows/win_debug.c
- platform/windows/win_ipc.c
platform/windows/win_iocp.c
+ platform/windows/win_ipc.c
platform/windows/win_net.c
platform/windows/win_pipe.c
platform/windows/win_rand.c
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index fd8f3f79..6ce38250 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -22,6 +22,7 @@
#include <process.h>
#include <ws2tcpip.h>
+#include "core/list.h"
// These types are provided for here, to permit them to be directly inlined
// elsewhere.
@@ -35,22 +36,10 @@ typedef struct nni_win_event nni_win_event;
// conveyed in the OVERLAPPED directly.
struct nni_win_event {
OVERLAPPED olpd;
+ HANDLE h;
void * ptr;
nni_cb cb;
- int status;
- int nbytes;
-};
-
-struct nni_plat_ipcsock {
- HANDLE p;
-
- char path[256];
- WSAOVERLAPPED recv_olpd;
- WSAOVERLAPPED send_olpd;
- WSAOVERLAPPED conn_olpd; // Use for both connect and accept
- CRITICAL_SECTION cs;
-
- int server;
+ nni_list aios;
};
struct nni_plat_thr {
@@ -73,9 +62,20 @@ struct nni_plat_cv {
extern int nni_win_error(int);
extern int nni_winsock_error(int);
+extern int nni_win_event_init(nni_win_event *, nni_cb, void *, HANDLE);
+extern void nni_win_event_fini(nni_win_event *);
+extern int nni_win_event_reset(nni_win_event *);
+extern OVERLAPPED *nni_win_event_overlapped(nni_win_event *);
+extern void nni_win_event_cancel(nni_win_event *);
+
+extern int nni_win_iocp_register(HANDLE);
+
extern int nni_win_iocp_sysinit(void);
extern void nni_win_iocp_sysfini(void);
+extern int nni_win_ipc_sysinit(void);
+extern void nni_win_ipc_sysfini(void);
+
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 06feb62e..381da64f 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -20,14 +20,14 @@
// port for pretty much everything.
static HANDLE nni_win_global_iocp = NULL;
-static nni_win_event nni_win_iocp_exit;
static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS];
+static nni_mtx nni_win_iocp_mtx;
static void
nni_win_iocp_handler(void *arg)
{
HANDLE iocp;
- DWORD nbytes;
+ DWORD cnt;
ULONG_PTR key;
OVERLAPPED *olpd;
nni_win_event *evt;
@@ -42,7 +42,9 @@ nni_win_iocp_handler(void *arg)
key = 0;
olpd = NULL;
status = 0;
- rv = GetQueuedCompletionStatus(iocp, &nbytes, &key, &olpd,
+ cnt = 0;
+
+ rv = GetQueuedCompletionStatus(iocp, &cnt, &key, &olpd,
INFINITE);
if (rv == FALSE) {
@@ -50,21 +52,12 @@ nni_win_iocp_handler(void *arg)
// Completion port bailed...
break;
}
- nbytes = 0;
- key = 0;
- status = nni_win_error(GetLastError());
}
NNI_ASSERT(olpd != NULL);
evt = (void *) olpd;
- if (evt == &nni_win_iocp_exit) {
- // Exit requested.
- break;
- }
NNI_ASSERT(evt->cb != NULL);
- evt->nbytes = nbytes;
- evt->status = status;
evt->cb(evt->ptr);
}
}
@@ -81,13 +74,86 @@ nni_win_iocp_register(HANDLE h)
int
+nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h)
+{
+ ZeroMemory(&evt->olpd, sizeof (evt->olpd));
+ evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (evt->olpd.hEvent == NULL) {
+ return (nni_win_error(GetLastError()));
+ }
+ nni_aio_list_init(&evt->aios);
+ evt->ptr = ptr;
+ evt->cb = cb;
+ evt->h = h;
+ return (0);
+}
+
+
+void
+nni_win_event_fini(nni_win_event *evt)
+{
+ if (evt->olpd.hEvent != NULL) {
+ (void) CloseHandle(evt->olpd.hEvent);
+ evt->olpd.hEvent = NULL;
+ }
+}
+
+
+int
+nni_win_event_reset(nni_win_event *evt)
+{
+ if (!ResetEvent(evt->olpd.hEvent)) {
+ return (nni_win_error(GetLastError()));
+ }
+ return (0);
+}
+
+
+OVERLAPPED *
+nni_win_event_overlapped(nni_win_event *evt)
+{
+ return (&evt->olpd);
+}
+
+
+void
+nni_win_event_cancel(nni_win_event *evt)
+{
+ int rv;
+ DWORD cnt;
+
+ // Try to cancel the event...
+ if (!CancelIoEx(evt->h, &evt->olpd)) {
+ // None was found. That's good.
+ if ((rv = GetLastError()) == ERROR_NOT_FOUND) {
+ // Nothing queued. We may in theory be running
+ // the callback via the completion port handler;
+ // caller must synchronize that separately.
+ return;
+ }
+
+ // It's unclear why we would ever fail in this
+ // circumstance. Is there a kind of "uncancellable I/O"
+ // here, or somesuch? In this case we just wait hard
+ // using the success case -- its the best we can do.
+ }
+
+ // This basically just waits for the canceled I/O to complete.
+ // The end result can be either success or ERROR_OPERATION_ABORTED.
+ // It turns out we don't much care either way; we just want to make
+ // sure that we don't have any I/O pending on the overlapped
+ // structure before we release it or reuse it.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+}
+
+
+int
nni_win_iocp_sysinit(void)
{
HANDLE h;
int i;
int rv;
- ZeroMemory(&nni_win_iocp_exit, sizeof (nni_win_iocp_exit));
h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
NNI_WIN_IOCP_NTHREADS);
if (h == NULL) {
@@ -101,6 +167,9 @@ nni_win_iocp_sysinit(void)
goto fail;
}
}
+ if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) {
+ goto fail;
+ }
for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
nni_thr_run(&nni_win_iocp_thrs[i]);
}
@@ -108,7 +177,6 @@ nni_win_iocp_sysinit(void)
fail:
if ((h = nni_win_global_iocp) != NULL) {
- PostQueuedCompletionStatus(h, 0, 0, &nni_win_iocp_exit.olpd);
CloseHandle(h);
nni_win_global_iocp = NULL;
}
@@ -126,14 +194,13 @@ nni_win_iocp_sysfini(void)
HANDLE h;
if ((h = nni_win_global_iocp) != NULL) {
- // Signal the iocp poller to exit.
- PostQueuedCompletionStatus(h, 0, 0, &nni_win_iocp_exit.olpd);
CloseHandle(h);
nni_win_global_iocp = NULL;
- for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
- nni_thr_fini(&nni_win_iocp_thrs[i]);
- }
}
+ for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
+ nni_thr_fini(&nni_win_iocp_thrs[i]);
+ }
+ nni_mtx_fini(&nni_win_iocp_mtx);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 8ae26a6e..f6601f93 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -13,371 +13,841 @@
#include <stdio.h>
-#if 0
+static void nni_win_ipc_acc_cb(void *);
+static void nni_win_ipc_send_cb(void *);
+static void nni_win_ipc_recv_cb(void *);
+static void nni_win_ipc_send_start(nni_plat_ipc_pipe *);
+static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *);
-int
-nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
+struct nni_plat_ipc_pipe {
+ HANDLE p;
+ nni_win_event recv_evt;
+ nni_win_event send_evt;
+ nni_mtx mtx;
+ nni_list readq;
+ nni_list writeq;
+};
+
+struct nni_plat_ipc_ep {
+ char path[256];
+ int mode;
+ int started;
+ nni_list aios;
+ HANDLE p; // accept side only
+ nni_win_event acc_evt; // accept side only
+ nni_mtx mtx; // accept side only
+ nni_list_node node; // conn side uses this
+};
+
+
+static int
+nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
{
- int i;
- DWORD nsent;
- DWORD resid;
- char *buf;
- DWORD len;
- nni_iov iov[4];
+ nni_plat_ipc_pipe *pipe;
int rv;
- OVERLAPPED *olp = &s->send_olpd;
-
- NNI_ASSERT(cnt <= 4);
- for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) {
- iov[i].iov_len = iovs[i].iov_len;
- iov[i].iov_buf = iovs[i].iov_buf;
- }
-
- i = 0;
- while (resid) {
- NNI_ASSERT(i < cnt);
- nsent = 0;
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- len = iov[i].iov_len > 0x1000000 ? 0x1000000 :
- (DWORD) iov[i].iov_len;
- buf = iov[i].iov_buf;
-
- if (!WriteFile(s->p, buf, len, NULL, olp)) {
- if ((rv = GetLastError()) != ERROR_IO_PENDING) {
- return (nni_winpipe_error(rv));
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
+ }
+ rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
+ }
+
+ pipe->p = p;
+ nni_aio_list_init(&pipe->readq);
+ nni_aio_list_init(&pipe->writeq);
+ *pipep = pipe;
+ return (0);
+}
+
+
+static void
+nni_win_ipc_send_cancel(nni_aio *aio)
+{
+ nni_plat_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ nni_win_event_cancel(&pipe->recv_evt);
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe)
+{
+ nni_win_event *evt = &pipe->send_evt;
+ OVERLAPPED *olpd = nni_win_event_overlapped(evt);
+ int rv = 0;
+ nni_aio *aio;
+ DWORD cnt;
+
+ if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
+ rv = nni_win_error(GetLastError());
+ }
+ if ((aio = nni_list_first(&pipe->writeq)) == NULL) {
+ // If the AIO was canceled, but IOCP thread was still
+ // working on it, we might have seen this.
+ return;
+ }
+ if (rv == 0) {
+ NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
+ aio->a_count += cnt;
+ aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
+ aio->a_iov[0].iov_len -= cnt;
+
+ if (aio->a_iov[0].iov_len == 0) {
+ int i;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
}
+ aio->a_niov--;
}
- if (!GetOverlappedResult(s->p, olp, &nsent, TRUE)) {
- rv = GetLastError();
- return (nni_winpipe_error(rv));
- }
- NNI_ASSERT(nsent <= resid);
- NNI_ASSERT(nsent <= len);
- resid -= nsent;
- if (nsent < iov[i].iov_len) {
- iov[i].iov_buf = buf + nsent;
- iov[i].iov_len -= nsent;
- } else {
- i++;
+
+ if (aio->a_niov > 0) {
+ // If we have more to do, submit it!
+ nni_win_ipc_send_start(pipe);
+ return;
}
}
- return (0);
+
+ // All done; hopefully successfully.
+ nni_list_remove(&pipe->writeq, aio);
+ nni_aio_finish(aio, rv, aio->a_count);
}
-int
-nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
+static void
+nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
{
- int i;
- DWORD nrecv;
- DWORD resid;
+ void *buf;
DWORD len;
- char *buf;
- nni_iov iov[4];
int rv;
- OVERLAPPED *olp = &s->recv_olpd;
-
- NNI_ASSERT(cnt <= 4);
- for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) {
- iov[i].iov_len = iovs[i].iov_len;
- iov[i].iov_buf = iovs[i].iov_buf;
- }
-
- i = 0;
- while (resid) {
- NNI_ASSERT(i < cnt);
- nrecv = 0;
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- len = iov[i].iov_len > 0x1000000 ? 0x1000000 :
- (DWORD) iov[i].iov_len;
- buf = iov[i].iov_buf;
-
- if (!ReadFile(s->p, buf, len, NULL, olp)) {
- if ((rv = GetLastError()) != ERROR_IO_PENDING) {
- return (nni_winpipe_error(rv));
- }
+ nni_win_event *evt = &pipe->send_evt;
+ OVERLAPPED *olpd = nni_win_event_overlapped(evt);
+ nni_aio *aio = nni_list_first(&pipe->writeq);
+
+ NNI_ASSERT(aio != NULL);
+ NNI_ASSERT(aio->a_niov > 0);
+ NNI_ASSERT(aio->a_iov[0].iov_len > 0);
+ NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
+
+ if (pipe->p == INVALID_HANDLE_VALUE) {
+ rv = NNG_ECLOSED;
+ goto fail;
+ }
+
+ if ((rv = nni_win_event_reset(evt)) != 0) {
+ goto fail;
+ }
+
+ // Now start a writefile. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aio->a_iov[0].iov_buf;
+ len = (DWORD) aio->a_iov[0].iov_len;
+ olpd = nni_win_event_overlapped(evt);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if (!WriteFile(pipe->p, buf, len, NULL, olpd)) {
+ // If we failed immediately, then process it.
+ if ((rv = GetLastError()) == ERROR_IO_PENDING) {
+ // This is the normal path we expect; the IO will
+ // complete asynchronously.
+ return;
}
- if (!GetOverlappedResult(s->p, olp, &nrecv, TRUE)) {
- rv = GetLastError();
- return (nni_winpipe_error(rv));
+
+ // Some synchronous error occurred.
+ rv = nni_win_error(rv);
+ goto fail;
+ }
+
+ // If we completed synchronously, then do the completion. This is
+ // not normally expected.
+ nni_win_ipc_send_finish(pipe);
+ return;
+
+fail:
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+static void
+nni_win_ipc_send_cb(void *arg)
+{
+ nni_plat_ipc_pipe *pipe = arg;
+
+ nni_mtx_lock(&pipe->mtx);
+ nni_win_ipc_send_finish(pipe);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
+{
+ nni_win_event *evt = &pipe->send_evt;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ if (pipe->p == INVALID_HANDLE_VALUE) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+
+ if ((rv = nni_win_event_reset(evt)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_aio_list_append(&pipe->writeq, aio);
+ nni_win_ipc_send_start(pipe);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_win_ipc_recv_cancel(nni_aio *aio)
+{
+ nni_plat_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ nni_win_event_cancel(&pipe->recv_evt);
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
+{
+ nni_win_event *evt = &pipe->recv_evt;
+ OVERLAPPED *olpd = nni_win_event_overlapped(evt);
+ int rv = 0;
+ nni_aio *aio;
+ DWORD cnt;
+
+ if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
+ rv = nni_win_error(GetLastError());
+ }
+ if ((aio = nni_list_first(&pipe->readq)) == NULL) {
+ // If the AIO was canceled, but IOCP thread was still
+ // working on it, we might have seen this.
+ return;
+ }
+ if (rv == 0) {
+ NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
+ aio->a_count += cnt;
+ aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
+ aio->a_iov[0].iov_len -= cnt;
+
+ if (aio->a_iov[0].iov_len == 0) {
+ int i;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
+ }
+ aio->a_niov--;
}
- NNI_ASSERT(nrecv <= resid);
- NNI_ASSERT(nrecv <= len);
- resid -= nrecv;
- if (nrecv < iov[i].iov_len) {
- iov[i].iov_buf = buf + nrecv;
- iov[i].iov_len -= nrecv;
- } else {
- i++;
+
+ if (aio->a_niov > 0) {
+ // If we have more to do, submit it!
+ nni_win_ipc_recv_start(pipe);
+ return;
}
}
- return (0);
+
+ // All done; hopefully successfully.
+ nni_list_remove(&pipe->readq, aio);
+ nni_aio_finish(aio, rv, aio->a_count);
}
-int
-nni_plat_ipc_init(nni_plat_ipcsock *s)
+static void
+nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe)
{
+ void *buf;
+ DWORD len;
int rv;
+ nni_win_event *evt = &pipe->recv_evt;
+ OVERLAPPED *olpd = nni_win_event_overlapped(evt);
+ nni_aio *aio = nni_list_first(&pipe->readq);
+
+ NNI_ASSERT(aio != NULL);
+ NNI_ASSERT(aio->a_niov > 0);
+ NNI_ASSERT(aio->a_iov[0].iov_len > 0);
+ NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
+
+ if (pipe->p == INVALID_HANDLE_VALUE) {
+ rv = NNG_ECLOSED;
+ goto fail;
+ }
- s->p = INVALID_HANDLE_VALUE;
- s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) {
- rv = GetLastError();
- return (nni_winpipe_error(rv));
+ if ((rv = nni_win_event_reset(evt)) != 0) {
+ goto fail;
}
- s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) {
- rv = GetLastError();
- CloseHandle(s->recv_olpd.hEvent);
- return (nni_winpipe_error(rv));
+
+ // Now start a readfile. We assume that only one read can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aio->a_iov[0].iov_buf;
+ len = (DWORD) aio->a_iov[0].iov_len;
+ olpd = nni_win_event_overlapped(evt);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
}
- s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) {
- rv = GetLastError();
- CloseHandle(s->send_olpd.hEvent);
- CloseHandle(s->recv_olpd.hEvent);
- return (nni_winpipe_error(rv));
+
+ if (!ReadFile(pipe->p, buf, len, NULL, olpd)) {
+ // If we failed immediately, then process it.
+ if ((rv = GetLastError()) == ERROR_IO_PENDING) {
+ // This is the normal path we expect; the IO will
+ // complete asynchronously.
+ return;
+ }
+
+ // Some synchronous error occurred.
+ rv = nni_win_error(rv);
+ goto fail;
}
- InitializeCriticalSection(&s->cs);
- return (0);
+
+ // If we completed synchronously, then do the completion. This is
+ // not normally expected.
+ nni_win_ipc_recv_finish(pipe);
+ return;
+
+fail:
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, rv, 0);
}
static void
-nni_plat_ipc_close(nni_plat_ipcsock *s)
+nni_win_ipc_recv_cb(void *arg)
{
- HANDLE fd;
+ nni_plat_ipc_pipe *pipe = arg;
- EnterCriticalSection(&s->cs);
- if ((fd = s->p) != INVALID_HANDLE_VALUE) {
- s->p = INVALID_HANDLE_VALUE;
- if (s->server) {
- (void) DisconnectNamedPipe(fd);
- }
- (void) CancelIoEx(fd, &s->send_olpd);
- (void) CancelIoEx(fd, &s->recv_olpd);
- (void) CancelIoEx(fd, &s->conn_olpd);
- (void) CloseHandle(fd);
+ nni_mtx_lock(&pipe->mtx);
+ nni_win_ipc_recv_finish(pipe);
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+void
+nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
+{
+ nni_win_event *evt = &pipe->send_evt;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
}
- LeaveCriticalSection(&s->cs);
+ if (pipe->p == INVALID_HANDLE_VALUE) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+
+ if ((rv = nni_win_event_reset(evt)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_aio_list_append(&pipe->readq, aio);
+ nni_win_ipc_recv_start(pipe);
}
void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
{
- nni_plat_ipc_close(s);
- DeleteCriticalSection(&s->cs);
- CloseHandle(s->recv_olpd.hEvent);
- CloseHandle(s->send_olpd.hEvent);
- CloseHandle(s->conn_olpd.hEvent);
+ nni_mtx_lock(&pipe->mtx);
+ if (pipe->p != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipe->p);
+ pipe->p = INVALID_HANDLE_VALUE;
+ }
+ nni_win_event_cancel(&pipe->send_evt);
+ nni_win_event_cancel(&pipe->recv_evt);
+ nni_mtx_unlock(&pipe->mtx);
}
void
-nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
+nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
+{
+ nni_plat_ipc_pipe_close(pipe);
+
+ nni_win_event_fini(&pipe->send_evt);
+ nni_win_event_fini(&pipe->recv_evt);
+ nni_mtx_fini(&pipe->mtx);
+ NNI_FREE_STRUCT(pipe);
+}
+
+
+int
+nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
{
- nni_plat_ipc_close(s);
+ const char *path;
+ nni_plat_ipc_ep *ep;
+ int rv;
+
+ if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+ path = url + strlen("ipc://");
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ZeroMemory(ep, sizeof (ep));
+ if ((rv = nni_mtx_init(&ep->mtx)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (rv);
+ }
+
+ ep->mode = mode;
+ NNI_LIST_NODE_INIT(&ep->node);
+ nni_aio_list_init(&ep->aios);
+
+ (void) snprintf(ep->path, sizeof (ep->path), "\\\\.\\pipe\\%s", path);
+
+ *epp = ep;
+ return (0);
}
int
-nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
+nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
{
int rv;
+ HANDLE p;
- snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path);
+ nni_mtx_lock(&ep->mtx);
+ if (ep->mode != NNI_EP_MODE_LISTEN) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EINVAL);
+ }
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
// We create the first named pipe, and we make sure that it is
// properly ours.
- s->p = CreateNamedPipeA(
- s->path,
+ p = CreateNamedPipeA(ep->path,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
PIPE_REJECT_REMOTE_CLIENTS,
PIPE_UNLIMITED_INSTANCES,
4096, 4096, 0, NULL);
-
- if (s->p == INVALID_HANDLE_VALUE) {
+ if (p == INVALID_HANDLE_VALUE) {
if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
- return (NNG_EADDRINUSE);
+ rv = NNG_EADDRINUSE;
+ } else {
+ rv = nni_win_error(rv);
}
- return (nni_winpipe_error(rv));
+ goto failed;
+ }
+ rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p);
+ if (rv != 0) {
+ goto failed;
}
- s->server = 1;
+ if ((rv = nni_win_iocp_register(p)) != 0) {
+ goto failed;
+ }
+
+ ep->p = p;
+ ep->started = 1;
+ nni_mtx_unlock(&ep->mtx);
return (0);
+
+failed:
+
+ nni_mtx_unlock(&ep->mtx);
+ if (p != INVALID_HANDLE_VALUE) {
+ (void) CloseHandle(p);
+ }
+
+ return (rv);
}
-int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+static void
+nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
{
- int rv;
- OVERLAPPED *olp = &s->conn_olpd;
+ nni_win_event *evt = &ep->acc_evt;
DWORD nbytes;
-
- s->server = 1;
- if (!ConnectNamedPipe(server->p, olp)) {
- rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_CONNECTED:
- break;
- case ERROR_IO_PENDING:
- if (!GetOverlappedResult(server->p, olp, &nbytes,
- TRUE)) {
- rv = GetLastError();
- return (nni_winpipe_error(rv));
- }
- default:
- rv = GetLastError();
- return (nni_winpipe_error(rv));
+ int rv;
+ nni_plat_ipc_pipe *pipe;
+ nni_aio *aio;
+ HANDLE newp, oldp;
+
+ // Note: This should be called with the ep lock held, and only when
+ // the ConnectNamedPipe has finished.
+
+ rv = 0;
+ if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) {
+ if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) {
+ // We should never be here normally, but if the
+ // pipe got accepted by another client we can
+ // some times race here.
+ return;
}
}
- EnterCriticalSection(&server->cs);
- if (server->p != INVALID_HANDLE_VALUE) {
- s->p = server->p;
- server->p = CreateNamedPipeA(
- server->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES,
- 4096, 4096, 0, NULL);
- if (server->p == INVALID_HANDLE_VALUE) {
- // We return the old handle, so that future accept
- // attempts have a chance of succeeding. That means
- // we will disconnect the current client.
- rv = GetLastError();
- server->p = s->p;
- DisconnectNamedPipe(server->p);
- s->p = INVALID_HANDLE_VALUE;
- LeaveCriticalSection(&server->cs);
- return (nni_winpipe_error(rv));
+ if ((aio = nni_list_first(&ep->aios)) == NULL) {
+ // No completion available to us.
+ if (rv == 0) {
+ NNI_ASSERT(0);
+ DisconnectNamedPipe(ep->p);
}
+ return;
}
- LeaveCriticalSection(&server->cs);
- return (0);
+ nni_list_remove(&ep->aios, aio);
+ if (rv != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ newp = CreateNamedPipeA(ep->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES,
+ 4096, 4096, 0, NULL);
+ if (newp == INVALID_HANDLE_VALUE) {
+ rv = nni_win_error(GetLastError());
+ DisconnectNamedPipe(ep->p);
+ return;
+ }
+ oldp = ep->p;
+ ep->p = newp;
+
+ if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) {
+ DisconnectNamedPipe(oldp);
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ aio->a_pipe = pipe;
+ nni_aio_finish(aio, 0, 0);
}
-int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+static void
+nni_win_ipc_acc_cb(void *arg)
{
- int rv;
+ nni_plat_ipc_ep *ep = arg;
- snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path);
+ nni_mtx_lock(&ep->mtx);
+ nni_win_ipc_acc_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
+}
- for (;;) {
- s->p = CreateFileA(s->path, GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
- if (s->p == INVALID_HANDLE_VALUE) {
- rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_BUSY:
- if (!WaitNamedPipe(s->path,
- NMPWAIT_USE_DEFAULT_WAIT)) {
- return (NNG_ETIMEDOUT);
- }
- continue;
- case ERROR_FILE_NOT_FOUND:
- // No present pipes (no listener?)
- return (NNG_ECONNREFUSED);
- default:
- return (nni_winpipe_error(rv));
- }
- }
- s->server = 0;
- break;
- }
- return (0);
+static void
+nni_win_ipc_acc_cancel(nni_aio *aio)
+{
+ nni_plat_ipc_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ nni_win_event_cancel(&ep->acc_evt);
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&ep->mtx);
}
-#endif
+void
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+{
+ int rv;
+ nni_win_event *evt = &ep->acc_evt;
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ rv = 0;
+ if ((rv = nni_win_event_reset(evt)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) {
+ rv = GetLastError();
+ switch (rv) {
+ case ERROR_PIPE_CONNECTED:
+ rv = 0;
+ break;
+ case ERROR_IO_PENDING:
+ nni_aio_list_append(&ep->aios, aio);
+ nni_mtx_unlock(&ep->mtx);
+ return;
-struct nni_plat_ipc_pipe {
- SOCKET s;
- nni_win_event recv_evt;
- nni_win_event send_evt;
- OVERLAPPED recv_olpd;
- OVERLAPPED send_olpd;
-};
+ default:
+ rv = nni_win_error(GetLastError());
+ nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ }
-struct nni_plat_ipc_ep {
- SOCKET s;
- nni_win_event evt;
- OVERLAPPED olpd;
-};
+ nni_win_ipc_acc_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
+}
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
-{
- return (NNG_ENOTSUP);
-}
+// So Windows IPC is a bit different on the client side. There is no
+// support for asynchronous connection, but we can fake it with a single
+// thread that runs to establish the connection. That thread will run
+// keep looping, sleeping for 10 ms between attempts. It performs non-blocking
+// attempts to connect.
+typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work;
+struct nni_win_ipc_conn_work {
+ nni_list waiters;
+ nni_list workers;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_thr thr;
+ int exit;
+};
+static nni_win_ipc_conn_work nni_win_ipc_connecter;
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
+
+static void
+nni_win_ipc_conn_thr(void *arg)
{
-}
+ nni_win_ipc_conn_work *w = arg;
+ nni_plat_ipc_ep *ep;
+ nni_plat_ipc_pipe *pipe;
+ nni_aio *aio;
+ HANDLE p;
+ int rv;
+ nni_mtx_lock(&w->mtx);
+ for (;;) {
+ if (w->exit) {
+ break;
+ }
+ while ((ep = nni_list_first(&w->waiters)) != NULL) {
+ nni_list_remove(&w->waiters, ep);
+ nni_list_append(&w->workers, ep);
+ }
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
-}
+ while ((ep = nni_list_first(&w->workers)) != NULL) {
+ nni_list_remove(&w->workers, ep);
+ if ((aio = nni_list_first(&ep->aios)) == NULL) {
+ continue;
+ }
+ nni_list_remove(&ep->aios, aio);
+ p = CreateFileA(ep->path,
+ GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (p == INVALID_HANDLE_VALUE) {
+ switch ((rv = GetLastError())) {
+ case ERROR_PIPE_BUSY:
+ // still in progress.
+ nni_list_prepend(&ep->aios, aio);
+ break;
+ case ERROR_FILE_NOT_FOUND:
+ nni_aio_finish(aio, NNG_ECONNREFUSED,
+ 0);
+ break;
+ default:
+ nni_aio_finish(aio, nni_win_error(rv),
+ 0);
+ break;
+ }
+ } else {
+ rv = nni_win_ipc_pipe_init(&pipe, p);
+ if (rv == 0) {
+ rv = nni_win_iocp_register(p);
+ }
+ if (rv != 0) {
+ DisconnectNamedPipe(p);
+ CloseHandle(p);
+ nni_aio_finish(aio, rv, 0);
+ } else {
+ aio->a_pipe = pipe;
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+ if (!nni_list_empty(&ep->aios)) {
+ nni_list_append(&w->waiters, ep);
+ }
+ }
-extern int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- return (NNG_ENOTSUP);
+ // Wait 10 ms, unless woken earlier.
+ if (nni_list_empty(&w->waiters)) {
+ nni_cv_wait(&w->cv);
+ } else {
+ nni_cv_until(&w->cv, nni_clock() + 10000);
+ }
+ }
+ nni_mtx_unlock(&w->mtx);
}
-extern void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+static void
+nni_win_ipc_conn_cancel(nni_aio *aio)
{
+ nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ nni_plat_ipc_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&w->mtx);
+ nni_aio_list_remove(aio);
+ if (nni_list_empty(&ep->aios)) {
+ nni_list_remove(&w->waiters, ep);
+ }
+ nni_mtx_unlock(&w->mtx);
}
-extern void
+void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
+ nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+
+ nni_mtx_lock(&w->mtx);
+
+ if (nni_list_active(&w->waiters, ep)) {
+ nni_aio_finish(aio, NNG_EBUSY, 0);
+ nni_mtx_unlock(&w->mtx);
+ return;
+ }
+
+ if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ return;
+ }
+ nni_list_append(&ep->aios, aio);
+ nni_list_append(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
+ nni_mtx_unlock(&w->mtx);
}
void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
+nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
+ nni_mtx_lock(&ep->mtx);
+ if (ep->p) {
+ CloseHandle(ep->p);
+ ep->p = NULL;
+ }
+ nni_mtx_unlock(&ep->mtx);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
}
void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
+nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
{
+ nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ nni_aio *aio;
+
+ switch (ep->mode) {
+ case NNI_EP_MODE_DIAL:
+ nni_mtx_lock(&w->mtx);
+ if (nni_list_active(&w->waiters, ep)) {
+ nni_list_remove(&w->waiters, ep);
+ }
+ while ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_list_remove(&ep->aios, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+ nni_mtx_unlock(&w->mtx);
+ break;
+ case NNI_EP_MODE_LISTEN:
+ nni_mtx_lock(&ep->mtx);
+ while ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_list_remove(&ep->aios, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+ if (ep->p != INVALID_HANDLE_VALUE) {
+ nni_win_event_cancel(&ep->acc_evt);
+ CloseHandle(ep->p);
+ ep->p = INVALID_HANDLE_VALUE;
+ }
+ nni_mtx_unlock(&ep->mtx);
+ break;
+ }
}
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
+int
+nni_win_ipc_sysinit(void)
{
+ int rv;
+ nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
+
+ NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
+ NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
+
+ if (((rv = nni_mtx_init(&worker->mtx)) != 0) ||
+ ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) {
+ return (rv);
+ }
+ rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_thr_run(&worker->thr);
+
+ return (0);
}
void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
+nni_win_ipc_sysfini(void)
{
+ nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
+
+ nni_mtx_lock(&worker->mtx);
+ worker->exit = 1;
+ nni_cv_wake(&worker->cv);
+ nni_mtx_unlock(&worker->mtx);
+ nni_thr_fini(&worker->thr);
+ nni_cv_fini(&worker->cv);
+ nni_mtx_fini(&worker->mtx);
}
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index f3103079..bc2d7a0c 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -181,6 +181,10 @@ nni_plat_init(int (*helper)(void))
if ((rv = nni_win_resolv_sysinit()) != 0) {
goto out;
}
+ if ((rv = nni_win_ipc_sysinit()) != 0) {
+ goto out;
+ }
+
helper();
inited = 1;
}
@@ -195,9 +199,10 @@ out:
void
nni_plat_fini(void)
{
- WSACleanup();
+ nni_win_ipc_sysfini();
nni_win_resolv_sysfini();
nni_win_iocp_sysfini();
+ WSACleanup();
}