From 8741c4421ec7a5e889c05a3d7dd46feee93ddf9a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 11 Jul 2017 22:59:38 -0700 Subject: Windows IPC working, mostly. The IOCP code has been refactored to improve reuse, and hopefully will be easier to use with TCP now. Windows IPC using Named Pipes is mostly working -- mostly because there is a gnarly close-race. It seems that we need to take some more care to ensure that the pipe is not released while requests may be outstanding -- so some deeper synchronization between the IOCP callback logic and the win_event code is needed. In short, we need to add a condvar to the event, and notice when we have submitted work for async completion, and make sure we flag the event "idle" after either completion or cancellation of the event. --- src/platform/windows/win_iocp.c | 186 ++++++++++++++++++++++++++-------------- 1 file changed, 122 insertions(+), 64 deletions(-) (limited to 'src/platform/windows/win_iocp.c') diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 7244b360..17f3b16d 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int status; - BOOL rv; + int rv; + BOOL ok; + nni_aio * aio; NNI_ARG_UNUSED(arg); iocp = nni_win_global_iocp; for (;;) { - key = 0; - olpd = NULL; - status = 0; - cnt = 0; + key = 0; + olpd = NULL; - rv = GetQueuedCompletionStatus( + ok = GetQueuedCompletionStatus( iocp, &cnt, &key, &olpd, INFINITE); - if (rv == FALSE) { - if (olpd == NULL) { - // Completion port bailed... - break; - } + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; } - NNI_ASSERT(olpd != NULL); - evt = (void *) olpd; + evt = CONTAINING_RECORD(olpd, nni_win_event, olpd); - NNI_ASSERT(evt->cb != NULL); - evt->cb(evt->ptr); + if (ok) { + rv = ERROR_SUCCESS; + } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) { + // Canceled operation, we can't touch any + // of the memory, since it may be gone. + continue; + } + + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) == NULL) { + // Canceled?? Probably ERROR_OPERATION_ABORTED + // It's pretty unclear how we got here. + nni_mtx_unlock(&evt->mtx); + continue; + } + evt->aio = NULL; + evt->status = rv; + evt->count = cnt; + + evt->ops.wev_finish(evt, aio); + nni_mtx_unlock(&evt->mtx); + } +} + +static void +nni_win_event_cancel(nni_aio *aio) +{ + nni_win_event *evt = aio->a_prov_data; + + nni_mtx_lock(&evt->mtx); + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + // This is just continuation of a pre-existing AIO operation. + // For example, continuing I/O of a multi-buffer s/g operation. + // The lock is held. + evt->status = ERROR_SUCCESS; + evt->count = 0; + if (!ResetEvent(evt->olpd.hEvent)) { + evt->status = GetLastError(); + evt->count = 0; + + evt->ops.wev_finish(evt, aio); + return; + } + + evt->aio = aio; + if (evt->ops.wev_start(evt, aio) != 0) { + // Start completed synchronously. It will have stored + // the count and status in the evt. + evt->aio = NULL; + evt->ops.wev_finish(evt, aio); + } +} + +void +nni_win_event_submit(nni_win_event *evt, nni_aio *aio) +{ + nni_mtx_lock(&evt->mtx); + if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { + // the aio was aborted + nni_mtx_unlock(&evt->mtx); + return; + } + nni_win_event_resubmit(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_complete(nni_win_event *evt, int cnt) +{ + PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd); +} + +void +nni_win_event_close(nni_win_event *evt) +{ + nni_aio *aio; + nni_mtx_lock(&evt->mtx); + if (evt->h != NULL) { + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; + // Stall waiting for the I/O to complete. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + } + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } int @@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +nni_win_event_init( + nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) { + int rv; + ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - nni_aio_list_init(&evt->aios); + if ((rv = nni_mtx_init(&evt->mtx)) != 0) { + return (rv); // NB: This will never happen on Windows. + } + evt->ops = *ops; + evt->aio = NULL; evt->ptr = ptr; - evt->cb = cb; evt->h = h; return (0); } @@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt) (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } -} - -int -nni_win_event_reset(nni_win_event *evt) -{ - if (!ResetEvent(evt->olpd.hEvent)) { - return (nni_win_error(GetLastError())); - } - return (0); -} - -OVERLAPPED * -nni_win_event_overlapped(nni_win_event *evt) -{ - return (&evt->olpd); -} - -void -nni_win_event_cancel(nni_win_event *evt) -{ - int rv; - DWORD cnt; - - // Try to cancel the event... - if (!CancelIoEx(evt->h, &evt->olpd)) { - // None was found. That's good. - if ((rv = GetLastError()) == ERROR_NOT_FOUND) { - // Nothing queued. We may in theory be running - // the callback via the completion port handler; - // caller must synchronize that separately. - return; - } - - // It's unclear why we would ever fail in this - // circumstance. Is there a kind of "uncancellable I/O" - // here, or somesuch? In this case we just wait hard - // using the success case -- its the best we can do. - } - - // This basically just waits for the canceled I/O to complete. - // The end result can be either success or ERROR_OPERATION_ABORTED. - // It turns out we don't much care either way; we just want to make - // sure that we don't have any I/O pending on the overlapped - // structure before we release it or reuse it. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + nni_mtx_fini(&evt->mtx); } int -- cgit v1.2.3-70-g09d2