diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-10 13:06:47 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-10 13:06:47 -0700 |
| commit | de90f97167d2df6739db47b2c6aad85f06250270 (patch) | |
| tree | 96c34759e56c8ab75013120e99d08cb2996d0dc9 /src/platform/windows/win_ipc.c | |
| parent | 2f9332a75cf5441daa61957a7281ed5cfcf76902 (diff) | |
| download | nng-de90f97167d2df6739db47b2c6aad85f06250270.tar.gz nng-de90f97167d2df6739db47b2c6aad85f06250270.tar.bz2 nng-de90f97167d2df6739db47b2c6aad85f06250270.zip | |
Windows IPC progress. Not working yet, but should be close.
Diffstat (limited to 'src/platform/windows/win_ipc.c')
| -rw-r--r-- | src/platform/windows/win_ipc.c | 950 |
1 files changed, 710 insertions, 240 deletions
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 8ae26a6e..f6601f93 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -13,371 +13,841 @@ #include <stdio.h> -#if 0 +static void nni_win_ipc_acc_cb(void *); +static void nni_win_ipc_send_cb(void *); +static void nni_win_ipc_recv_cb(void *); +static void nni_win_ipc_send_start(nni_plat_ipc_pipe *); +static void nni_win_ipc_recv_start(nni_plat_ipc_pipe *); -int -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) +struct nni_plat_ipc_pipe { + HANDLE p; + nni_win_event recv_evt; + nni_win_event send_evt; + nni_mtx mtx; + nni_list readq; + nni_list writeq; +}; + +struct nni_plat_ipc_ep { + char path[256]; + int mode; + int started; + nni_list aios; + HANDLE p; // accept side only + nni_win_event acc_evt; // accept side only + nni_mtx mtx; // accept side only + nni_list_node node; // conn side uses this +}; + + +static int +nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) { - int i; - DWORD nsent; - DWORD resid; - char *buf; - DWORD len; - nni_iov iov[4]; + nni_plat_ipc_pipe *pipe; int rv; - OVERLAPPED *olp = &s->send_olpd; - - NNI_ASSERT(cnt <= 4); - for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) { - iov[i].iov_len = iovs[i].iov_len; - iov[i].iov_buf = iovs[i].iov_buf; - } - - i = 0; - while (resid) { - NNI_ASSERT(i < cnt); - nsent = 0; - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - len = iov[i].iov_len > 0x1000000 ? 0x1000000 : - (DWORD) iov[i].iov_len; - buf = iov[i].iov_buf; - - if (!WriteFile(s->p, buf, len, NULL, olp)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - return (nni_winpipe_error(rv)); + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->recv_evt, nni_win_ipc_recv_cb, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->send_evt, nni_win_ipc_send_cb, pipe, p); + if (rv != 0) { + nni_plat_ipc_pipe_fini(pipe); + return (rv); + } + + pipe->p = p; + nni_aio_list_init(&pipe->readq); + nni_aio_list_init(&pipe->writeq); + *pipep = pipe; + return (0); +} + + +static void +nni_win_ipc_send_cancel(nni_aio *aio) +{ + nni_plat_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + nni_win_event_cancel(&pipe->recv_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_send_finish(nni_plat_ipc_pipe *pipe) +{ + nni_win_event *evt = &pipe->send_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + int rv = 0; + nni_aio *aio; + DWORD cnt; + + if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { + rv = nni_win_error(GetLastError()); + } + if ((aio = nni_list_first(&pipe->writeq)) == NULL) { + // If the AIO was canceled, but IOCP thread was still + // working on it, we might have seen this. + return; + } + if (rv == 0) { + NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); + aio->a_count += cnt; + aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; + aio->a_iov[0].iov_len -= cnt; + + if (aio->a_iov[0].iov_len == 0) { + int i; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; } + aio->a_niov--; } - if (!GetOverlappedResult(s->p, olp, &nsent, TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); - } - NNI_ASSERT(nsent <= resid); - NNI_ASSERT(nsent <= len); - resid -= nsent; - if (nsent < iov[i].iov_len) { - iov[i].iov_buf = buf + nsent; - iov[i].iov_len -= nsent; - } else { - i++; + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_ipc_send_start(pipe); + return; } } - return (0); + + // All done; hopefully successfully. + nni_list_remove(&pipe->writeq, aio); + nni_aio_finish(aio, rv, aio->a_count); } -int -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) +static void +nni_win_ipc_send_start(nni_plat_ipc_pipe *pipe) { - int i; - DWORD nrecv; - DWORD resid; + void *buf; DWORD len; - char *buf; - nni_iov iov[4]; int rv; - OVERLAPPED *olp = &s->recv_olpd; - - NNI_ASSERT(cnt <= 4); - for (i = 0, resid = 0; i < cnt; resid += (DWORD) iov[i].iov_len, i++) { - iov[i].iov_len = iovs[i].iov_len; - iov[i].iov_buf = iovs[i].iov_buf; - } - - i = 0; - while (resid) { - NNI_ASSERT(i < cnt); - nrecv = 0; - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - len = iov[i].iov_len > 0x1000000 ? 0x1000000 : - (DWORD) iov[i].iov_len; - buf = iov[i].iov_buf; - - if (!ReadFile(s->p, buf, len, NULL, olp)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - return (nni_winpipe_error(rv)); - } + nni_win_event *evt = &pipe->send_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + nni_aio *aio = nni_list_first(&pipe->writeq); + + NNI_ASSERT(aio != NULL); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + if (pipe->p == INVALID_HANDLE_VALUE) { + rv = NNG_ECLOSED; + goto fail; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + goto fail; + } + + // Now start a writefile. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; + olpd = nni_win_event_overlapped(evt); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if (!WriteFile(pipe->p, buf, len, NULL, olpd)) { + // If we failed immediately, then process it. + if ((rv = GetLastError()) == ERROR_IO_PENDING) { + // This is the normal path we expect; the IO will + // complete asynchronously. + return; } - if (!GetOverlappedResult(s->p, olp, &nrecv, TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); + + // Some synchronous error occurred. + rv = nni_win_error(rv); + goto fail; + } + + // If we completed synchronously, then do the completion. This is + // not normally expected. + nni_win_ipc_send_finish(pipe); + return; + +fail: + nni_aio_list_remove(aio); + nni_aio_finish(aio, rv, aio->a_count); +} + + +static void +nni_win_ipc_send_cb(void *arg) +{ + nni_plat_ipc_pipe *pipe = arg; + + nni_mtx_lock(&pipe->mtx); + nni_win_ipc_send_finish(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) +{ + nni_win_event *evt = &pipe->send_evt; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_start(aio, nni_win_ipc_send_cancel, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; + } + if (pipe->p == INVALID_HANDLE_VALUE) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_append(&pipe->writeq, aio); + nni_win_ipc_send_start(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_recv_cancel(nni_aio *aio) +{ + nni_plat_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + nni_win_event_cancel(&pipe->recv_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_win_ipc_recv_finish(nni_plat_ipc_pipe *pipe) +{ + nni_win_event *evt = &pipe->recv_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + int rv = 0; + nni_aio *aio; + DWORD cnt; + + if (GetOverlappedResult(pipe->p, olpd, &cnt, TRUE) == FALSE) { + rv = nni_win_error(GetLastError()); + } + if ((aio = nni_list_first(&pipe->readq)) == NULL) { + // If the AIO was canceled, but IOCP thread was still + // working on it, we might have seen this. + return; + } + if (rv == 0) { + NNI_ASSERT(cnt <= aio->a_iov[0].iov_len); + aio->a_count += cnt; + aio->a_iov[0].iov_buf = (char *) aio->a_iov[0].iov_buf + cnt; + aio->a_iov[0].iov_len -= cnt; + + if (aio->a_iov[0].iov_len == 0) { + int i; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; + } + aio->a_niov--; } - NNI_ASSERT(nrecv <= resid); - NNI_ASSERT(nrecv <= len); - resid -= nrecv; - if (nrecv < iov[i].iov_len) { - iov[i].iov_buf = buf + nrecv; - iov[i].iov_len -= nrecv; - } else { - i++; + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_ipc_recv_start(pipe); + return; } } - return (0); + + // All done; hopefully successfully. + nni_list_remove(&pipe->readq, aio); + nni_aio_finish(aio, rv, aio->a_count); } -int -nni_plat_ipc_init(nni_plat_ipcsock *s) +static void +nni_win_ipc_recv_start(nni_plat_ipc_pipe *pipe) { + void *buf; + DWORD len; int rv; + nni_win_event *evt = &pipe->recv_evt; + OVERLAPPED *olpd = nni_win_event_overlapped(evt); + nni_aio *aio = nni_list_first(&pipe->readq); + + NNI_ASSERT(aio != NULL); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + if (pipe->p == INVALID_HANDLE_VALUE) { + rv = NNG_ECLOSED; + goto fail; + } - s->p = INVALID_HANDLE_VALUE; - s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); + if ((rv = nni_win_event_reset(evt)) != 0) { + goto fail; } - s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->recv_olpd.hEvent); - return (nni_winpipe_error(rv)); + + // Now start a readfile. We assume that only one read can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aio->a_iov[0].iov_buf; + len = (DWORD) aio->a_iov[0].iov_len; + olpd = nni_win_event_overlapped(evt); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; } - s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->recv_olpd.hEvent); - return (nni_winpipe_error(rv)); + + if (!ReadFile(pipe->p, buf, len, NULL, olpd)) { + // If we failed immediately, then process it. + if ((rv = GetLastError()) == ERROR_IO_PENDING) { + // This is the normal path we expect; the IO will + // complete asynchronously. + return; + } + + // Some synchronous error occurred. + rv = nni_win_error(rv); + goto fail; } - InitializeCriticalSection(&s->cs); - return (0); + + // If we completed synchronously, then do the completion. This is + // not normally expected. + nni_win_ipc_recv_finish(pipe); + return; + +fail: + nni_aio_list_remove(aio); + nni_aio_finish(aio, rv, 0); } static void -nni_plat_ipc_close(nni_plat_ipcsock *s) +nni_win_ipc_recv_cb(void *arg) { - HANDLE fd; + nni_plat_ipc_pipe *pipe = arg; - EnterCriticalSection(&s->cs); - if ((fd = s->p) != INVALID_HANDLE_VALUE) { - s->p = INVALID_HANDLE_VALUE; - if (s->server) { - (void) DisconnectNamedPipe(fd); - } - (void) CancelIoEx(fd, &s->send_olpd); - (void) CancelIoEx(fd, &s->recv_olpd); - (void) CancelIoEx(fd, &s->conn_olpd); - (void) CloseHandle(fd); + nni_mtx_lock(&pipe->mtx); + nni_win_ipc_recv_finish(pipe); + nni_mtx_unlock(&pipe->mtx); +} + + +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) +{ + nni_win_event *evt = &pipe->send_evt; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_start(aio, nni_win_ipc_recv_cancel, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; } - LeaveCriticalSection(&s->cs); + if (pipe->p == INVALID_HANDLE_VALUE) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_append(&pipe->readq, aio); + nni_win_ipc_recv_start(pipe); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { - nni_plat_ipc_close(s); - DeleteCriticalSection(&s->cs); - CloseHandle(s->recv_olpd.hEvent); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->conn_olpd.hEvent); + nni_mtx_lock(&pipe->mtx); + if (pipe->p != INVALID_HANDLE_VALUE) { + CloseHandle(pipe->p); + pipe->p = INVALID_HANDLE_VALUE; + } + nni_win_event_cancel(&pipe->send_evt); + nni_win_event_cancel(&pipe->recv_evt); + nni_mtx_unlock(&pipe->mtx); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) +{ + nni_plat_ipc_pipe_close(pipe); + + nni_win_event_fini(&pipe->send_evt); + nni_win_event_fini(&pipe->recv_evt); + nni_mtx_fini(&pipe->mtx); + NNI_FREE_STRUCT(pipe); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { - nni_plat_ipc_close(s); + const char *path; + nni_plat_ipc_ep *ep; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + path = url + strlen("ipc://"); + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(ep, sizeof (ep)); + if ((rv = nni_mtx_init(&ep->mtx)) != 0) { + NNI_FREE_STRUCT(ep); + return (rv); + } + + ep->mode = mode; + NNI_LIST_NODE_INIT(&ep->node); + nni_aio_list_init(&ep->aios); + + (void) snprintf(ep->path, sizeof (ep->path), "\\\\.\\pipe\\%s", path); + + *epp = ep; + return (0); } int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { int rv; + HANDLE p; - snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); + nni_mtx_lock(&ep->mtx); + if (ep->mode != NNI_EP_MODE_LISTEN) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EINVAL); + } + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } // We create the first named pipe, and we make sure that it is // properly ours. - s->p = CreateNamedPipeA( - s->path, + p = CreateNamedPipeA(ep->path, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, NULL); - - if (s->p == INVALID_HANDLE_VALUE) { + if (p == INVALID_HANDLE_VALUE) { if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { - return (NNG_EADDRINUSE); + rv = NNG_EADDRINUSE; + } else { + rv = nni_win_error(rv); } - return (nni_winpipe_error(rv)); + goto failed; + } + rv = nni_win_event_init(&ep->acc_evt, nni_win_ipc_acc_cb, ep, p); + if (rv != 0) { + goto failed; } - s->server = 1; + if ((rv = nni_win_iocp_register(p)) != 0) { + goto failed; + } + + ep->p = p; + ep->started = 1; + nni_mtx_unlock(&ep->mtx); return (0); + +failed: + + nni_mtx_unlock(&ep->mtx); + if (p != INVALID_HANDLE_VALUE) { + (void) CloseHandle(p); + } + + return (rv); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +static void +nni_win_ipc_acc_finish(nni_plat_ipc_ep *ep) { - int rv; - OVERLAPPED *olp = &s->conn_olpd; + nni_win_event *evt = &ep->acc_evt; DWORD nbytes; - - s->server = 1; - if (!ConnectNamedPipe(server->p, olp)) { - rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_CONNECTED: - break; - case ERROR_IO_PENDING: - if (!GetOverlappedResult(server->p, olp, &nbytes, - TRUE)) { - rv = GetLastError(); - return (nni_winpipe_error(rv)); - } - default: - rv = GetLastError(); - return (nni_winpipe_error(rv)); + int rv; + nni_plat_ipc_pipe *pipe; + nni_aio *aio; + HANDLE newp, oldp; + + // Note: This should be called with the ep lock held, and only when + // the ConnectNamedPipe has finished. + + rv = 0; + if (!GetOverlappedResult(ep->p, &evt->olpd, &nbytes, FALSE)) { + if ((rv = GetLastError()) == ERROR_IO_INCOMPLETE) { + // We should never be here normally, but if the + // pipe got accepted by another client we can + // some times race here. + return; } } - EnterCriticalSection(&server->cs); - if (server->p != INVALID_HANDLE_VALUE) { - s->p = server->p; - server->p = CreateNamedPipeA( - server->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, - 4096, 4096, 0, NULL); - if (server->p == INVALID_HANDLE_VALUE) { - // We return the old handle, so that future accept - // attempts have a chance of succeeding. That means - // we will disconnect the current client. - rv = GetLastError(); - server->p = s->p; - DisconnectNamedPipe(server->p); - s->p = INVALID_HANDLE_VALUE; - LeaveCriticalSection(&server->cs); - return (nni_winpipe_error(rv)); + if ((aio = nni_list_first(&ep->aios)) == NULL) { + // No completion available to us. + if (rv == 0) { + NNI_ASSERT(0); + DisconnectNamedPipe(ep->p); } + return; } - LeaveCriticalSection(&server->cs); - return (0); + nni_list_remove(&ep->aios, aio); + if (rv != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + newp = CreateNamedPipeA(ep->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, + 4096, 4096, 0, NULL); + if (newp == INVALID_HANDLE_VALUE) { + rv = nni_win_error(GetLastError()); + DisconnectNamedPipe(ep->p); + return; + } + oldp = ep->p; + ep->p = newp; + + if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { + DisconnectNamedPipe(oldp); + nni_aio_finish(aio, rv, 0); + return; + } + + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +static void +nni_win_ipc_acc_cb(void *arg) { - int rv; + nni_plat_ipc_ep *ep = arg; - snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); + nni_mtx_lock(&ep->mtx); + nni_win_ipc_acc_finish(ep); + nni_mtx_unlock(&ep->mtx); +} - for (;;) { - s->p = CreateFileA(s->path, GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); - if (s->p == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_BUSY: - if (!WaitNamedPipe(s->path, - NMPWAIT_USE_DEFAULT_WAIT)) { - return (NNG_ETIMEDOUT); - } - continue; - case ERROR_FILE_NOT_FOUND: - // No present pipes (no listener?) - return (NNG_ECONNREFUSED); - default: - return (nni_winpipe_error(rv)); - } - } - s->server = 0; - break; - } - return (0); +static void +nni_win_ipc_acc_cancel(nni_aio *aio) +{ + nni_plat_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + nni_win_event_cancel(&ep->acc_evt); + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); } -#endif +void +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +{ + int rv; + nni_win_event *evt = &ep->acc_evt; + nni_mtx_lock(&ep->mtx); + if (nni_aio_start(aio, nni_win_ipc_acc_cancel, ep) != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + rv = 0; + if ((rv = nni_win_event_reset(evt)) != 0) { + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&ep->mtx); + return; + } + if (!ConnectNamedPipe(ep->p, nni_win_event_overlapped(evt))) { + rv = GetLastError(); + switch (rv) { + case ERROR_PIPE_CONNECTED: + rv = 0; + break; + case ERROR_IO_PENDING: + nni_aio_list_append(&ep->aios, aio); + nni_mtx_unlock(&ep->mtx); + return; -struct nni_plat_ipc_pipe { - SOCKET s; - nni_win_event recv_evt; - nni_win_event send_evt; - OVERLAPPED recv_olpd; - OVERLAPPED send_olpd; -}; + default: + rv = nni_win_error(GetLastError()); + nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&ep->mtx); + return; + } + } -struct nni_plat_ipc_ep { - SOCKET s; - nni_win_event evt; - OVERLAPPED olpd; -}; + nni_win_ipc_acc_finish(ep); + nni_mtx_unlock(&ep->mtx); +} -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) -{ - return (NNG_ENOTSUP); -} +// So Windows IPC is a bit different on the client side. There is no +// support for asynchronous connection, but we can fake it with a single +// thread that runs to establish the connection. That thread will run +// keep looping, sleeping for 10 ms between attempts. It performs non-blocking +// attempts to connect. +typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work; +struct nni_win_ipc_conn_work { + nni_list waiters; + nni_list workers; + nni_mtx mtx; + nni_cv cv; + nni_thr thr; + int exit; +}; +static nni_win_ipc_conn_work nni_win_ipc_connecter; -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) + +static void +nni_win_ipc_conn_thr(void *arg) { -} + nni_win_ipc_conn_work *w = arg; + nni_plat_ipc_ep *ep; + nni_plat_ipc_pipe *pipe; + nni_aio *aio; + HANDLE p; + int rv; + nni_mtx_lock(&w->mtx); + for (;;) { + if (w->exit) { + break; + } + while ((ep = nni_list_first(&w->waiters)) != NULL) { + nni_list_remove(&w->waiters, ep); + nni_list_append(&w->workers, ep); + } -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ -} + while ((ep = nni_list_first(&w->workers)) != NULL) { + nni_list_remove(&w->workers, ep); + if ((aio = nni_list_first(&ep->aios)) == NULL) { + continue; + } + nni_list_remove(&ep->aios, aio); + p = CreateFileA(ep->path, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, + NULL); + + if (p == INVALID_HANDLE_VALUE) { + switch ((rv = GetLastError())) { + case ERROR_PIPE_BUSY: + // still in progress. + nni_list_prepend(&ep->aios, aio); + break; + case ERROR_FILE_NOT_FOUND: + nni_aio_finish(aio, NNG_ECONNREFUSED, + 0); + break; + default: + nni_aio_finish(aio, nni_win_error(rv), + 0); + break; + } + } else { + rv = nni_win_ipc_pipe_init(&pipe, p); + if (rv == 0) { + rv = nni_win_iocp_register(p); + } + if (rv != 0) { + DisconnectNamedPipe(p); + CloseHandle(p); + nni_aio_finish(aio, rv, 0); + } else { + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); + } + } + if (!nni_list_empty(&ep->aios)) { + nni_list_append(&w->waiters, ep); + } + } -extern int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - return (NNG_ENOTSUP); + // Wait 10 ms, unless woken earlier. + if (nni_list_empty(&w->waiters)) { + nni_cv_wait(&w->cv); + } else { + nni_cv_until(&w->cv, nni_clock() + 10000); + } + } + nni_mtx_unlock(&w->mtx); } -extern void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) +static void +nni_win_ipc_conn_cancel(nni_aio *aio) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + nni_plat_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&w->mtx); + nni_aio_list_remove(aio); + if (nni_list_empty(&ep->aios)) { + nni_list_remove(&w->waiters, ep); + } + nni_mtx_unlock(&w->mtx); } -extern void +void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + + nni_mtx_lock(&w->mtx); + + if (nni_list_active(&w->waiters, ep)) { + nni_aio_finish(aio, NNG_EBUSY, 0); + nni_mtx_unlock(&w->mtx); + return; + } + + if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { + nni_mtx_unlock(&w->mtx); + return; + } + nni_list_append(&ep->aios, aio); + nni_list_append(&w->waiters, ep); + nni_cv_wake(&w->cv); + nni_mtx_unlock(&w->mtx); } void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { + nni_mtx_lock(&ep->mtx); + if (ep->p) { + CloseHandle(ep->p); + ep->p = NULL; + } + nni_mtx_unlock(&ep->mtx); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); } void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) { + nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + nni_aio *aio; + + switch (ep->mode) { + case NNI_EP_MODE_DIAL: + nni_mtx_lock(&w->mtx); + if (nni_list_active(&w->waiters, ep)) { + nni_list_remove(&w->waiters, ep); + } + while ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_list_remove(&ep->aios, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + nni_mtx_unlock(&w->mtx); + break; + case NNI_EP_MODE_LISTEN: + nni_mtx_lock(&ep->mtx); + while ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_list_remove(&ep->aios, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + if (ep->p != INVALID_HANDLE_VALUE) { + nni_win_event_cancel(&ep->acc_evt); + CloseHandle(ep->p); + ep->p = INVALID_HANDLE_VALUE; + } + nni_mtx_unlock(&ep->mtx); + break; + } } -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) +int +nni_win_ipc_sysinit(void) { + int rv; + nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; + + NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); + NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); + + if (((rv = nni_mtx_init(&worker->mtx)) != 0) || + ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { + return (rv); + } + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); + if (rv != 0) { + return (rv); + } + + nni_thr_run(&worker->thr); + + return (0); } void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) +nni_win_ipc_sysfini(void) { + nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; + + nni_mtx_lock(&worker->mtx); + worker->exit = 1; + nni_cv_wake(&worker->cv); + nni_mtx_unlock(&worker->mtx); + nni_thr_fini(&worker->thr); + nni_cv_fini(&worker->cv); + nni_mtx_fini(&worker->mtx); } |
