aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Garrett D'Amore <garrett@damore.org> 2024-12-09 22:27:55 -0800
committer: Garrett D'Amore <garrett@damore.org> 2024-12-09 23:03:42 -0800
commit: 2ade67cf0bad8596838762d085664d87e91093ba (patch)
tree: 87cddebb07ed7d8b8a8b9c61d8e813a476d87779 /src
parent: 2e72f32c61e3c6227e654285890807b9ca8709cf (diff)
download: nng-2ade67cf0bad8596838762d085664d87e91093ba.tar.gz
nng-2ade67cf0bad8596838762d085664d87e91093ba.tar.bz2
nng-2ade67cf0bad8596838762d085664d87e91093ba.zip
windows ipc: significant refactor
This refactors much of the Windows IPC code to hopefully address various hangs on shutdown and similar problems. The underlying problem is that named pipes are not terribly reliable when it comes to aborting ConnectNamedPipe. Additionally, there were some logic errors in our code that left things rather brittle. Ultimately, this all needs to be replaced with UNIX domain sockets, which are superior in many ways.
Diffstat (limited to 'src')
-rw-r--r-- src/platform/windows/win_io.c 13
-rw-r--r-- src/platform/windows/win_ipcconn.c 249
-rw-r--r-- src/platform/windows/win_ipclisten.c 83
-rw-r--r-- src/sp/transport/ipc/ipc_test.c 8
4 files changed, 209 insertions, 144 deletions
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 815f9bf3..f5556719 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -37,11 +37,10 @@ win_io_handler(void *arg)
int rv;
ok = GetQueuedCompletionStatus(
- win_io_h, &cnt, &key, &olpd, INFINITE);
+ win_io_h, &cnt, &key, &olpd, 5000);
if (olpd == NULL) {
// Completion port closed...
- NNI_ASSERT(ok == FALSE);
break;
}
@@ -124,12 +123,16 @@ nni_win_io_sysfini(void)
HANDLE h;
if ((h = win_io_h) != NULL) {
+ // send wakeups in case closing the handle doesn't work
+ for (i = 0; i < win_io_nthr; i++) {
+ PostQueuedCompletionStatus(h, 0, 0, NULL);
+ }
CloseHandle(h);
+ for (i = 0; i < win_io_nthr; i++) {
+ nni_thr_fini(&win_io_thrs[i]);
+ }
win_io_h = NULL;
}
- for (i = 0; i < win_io_nthr; i++) {
- nni_thr_fini(&win_io_thrs[i]);
- }
NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr);
}
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index ef6ceb66..5f540a6c 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -9,6 +9,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "win_ipc.h"
@@ -32,12 +33,28 @@ typedef struct ipc_conn {
bool closed;
bool sending;
bool recving;
+ bool recv_fail;
+ bool send_fail;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
} ipc_conn;
static void
+ipc_recv_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+ c->recving = false;
+ c->recv_fail = true;
+ c->recv_rv = rv;
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_recv_start(ipc_conn *c)
{
nni_aio *aio;
@@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
- if (c->closed) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- continue;
- }
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_recv_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->recving = true;
- if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->recving = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->recving = true;
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_recv_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_aio *aio;
ipc_conn *c = io->ptr;
+
nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
@@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
rv = NNG_ECONNSHUT;
}
c->recving = false;
+ if (rv != 0) {
+ ipc_recv_fail(c, nni_win_error(rv));
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_sync(aio, rv, num);
+ // nni_aio_finish_sync(aio, rv, num);
+ nni_aio_finish(aio, rv, num);
}
static void
@@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if (c->recv_fail) {
+ rv = c->recv_rv;
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
@@ -166,6 +193,21 @@ ipc_recv(void *arg, nni_aio *aio)
}
static void
+ipc_send_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+
+ c->sending = false;
+ c->send_fail = true;
+ c->send_rv = rv;
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_send_start(ipc_conn *c)
{
nni_aio *aio;
@@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_send_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->sending = true;
- if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->sending = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->sending = true;
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_send_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -284,6 +328,8 @@ ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
+ nni_aio *aio;
+
nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
@@ -294,58 +340,53 @@ ipc_close(void *arg)
if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
- DisconnectNamedPipe(f);
- CloseHandle(f);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
+ }
+ if ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_conn_reap(void *arg)
+ipc_free(void *arg)
{
ipc_conn *c = arg;
+ nni_aio *aio;
+ HANDLE f = c->f;
+ int loop = 0;
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
- (!nni_list_empty(&c->send_aios))) {
- nni_cv_wait(&c->cv);
+ // time for callbacks to fire/drain.
+ nni_time when = nng_clock() + 5000;
+ while (c->sending || c->recving) {
+ if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) {
+ nng_log_err("NNG-WIN-IPC",
+ "Timeout waiting for operations to cancel");
+ break;
+ }
}
+ // These asserts are for debug, we should never see it.
+ // If we do then something bad happened.
+ NNI_ASSERT(!c->sending);
+ NNI_ASSERT(!c->recving);
+ NNI_ASSERT(nni_list_empty(&c->recv_aios));
+ NNI_ASSERT(nni_list_empty(&c->send_aios));
nni_mtx_unlock(&c->mtx);
- if (c->f != INVALID_HANDLE_VALUE) {
- CloseHandle(c->f);
+ if (f != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
}
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
-static nni_reap_list ipc_reap_list = {
- .rl_offset = offsetof(ipc_conn, reap),
- .rl_func = ipc_conn_reap,
-};
-
-static void
-ipc_free(void *arg)
-{
- ipc_conn *c = arg;
- ipc_close(c);
-
- nni_reap(&ipc_reap_list, c);
-}
-
static int
ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
{
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
index b7b7b526..98cb8273 100644
--- a/src/platform/windows/win_ipclisten.c
+++ b/src/platform/windows/win_ipclisten.c
@@ -20,11 +20,11 @@ typedef struct {
char *path;
bool started;
bool closed;
+ bool accepting;
HANDLE f;
SECURITY_ATTRIBUTES sec_attr;
nni_list aios;
nni_mtx mtx;
- nni_cv cv;
nni_win_io io;
nni_sockaddr sa;
int rv;
@@ -39,7 +39,6 @@ ipc_accept_done(ipc_listener *l, int rv)
aio = nni_list_first(&l->aios);
nni_list_remove(&l->aios, aio);
- nni_cv_wake(&l->cv);
if (l->closed) {
rv = NNG_ECLOSED;
@@ -86,6 +85,7 @@ ipc_accept_start(ipc_listener *l)
{
nni_aio *aio;
+ NNI_ASSERT(!l->accepting);
while ((aio = nni_list_first(&l->aios)) != NULL) {
int rv;
@@ -97,6 +97,7 @@ ipc_accept_start(ipc_listener *l)
rv = 0;
} else if ((rv = GetLastError()) == ERROR_IO_PENDING) {
// asynchronous completion pending
+ l->accepting = true;
return;
} else if (rv == ERROR_PIPE_CONNECTED) {
rv = 0;
@@ -104,8 +105,6 @@ ipc_accept_start(ipc_listener *l)
// synchronous completion
ipc_accept_done(l, rv);
}
-
- nni_cv_wake(&l->cv);
}
static void
@@ -116,9 +115,18 @@ ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
NNI_ARG_UNUSED(cnt);
nni_mtx_lock(&l->mtx);
- if (nni_list_empty(&l->aios)) {
- // We canceled this somehow. We no longer care.
- DisconnectNamedPipe(l->f);
+ l->accepting = false;
+ if (l->closed) {
+ // We're shutting down, and the handle is probably closed.
+ // We should not have gotten anything here.
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (nni_list_empty(&l->aios) && l->rv == 0) {
+ // We canceled, and nobody waiting.
+ // But... we'll probably have another caller do
+ // accept momentarily, so we leave this and it will be
+ // ERROR_PIPE_CONNECTED later.
nni_mtx_unlock(&l->mtx);
return;
}
@@ -242,12 +250,8 @@ ipc_accept_cancel(nni_aio *aio, void *arg, int rv)
ipc_listener *l = arg;
nni_mtx_unlock(&l->mtx);
- if (aio == nni_list_first(&l->aios)) {
- l->rv = rv;
- CancelIoEx(l->f, &l->io.olpd);
- } else if (nni_aio_list_active(aio)) {
- nni_list_remove(&l->aios, aio);
- nni_cv_wake(&l->cv);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&l->mtx);
@@ -282,29 +286,55 @@ static void
ipc_listener_close(void *arg)
{
ipc_listener *l = arg;
+ nni_aio *aio;
+ int rv;
+ DWORD nb;
+
nni_mtx_lock(&l->mtx);
- if (!l->closed) {
- l->closed = true;
- if (!nni_list_empty(&l->aios)) {
- CancelIoEx(l->f, &l->io.olpd);
- }
- DisconnectNamedPipe(l->f);
- CloseHandle(l->f);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return;
}
+ l->closed = true;
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ bool accepting = l->accepting;
nni_mtx_unlock(&l->mtx);
+
+ // This craziness because CancelIoEx on ConnectNamedPipe
+ // seems to be incredibly unreliable. It does work, sometimes,
+ // but often it doesn't. This entire named pipe business needs
+ // to be retired in favor of UNIX domain sockets anyway.
+
+ while (accepting) {
+ if (!CancelIoEx(l->f, &l->io.olpd)) {
+ // operation not found probably
+ // We just inject a safety sleep to
+ // let it drain and give the callback
+ // a chance to fire (although it should
+ // already have done so.)
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ nng_msleep(500);
+ return;
+ }
+ nng_msleep(100);
+ nni_mtx_lock(&l->mtx);
+ accepting = l->accepting;
+ nni_mtx_unlock(&l->mtx);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
}
static void
ipc_listener_free(void *arg)
{
ipc_listener *l = arg;
- nni_mtx_lock(&l->mtx);
- while (!nni_list_empty(&l->aios)) {
- nni_cv_wait(&l->cv);
- }
- nni_mtx_unlock(&l->mtx);
+
nni_strfree(l->path);
- nni_cv_fini(&l->cv);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -339,7 +369,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
snprintf(l->sa.s_ipc.sa_path, NNG_MAXADDRLEN, "%s", url->u_path);
nni_aio_list_init(&l->aios);
nni_mtx_init(&l->mtx);
- nni_cv_init(&l->cv, &l->mtx);
*lp = (void *) l;
return (0);
}
diff --git a/src/sp/transport/ipc/ipc_test.c b/src/sp/transport/ipc/ipc_test.c
index 51eb975d..509d8722 100644
--- a/src/sp/transport/ipc/ipc_test.c
+++ b/src/sp/transport/ipc/ipc_test.c
@@ -152,7 +152,6 @@ test_ipc_ping_pong(void)
nng_socket s1;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_OPEN(s1);
@@ -245,7 +244,6 @@ test_ipc_recv_max(void)
size_t sz;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
@@ -275,7 +273,6 @@ test_ipc_connect_refused(void)
nng_dialer d;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
@@ -292,7 +289,6 @@ test_ipc_connect_blocking(void)
nng_stream_listener *l;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -315,7 +311,6 @@ test_ipc_connect_blocking_accept(void)
char *addr;
nng_aio *aio;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -344,7 +339,6 @@ test_ipc_listen_accept_cancel(void)
char *addr;
nng_aio *aio;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
@@ -364,7 +358,6 @@ test_ipc_listen_duplicate(void)
nng_socket s0;
char *addr;
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);
@@ -384,7 +377,6 @@ test_ipc_listener_clean_stale(void)
char *path;
char renamed[256];
- NUTS_ENABLE_LOG(NNG_LOG_INFO);
NUTS_ADDR(addr, "ipc");
NUTS_OPEN(s0);