aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-15 13:59:54 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-15 13:59:54 -0800
commit8eddc85dc9f621df44b85bf0769a1449109170ce (patch)
treeb2249a7152cb4413f9e871a26239283c1ae033df
parent2eace190e7715e6c48f33897acf7537576f7a412 (diff)
downloadnng-8eddc85dc9f621df44b85bf0769a1449109170ce.tar.gz
nng-8eddc85dc9f621df44b85bf0769a1449109170ce.tar.bz2
nng-8eddc85dc9f621df44b85bf0769a1449109170ce.zip
Windows IPC works now.
-rw-r--r--src/core/defs.h4
-rw-r--r--src/platform/windows/win_impl.h10
-rw-r--r--src/platform/windows/win_ipc.c424
3 files changed, 257 insertions, 181 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 9c745660..13d0b25f 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -17,6 +17,10 @@
// superior, support for such are not universal.
#define NNI_ARG_UNUSED(x) ((void) x);
+#define NNI_ASSERT(x) \
+ if (!(x)) \
+ nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x)
+
// These types are common but have names shared with user space.
typedef struct nng_socket nni_sock;
typedef struct nng_endpoint nni_ep;
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index f675f510..c3afef91 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -40,7 +40,15 @@ struct nni_plat_tcpsock {
};
struct nni_plat_ipcsock {
- HANDLE p;
+ HANDLE p;
+
+ char path[256];
+ WSAOVERLAPPED recv_olpd;
+ WSAOVERLAPPED send_olpd;
+ WSAOVERLAPPED conn_olpd; // Use for both connect and accept
+ CRITICAL_SECTION cs;
+
+ int server;
};
struct nni_plat_thr {
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index f4d4d004..feef6942 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -11,273 +11,337 @@
#ifdef PLATFORM_WINDOWS
+// Windows has infinite numbers of error codes it seems. We only bother
+// with the ones that are relevant to us (we think).
+static struct {
+ int win_err;
+ int nng_err;
+}
+nni_winpipe_errnos[] = {
+ { ERROR_FILE_NOT_FOUND, NNG_ENOENT },
+ { ERROR_ACCESS_DENIED, NNG_EPERM },
+ { ERROR_INVALID_HANDLE, NNG_ECLOSED },
+ { ERROR_NOT_ENOUGH_MEMORY, NNG_ENOMEM },
+ { ERROR_INVALID_ACCESS, NNG_EPERM },
+ { ERROR_INVALID_DATA, NNG_EINVAL },
+ { ERROR_OUTOFMEMORY, NNG_ENOMEM },
+ { ERROR_HANDLE_EOF, NNG_ECLOSED },
+ { ERROR_NOT_SUPPORTED, NNG_ENOTSUP },
+ { ERROR_OUT_OF_STRUCTURES, NNG_ENOMEM },
+ { ERROR_INVALID_PARAMETER, NNG_EINVAL },
+ { ERROR_CONNECTION_REFUSED, NNG_ECONNREFUSED },
+ { ERROR_BROKEN_PIPE, NNG_ECLOSED },
+ { ERROR_BAD_PIPE, NNG_ECLOSED },
+ { ERROR_NO_DATA, NNG_ECLOSED },
+ { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED },
+ { ERROR_OPERATION_ABORTED, NNG_ECLOSED },
+ { WAIT_TIMEOUT, NNG_ETIMEDOUT },
+ // Must be Last!!
+ { 0, 0 },
+};
+
+
+static int
+nni_winpipe_error(int werr)
+{
+ int i;
+
+ if (werr == 0) {
+ return (0);
+ }
+
+ for (i = 0; nni_winpipe_errnos[i].nng_err != 0; i++) {
+ if (werr == nni_winpipe_errnos[i].win_err) {
+ return (nni_winpipe_errnos[i].nng_err);
+ }
+ }
+ // Other system errno.
+ return (NNG_ESYSERR + werr);
+}
+
int
nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
{
-#if 0
- struct iovec iov[4]; // We never have more than 3 at present
int i;
- int offset;
- int resid = 0;
+ DWORD nsent;
+ DWORD resid;
+ char *buf;
+ DWORD len;
+ nni_iov iov[4];
int rv;
+ OVERLAPPED *olp = &s->send_olpd;
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
+ NNI_ASSERT(cnt <= 4);
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].iov_len, i++) {
iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
+ iov[i].iov_buf = iovs[i].iov_buf;
}
i = 0;
while (resid) {
- rv = writev(s->fd, iov, cnt);
- if (rv < 0) {
- if (rv == EINTR) {
- continue;
+ NNI_ASSERT(i < cnt);
+ nsent = 0;
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ len = iov[i].iov_len > 0x1000000 ? 0x1000000 : iov[i].iov_len;
+ buf = iov[i].iov_buf;
+
+ if (!WriteFile(s->p, buf, len, NULL, olp)) {
+ if ((rv = GetLastError()) != ERROR_IO_PENDING) {
+ return (nni_winpipe_error(rv));
}
- return (nni_plat_errno(errno));
}
- if (rv > resid) {
- nni_panic("writev says it wrote too much!");
+ if (!GetOverlappedResult(s->p, olp, &nsent, TRUE)) {
+ rv = GetLastError();
+ return (nni_winpipe_error(rv));
}
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
+ NNI_ASSERT(nsent <= resid);
+ NNI_ASSERT(nsent <= len);
+ resid -= nsent;
+ if (nsent < iov[i].iov_len) {
+ iov[i].iov_buf = buf + nsent;
+ iov[i].iov_len -= nsent;
+ } else {
+ i++;
}
}
-#endif
- return (NNG_ENOTSUP);
+ return (0);
}
int
nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
{
-#if 0
- struct iovec iov[4]; // We never have more than 3 at present
int i;
- int offset;
- int resid = 0;
+ DWORD nrecv;
+ DWORD resid;
+ DWORD len;
+ char *buf;
+ nni_iov iov[4];
int rv;
+ OVERLAPPED *olp = &s->recv_olpd;
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
+ NNI_ASSERT(cnt <= 4);
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].iov_len, i++) {
iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
+ iov[i].iov_buf = iovs[i].iov_buf;
}
+
i = 0;
while (resid) {
- rv = readv(s->fd, iov, cnt);
- if (rv < 0) {
- if (errno == EINTR) {
- continue;
+ NNI_ASSERT(i < cnt);
+ nrecv = 0;
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ len = iov[i].iov_len > 0x1000000 ? 0x1000000 : iov[i].iov_len;
+ buf = iov[i].iov_buf;
+
+ if (!ReadFile(s->p, buf, len, NULL, olp)) {
+ if ((rv = GetLastError()) != ERROR_IO_PENDING) {
+ return (nni_winpipe_error(rv));
}
- return (nni_plat_errno(errno));
}
- if (rv == 0) {
- return (NNG_ECLOSED);
+ if (!GetOverlappedResult(s->p, olp, &nrecv, TRUE)) {
+ rv = GetLastError();
+ return (nni_winpipe_error(rv));
}
- if (rv > resid) {
- nni_panic("readv says it read too much!");
- }
-
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
+ NNI_ASSERT(nrecv <= resid);
+ NNI_ASSERT(nrecv <= len);
+ resid -= nrecv;
+ if (nrecv < iov[i].iov_len) {
+ iov[i].iov_buf = buf + nrecv;
+ iov[i].iov_len -= nrecv;
+ } else {
+ i++;
}
}
-#endif
- return (NNG_ENOTSUP);
+ return (0);
}
int
nni_plat_ipc_init(nni_plat_ipcsock *s)
{
+ int rv;
+
s->p = INVALID_HANDLE_VALUE;
+ s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) {
+ rv = GetLastError();
+ return (nni_winpipe_error(rv));
+ }
+ s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) {
+ rv = GetLastError();
+ CloseHandle(s->recv_olpd.hEvent);
+ return (nni_winpipe_error(rv));
+ }
+ s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) {
+ rv = GetLastError();
+ CloseHandle(s->send_olpd.hEvent);
+ CloseHandle(s->recv_olpd.hEvent);
+ return (nni_winpipe_error(rv));
+ }
+ InitializeCriticalSection(&s->cs);
return (0);
}
-void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+static void
+nni_plat_ipc_close(nni_plat_ipcsock *s)
{
- if (s->p != INVALID_HANDLE_VALUE) {
- (void) CloseHandle(s->p);
+ HANDLE fd;
+
+ EnterCriticalSection(&s->cs);
+ if ((fd = s->p) != INVALID_HANDLE_VALUE) {
s->p = INVALID_HANDLE_VALUE;
+ if (s->server) {
+ (void) DisconnectNamedPipe(fd);
+ }
+ (void) CancelIoEx(fd, &s->send_olpd);
+ (void) CancelIoEx(fd, &s->recv_olpd);
+ (void) CancelIoEx(fd, &s->conn_olpd);
+ (void) CloseHandle(fd);
}
+ LeaveCriticalSection(&s->cs);
+}
+
+
+void
+nni_plat_ipc_fini(nni_plat_ipcsock *s)
+{
+ nni_plat_ipc_close(s);
+ DeleteCriticalSection(&s->cs);
+ CloseHandle(s->recv_olpd.hEvent);
+ CloseHandle(s->send_olpd.hEvent);
+ CloseHandle(s->conn_olpd.hEvent);
}
void
nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
{
- if (s->p != INVALID_HANDLE_VALUE) {
-#if 0
- (void) shutdown(s->fd, SHUT_RDWR);
- // This causes the equivalent of a close. Hopefully waking
- // up anything that didn't get the hint with the shutdown.
- // (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, s->fd);
-#endif
- }
+ nni_plat_ipc_close(s);
}
-// nni_plat_ipc_listen creates a file descriptor bound to the given address.
-// This basically does the equivalent of socket, bind, and listen. We have
-// chosen a default value for the listen backlog of 128, which should be
-// plenty. (If it isn't, then the accept thread can't get enough resources
-// to keep up, and your clients are going to experience bad things. Normally
-// the actual backlog should hover near 0 anyway.)
int
nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
{
-#if 0
- int fd, checkfd;
- struct sockaddr_un sun;
int rv;
- if (nni_plat_ipc_path_to_sockaddr(&sun, path) < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // We are going to check to see if there was a name already there.
- // If there was, and nothing is listening (ECONNREFUSED), then we
- // will just try to cleanup the old socket. Note that this is not
- // perfect in all scenarios, so use this with caution.
- if ((checkfd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) {
- (void) close(fd);
- return (nni_plat_errno(errno));
- }
-
- // Nonblocking because we don't want to wait for any remote server.
- (void) fcntl(checkfd, F_SETFL, O_NONBLOCK);
- if (connect(checkfd, (struct sockaddr *) &sun, sizeof (sun)) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(path);
+ snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path);
+
+ // We create the first named pipe, and we make sure that it is
+ // properly ours.
+ s->p = CreateNamedPipeA(
+ s->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES,
+ 4096, 4096, 0, NULL);
+
+ if (s->p == INVALID_HANDLE_VALUE) {
+ if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
+ return (NNG_EADDRINUSE);
}
+ return (nni_winpipe_error(rv));
}
- (void) close(checkfd);
+ s->server = 1;
- nni_plat_ipc_setopts(fd);
-
- if ((s->unlink = nni_alloc(strlen(path) + 1)) == NULL) {
- return (NNG_ENOMEM);
- }
- strcpy(s->unlink, path);
- if (bind(fd, (struct sockaddr *) &sun, sizeof (sun)) < 0) {
- rv = nni_plat_errno(errno);
- nni_free(s->unlink, strlen(path) + 1);
- s->unlink = NULL;
- (void) close(fd);
- return (rv);
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) < 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
- s->fd = fd;
-#endif
- return (NNG_ENOTSUP);
+ return (0);
}
int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
{
-#if 0
- int fd;
- int len;
- struct sockaddr_un sun;
int rv;
-
- if (nni_plat_ipc_path_to_sockaddr(&sun, path) < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_IPC_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
+ OVERLAPPED *olp = &s->conn_olpd;
+ HANDLE newp;
+ DWORD nbytes;
+
+ s->server = 1;
+ if (!ConnectNamedPipe(server->p, olp)) {
+ rv = GetLastError();
+ switch (rv) {
+ case ERROR_PIPE_CONNECTED:
+ break;
+ case ERROR_IO_PENDING:
+ if (!GetOverlappedResult(server->p, olp, &nbytes,
+ TRUE)) {
+ rv = GetLastError();
+ return (nni_winpipe_error(rv));
+ }
+ default:
+ rv = GetLastError();
+ return (nni_winpipe_error(rv));
+ }
}
- nni_plat_ipc_setopts(fd);
-
- if (connect(fd, (struct sockaddr *) &sun, sizeof (sun)) != 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- if (rv == NNG_ENOENT) {
- // In this case we want to treat this the same as
- // ECONNREFUSED, since they mean the same to us.
- rv = NNG_ECONNREFUSED;
+ EnterCriticalSection(&server->cs);
+ if (server->p != INVALID_HANDLE_VALUE) {
+ s->p = server->p;
+ server->p = CreateNamedPipeA(
+ server->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES,
+ 4096, 4096, 0, NULL);
+ if (server->p == INVALID_HANDLE_VALUE) {
+ // We return the old handle, so that future accept
+ // attempts have a chance of succeeding. That means
+ // we will disconnect the current client.
+ rv = GetLastError();
+ server->p = s->p;
+ DisconnectNamedPipe(server->p);
+ s->p = INVALID_HANDLE_VALUE;
+ LeaveCriticalSection(&server->cs);
+ return (nni_winpipe_error(rv));
}
- return (rv);
}
- s->fd = fd;
-#endif
- return (NNG_ENOTSUP);
+ LeaveCriticalSection(&server->cs);
+
+ return (0);
}
int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
{
-#if 0
- int fd;
+ int rv;
- for (;;) {
- fd = accept(server->fd, NULL, NULL);
+ snprintf(s->path, sizeof (s->path), "\\\\.\\pipe\\%s", path);
- if (fd < 0) {
- if ((errno == EINTR) || (errno == ECONNABORTED)) {
- // These are not fatal errors, keep trying
- continue;
- }
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+ for (;;) {
+ s->p = CreateFileA(s->path, GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+ if (s->p == INVALID_HANDLE_VALUE) {
+ rv = GetLastError();
+ switch (rv) {
+ case ERROR_PIPE_BUSY:
+ if (!WaitNamedPipe(s->path,
+ NMPWAIT_USE_DEFAULT_WAIT)) {
+ return (NNG_ETIMEDOUT);
+ }
continue;
+ case ERROR_FILE_NOT_FOUND:
+ // No present pipes (no listener?)
+ return (NNG_ECONNREFUSED);
+
+ default:
+ return (nni_winpipe_error(rv));
}
- return (nni_plat_errno(errno));
- } else {
- break;
}
+ s->server = 0;
+ break;
}
-
- nni_plat_ipc_setopts(fd);
-
- s->fd = fd;
-#endif
- return (NNG_ENOTSUP);
+ return (0);
}