diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/platform/windows/win_debug.c | 21 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 57 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 186 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 619 |
4 files changed, 373 insertions, 510 deletions
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c index bc3d5bfa..cc2adb88 100644 --- a/src/platform/windows/win_debug.c +++ b/src/platform/windows/win_debug.c @@ -42,11 +42,20 @@ static struct { int sys_err; int nng_err; } nni_plat_errnos[] = { - { ENOENT, NNG_ENOENT }, { EINTR, NNG_EINTR }, { EINVAL, NNG_EINVAL }, - { ENOMEM, NNG_ENOMEM }, { EACCES, NNG_EPERM }, { EAGAIN, NNG_EAGAIN }, - { EBADF, NNG_ECLOSED }, { EBUSY, NNG_EBUSY }, - { ENAMETOOLONG, NNG_EINVAL }, { EPERM, NNG_EPERM }, - { EPIPE, NNG_ECLOSED }, { 0, 0 } // must be last + // clang-format off + { ENOENT, NNG_ENOENT }, + { EINTR, NNG_EINTR }, + { EINVAL, NNG_EINVAL }, + { ENOMEM, NNG_ENOMEM }, + { EACCES, NNG_EPERM }, + { EAGAIN, NNG_EAGAIN }, + { EBADF, NNG_ECLOSED }, + { EBUSY, NNG_EBUSY }, + { ENAMETOOLONG, NNG_EINVAL }, + { EPERM, NNG_EPERM }, + { EPIPE, NNG_ECLOSED }, + { 0, 0 } // must be last + // clang-format on }; int @@ -107,7 +116,7 @@ nni_win_error(int errnum) if (errnum == 0) { return (0); } - for (i = 0; nni_win_errnos[i].nng_err != 0; i++) { + for (i = 0; nni_win_errnos[i].win_err != 0; i++) { if (errnum == nni_win_errnos[i].win_err) { return (nni_win_errnos[i].nng_err); } diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 234acc35..4594da53 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -16,10 +16,12 @@ #define WIN32_LEAN_AND_MEAN #endif -#include <mswsock.h> -#include <process.h> +// These headers must be included first. #include <windows.h> #include <winsock2.h> + +#include <mswsock.h> +#include <process.h> #include <ws2tcpip.h> #include "core/list.h" @@ -27,21 +29,6 @@ // These types are provided for here, to permit them to be directly inlined // elsewhere. -typedef struct nni_win_event nni_win_event; - -// nni_win_event is used with io completion ports. This allows us to get -// to a specific completion callback without requiring the poller (in the -// completion port) to know anything about the event itself. We also use -// this to pass back status and counts to the routine, which may not be -// conveyed in the OVERLAPPED directly. -struct nni_win_event { - OVERLAPPED olpd; - HANDLE h; - void * ptr; - nni_cb cb; - nni_list aios; -}; - struct nni_plat_thr { void (*func)(void *); void * arg; @@ -59,14 +46,40 @@ struct nni_plat_cv { PSRWLOCK srl; }; +// nni_win_event is used with io completion ports. This allows us to get +// to a specific completion callback without requiring the poller (in the +// completion port) to know anything about the event itself. We also use +// this to pass back status and counts to the routine, which may not be +// conveyed in the OVERLAPPED directly. +typedef struct nni_win_event nni_win_event; +typedef struct nni_win_event_ops nni_win_event_ops; + +struct nni_win_event_ops { + int (*wev_start)(nni_win_event *, nni_aio *); + void (*wev_finish)(nni_win_event *, nni_aio *); + void (*wev_cancel)(nni_win_event *, nni_aio *); +}; +struct nni_win_event { + OVERLAPPED olpd; + HANDLE h; + void * ptr; + nni_aio * aio; + nni_mtx mtx; + int count; + int status; + nni_win_event_ops ops; +}; + extern int nni_win_error(int); extern int nni_winsock_error(int); -extern int nni_win_event_init(nni_win_event *, nni_cb, void *, HANDLE); -extern void nni_win_event_fini(nni_win_event *); -extern int nni_win_event_reset(nni_win_event *); -extern OVERLAPPED *nni_win_event_overlapped(nni_win_event *); -extern void nni_win_event_cancel(nni_win_event *); +extern int nni_win_event_init( + nni_win_event *, nni_win_event_ops *, void *, HANDLE); +extern void nni_win_event_fini(nni_win_event *); +extern void nni_win_event_submit(nni_win_event *, nni_aio *); +extern void nni_win_event_resubmit(nni_win_event *, nni_aio *); +extern void nni_win_event_close(nni_win_event *); +extern void nni_win_event_complete(nni_win_event *, int); extern int nni_win_iocp_register(HANDLE); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 7244b360..17f3b16d 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int status; - BOOL rv; + int rv; + BOOL ok; + nni_aio * aio; NNI_ARG_UNUSED(arg); iocp = nni_win_global_iocp; for (;;) { - key = 0; - olpd = NULL; - status = 0; - cnt = 0; + key = 0; + olpd = NULL; - rv = GetQueuedCompletionStatus( + ok = GetQueuedCompletionStatus( iocp, &cnt, &key, &olpd, INFINITE); - if (rv == FALSE) { - if (olpd == NULL) { - // Completion port bailed... - break; - } + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; } - NNI_ASSERT(olpd != NULL); - evt = (void *) olpd; + evt = CONTAINING_RECORD(olpd, nni_win_event, olpd); - NNI_ASSERT(evt->cb != NULL); - evt->cb(evt->ptr); + if (ok) { + rv = ERROR_SUCCESS; + } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) { + // Canceled operation, we can't touch any + // of the memory, since it may be gone. + continue; + } + + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) == NULL) { + // Canceled?? Probably ERROR_OPERATION_ABORTED + // It's pretty unclear how we got here. + nni_mtx_unlock(&evt->mtx); + continue; + } + evt->aio = NULL; + evt->status = rv; + evt->count = cnt; + + evt->ops.wev_finish(evt, aio); + nni_mtx_unlock(&evt->mtx); + } +} + +static void +nni_win_event_cancel(nni_aio *aio) +{ + nni_win_event *evt = aio->a_prov_data; + + nni_mtx_lock(&evt->mtx); + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + // This is just continuation of a pre-existing AIO operation. + // For example, continuing I/O of a multi-buffer s/g operation. + // The lock is held. + evt->status = ERROR_SUCCESS; + evt->count = 0; + if (!ResetEvent(evt->olpd.hEvent)) { + evt->status = GetLastError(); + evt->count = 0; + + evt->ops.wev_finish(evt, aio); + return; + } + + evt->aio = aio; + if (evt->ops.wev_start(evt, aio) != 0) { + // Start completed synchronously. It will have stored + // the count and status in the evt. + evt->aio = NULL; + evt->ops.wev_finish(evt, aio); + } +} + +void +nni_win_event_submit(nni_win_event *evt, nni_aio *aio) +{ + nni_mtx_lock(&evt->mtx); + if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { + // the aio was aborted + nni_mtx_unlock(&evt->mtx); + return; + } + nni_win_event_resubmit(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_complete(nni_win_event *evt, int cnt) +{ + PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd); +} + +void +nni_win_event_close(nni_win_event *evt) +{ + nni_aio *aio; + nni_mtx_lock(&evt->mtx); + if (evt->h != NULL) { + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; + // Stall waiting for the I/O to complete. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + } + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } int @@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +nni_win_event_init( + nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) { + int rv; + ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - nni_aio_list_init(&evt->aios); + if ((rv = nni_mtx_init(&evt->mtx)) != 0) { + return (rv); // NB: This will never happen on Windows. + } + evt->ops = *ops; + evt->aio = NULL; evt->ptr = ptr; - evt->cb = cb; evt->h = h; return (0); } @@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt) (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } -} - -int -nni_win_event_reset(nni_win_event *evt) -{ - if (!ResetEvent(evt->olpd.hEvent)) { - return (nni_win_error(GetLastError())); - } - return (0); -} - -OVERLAPPED * -nni_win_event_overlapped(nni_win_event *evt) -{ - return (&evt->olpd); -} - -void -nni_win_event_cancel(nni_win_event *evt) -{ - int rv; - DWORD cnt; - - // Try to cancel the event... - if (!CancelIoEx(evt->h, &evt->olpd)) { - // None was found. That's good. - if ((rv = GetLastError()) == ERROR_NOT_FOUND) { - // Nothing queued. We may in theory be running - // the callback via the completion port handler; - // caller must synchronize that separately. - return; - } - - // It's unclear why we would ever fail in this - // circumstance. Is there a kind of "uncancellable I/O" - // here, or somesuch? In this case we just wait hard - // using the success case -- its the best we can do. - } - - // This basically just waits for the canceled I/O to complete. - // The end result can be either success or ERROR_OPERATION_ABORTED. - // It turns out we don't much care either way; we just want to make - // sure that we don't have any I/O pending on the overlapped - // structure before we release it or reuse it. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + nni_mtx_fini(&evt->mtx); } int diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 237ded78..be1a98a1 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -13,19 +13,10 @@ #include <stdio.h> -static void nni_win_ipc_acc_cb(void *); -static void nni_win_ipc_send_cb(void *); -static void nni_win_ipc_recv_cb(void *); -static void nni_win_ipc_send_start(nni_plat_ipc_pipe *); -static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *); - struct nni_plat_ipc_pipe { HANDLE p; - nni_win_event recv_evt; - nni_win_event send_evt; - nni_mtx mtx; - nni_list readq; - nni_list writeq; + nni_win_event rcv_ev; + nni_win_event snd_ev; }; struct nni_plat_ipc_ep { @@ -34,118 +25,49 @@ struct nni_plat_ipc_ep { int started; nni_list aios; HANDLE p; // accept side only - nni_win_event acc_evt; // accept side only - nni_mtx mtx; // accept side only + nni_win_event acc_ev; // accept side only + nni_aio * con_aio; // conn side only nni_list_node node; // conn side uses this }; -static int -nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) -{ - nni_plat_ipc_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - - pipe->p = p; - nni_aio_list_init(&pipe->readq); - nni_aio_list_init(&pipe->writeq); - *pipep = pipe; - return (0); -} - -static void -nni_win_ipc_send_cancel(nni_aio *aio) -{ - nni_plat_ipc_pipe *pipe = aio->a_prov_data; - - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->writeq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { - NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); - aio->a_count += cnt; - aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; - aio->a_iov[0].iov_len -= cnt; +static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *); - if (aio->a_iov[0].iov_len == 0) { - int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - aio->a_niov--; - } +static nni_win_event_ops nni_win_ipc_pipe_ops = { + .wev_start = nni_win_ipc_pipe_start, + .wev_finish = nni_win_ipc_pipe_finish, + .wev_cancel = nni_win_ipc_pipe_cancel, +}; - if (aio->a_niov > 0) { - // If we have more to do, submit it! - nni_win_ipc_send_start(pipe); - return; - } - } +static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *); - // All done; hopefully successfully. - nni_list_remove(&pipe->writeq, aio); - nni_aio_finish(aio, rv, aio->a_count); -} +static nni_win_event_ops nni_win_ipc_acc_ops = { + .wev_start = nni_win_ipc_acc_start, + .wev_finish = nni_win_ipc_acc_finish, + .wev_cancel = nni_win_ipc_acc_cancel, +}; -static void -nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->writeq); + void * buf; + DWORD len; + BOOL ok; + int rv; + nni_plat_ipc_pipe *pipe = evt->ptr; NNI_ASSERT(aio != NULL); NNI_ASSERT(aio->a_niov > 0); NNI_ASSERT(aio->a_iov[0].iov_len > 0); NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if (evt->h == INVALID_HANDLE_VALUE) { + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + return (1); } // Now start a writefile. We assume that only one send can be @@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) // scrambling the data anyway. Note that Windows named pipes do // not appear to support scatter/gather, so we have to process // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; // We limit ourselves to writing 16MB at a time. Named Pipes // on Windows have limits of between 31 and 64MB. @@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) len = 0x1000000; } - if (!WriteFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + evt->count = 0; + if (evt == &pipe->snd_ev) { + ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd); + } else { + ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd); + } + if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = GetLastError(); + evt->count = 0; + return (1); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_send_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, aio->a_count); + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); } static void -nni_win_ipc_send_cb(void *arg) +nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; + NNI_ARG_UNUSED(aio); - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_send_finish(pipe); - nni_mtx_unlock(&pipe->mtx); -} + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); } - nni_aio_list_append(&pipe->writeq, aio); - nni_win_ipc_send_start(pipe); - nni_mtx_unlock(&pipe->mtx); } static void -nni_win_ipc_recv_cancel(nni_aio *aio) +nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = aio->a_prov_data; + int rv = 0; + DWORD cnt; - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->readq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { + cnt = evt->count; + if ((rv = evt->status) == 0) { NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); aio->a_count += cnt; aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; @@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) if (aio->a_iov[0].iov_len == 0) { int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } aio->a_niov--; + for (i = 0; i < aio->a_niov; i++) { + aio->a_iov[i] = aio->a_iov[i + 1]; + } } if (aio->a_niov > 0) { // If we have more to do, submit it! - nni_win_ipc_recv_start(pipe); + nni_win_event_resubmit(evt, aio); return; } } // All done; hopefully successfully. - nni_list_remove(&pipe->readq, aio); - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, nni_win_error(rv), aio->a_count); } -static void -nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->readq); - - NNI_ASSERT(aio != NULL); - NNI_ASSERT(aio->a_niov > 0); - NNI_ASSERT(aio->a_iov[0].iov_len > 0); - NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } + nni_plat_ipc_pipe *pipe; + int rv; - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); } - - // Now start a readfile. We assume that only one read can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - - if (!ReadFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_recv_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, 0); + pipe->p = p; + *pipep = pipe; + return (0); } -static void -nni_win_ipc_recv_cb(void *arg) +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; - - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_recv_finish(pipe); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_submit(&pipe->snd_ev, aio); } void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - nni_aio_list_append(&pipe->readq, aio); - nni_win_ipc_recv_start(pipe); + nni_win_event_submit(&pipe->rcv_ev, aio); } void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { - nni_mtx_lock(&pipe->mtx); - if (pipe->p != INVALID_HANDLE_VALUE) { - CloseHandle(pipe->p); + HANDLE p; + + if ((p = pipe->p) != INVALID_HANDLE_VALUE) { pipe->p = INVALID_HANDLE_VALUE; + CloseHandle(p); } - nni_win_event_cancel(&pipe->send_evt); - nni_win_event_cancel(&pipe->recv_evt); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_close(&pipe->snd_ev); + nni_win_event_close(&pipe->rcv_ev); } void @@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) { nni_plat_ipc_pipe_close(pipe); - nni_win_event_fini(&pipe->send_evt); - nni_win_event_fini(&pipe->recv_evt); - nni_mtx_fini(&pipe->mtx); + nni_win_event_fini(&pipe->snd_ev); + nni_win_event_fini(&pipe->rcv_ev); NNI_FREE_STRUCT(pipe); } @@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { const char * path; nni_plat_ipc_ep *ep; - int rv; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); @@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) return (NNG_ENOMEM); } ZeroMemory(ep, sizeof(ep)); - if ((rv = nni_mtx_init(&ep->mtx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } ep->mode = mode; NNI_LIST_NODE_INIT(&ep->node); - nni_aio_list_init(&ep->aios); (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path); @@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; HANDLE p; - nni_mtx_lock(&ep->mtx); if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->mtx); return (NNG_EINVAL); } if (ep->started) { - nni_mtx_unlock(&ep->mtx); return (NNG_EBUSY); } @@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) } goto failed; } - rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p); + rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p); if (rv != 0) { goto failed; } @@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) ep->p = p; ep->started = 1; - nni_mtx_unlock(&ep->mtx); return (0); failed: - nni_mtx_unlock(&ep->mtx); if (p != INVALID_HANDLE_VALUE) { (void) CloseHandle(p); } @@ -486,40 +285,15 @@ failed: } static void -nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) +nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) { - nni_win_event * evt = &ep->acc_evt; - DWORD nbytes; - int rv; + nni_plat_ipc_ep * ep = evt->ptr; nni_plat_ipc_pipe *pipe; - nni_aio * aio; + int rv; HANDLE newp, oldp; - // Note: This should be called with the ep lock held, and only when - // the ConnectNamedPipe has finished. - - rv = 0; - if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) { - if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) { - // We should never be here normally, but if the - // pipe got accepted by another client we can - // some times race here. - return; - } - } - - if ((aio = nni_list_first(&ep->aios)) == NULL) { - // No completion available to us. - if (rv == 0) { - NNI_ASSERT(0); - DisconnectNamedPipe(ep->p); - } - return; - } - - nni_list_remove(&ep->aios, aio); - if (rv != 0) { - nni_aio_finish(aio, rv, 0); + if ((rv = evt->status) != 0) { + nni_aio_finish(aio, nni_win_error(rv), 0); return; } @@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL); if (newp == INVALID_HANDLE_VALUE) { rv = nni_win_error(GetLastError()); + // We connected, but as we cannot get a new pipe, + // we have to disconnect the old one. DisconnectNamedPipe(ep->p); + nni_aio_finish(aio, rv, 0); + return; + } + if ((rv = nni_win_iocp_register(newp)) != 0) { + // Disconnect the old pipe. + DisconnectNamedPipe(ep->p); + // And discard the half-baked new one. + DisconnectNamedPipe(newp); + (void) CloseHandle(newp); + nni_aio_finish(aio, rv, 0); return; } - oldp = ep->p; - ep->p = newp; + + oldp = ep->p; + ep->p = newp; + evt->h = newp; if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { + // The new pipe is already fine for us. Discard + // the old one, since failed to be able to use it. DisconnectNamedPipe(oldp); + (void) CloseHandle(oldp); nni_aio_finish(aio, rv, 0); return; } @@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) } static void -nni_win_ipc_acc_cb(void *arg) +nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_ep *ep = arg; - - nni_mtx_lock(&ep->mtx); - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); -} + NNI_ARG_UNUSED(aio); -static void -nni_win_ipc_acc_cancel(nni_aio *aio) -{ - nni_plat_ipc_ep *ep = aio->a_prov_data; + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; - nni_mtx_lock(&ep->mtx); - nni_win_event_cancel(&ep->acc_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&ep->mtx); + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + // Just to be sure. + (void) DisconnectNamedPipe(evt->h); } -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +static int +nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) { - int rv; - nni_win_event *evt = &ep->acc_evt; - - nni_mtx_lock(&ep->mtx); - if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - rv = 0; - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; - } - if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) { - rv = GetLastError(); + if (!ConnectNamedPipe(evt->h, &evt->olpd)) { + int rv = GetLastError(); switch (rv) { case ERROR_PIPE_CONNECTED: - rv = 0; - break; + // Synch completion already occurred. + // Windows is weird. Apparently the I/O + // completion packet has already been sent. + return (0); + case ERROR_IO_PENDING: - nni_aio_list_append(&ep->aios, aio); - nni_mtx_unlock(&ep->mtx); - return; + // Normal asynchronous operation. Wait for + // completion. + return (0); default: - rv = nni_win_error(GetLastError()); - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; + // Fast-fail (synchronous). + evt->status = GetLastError(); + evt->count = 0; + return (1); } } - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); + // Synch completion right now. I/O completion packet delivered + // already. + return (0); +} + +void +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->acc_ev, aio); } // So Windows IPC is a bit different on the client side. There is no @@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg) } while ((ep = nni_list_first(&w->workers)) != NULL) { + nni_list_remove(&w->workers, ep); - if ((aio = nni_list_first(&ep->aios)) == NULL) { + if ((aio = ep->con_aio) == NULL) { continue; } - nni_list_remove(&ep->aios, aio); + ep->con_aio = NULL; + + pipe = NULL; + p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); @@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg) if (p == INVALID_HANDLE_VALUE) { switch ((rv = GetLastError())) { case ERROR_PIPE_BUSY: - // still in progress. - nni_list_prepend(&ep->aios, aio); - break; + // Still in progress. This shouldn't + // happen unless the other side aborts + // the connection. + ep->con_aio = aio; + nni_list_append(&w->waiters, ep); + continue; + case ERROR_FILE_NOT_FOUND: - nni_aio_finish( - aio, NNG_ECONNREFUSED, 0); + rv = NNG_ECONNREFUSED; break; default: - nni_aio_finish( - aio, nni_win_error(rv), 0); + rv = nni_win_error(rv); break; } - } else { - rv = nni_win_ipc_pipe_init(&pipe, p); - if (rv == 0) { - rv = nni_win_iocp_register(p); - } - if (rv != 0) { - DisconnectNamedPipe(p); - CloseHandle(p); - nni_aio_finish(aio, rv, 0); - } else { - aio->a_pipe = pipe; - nni_aio_finish(aio, 0, 0); - } + goto fail; + } + if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) || + ((rv = nni_win_iocp_register(p)) != 0)) { + goto fail; + } + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); + continue; + + fail: + if (p != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(p); + CloseHandle(p); } - if (!nni_list_empty(&ep->aios)) { - nni_list_append(&w->waiters, ep); + if (pipe != NULL) { + nni_plat_ipc_pipe_fini(pipe); } + nni_aio_finish(aio, rv, 0); } - // Wait 10 ms, unless woken earlier. if (nni_list_empty(&w->waiters)) { + // Wait until an endpoint is added. nni_cv_wait(&w->cv); } else { + // Wait 10 ms, unless woken earlier. nni_cv_until(&w->cv, nni_clock() + 10000); } } @@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio) nni_plat_ipc_ep * ep = aio->a_prov_data; nni_mtx_lock(&w->mtx); - nni_aio_list_remove(aio); - if (nni_list_empty(&ep->aios)) { + ep->con_aio = NULL; + if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); + nni_cv_wake(&w->cv); } nni_mtx_unlock(&w->mtx); } @@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) nni_mtx_lock(&w->mtx); - if (nni_list_active(&w->waiters, ep)) { - nni_aio_finish(aio, NNG_EBUSY, 0); - nni_mtx_unlock(&w->mtx); - return; - } + NNI_ASSERT(!nni_list_active(&w->waiters, ep)); if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { nni_mtx_unlock(&w->mtx); return; } - nni_list_append(&ep->aios, aio); + ep->con_aio = aio; nni_list_append(&w->waiters, ep); nni_cv_wake(&w->cv); nni_mtx_unlock(&w->mtx); @@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_mtx_lock(&ep->mtx); - if (ep->p) { + if (ep->p != INVALID_HANDLE_VALUE) { CloseHandle(ep->p); ep->p = NULL; } - nni_mtx_unlock(&ep->mtx); - nni_mtx_fini(&ep->mtx); + nni_win_event_close(&ep->acc_ev); + nni_win_event_fini(&ep->acc_ev); NNI_FREE_STRUCT(ep); } @@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); } - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); + if ((aio = ep->con_aio) != NULL) { + ep->con_aio = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } nni_mtx_unlock(&w->mtx); break; + case NNI_EP_MODE_LISTEN: - nni_mtx_lock(&ep->mtx); - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); - } if (ep->p != INVALID_HANDLE_VALUE) { - nni_win_event_cancel(&ep->acc_evt); + nni_win_event_close(&ep->acc_ev); CloseHandle(ep->p); ep->p = INVALID_HANDLE_VALUE; } - nni_mtx_unlock(&ep->mtx); break; } } |
