summaryrefslogtreecommitdiff
path: root/src/platform/windows/win_iocp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/windows/win_iocp.c')
-rw-r--r--src/platform/windows/win_iocp.c186
1 files changed, 122 insertions, 64 deletions
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 7244b360..17f3b16d 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg)
ULONG_PTR key;
OVERLAPPED * olpd;
nni_win_event *evt;
- int status;
- BOOL rv;
+ int rv;
+ BOOL ok;
+ nni_aio * aio;
NNI_ARG_UNUSED(arg);
iocp = nni_win_global_iocp;
for (;;) {
- key = 0;
- olpd = NULL;
- status = 0;
- cnt = 0;
+ key = 0;
+ olpd = NULL;
- rv = GetQueuedCompletionStatus(
+ ok = GetQueuedCompletionStatus(
iocp, &cnt, &key, &olpd, INFINITE);
- if (rv == FALSE) {
- if (olpd == NULL) {
- // Completion port bailed...
- break;
- }
+ if (olpd == NULL) {
+ // Completion port closed...
+ NNI_ASSERT(ok == FALSE);
+ break;
}
- NNI_ASSERT(olpd != NULL);
- evt = (void *) olpd;
+ evt = CONTAINING_RECORD(olpd, nni_win_event, olpd);
- NNI_ASSERT(evt->cb != NULL);
- evt->cb(evt->ptr);
+ if (ok) {
+ rv = ERROR_SUCCESS;
+ } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) {
+ // Canceled operation, we can't touch any
+ // of the memory, since it may be gone.
+ continue;
+ }
+
+ nni_mtx_lock(&evt->mtx);
+ if ((aio = evt->aio) == NULL) {
+ // Canceled?? Probably ERROR_OPERATION_ABORTED
+ // It's pretty unclear how we got here.
+ nni_mtx_unlock(&evt->mtx);
+ continue;
+ }
+ evt->aio = NULL;
+ evt->status = rv;
+ evt->count = cnt;
+
+ evt->ops.wev_finish(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+ }
+}
+
+static void
+nni_win_event_cancel(nni_aio *aio)
+{
+ nni_win_event *evt = aio->a_prov_data;
+
+ nni_mtx_lock(&evt->mtx);
+ evt->aio = NULL;
+
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+}
+
+void
+nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
+{
+ // This is just continuation of a pre-existing AIO operation.
+ // For example, continuing I/O of a multi-buffer s/g operation.
+ // The lock is held.
+ evt->status = ERROR_SUCCESS;
+ evt->count = 0;
+ if (!ResetEvent(evt->olpd.hEvent)) {
+ evt->status = GetLastError();
+ evt->count = 0;
+
+ evt->ops.wev_finish(evt, aio);
+ return;
+ }
+
+ evt->aio = aio;
+ if (evt->ops.wev_start(evt, aio) != 0) {
+ // Start completed synchronously. It will have stored
+ // the count and status in the evt.
+ evt->aio = NULL;
+ evt->ops.wev_finish(evt, aio);
+ }
+}
+
+void
+nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
+{
+ nni_mtx_lock(&evt->mtx);
+ if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) {
+ // the aio was aborted
+ nni_mtx_unlock(&evt->mtx);
+ return;
+ }
+ nni_win_event_resubmit(evt, aio);
+ nni_mtx_unlock(&evt->mtx);
+}
+
+void
+nni_win_event_complete(nni_win_event *evt, int cnt)
+{
+ PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd);
+}
+
+void
+nni_win_event_close(nni_win_event *evt)
+{
+ nni_aio *aio;
+ nni_mtx_lock(&evt->mtx);
+ if (evt->h != NULL) {
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
+ // Stall waiting for the I/O to complete.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ }
+ }
+ if ((aio = evt->aio) != NULL) {
+ evt->aio = NULL;
+ // We really don't care if we transferred data or not.
+ // The caller indicates they have closed the pipe.
+ evt->status = ERROR_INVALID_HANDLE;
+ evt->count = 0;
+ evt->ops.wev_finish(evt, aio);
}
+ nni_mtx_unlock(&evt->mtx);
}
int
@@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h)
}
int
-nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h)
+nni_win_event_init(
+ nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h)
{
+ int rv;
+
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- nni_aio_list_init(&evt->aios);
+ if ((rv = nni_mtx_init(&evt->mtx)) != 0) {
+ return (rv); // NB: This will never happen on Windows.
+ }
+ evt->ops = *ops;
+ evt->aio = NULL;
evt->ptr = ptr;
- evt->cb = cb;
evt->h = h;
return (0);
}
@@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt)
(void) CloseHandle(evt->olpd.hEvent);
evt->olpd.hEvent = NULL;
}
-}
-
-int
-nni_win_event_reset(nni_win_event *evt)
-{
- if (!ResetEvent(evt->olpd.hEvent)) {
- return (nni_win_error(GetLastError()));
- }
- return (0);
-}
-
-OVERLAPPED *
-nni_win_event_overlapped(nni_win_event *evt)
-{
- return (&evt->olpd);
-}
-
-void
-nni_win_event_cancel(nni_win_event *evt)
-{
- int rv;
- DWORD cnt;
-
- // Try to cancel the event...
- if (!CancelIoEx(evt->h, &evt->olpd)) {
- // None was found. That's good.
- if ((rv = GetLastError()) == ERROR_NOT_FOUND) {
- // Nothing queued. We may in theory be running
- // the callback via the completion port handler;
- // caller must synchronize that separately.
- return;
- }
-
- // It's unclear why we would ever fail in this
- // circumstance. Is there a kind of "uncancellable I/O"
- // here, or somesuch? In this case we just wait hard
- // using the success case -- its the best we can do.
- }
-
- // This basically just waits for the canceled I/O to complete.
- // The end result can be either success or ERROR_OPERATION_ABORTED.
- // It turns out we don't much care either way; we just want to make
- // sure that we don't have any I/O pending on the overlapped
- // structure before we release it or reuse it.
- GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ nni_mtx_fini(&evt->mtx);
}
int