From dc334d7193a2a0bc0194221b853a37e1be7f5b9a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 4 Aug 2017 17:17:42 -0700 Subject: Refactor AIO logic to close numerous races and reduce complexity. This passes valgrind 100% clean for both helgrind and deep leak checks. This represents a complete rethink of how the AIOs work, and much simpler synchronization; the provider API is a bit simpler to boot, as a number of failure modes have been simply eliminated. While here a few other minor bugs were squashed. --- src/platform/windows/win_iocp.c | 101 ++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 56 deletions(-) (limited to 'src/platform/windows/win_iocp.c') diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index df0357c8..c4cdcb8a 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -15,7 +15,7 @@ #define NNI_WIN_IOCP_NTHREADS 4 #include -// Windows IO Completion Port support. We basically creaet a single +// Windows IO Completion Port support. We basically create a single // IO completion port, then start threads on it. Handles are added // to the port on an as needed basis. We use a single IO completion // port for pretty much everything. @@ -24,6 +24,18 @@ static HANDLE nni_win_global_iocp = NULL; static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS]; static nni_mtx nni_win_iocp_mtx; +static void +nni_win_event_finish(nni_win_event *evt, nni_aio *aio) +{ + evt->run = 0; + if (aio != NULL) { + evt->ops.wev_finish(evt, aio); + } + if (evt->fini) { + nni_cv_wake(&evt->cv); + } +} + static void nni_win_iocp_handler(void *arg) { @@ -59,42 +71,30 @@ nni_win_iocp_handler(void *arg) if (ok) { rv = ERROR_SUCCESS; - } else { - rv = GetLastError(); + } else if (evt->status == 0) { + evt->status = nni_win_error(GetLastError()); } - aio = evt->aio; - evt->aio = NULL; - evt->status = rv; - evt->count = cnt; - - // Aborted operations don't get the finish callback done. - // All others do. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; - if (evt->flags & NNI_WIN_EVENT_ABORT) { - nni_cv_wake(&evt->cv); - } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) { - evt->ops.wev_finish(evt, aio); - } + aio = evt->aio; + evt->aio = NULL; + evt->count = cnt; + + nni_win_event_finish(evt, aio); nni_mtx_unlock(&evt->mtx); } } static void -nni_win_event_cancel(nni_aio *aio) +nni_win_event_cancel(nni_aio *aio, int rv) { nni_win_event *evt = aio->a_prov_data; nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); + if (evt->aio == aio) { + evt->status = rv; - // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { - nni_cv_wait(&evt->cv); + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); } nni_mtx_unlock(&evt->mtx); } @@ -107,28 +107,28 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) // The lock is held. // Abort operation -- no further activity. - if (evt->flags & NNI_WIN_EVENT_ABORT) { + if (evt->fini) { + evt->run = 0; + nni_cv_wake(&evt->cv); return; } - evt->status = ERROR_SUCCESS; + evt->status = 0; evt->count = 0; if (!ResetEvent(evt->olpd.hEvent)) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; - - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); return; } evt->aio = aio; - evt->flags |= NNI_WIN_EVENT_RUNNING; + evt->run = 1; if (evt->ops.wev_start(evt, aio) != 0) { // Start completed synchronously. It will have stored // the count and status in the evt. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; evt->aio = NULL; - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); } } @@ -154,20 +154,10 @@ nni_win_event_complete(nni_win_event *evt, int cnt) void nni_win_event_close(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; + evt->status = NNG_ECLOSED; evt->ops.wev_cancel(evt); - if ((aio = evt->aio) != NULL) { - evt->aio = NULL; - // We really don't care if we transferred data or not. - // The caller indicates they have closed the pipe. - evt->status = ERROR_INVALID_HANDLE; - evt->count = 0; - evt->ops.wev_finish(evt, aio); - } nni_mtx_unlock(&evt->mtx); } } @@ -195,28 +185,27 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { return (rv); // NB: This will never happen on Windows. } - evt->ops = *ops; - evt->aio = NULL; - evt->ptr = ptr; + evt->ops = *ops; + evt->aio = NULL; + evt->ptr = ptr; + evt->fini = 0; + evt->run = 0; return (0); } void nni_win_event_fini(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - if ((aio = evt->aio) != NULL) { - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); - } + evt->fini = 1; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); + // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { + while (evt->run) { nni_cv_wait(&evt->cv); } -- cgit v1.2.3-70-g09d2