diff options
Diffstat (limited to 'src/platform/windows/win_ipcconn.c')
| -rw-r--r-- | src/platform/windows/win_ipcconn.c | 249 |
1 files changed, 145 insertions, 104 deletions
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index ef6ceb66..5f540a6c 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -9,6 +9,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "win_ipc.h" @@ -32,12 +33,28 @@ typedef struct ipc_conn { bool closed; bool sending; bool recving; + bool recv_fail; + bool send_fail; nni_mtx mtx; nni_cv cv; nni_reap_node reap; } ipc_conn; static void +ipc_recv_fail(ipc_conn *c, int rv) +{ + nni_aio *aio; + c->recving = false; + c->recv_fail = true; + c->recv_rv = rv; + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_cv_wake(&c->cv); +} + +static void ipc_recv_start(ipc_conn *c) { nni_aio *aio; @@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c) DWORD len; int rv; - while ((aio = nni_list_first(&c->recv_aios)) != NULL) { - if (c->closed) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - continue; - } + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + nni_cv_wake(&c->cv); + return; + } - nni_aio_get_iov(aio, &naiov, &aiov); + if (c->closed) { + ipc_recv_fail(c, NNG_ECLOSED); + return; + } - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes - // do not appear to support scatter/gather, so we have to - // process each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + nni_aio_get_iov(aio, &naiov, &aiov); - c->recving = true; - if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - c->recving = false; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - } else { - return; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + c->recving = true; + if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + ipc_recv_fail(c, nni_win_error(rv)); } - nni_cv_wake(&c->cv); } static void @@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) { nni_aio *aio; ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); aio = nni_list_first(&c->recv_aios); NNI_ASSERT(aio != NULL); @@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) rv = NNG_ECONNSHUT; } c->recving = false; + if (rv != 0) { + ipc_recv_fail(c, nni_win_error(rv)); + nni_mtx_unlock(&c->mtx); + return; + } nni_aio_list_remove(aio); ipc_recv_start(c); nni_mtx_unlock(&c->mtx); - nni_aio_finish_sync(aio, rv, num); + // nni_aio_finish_sync(aio, rv, num); + nni_aio_finish(aio, rv, num); } static void @@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if (c->recv_fail) { + rv = c->recv_rv; + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); @@ -166,6 +193,21 @@ ipc_recv(void *arg, nni_aio *aio) } static void +ipc_send_fail(ipc_conn *c, int rv) +{ + nni_aio *aio; + + c->sending = false; + c->send_fail = true; + c->send_rv = rv; + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_cv_wake(&c->cv); +} + +static void ipc_send_start(ipc_conn *c) { nni_aio *aio; @@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c) DWORD len; int rv; - while ((aio = nni_list_first(&c->send_aios)) != NULL) { + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + nni_cv_wake(&c->cv); + return; + } - nni_aio_get_iov(aio, &naiov, &aiov); + if (c->closed) { + ipc_send_fail(c, NNG_ECLOSED); + return; + } - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes - // do not appear to support scatter/gather, so we have to - // process each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + nni_aio_get_iov(aio, &naiov, &aiov); - c->sending = true; - if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - c->sending = false; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - } else { - return; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + c->sending = true; + if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + ipc_send_fail(c, nni_win_error(rv)); } - nni_cv_wake(&c->cv); } static void @@ -284,6 +328,8 @@ ipc_close(void *arg) { ipc_conn *c = arg; nni_time now; + nni_aio *aio; + nni_mtx_lock(&c->mtx); if (!c->closed) { HANDLE f = c->f; @@ -294,58 +340,53 @@ ipc_close(void *arg) if (f != INVALID_HANDLE_VALUE) { CancelIoEx(f, &c->send_io.olpd); CancelIoEx(f, &c->recv_io.olpd); - DisconnectNamedPipe(f); - CloseHandle(f); } } - now = nni_clock(); - // wait up to a maximum of 10 seconds before assuming something is - // badly amiss. from what we can tell, this doesn't happen, and we do - // see the timer expire properly, but this safeguard can prevent a - // hang. - while ((c->recving || c->sending) && - ((nni_clock() - now) < (NNI_SECOND * 10))) { - nni_mtx_unlock(&c->mtx); - nni_msleep(1); - nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_aio_abort(aio, NNG_ECLOSED); + } + if ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_aio_abort(aio, NNG_ECLOSED); } nni_mtx_unlock(&c->mtx); } static void -ipc_conn_reap(void *arg) +ipc_free(void *arg) { ipc_conn *c = arg; + nni_aio *aio; + HANDLE f = c->f; + int loop = 0; nni_mtx_lock(&c->mtx); - while ((!nni_list_empty(&c->recv_aios)) || - (!nni_list_empty(&c->send_aios))) { - nni_cv_wait(&c->cv); + // time for callbacks to fire/drain. + nni_time when = nng_clock() + 5000; + while (c->sending || c->recving) { + if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) { + nng_log_err("NNG-WIN-IPC", + "Timeout waiting for operations to cancel"); + break; + } } + // These asserts are for debug, we should never see it. + // If we do then something bad happened. + NNI_ASSERT(!c->sending); + NNI_ASSERT(!c->recving); + NNI_ASSERT(nni_list_empty(&c->recv_aios)); + NNI_ASSERT(nni_list_empty(&c->send_aios)); nni_mtx_unlock(&c->mtx); - if (c->f != INVALID_HANDLE_VALUE) { - CloseHandle(c->f); + if (f != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(f); + CloseHandle(f); } + nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); } -static nni_reap_list ipc_reap_list = { - .rl_offset = offsetof(ipc_conn, reap), - .rl_func = ipc_conn_reap, -}; - -static void -ipc_free(void *arg) -{ - ipc_conn *c = arg; - ipc_close(c); - - nni_reap(&ipc_reap_list, c); -} - static int ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t) { |
