diff options
Diffstat (limited to 'src/platform/windows/win_ipc.c')
| -rw-r--r-- | src/platform/windows/win_ipc.c | 619 |
1 files changed, 201 insertions, 418 deletions
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 237ded78..be1a98a1 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -13,19 +13,10 @@ #include <stdio.h> -static void nni_win_ipc_acc_cb(void *); -static void nni_win_ipc_send_cb(void *); -static void nni_win_ipc_recv_cb(void *); -static void nni_win_ipc_send_start(nni_plat_ipc_pipe *); -static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *); - struct nni_plat_ipc_pipe { HANDLE p; - nni_win_event recv_evt; - nni_win_event send_evt; - nni_mtx mtx; - nni_list readq; - nni_list writeq; + nni_win_event rcv_ev; + nni_win_event snd_ev; }; struct nni_plat_ipc_ep { @@ -34,118 +25,49 @@ struct nni_plat_ipc_ep { int started; nni_list aios; HANDLE p; // accept side only - nni_win_event acc_evt; // accept side only - nni_mtx mtx; // accept side only + nni_win_event acc_ev; // accept side only + nni_aio * con_aio; // conn side only nni_list_node node; // conn side uses this }; -static int -nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) -{ - nni_plat_ipc_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - - pipe->p = p; - nni_aio_list_init(&pipe->readq); - nni_aio_list_init(&pipe->writeq); - *pipep = pipe; - return (0); -} - -static void -nni_win_ipc_send_cancel(nni_aio *aio) -{ - nni_plat_ipc_pipe *pipe = aio->a_prov_data; - - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->writeq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { - NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); - aio->a_count += cnt; - aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; - aio->a_iov[0].iov_len -= cnt; +static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *); - if (aio->a_iov[0].iov_len == 0) { - int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - aio->a_niov--; - } +static nni_win_event_ops nni_win_ipc_pipe_ops = { + .wev_start = nni_win_ipc_pipe_start, + .wev_finish = nni_win_ipc_pipe_finish, + .wev_cancel = nni_win_ipc_pipe_cancel, +}; - if (aio->a_niov > 0) { - // If we have more to do, submit it! - nni_win_ipc_send_start(pipe); - return; - } - } +static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *); - // All done; hopefully successfully. - nni_list_remove(&pipe->writeq, aio); - nni_aio_finish(aio, rv, aio->a_count); -} +static nni_win_event_ops nni_win_ipc_acc_ops = { + .wev_start = nni_win_ipc_acc_start, + .wev_finish = nni_win_ipc_acc_finish, + .wev_cancel = nni_win_ipc_acc_cancel, +}; -static void -nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->writeq); + void * buf; + DWORD len; + BOOL ok; + int rv; + nni_plat_ipc_pipe *pipe = evt->ptr; NNI_ASSERT(aio != NULL); NNI_ASSERT(aio->a_niov > 0); NNI_ASSERT(aio->a_iov[0].iov_len > 0); NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if (evt->h == INVALID_HANDLE_VALUE) { + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + return (1); } // Now start a writefile. We assume that only one send can be @@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) // scrambling the data anyway. Note that Windows named pipes do // not appear to support scatter/gather, so we have to process // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; // We limit ourselves to writing 16MB at a time. Named Pipes // on Windows have limits of between 31 and 64MB. @@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) len = 0x1000000; } - if (!WriteFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + evt->count = 0; + if (evt == &pipe->snd_ev) { + ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd); + } else { + ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd); + } + if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = GetLastError(); + evt->count = 0; + return (1); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_send_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, aio->a_count); + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); } static void -nni_win_ipc_send_cb(void *arg) +nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; + NNI_ARG_UNUSED(aio); - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_send_finish(pipe); - nni_mtx_unlock(&pipe->mtx); -} + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); } - nni_aio_list_append(&pipe->writeq, aio); - nni_win_ipc_send_start(pipe); - nni_mtx_unlock(&pipe->mtx); } static void -nni_win_ipc_recv_cancel(nni_aio *aio) +nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = aio->a_prov_data; + int rv = 0; + DWORD cnt; - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->readq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { + cnt = evt->count; + if ((rv = evt->status) == 0) { NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); aio->a_count += cnt; aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; @@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) if (aio->a_iov[0].iov_len == 0) { int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } aio->a_niov--; + for (i = 0; i < aio->a_niov; i++) { + aio->a_iov[i] = aio->a_iov[i + 1]; + } } if (aio->a_niov > 0) { // If we have more to do, submit it! - nni_win_ipc_recv_start(pipe); + nni_win_event_resubmit(evt, aio); return; } } // All done; hopefully successfully. - nni_list_remove(&pipe->readq, aio); - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, nni_win_error(rv), aio->a_count); } -static void -nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->readq); - - NNI_ASSERT(aio != NULL); - NNI_ASSERT(aio->a_niov > 0); - NNI_ASSERT(aio->a_iov[0].iov_len > 0); - NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } + nni_plat_ipc_pipe *pipe; + int rv; - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); } - - // Now start a readfile. We assume that only one read can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - - if (!ReadFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_recv_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, 0); + pipe->p = p; + *pipep = pipe; + return (0); } -static void -nni_win_ipc_recv_cb(void *arg) +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; - - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_recv_finish(pipe); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_submit(&pipe->snd_ev, aio); } void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - nni_aio_list_append(&pipe->readq, aio); - nni_win_ipc_recv_start(pipe); + nni_win_event_submit(&pipe->rcv_ev, aio); } void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { - nni_mtx_lock(&pipe->mtx); - if (pipe->p != INVALID_HANDLE_VALUE) { - CloseHandle(pipe->p); + HANDLE p; + + if ((p = pipe->p) != INVALID_HANDLE_VALUE) { pipe->p = INVALID_HANDLE_VALUE; + CloseHandle(p); } - nni_win_event_cancel(&pipe->send_evt); - nni_win_event_cancel(&pipe->recv_evt); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_close(&pipe->snd_ev); + nni_win_event_close(&pipe->rcv_ev); } void @@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) { nni_plat_ipc_pipe_close(pipe); - nni_win_event_fini(&pipe->send_evt); - nni_win_event_fini(&pipe->recv_evt); - nni_mtx_fini(&pipe->mtx); + nni_win_event_fini(&pipe->snd_ev); + nni_win_event_fini(&pipe->rcv_ev); NNI_FREE_STRUCT(pipe); } @@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { const char * path; nni_plat_ipc_ep *ep; - int rv; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); @@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) return (NNG_ENOMEM); } ZeroMemory(ep, sizeof(ep)); - if ((rv = nni_mtx_init(&ep->mtx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } ep->mode = mode; NNI_LIST_NODE_INIT(&ep->node); - nni_aio_list_init(&ep->aios); (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path); @@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; HANDLE p; - nni_mtx_lock(&ep->mtx); if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->mtx); return (NNG_EINVAL); } if (ep->started) { - nni_mtx_unlock(&ep->mtx); return (NNG_EBUSY); } @@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) } goto failed; } - rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p); + rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p); if (rv != 0) { goto failed; } @@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) ep->p = p; ep->started = 1; - nni_mtx_unlock(&ep->mtx); return (0); failed: - nni_mtx_unlock(&ep->mtx); if (p != INVALID_HANDLE_VALUE) { (void) CloseHandle(p); } @@ -486,40 +285,15 @@ failed: } static void -nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) +nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) { - nni_win_event * evt = &ep->acc_evt; - DWORD nbytes; - int rv; + nni_plat_ipc_ep * ep = evt->ptr; nni_plat_ipc_pipe *pipe; - nni_aio * aio; + int rv; HANDLE newp, oldp; - // Note: This should be called with the ep lock held, and only when - // the ConnectNamedPipe has finished. - - rv = 0; - if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) { - if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) { - // We should never be here normally, but if the - // pipe got accepted by another client we can - // some times race here. - return; - } - } - - if ((aio = nni_list_first(&ep->aios)) == NULL) { - // No completion available to us. - if (rv == 0) { - NNI_ASSERT(0); - DisconnectNamedPipe(ep->p); - } - return; - } - - nni_list_remove(&ep->aios, aio); - if (rv != 0) { - nni_aio_finish(aio, rv, 0); + if ((rv = evt->status) != 0) { + nni_aio_finish(aio, nni_win_error(rv), 0); return; } @@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL); if (newp == INVALID_HANDLE_VALUE) { rv = nni_win_error(GetLastError()); + // We connected, but as we cannot get a new pipe, + // we have to disconnect the old one. DisconnectNamedPipe(ep->p); + nni_aio_finish(aio, rv, 0); + return; + } + if ((rv = nni_win_iocp_register(newp)) != 0) { + // Disconnect the old pipe. + DisconnectNamedPipe(ep->p); + // And discard the half-baked new one. + DisconnectNamedPipe(newp); + (void) CloseHandle(newp); + nni_aio_finish(aio, rv, 0); return; } - oldp = ep->p; - ep->p = newp; + + oldp = ep->p; + ep->p = newp; + evt->h = newp; if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { + // The new pipe is already fine for us. Discard + // the old one, since failed to be able to use it. DisconnectNamedPipe(oldp); + (void) CloseHandle(oldp); nni_aio_finish(aio, rv, 0); return; } @@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) } static void -nni_win_ipc_acc_cb(void *arg) +nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_ep *ep = arg; - - nni_mtx_lock(&ep->mtx); - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); -} + NNI_ARG_UNUSED(aio); -static void -nni_win_ipc_acc_cancel(nni_aio *aio) -{ - nni_plat_ipc_ep *ep = aio->a_prov_data; + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; - nni_mtx_lock(&ep->mtx); - nni_win_event_cancel(&ep->acc_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&ep->mtx); + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + // Just to be sure. + (void) DisconnectNamedPipe(evt->h); } -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +static int +nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) { - int rv; - nni_win_event *evt = &ep->acc_evt; - - nni_mtx_lock(&ep->mtx); - if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - rv = 0; - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; - } - if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) { - rv = GetLastError(); + if (!ConnectNamedPipe(evt->h, &evt->olpd)) { + int rv = GetLastError(); switch (rv) { case ERROR_PIPE_CONNECTED: - rv = 0; - break; + // Synch completion already occurred. + // Windows is weird. Apparently the I/O + // completion packet has already been sent. + return (0); + case ERROR_IO_PENDING: - nni_aio_list_append(&ep->aios, aio); - nni_mtx_unlock(&ep->mtx); - return; + // Normal asynchronous operation. Wait for + // completion. + return (0); default: - rv = nni_win_error(GetLastError()); - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; + // Fast-fail (synchronous). + evt->status = GetLastError(); + evt->count = 0; + return (1); } } - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); + // Synch completion right now. I/O completion packet delivered + // already. + return (0); +} + +void +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->acc_ev, aio); } // So Windows IPC is a bit different on the client side. There is no @@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg) } while ((ep = nni_list_first(&w->workers)) != NULL) { + nni_list_remove(&w->workers, ep); - if ((aio = nni_list_first(&ep->aios)) == NULL) { + if ((aio = ep->con_aio) == NULL) { continue; } - nni_list_remove(&ep->aios, aio); + ep->con_aio = NULL; + + pipe = NULL; + p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); @@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg) if (p == INVALID_HANDLE_VALUE) { switch ((rv = GetLastError())) { case ERROR_PIPE_BUSY: - // still in progress. - nni_list_prepend(&ep->aios, aio); - break; + // Still in progress. This shouldn't + // happen unless the other side aborts + // the connection. + ep->con_aio = aio; + nni_list_append(&w->waiters, ep); + continue; + case ERROR_FILE_NOT_FOUND: - nni_aio_finish( - aio, NNG_ECONNREFUSED, 0); + rv = NNG_ECONNREFUSED; break; default: - nni_aio_finish( - aio, nni_win_error(rv), 0); + rv = nni_win_error(rv); break; } - } else { - rv = nni_win_ipc_pipe_init(&pipe, p); - if (rv == 0) { - rv = nni_win_iocp_register(p); - } - if (rv != 0) { - DisconnectNamedPipe(p); - CloseHandle(p); - nni_aio_finish(aio, rv, 0); - } else { - aio->a_pipe = pipe; - nni_aio_finish(aio, 0, 0); - } + goto fail; + } + if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) || + ((rv = nni_win_iocp_register(p)) != 0)) { + goto fail; + } + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); + continue; + + fail: + if (p != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(p); + CloseHandle(p); } - if (!nni_list_empty(&ep->aios)) { - nni_list_append(&w->waiters, ep); + if (pipe != NULL) { + nni_plat_ipc_pipe_fini(pipe); } + nni_aio_finish(aio, rv, 0); } - // Wait 10 ms, unless woken earlier. if (nni_list_empty(&w->waiters)) { + // Wait until an endpoint is added. nni_cv_wait(&w->cv); } else { + // Wait 10 ms, unless woken earlier. nni_cv_until(&w->cv, nni_clock() + 10000); } } @@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio) nni_plat_ipc_ep * ep = aio->a_prov_data; nni_mtx_lock(&w->mtx); - nni_aio_list_remove(aio); - if (nni_list_empty(&ep->aios)) { + ep->con_aio = NULL; + if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); + nni_cv_wake(&w->cv); } nni_mtx_unlock(&w->mtx); } @@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) nni_mtx_lock(&w->mtx); - if (nni_list_active(&w->waiters, ep)) { - nni_aio_finish(aio, NNG_EBUSY, 0); - nni_mtx_unlock(&w->mtx); - return; - } + NNI_ASSERT(!nni_list_active(&w->waiters, ep)); if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { nni_mtx_unlock(&w->mtx); return; } - nni_list_append(&ep->aios, aio); + ep->con_aio = aio; nni_list_append(&w->waiters, ep); nni_cv_wake(&w->cv); nni_mtx_unlock(&w->mtx); @@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_mtx_lock(&ep->mtx); - if (ep->p) { + if (ep->p != INVALID_HANDLE_VALUE) { CloseHandle(ep->p); ep->p = NULL; } - nni_mtx_unlock(&ep->mtx); - nni_mtx_fini(&ep->mtx); + nni_win_event_close(&ep->acc_ev); + nni_win_event_fini(&ep->acc_ev); NNI_FREE_STRUCT(ep); } @@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); } - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); + if ((aio = ep->con_aio) != NULL) { + ep->con_aio = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } nni_mtx_unlock(&w->mtx); break; + case NNI_EP_MODE_LISTEN: - nni_mtx_lock(&ep->mtx); - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); - } if (ep->p != INVALID_HANDLE_VALUE) { - nni_win_event_cancel(&ep->acc_evt); + nni_win_event_close(&ep->acc_ev); CloseHandle(ep->p); ep->p = INVALID_HANDLE_VALUE; } - nni_mtx_unlock(&ep->mtx); break; } } |
