aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-04 17:17:42 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-04 21:20:00 -0700
commitdc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch)
tree1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/platform/windows
parent6887900ae033add30ee0151b72abe927c5239588 (diff)
downloadnng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak checks. This represents a complete rethink of how the AIOs work, and much simpler synchronization; the provider API is a bit simpler to boot, as a number of failure modes have been simply eliminated. While here a few other minor bugs were squashed.
Diffstat (limited to 'src/platform/windows')
-rw-r--r--src/platform/windows/win_impl.h9
-rw-r--r--src/platform/windows/win_iocp.c101
-rw-r--r--src/platform/windows/win_ipc.c58
-rw-r--r--src/platform/windows/win_net.c66
-rw-r--r--src/platform/windows/win_resolv.c28
5 files changed, 103 insertions, 159 deletions
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index a77fcf0b..a2700485 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -65,17 +66,13 @@ struct nni_win_event {
nni_aio * aio;
nni_mtx mtx;
nni_cv cv;
- int flags;
+ unsigned run : 1;
+ unsigned fini : 1;
int count;
int status;
nni_win_event_ops ops;
};
-enum nni_win_event_flags {
- NNI_WIN_EVENT_RUNNING = 1,
- NNI_WIN_EVENT_ABORT = 2,
-};
-
extern int nni_win_error(int);
extern int nni_win_event_init(nni_win_event *, nni_win_event_ops *, void *);
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index df0357c8..c4cdcb8a 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -15,7 +15,7 @@
#define NNI_WIN_IOCP_NTHREADS 4
#include <stdio.h>
-// Windows IO Completion Port support. We basically creaet a single
+// Windows IO Completion Port support. We basically create a single
// IO completion port, then start threads on it. Handles are added
// to the port on an as needed basis. We use a single IO completion
// port for pretty much everything.
@@ -25,6 +25,18 @@ static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS];
static nni_mtx nni_win_iocp_mtx;
static void
+nni_win_event_finish(nni_win_event *evt, nni_aio *aio)
+{
+ evt->run = 0;
+ if (aio != NULL) {
+ evt->ops.wev_finish(evt, aio);
+ }
+ if (evt->fini) {
+ nni_cv_wake(&evt->cv);
+ }
+}
+
+static void
nni_win_iocp_handler(void *arg)
{
HANDLE iocp;
@@ -59,42 +71,30 @@ nni_win_iocp_handler(void *arg)
if (ok) {
rv = ERROR_SUCCESS;
- } else {
- rv = GetLastError();
+ } else if (evt->status == 0) {
+ evt->status = nni_win_error(GetLastError());
}
- aio = evt->aio;
- evt->aio = NULL;
- evt->status = rv;
- evt->count = cnt;
-
- // Aborted operations don't get the finish callback done.
- // All others do.
- evt->flags &= ~NNI_WIN_EVENT_RUNNING;
- if (evt->flags & NNI_WIN_EVENT_ABORT) {
- nni_cv_wake(&evt->cv);
- } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) {
- evt->ops.wev_finish(evt, aio);
- }
+ aio = evt->aio;
+ evt->aio = NULL;
+ evt->count = cnt;
+
+ nni_win_event_finish(evt, aio);
nni_mtx_unlock(&evt->mtx);
}
}
static void
-nni_win_event_cancel(nni_aio *aio)
+nni_win_event_cancel(nni_aio *aio, int rv)
{
nni_win_event *evt = aio->a_prov_data;
nni_mtx_lock(&evt->mtx);
- evt->flags |= NNI_WIN_EVENT_ABORT;
- evt->aio = NULL;
-
- // Use provider specific cancellation.
- evt->ops.wev_cancel(evt);
+ if (evt->aio == aio) {
+ evt->status = rv;
- // Wait for everything to stop referencing this.
- while (evt->flags & NNI_WIN_EVENT_RUNNING) {
- nni_cv_wait(&evt->cv);
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt);
}
nni_mtx_unlock(&evt->mtx);
}
@@ -107,28 +107,28 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
// The lock is held.
// Abort operation -- no further activity.
- if (evt->flags & NNI_WIN_EVENT_ABORT) {
+ if (evt->fini) {
+ evt->run = 0;
+ nni_cv_wake(&evt->cv);
return;
}
- evt->status = ERROR_SUCCESS;
+ evt->status = 0;
evt->count = 0;
if (!ResetEvent(evt->olpd.hEvent)) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
-
- evt->ops.wev_finish(evt, aio);
+ nni_win_event_finish(evt, aio);
return;
}
evt->aio = aio;
- evt->flags |= NNI_WIN_EVENT_RUNNING;
+ evt->run = 1;
if (evt->ops.wev_start(evt, aio) != 0) {
// Start completed synchronously. It will have stored
// the count and status in the evt.
- evt->flags &= ~NNI_WIN_EVENT_RUNNING;
evt->aio = NULL;
- evt->ops.wev_finish(evt, aio);
+ nni_win_event_finish(evt, aio);
}
}
@@ -154,20 +154,10 @@ nni_win_event_complete(nni_win_event *evt, int cnt)
void
nni_win_event_close(nni_win_event *evt)
{
- nni_aio *aio;
-
if (evt->ptr != NULL) {
nni_mtx_lock(&evt->mtx);
- evt->flags |= NNI_WIN_EVENT_ABORT;
+ evt->status = NNG_ECLOSED;
evt->ops.wev_cancel(evt);
- if ((aio = evt->aio) != NULL) {
- evt->aio = NULL;
- // We really don't care if we transferred data or not.
- // The caller indicates they have closed the pipe.
- evt->status = ERROR_INVALID_HANDLE;
- evt->count = 0;
- evt->ops.wev_finish(evt, aio);
- }
nni_mtx_unlock(&evt->mtx);
}
}
@@ -195,28 +185,27 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
return (rv); // NB: This will never happen on Windows.
}
- evt->ops = *ops;
- evt->aio = NULL;
- evt->ptr = ptr;
+ evt->ops = *ops;
+ evt->aio = NULL;
+ evt->ptr = ptr;
+ evt->fini = 0;
+ evt->run = 0;
return (0);
}
void
nni_win_event_fini(nni_win_event *evt)
{
- nni_aio *aio;
-
if (evt->ptr != NULL) {
nni_mtx_lock(&evt->mtx);
- if ((aio = evt->aio) != NULL) {
- evt->flags |= NNI_WIN_EVENT_ABORT;
- evt->aio = NULL;
- // Use provider specific cancellation.
- evt->ops.wev_cancel(evt);
- }
+ evt->fini = 1;
+
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt);
+
// Wait for everything to stop referencing this.
- while (evt->flags & NNI_WIN_EVENT_RUNNING) {
+ while (evt->run) {
nni_cv_wait(&evt->cv);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index bd9ce26d..c9eb20ec 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -65,7 +65,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = ERROR_INVALID_HANDLE;
+ evt->status = NNG_ECLOSED;
evt->count = 0;
return (1);
}
@@ -92,7 +92,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
}
if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -108,13 +108,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt)
{
nni_plat_ipc_pipe *pipe = evt->ptr;
- if (CancelIoEx(pipe->p, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult(pipe->p, &evt->olpd, &cnt, TRUE);
- }
+ CancelIoEx(pipe->p, &evt->olpd);
}
static void
@@ -146,7 +140,7 @@ nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
}
// All done; hopefully successfully.
- nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
+ nni_aio_finish(aio, rv, aio->a_count);
}
static int
@@ -294,7 +288,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
HANDLE newp, oldp;
if ((rv = evt->status) != 0) {
- nni_aio_finish(aio, nni_win_error(rv), 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -308,7 +302,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// We connected, but as we cannot get a new pipe,
// we have to disconnect the old one.
DisconnectNamedPipe(ep->p);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
if ((rv = nni_win_iocp_register(newp)) != 0) {
@@ -317,7 +311,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// And discard the half-baked new one.
DisconnectNamedPipe(newp);
(void) CloseHandle(newp);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -329,14 +323,11 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// the old one, since failed to be able to use it.
DisconnectNamedPipe(oldp);
(void) CloseHandle(oldp);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
- // What if the pipe is already finished?
- if (nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
}
static void
@@ -344,13 +335,7 @@ nni_win_ipc_acc_cancel(nni_win_event *evt)
{
nni_plat_ipc_ep *ep = evt->ptr;
- if (CancelIoEx(ep->p, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult(ep->p, &evt->olpd, &cnt, TRUE);
- }
+ (void) CancelIoEx(ep->p, &evt->olpd);
// Just to be sure.
(void) DisconnectNamedPipe(ep->p);
}
@@ -376,7 +361,7 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
default:
// Fast-fail (synchronous).
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -468,9 +453,7 @@ nni_win_ipc_conn_thr(void *arg)
((rv = nni_win_iocp_register(p)) != 0)) {
goto fail;
}
- if (rv = nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
continue;
fail:
@@ -481,7 +464,7 @@ nni_win_ipc_conn_thr(void *arg)
if (pipe != NULL) {
nni_plat_ipc_pipe_fini(pipe);
}
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
}
if (nni_list_empty(&w->waiters)) {
@@ -496,16 +479,19 @@ nni_win_ipc_conn_thr(void *arg)
}
static void
-nni_win_ipc_conn_cancel(nni_aio *aio)
+nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
nni_plat_ipc_ep * ep = aio->a_prov_data;
nni_mtx_lock(&w->mtx);
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
+ if (aio == ep->con_aio) {
+ ep->con_aio = NULL;
+ if (nni_list_active(&w->waiters, ep)) {
+ nni_list_remove(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&w->mtx);
}
@@ -556,7 +542,7 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
}
if ((aio = ep->con_aio) != NULL) {
ep->con_aio = NULL;
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&w->mtx);
break;
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 633dd256..df6275ff 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -144,7 +144,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
}
if ((s = pipe->s) == INVALID_SOCKET) {
- evt->status = ERROR_INVALID_HANDLE;
+ evt->status = NNG_ECLOSED;
evt->count = 0;
return (1);
}
@@ -163,7 +163,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
if ((rv == SOCKET_ERROR) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -179,13 +179,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt)
{
nni_plat_tcp_pipe *pipe = evt->ptr;
- if (CancelIoEx((HANDLE) pipe->s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) pipe->s, &evt->olpd, &cnt, TRUE);
- }
+ (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd);
}
static void
@@ -228,7 +222,7 @@ nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio)
}
// All done; hopefully successfully.
- nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
+ nni_aio_finish(aio, rv, aio->a_count);
}
static int
@@ -507,12 +501,8 @@ nni_win_tcp_acc_cancel(nni_win_event *evt)
nni_plat_tcp_ep *ep = evt->ptr;
SOCKET s = ep->s;
- if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE);
+ if (s != INVALID_SOCKET) {
+ CancelIoEx((HANDLE) s, &evt->olpd);
}
}
@@ -531,22 +521,15 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio)
return;
}
- if ((rv = evt->status) != 0) {
- closesocket(s);
- nni_aio_finish(aio, nni_win_error(rv), 0);
- return;
- }
-
- if (((rv = nni_win_iocp_register((HANDLE) s)) != 0) ||
+ if (((rv = evt->status) != 0) ||
+ ((rv = nni_win_iocp_register((HANDLE) s)) != 0) ||
((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
closesocket(s);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
- if (nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_tcp_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
}
static int
@@ -559,7 +542,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
if (acc_s == INVALID_SOCKET) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
return (1);
}
@@ -575,7 +558,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
default:
// Fast-fail (synchronous).
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -599,12 +582,8 @@ nni_win_tcp_con_cancel(nni_win_event *evt)
nni_plat_tcp_ep *ep = evt->ptr;
SOCKET s = ep->s;
- if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE);
+ if (s != INVALID_SOCKET) {
+ CancelIoEx((HANDLE) s, &evt->olpd);
}
}
@@ -619,19 +598,14 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio)
s = ep->s;
ep->s = INVALID_SOCKET;
- if ((rv = evt->status) != 0) {
- closesocket(s);
- nni_aio_finish(aio, nni_win_error(rv), 0);
- return;
- }
-
// The socket was already registere with the IOCP.
- if ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0) {
+ if (((rv = evt->status) != 0) ||
+ ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
// The new pipe is already fine for us. Discard
// the old one, since failed to be able to use it.
closesocket(s);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -650,7 +624,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
return (1);
}
@@ -667,7 +641,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
len = ep->remlen;
}
if (bind(s, (struct sockaddr *) &bss, len) < 0) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
closesocket(s);
return (1);
@@ -687,7 +661,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
if ((rv = GetLastError()) != ERROR_IO_PENDING) {
closesocket(s);
ep->s = INVALID_SOCKET;
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 44d00c34..a01dc123 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -30,13 +30,13 @@ static nni_mtx nni_win_resolv_mtx;
typedef struct nni_win_resolv_item nni_win_resolv_item;
struct nni_win_resolv_item {
- int family;
- int passive;
- const char * name;
- const char * serv;
- int proto;
- nni_aio * aio;
- nni_taskq_ent tqe;
+ int family;
+ int passive;
+ const char *name;
+ const char *serv;
+ int proto;
+ nni_aio * aio;
+ nni_task task;
};
static void
@@ -50,7 +50,7 @@ nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
}
static void
-nni_win_resolv_cancel(nni_aio *aio)
+nni_win_resolv_cancel(nni_aio *aio, int rv)
{
nni_win_resolv_item *item;
@@ -61,8 +61,9 @@ nni_win_resolv_cancel(nni_aio *aio)
}
aio->a_prov_data = NULL;
nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_taskq_cancel(nni_win_resolv_tq, &item->tqe);
+ nni_task_cancel(&item->task);
NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
}
static int
@@ -209,7 +210,8 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_taskq_ent_init(&item->tqe, nni_win_resolv_task, item);
+ nni_task_init(
+ nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
switch (family) {
case NNG_AF_INET:
@@ -236,11 +238,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
NNI_FREE_STRUCT(item);
return;
}
- if ((rv = nni_taskq_dispatch(nni_win_resolv_tq, &item->tqe)) != 0) {
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- return;
- }
+ nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_win_resolv_mtx);
}