diff options
Diffstat (limited to 'src/platform/windows/win_iocp.c')
| -rw-r--r-- | src/platform/windows/win_iocp.c | 186 |
1 files changed, 122 insertions, 64 deletions
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 7244b360..17f3b16d 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int status; - BOOL rv; + int rv; + BOOL ok; + nni_aio * aio; NNI_ARG_UNUSED(arg); iocp = nni_win_global_iocp; for (;;) { - key = 0; - olpd = NULL; - status = 0; - cnt = 0; + key = 0; + olpd = NULL; - rv = GetQueuedCompletionStatus( + ok = GetQueuedCompletionStatus( iocp, &cnt, &key, &olpd, INFINITE); - if (rv == FALSE) { - if (olpd == NULL) { - // Completion port bailed... - break; - } + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; } - NNI_ASSERT(olpd != NULL); - evt = (void *) olpd; + evt = CONTAINING_RECORD(olpd, nni_win_event, olpd); - NNI_ASSERT(evt->cb != NULL); - evt->cb(evt->ptr); + if (ok) { + rv = ERROR_SUCCESS; + } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) { + // Canceled operation, we can't touch any + // of the memory, since it may be gone. + continue; + } + + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) == NULL) { + // Canceled?? Probably ERROR_OPERATION_ABORTED + // It's pretty unclear how we got here. + nni_mtx_unlock(&evt->mtx); + continue; + } + evt->aio = NULL; + evt->status = rv; + evt->count = cnt; + + evt->ops.wev_finish(evt, aio); + nni_mtx_unlock(&evt->mtx); + } +} + +static void +nni_win_event_cancel(nni_aio *aio) +{ + nni_win_event *evt = aio->a_prov_data; + + nni_mtx_lock(&evt->mtx); + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + // This is just continuation of a pre-existing AIO operation. + // For example, continuing I/O of a multi-buffer s/g operation. + // The lock is held. + evt->status = ERROR_SUCCESS; + evt->count = 0; + if (!ResetEvent(evt->olpd.hEvent)) { + evt->status = GetLastError(); + evt->count = 0; + + evt->ops.wev_finish(evt, aio); + return; + } + + evt->aio = aio; + if (evt->ops.wev_start(evt, aio) != 0) { + // Start completed synchronously. It will have stored + // the count and status in the evt. + evt->aio = NULL; + evt->ops.wev_finish(evt, aio); + } +} + +void +nni_win_event_submit(nni_win_event *evt, nni_aio *aio) +{ + nni_mtx_lock(&evt->mtx); + if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { + // the aio was aborted + nni_mtx_unlock(&evt->mtx); + return; + } + nni_win_event_resubmit(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_complete(nni_win_event *evt, int cnt) +{ + PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd); +} + +void +nni_win_event_close(nni_win_event *evt) +{ + nni_aio *aio; + nni_mtx_lock(&evt->mtx); + if (evt->h != NULL) { + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; + // Stall waiting for the I/O to complete. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + } + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } int @@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +nni_win_event_init( + nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) { + int rv; + ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - nni_aio_list_init(&evt->aios); + if ((rv = nni_mtx_init(&evt->mtx)) != 0) { + return (rv); // NB: This will never happen on Windows. + } + evt->ops = *ops; + evt->aio = NULL; evt->ptr = ptr; - evt->cb = cb; evt->h = h; return (0); } @@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt) (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } -} - -int -nni_win_event_reset(nni_win_event *evt) -{ - if (!ResetEvent(evt->olpd.hEvent)) { - return (nni_win_error(GetLastError())); - } - return (0); -} - -OVERLAPPED * -nni_win_event_overlapped(nni_win_event *evt) -{ - return (&evt->olpd); -} - -void -nni_win_event_cancel(nni_win_event *evt) -{ - int rv; - DWORD cnt; - - // Try to cancel the event... - if (!CancelIoEx(evt->h, &evt->olpd)) { - // None was found. That's good. - if ((rv = GetLastError()) == ERROR_NOT_FOUND) { - // Nothing queued. We may in theory be running - // the callback via the completion port handler; - // caller must synchronize that separately. - return; - } - - // It's unclear why we would ever fail in this - // circumstance. Is there a kind of "uncancellable I/O" - // here, or somesuch? In this case we just wait hard - // using the success case -- its the best we can do. - } - - // This basically just waits for the canceled I/O to complete. - // The end result can be either success or ERROR_OPERATION_ABORTED. - // It turns out we don't much care either way; we just want to make - // sure that we don't have any I/O pending on the overlapped - // structure before we release it or reuse it. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + nni_mtx_fini(&evt->mtx); } int |
