aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/windows/win_debug.c21
-rw-r--r--src/platform/windows/win_impl.h57
-rw-r--r--src/platform/windows/win_iocp.c186
-rw-r--r--src/platform/windows/win_ipc.c619
4 files changed, 373 insertions, 510 deletions
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c
index bc3d5bfa..cc2adb88 100644
--- a/src/platform/windows/win_debug.c
+++ b/src/platform/windows/win_debug.c
@@ -42,11 +42,20 @@ static struct {
int sys_err;
int nng_err;
} nni_plat_errnos[] = {
- { ENOENT, NNG_ENOENT }, { EINTR, NNG_EINTR }, { EINVAL, NNG_EINVAL },
- { ENOMEM, NNG_ENOMEM }, { EACCES, NNG_EPERM }, { EAGAIN, NNG_EAGAIN },
- { EBADF, NNG_ECLOSED }, { EBUSY, NNG_EBUSY },
- { ENAMETOOLONG, NNG_EINVAL }, { EPERM, NNG_EPERM },
- { EPIPE, NNG_ECLOSED }, { 0, 0 } // must be last
+ // clang-format off
+ { ENOENT, NNG_ENOENT },
+ { EINTR, NNG_EINTR },
+ { EINVAL, NNG_EINVAL },
+ { ENOMEM, NNG_ENOMEM },
+ { EACCES, NNG_EPERM },
+ { EAGAIN, NNG_EAGAIN },
+ { EBADF, NNG_ECLOSED },
+ { EBUSY, NNG_EBUSY },
+ { ENAMETOOLONG, NNG_EINVAL },
+ { EPERM, NNG_EPERM },
+ { EPIPE, NNG_ECLOSED },
+ { 0, 0 } // must be last
+ // clang-format on
};
int
@@ -107,7 +116,7 @@ nni_win_error(int errnum)
if (errnum == 0) {
return (0);
}
- for (i = 0; nni_win_errnos[i].nng_err != 0; i++) {
+ for (i = 0; nni_win_errnos[i].win_err != 0; i++) {
if (errnum == nni_win_errnos[i].win_err) {
return (nni_win_errnos[i].nng_err);
}
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 234acc35..4594da53 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -16,10 +16,12 @@
#define WIN32_LEAN_AND_MEAN
#endif
-#include <mswsock.h>
-#include <process.h>
+// These headers must be included first.
#include <windows.h>
#include <winsock2.h>
+
+#include <mswsock.h>
+#include <process.h>
#include <ws2tcpip.h>
#include "core/list.h"
@@ -27,21 +29,6 @@
// These types are provided for here, to permit them to be directly inlined
// elsewhere.
-typedef struct nni_win_event nni_win_event;
-
-// nni_win_event is used with io completion ports. This allows us to get
-// to a specific completion callback without requiring the poller (in the
-// completion port) to know anything about the event itself. We also use
-// this to pass back status and counts to the routine, which may not be
-// conveyed in the OVERLAPPED directly.
-struct nni_win_event {
- OVERLAPPED olpd;
- HANDLE h;
- void * ptr;
- nni_cb cb;
- nni_list aios;
-};
-
struct nni_plat_thr {
void (*func)(void *);
void * arg;
@@ -59,14 +46,40 @@ struct nni_plat_cv {
PSRWLOCK srl;
};
+// nni_win_event is used with io completion ports. This allows us to get
+// to a specific completion callback without requiring the poller (in the
+// completion port) to know anything about the event itself. We also use
+// this to pass back status and counts to the routine, which may not be
+// conveyed in the OVERLAPPED directly.
+typedef struct nni_win_event nni_win_event;
+typedef struct nni_win_event_ops nni_win_event_ops;
+
+struct nni_win_event_ops {
+ int (*wev_start)(nni_win_event *, nni_aio *);
+ void (*wev_finish)(nni_win_event *, nni_aio *);
+ void (*wev_cancel)(nni_win_event *, nni_aio *);
+};
+struct nni_win_event {
+ OVERLAPPED olpd;
+ HANDLE h;
+ void * ptr;
+ nni_aio * aio;
+ nni_mtx mtx;
+ int count;
+ int status;
+ nni_win_event_ops ops;
+};
+
extern int nni_win_error(int);
extern int nni_winsock_error(int);
-extern int nni_win_event_init(nni_win_event *, nni_cb, void *, HANDLE);
-extern void nni_win_event_fini(nni_win_event *);
-extern int nni_win_event_reset(nni_win_event *);
-extern OVERLAPPED *nni_win_event_overlapped(nni_win_event *);
-extern void nni_win_event_cancel(nni_win_event *);
+extern int nni_win_event_init(
+ nni_win_event *, nni_win_event_ops *, void *, HANDLE);
+extern void nni_win_event_fini(nni_win_event *);
+extern void nni_win_event_submit(nni_win_event *, nni_aio *);
+extern void nni_win_event_resubmit(nni_win_event *, nni_aio *);
+extern void nni_win_event_close(nni_win_event *);
+extern void nni_win_event_complete(nni_win_event *, int);
extern int nni_win_iocp_register(HANDLE);
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 7244b360..17f3b16d 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg)
ULONG_PTR key;
OVERLAPPED * olpd;
nni_win_event *evt;
- int status;
- BOOL rv;
+ int rv;
+ BOOL ok;
+ nni_aio * aio;
NNI_ARG_UNUSED(arg);
iocp = nni_win_global_iocp;
for (;;) {
- key = 0;
- olpd = NULL;
- status = 0;
- cnt = 0;
+ key = 0;
+ olpd = NULL;
- rv = GetQueuedCompletionStatus(
+ ok = GetQueuedCompletionStatus(
iocp, &cnt, &key, &olpd, INFINITE);
- if (rv == FALSE) {
- if (olpd == NULL) {
- // Completion port bailed...
- break;
- }
+ if (olpd == NULL) {
+ // Completion port closed...
+ NNI_ASSERT(ok == FALSE);
+ break;
}
- NNI_ASSERT(olpd != NULL);
- evt = (void *) olpd;
+ evt = CONTAINING_RECORD(olpd, nni_win_event, olpd);
- NNI_ASSERT(evt->cb != NULL);
- evt->cb(evt->ptr);
+ if (ok) {
+ rv = ERROR_SUCCESS;
+ } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) {
+ // Canceled operation, we can't touch any
+ // of the memory, since it may be gone.
+ continue;
+ }
+
+ nni_mtx_lock(&evt->mtx);
+ if ((aio = evt->aio) == NULL) {
+ // Canceled?? Probably ERROR_OPERATION_ABORTED
+ // It's pretty unclear how we got here.
+ nni_mtx_unlock(&evt->mtx);
+ continue;
+ }
+ evt->aio = NULL;
+ evt->status = rv;
+ evt->count = cnt;
+
+ evt->ops.wev_finish(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+ }
+}
+
+static void
+nni_win_event_cancel(nni_aio *aio)
+{
+ nni_win_event *evt = aio->a_prov_data;
+
+ nni_mtx_lock(&evt->mtx);
+ evt->aio = NULL;
+
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+}
+
+void
+nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
+{
+ // This is just continuation of a pre-existing AIO operation.
+ // For example, continuing I/O of a multi-buffer s/g operation.
+ // The lock is held.
+ evt->status = ERROR_SUCCESS;
+ evt->count = 0;
+ if (!ResetEvent(evt->olpd.hEvent)) {
+ evt->status = GetLastError();
+ evt->count = 0;
+
+ evt->ops.wev_finish(evt, aio);
+ return;
+ }
+
+ evt->aio = aio;
+ if (evt->ops.wev_start(evt, aio) != 0) {
+ // Start completed synchronously. It will have stored
+ // the count and status in the evt.
+ evt->aio = NULL;
+ evt->ops.wev_finish(evt, aio);
+ }
+}
+
+void
+nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
+{
+ nni_mtx_lock(&evt->mtx);
+ if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) {
+ // the aio was aborted
+ nni_mtx_unlock(&evt->mtx);
+ return;
+ }
+ nni_win_event_resubmit(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+}
+
+void
+nni_win_event_complete(nni_win_event *evt, int cnt)
+{
+ PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd);
+}
+
+void
+nni_win_event_close(nni_win_event *evt)
+{
+ nni_aio *aio;
+ nni_mtx_lock(&evt->mtx);
+ if (evt->h != NULL) {
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
+ // Stall waiting for the I/O to complete.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ }
+ }
+ if ((aio = evt->aio) != NULL) {
+ evt->aio = NULL;
+ // We really don't care if we transferred data or not.
+ // The caller indicates they have closed the pipe.
+ evt->status = ERROR_INVALID_HANDLE;
+ evt->count = 0;
+ evt->ops.wev_finish(evt, aio);
}
+ nni_mtx_unlock(&evt->mtx);
}
int
@@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h)
}
int
-nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h)
+nni_win_event_init(
+ nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h)
{
+ int rv;
+
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- nni_aio_list_init(&evt->aios);
+ if ((rv = nni_mtx_init(&evt->mtx)) != 0) {
+ return (rv); // NB: This will never happen on Windows.
+ }
+ evt->ops = *ops;
+ evt->aio = NULL;
evt->ptr = ptr;
- evt->cb = cb;
evt->h = h;
return (0);
}
@@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt)
(void) CloseHandle(evt->olpd.hEvent);
evt->olpd.hEvent = NULL;
}
-}
-
-int
-nni_win_event_reset(nni_win_event *evt)
-{
- if (!ResetEvent(evt->olpd.hEvent)) {
- return (nni_win_error(GetLastError()));
- }
- return (0);
-}
-
-OVERLAPPED *
-nni_win_event_overlapped(nni_win_event *evt)
-{
- return (&evt->olpd);
-}
-
-void
-nni_win_event_cancel(nni_win_event *evt)
-{
- int rv;
- DWORD cnt;
-
- // Try to cancel the event...
- if (!CancelIoEx(evt->h, &evt->olpd)) {
- // None was found. That's good.
- if ((rv = GetLastError()) == ERROR_NOT_FOUND) {
- // Nothing queued. We may in theory be running
- // the callback via the completion port handler;
- // caller must synchronize that separately.
- return;
- }
-
- // It's unclear why we would ever fail in this
- // circumstance. Is there a kind of "uncancellable I/O"
- // here, or somesuch? In this case we just wait hard
- // using the success case -- its the best we can do.
- }
-
- // This basically just waits for the canceled I/O to complete.
- // The end result can be either success or ERROR_OPERATION_ABORTED.
- // It turns out we don't much care either way; we just want to make
- // sure that we don't have any I/O pending on the overlapped
- // structure before we release it or reuse it.
- GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ nni_mtx_fini(&evt->mtx);
}
int
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 237ded78..be1a98a1 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -13,19 +13,10 @@
#include <stdio.h>
-static void nni_win_ipc_acc_cb(void *);
-static void nni_win_ipc_send_cb(void *);
-static void nni_win_ipc_recv_cb(void *);
-static void nni_win_ipc_send_start(nni_plat_ipc_pipe *);
-static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *);
-
struct nni_plat_ipc_pipe {
HANDLE p;
- nni_win_event recv_evt;
- nni_win_event send_evt;
- nni_mtx mtx;
- nni_list readq;
- nni_list writeq;
+ nni_win_event rcv_ev;
+ nni_win_event snd_ev;
};
struct nni_plat_ipc_ep {
@@ -34,118 +25,49 @@ struct nni_plat_ipc_ep {
int started;
nni_list aios;
HANDLE p; // accept side only
- nni_win_event acc_evt; // accept side only
- nni_mtx mtx; // accept side only
+ nni_win_event acc_ev; // accept side only
+ nni_aio * con_aio; // conn side only
nni_list_node node; // conn side uses this
};
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- nni_aio_list_init(&pipe->readq);
- nni_aio_list_init(&pipe->writeq);
- *pipep = pipe;
- return (0);
-}
-
-static void
-nni_win_ipc_send_cancel(nni_aio *aio)
-{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->writeq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
- NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
- aio->a_count += cnt;
- aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
- aio->a_iov[0].iov_len -= cnt;
+static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *);
- if (aio->a_iov[0].iov_len == 0) {
- int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- aio->a_niov--;
- }
+static nni_win_event_ops nni_win_ipc_pipe_ops = {
+ .wev_start = nni_win_ipc_pipe_start,
+ .wev_finish = nni_win_ipc_pipe_finish,
+ .wev_cancel = nni_win_ipc_pipe_cancel,
+};
- if (aio->a_niov > 0) {
- // If we have more to do, submit it!
- nni_win_ipc_send_start(pipe);
- return;
- }
- }
+static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *);
- // All done; hopefully successfully.
- nni_list_remove(&pipe->writeq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
-}
+static nni_win_event_ops nni_win_ipc_acc_ops = {
+ .wev_start = nni_win_ipc_acc_start,
+ .wev_finish = nni_win_ipc_acc_finish,
+ .wev_cancel = nni_win_ipc_acc_cancel,
+};
-static void
-nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->writeq);
+ void * buf;
+ DWORD len;
+ BOOL ok;
+ int rv;
+ nni_plat_ipc_pipe *pipe = evt->ptr;
NNI_ASSERT(aio != NULL);
NNI_ASSERT(aio->a_niov > 0);
NNI_ASSERT(aio->a_iov[0].iov_len > 0);
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if (evt->h == INVALID_HANDLE_VALUE) {
+ evt->status = ERROR_INVALID_HANDLE;
+ evt->count = 0;
+ return (1);
}
// Now start a writefile. We assume that only one send can be
@@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
// scrambling the data anyway. Note that Windows named pipes do
// not appear to support scatter/gather, so we have to process
// each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
+ buf = aio->a_iov[0].iov_buf;
+ len = (DWORD) aio->a_iov[0].iov_len;
// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
@@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
len = 0x1000000;
}
- if (!WriteFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ evt->count = 0;
+ if (evt == &pipe->snd_ev) {
+ ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd);
+ } else {
+ ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd);
+ }
+ if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_send_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ // Wait for the I/O completion event. Note that when an I/O
+ // completes immediately, the I/O completion packet is still
+ // delivered.
+ return (0);
}
static void
-nni_win_ipc_send_cb(void *arg)
+nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
+ NNI_ARG_UNUSED(aio);
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_send_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
-}
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
}
- nni_aio_list_append(&pipe->writeq, aio);
- nni_win_ipc_send_start(pipe);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_win_ipc_recv_cancel(nni_aio *aio)
+nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
+ int rv = 0;
+ DWORD cnt;
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->readq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
+ cnt = evt->count;
+ if ((rv = evt->status) == 0) {
NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
aio->a_count += cnt;
aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
@@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
if (aio->a_iov[0].iov_len == 0) {
int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
}
if (aio->a_niov > 0) {
// If we have more to do, submit it!
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_resubmit(evt, aio);
return;
}
}
// All done; hopefully successfully.
- nni_list_remove(&pipe->readq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
}
-static void
-nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->readq);
-
- NNI_ASSERT(aio != NULL);
- NNI_ASSERT(aio->a_niov > 0);
- NNI_ASSERT(aio->a_iov[0].iov_len > 0);
- NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
+ nni_plat_ipc_pipe *pipe;
+ int rv;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- // Now start a readfile. We assume that only one read can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
+ rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
-
- if (!ReadFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_recv_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, 0);
+ pipe->p = p;
+ *pipep = pipe;
+ return (0);
}
-static void
-nni_win_ipc_recv_cb(void *arg)
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_recv_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_submit(&pipe->snd_ev, aio);
}
void
nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- nni_aio_list_append(&pipe->readq, aio);
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_submit(&pipe->rcv_ev, aio);
}
void
nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
{
- nni_mtx_lock(&pipe->mtx);
- if (pipe->p != INVALID_HANDLE_VALUE) {
- CloseHandle(pipe->p);
+ HANDLE p;
+
+ if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
pipe->p = INVALID_HANDLE_VALUE;
+ CloseHandle(p);
}
- nni_win_event_cancel(&pipe->send_evt);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_close(&pipe->snd_ev);
+ nni_win_event_close(&pipe->rcv_ev);
}
void
@@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
{
nni_plat_ipc_pipe_close(pipe);
- nni_win_event_fini(&pipe->send_evt);
- nni_win_event_fini(&pipe->recv_evt);
- nni_mtx_fini(&pipe->mtx);
+ nni_win_event_fini(&pipe->snd_ev);
+ nni_win_event_fini(&pipe->rcv_ev);
NNI_FREE_STRUCT(pipe);
}
@@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
{
const char * path;
nni_plat_ipc_ep *ep;
- int rv;
if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
return (NNG_EADDRINVAL);
@@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
return (NNG_ENOMEM);
}
ZeroMemory(ep, sizeof(ep));
- if ((rv = nni_mtx_init(&ep->mtx)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
ep->mode = mode;
NNI_LIST_NODE_INIT(&ep->node);
- nni_aio_list_init(&ep->aios);
(void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
@@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
int rv;
HANDLE p;
- nni_mtx_lock(&ep->mtx);
if (ep->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EINVAL);
}
if (ep->started) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EBUSY);
}
@@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
}
goto failed;
}
- rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p);
+ rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p);
if (rv != 0) {
goto failed;
}
@@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
ep->p = p;
ep->started = 1;
- nni_mtx_unlock(&ep->mtx);
return (0);
failed:
- nni_mtx_unlock(&ep->mtx);
if (p != INVALID_HANDLE_VALUE) {
(void) CloseHandle(p);
}
@@ -486,40 +285,15 @@ failed:
}
static void
-nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
+nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_win_event * evt = &ep->acc_evt;
- DWORD nbytes;
- int rv;
+ nni_plat_ipc_ep * ep = evt->ptr;
nni_plat_ipc_pipe *pipe;
- nni_aio * aio;
+ int rv;
HANDLE newp, oldp;
- // Note: This should be called with the ep lock held, and only when
- // the ConnectNamedPipe has finished.
-
- rv = 0;
- if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) {
- if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) {
- // We should never be here normally, but if the
- // pipe got accepted by another client we can
- // some times race here.
- return;
- }
- }
-
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
- // No completion available to us.
- if (rv == 0) {
- NNI_ASSERT(0);
- DisconnectNamedPipe(ep->p);
- }
- return;
- }
-
- nni_list_remove(&ep->aios, aio);
- if (rv != 0) {
- nni_aio_finish(aio, rv, 0);
+ if ((rv = evt->status) != 0) {
+ nni_aio_finish(aio, nni_win_error(rv), 0);
return;
}
@@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL);
if (newp == INVALID_HANDLE_VALUE) {
rv = nni_win_error(GetLastError());
+ // We connected, but as we cannot get a new pipe,
+ // we have to disconnect the old one.
DisconnectNamedPipe(ep->p);
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ if ((rv = nni_win_iocp_register(newp)) != 0) {
+ // Disconnect the old pipe.
+ DisconnectNamedPipe(ep->p);
+ // And discard the half-baked new one.
+ DisconnectNamedPipe(newp);
+ (void) CloseHandle(newp);
+ nni_aio_finish(aio, rv, 0);
return;
}
- oldp = ep->p;
- ep->p = newp;
+
+ oldp = ep->p;
+ ep->p = newp;
+ evt->h = newp;
if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) {
+ // The new pipe is already fine for us. Discard
+ // the old one, since failed to be able to use it.
DisconnectNamedPipe(oldp);
+ (void) CloseHandle(oldp);
nni_aio_finish(aio, rv, 0);
return;
}
@@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
}
static void
-nni_win_ipc_acc_cb(void *arg)
+nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_ep *ep = arg;
-
- nni_mtx_lock(&ep->mtx);
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
-}
+ NNI_ARG_UNUSED(aio);
-static void
-nni_win_ipc_acc_cancel(nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = aio->a_prov_data;
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
- nni_mtx_lock(&ep->mtx);
- nni_win_event_cancel(&ep->acc_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&ep->mtx);
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ }
+ // Just to be sure.
+ (void) DisconnectNamedPipe(evt->h);
}
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+static int
+nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
{
- int rv;
- nni_win_event *evt = &ep->acc_evt;
-
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- rv = 0;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) {
- rv = GetLastError();
+ if (!ConnectNamedPipe(evt->h, &evt->olpd)) {
+ int rv = GetLastError();
switch (rv) {
case ERROR_PIPE_CONNECTED:
- rv = 0;
- break;
+ // Synch completion already occurred.
+ // Windows is weird. Apparently the I/O
+ // completion packet has already been sent.
+ return (0);
+
case ERROR_IO_PENDING:
- nni_aio_list_append(&ep->aios, aio);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Normal asynchronous operation. Wait for
+ // completion.
+ return (0);
default:
- rv = nni_win_error(GetLastError());
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Fast-fail (synchronous).
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
}
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ // Synch completion right now. I/O completion packet delivered
+ // already.
+ return (0);
+}
+
+void
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+{
+ aio->a_pipe = NULL;
+ nni_win_event_submit(&ep->acc_ev, aio);
}
// So Windows IPC is a bit different on the client side. There is no
@@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg)
}
while ((ep = nni_list_first(&w->workers)) != NULL) {
+
nni_list_remove(&w->workers, ep);
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
+ if ((aio = ep->con_aio) == NULL) {
continue;
}
- nni_list_remove(&ep->aios, aio);
+ ep->con_aio = NULL;
+
+ pipe = NULL;
+
p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
NULL);
@@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg)
if (p == INVALID_HANDLE_VALUE) {
switch ((rv = GetLastError())) {
case ERROR_PIPE_BUSY:
- // still in progress.
- nni_list_prepend(&ep->aios, aio);
- break;
+ // Still in progress. This shouldn't
+ // happen unless the other side aborts
+ // the connection.
+ ep->con_aio = aio;
+ nni_list_append(&w->waiters, ep);
+ continue;
+
case ERROR_FILE_NOT_FOUND:
- nni_aio_finish(
- aio, NNG_ECONNREFUSED, 0);
+ rv = NNG_ECONNREFUSED;
break;
default:
- nni_aio_finish(
- aio, nni_win_error(rv), 0);
+ rv = nni_win_error(rv);
break;
}
- } else {
- rv = nni_win_ipc_pipe_init(&pipe, p);
- if (rv == 0) {
- rv = nni_win_iocp_register(p);
- }
- if (rv != 0) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- nni_aio_finish(aio, rv, 0);
- } else {
- aio->a_pipe = pipe;
- nni_aio_finish(aio, 0, 0);
- }
+ goto fail;
+ }
+ if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) ||
+ ((rv = nni_win_iocp_register(p)) != 0)) {
+ goto fail;
+ }
+ aio->a_pipe = pipe;
+ nni_aio_finish(aio, 0, 0);
+ continue;
+
+ fail:
+ if (p != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(p);
+ CloseHandle(p);
}
- if (!nni_list_empty(&ep->aios)) {
- nni_list_append(&w->waiters, ep);
+ if (pipe != NULL) {
+ nni_plat_ipc_pipe_fini(pipe);
}
+ nni_aio_finish(aio, rv, 0);
}
- // Wait 10 ms, unless woken earlier.
if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
nni_cv_wait(&w->cv);
} else {
+ // Wait 10 ms, unless woken earlier.
nni_cv_until(&w->cv, nni_clock() + 10000);
}
}
@@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio)
nni_plat_ipc_ep * ep = aio->a_prov_data;
nni_mtx_lock(&w->mtx);
- nni_aio_list_remove(aio);
- if (nni_list_empty(&ep->aios)) {
+ ep->con_aio = NULL;
+ if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
}
nni_mtx_unlock(&w->mtx);
}
@@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_aio_finish(aio, NNG_EBUSY, 0);
- nni_mtx_unlock(&w->mtx);
- return;
- }
+ NNI_ASSERT(!nni_list_active(&w->waiters, ep));
if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
nni_mtx_unlock(&w->mtx);
return;
}
- nni_list_append(&ep->aios, aio);
+ ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
@@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
void
nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_mtx_lock(&ep->mtx);
- if (ep->p) {
+ if (ep->p != INVALID_HANDLE_VALUE) {
CloseHandle(ep->p);
ep->p = NULL;
}
- nni_mtx_unlock(&ep->mtx);
- nni_mtx_fini(&ep->mtx);
+ nni_win_event_close(&ep->acc_ev);
+ nni_win_event_fini(&ep->acc_ev);
NNI_FREE_STRUCT(ep);
}
@@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
}
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
+ if ((aio = ep->con_aio) != NULL) {
+ ep->con_aio = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
nni_mtx_unlock(&w->mtx);
break;
+
case NNI_EP_MODE_LISTEN:
- nni_mtx_lock(&ep->mtx);
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- }
if (ep->p != INVALID_HANDLE_VALUE) {
- nni_win_event_cancel(&ep->acc_evt);
+ nni_win_event_close(&ep->acc_ev);
CloseHandle(ep->p);
ep->p = INVALID_HANDLE_VALUE;
}
- nni_mtx_unlock(&ep->mtx);
break;
}
}