diff options
| -rw-r--r-- | src/core/defs.h | 4 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 10 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 424 |
3 files changed, 257 insertions, 181 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 9c745660..13d0b25f 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -17,6 +17,10 @@ // superior, support for such are not universal. #define NNI_ARG_UNUSED(x) ((void) x); +#define NNI_ASSERT(x) \ + if (!(x)) \ + nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x) + // These types are common but have names shared with user space. typedef struct nng_socket nni_sock; typedef struct nng_endpoint nni_ep; diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index f675f510..c3afef91 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -40,7 +40,15 @@ struct nni_plat_tcpsock { }; struct nni_plat_ipcsock { - HANDLE p; + HANDLE p; + + char path[256]; + WSAOVERLAPPED recv_olpd; + WSAOVERLAPPED send_olpd; + WSAOVERLAPPED conn_olpd; // Use for both connect and accept + CRITICAL_SECTION cs; + + int server; }; struct nni_plat_thr { diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index f4d4d004..feef6942 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -11,273 +11,337 @@ #ifdef PLATFORM_WINDOWS +// Windows has infinite numbers of error codes it seems. We only bother +// with the ones that are relevant to us (we think). +static struct { + int win_err; + int nng_err; +} +nni_winpipe_errnos[] = { + { ERROR_FILE_NOT_FOUND, NNG_ENOENT }, + { ERROR_ACCESS_DENIED, NNG_EPERM }, + { ERROR_INVALID_HANDLE, NNG_ECLOSED }, + { ERROR_NOT_ENOUGH_MEMORY, NNG_ENOMEM }, + { ERROR_INVALID_ACCESS, NNG_EPERM }, + { ERROR_INVALID_DATA, NNG_EINVAL }, + { ERROR_OUTOFMEMORY, NNG_ENOMEM }, + { ERROR_HANDLE_EOF, NNG_ECLOSED }, + { ERROR_NOT_SUPPORTED, NNG_ENOTSUP }, + { ERROR_OUT_OF_STRUCTURES, NNG_ENOMEM }, + { ERROR_INVALID_PARAMETER, NNG_EINVAL }, + { ERROR_CONNECTION_REFUSED, NNG_ECONNREFUSED }, + { ERROR_BROKEN_PIPE, NNG_ECLOSED }, + { ERROR_BAD_PIPE, NNG_ECLOSED }, + { ERROR_NO_DATA, NNG_ECLOSED }, + { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED }, + { ERROR_OPERATION_ABORTED, NNG_ECLOSED }, + { WAIT_TIMEOUT, NNG_ETIMEDOUT }, + // Must be Last!! + { 0, 0 }, +}; + + +static int +nni_winpipe_error(int werr) +{ + int i; + + if (werr == 0) { + return (0); + } + + for (i = 0; nni_winpipe_errnos[i].nng_err != 0; i++) { + if (werr == nni_winpipe_errnos[i].win_err) { + return (nni_winpipe_errnos[i].nng_err); + } + } + // Other system errno. + return (NNG_ESYSERR + werr); +} + int nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) { -#if 0 - struct iovec iov[4]; // We never have more than 3 at present int i; - int offset; - int resid = 0; + DWORD nsent; + DWORD resid; + char *buf; + DWORD len; + nni_iov iov[4]; int rv; + OVERLAPPED *olp = &s->send_olpd; - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; + NNI_ASSERT(cnt <= 4); + for (i = 0, resid = 0; i < cnt; resid += iov[i].iov_len, i++) { iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; + iov[i].iov_buf = iovs[i].iov_buf; } i = 0; while (resid) { - rv = writev(s->fd, iov, cnt); - if (rv < 0) { - if (rv == EINTR) { - continue; + NNI_ASSERT(i < cnt); + nsent = 0; + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + len = iov[i].iov_len > 0x1000000 ? 0x1000000 : iov[i].iov_len; + buf = iov[i].iov_buf; + + if (!WriteFile(s->p, buf, len, NULL, olp)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + return (nni_winpipe_error(rv)); } - return (nni_plat_errno(errno)); } - if (rv > resid) { - nni_panic("writev says it wrote too much!"); + if (!GetOverlappedResult(s->p, olp, &nsent, TRUE)) { + rv = GetLastError(); + return (nni_winpipe_error(rv)); } - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } + NNI_ASSERT(nsent <= resid); + NNI_ASSERT(nsent <= len); + resid -= nsent; + if (nsent < iov[i].iov_len) { + iov[i].iov_buf = buf + nsent; + iov[i].iov_len -= nsent; + } else { + i++; } } -#endif - return (NNG_ENOTSUP); + return (0); } int nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) { -#if 0 - struct iovec iov[4]; // We never have more than 3 at present int i; - int offset; - int resid = 0; + DWORD nrecv; + DWORD resid; + DWORD len; + char *buf; + nni_iov iov[4]; int rv; + OVERLAPPED *olp = &s->recv_olpd; - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; + NNI_ASSERT(cnt <= 4); + for (i = 0, resid = 0; i < cnt; resid += iov[i].iov_len, i++) { iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; + iov[i].iov_buf = iovs[i].iov_buf; } + i = 0; while (resid) { - rv = readv(s->fd, iov, cnt); - if (rv < 0) { - if (errno == EINTR) { - continue; + NNI_ASSERT(i < cnt); + nrecv = 0; + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + len = iov[i].iov_len > 0x1000000 ? 0x1000000 : iov[i].iov_len; + buf = iov[i].iov_buf; + + if (!ReadFile(s->p, buf, len, NULL, olp)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + return (nni_winpipe_error(rv)); } - return (nni_plat_errno(errno)); } - if (rv == 0) { - return (NNG_ECLOSED); + if (!GetOverlappedResult(s->p, olp, &nrecv, TRUE)) { + rv = GetLastError(); + return (nni_winpipe_error(rv)); } - if (rv > resid) { - nni_panic("readv says it read too much!"); - } - - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } + NNI_ASSERT(nrecv <= resid); + NNI_ASSERT(nrecv <= len); + resid -= nrecv; + if (nrecv < iov[i].iov_len) { + iov[i].iov_buf = buf + nrecv; + iov[i].iov_len -= nrecv; + } else { + i++; } } -#endif - return (NNG_ENOTSUP); + return (0); } int nni_plat_ipc_init(nni_plat_ipcsock *s) { + int rv; + s->p = INVALID_HANDLE_VALUE; + s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) { + rv = GetLastError(); + return (nni_winpipe_error(rv)); + } + s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) { + rv = GetLastError(); + CloseHandle(s->recv_olpd.hEvent); + return (nni_winpipe_error(rv)); + } + s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) { + rv = GetLastError(); + CloseHandle(s->send_olpd.hEvent); + CloseHandle(s->recv_olpd.hEvent); + return (nni_winpipe_error(rv)); + } + InitializeCriticalSection(&s->cs); return (0); } -void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +static void +nni_plat_ipc_close(nni_plat_ipcsock *s) { - if (s->p != INVALID_HANDLE_VALUE) { - (void) CloseHandle(s->p); + HANDLE fd; + + EnterCriticalSection(&s->cs); + if ((fd = s->p) != INVALID_HANDLE_VALUE) { s->p = INVALID_HANDLE_VALUE; + if (s->server) { + (void) DisconnectNamedPipe(fd); + } + (void) CancelIoEx(fd, &s->send_olpd); + (void) CancelIoEx(fd, &s->recv_olpd); + (void) CancelIoEx(fd, &s->conn_olpd); + (void) CloseHandle(fd); } + LeaveCriticalSection(&s->cs); +} + + +void +nni_plat_ipc_fini(nni_plat_ipcsock *s) +{ + nni_plat_ipc_close(s); + DeleteCriticalSection(&s->cs); + CloseHandle(s->recv_olpd.hEvent); + CloseHandle(s->send_olpd.hEvent); + CloseHandle(s->conn_olpd.hEvent); } void nni_plat_ipc_shutdown(nni_plat_ipcsock *s) { - if (s->p != INVALID_HANDLE_VALUE) { -#if 0 - (void) shutdown(s->fd, SHUT_RDWR); - // This causes the equivalent of a close. Hopefully waking - // up anything that didn't get the hint with the shutdown. - // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); -#endif - } + nni_plat_ipc_close(s); } -// nni_plat_ipc_listen creates a file descriptor bound to the given address. -// This basically does the equivalent of socket, bind, and listen. We have -// chosen a default value for the listen backlog of 128, which should be -// plenty. (If it isn't, then the accept thread can't get enough resources -// to keep up, and your clients are going to experience bad things. Normally -// the actual backlog should hover near 0 anyway.) int nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) { -#if 0 - int fd, checkfd; - struct sockaddr_un sun; int rv; - if (nni_plat_ipc_path_to_sockaddr(&sun, path) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // We are going to check to see if there was a name already there. - // If there was, and nothing is listening (ECONNREFUSED), then we - // will just try to cleanup the old socket. Note that this is not - // perfect in all scenarios, so use this with caution. - if ((checkfd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) { - (void) close(fd); - return (nni_plat_errno(errno)); - } - - // Nonblocking because we don't want to wait for any remote server. - (void) fcntl(checkfd, F_SETFL, O_NONBLOCK); - if (connect(checkfd, (struct sockaddr *) &sun, sizeof (sun)) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(path); + snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); + + // We create the first named pipe, and we make sure that it is + // properly ours. + s->p = CreateNamedPipeA( + s->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | + FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, + 4096, 4096, 0, NULL); + + if (s->p == INVALID_HANDLE_VALUE) { + if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { + return (NNG_EADDRINUSE); } + return (nni_winpipe_error(rv)); } - (void) close(checkfd); + s->server = 1; - nni_plat_ipc_setopts(fd); - - if ((s->unlink = nni_alloc(strlen(path) + 1)) == NULL) { - return (NNG_ENOMEM); - } - strcpy(s->unlink, path); - if (bind(fd, (struct sockaddr *) &sun, sizeof (sun)) < 0) { - rv = nni_plat_errno(errno); - nni_free(s->unlink, strlen(path) + 1); - s->unlink = NULL; - (void) close(fd); - return (rv); - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - s->fd = fd; -#endif - return (NNG_ENOTSUP); + return (0); } int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) { -#if 0 - int fd; - int len; - struct sockaddr_un sun; int rv; - - if (nni_plat_ipc_path_to_sockaddr(&sun, path) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); + OVERLAPPED *olp = &s->conn_olpd; + HANDLE newp; + DWORD nbytes; + + s->server = 1; + if (!ConnectNamedPipe(server->p, olp)) { + rv = GetLastError(); + switch (rv) { + case ERROR_PIPE_CONNECTED: + break; + case ERROR_IO_PENDING: + if (!GetOverlappedResult(server->p, olp, &nbytes, + TRUE)) { + rv = GetLastError(); + return (nni_winpipe_error(rv)); + } + default: + rv = GetLastError(); + return (nni_winpipe_error(rv)); + } } - nni_plat_ipc_setopts(fd); - - if (connect(fd, (struct sockaddr *) &sun, sizeof (sun)) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - if (rv == NNG_ENOENT) { - // In this case we want to treat this the same as - // ECONNREFUSED, since they mean the same to us. - rv = NNG_ECONNREFUSED; + EnterCriticalSection(&server->cs); + if (server->p != INVALID_HANDLE_VALUE) { + s->p = server->p; + server->p = CreateNamedPipeA( + server->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, + 4096, 4096, 0, NULL); + if (server->p == INVALID_HANDLE_VALUE) { + // We return the old handle, so that future accept + // attempts have a chance of succeeding. That means + // we will disconnect the current client. + rv = GetLastError(); + server->p = s->p; + DisconnectNamedPipe(server->p); + s->p = INVALID_HANDLE_VALUE; + LeaveCriticalSection(&server->cs); + return (nni_winpipe_error(rv)); } - return (rv); } - s->fd = fd; -#endif - return (NNG_ENOTSUP); + LeaveCriticalSection(&server->cs); + + return (0); } int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) { -#if 0 - int fd; + int rv; - for (;;) { - fd = accept(server->fd, NULL, NULL); + snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path); - if (fd < 0) { - if ((errno == EINTR) || (errno == ECONNABORTED)) { - // These are not fatal errors, keep trying - continue; - } - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + for (;;) { + s->p = CreateFileA(s->path, GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); + if (s->p == INVALID_HANDLE_VALUE) { + rv = GetLastError(); + switch (rv) { + case ERROR_PIPE_BUSY: + if (!WaitNamedPipe(s->path, + NMPWAIT_USE_DEFAULT_WAIT)) { + return (NNG_ETIMEDOUT); + } continue; + case ERROR_FILE_NOT_FOUND: + // No present pipes (no listener?) + return (NNG_ECONNREFUSED); + + default: + return (nni_winpipe_error(rv)); } - return (nni_plat_errno(errno)); - } else { - break; } + s->server = 0; + break; } - - nni_plat_ipc_setopts(fd); - - s->fd = fd; -#endif - return (NNG_ENOTSUP); + return (0); } |
