diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-11 22:59:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-11 22:59:38 -0700 |
| commit | 8741c4421ec7a5e889c05a3d7dd46feee93ddf9a (patch) | |
| tree | 9024d46ff202b065c67c2ea75ee5e43417ce4cdb /src/platform/windows | |
| parent | 183bd7e02c81bc09c17c6f4c0d3883d4d45221fc (diff) | |
| download | nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.gz nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.tar.bz2 nng-8741c4421ec7a5e889c05a3d7dd46feee93ddf9a.zip | |
Windows IPC working, mostly.
The IOCP code has been refactored to improve reuse, and hopefully
will be easier to use with TCP now. Windows IPC using Named Pipes
is mostly working -- mostly because there is a gnarly close-race.
It seems that we need to take some more care to ensure that the
pipe is not released while requests may be outstanding -- so some
deeper synchronization between the IOCP callback logic and the
win_event code is needed. In short, we need to add a condvar to
the event, and notice when we have submitted work for async completion,
and make sure we flag the event "idle" after either completion or
cancellation of the event.
Diffstat (limited to 'src/platform/windows')
| -rw-r--r-- | src/platform/windows/win_debug.c | 21 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 57 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 186 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 619 |
4 files changed, 373 insertions, 510 deletions
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c index bc3d5bfa..cc2adb88 100644 --- a/src/platform/windows/win_debug.c +++ b/src/platform/windows/win_debug.c @@ -42,11 +42,20 @@ static struct { int sys_err; int nng_err; } nni_plat_errnos[] = { - { ENOENT, NNG_ENOENT }, { EINTR, NNG_EINTR }, { EINVAL, NNG_EINVAL }, - { ENOMEM, NNG_ENOMEM }, { EACCES, NNG_EPERM }, { EAGAIN, NNG_EAGAIN }, - { EBADF, NNG_ECLOSED }, { EBUSY, NNG_EBUSY }, - { ENAMETOOLONG, NNG_EINVAL }, { EPERM, NNG_EPERM }, - { EPIPE, NNG_ECLOSED }, { 0, 0 } // must be last + // clang-format off + { ENOENT, NNG_ENOENT }, + { EINTR, NNG_EINTR }, + { EINVAL, NNG_EINVAL }, + { ENOMEM, NNG_ENOMEM }, + { EACCES, NNG_EPERM }, + { EAGAIN, NNG_EAGAIN }, + { EBADF, NNG_ECLOSED }, + { EBUSY, NNG_EBUSY }, + { ENAMETOOLONG, NNG_EINVAL }, + { EPERM, NNG_EPERM }, + { EPIPE, NNG_ECLOSED }, + { 0, 0 } // must be last + // clang-format on }; int @@ -107,7 +116,7 @@ nni_win_error(int errnum) if (errnum == 0) { return (0); } - for (i = 0; nni_win_errnos[i].nng_err != 0; i++) { + for (i = 0; nni_win_errnos[i].win_err != 0; i++) { if (errnum == nni_win_errnos[i].win_err) { return (nni_win_errnos[i].nng_err); } diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 234acc35..4594da53 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -16,10 +16,12 @@ #define WIN32_LEAN_AND_MEAN #endif -#include <mswsock.h> -#include <process.h> +// These headers must be included first. #include <windows.h> #include <winsock2.h> + +#include <mswsock.h> +#include <process.h> #include <ws2tcpip.h> #include "core/list.h" @@ -27,21 +29,6 @@ // These types are provided for here, to permit them to be directly inlined // elsewhere. -typedef struct nni_win_event nni_win_event; - -// nni_win_event is used with io completion ports. This allows us to get -// to a specific completion callback without requiring the poller (in the -// completion port) to know anything about the event itself. We also use -// this to pass back status and counts to the routine, which may not be -// conveyed in the OVERLAPPED directly. -struct nni_win_event { - OVERLAPPED olpd; - HANDLE h; - void * ptr; - nni_cb cb; - nni_list aios; -}; - struct nni_plat_thr { void (*func)(void *); void * arg; @@ -59,14 +46,40 @@ struct nni_plat_cv { PSRWLOCK srl; }; +// nni_win_event is used with io completion ports. This allows us to get +// to a specific completion callback without requiring the poller (in the +// completion port) to know anything about the event itself. We also use +// this to pass back status and counts to the routine, which may not be +// conveyed in the OVERLAPPED directly. +typedef struct nni_win_event nni_win_event; +typedef struct nni_win_event_ops nni_win_event_ops; + +struct nni_win_event_ops { + int (*wev_start)(nni_win_event *, nni_aio *); + void (*wev_finish)(nni_win_event *, nni_aio *); + void (*wev_cancel)(nni_win_event *, nni_aio *); +}; +struct nni_win_event { + OVERLAPPED olpd; + HANDLE h; + void * ptr; + nni_aio * aio; + nni_mtx mtx; + int count; + int status; + nni_win_event_ops ops; +}; + extern int nni_win_error(int); extern int nni_winsock_error(int); -extern int nni_win_event_init(nni_win_event *, nni_cb, void *, HANDLE); -extern void nni_win_event_fini(nni_win_event *); -extern int nni_win_event_reset(nni_win_event *); -extern OVERLAPPED *nni_win_event_overlapped(nni_win_event *); -extern void nni_win_event_cancel(nni_win_event *); +extern int nni_win_event_init( + nni_win_event *, nni_win_event_ops *, void *, HANDLE); +extern void nni_win_event_fini(nni_win_event *); +extern void nni_win_event_submit(nni_win_event *, nni_aio *); +extern void nni_win_event_resubmit(nni_win_event *, nni_aio *); +extern void nni_win_event_close(nni_win_event *); +extern void nni_win_event_complete(nni_win_event *, int); extern int nni_win_iocp_register(HANDLE); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 7244b360..17f3b16d 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -31,35 +31,131 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int status; - BOOL rv; + int rv; + BOOL ok; + nni_aio * aio; NNI_ARG_UNUSED(arg); iocp = nni_win_global_iocp; for (;;) { - key = 0; - olpd = NULL; - status = 0; - cnt = 0; + key = 0; + olpd = NULL; - rv = GetQueuedCompletionStatus( + ok = GetQueuedCompletionStatus( iocp, &cnt, &key, &olpd, INFINITE); - if (rv == FALSE) { - if (olpd == NULL) { - // Completion port bailed... - break; - } + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; } - NNI_ASSERT(olpd != NULL); - evt = (void *) olpd; + evt = CONTAINING_RECORD(olpd, nni_win_event, olpd); - NNI_ASSERT(evt->cb != NULL); - evt->cb(evt->ptr); + if (ok) { + rv = ERROR_SUCCESS; + } else if ((rv = GetLastError()) == ERROR_OPERATION_ABORTED) { + // Canceled operation, we can't touch any + // of the memory, since it may be gone. + continue; + } + + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) == NULL) { + // Canceled?? Probably ERROR_OPERATION_ABORTED + // It's pretty unclear how we got here. + nni_mtx_unlock(&evt->mtx); + continue; + } + evt->aio = NULL; + evt->status = rv; + evt->count = cnt; + + evt->ops.wev_finish(evt, aio); + nni_mtx_unlock(&evt->mtx); + } +} + +static void +nni_win_event_cancel(nni_aio *aio) +{ + nni_win_event *evt = aio->a_prov_data; + + nni_mtx_lock(&evt->mtx); + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + // This is just continuation of a pre-existing AIO operation. + // For example, continuing I/O of a multi-buffer s/g operation. + // The lock is held. + evt->status = ERROR_SUCCESS; + evt->count = 0; + if (!ResetEvent(evt->olpd.hEvent)) { + evt->status = GetLastError(); + evt->count = 0; + + evt->ops.wev_finish(evt, aio); + return; + } + + evt->aio = aio; + if (evt->ops.wev_start(evt, aio) != 0) { + // Start completed synchronously. It will have stored + // the count and status in the evt. + evt->aio = NULL; + evt->ops.wev_finish(evt, aio); + } +} + +void +nni_win_event_submit(nni_win_event *evt, nni_aio *aio) +{ + nni_mtx_lock(&evt->mtx); + if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { + // the aio was aborted + nni_mtx_unlock(&evt->mtx); + return; + } + nni_win_event_resubmit(evt, aio); + nni_mtx_unlock(&evt->mtx); +} + +void +nni_win_event_complete(nni_win_event *evt, int cnt) +{ + PostQueuedCompletionStatus(nni_win_global_iocp, cnt, 0, &evt->olpd); +} + +void +nni_win_event_close(nni_win_event *evt) +{ + nni_aio *aio; + nni_mtx_lock(&evt->mtx); + if (evt->h != NULL) { + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; + // Stall waiting for the I/O to complete. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + } + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } int @@ -72,16 +168,22 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +nni_win_event_init( + nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) { + int rv; + ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - nni_aio_list_init(&evt->aios); + if ((rv = nni_mtx_init(&evt->mtx)) != 0) { + return (rv); // NB: This will never happen on Windows. + } + evt->ops = *ops; + evt->aio = NULL; evt->ptr = ptr; - evt->cb = cb; evt->h = h; return (0); } @@ -93,51 +195,7 @@ nni_win_event_fini(nni_win_event *evt) (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } -} - -int -nni_win_event_reset(nni_win_event *evt) -{ - if (!ResetEvent(evt->olpd.hEvent)) { - return (nni_win_error(GetLastError())); - } - return (0); -} - -OVERLAPPED * -nni_win_event_overlapped(nni_win_event *evt) -{ - return (&evt->olpd); -} - -void -nni_win_event_cancel(nni_win_event *evt) -{ - int rv; - DWORD cnt; - - // Try to cancel the event... - if (!CancelIoEx(evt->h, &evt->olpd)) { - // None was found. That's good. - if ((rv = GetLastError()) == ERROR_NOT_FOUND) { - // Nothing queued. We may in theory be running - // the callback via the completion port handler; - // caller must synchronize that separately. - return; - } - - // It's unclear why we would ever fail in this - // circumstance. Is there a kind of "uncancellable I/O" - // here, or somesuch? In this case we just wait hard - // using the success case -- its the best we can do. - } - - // This basically just waits for the canceled I/O to complete. - // The end result can be either success or ERROR_OPERATION_ABORTED. - // It turns out we don't much care either way; we just want to make - // sure that we don't have any I/O pending on the overlapped - // structure before we release it or reuse it. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + nni_mtx_fini(&evt->mtx); } int diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 237ded78..be1a98a1 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -13,19 +13,10 @@ #include <stdio.h> -static void nni_win_ipc_acc_cb(void *); -static void nni_win_ipc_send_cb(void *); -static void nni_win_ipc_recv_cb(void *); -static void nni_win_ipc_send_start(nni_plat_ipc_pipe *); -static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *); - struct nni_plat_ipc_pipe { HANDLE p; - nni_win_event recv_evt; - nni_win_event send_evt; - nni_mtx mtx; - nni_list readq; - nni_list writeq; + nni_win_event rcv_ev; + nni_win_event snd_ev; }; struct nni_plat_ipc_ep { @@ -34,118 +25,49 @@ struct nni_plat_ipc_ep { int started; nni_list aios; HANDLE p; // accept side only - nni_win_event acc_evt; // accept side only - nni_mtx mtx; // accept side only + nni_win_event acc_ev; // accept side only + nni_aio * con_aio; // conn side only nni_list_node node; // conn side uses this }; -static int -nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) -{ - nni_plat_ipc_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - - pipe->p = p; - nni_aio_list_init(&pipe->readq); - nni_aio_list_init(&pipe->writeq); - *pipep = pipe; - return (0); -} - -static void -nni_win_ipc_send_cancel(nni_aio *aio) -{ - nni_plat_ipc_pipe *pipe = aio->a_prov_data; - - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->writeq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { - NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); - aio->a_count += cnt; - aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; - aio->a_iov[0].iov_len -= cnt; +static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *); - if (aio->a_iov[0].iov_len == 0) { - int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - aio->a_niov--; - } +static nni_win_event_ops nni_win_ipc_pipe_ops = { + .wev_start = nni_win_ipc_pipe_start, + .wev_finish = nni_win_ipc_pipe_finish, + .wev_cancel = nni_win_ipc_pipe_cancel, +}; - if (aio->a_niov > 0) { - // If we have more to do, submit it! - nni_win_ipc_send_start(pipe); - return; - } - } +static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *); - // All done; hopefully successfully. - nni_list_remove(&pipe->writeq, aio); - nni_aio_finish(aio, rv, aio->a_count); -} +static nni_win_event_ops nni_win_ipc_acc_ops = { + .wev_start = nni_win_ipc_acc_start, + .wev_finish = nni_win_ipc_acc_finish, + .wev_cancel = nni_win_ipc_acc_cancel, +}; -static void -nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->send_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->writeq); + void * buf; + DWORD len; + BOOL ok; + int rv; + nni_plat_ipc_pipe *pipe = evt->ptr; NNI_ASSERT(aio != NULL); NNI_ASSERT(aio->a_niov > 0); NNI_ASSERT(aio->a_iov[0].iov_len > 0); NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if (evt->h == INVALID_HANDLE_VALUE) { + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + return (1); } // Now start a writefile. We assume that only one send can be @@ -153,9 +75,8 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) // scrambling the data anyway. Note that Windows named pipes do // not appear to support scatter/gather, so we have to process // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; // We limit ourselves to writing 16MB at a time. Named Pipes // on Windows have limits of between 31 and 64MB. @@ -163,95 +84,47 @@ nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) len = 0x1000000; } - if (!WriteFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + evt->count = 0; + if (evt == &pipe->snd_ev) { + ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd); + } else { + ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd); + } + if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = GetLastError(); + evt->count = 0; + return (1); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_send_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, aio->a_count); + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); } static void -nni_win_ipc_send_cb(void *arg) +nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; + NNI_ARG_UNUSED(aio); - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_send_finish(pipe); - nni_mtx_unlock(&pipe->mtx); -} + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); } - nni_aio_list_append(&pipe->writeq, aio); - nni_win_ipc_send_start(pipe); - nni_mtx_unlock(&pipe->mtx); } static void -nni_win_ipc_recv_cancel(nni_aio *aio) +nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = aio->a_prov_data; + int rv = 0; + DWORD cnt; - nni_mtx_lock(&pipe->mtx); - nni_win_event_cancel(&pipe->recv_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); -} - -static void -nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) -{ - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - int rv = 0; - nni_aio * aio; - DWORD cnt; - - if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { - rv = nni_win_error(GetLastError()); - } - if ((aio = nni_list_first(&pipe->readq)) == NULL) { - // If the AIO was canceled, but IOCP thread was still - // working on it, we might have seen this. - return; - } - if (rv == 0) { + cnt = evt->count; + if ((rv = evt->status) == 0) { NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); aio->a_count += cnt; aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; @@ -259,133 +132,71 @@ nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) if (aio->a_iov[0].iov_len == 0) { int i; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } aio->a_niov--; + for (i = 0; i < aio->a_niov; i++) { + aio->a_iov[i] = aio->a_iov[i + 1]; + } } if (aio->a_niov > 0) { // If we have more to do, submit it! - nni_win_ipc_recv_start(pipe); + nni_win_event_resubmit(evt, aio); return; } } // All done; hopefully successfully. - nni_list_remove(&pipe->readq, aio); - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, nni_win_error(rv), aio->a_count); } -static void -nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe) +static int +nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) { - void * buf; - DWORD len; - int rv; - nni_win_event *evt = &pipe->recv_evt; - OVERLAPPED * olpd = nni_win_event_overlapped(evt); - nni_aio * aio = nni_list_first(&pipe->readq); - - NNI_ASSERT(aio != NULL); - NNI_ASSERT(aio->a_niov > 0); - NNI_ASSERT(aio->a_iov[0].iov_len > 0); - NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - - if (pipe->p == INVALID_HANDLE_VALUE) { - rv = NNG_ECLOSED; - goto fail; - } + nni_plat_ipc_pipe *pipe; + int rv; - if ((rv = nni_win_event_reset(evt)) != 0) { - goto fail; + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); } - - // Now start a readfile. We assume that only one read can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; - olpd = nni_win_event_overlapped(evt); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - - if (!ReadFile(pipe->p, buf, len, NULL, olpd)) { - // If we failed immediately, then process it. - if ((rv = GetLastError()) == ERROR_IO_PENDING) { - // This is the normal path we expect; the IO will - // complete asynchronously. - return; - } - - // Some synchronous error occurred. - rv = nni_win_error(rv); - goto fail; + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); } - // If we completed synchronously, then do the completion. This is - // not normally expected. - nni_win_ipc_recv_finish(pipe); - return; - -fail: - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, 0); + pipe->p = p; + *pipep = pipe; + return (0); } -static void -nni_win_ipc_recv_cb(void *arg) +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_plat_ipc_pipe *pipe = arg; - - nni_mtx_lock(&pipe->mtx); - nni_win_ipc_recv_finish(pipe); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_submit(&pipe->snd_ev, aio); } void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) { - nni_win_event *evt = &pipe->send_evt; - int rv; - - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - if (pipe->p == INVALID_HANDLE_VALUE) { - nni_aio_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&pipe->mtx); - return; - } - nni_aio_list_append(&pipe->readq, aio); - nni_win_ipc_recv_start(pipe); + nni_win_event_submit(&pipe->rcv_ev, aio); } void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { - nni_mtx_lock(&pipe->mtx); - if (pipe->p != INVALID_HANDLE_VALUE) { - CloseHandle(pipe->p); + HANDLE p; + + if ((p = pipe->p) != INVALID_HANDLE_VALUE) { pipe->p = INVALID_HANDLE_VALUE; + CloseHandle(p); } - nni_win_event_cancel(&pipe->send_evt); - nni_win_event_cancel(&pipe->recv_evt); - nni_mtx_unlock(&pipe->mtx); + nni_win_event_close(&pipe->snd_ev); + nni_win_event_close(&pipe->rcv_ev); } void @@ -393,9 +204,8 @@ nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) { nni_plat_ipc_pipe_close(pipe); - nni_win_event_fini(&pipe->send_evt); - nni_win_event_fini(&pipe->recv_evt); - nni_mtx_fini(&pipe->mtx); + nni_win_event_fini(&pipe->snd_ev); + nni_win_event_fini(&pipe->rcv_ev); NNI_FREE_STRUCT(pipe); } @@ -404,7 +214,6 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { const char * path; nni_plat_ipc_ep *ep; - int rv; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); @@ -414,14 +223,9 @@ nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) return (NNG_ENOMEM); } ZeroMemory(ep, sizeof(ep)); - if ((rv = nni_mtx_init(&ep->mtx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } ep->mode = mode; NNI_LIST_NODE_INIT(&ep->node); - nni_aio_list_init(&ep->aios); (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path); @@ -435,13 +239,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; HANDLE p; - nni_mtx_lock(&ep->mtx); if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->mtx); return (NNG_EINVAL); } if (ep->started) { - nni_mtx_unlock(&ep->mtx); return (NNG_EBUSY); } @@ -461,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) } goto failed; } - rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p); + rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p); if (rv != 0) { goto failed; } @@ -472,12 +273,10 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) ep->p = p; ep->started = 1; - nni_mtx_unlock(&ep->mtx); return (0); failed: - nni_mtx_unlock(&ep->mtx); if (p != INVALID_HANDLE_VALUE) { (void) CloseHandle(p); } @@ -486,40 +285,15 @@ failed: } static void -nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) +nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) { - nni_win_event * evt = &ep->acc_evt; - DWORD nbytes; - int rv; + nni_plat_ipc_ep * ep = evt->ptr; nni_plat_ipc_pipe *pipe; - nni_aio * aio; + int rv; HANDLE newp, oldp; - // Note: This should be called with the ep lock held, and only when - // the ConnectNamedPipe has finished. - - rv = 0; - if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) { - if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) { - // We should never be here normally, but if the - // pipe got accepted by another client we can - // some times race here. - return; - } - } - - if ((aio = nni_list_first(&ep->aios)) == NULL) { - // No completion available to us. - if (rv == 0) { - NNI_ASSERT(0); - DisconnectNamedPipe(ep->p); - } - return; - } - - nni_list_remove(&ep->aios, aio); - if (rv != 0) { - nni_aio_finish(aio, rv, 0); + if ((rv = evt->status) != 0) { + nni_aio_finish(aio, nni_win_error(rv), 0); return; } @@ -530,14 +304,31 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL); if (newp == INVALID_HANDLE_VALUE) { rv = nni_win_error(GetLastError()); + // We connected, but as we cannot get a new pipe, + // we have to disconnect the old one. DisconnectNamedPipe(ep->p); + nni_aio_finish(aio, rv, 0); + return; + } + if ((rv = nni_win_iocp_register(newp)) != 0) { + // Disconnect the old pipe. + DisconnectNamedPipe(ep->p); + // And discard the half-baked new one. + DisconnectNamedPipe(newp); + (void) CloseHandle(newp); + nni_aio_finish(aio, rv, 0); return; } - oldp = ep->p; - ep->p = newp; + + oldp = ep->p; + ep->p = newp; + evt->h = newp; if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { + // The new pipe is already fine for us. Discard + // the old one, since failed to be able to use it. DisconnectNamedPipe(oldp); + (void) CloseHandle(oldp); nni_aio_finish(aio, rv, 0); return; } @@ -547,64 +338,56 @@ nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) } static void -nni_win_ipc_acc_cb(void *arg) +nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio) { - nni_plat_ipc_ep *ep = arg; - - nni_mtx_lock(&ep->mtx); - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); -} + NNI_ARG_UNUSED(aio); -static void -nni_win_ipc_acc_cancel(nni_aio *aio) -{ - nni_plat_ipc_ep *ep = aio->a_prov_data; + if (CancelIoEx(evt->h, &evt->olpd)) { + DWORD cnt; - nni_mtx_lock(&ep->mtx); - nni_win_event_cancel(&ep->acc_evt); - nni_aio_list_remove(aio); - nni_mtx_unlock(&ep->mtx); + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + } + // Just to be sure. + (void) DisconnectNamedPipe(evt->h); } -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +static int +nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) { - int rv; - nni_win_event *evt = &ep->acc_evt; - - nni_mtx_lock(&ep->mtx); - if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - rv = 0; - if ((rv = nni_win_event_reset(evt)) != 0) { - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; - } - if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) { - rv = GetLastError(); + if (!ConnectNamedPipe(evt->h, &evt->olpd)) { + int rv = GetLastError(); switch (rv) { case ERROR_PIPE_CONNECTED: - rv = 0; - break; + // Synch completion already occurred. + // Windows is weird. Apparently the I/O + // completion packet has already been sent. + return (0); + case ERROR_IO_PENDING: - nni_aio_list_append(&ep->aios, aio); - nni_mtx_unlock(&ep->mtx); - return; + // Normal asynchronous operation. Wait for + // completion. + return (0); default: - rv = nni_win_error(GetLastError()); - nni_aio_finish(aio, rv, 0); - nni_mtx_unlock(&ep->mtx); - return; + // Fast-fail (synchronous). + evt->status = GetLastError(); + evt->count = 0; + return (1); } } - nni_win_ipc_acc_finish(ep); - nni_mtx_unlock(&ep->mtx); + // Synch completion right now. I/O completion packet delivered + // already. + return (0); +} + +void +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->acc_ev, aio); } // So Windows IPC is a bit different on the client side. There is no @@ -645,12 +428,16 @@ nni_win_ipc_conn_thr(void *arg) } while ((ep = nni_list_first(&w->workers)) != NULL) { + nni_list_remove(&w->workers, ep); - if ((aio = nni_list_first(&ep->aios)) == NULL) { + if ((aio = ep->con_aio) == NULL) { continue; } - nni_list_remove(&ep->aios, aio); + ep->con_aio = NULL; + + pipe = NULL; + p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); @@ -658,41 +445,46 @@ nni_win_ipc_conn_thr(void *arg) if (p == INVALID_HANDLE_VALUE) { switch ((rv = GetLastError())) { case ERROR_PIPE_BUSY: - // still in progress. - nni_list_prepend(&ep->aios, aio); - break; + // Still in progress. This shouldn't + // happen unless the other side aborts + // the connection. + ep->con_aio = aio; + nni_list_append(&w->waiters, ep); + continue; + case ERROR_FILE_NOT_FOUND: - nni_aio_finish( - aio, NNG_ECONNREFUSED, 0); + rv = NNG_ECONNREFUSED; break; default: - nni_aio_finish( - aio, nni_win_error(rv), 0); + rv = nni_win_error(rv); break; } - } else { - rv = nni_win_ipc_pipe_init(&pipe, p); - if (rv == 0) { - rv = nni_win_iocp_register(p); - } - if (rv != 0) { - DisconnectNamedPipe(p); - CloseHandle(p); - nni_aio_finish(aio, rv, 0); - } else { - aio->a_pipe = pipe; - nni_aio_finish(aio, 0, 0); - } + goto fail; + } + if (((rv = nni_win_ipc_pipe_init(&pipe, p)) != 0) || + ((rv = nni_win_iocp_register(p)) != 0)) { + goto fail; + } + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); + continue; + + fail: + if (p != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(p); + CloseHandle(p); } - if (!nni_list_empty(&ep->aios)) { - nni_list_append(&w->waiters, ep); + if (pipe != NULL) { + nni_plat_ipc_pipe_fini(pipe); } + nni_aio_finish(aio, rv, 0); } - // Wait 10 ms, unless woken earlier. if (nni_list_empty(&w->waiters)) { + // Wait until an endpoint is added. nni_cv_wait(&w->cv); } else { + // Wait 10 ms, unless woken earlier. nni_cv_until(&w->cv, nni_clock() + 10000); } } @@ -706,9 +498,10 @@ nni_win_ipc_conn_cancel(nni_aio *aio) nni_plat_ipc_ep * ep = aio->a_prov_data; nni_mtx_lock(&w->mtx); - nni_aio_list_remove(aio); - if (nni_list_empty(&ep->aios)) { + ep->con_aio = NULL; + if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); + nni_cv_wake(&w->cv); } nni_mtx_unlock(&w->mtx); } @@ -720,17 +513,13 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) nni_mtx_lock(&w->mtx); - if (nni_list_active(&w->waiters, ep)) { - nni_aio_finish(aio, NNG_EBUSY, 0); - nni_mtx_unlock(&w->mtx); - return; - } + NNI_ASSERT(!nni_list_active(&w->waiters, ep)); if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { nni_mtx_unlock(&w->mtx); return; } - nni_list_append(&ep->aios, aio); + ep->con_aio = aio; nni_list_append(&w->waiters, ep); nni_cv_wake(&w->cv); nni_mtx_unlock(&w->mtx); @@ -739,13 +528,12 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_mtx_lock(&ep->mtx); - if (ep->p) { + if (ep->p != INVALID_HANDLE_VALUE) { CloseHandle(ep->p); ep->p = NULL; } - nni_mtx_unlock(&ep->mtx); - nni_mtx_fini(&ep->mtx); + nni_win_event_close(&ep->acc_ev); + nni_win_event_fini(&ep->acc_ev); NNI_FREE_STRUCT(ep); } @@ -761,24 +549,19 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) if (nni_list_active(&w->waiters, ep)) { nni_list_remove(&w->waiters, ep); } - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); + if ((aio = ep->con_aio) != NULL) { + ep->con_aio = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } nni_mtx_unlock(&w->mtx); break; + case NNI_EP_MODE_LISTEN: - nni_mtx_lock(&ep->mtx); - while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_list_remove(&ep->aios, aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); - } if (ep->p != INVALID_HANDLE_VALUE) { - nni_win_event_cancel(&ep->acc_evt); + nni_win_event_close(&ep->acc_ev); CloseHandle(ep->p); ep->p = INVALID_HANDLE_VALUE; } - nni_mtx_unlock(&ep->mtx); break; } } |
