aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/platform/windows/win_io.c13
-rw-r--r--src/platform/windows/win_ipcconn.c249
-rw-r--r--src/platform/windows/win_ipclisten.c83
-rw-r--r--src/sp/transport/ipc/ipc_test.c8
4 files changed, 209 insertions, 144 deletions
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 815f9bf3..f5556719 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -37,11 +37,10 @@ win_io_handler(void *arg)
int rv;
ok = GetQueuedCompletionStatus(
- win_io_h, &cnt, &key, &olpd, INFINITE);
+ win_io_h, &cnt, &key, &olpd, 5000);
if (olpd == NULL) {
// Completion port closed...
- NNI_ASSERT(ok == FALSE);
break;
}
@@ -124,12 +123,16 @@ nni_win_io_sysfini(void)
HANDLE h;
if ((h = win_io_h) != NULL) {
+ // send wakeups in case closing the handle doesn't work
+ for (i = 0; i < win_io_nthr; i++) {
+ PostQueuedCompletionStatus(h, 0, 0, NULL);
+ }
CloseHandle(h);
+ for (i = 0; i < win_io_nthr; i++) {
+ nni_thr_fini(&win_io_thrs[i]);
+ }
win_io_h = NULL;
}
- for (i = 0; i < win_io_nthr; i++) {
- nni_thr_fini(&win_io_thrs[i]);
- }
NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr);
}
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index ef6ceb66..5f540a6c 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -9,6 +9,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "win_ipc.h"
@@ -32,12 +33,28 @@ typedef struct ipc_conn {
bool closed;
bool sending;
bool recving;
+ bool recv_fail;
+ bool send_fail;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
} ipc_conn;
static void
+ipc_recv_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+ c->recving = false;
+ c->recv_fail = true;
+ c->recv_rv = rv;
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_recv_start(ipc_conn *c)
{
nni_aio *aio;
@@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
- if (c->closed) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- continue;
- }
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_recv_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->recving = true;
- if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->recving = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->recving = true;
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_recv_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_aio *aio;
ipc_conn *c = io->ptr;
+
nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
@@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
rv = NNG_ECONNSHUT;
}
c->recving = false;
+ if (rv != 0) {
+ ipc_recv_fail(c, nni_win_error(rv));
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_sync(aio, rv, num);
+ // nni_aio_finish_sync(aio, rv, num);
+ nni_aio_finish(aio, rv, num);
}
static void
@@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if (c->recv_fail) {
+ rv = c->recv_rv;
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
@@ -166,6 +193,21 @@ ipc_recv(void *arg, nni_aio *aio)
}
static void
+ipc_send_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+
+ c->sending = false;
+ c->send_fail = true;
+ c->send_rv = rv;
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_send_start(ipc_conn *c)
{
nni_aio *aio;
@@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_send_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->sending = true;
- if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->sending = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->sending = true;
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_send_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -284,6 +328,8 @@ ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
+ nni_aio *aio;
+
nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
@@ -294,58 +340,53 @@ ipc_close(void *arg)
if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
- DisconnectNamedPipe(f);
- CloseHandle(f);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
+ }
+ if ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_conn_reap(void *arg)
+ipc_free(void *arg)
{
ipc_conn *c = arg;
+ nni_aio *aio;
+ HANDLE f = c->f;
+ int loop = 0;
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
- (!nni_list_empty(&c->send_aios))) {
- nni_cv_wait(&c->cv);
+ // time for callbacks to fire/drain.
+ nni_time when = nng_clock() + 5000;
+ while (c->sending || c->recving) {
+ if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) {
+ nng_log_err("NNG-WIN-IPC",
+ "Timeout waiting for operations to cancel");
+ break;
+ }
}
+ // These asserts are for debug, we should never see it.
+ // If we do then something bad happened.
+ NNI_ASSERT(!c->sending);
+ NNI_ASSERT(!c->recving);
+ NNI_ASSERT(nni_list_empty(&c->recv_aios));
+ NNI_ASSERT(nni_list_empty(&c->send_aios));
nni_mtx_unlock(&c->mtx);
- if (c->f != INVALID_HANDLE_VALUE) {
- CloseHandle(c->f);
+ if (f != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
}
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
-static nni_reap_list ipc_reap_list = {
- .rl_offset = offsetof(ipc_conn, reap),
- .rl_func = ipc_conn_reap,
-};
-
-static void
-ipc_free(void *arg)
-{
- ipc_conn *c = arg;
- ipc_close(c);
-
- nni_reap(&ipc_reap_list, c);
-}
-
static int
ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
{
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
index b7b7b526..98cb8273 100644
--- a/src/platform/windows/win_ipclisten.c
+++ b/src/platform/windows/win_ipclisten.c
@@ -20,11 +20,11 @@ typedef struct {
char *path;
bool started;
bool closed;
+ bool accepting;
HANDLE f;
SECURITY_ATTRIBUTES sec_attr;
nni_list aios;
nni_mtx mtx;
- nni_cv cv;
nni_win_io io;
nni_sockaddr sa;
int rv;
@@ -39,7 +39,6 @@ ipc_accept_done(ipc_listener *l, int rv)
aio = nni_list_first(&l->aios);
nni_list_remove(&l->aios, aio);
- nni_cv_wake(&l->cv);
if (l->closed) {
rv = NNG_ECLOSED;
@@ -86,6 +85,7 @@ ipc_accept_start(ipc_listener *l)
{
nni_aio *aio;
+ NNI_ASSERT(!l->accepting);
while ((aio = nni_list_first(&l->aios)) != NULL) {
int rv;
@@ -97,6 +97,7 @@ ipc_accept_start(ipc_listener *l)
rv = 0;
} else if ((rv = GetLastError()) == ERROR_IO_PENDING) {
// asynchronous completion pending
+ l->accepting = true;
return;
} else if (rv == ERROR_PIPE_CONNECTED) {
rv = 0;
@@ -104,8 +105,6 @@ ipc_accept_start(ipc_listener *l)
// synchronous completion
ipc_accept_done(l, rv);
}
-
- nni_cv_wake(&l->cv);
}
static void
@@ -116,9 +115,18 @@ ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
NNI_ARG_UNUSED(cnt);
nni_mtx_lock(&l->mtx);
- if (nni_list_empty(&l->aios)) {
- // We canceled this somehow. We no longer care.
- DisconnectNamedPipe(l->f);
+ l->accepting = false;
+ if (l->closed) {
+ // We're shutting down, and the handle is probably closed.
+ // We should not have gotten anything here.
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (nni_list_empty(&l->aios) && l->rv == 0) {
+ // We canceled, and nobody waiting.
+ // But... we'll probably have another caller do
+ // accept momentarily, so we leave this and it will be
+ // ERROR_PIPE_CONNECTED later.
nni_mtx_unlock(&l->mtx);
return;
}
@@ -242,12 +250,8 @@ ipc_accept_cancel(nni_aio *aio, void *arg, int rv)
ipc_listener *l = arg;
nni_mtx_unlock(&l->mtx);
- if (aio == nni_list_first(&l->aios)) {
- l->rv = rv;
- CancelIoEx(l->f, &l->io.olpd);
- } else if (nni_aio_list_active(aio)) {
- nni_list_remove(&l->aios, aio);
- nni_cv_wake(&l->cv);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&l->mtx);
@@ -282,29 +286,55 @@ static void
ipc_listener_close(void *arg)
{
ipc_listener *l = arg;
+ nni_aio *aio;
+ int rv;
+ DWORD nb;
+
nni_mtx_lock(&l->mtx);
- if (!l->closed) {
- l->closed = true;
- if (!nni_list_empty(&l->aios)) {
- CancelIoEx(l->f, &l->io.olpd);
- }
- DisconnectNamedPipe(l->f);
- CloseHandle(l->f);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return;
}
+ l->closed = true;
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ bool accepting = l->accepting;
nni_mtx_unlock(&l->mtx);
+
+ // This craziness because CancelIoEx on ConnectNamedPipe
+ // seems to be incredibly unreliable. It does work, sometimes,
+ // but often it doesn't. This entire named pipe business needs
+ // to be retired in favor of UNIX domain sockets anyway.
+
+ while (accepting) {
+ if (!CancelIoEx(l->f, &l->io.olpd)) {
+ // operation not found probably
+ // We just inject a safety sleep to
+ // let it drain and give the callback
+ // a chance to fire (although it should
+ // already have done so.)
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ nng_msleep(500);
+ return;
+ }
+ nng_msleep(100);
+ nni_mtx_lock(&l->mtx);
+ accepting = l->accepting;
+ nni_mtx_unlock(&l->mtx);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
}
static void
ipc_listener_free(void *arg)
{
ipc_listener *l = arg;
- nni_mtx_lock(&l->mtx);
- while (!nni_list_empty(&l->aios)) {
- nni_cv_wait(&l->cv);
- }
- nni_mtx_unlock(&l->mtx);
+
nni_strfree(l->path);
- nni_cv_fini(&l->cv);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -339,7 +369,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
snprintf(l->sa.s_ipc.sa_path, NNG_MAXADDRLEN, "%s", url->u_path);
nni_aio_list_init(&l->aios);
nni_mtx_init(&l->mtx);
- nni_cv_init(&l->cv, &l->mtx);
*lp = (void *) l;
return (0);
}
diff --git a/src/sp/transport/ipc/ipc_test.c b/src/sp/transport/ipc/ipc_test.c
index 51eb975d..509d8722 100644
--- a/src/sp/transport/ipc/ipc_test.c
+++ b/src/sp/transport/ipc/ipc_test.c
@@ -152,7 +152,6 @@ test_ipc_ping_pong(void)
nng_socket s1;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_OPEN(s1);
@@ -245,7 +244,6 @@ test_ipc_recv_max(void)
size_t sz;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
@@ -275,7 +273,6 @@ test_ipc_connect_refused(void)
nng_dialer d;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
@@ -292,7 +289,6 @@ test_ipc_connect_blocking(void)
nng_stream_listener *l;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -315,7 +311,6 @@ test_ipc_connect_blocking_accept(void)
char *addr;
nng_aio *aio;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -344,7 +339,6 @@ test_ipc_listen_accept_cancel(void)
char *addr;
nng_aio *aio;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
@@ -364,7 +358,6 @@ test_ipc_listen_duplicate(void)
nng_socket s0;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -384,7 +377,6 @@ test_ipc_listener_clean_stale(void)
char *path;
char renamed[256];
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);