aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows/win_ipc.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-11 22:59:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-11 22:59:38 -0700
commit8741c4421ec7a5e889c05a3d7dd46feee93ddf9a (patch)
tree9024d46ff202b065c67c2ea75ee5e43417ce4cdb /src/platform/windows/win_ipc.c
parent183bd7e02c81bc09c17c6f4c0d3883d4d45221fc (diff)
downloadnng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.gz
nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.bz2
nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.zip
Windows IPC working, mostly.
The IOCP code has been refactored to improve reuse, and hopefully will be easier to use with TCP now. Windows IPC using Named Pipes is mostly working -- mostly because there is a gnarly close-race. It seems that we need to take some more care to ensure that the pipe is not released while requests may be outstanding -- so some deeper synchronization between the IOCP callback logic and the win_event code is needed. In short, we need to add a condvar to the event, and notice when we have submitted work for async completion, and make sure we flag the event "idle" after either completion or cancellation of the event.
Diffstat (limited to 'src/platform/windows/win_ipc.c')
-rw-r--r--src/platform/windows/win_ipc.c619
1 files changed, 201 insertions, 418 deletions
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 237ded78..be1a98a1 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -13,19 +13,10 @@
#include <stdio.h>
-static void nni_win_ipc_acc_cb(void *);
-static void nni_win_ipc_send_cb(void *);
-static void nni_win_ipc_recv_cb(void *);
-static void nni_win_ipc_send_start(nni_plat_ipc_pipe *);
-static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *);
-
struct nni_plat_ipc_pipe {
HANDLE p;
- nni_win_event recv_evt;
- nni_win_event send_evt;
- nni_mtx mtx;
- nni_list readq;
- nni_list writeq;
+ nni_win_event rcv_ev;
+ nni_win_event snd_ev;
};
struct nni_plat_ipc_ep {
@@ -34,118 +25,49 @@ struct nni_plat_ipc_ep {
int started;
nni_list aios;
HANDLE p; // accept side only
- nni_win_event acc_evt; // accept side only
- nni_mtx mtx; // accept side only
+ nni_win_event acc_ev; // accept side only
+ nni_aio * con_aio; // conn side only
nni_list_node node; // conn side uses this
};
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- nni_aio_list_init(&pipe->readq);
- nni_aio_list_init(&pipe->writeq);
- *pipep = pipe;
- return (0);
-}
-
-static void
-nni_win_ipc_send_cancel(nni_aio *aio)
-{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->writeq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
- NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
- aio->a_count += cnt;
- aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
- aio->a_iov[0].iov_len -= cnt;
+static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *);
- if (aio->a_iov[0].iov_len == 0) {
- int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- aio->a_niov--;
- }
+static nni_win_event_ops nni_win_ipc_pipe_ops = {
+ .wev_start = nni_win_ipc_pipe_start,
+ .wev_finish = nni_win_ipc_pipe_finish,
+ .wev_cancel = nni_win_ipc_pipe_cancel,
+};
- if (aio->a_niov > 0) {
- // If we have more to do, submit it!
- nni_win_ipc_send_start(pipe);
- return;
- }
- }
+static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *);
- // All done; hopefully successfully.
- nni_list_remove(&pipe->writeq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
-}
+static nni_win_event_ops nni_win_ipc_acc_ops = {
+ .wev_start = nni_win_ipc_acc_start,
+ .wev_finish = nni_win_ipc_acc_finish,
+ .wev_cancel = nni_win_ipc_acc_cancel,
+};
-static void
-nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->writeq);
+ void * buf;
+ DWORD len;
+ BOOL ok;
+ int rv;
+ nni_plat_ipc_pipe *pipe = evt->ptr;
NNI_ASSERT(aio != NULL);
NNI_ASSERT(aio->a_niov > 0);
NNI_ASSERT(aio->a_iov[0].iov_len > 0);
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if (evt->h == INVALID_HANDLE_VALUE) {
+ evt->status = ERROR_INVALID_HANDLE;
+ evt->count = 0;
+ return (1);
}
// Now start a writefile. We assume that only one send can be
@@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
// scrambling the data anyway. Note that Windows named pipes do
// not appear to support scatter/gather, so we have to process
// each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
+ buf = aio->a_iov[0].iov_buf;
+ len = (DWORD) aio->a_iov[0].iov_len;
// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
@@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
len = 0x1000000;
}
- if (!WriteFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ evt->count = 0;
+ if (evt == &pipe->snd_ev) {
+ ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd);
+ } else {
+ ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd);
+ }
+ if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_send_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ // Wait for the I/O completion event. Note that when an I/O
+ // completes immediately, the I/O completion packet is still
+ // delivered.
+ return (0);
}
static void
-nni_win_ipc_send_cb(void *arg)
+nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
+ NNI_ARG_UNUSED(aio);
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_send_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
-}
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
}
- nni_aio_list_append(&pipe->writeq, aio);
- nni_win_ipc_send_start(pipe);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_win_ipc_recv_cancel(nni_aio *aio)
+nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
+ int rv = 0;
+ DWORD cnt;
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->readq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
+ cnt = evt->count;
+ if ((rv = evt->status) == 0) {
NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
aio->a_count += cnt;
aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
@@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
if (aio->a_iov[0].iov_len == 0) {
int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
}
if (aio->a_niov > 0) {
// If we have more to do, submit it!
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_resubmit(evt, aio);
return;
}
}
// All done; hopefully successfully.
- nni_list_remove(&pipe->readq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
}
-static void
-nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->readq);
-
- NNI_ASSERT(aio != NULL);
- NNI_ASSERT(aio->a_niov > 0);
- NNI_ASSERT(aio->a_iov[0].iov_len > 0);
- NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
+ nni_plat_ipc_pipe *pipe;
+ int rv;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- // Now start a readfile. We assume that only one read can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
+ rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
-
- if (!ReadFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_recv_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, 0);
+ pipe->p = p;
+ *pipep = pipe;
+ return (0);
}
-static void
-nni_win_ipc_recv_cb(void *arg)
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_recv_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_submit(&pipe->snd_ev, aio);
}
void
nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- nni_aio_list_append(&pipe->readq, aio);
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_submit(&pipe->rcv_ev, aio);
}
void
nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
{
- nni_mtx_lock(&pipe->mtx);
- if (pipe->p != INVALID_HANDLE_VALUE) {
- CloseHandle(pipe->p);
+ HANDLE p;
+
+ if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
pipe->p = INVALID_HANDLE_VALUE;
+ CloseHandle(p);
}
- nni_win_event_cancel(&pipe->send_evt);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_close(&pipe->snd_ev);
+ nni_win_event_close(&pipe->rcv_ev);
}
void
@@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
{
nni_plat_ipc_pipe_close(pipe);
- nni_win_event_fini(&pipe->send_evt);
- nni_win_event_fini(&pipe->recv_evt);
- nni_mtx_fini(&pipe->mtx);
+ nni_win_event_fini(&pipe->snd_ev);
+ nni_win_event_fini(&pipe->rcv_ev);
NNI_FREE_STRUCT(pipe);
}
@@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
{
const char * path;
nni_plat_ipc_ep *ep;
- int rv;
if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
return (NNG_EADDRINVAL);
@@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
return (NNG_ENOMEM);
}
ZeroMemory(ep, sizeof(ep));
- if ((rv = nni_mtx_init(&ep->mtx)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
ep->mode = mode;
NNI_LIST_NODE_INIT(&ep->node);
- nni_aio_list_init(&ep->aios);
(void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
@@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
int rv;
HANDLE p;
- nni_mtx_lock(&ep->mtx);
if (ep->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EINVAL);
}
if (ep->started) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EBUSY);
}
@@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
}
goto failed;
}
- rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p);
+ rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p);
if (rv != 0) {
goto failed;
}
@@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
ep->p = p;
ep->started = 1;
- nni_mtx_unlock(&ep->mtx);
return (0);
failed:
- nni_mtx_unlock(&ep->mtx);
if (p != INVALID_HANDLE_VALUE) {
(void) CloseHandle(p);
}
@@ -486,40 +285,15 @@ failed:
}
static void
-nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
+nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_win_event * evt = &ep->acc_evt;
- DWORD nbytes;
- int rv;
+ nni_plat_ipc_ep * ep = evt->ptr;
nni_plat_ipc_pipe *pipe;
- nni_aio * aio;
+ int rv;
HANDLE newp, oldp;
- // Note: This should be called with the ep lock held, and only when
- // the ConnectNamedPipe has finished.
-
- rv = 0;
- if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) {
- if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) {
- // We should never be here normally, but if the
- // pipe got accepted by another client we can
- // some times race here.
- return;
- }
- }
-
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
- // No completion available to us.
- if (rv == 0) {
- NNI_ASSERT(0);
- DisconnectNamedPipe(ep->p);
- }
- return;
- }
-
- nni_list_remove(&ep->aios, aio);
- if (rv != 0) {
- nni_aio_finish(aio, rv, 0);
+ if ((rv = evt->status) != 0) {
+ nni_aio_finish(aio, nni_win_error(rv), 0);
return;
}
@@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL);
if (newp == INVALID_HANDLE_VALUE) {
rv = nni_win_error(GetLastError());
+ // We connected, but as we cannot get a new pipe,
+ // we have to disconnect the old one.
DisconnectNamedPipe(ep->p);
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ if ((rv = nni_win_iocp_register(newp)) != 0) {
+ // Disconnect the old pipe.
+ DisconnectNamedPipe(ep->p);
+ // And discard the half-baked new one.
+ DisconnectNamedPipe(newp);
+ (void) CloseHandle(newp);
+ nni_aio_finish(aio, rv, 0);
return;
}
- oldp = ep->p;
- ep->p = newp;
+
+ oldp = ep->p;
+ ep->p = newp;
+ evt->h = newp;
if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) {
+ // The new pipe is already fine for us. Discard
+ // the old one, since failed to be able to use it.
DisconnectNamedPipe(oldp);
+ (void) CloseHandle(oldp);
nni_aio_finish(aio, rv, 0);
return;
}
@@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
}
static void
-nni_win_ipc_acc_cb(void *arg)
+nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_ep *ep = arg;
-
- nni_mtx_lock(&ep->mtx);
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
-}
+ NNI_ARG_UNUSED(aio);
-static void
-nni_win_ipc_acc_cancel(nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = aio->a_prov_data;
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
- nni_mtx_lock(&ep->mtx);
- nni_win_event_cancel(&ep->acc_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&ep->mtx);
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ }
+ // Just to be sure.
+ (void) DisconnectNamedPipe(evt->h);
}
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+static int
+nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
{
- int rv;
- nni_win_event *evt = &ep->acc_evt;
-
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- rv = 0;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) {
- rv = GetLastError();
+ if (!ConnectNamedPipe(evt->h, &evt->olpd)) {
+ int rv = GetLastError();
switch (rv) {
case ERROR_PIPE_CONNECTED:
- rv = 0;
- break;
+ // Synch completion already occurred.
+ // Windows is weird. Apparently the I/O
+ // completion packet has already been sent.
+ return (0);
+
case ERROR_IO_PENDING:
- nni_aio_list_append(&ep->aios, aio);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Normal asynchronous operation. Wait for
+ // completion.
+ return (0);
default:
- rv = nni_win_error(GetLastError());
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Fast-fail (synchronous).
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
}
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ // Synch completion right now. I/O completion packet delivered
+ // already.
+ return (0);
+}
+
+void
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+{
+ aio->a_pipe = NULL;
+ nni_win_event_submit(&ep->acc_ev, aio);
}
// So Windows IPC is a bit different on the client side. There is no
@@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg)
}
while ((ep = nni_list_first(&w->workers)) != NULL) {
+
nni_list_remove(&w->workers, ep);
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
+ if ((aio = ep->con_aio) == NULL) {
continue;
}
- nni_list_remove(&ep->aios, aio);
+ ep->con_aio = NULL;
+
+ pipe = NULL;
+
p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
NULL);
@@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg)
if (p == INVALID_HANDLE_VALUE) {
switch ((rv = GetLastError())) {
case ERROR_PIPE_BUSY:
- // still in progress.
- nni_list_prepend(&ep->aios, aio);
- break;
+ // Still in progress. This shouldn't
+ // happen unless the other side aborts
+ // the connection.
+ ep->con_aio = aio;
+ nni_list_append(&w->waiters, ep);
+ continue;
+
case ERROR_FILE_NOT_FOUND:
- nni_aio_finish(
- aio, NNG_ECONNREFUSED, 0);
+ rv = NNG_ECONNREFUSED;
break;
default:
- nni_aio_finish(
- aio, nni_win_error(rv), 0);
+ rv = nni_win_error(rv);
break;
}
- } else {
- rv = nni_win_ipc_pipe_init(&pipe, p);
- if (rv == 0) {
- rv = nni_win_iocp_register(p);
- }
- if (rv != 0) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- nni_aio_finish(aio, rv, 0);
- } else {
- aio->a_pipe = pipe;
- nni_aio_finish(aio, 0, 0);
- }
+ goto fail;
+ }
+ if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) ||
+ ((rv = nni_win_iocp_register(p)) != 0)) {
+ goto fail;
+ }
+ aio->a_pipe = pipe;
+ nni_aio_finish(aio, 0, 0);
+ continue;
+
+ fail:
+ if (p != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(p);
+ CloseHandle(p);
}
- if (!nni_list_empty(&ep->aios)) {
- nni_list_append(&w->waiters, ep);
+ if (pipe != NULL) {
+ nni_plat_ipc_pipe_fini(pipe);
}
+ nni_aio_finish(aio, rv, 0);
}
- // Wait 10 ms, unless woken earlier.
if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
nni_cv_wait(&w->cv);
} else {
+ // Wait 10 ms, unless woken earlier.
nni_cv_until(&w->cv, nni_clock() + 10000);
}
}
@@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio)
nni_plat_ipc_ep * ep = aio->a_prov_data;
nni_mtx_lock(&w->mtx);
- nni_aio_list_remove(aio);
- if (nni_list_empty(&ep->aios)) {
+ ep->con_aio = NULL;
+ if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
}
nni_mtx_unlock(&w->mtx);
}
@@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_aio_finish(aio, NNG_EBUSY, 0);
- nni_mtx_unlock(&w->mtx);
- return;
- }
+ NNI_ASSERT(!nni_list_active(&w->waiters, ep));
if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
nni_mtx_unlock(&w->mtx);
return;
}
- nni_list_append(&ep->aios, aio);
+ ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
@@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
void
nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_mtx_lock(&ep->mtx);
- if (ep->p) {
+ if (ep->p != INVALID_HANDLE_VALUE) {
CloseHandle(ep->p);
ep->p = NULL;
}
- nni_mtx_unlock(&ep->mtx);
- nni_mtx_fini(&ep->mtx);
+ nni_win_event_close(&ep->acc_ev);
+ nni_win_event_fini(&ep->acc_ev);
NNI_FREE_STRUCT(ep);
}
@@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
}
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
+ if ((aio = ep->con_aio) != NULL) {
+ ep->con_aio = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
nni_mtx_unlock(&w->mtx);
break;
+
case NNI_EP_MODE_LISTEN:
- nni_mtx_lock(&ep->mtx);
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- }
if (ep->p != INVALID_HANDLE_VALUE) {
- nni_win_event_cancel(&ep->acc_evt);
+ nni_win_event_close(&ep->acc_ev);
CloseHandle(ep->p);
ep->p = INVALID_HANDLE_VALUE;
}
- nni_mtx_unlock(&ep->mtx);
break;
}
}