summaryrefslogtreecommitdiff
path: root/src/platform/windows/win_ipc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/windows/win_ipc.c')
-rw-r--r--src/platform/windows/win_ipc.c619
1 files changed, 201 insertions, 418 deletions
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 237ded78..be1a98a1 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -13,19 +13,10 @@
#include <stdio.h>
-static void nni_win_ipc_acc_cb(void *);
-static void nni_win_ipc_send_cb(void *);
-static void nni_win_ipc_recv_cb(void *);
-static void nni_win_ipc_send_start(nni_plat_ipc_pipe *);
-static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *);
-
struct nni_plat_ipc_pipe {
HANDLE p;
- nni_win_event recv_evt;
- nni_win_event send_evt;
- nni_mtx mtx;
- nni_list readq;
- nni_list writeq;
+ nni_win_event rcv_ev;
+ nni_win_event snd_ev;
};
struct nni_plat_ipc_ep {
@@ -34,118 +25,49 @@ struct nni_plat_ipc_ep {
int started;
nni_list aios;
HANDLE p; // accept side only
- nni_win_event acc_evt; // accept side only
- nni_mtx mtx; // accept side only
+ nni_win_event acc_ev; // accept side only
+ nni_aio * con_aio; // conn side only
nni_list_node node; // conn side uses this
};
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- nni_aio_list_init(&pipe->readq);
- nni_aio_list_init(&pipe->writeq);
- *pipep = pipe;
- return (0);
-}
-
-static void
-nni_win_ipc_send_cancel(nni_aio *aio)
-{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->writeq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
- NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
- aio->a_count += cnt;
- aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
- aio->a_iov[0].iov_len -= cnt;
+static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *);
- if (aio->a_iov[0].iov_len == 0) {
- int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- aio->a_niov--;
- }
+static nni_win_event_ops nni_win_ipc_pipe_ops = {
+ .wev_start = nni_win_ipc_pipe_start,
+ .wev_finish = nni_win_ipc_pipe_finish,
+ .wev_cancel = nni_win_ipc_pipe_cancel,
+};
- if (aio->a_niov > 0) {
- // If we have more to do, submit it!
- nni_win_ipc_send_start(pipe);
- return;
- }
- }
+static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
+static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *);
- // All done; hopefully successfully.
- nni_list_remove(&pipe->writeq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
-}
+static nni_win_event_ops nni_win_ipc_acc_ops = {
+ .wev_start = nni_win_ipc_acc_start,
+ .wev_finish = nni_win_ipc_acc_finish,
+ .wev_cancel = nni_win_ipc_acc_cancel,
+};
-static void
-nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->send_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->writeq);
+ void * buf;
+ DWORD len;
+ BOOL ok;
+ int rv;
+ nni_plat_ipc_pipe *pipe = evt->ptr;
NNI_ASSERT(aio != NULL);
NNI_ASSERT(aio->a_niov > 0);
NNI_ASSERT(aio->a_iov[0].iov_len > 0);
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if (evt->h == INVALID_HANDLE_VALUE) {
+ evt->status = ERROR_INVALID_HANDLE;
+ evt->count = 0;
+ return (1);
}
// Now start a writefile. We assume that only one send can be
@@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
// scrambling the data anyway. Note that Windows named pipes do
// not appear to support scatter/gather, so we have to process
// each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
+ buf = aio->a_iov[0].iov_buf;
+ len = (DWORD) aio->a_iov[0].iov_len;
// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
@@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe)
len = 0x1000000;
}
- if (!WriteFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ evt->count = 0;
+ if (evt == &pipe->snd_ev) {
+ ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd);
+ } else {
+ ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd);
+ }
+ if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_send_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ // Wait for the I/O completion event. Note that when an I/O
+ // completes immediately, the I/O completion packet is still
+ // delivered.
+ return (0);
}
static void
-nni_win_ipc_send_cb(void *arg)
+nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
+ NNI_ARG_UNUSED(aio);
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_send_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
-}
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
}
- nni_aio_list_append(&pipe->writeq, aio);
- nni_win_ipc_send_start(pipe);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_win_ipc_recv_cancel(nni_aio *aio)
+nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = aio->a_prov_data;
+ int rv = 0;
+ DWORD cnt;
- nni_mtx_lock(&pipe->mtx);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
-}
-
-static void
-nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
-{
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- int rv = 0;
- nni_aio * aio;
- DWORD cnt;
-
- if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) {
- rv = nni_win_error(GetLastError());
- }
- if ((aio = nni_list_first(&pipe->readq)) == NULL) {
- // If the AIO was canceled, but IOCP thread was still
- // working on it, we might have seen this.
- return;
- }
- if (rv == 0) {
+ cnt = evt->count;
+ if ((rv = evt->status) == 0) {
NNI_ASSERT(cnt <= aio->a_iov[0].iov_len);
aio->a_count += cnt;
aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt;
@@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe)
if (aio->a_iov[0].iov_len == 0) {
int i;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
}
if (aio->a_niov > 0) {
// If we have more to do, submit it!
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_resubmit(evt, aio);
return;
}
}
// All done; hopefully successfully.
- nni_list_remove(&pipe->readq, aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
}
-static void
-nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe)
+static int
+nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p)
{
- void * buf;
- DWORD len;
- int rv;
- nni_win_event *evt = &pipe->recv_evt;
- OVERLAPPED * olpd = nni_win_event_overlapped(evt);
- nni_aio * aio = nni_list_first(&pipe->readq);
-
- NNI_ASSERT(aio != NULL);
- NNI_ASSERT(aio->a_niov > 0);
- NNI_ASSERT(aio->a_iov[0].iov_len > 0);
- NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- rv = NNG_ECLOSED;
- goto fail;
- }
+ nni_plat_ipc_pipe *pipe;
+ int rv;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- goto fail;
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- // Now start a readfile. We assume that only one read can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
- olpd = nni_win_event_overlapped(evt);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
+ rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
-
- if (!ReadFile(pipe->p, buf, len, NULL, olpd)) {
- // If we failed immediately, then process it.
- if ((rv = GetLastError()) == ERROR_IO_PENDING) {
- // This is the normal path we expect; the IO will
- // complete asynchronously.
- return;
- }
-
- // Some synchronous error occurred.
- rv = nni_win_error(rv);
- goto fail;
+ rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p);
+ if (rv != 0) {
+ nni_plat_ipc_pipe_fini(pipe);
+ return (rv);
}
- // If we completed synchronously, then do the completion. This is
- // not normally expected.
- nni_win_ipc_recv_finish(pipe);
- return;
-
-fail:
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, 0);
+ pipe->p = p;
+ *pipep = pipe;
+ return (0);
}
-static void
-nni_win_ipc_recv_cb(void *arg)
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_plat_ipc_pipe *pipe = arg;
-
- nni_mtx_lock(&pipe->mtx);
- nni_win_ipc_recv_finish(pipe);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_submit(&pipe->snd_ev, aio);
}
void
nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
{
- nni_win_event *evt = &pipe->send_evt;
- int rv;
-
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- if (pipe->p == INVALID_HANDLE_VALUE) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
- nni_aio_list_append(&pipe->readq, aio);
- nni_win_ipc_recv_start(pipe);
+ nni_win_event_submit(&pipe->rcv_ev, aio);
}
void
nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
{
- nni_mtx_lock(&pipe->mtx);
- if (pipe->p != INVALID_HANDLE_VALUE) {
- CloseHandle(pipe->p);
+ HANDLE p;
+
+ if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
pipe->p = INVALID_HANDLE_VALUE;
+ CloseHandle(p);
}
- nni_win_event_cancel(&pipe->send_evt);
- nni_win_event_cancel(&pipe->recv_evt);
- nni_mtx_unlock(&pipe->mtx);
+ nni_win_event_close(&pipe->snd_ev);
+ nni_win_event_close(&pipe->rcv_ev);
}
void
@@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
{
nni_plat_ipc_pipe_close(pipe);
- nni_win_event_fini(&pipe->send_evt);
- nni_win_event_fini(&pipe->recv_evt);
- nni_mtx_fini(&pipe->mtx);
+ nni_win_event_fini(&pipe->snd_ev);
+ nni_win_event_fini(&pipe->rcv_ev);
NNI_FREE_STRUCT(pipe);
}
@@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
{
const char * path;
nni_plat_ipc_ep *ep;
- int rv;
if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
return (NNG_EADDRINVAL);
@@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
return (NNG_ENOMEM);
}
ZeroMemory(ep, sizeof(ep));
- if ((rv = nni_mtx_init(&ep->mtx)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
ep->mode = mode;
NNI_LIST_NODE_INIT(&ep->node);
- nni_aio_list_init(&ep->aios);
(void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
@@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
int rv;
HANDLE p;
- nni_mtx_lock(&ep->mtx);
if (ep->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EINVAL);
}
if (ep->started) {
- nni_mtx_unlock(&ep->mtx);
return (NNG_EBUSY);
}
@@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
}
goto failed;
}
- rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p);
+ rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p);
if (rv != 0) {
goto failed;
}
@@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
ep->p = p;
ep->started = 1;
- nni_mtx_unlock(&ep->mtx);
return (0);
failed:
- nni_mtx_unlock(&ep->mtx);
if (p != INVALID_HANDLE_VALUE) {
(void) CloseHandle(p);
}
@@ -486,40 +285,15 @@ failed:
}
static void
-nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
+nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
{
- nni_win_event * evt = &ep->acc_evt;
- DWORD nbytes;
- int rv;
+ nni_plat_ipc_ep * ep = evt->ptr;
nni_plat_ipc_pipe *pipe;
- nni_aio * aio;
+ int rv;
HANDLE newp, oldp;
- // Note: This should be called with the ep lock held, and only when
- // the ConnectNamedPipe has finished.
-
- rv = 0;
- if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) {
- if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) {
- // We should never be here normally, but if the
- // pipe got accepted by another client we can
- // some times race here.
- return;
- }
- }
-
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
- // No completion available to us.
- if (rv == 0) {
- NNI_ASSERT(0);
- DisconnectNamedPipe(ep->p);
- }
- return;
- }
-
- nni_list_remove(&ep->aios, aio);
- if (rv != 0) {
- nni_aio_finish(aio, rv, 0);
+ if ((rv = evt->status) != 0) {
+ nni_aio_finish(aio, nni_win_error(rv), 0);
return;
}
@@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL);
if (newp == INVALID_HANDLE_VALUE) {
rv = nni_win_error(GetLastError());
+ // We connected, but as we cannot get a new pipe,
+ // we have to disconnect the old one.
DisconnectNamedPipe(ep->p);
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ if ((rv = nni_win_iocp_register(newp)) != 0) {
+ // Disconnect the old pipe.
+ DisconnectNamedPipe(ep->p);
+ // And discard the half-baked new one.
+ DisconnectNamedPipe(newp);
+ (void) CloseHandle(newp);
+ nni_aio_finish(aio, rv, 0);
return;
}
- oldp = ep->p;
- ep->p = newp;
+
+ oldp = ep->p;
+ ep->p = newp;
+ evt->h = newp;
if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) {
+ // The new pipe is already fine for us. Discard
+ // the old one, since failed to be able to use it.
DisconnectNamedPipe(oldp);
+ (void) CloseHandle(oldp);
nni_aio_finish(aio, rv, 0);
return;
}
@@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep)
}
static void
-nni_win_ipc_acc_cb(void *arg)
+nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio)
{
- nni_plat_ipc_ep *ep = arg;
-
- nni_mtx_lock(&ep->mtx);
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
-}
+ NNI_ARG_UNUSED(aio);
-static void
-nni_win_ipc_acc_cancel(nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = aio->a_prov_data;
+ if (CancelIoEx(evt->h, &evt->olpd)) {
+ DWORD cnt;
- nni_mtx_lock(&ep->mtx);
- nni_win_event_cancel(&ep->acc_evt);
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&ep->mtx);
+ // If we canceled, make sure that we've completely
+ // finished with the overlapped.
+ GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE);
+ }
+ // Just to be sure.
+ (void) DisconnectNamedPipe(evt->h);
}
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+static int
+nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
{
- int rv;
- nni_win_event *evt = &ep->acc_evt;
-
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- rv = 0;
- if ((rv = nni_win_event_reset(evt)) != 0) {
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) {
- rv = GetLastError();
+ if (!ConnectNamedPipe(evt->h, &evt->olpd)) {
+ int rv = GetLastError();
switch (rv) {
case ERROR_PIPE_CONNECTED:
- rv = 0;
- break;
+ // Synch completion already occurred.
+ // Windows is weird. Apparently the I/O
+ // completion packet has already been sent.
+ return (0);
+
case ERROR_IO_PENDING:
- nni_aio_list_append(&ep->aios, aio);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Normal asynchronous operation. Wait for
+ // completion.
+ return (0);
default:
- rv = nni_win_error(GetLastError());
- nni_aio_finish(aio, rv, 0);
- nni_mtx_unlock(&ep->mtx);
- return;
+ // Fast-fail (synchronous).
+ evt->status = GetLastError();
+ evt->count = 0;
+ return (1);
}
}
- nni_win_ipc_acc_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ // Synch completion right now. I/O completion packet delivered
+ // already.
+ return (0);
+}
+
+void
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
+{
+ aio->a_pipe = NULL;
+ nni_win_event_submit(&ep->acc_ev, aio);
}
// So Windows IPC is a bit different on the client side. There is no
@@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg)
}
while ((ep = nni_list_first(&w->workers)) != NULL) {
+
nni_list_remove(&w->workers, ep);
- if ((aio = nni_list_first(&ep->aios)) == NULL) {
+ if ((aio = ep->con_aio) == NULL) {
continue;
}
- nni_list_remove(&ep->aios, aio);
+ ep->con_aio = NULL;
+
+ pipe = NULL;
+
p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
NULL);
@@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg)
if (p == INVALID_HANDLE_VALUE) {
switch ((rv = GetLastError())) {
case ERROR_PIPE_BUSY:
- // still in progress.
- nni_list_prepend(&ep->aios, aio);
- break;
+ // Still in progress. This shouldn't
+ // happen unless the other side aborts
+ // the connection.
+ ep->con_aio = aio;
+ nni_list_append(&w->waiters, ep);
+ continue;
+
case ERROR_FILE_NOT_FOUND:
- nni_aio_finish(
- aio, NNG_ECONNREFUSED, 0);
+ rv = NNG_ECONNREFUSED;
break;
default:
- nni_aio_finish(
- aio, nni_win_error(rv), 0);
+ rv = nni_win_error(rv);
break;
}
- } else {
- rv = nni_win_ipc_pipe_init(&pipe, p);
- if (rv == 0) {
- rv = nni_win_iocp_register(p);
- }
- if (rv != 0) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- nni_aio_finish(aio, rv, 0);
- } else {
- aio->a_pipe = pipe;
- nni_aio_finish(aio, 0, 0);
- }
+ goto fail;
+ }
+ if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) ||
+ ((rv = nni_win_iocp_register(p)) != 0)) {
+ goto fail;
+ }
+ aio->a_pipe = pipe;
+ nni_aio_finish(aio, 0, 0);
+ continue;
+
+ fail:
+ if (p != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(p);
+ CloseHandle(p);
}
- if (!nni_list_empty(&ep->aios)) {
- nni_list_append(&w->waiters, ep);
+ if (pipe != NULL) {
+ nni_plat_ipc_pipe_fini(pipe);
}
+ nni_aio_finish(aio, rv, 0);
}
- // Wait 10 ms, unless woken earlier.
if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
nni_cv_wait(&w->cv);
} else {
+ // Wait 10 ms, unless woken earlier.
nni_cv_until(&w->cv, nni_clock() + 10000);
}
}
@@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio)
nni_plat_ipc_ep * ep = aio->a_prov_data;
nni_mtx_lock(&w->mtx);
- nni_aio_list_remove(aio);
- if (nni_list_empty(&ep->aios)) {
+ ep->con_aio = NULL;
+ if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
}
nni_mtx_unlock(&w->mtx);
}
@@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_aio_finish(aio, NNG_EBUSY, 0);
- nni_mtx_unlock(&w->mtx);
- return;
- }
+ NNI_ASSERT(!nni_list_active(&w->waiters, ep));
if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
nni_mtx_unlock(&w->mtx);
return;
}
- nni_list_append(&ep->aios, aio);
+ ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
@@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
void
nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_mtx_lock(&ep->mtx);
- if (ep->p) {
+ if (ep->p != INVALID_HANDLE_VALUE) {
CloseHandle(ep->p);
ep->p = NULL;
}
- nni_mtx_unlock(&ep->mtx);
- nni_mtx_fini(&ep->mtx);
+ nni_win_event_close(&ep->acc_ev);
+ nni_win_event_fini(&ep->acc_ev);
NNI_FREE_STRUCT(ep);
}
@@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
if (nni_list_active(&w->waiters, ep)) {
nni_list_remove(&w->waiters, ep);
}
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
+ if ((aio = ep->con_aio) != NULL) {
+ ep->con_aio = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
nni_mtx_unlock(&w->mtx);
break;
+
case NNI_EP_MODE_LISTEN:
- nni_mtx_lock(&ep->mtx);
- while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_list_remove(&ep->aios, aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- }
if (ep->p != INVALID_HANDLE_VALUE) {
- nni_win_event_cancel(&ep->acc_evt);
+ nni_win_event_close(&ep->acc_ev);
CloseHandle(ep->p);
ep->p = INVALID_HANDLE_VALUE;
}
- nni_mtx_unlock(&ep->mtx);
break;
}
}