diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-10 13:06:47 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-10 13:06:47 -0700 |
| commit | de90f97167d2df6739db47b2c6aad85f06250270 (patch) | |
| tree | 96c34759e56c8ab75013120e99d08cb2996d0dc9 | |
| parent | 2f9332a75cf5441daa61957a7281ed5cfcf76902 (diff) | |
| download | nng-de90f97167d2df6739db47b2c6aad85f06250270.tar.gz nng-de90f97167d2df6739db47b2c6aad85f06250270.tar.bz2 nng-de90f97167d2df6739db47b2c6aad85f06250270.zip | |
Windows IPC progress. Not working yet, but should be close.
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 28 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 105 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 950 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 7 |
5 files changed, 817 insertions, 275 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7d2f8d2f..18d4932f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -98,8 +98,8 @@ set (NNG_SOURCES platform/windows/win_impl.h platform/windows/win_clock.c platform/windows/win_debug.c - platform/windows/win_ipc.c platform/windows/win_iocp.c + platform/windows/win_ipc.c platform/windows/win_net.c platform/windows/win_pipe.c platform/windows/win_rand.c diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index fd8f3f79..6ce38250 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -22,6 +22,7 @@ #include <process.h> #include <ws2tcpip.h> +#include "core/list.h" // These types are provided for here, to permit them to be directly inlined // elsewhere. @@ -35,22 +36,10 @@ typedef struct nni_win_event nni_win_event; // conveyed in the OVERLAPPED directly. struct nni_win_event { OVERLAPPED olpd; + HANDLE h; void * ptr; nni_cb cb; - int status; - int nbytes; -}; - -struct nni_plat_ipcsock { - HANDLE p; - - char path[256]; - WSAOVERLAPPED recv_olpd; - WSAOVERLAPPED send_olpd; - WSAOVERLAPPED conn_olpd; // Use for both connect and accept - CRITICAL_SECTION cs; - - int server; + nni_list aios; }; struct nni_plat_thr { @@ -73,9 +62,20 @@ struct nni_plat_cv { extern int nni_win_error(int); extern int nni_winsock_error(int); +extern int nni_win_event_init(nni_win_event *, nni_cb, void *, HANDLE); +extern void nni_win_event_fini(nni_win_event *); +extern int nni_win_event_reset(nni_win_event *); +extern OVERLAPPED *nni_win_event_overlapped(nni_win_event *); +extern void nni_win_event_cancel(nni_win_event *); + +extern int nni_win_iocp_register(HANDLE); + extern int nni_win_iocp_sysinit(void); extern void nni_win_iocp_sysfini(void); +extern int nni_win_ipc_sysinit(void); +extern void nni_win_ipc_sysfini(void); + extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 06feb62e..381da64f 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -20,14 +20,14 @@ // port for pretty much everything. static HANDLE nni_win_global_iocp = NULL; -static nni_win_event nni_win_iocp_exit; static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS]; +static nni_mtx nni_win_iocp_mtx; static void nni_win_iocp_handler(void *arg) { HANDLE iocp; - DWORD nbytes; + DWORD cnt; ULONG_PTR key; OVERLAPPED *olpd; nni_win_event *evt; @@ -42,7 +42,9 @@ nni_win_iocp_handler(void *arg) key = 0; olpd = NULL; status = 0; - rv = GetQueuedCompletionStatus(iocp, &nbytes, &key, &olpd, + cnt = 0; + + rv = GetQueuedCompletionStatus(iocp, &cnt, &key, &olpd, INFINITE); if (rv == FALSE) { @@ -50,21 +52,12 @@ nni_win_iocp_handler(void *arg) // Completion port bailed... break; } - nbytes = 0; - key = 0; - status = nni_win_error(GetLastError()); } NNI_ASSERT(olpd != NULL); evt = (void *) olpd; - if (evt == &nni_win_iocp_exit) { - // Exit requested. - break; - } NNI_ASSERT(evt->cb != NULL); - evt->nbytes = nbytes; - evt->status = status; evt->cb(evt->ptr); } } @@ -81,13 +74,86 @@ nni_win_iocp_register(HANDLE h) int +nni_win_event_init(nni_win_event *evt, nni_cb cb, void *ptr, HANDLE h) +{ + ZeroMemory(&evt->olpd, sizeof (evt->olpd)); + evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (evt->olpd.hEvent == NULL) { + return (nni_win_error(GetLastError())); + } + nni_aio_list_init(&evt->aios); + evt->ptr = ptr; + evt->cb = cb; + evt->h = h; + return (0); +} + + +void +nni_win_event_fini(nni_win_event *evt) +{ + if (evt->olpd.hEvent != NULL) { + (void) CloseHandle(evt->olpd.hEvent); + evt->olpd.hEvent = NULL; + } +} + + +int +nni_win_event_reset(nni_win_event *evt) +{ + if (!ResetEvent(evt->olpd.hEvent)) { + return (nni_win_error(GetLastError())); + } + return (0); +} + + +OVERLAPPED * +nni_win_event_overlapped(nni_win_event *evt) +{ + return (&evt->olpd); +} + + +void +nni_win_event_cancel(nni_win_event *evt) +{ + int rv; + DWORD cnt; + + // Try to cancel the event... + if (!CancelIoEx(evt->h, &evt->olpd)) { + // None was found. That's good. + if ((rv = GetLastError()) == ERROR_NOT_FOUND) { + // Nothing queued. We may in theory be running + // the callback via the completion port handler; + // caller must synchronize that separately. + return; + } + + // It's unclear why we would ever fail in this + // circumstance. Is there a kind of "uncancellable I/O" + // here, or somesuch? In this case we just wait hard + // using the success case -- its the best we can do. + } + + // This basically just waits for the canceled I/O to complete. + // The end result can be either success or ERROR_OPERATION_ABORTED. + // It turns out we don't much care either way; we just want to make + // sure that we don't have any I/O pending on the overlapped + // structure before we release it or reuse it. + GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); +} + + +int nni_win_iocp_sysinit(void) { HANDLE h; int i; int rv; - ZeroMemory(&nni_win_iocp_exit, sizeof (nni_win_iocp_exit)); h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, NNI_WIN_IOCP_NTHREADS); if (h == NULL) { @@ -101,6 +167,9 @@ nni_win_iocp_sysinit(void) goto fail; } } + if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) { + goto fail; + } for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { nni_thr_run(&nni_win_iocp_thrs[i]); } @@ -108,7 +177,6 @@ nni_win_iocp_sysinit(void) fail: if ((h = nni_win_global_iocp) != NULL) { - PostQueuedCompletionStatus(h, 0, 0, &nni_win_iocp_exit.olpd); CloseHandle(h); nni_win_global_iocp = NULL; } @@ -126,14 +194,13 @@ nni_win_iocp_sysfini(void) HANDLE h; if ((h = nni_win_global_iocp) != NULL) { - // Signal the iocp poller to exit. - PostQueuedCompletionStatus(h, 0, 0, &nni_win_iocp_exit.olpd); CloseHandle(h); nni_win_global_iocp = NULL; - for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { - nni_thr_fini(&nni_win_iocp_thrs[i]); - } } + for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { + nni_thr_fini(&nni_win_iocp_thrs[i]); + } + nni_mtx_fini(&nni_win_iocp_mtx); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 8ae26a6e..f6601f93 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -13,371 +13,841 @@ #include <stdio.h> -#if 0 +static void nni_win_ipc_acc_cb(void *); +static void nni_win_ipc_send_cb(void *); +static void nni_win_ipc_recv_cb(void *); +static void nni_win_ipc_send_start(nni_plat_ipc_pipe *); +static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *); -int -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) +struct nni_plat_ipc_pipe { + HANDLE p; + nni_win_event recv_evt; + nni_win_event send_evt; + nni_mtx mtx; + nni_list readq; + nni_list writeq; +}; + +struct nni_plat_ipc_ep { + char path[256]; + int mode; + int started; + nni_list aios; + HANDLE p; // accept side only + nni_win_event acc_evt; // accept side only + nni_mtx mtx; // accept side only + nni_list_node node; // conn side uses this +}; + + +static int +nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) { - int i; - DWORD nsent; - DWORD resid; - char *buf; - DWORD len; - nni_iov iov[4]; + nni_plat_ipc_pipe *pipe; int rv; - OVERLAPPED *olp = &s->send_olpd; - - NNI_ASSERT(cnt <= 4); - for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) { - iov[i].iov_len = iovs[i].iov_len; - iov[i].iov_buf = iovs[i].iov_buf; - } - - i = 0; - while (resid) { - NNI_ASSERT(i < cnt); - nsent = 0; - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - len = iov[i].iov_len > 0x1000000 ? 0x1000000 : - (DWORD) iov[i].iov_len; - buf = iov[i].iov_buf; - - if (!WriteFile(s->p, buf, len, NULL, olp)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - return (nni_winpipe_error(rv)); + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); + } + + pipe->p = p; + nni_aio_list_init(&pipe->readq); + nni_aio_list_init(&pipe->writeq); + *pipep = pipe; + return (0); +} + + +static void +nni_win_ipc_send_cancel(nni_aio *aio) +{ + nni_plat_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + nni_win_event_cancel(&pipe->recv_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe) +{ + nni_win_event *evt = &pipe->send_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + int rv = 0; + nni_aio *aio; + DWORD cnt; + + if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { + rv = nni_win_error(GetLastError()); + } + if ((aio = nni_list_first(&pipe->writeq)) == NULL) { + // If the AIO was canceled, but IOCP thread was still + // working on it, we might have seen this. + return; + } + if (rv == 0) { + NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); + aio->a_count += cnt; + aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; + aio->a_iov[0].iov_len -= cnt; + + if (aio->a_iov[0].iov_len == 0) { + int i; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; } + aio->a_niov--; } - if (!GetOverlappedResult(s->p, olp, &nsent, TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); - } - NNI_ASSERT(nsent <= resid); - NNI_ASSERT(nsent <= len); - resid -= nsent; - if (nsent < iov[i].iov_len) { - iov[i].iov_buf = buf + nsent; - iov[i].iov_len -= nsent; - } else { - i++; + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_ipc_send_start(pipe); + return; } } - return (0); + + // All done; hopefully successfully. + nni_list_remove(&pipe->writeq, aio); + nni_aio_finish(aio, rv, aio->a_count); } -int -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) +static void +nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) { - int i; - DWORD nrecv; - DWORD resid; + void *buf; DWORD len; - char *buf; - nni_iov iov[4]; int rv; - OVERLAPPED *olp = &s->recv_olpd; - - NNI_ASSERT(cnt <= 4); - for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) { - iov[i].iov_len = iovs[i].iov_len; - iov[i].iov_buf = iovs[i].iov_buf; - } - - i = 0; - while (resid) { - NNI_ASSERT(i < cnt); - nrecv = 0; - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - len = iov[i].iov_len > 0x1000000 ? 0x1000000 : - (DWORD) iov[i].iov_len; - buf = iov[i].iov_buf; - - if (!ReadFile(s->p, buf, len, NULL, olp)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - return (nni_winpipe_error(rv)); - } + nni_win_event *evt = &pipe->send_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + nni_aio *aio = nni_list_first(&pipe->writeq); + + NNI_ASSERT(aio != NULL); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + if (pipe->p == INVALID_HANDLE_VALUE) { + rv = NNG_ECLOSED; + goto fail; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + goto fail; + } + + // Now start a writefile. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; + olpd = nni_win_event_overlapped(evt); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if (!WriteFile(pipe->p, buf, len, NULL, olpd)) { + // If we failed immediately, then process it. + if ((rv = GetLastError()) == ERROR_IO_PENDING) { + // This is the normal path we expect; the IO will + // complete asynchronously. + return; } - if (!GetOverlappedResult(s->p, olp, &nrecv, TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); + + // Some synchronous error occurred. + rv = nni_win_error(rv); + goto fail; + } + + // If we completed synchronously, then do the completion. This is + // not normally expected. + nni_win_ipc_send_finish(pipe); + return; + +fail: + nni_aio_list_remove(aio); + nni_aio_finish(aio, rv, aio->a_count); +} + + +static void +nni_win_ipc_send_cb(void *arg) +{ + nni_plat_ipc_pipe *pipe = arg; + + nni_mtx_lock(&pipe->mtx); + nni_win_ipc_send_finish(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) +{ + nni_win_event *evt = &pipe->send_evt; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; + } + if (pipe->p == INVALID_HANDLE_VALUE) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_append(&pipe->writeq, aio); + nni_win_ipc_send_start(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_recv_cancel(nni_aio *aio) +{ + nni_plat_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + nni_win_event_cancel(&pipe->recv_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) +{ + nni_win_event *evt = &pipe->recv_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + int rv = 0; + nni_aio *aio; + DWORD cnt; + + if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { + rv = nni_win_error(GetLastError()); + } + if ((aio = nni_list_first(&pipe->readq)) == NULL) { + // If the AIO was canceled, but IOCP thread was still + // working on it, we might have seen this. + return; + } + if (rv == 0) { + NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); + aio->a_count += cnt; + aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; + aio->a_iov[0].iov_len -= cnt; + + if (aio->a_iov[0].iov_len == 0) { + int i; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; + } + aio->a_niov--; } - NNI_ASSERT(nrecv <= resid); - NNI_ASSERT(nrecv <= len); - resid -= nrecv; - if (nrecv < iov[i].iov_len) { - iov[i].iov_buf = buf + nrecv; - iov[i].iov_len -= nrecv; - } else { - i++; + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_ipc_recv_start(pipe); + return; } } - return (0); + + // All done; hopefully successfully. + nni_list_remove(&pipe->readq, aio); + nni_aio_finish(aio, rv, aio->a_count); } -int -nni_plat_ipc_init(nni_plat_ipcsock *s) +static void +nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe) { + void *buf; + DWORD len; int rv; + nni_win_event *evt = &pipe->recv_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + nni_aio *aio = nni_list_first(&pipe->readq); + + NNI_ASSERT(aio != NULL); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + if (pipe->p == INVALID_HANDLE_VALUE) { + rv = NNG_ECLOSED; + goto fail; + } - s->p = INVALID_HANDLE_VALUE; - s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); + if ((rv = nni_win_event_reset(evt)) != 0) { + goto fail; } - s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->recv_olpd.hEvent); - return (nni_winpipe_error(rv)); + + // Now start a readfile. We assume that only one read can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; + olpd = nni_win_event_overlapped(evt); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; } - s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->recv_olpd.hEvent); - return (nni_winpipe_error(rv)); + + if (!ReadFile(pipe->p, buf, len, NULL, olpd)) { + // If we failed immediately, then process it. + if ((rv = GetLastError()) == ERROR_IO_PENDING) { + // This is the normal path we expect; the IO will + // complete asynchronously. + return; + } + + // Some synchronous error occurred. + rv = nni_win_error(rv); + goto fail; } - InitializeCriticalSection(&s->cs); - return (0); + + // If we completed synchronously, then do the completion. This is + // not normally expected. + nni_win_ipc_recv_finish(pipe); + return; + +fail: + nni_aio_list_remove(aio); + nni_aio_finish(aio, rv, 0); } static void -nni_plat_ipc_close(nni_plat_ipcsock *s) +nni_win_ipc_recv_cb(void *arg) { - HANDLE fd; + nni_plat_ipc_pipe *pipe = arg; - EnterCriticalSection(&s->cs); - if ((fd = s->p) != INVALID_HANDLE_VALUE) { - s->p = INVALID_HANDLE_VALUE; - if (s->server) { - (void) DisconnectNamedPipe(fd); - } - (void) CancelIoEx(fd, &s->send_olpd); - (void) CancelIoEx(fd, &s->recv_olpd); - (void) CancelIoEx(fd, &s->conn_olpd); - (void) CloseHandle(fd); + nni_mtx_lock(&pipe->mtx); + nni_win_ipc_recv_finish(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) +{ + nni_win_event *evt = &pipe->send_evt; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; } - LeaveCriticalSection(&s->cs); + if (pipe->p == INVALID_HANDLE_VALUE) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_append(&pipe->readq, aio); + nni_win_ipc_recv_start(pipe); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { - nni_plat_ipc_close(s); - DeleteCriticalSection(&s->cs); - CloseHandle(s->recv_olpd.hEvent); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->conn_olpd.hEvent); + nni_mtx_lock(&pipe->mtx); + if (pipe->p != INVALID_HANDLE_VALUE) { + CloseHandle(pipe->p); + pipe->p = INVALID_HANDLE_VALUE; + } + nni_win_event_cancel(&pipe->send_evt); + nni_win_event_cancel(&pipe->recv_evt); + nni_mtx_unlock(&pipe->mtx); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) +{ + nni_plat_ipc_pipe_close(pipe); + + nni_win_event_fini(&pipe->send_evt); + nni_win_event_fini(&pipe->recv_evt); + nni_mtx_fini(&pipe->mtx); + NNI_FREE_STRUCT(pipe); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { - nni_plat_ipc_close(s); + const char *path; + nni_plat_ipc_ep *ep; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + path = url + strlen("ipc://"); + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(ep, sizeof (ep)); + if ((rv = nni_mtx_init(&ep->mtx)) != 0) { + NNI_FREE_STRUCT(ep); + return (rv); + } + + ep->mode = mode; + NNI_LIST_NODE_INIT(&ep->node); + nni_aio_list_init(&ep->aios); + + (void) snprintf(ep->path, sizeof (ep->path), "\\\\.\\pipe\\%s", path); + + *epp = ep; + return (0); } int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { int rv; + HANDLE p; - snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); + nni_mtx_lock(&ep->mtx); + if (ep->mode != NNI_EP_MODE_LISTEN) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EINVAL); + } + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } // We create the first named pipe, and we make sure that it is // properly ours. - s->p = CreateNamedPipeA( - s->path, + p = CreateNamedPipeA(ep->path, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL); - - if (s->p == INVALID_HANDLE_VALUE) { + if (p == INVALID_HANDLE_VALUE) { if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { - return (NNG_EADDRINUSE); + rv = NNG_EADDRINUSE; + } else { + rv = nni_win_error(rv); } - return (nni_winpipe_error(rv)); + goto failed; + } + rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p); + if (rv != 0) { + goto failed; } - s->server = 1; + if ((rv = nni_win_iocp_register(p)) != 0) { + goto failed; + } + + ep->p = p; + ep->started = 1; + nni_mtx_unlock(&ep->mtx); return (0); + +failed: + + nni_mtx_unlock(&ep->mtx); + if (p != INVALID_HANDLE_VALUE) { + (void) CloseHandle(p); + } + + return (rv); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +static void +nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) { - int rv; - OVERLAPPED *olp = &s->conn_olpd; + nni_win_event *evt = &ep->acc_evt; DWORD nbytes; - - s->server = 1; - if (!ConnectNamedPipe(server->p, olp)) { - rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_CONNECTED: - break; - case ERROR_IO_PENDING: - if (!GetOverlappedResult(server->p, olp, &nbytes, - TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); - } - default: - rv = GetLastError(); - return (nni_winpipe_error(rv)); + int rv; + nni_plat_ipc_pipe *pipe; + nni_aio *aio; + HANDLE newp, oldp; + + // Note: This should be called with the ep lock held, and only when + // the ConnectNamedPipe has finished. + + rv = 0; + if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) { + if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) { + // We should never be here normally, but if the + // pipe got accepted by another client we can + // some times race here. + return; } } - EnterCriticalSection(&server->cs); - if (server->p != INVALID_HANDLE_VALUE) { - s->p = server->p; - server->p = CreateNamedPipeA( - server->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, - 4096, 4096, 0, NULL); - if (server->p == INVALID_HANDLE_VALUE) { - // We return the old handle, so that future accept - // attempts have a chance of succeeding. That means - // we will disconnect the current client. - rv = GetLastError(); - server->p = s->p; - DisconnectNamedPipe(server->p); - s->p = INVALID_HANDLE_VALUE; - LeaveCriticalSection(&server->cs); - return (nni_winpipe_error(rv)); + if ((aio = nni_list_first(&ep->aios)) == NULL) { + // No completion available to us. + if (rv == 0) { + NNI_ASSERT(0); + DisconnectNamedPipe(ep->p); } + return; } - LeaveCriticalSection(&server->cs); - return (0); + nni_list_remove(&ep->aios, aio); + if (rv != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + newp = CreateNamedPipeA(ep->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, + 4096, 4096, 0, NULL); + if (newp == INVALID_HANDLE_VALUE) { + rv = nni_win_error(GetLastError()); + DisconnectNamedPipe(ep->p); + return; + } + oldp = ep->p; + ep->p = newp; + + if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { + DisconnectNamedPipe(oldp); + nni_aio_finish(aio, rv, 0); + return; + } + + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +static void +nni_win_ipc_acc_cb(void *arg) { - int rv; + nni_plat_ipc_ep *ep = arg; - snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); + nni_mtx_lock(&ep->mtx); + nni_win_ipc_acc_finish(ep); + nni_mtx_unlock(&ep->mtx); +} - for (;;) { - s->p = CreateFileA(s->path, GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); - if (s->p == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_BUSY: - if (!WaitNamedPipe(s->path, - NMPWAIT_USE_DEFAULT_WAIT)) { - return (NNG_ETIMEDOUT); - } - continue; - case ERROR_FILE_NOT_FOUND: - // No present pipes (no listener?) - return (NNG_ECONNREFUSED); - default: - return (nni_winpipe_error(rv)); - } - } - s->server = 0; - break; - } - return (0); +static void +nni_win_ipc_acc_cancel(nni_aio *aio) +{ + nni_plat_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + nni_win_event_cancel(&ep->acc_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); } -#endif +void +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +{ + int rv; + nni_win_event *evt = &ep->acc_evt; + nni_mtx_lock(&ep->mtx); + if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + rv = 0; + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&ep->mtx); + return; + } + if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) { + rv = GetLastError(); + switch (rv) { + case ERROR_PIPE_CONNECTED: + rv = 0; + break; + case ERROR_IO_PENDING: + nni_aio_list_append(&ep->aios, aio); + nni_mtx_unlock(&ep->mtx); + return; -struct nni_plat_ipc_pipe { - SOCKET s; - nni_win_event recv_evt; - nni_win_event send_evt; - OVERLAPPED recv_olpd; - OVERLAPPED send_olpd; -}; + default: + rv = nni_win_error(GetLastError()); + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&ep->mtx); + return; + } + } -struct nni_plat_ipc_ep { - SOCKET s; - nni_win_event evt; - OVERLAPPED olpd; -}; + nni_win_ipc_acc_finish(ep); + nni_mtx_unlock(&ep->mtx); +} -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) -{ - return (NNG_ENOTSUP); -} +// So Windows IPC is a bit different on the client side. There is no +// support for asynchronous connection, but we can fake it with a single +// thread that runs to establish the connection. That thread will run +// keep looping, sleeping for 10 ms between attempts. It performs non-blocking +// attempts to connect. +typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work; +struct nni_win_ipc_conn_work { + nni_list waiters; + nni_list workers; + nni_mtx mtx; + nni_cv cv; + nni_thr thr; + int exit; +}; +static nni_win_ipc_conn_work nni_win_ipc_connecter; -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) + +static void +nni_win_ipc_conn_thr(void *arg) { -} + nni_win_ipc_conn_work *w = arg; + nni_plat_ipc_ep *ep; + nni_plat_ipc_pipe *pipe; + nni_aio *aio; + HANDLE p; + int rv; + nni_mtx_lock(&w->mtx); + for (;;) { + if (w->exit) { + break; + } + while ((ep = nni_list_first(&w->waiters)) != NULL) { + nni_list_remove(&w->waiters, ep); + nni_list_append(&w->workers, ep); + } -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ -} + while ((ep = nni_list_first(&w->workers)) != NULL) { + nni_list_remove(&w->workers, ep); + if ((aio = nni_list_first(&ep->aios)) == NULL) { + continue; + } + nni_list_remove(&ep->aios, aio); + p = CreateFileA(ep->path, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, + NULL); + + if (p == INVALID_HANDLE_VALUE) { + switch ((rv = GetLastError())) { + case ERROR_PIPE_BUSY: + // still in progress. + nni_list_prepend(&ep->aios, aio); + break; + case ERROR_FILE_NOT_FOUND: + nni_aio_finish(aio, NNG_ECONNREFUSED, + 0); + break; + default: + nni_aio_finish(aio, nni_win_error(rv), + 0); + break; + } + } else { + rv = nni_win_ipc_pipe_init(&pipe, p); + if (rv == 0) { + rv = nni_win_iocp_register(p); + } + if (rv != 0) { + DisconnectNamedPipe(p); + CloseHandle(p); + nni_aio_finish(aio, rv, 0); + } else { + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); + } + } + if (!nni_list_empty(&ep->aios)) { + nni_list_append(&w->waiters, ep); + } + } -extern int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - return (NNG_ENOTSUP); + // Wait 10 ms, unless woken earlier. + if (nni_list_empty(&w->waiters)) { + nni_cv_wait(&w->cv); + } else { + nni_cv_until(&w->cv, nni_clock() + 10000); + } + } + nni_mtx_unlock(&w->mtx); } -extern void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +static void +nni_win_ipc_conn_cancel(nni_aio *aio) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + nni_plat_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&w->mtx); + nni_aio_list_remove(aio); + if (nni_list_empty(&ep->aios)) { + nni_list_remove(&w->waiters, ep); + } + nni_mtx_unlock(&w->mtx); } -extern void +void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + + nni_mtx_lock(&w->mtx); + + if (nni_list_active(&w->waiters, ep)) { + nni_aio_finish(aio, NNG_EBUSY, 0); + nni_mtx_unlock(&w->mtx); + return; + } + + if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { + nni_mtx_unlock(&w->mtx); + return; + } + nni_list_append(&ep->aios, aio); + nni_list_append(&w->waiters, ep); + nni_cv_wake(&w->cv); + nni_mtx_unlock(&w->mtx); } void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { + nni_mtx_lock(&ep->mtx); + if (ep->p) { + CloseHandle(ep->p); + ep->p = NULL; + } + nni_mtx_unlock(&ep->mtx); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); } void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + nni_aio *aio; + + switch (ep->mode) { + case NNI_EP_MODE_DIAL: + nni_mtx_lock(&w->mtx); + if (nni_list_active(&w->waiters, ep)) { + nni_list_remove(&w->waiters, ep); + } + while ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_list_remove(&ep->aios, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + nni_mtx_unlock(&w->mtx); + break; + case NNI_EP_MODE_LISTEN: + nni_mtx_lock(&ep->mtx); + while ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_list_remove(&ep->aios, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + if (ep->p != INVALID_HANDLE_VALUE) { + nni_win_event_cancel(&ep->acc_evt); + CloseHandle(ep->p); + ep->p = INVALID_HANDLE_VALUE; + } + nni_mtx_unlock(&ep->mtx); + break; + } } -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) +int +nni_win_ipc_sysinit(void) { + int rv; + nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; + + NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); + NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); + + if (((rv = nni_mtx_init(&worker->mtx)) != 0) || + ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { + return (rv); + } + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); + if (rv != 0) { + return (rv); + } + + nni_thr_run(&worker->thr); + + return (0); } void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) +nni_win_ipc_sysfini(void) { + nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; + + nni_mtx_lock(&worker->mtx); + worker->exit = 1; + nni_cv_wake(&worker->cv); + nni_mtx_unlock(&worker->mtx); + nni_thr_fini(&worker->thr); + nni_cv_fini(&worker->cv); + nni_mtx_fini(&worker->mtx); } diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index f3103079..bc2d7a0c 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -181,6 +181,10 @@ nni_plat_init(int (*helper)(void)) if ((rv = nni_win_resolv_sysinit()) != 0) { goto out; } + if ((rv = nni_win_ipc_sysinit()) != 0) { + goto out; + } + helper(); inited = 1; } @@ -195,9 +199,10 @@ out: void nni_plat_fini(void) { - WSACleanup(); + nni_win_ipc_sysfini(); nni_win_resolv_sysfini(); nni_win_iocp_sysfini(); + WSACleanup(); } |
