diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-11 22:59:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-11 22:59:38 -0700 |
| commit | 8741c4421ec7a5e889c05a3d7dd46feee93ddf9a (patch) | |
| tree | 9024d46ff202b065c67c2ea75ee5e43417ce4cdb /src/platform/windows/win_iocp.c | |
| parent | 183bd7e02c81bc09c17c6f4c0d3883d4d45221fc (diff) | |
| download | nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.gz nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.bz2 nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.zip | |
Windows IPC working, mostly.
The IOCP code has been refactored to improve reuse, and hopefully
will be easier to use with TCP now. Windows IPC using Named Pipes
is mostly working -- mostly because there is a gnarly close-race.
It seems that we need to take some more care to ensure that the
pipe is not released while requests may be outstanding -- so some
deeper synchronization between the IOCP callback logic and the
win_event code is needed. In short, we need to add a condvar to
the event, and notice when we have submitted work for async completion,
and make sure we flag the event "idle" after either completion or
cancellation of the event.
Diffstat (limited to 'src/platform/windows/win_iocp.c')
| -rw-r--r-- | src/platform/windows/win_iocp.c | 186 |
1 files changed, 122 insertions, 64 deletions
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 7244b360..17f3b16d 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int status; - BOOL rv; + int rv; + BOOL ok; + nni_aio * aio; NNI_ARG_UNUSED(arg); iocp = nni_win_global_iocp; for (;;) { - key = 0; - olpd = NULL; - status = 0; - cnt = 0; + key = 0; + olpd = NULL; - rv = GetQueuedCompletionStatus( + ok = GetQueuedCompletionStatus( iocp, &cnt, &key, &olpd, INFINITE); - if (rv == FALSE) { - if (olpd == NULL) { - // Completion port bailed... - break; - } + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; } - NNI_ASSERT(olpd != NULL); - evt = (void *) olpd; + evt = CONTAINING_RECORD(olpd, nni_win_event, olpd); - NNI_ASSERT(evt->cb != NULL); - evt->cb(evt->ptr); + if (ok) { + rv = ERROR_SUCCESS; + } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) { + // Canceled operation, we can't touch any + // of the memory, since it may be gone. + continue; + } + + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) == NULL) { + // Canceled?? Probably ERROR_OPERATION_ABORTED + // It's pretty unclear how we got here. + nni_mtx_unlock(&evt->mtx); + continue; + } + evt->aio = NULL; + evt->status = rv; + evt->count = cnt; + + evt->ops.wev_finish(evt, aio); + nni_mtx_unlock(&evt->mtx); + } +} + +static void +nni_win_event_cancel(nni_aio *aio) +{ + nni_win_event *evt = aio->a_prov_data; + + nni_mtx_lock(&evt->mtx); + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + // This is just continuation of a pre-existing AIO operation. + // For example, continuing I/O of a multi-buffer s/g operation. + // The lock is held. + evt->status = ERROR_SUCCESS; + evt->count = 0; + if (!ResetEvent(evt->olpd.hEvent)) { + evt->status = GetLastError(); + evt->count = 0; + + evt->ops.wev_finish(evt, aio); + return; + } + + evt->aio = aio; + if (evt->ops.wev_start(evt, aio) != 0) { + // Start completed synchronously. It will have stored + // the count and status in the evt. + evt->aio = NULL; + evt->ops.wev_finish(evt, aio); + } +} + +void +nni_win_event_submit(nni_win_event *evt, nni_aio *aio) +{ + nni_mtx_lock(&evt->mtx); + if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { + // the aio was aborted + nni_mtx_unlock(&evt->mtx); + return; + } + nni_win_event_resubmit(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_complete(nni_win_event *evt, int cnt) +{ + PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd); +} + +void +nni_win_event_close(nni_win_event *evt) +{ + nni_aio *aio; + nni_mtx_lock(&evt->mtx); + if (evt->h != NULL) { + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; + // Stall waiting for the I/O to complete. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + } + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } int @@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +nni_win_event_init( + nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) { + int rv; + ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - nni_aio_list_init(&evt->aios); + if ((rv = nni_mtx_init(&evt->mtx)) != 0) { + return (rv); // NB: This will never happen on Windows. + } + evt->ops = *ops; + evt->aio = NULL; evt->ptr = ptr; - evt->cb = cb; evt->h = h; return (0); } @@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt) (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } -} - -int -nni_win_event_reset(nni_win_event *evt) -{ - if (!ResetEvent(evt->olpd.hEvent)) { - return (nni_win_error(GetLastError())); - } - return (0); -} - -OVERLAPPED * -nni_win_event_overlapped(nni_win_event *evt) -{ - return (&evt->olpd); -} - -void -nni_win_event_cancel(nni_win_event *evt) -{ - int rv; - DWORD cnt; - - // Try to cancel the event... - if (!CancelIoEx(evt->h, &evt->olpd)) { - // None was found. That's good. - if ((rv = GetLastError()) == ERROR_NOT_FOUND) { - // Nothing queued. We may in theory be running - // the callback via the completion port handler; - // caller must synchronize that separately. - return; - } - - // It's unclear why we would ever fail in this - // circumstance. Is there a kind of "uncancellable I/O" - // here, or somesuch? In this case we just wait hard - // using the success case -- its the best we can do. - } - - // This basically just waits for the canceled I/O to complete. - // The end result can be either success or ERROR_OPERATION_ABORTED. - // It turns out we don't much care either way; we just want to make - // sure that we don't have any I/O pending on the overlapped - // structure before we release it or reuse it. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + nni_mtx_fini(&evt->mtx); } int |
