aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows/win_ipcconn.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-09 22:27:55 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-09 23:03:42 -0800
commit2ade67cf0bad8596838762d085664d87e91093ba (patch)
tree87cddebb07ed7d8b8a8b9c61d8e813a476d87779 /src/platform/windows/win_ipcconn.c
parent2e72f32c61e3c6227e654285890807b9ca8709cf (diff)
downloadnng-2ade67cf0bad8596838762d085664d87e91093ba.tar.gz
nng-2ade67cf0bad8596838762d085664d87e91093ba.tar.bz2
nng-2ade67cf0bad8596838762d085664d87e91093ba.zip
windows ipc: significant refactor
This refactors a lot of the IPC code to hopefully address various hangs on shutdown, etc. The problem is that named pipes are not terrifically reliable when it comes to aborting ConnectNamedPipe. Additionally there were some logic errors in some of our code that left things rather brittle. Ultimately this all needs to be replaced with UNIX domain sockets which are superior in many ways.
Diffstat (limited to 'src/platform/windows/win_ipcconn.c')
-rw-r--r--src/platform/windows/win_ipcconn.c249
1 files changed, 145 insertions, 104 deletions
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index ef6ceb66..5f540a6c 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -9,6 +9,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "win_ipc.h"
@@ -32,12 +33,28 @@ typedef struct ipc_conn {
bool closed;
bool sending;
bool recving;
+ bool recv_fail;
+ bool send_fail;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
} ipc_conn;
static void
+ipc_recv_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+ c->recving = false;
+ c->recv_fail = true;
+ c->recv_rv = rv;
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_recv_start(ipc_conn *c)
{
nni_aio *aio;
@@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
- if (c->closed) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- continue;
- }
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_recv_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->recving = true;
- if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->recving = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->recving = true;
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_recv_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_aio *aio;
ipc_conn *c = io->ptr;
+
nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
@@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
rv = NNG_ECONNSHUT;
}
c->recving = false;
+ if (rv != 0) {
+ ipc_recv_fail(c, nni_win_error(rv));
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_sync(aio, rv, num);
+ // nni_aio_finish_sync(aio, rv, num);
+ nni_aio_finish(aio, rv, num);
}
static void
@@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if (c->recv_fail) {
+ rv = c->recv_rv;
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
@@ -166,6 +193,21 @@ ipc_recv(void *arg, nni_aio *aio)
}
static void
+ipc_send_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+
+ c->sending = false;
+ c->send_fail = true;
+ c->send_rv = rv;
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_send_start(ipc_conn *c)
{
nni_aio *aio;
@@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_send_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->sending = true;
- if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->sending = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->sending = true;
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_send_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -284,6 +328,8 @@ ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
+ nni_aio *aio;
+
nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
@@ -294,58 +340,53 @@ ipc_close(void *arg)
if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
- DisconnectNamedPipe(f);
- CloseHandle(f);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
+ }
+ if ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_conn_reap(void *arg)
+ipc_free(void *arg)
{
ipc_conn *c = arg;
+ nni_aio *aio;
+ HANDLE f = c->f;
+ int loop = 0;
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
- (!nni_list_empty(&c->send_aios))) {
- nni_cv_wait(&c->cv);
+ // time for callbacks to fire/drain.
+ nni_time when = nng_clock() + 5000;
+ while (c->sending || c->recving) {
+ if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) {
+ nng_log_err("NNG-WIN-IPC",
+ "Timeout waiting for operations to cancel");
+ break;
+ }
}
+ // These asserts are for debug, we should never see it.
+ // If we do then something bad happened.
+ NNI_ASSERT(!c->sending);
+ NNI_ASSERT(!c->recving);
+ NNI_ASSERT(nni_list_empty(&c->recv_aios));
+ NNI_ASSERT(nni_list_empty(&c->send_aios));
nni_mtx_unlock(&c->mtx);
- if (c->f != INVALID_HANDLE_VALUE) {
- CloseHandle(c->f);
+ if (f != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
}
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
-static nni_reap_list ipc_reap_list = {
- .rl_offset = offsetof(ipc_conn, reap),
- .rl_func = ipc_conn_reap,
-};
-
-static void
-ipc_free(void *arg)
-{
- ipc_conn *c = arg;
- ipc_close(c);
-
- nni_reap(&ipc_reap_list, c);
-}
-
static int
ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
{