diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-04 17:17:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-04 21:20:00 -0700 |
| commit | dc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch) | |
| tree | 1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/platform/windows | |
| parent | 6887900ae033add30ee0151b72abe927c5239588 (diff) | |
| download | nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2 nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip | |
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak
checks. This represents a complete rethink of how the AIOs work,
and much simpler synchronization; the provider API is a bit simpler
to boot, as a number of failure modes have been simply eliminated.
While here a few other minor bugs were squashed.
Diffstat (limited to 'src/platform/windows')
| -rw-r--r-- | src/platform/windows/win_impl.h | 9 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 101 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 58 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 66 | ||||
| -rw-r--r-- | src/platform/windows/win_resolv.c | 28 |
5 files changed, 103 insertions, 159 deletions
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index a77fcf0b..a2700485 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -65,17 +66,13 @@ struct nni_win_event { nni_aio * aio; nni_mtx mtx; nni_cv cv; - int flags; + unsigned run : 1; + unsigned fini : 1; int count; int status; nni_win_event_ops ops; }; -enum nni_win_event_flags { - NNI_WIN_EVENT_RUNNING = 1, - NNI_WIN_EVENT_ABORT = 2, -}; - extern int nni_win_error(int); extern int nni_win_event_init(nni_win_event *, nni_win_event_ops *, void *); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index df0357c8..c4cdcb8a 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -15,7 +15,7 @@ #define NNI_WIN_IOCP_NTHREADS 4 #include <stdio.h> -// Windows IO Completion Port support. We basically creaet a single +// Windows IO Completion Port support. We basically create a single // IO completion port, then start threads on it. Handles are added // to the port on an as needed basis. We use a single IO completion // port for pretty much everything. @@ -25,6 +25,18 @@ static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS]; static nni_mtx nni_win_iocp_mtx; static void +nni_win_event_finish(nni_win_event *evt, nni_aio *aio) +{ + evt->run = 0; + if (aio != NULL) { + evt->ops.wev_finish(evt, aio); + } + if (evt->fini) { + nni_cv_wake(&evt->cv); + } +} + +static void nni_win_iocp_handler(void *arg) { HANDLE iocp; @@ -59,42 +71,30 @@ nni_win_iocp_handler(void *arg) if (ok) { rv = ERROR_SUCCESS; - } else { - rv = GetLastError(); + } else if (evt->status == 0) { + evt->status = nni_win_error(GetLastError()); } - aio = evt->aio; - evt->aio = NULL; - evt->status = rv; - evt->count = cnt; - - // Aborted operations don't get the finish callback done. - // All others do. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; - if (evt->flags & NNI_WIN_EVENT_ABORT) { - nni_cv_wake(&evt->cv); - } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) { - evt->ops.wev_finish(evt, aio); - } + aio = evt->aio; + evt->aio = NULL; + evt->count = cnt; + + nni_win_event_finish(evt, aio); nni_mtx_unlock(&evt->mtx); } } static void -nni_win_event_cancel(nni_aio *aio) +nni_win_event_cancel(nni_aio *aio, int rv) { nni_win_event *evt = aio->a_prov_data; nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); + if (evt->aio == aio) { + evt->status = rv; - // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { - nni_cv_wait(&evt->cv); + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); } nni_mtx_unlock(&evt->mtx); } @@ -107,28 +107,28 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) // The lock is held. // Abort operation -- no further activity. - if (evt->flags & NNI_WIN_EVENT_ABORT) { + if (evt->fini) { + evt->run = 0; + nni_cv_wake(&evt->cv); return; } - evt->status = ERROR_SUCCESS; + evt->status = 0; evt->count = 0; if (!ResetEvent(evt->olpd.hEvent)) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; - - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); return; } evt->aio = aio; - evt->flags |= NNI_WIN_EVENT_RUNNING; + evt->run = 1; if (evt->ops.wev_start(evt, aio) != 0) { // Start completed synchronously. It will have stored // the count and status in the evt. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; evt->aio = NULL; - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); } } @@ -154,20 +154,10 @@ nni_win_event_complete(nni_win_event *evt, int cnt) void nni_win_event_close(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; + evt->status = NNG_ECLOSED; evt->ops.wev_cancel(evt); - if ((aio = evt->aio) != NULL) { - evt->aio = NULL; - // We really don't care if we transferred data or not. - // The caller indicates they have closed the pipe. - evt->status = ERROR_INVALID_HANDLE; - evt->count = 0; - evt->ops.wev_finish(evt, aio); - } nni_mtx_unlock(&evt->mtx); } } @@ -195,28 +185,27 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { return (rv); // NB: This will never happen on Windows. } - evt->ops = *ops; - evt->aio = NULL; - evt->ptr = ptr; + evt->ops = *ops; + evt->aio = NULL; + evt->ptr = ptr; + evt->fini = 0; + evt->run = 0; return (0); } void nni_win_event_fini(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - if ((aio = evt->aio) != NULL) { - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); - } + evt->fini = 1; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); + // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { + while (evt->run) { nni_cv_wait(&evt->cv); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index bd9ce26d..c9eb20ec 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -65,7 +65,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); if (pipe->p == INVALID_HANDLE_VALUE) { - evt->status = ERROR_INVALID_HANDLE; + evt->status = NNG_ECLOSED; evt->count = 0; return (1); } @@ -92,7 +92,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) } if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -108,13 +108,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt) { nni_plat_ipc_pipe *pipe = evt->ptr; - if (CancelIoEx(pipe->p, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult(pipe->p, &evt->olpd, &cnt, TRUE); - } + CancelIoEx(pipe->p, &evt->olpd); } static void @@ -146,7 +140,7 @@ nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) } // All done; hopefully successfully. - nni_aio_finish(aio, nni_win_error(rv), aio->a_count); + nni_aio_finish(aio, rv, aio->a_count); } static int @@ -294,7 +288,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) HANDLE newp, oldp; if ((rv = evt->status) != 0) { - nni_aio_finish(aio, nni_win_error(rv), 0); + nni_aio_finish_error(aio, rv); return; } @@ -308,7 +302,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // We connected, but as we cannot get a new pipe, // we have to disconnect the old one. DisconnectNamedPipe(ep->p); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } if ((rv = nni_win_iocp_register(newp)) != 0) { @@ -317,7 +311,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // And discard the half-baked new one. DisconnectNamedPipe(newp); (void) CloseHandle(newp); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } @@ -329,14 +323,11 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // the old one, since failed to be able to use it. DisconnectNamedPipe(oldp); (void) CloseHandle(oldp); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } - // What if the pipe is already finished? - if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_ipc_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); } static void @@ -344,13 +335,7 @@ nni_win_ipc_acc_cancel(nni_win_event *evt) { nni_plat_ipc_ep *ep = evt->ptr; - if (CancelIoEx(ep->p, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult(ep->p, &evt->olpd, &cnt, TRUE); - } + (void) CancelIoEx(ep->p, &evt->olpd); // Just to be sure. (void) DisconnectNamedPipe(ep->p); } @@ -376,7 +361,7 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) default: // Fast-fail (synchronous). - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -468,9 +453,7 @@ nni_win_ipc_conn_thr(void *arg) ((rv = nni_win_iocp_register(p)) != 0)) { goto fail; } - if (rv = nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_ipc_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); continue; fail: @@ -481,7 +464,7 @@ nni_win_ipc_conn_thr(void *arg) if (pipe != NULL) { nni_plat_ipc_pipe_fini(pipe); } - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); } if (nni_list_empty(&w->waiters)) { @@ -496,16 +479,19 @@ nni_win_ipc_conn_thr(void *arg) } static void -nni_win_ipc_conn_cancel(nni_aio *aio) +nni_win_ipc_conn_cancel(nni_aio *aio, int rv) { nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; nni_plat_ipc_ep * ep = aio->a_prov_data; nni_mtx_lock(&w->mtx); - ep->con_aio = NULL; - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - nni_cv_wake(&w->cv); + if (aio == ep->con_aio) { + ep->con_aio = NULL; + if (nni_list_active(&w->waiters, ep)) { + nni_list_remove(&w->waiters, ep); + nni_cv_wake(&w->cv); + } + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&w->mtx); } @@ -556,7 +542,7 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) } if ((aio = ep->con_aio) != NULL) { ep->con_aio = NULL; - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&w->mtx); break; diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 633dd256..df6275ff 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -144,7 +144,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) } if ((s = pipe->s) == INVALID_SOCKET) { - evt->status = ERROR_INVALID_HANDLE; + evt->status = NNG_ECLOSED; evt->count = 0; return (1); } @@ -163,7 +163,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) if ((rv == SOCKET_ERROR) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -179,13 +179,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt) { nni_plat_tcp_pipe *pipe = evt->ptr; - if (CancelIoEx((HANDLE) pipe->s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) pipe->s, &evt->olpd, &cnt, TRUE); - } + (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); } static void @@ -228,7 +222,7 @@ nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) } // All done; hopefully successfully. - nni_aio_finish(aio, nni_win_error(rv), aio->a_count); + nni_aio_finish(aio, rv, aio->a_count); } static int @@ -507,12 +501,8 @@ nni_win_tcp_acc_cancel(nni_win_event *evt) nni_plat_tcp_ep *ep = evt->ptr; SOCKET s = ep->s; - if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); } } @@ -531,22 +521,15 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - if ((rv = evt->status) != 0) { - closesocket(s); - nni_aio_finish(aio, nni_win_error(rv), 0); - return; - } - - if (((rv = nni_win_iocp_register((HANDLE) s)) != 0) || + if (((rv = evt->status) != 0) || + ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { closesocket(s); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } - if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_tcp_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); } static int @@ -559,7 +542,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); if (acc_s == INVALID_SOCKET) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; return (1); } @@ -575,7 +558,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) default: // Fast-fail (synchronous). - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -599,12 +582,8 @@ nni_win_tcp_con_cancel(nni_win_event *evt) nni_plat_tcp_ep *ep = evt->ptr; SOCKET s = ep->s; - if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); } } @@ -619,19 +598,14 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) s = ep->s; ep->s = INVALID_SOCKET; - if ((rv = evt->status) != 0) { - closesocket(s); - nni_aio_finish(aio, nni_win_error(rv), 0); - return; - } - // The socket was already registere with the IOCP. - if ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0) { + if (((rv = evt->status) != 0) || + ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { // The new pipe is already fine for us. Discard // the old one, since failed to be able to use it. closesocket(s); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } @@ -650,7 +624,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; return (1); } @@ -667,7 +641,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) len = ep->remlen; } if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; closesocket(s); return (1); @@ -687,7 +661,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) if ((rv = GetLastError()) != ERROR_IO_PENDING) { closesocket(s); ep->s = INVALID_SOCKET; - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 44d00c34..a01dc123 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -30,13 +30,13 @@ static nni_mtx nni_win_resolv_mtx; typedef struct nni_win_resolv_item nni_win_resolv_item; struct nni_win_resolv_item { - int family; - int passive; - const char * name; - const char * serv; - int proto; - nni_aio * aio; - nni_taskq_ent tqe; + int family; + int passive; + const char *name; + const char *serv; + int proto; + nni_aio * aio; + nni_task task; }; static void @@ -50,7 +50,7 @@ nni_win_resolv_finish(nni_win_resolv_item *item, int rv) } static void -nni_win_resolv_cancel(nni_aio *aio) +nni_win_resolv_cancel(nni_aio *aio, int rv) { nni_win_resolv_item *item; @@ -61,8 +61,9 @@ nni_win_resolv_cancel(nni_aio *aio) } aio->a_prov_data = NULL; nni_mtx_unlock(&nni_win_resolv_mtx); - nni_taskq_cancel(nni_win_resolv_tq, &item->tqe); + nni_task_cancel(&item->task); NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); } static int @@ -209,7 +210,8 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - nni_taskq_ent_init(&item->tqe, nni_win_resolv_task, item); + nni_task_init( + nni_win_resolv_tq, &item->task, nni_win_resolv_task, item); switch (family) { case NNG_AF_INET: @@ -236,11 +238,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, NNI_FREE_STRUCT(item); return; } - if ((rv = nni_taskq_dispatch(nni_win_resolv_tq, &item->tqe)) != 0) { - nni_win_resolv_finish(item, rv); - nni_mtx_unlock(&nni_win_resolv_mtx); - return; - } + nni_task_dispatch(&item->task); nni_mtx_unlock(&nni_win_resolv_mtx); } |
