aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows/win_ipcconn.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/windows/win_ipcconn.c')
-rw-r--r--src/platform/windows/win_ipcconn.c249
1 files changed, 145 insertions, 104 deletions
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index ef6ceb66..5f540a6c 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -9,6 +9,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "win_ipc.h"
@@ -32,12 +33,28 @@ typedef struct ipc_conn {
bool closed;
bool sending;
bool recving;
+ bool recv_fail;
+ bool send_fail;
nni_mtx mtx;
nni_cv cv;
nni_reap_node reap;
} ipc_conn;
static void
+ipc_recv_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+ c->recving = false;
+ c->recv_fail = true;
+ c->recv_rv = rv;
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_recv_start(ipc_conn *c)
{
nni_aio *aio;
@@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
- if (c->closed) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- continue;
- }
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_recv_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->recving = true;
- if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->recving = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->recving = true;
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_recv_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_aio *aio;
ipc_conn *c = io->ptr;
+
nni_mtx_lock(&c->mtx);
aio = nni_list_first(&c->recv_aios);
NNI_ASSERT(aio != NULL);
@@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
rv = NNG_ECONNSHUT;
}
c->recving = false;
+ if (rv != 0) {
+ ipc_recv_fail(c, nni_win_error(rv));
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
nni_aio_list_remove(aio);
ipc_recv_start(c);
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_sync(aio, rv, num);
+ // nni_aio_finish_sync(aio, rv, num);
+ nni_aio_finish(aio, rv, num);
}
static void
@@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if (c->recv_fail) {
+ rv = c->recv_rv;
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
@@ -166,6 +193,21 @@ ipc_recv(void *arg, nni_aio *aio)
}
static void
+ipc_send_fail(ipc_conn *c, int rv)
+{
+ nni_aio *aio;
+
+ c->sending = false;
+ c->send_fail = true;
+ c->send_rv = rv;
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_cv_wake(&c->cv);
+}
+
+static void
ipc_send_start(ipc_conn *c)
{
nni_aio *aio;
@@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c)
DWORD len;
int rv;
- while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ nni_cv_wake(&c->cv);
+ return;
+ }
- nni_aio_get_iov(aio, &naiov, &aiov);
+ if (c->closed) {
+ ipc_send_fail(c, NNG_ECLOSED);
+ return;
+ }
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes
- // do not appear to support scatter/gather, so we have to
- // process each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
+ nni_aio_get_iov(aio, &naiov, &aiov);
- c->sending = true;
- if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- c->sending = false;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, nni_win_error(rv));
- } else {
- return;
- }
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes
+ // do not appear to support scatter/gather, so we have to
+ // process each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ c->sending = true;
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ ipc_send_fail(c, nni_win_error(rv));
}
- nni_cv_wake(&c->cv);
}
static void
@@ -284,6 +328,8 @@ ipc_close(void *arg)
{
ipc_conn *c = arg;
nni_time now;
+ nni_aio *aio;
+
nni_mtx_lock(&c->mtx);
if (!c->closed) {
HANDLE f = c->f;
@@ -294,58 +340,53 @@ ipc_close(void *arg)
if (f != INVALID_HANDLE_VALUE) {
CancelIoEx(f, &c->send_io.olpd);
CancelIoEx(f, &c->recv_io.olpd);
- DisconnectNamedPipe(f);
- CloseHandle(f);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
+ }
+ if ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_aio_abort(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_conn_reap(void *arg)
+ipc_free(void *arg)
{
ipc_conn *c = arg;
+ nni_aio *aio;
+ HANDLE f = c->f;
+ int loop = 0;
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
- (!nni_list_empty(&c->send_aios))) {
- nni_cv_wait(&c->cv);
+ // time for callbacks to fire/drain.
+ nni_time when = nng_clock() + 5000;
+ while (c->sending || c->recving) {
+ if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) {
+ nng_log_err("NNG-WIN-IPC",
+ "Timeout waiting for operations to cancel");
+ break;
+ }
}
+ // These asserts are for debug, we should never see it.
+ // If we do then something bad happened.
+ NNI_ASSERT(!c->sending);
+ NNI_ASSERT(!c->recving);
+ NNI_ASSERT(nni_list_empty(&c->recv_aios));
+ NNI_ASSERT(nni_list_empty(&c->send_aios));
nni_mtx_unlock(&c->mtx);
- if (c->f != INVALID_HANDLE_VALUE) {
- CloseHandle(c->f);
+ if (f != INVALID_HANDLE_VALUE) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
}
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
-static nni_reap_list ipc_reap_list = {
- .rl_offset = offsetof(ipc_conn, reap),
- .rl_func = ipc_conn_reap,
-};
-
-static void
-ipc_free(void *arg)
-{
- ipc_conn *c = arg;
- ipc_close(c);
-
- nni_reap(&ipc_reap_list, c);
-}
-
static int
ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
{