aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/windows/win_impl.h7
-rw-r--r--src/platform/windows/win_iocp.c59
2 files changed, 53 insertions, 13 deletions
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 4594da53..9049a81d 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -65,11 +65,18 @@ struct nni_win_event {
void * ptr;
nni_aio * aio;
nni_mtx mtx;
+ nni_cv cv;
+ int flags;
int count;
int status;
nni_win_event_ops ops;
};
+enum nni_win_event_flags {
+ NNI_WIN_EVENT_RUNNING = 1,
+ NNI_WIN_EVENT_ABORT = 2,
+};
+
extern int nni_win_error(int);
extern int nni_winsock_error(int);
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 17f3b16d..306f029c 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -54,26 +54,27 @@ nni_win_iocp_handler(void *arg)
evt = CONTAINING_RECORD(olpd, nni_win_event, olpd);
+ nni_mtx_lock(&evt->mtx);
+
if (ok) {
rv = ERROR_SUCCESS;
- } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) {
- // Canceled operation, we can't touch any
- // of the memory, since it may be gone.
- continue;
+ } else {
+ rv = GetLastError();
}
- nni_mtx_lock(&evt->mtx);
- if ((aio = evt->aio) == NULL) {
- // Canceled?? Probably ERROR_OPERATION_ABORTED
- // It's pretty unclear how we got here.
- nni_mtx_unlock(&evt->mtx);
- continue;
- }
+ aio = evt->aio;
evt->aio = NULL;
evt->status = rv;
evt->count = cnt;
- evt->ops.wev_finish(evt, aio);
+ // Aborted operations don't get the finish callback done.
+ // All others do.
+ evt->flags &= ~NNI_WIN_EVENT_RUNNING;
+ if (evt->flags & NNI_WIN_EVENT_ABORT) {
+ nni_cv_wake(&evt->cv);
+ } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) {
+ evt->ops.wev_finish(evt, aio);
+ }
nni_mtx_unlock(&evt->mtx);
}
}
@@ -84,10 +85,16 @@ nni_win_event_cancel(nni_aio *aio)
nni_win_event *evt = aio->a_prov_data;
nni_mtx_lock(&evt->mtx);
+ evt->flags |= NNI_WIN_EVENT_ABORT;
evt->aio = NULL;
// Use provider specific cancellation.
evt->ops.wev_cancel(evt, aio);
+
+ // Wait for everything to stop referencing this.
+ while (evt->flags & NNI_WIN_EVENT_RUNNING) {
+ nni_cv_wait(&evt->cv);
+ }
nni_mtx_unlock(&evt->mtx);
}
@@ -97,6 +104,12 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
// This is just continuation of a pre-existing AIO operation.
// For example, continuing I/O of a multi-buffer s/g operation.
// The lock is held.
+
+ // Abort operation -- no further activity.
+ if (evt->flags & NNI_WIN_EVENT_ABORT) {
+ return;
+ }
+
evt->status = ERROR_SUCCESS;
evt->count = 0;
if (!ResetEvent(evt->olpd.hEvent)) {
@@ -108,9 +121,11 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
}
evt->aio = aio;
+ evt->flags |= NNI_WIN_EVENT_RUNNING;
if (evt->ops.wev_start(evt, aio) != 0) {
// Start completed synchronously. It will have stored
// the count and status in the evt.
+ evt->flags &= ~NNI_WIN_EVENT_RUNNING;
evt->aio = NULL;
evt->ops.wev_finish(evt, aio);
}
@@ -178,7 +193,8 @@ nni_win_event_init(
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- if ((rv = nni_mtx_init(&evt->mtx)) != 0) {
+ if (((rv = nni_mtx_init(&evt->mtx)) != 0) ||
+ ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
return (rv); // NB: This will never happen on Windows.
}
evt->ops = *ops;
@@ -191,10 +207,27 @@ nni_win_event_init(
void
nni_win_event_fini(nni_win_event *evt)
{
+ nni_aio *aio;
+ nni_mtx_lock(&evt->mtx);
+ if ((aio = evt->aio) != NULL) {
+ evt->flags |= NNI_WIN_EVENT_ABORT;
+ evt->aio = NULL;
+
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt, aio);
+
+ // Wait for everything to stop referencing this.
+ while (evt->flags & NNI_WIN_EVENT_RUNNING) {
+ nni_cv_wait(&evt->cv);
+ }
+ }
+ nni_mtx_unlock(&evt->mtx);
+
if (evt->olpd.hEvent != NULL) {
(void) CloseHandle(evt->olpd.hEvent);
evt->olpd.hEvent = NULL;
}
+ nni_cv_fini(&evt->cv);
nni_mtx_fini(&evt->mtx);
}