aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/clock.h2
-rw-r--r--src/core/endpt.c30
-rw-r--r--src/core/platform.h4
-rw-r--r--src/nng.c9
-rw-r--r--src/nng.h7
-rw-r--r--src/platform/posix/posix_debug.c61
-rw-r--r--src/platform/posix/posix_ipc.c7
-rw-r--r--src/platform/posix/posix_net.c34
-rw-r--r--src/platform/posix/posix_thread.c23
-rw-r--r--src/platform/windows/win_debug.c26
-rw-r--r--src/platform/windows/win_impl.h13
-rw-r--r--src/platform/windows/win_ipc.c11
-rw-r--r--src/platform/windows/win_net.c358
-rw-r--r--src/platform/windows/win_rand.c3
-rw-r--r--src/platform/windows/win_thread.c19
-rw-r--r--src/transport/ipc/ipc.c17
-rw-r--r--src/transport/tcp/tcp.c18
17 files changed, 444 insertions, 198 deletions
diff --git a/src/core/clock.h b/src/core/clock.h
index c46adfbe..3b607322 100644
--- a/src/core/clock.h
+++ b/src/core/clock.h
@@ -16,4 +16,4 @@ extern nni_time nni_clock(void);
extern void nni_usleep(nni_duration usec);
-#endif // CORE_CLOCK_H \ No newline at end of file
+#endif // CORE_CLOCK_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 554d9a36..c4673376 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -297,16 +297,32 @@ nni_listener(void *arg)
if (((rv = nni_ep_accept(ep, &pipe)) == 0) &&
((rv = nni_pipe_start(pipe)) == 0)) {
+ // Success! Loop around for the next one.
continue;
}
- if (rv == NNG_ECLOSED) {
+
+ switch (rv) {
+ case NNG_ECLOSED:
+ // This indicates the listening socket got closed.
+ // We just bail.
+ return;
+
+ case NNG_ECONNABORTED:
+ case NNG_ECONNRESET:
+ // These are remote conditions, no cool down.
+ cooldown = 0;
+ break;
+ case NNG_ENOMEM:
+ // We're running low on memory, so its best to wait
+ // a whole second to give the system a chance to
+ // recover memory.
+ cooldown = 1000000;
+ break;
+ default:
+ // Other cases we sleep just a tiny bit to avoid
+ // burning the cpu (e.g. out of files).
+ cooldown = 1000; // 1 msec
break;
- }
- cooldown = 1000; // 1 ms cooldown
- if (rv == NNG_ENOMEM) {
- // For out of memory, we need to give more
- // time for the system to reclaim resources.
- cooldown = 100000; // 100ms
}
cooldown += nni_clock();
nni_mtx_lock(mx);
diff --git a/src/core/platform.h b/src/core/platform.h
index bf8a52dc..8d19a7c8 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -170,7 +170,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// nni_plat_tcp_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern void nni_plat_tcp_init(nni_plat_tcpsock *);
+extern int nni_plat_tcp_init(nni_plat_tcpsock *);
// nni_plat_tcp_fini just closes a TCP socket, and releases any related
// resources.
@@ -211,7 +211,7 @@ extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_ipc_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern void nni_plat_ipc_init(nni_plat_ipcsock *);
+extern int nni_plat_ipc_init(nni_plat_ipcsock *);
// nni_plat_ipc_fini just closes an IPC socket, and releases any related
// resources.
diff --git a/src/nng.c b/src/nng.c
index d550c974..ec221fb7 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -199,6 +199,15 @@ nng_strerror(int num)
case NNG_EPERM:
return ("Permission denied");
+
+ case NNG_EMSGSIZE:
+ return ("Message too large");
+
+ case NNG_ECONNRESET:
+ return ("Connection reset");
+
+ case NNG_ECONNABORTED:
+ return ("Connection aborted");
}
if (num & NNG_ESYSERR) {
diff --git a/src/nng.h b/src/nng.h
index bf7bfa7b..cba6a596 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -38,8 +38,8 @@ extern "C" {
#endif // NNG_SHARED_LIB
#else
#define NNG_DECL extern
-#endif // _WIN32 && !NNG_STATIC_LIB
-#endif // NNG_DECL
+#endif // _WIN32 && !NNG_STATIC_LIB
+#endif // NNG_DECL
// Types common to nng.
typedef struct nng_socket nng_socket;
@@ -388,6 +388,9 @@ NNG_DECL int nng_device(nng_socket *, nng_socket *);
#define NNG_EUNREACHABLE (14)
#define NNG_EADDRINVAL (15)
#define NNG_EPERM (16)
+#define NNG_EMSGSIZE (17)
+#define NNG_ECONNABORTED (18)
+#define NNG_ECONNRESET (19)
// NNG_SYSERR is a special code, which allows us to wrap errors from the
// underlyuing operating system. We generally prefer to map errors to one
diff --git a/src/platform/posix/posix_debug.c b/src/platform/posix/posix_debug.c
index 71006b25..6199df1d 100644
--- a/src/platform/posix/posix_debug.c
+++ b/src/platform/posix/posix_debug.c
@@ -40,8 +40,6 @@ nni_plat_strerror(int errnum)
}
-#define NNI_ERR(x, y) { x, y },
-
// There are of course other errors than these, but these are the ones
// that we might reasonably expect and want to handle "cleanly". Most of
// the others should be handled by the system error code. Note that EFAULT
@@ -55,35 +53,36 @@ static struct {
int nng_err;
}
nni_plat_errnos[] = {
- NNI_ERR(EINTR, NNG_EINTR)
- NNI_ERR(EINVAL, NNG_EINVAL)
- NNI_ERR(ENOMEM, NNG_ENOMEM)
- NNI_ERR(EACCES, NNG_EPERM)
- NNI_ERR(EADDRINUSE, NNG_EADDRINUSE)
- NNI_ERR(EADDRNOTAVAIL, NNG_EADDRINVAL)
- NNI_ERR(EAFNOSUPPORT, NNG_ENOTSUP)
- NNI_ERR(EAGAIN, NNG_EAGAIN)
- NNI_ERR(EBADF, NNG_ECLOSED)
- NNI_ERR(EBUSY, NNG_EBUSY)
- NNI_ERR(ECONNABORTED, NNG_ECLOSED)
- NNI_ERR(ECONNREFUSED, NNG_ECONNREFUSED)
- NNI_ERR(ECONNRESET, NNG_ECLOSED)
- NNI_ERR(EHOSTUNREACH, NNG_EUNREACHABLE)
- NNI_ERR(ENETUNREACH, NNG_EUNREACHABLE)
- NNI_ERR(ENAMETOOLONG, NNG_EINVAL)
- NNI_ERR(ENOENT, NNG_ENOENT)
- NNI_ERR(ENOBUFS, NNG_ENOMEM)
- NNI_ERR(ENOPROTOOPT, NNG_ENOTSUP)
- NNI_ERR(ENOSYS, NNG_ENOTSUP)
- NNI_ERR(ENOTSUP, NNG_ENOTSUP)
- NNI_ERR(EPERM, NNG_EPERM)
- NNI_ERR(EPIPE, NNG_ECLOSED)
- NNI_ERR(EPROTO, NNG_EPROTO)
- NNI_ERR(EPROTONOSUPPORT, NNG_ENOTSUP)
- NNI_ERR(ETIME, NNG_ETIMEDOUT)
- NNI_ERR(ETIMEDOUT, NNG_ETIMEDOUT)
- NNI_ERR(EWOULDBLOCK, NNG_EAGAIN)
- NNI_ERR(0, 0) // must be last
+ { EINTR, NNG_EINTR },
+ { EINVAL, NNG_EINVAL },
+ { ENOMEM, NNG_ENOMEM },
+ { EACCES, NNG_EPERM },
+ { EADDRINUSE, NNG_EADDRINUSE },
+ { EADDRNOTAVAIL, NNG_EADDRINVAL },
+ { EAFNOSUPPORT, NNG_ENOTSUP },
+ { EAGAIN, NNG_EAGAIN },
+ { EBADF, NNG_ECLOSED },
+ { EBUSY, NNG_EBUSY },
+ { ECONNABORTED, NNG_ECONNABORTED },
+ { ECONNREFUSED, NNG_ECONNREFUSED },
+ { ECONNRESET, NNG_ECONNRESET },
+ { EHOSTUNREACH, NNG_EUNREACHABLE },
+ { ENETUNREACH, NNG_EUNREACHABLE },
+ { ENAMETOOLONG, NNG_EINVAL },
+ { ENOENT, NNG_ENOENT },
+ { ENOBUFS, NNG_ENOMEM },
+ { ENOPROTOOPT, NNG_ENOTSUP },
+ { ENOSYS, NNG_ENOTSUP },
+ { ENOTSUP, NNG_ENOTSUP },
+ { EPERM, NNG_EPERM },
+ { EPIPE, NNG_ECLOSED },
+ { EPROTO, NNG_EPROTO },
+ { EPROTONOSUPPORT, NNG_ENOTSUP },
+ { ETIME, NNG_ETIMEDOUT },
+ { ETIMEDOUT, NNG_ETIMEDOUT },
+ { EWOULDBLOCK, NNG_EAGAIN },
+ // must be last
+ { 0, 0 },
};
int
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index 8c40397a..6044280e 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -73,7 +73,7 @@ nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = writev(s->fd, iov, cnt);
+ rv = writev(s->fd, &iov[i], cnt);
if (rv < 0) {
if (rv == EINTR) {
continue;
@@ -121,7 +121,7 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
}
i = 0;
while (resid) {
- rv = readv(s->fd, iov, cnt);
+ rv = readv(s->fd, &iov[i], cnt);
if (rv < 0) {
if (errno == EINTR) {
continue;
@@ -171,10 +171,11 @@ nni_plat_ipc_setopts(int fd)
}
-void
+int
nni_plat_ipc_init(nni_plat_ipcsock *s)
{
s->fd = -1;
+ return (0);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 76a40259..f9720b66 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -23,6 +23,12 @@
#include <unistd.h>
#include <netdb.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_TCP_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_TCP_SOCKTYPE SOCK_STREAM
+#endif
+
static int
nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
{
@@ -128,7 +134,7 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = writev(s->fd, iov, cnt);
+ rv = writev(s->fd, &iov[i], cnt);
if (rv < 0) {
if (rv == EINTR) {
continue;
@@ -177,7 +183,7 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = readv(s->fd, iov, cnt);
+ rv = readv(s->fd, &iov[i], cnt);
if (rv < 0) {
if (errno == EINTR) {
continue;
@@ -233,10 +239,11 @@ nni_plat_tcp_setopts(int fd)
}
-void
+int
nni_plat_tcp_init(nni_plat_tcpsock *s)
{
s->fd = -1;
+ return (0);
}
@@ -282,12 +289,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
return (NNG_EADDRINVAL);
}
-#ifdef SOCK_CLOEXEC
- fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
-#else
- fd = socket(ss.ss_family, SOCK_STREAM, 0);
-#endif
- if (fd < 0) {
+ if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
@@ -330,12 +332,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
return (NNG_EADDRINVAL);
}
-#ifdef SOCK_CLOEXEC
- fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
-#else
- fd = socket(ss.ss_family, SOCK_STREAM, 0);
-#endif
- if (fd < 0) {
+ if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
@@ -381,13 +378,6 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
#endif
if (fd < 0) {
- if ((errno == EINTR) || (errno == ECONNABORTED)) {
- // These are not fatal errors, keep trying
- continue;
- }
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
- continue;
- }
return (nni_plat_errno(errno));
} else {
break;
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index a429e071..7b4dbbdc 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -28,6 +28,7 @@ static int nni_plat_forked = 0;
pthread_condattr_t nni_cvattr;
pthread_mutexattr_t nni_mxattr;
+static pthread_attr_t nni_pthread_attr;
// We open a /dev/null file descriptor so that we can dup2() it to
// cause MacOS X to wakeup. This gives us a "safe" close semantic.
@@ -189,7 +190,8 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->arg = arg;
// POSIX wants functions to return a void *, but we don't care.
- rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
+ rv = pthread_create(&thr->tid, &nni_pthread_attr,
+ nni_plat_thr_main, thr);
if (rv != 0) {
//nni_printf("pthread_create: %s", strerror(rv));
return (NNG_ENOMEM);
@@ -263,9 +265,27 @@ nni_plat_init(int (*helper)(void))
return (NNG_ENOMEM);
}
+ rv = pthread_attr_init(&nni_pthread_attr);
+ if (rv != 0) {
+ pthread_mutex_unlock(&nni_plat_lock);
+ (void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ return (NNG_ENOMEM);
+ }
+
+ // We don't force this, but we want to have it small... we could
+ // probably get by with even just 8k, but Linux usually wants 16k
+ // as a minimum. If this fails, its not fatal, just we won't be
+ // as scalable / thrifty with our use of VM.
+ (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
(void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
@@ -284,6 +304,7 @@ nni_plat_fini(void)
if (nni_plat_inited) {
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
(void) close(nni_plat_devnull);
nni_plat_devnull = -1;
nni_plat_inited = 0;
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c
index 189f5d4c..00fd6493 100644
--- a/src/platform/windows/win_debug.c
+++ b/src/platform/windows/win_debug.c
@@ -40,26 +40,24 @@ nni_plat_strerror(int errnum)
}
-#define NNI_ERR(x, y) { x, y },
-
// Win32 has its own error codes, but these ones it shares with POSIX.
static struct {
int sys_err;
int nng_err;
}
nni_plat_errnos[] = {
- NNI_ERR(ENOENT, NNG_ENOENT)
- NNI_ERR(EINTR, NNG_EINTR)
- NNI_ERR(EINVAL, NNG_EINVAL)
- NNI_ERR(ENOMEM, NNG_ENOMEM)
- NNI_ERR(EACCES, NNG_EPERM)
- NNI_ERR(EAGAIN, NNG_EAGAIN)
- NNI_ERR(EBADF, NNG_ECLOSED)
- NNI_ERR(EBUSY, NNG_EBUSY)
- NNI_ERR(ENAMETOOLONG, NNG_EINVAL)
- NNI_ERR(EPERM, NNG_EPERM)
- NNI_ERR(EPIPE, NNG_ECLOSED)
- NNI_ERR(0, 0) // must be last
+ { ENOENT, NNG_ENOENT },
+ { EINTR, NNG_EINTR },
+ { EINVAL, NNG_EINVAL },
+ { ENOMEM, NNG_ENOMEM },
+ { EACCES, NNG_EPERM },
+ { EAGAIN, NNG_EAGAIN },
+ { EBADF, NNG_ECLOSED },
+ { EBUSY, NNG_EBUSY },
+ { ENAMETOOLONG, NNG_EINVAL },
+ { EPERM, NNG_EPERM },
+ { EPIPE, NNG_ECLOSED },
+ { 0, 0 } // must be last
};
int
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b2714b23..f675f510 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -27,7 +27,16 @@
// elsewhere.
struct nni_plat_tcpsock {
- SOCKET s;
+ SOCKET s;
+
+ WSAOVERLAPPED recv_olpd;
+ WSAOVERLAPPED send_olpd;
+ WSAOVERLAPPED conn_olpd; // Use for both connect and accept
+
+ // We have to lookup some function pointers using ioctls. Winsock,
+ // gotta love it.
+ LPFN_CONNECTEX connectex;
+ LPFN_ACCEPTEX acceptex;
};
struct nni_plat_ipcsock {
@@ -47,7 +56,7 @@ struct nni_plat_mtx {
struct nni_plat_cv {
CONDITION_VARIABLE cv;
- CRITICAL_SECTION *cs;
+ CRITICAL_SECTION * cs;
};
#endif // PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 682eb0a8..f4d4d004 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -115,10 +115,11 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
}
-void
+int
nni_plat_ipc_init(nni_plat_ipcsock *s)
{
s->p = INVALID_HANDLE_VALUE;
+ return (0);
}
@@ -136,7 +137,7 @@ void
nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
{
if (s->p != INVALID_HANDLE_VALUE) {
-#if 0
+#if 0
(void) shutdown(s->fd, SHUT_RDWR);
// This causes the equivalent of a close. Hopefully waking
// up anything that didn't get the hint with the shutdown.
@@ -209,7 +210,7 @@ nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
return (rv);
}
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
@@ -244,7 +245,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
return (rv);
}
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
@@ -275,7 +276,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
nni_plat_ipc_setopts(fd);
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 3bd34b78..c7a7849d 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -11,46 +11,104 @@
#ifdef PLATFORM_WINDOWS
+#include <stdio.h>
+
+// Windows has infinite numbers of error codes it seems.
static struct {
int wsa_err;
int nng_err;
}
nni_plat_wsa_errnos[] = {
- { WSAECONNABORTED, NNG_ECLOSED },
- { WSAEINTR, NNG_EINTR },
+ { WSA_INVALID_HANDLE, NNG_ECLOSED },
+ { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM },
+ { WSA_INVALID_PARAMETER, NNG_EINVAL },
+ { WSA_OPERATION_ABORTED, NNG_ECLOSED },
+ { WSA_IO_INCOMPLETE, NNG_EAGAIN },
+
+ { WSAEINTR, NNG_EINTR },
+ { WSAEBADF, NNG_ECLOSED },
+ { WSAEACCES, NNG_EPERM },
+ { WSAEFAULT, NNG_ESYSERR + WSAEFAULT },
+ { WSAEWOULDBLOCK, NNG_EAGAIN },
+ { WSAEINPROGRESS, NNG_EAGAIN },
+ { WSAEALREADY, NNG_ESYSERR + WSAEALREADY },
+ { WSAENOTSOCK, NNG_ECLOSED },
+ { WSAEMSGSIZE, NNG_EMSGSIZE },
+ { WSAEPROTOTYPE, NNG_ESYSERR + WSAEPROTOTYPE },
+ { WSAENOPROTOOPT, NNG_ENOTSUP },
+ { WSAEPROTONOSUPPORT, NNG_ENOTSUP },
+ { WSAEPROTONOSUPPORT, NNG_ENOTSUP },
+ { WSAEADDRINUSE, NNG_EADDRINUSE },
+ { WSAEADDRNOTAVAIL, NNG_EADDRINVAL },
+ { WSAENETDOWN, NNG_EUNREACHABLE },
+ { WSAENETUNREACH, NNG_EUNREACHABLE },
+ { WSAECONNABORTED, NNG_ETIMEDOUT },
+ { WSAECONNRESET, NNG_ECLOSED },
+ { WSAENOBUFS, NNG_ENOMEM },
+ { WSAEISCONN, NNG_ESYSERR + WSAEISCONN },
+ { WSAENOTCONN, NNG_ECLOSED },
+ { WSAESHUTDOWN, NNG_ECLOSED },
+ { WSAETOOMANYREFS, NNG_ESYSERR + WSAETOOMANYREFS },
+ { WSAETIMEDOUT, NNG_ETIMEDOUT },
+ { WSAECONNREFUSED, NNG_ECONNREFUSED },
+ { WSAELOOP, NNG_ESYSERR + WSAELOOP },
+ { WSAENAMETOOLONG, NNG_ESYSERR + WSAENAMETOOLONG },
+ { WSAEHOSTDOWN, NNG_EUNREACHABLE },
+ { WSAEHOSTUNREACH, NNG_EUNREACHABLE },
+ { WSAENOTEMPTY, NNG_ESYSERR + WSAENOTEMPTY },
+ { WSAEPROCLIM, NNG_ESYSERR + WSAEPROCLIM },
+ { WSAEUSERS, NNG_ESYSERR + WSAEUSERS },
+ { WSAEDQUOT, NNG_ESYSERR + WSAEDQUOT },
+ { WSAESTALE, NNG_ESYSERR + WSAESTALE },
+ { WSAEREMOTE, NNG_ESYSERR + WSAEREMOTE },
+ { WSASYSNOTREADY, NNG_ESYSERR + WSASYSNOTREADY },
+ { WSAVERNOTSUPPORTED, NNG_ENOTSUP },
+ { WSANOTINITIALISED, NNG_ESYSERR + WSANOTINITIALISED },
+ { WSAEDISCON, NNG_ECLOSED },
+ { WSAENOMORE, NNG_ESYSERR + WSAENOMORE },
+ { WSAECANCELLED, NNG_ESYSERR + WSAECANCELLED },
+ { WSAEINVALIDPROVIDER, NNG_ESYSERR + WSAEINVALIDPROVIDER },
+ { WSAEPROVIDERFAILEDINIT, NNG_ESYSERR + WSAEPROVIDERFAILEDINIT },
+ { WSASYSCALLFAILURE, NNG_ESYSERR + WSASYSCALLFAILURE },
+ { WSASERVICE_NOT_FOUND, NNG_ESYSERR + WSASERVICE_NOT_FOUND },
+ { WSATYPE_NOT_FOUND, NNG_ESYSERR + WSATYPE_NOT_FOUND },
+ { WSA_E_CANCELLED, NNG_ESYSERR + WSA_E_CANCELLED },
+ { WSAEREFUSED, NNG_ESYSERR + WSAEREFUSED },
+ { WSAHOST_NOT_FOUND, NNG_EADDRINVAL },
+ { WSATRY_AGAIN, NNG_EAGAIN },
+ { WSANO_RECOVERY, NNG_ESYSERR + WSANO_RECOVERY },
+ { WSANO_DATA, NNG_EADDRINVAL },
+ // Eliding all the QoS related errors.
+
+#if 0
// REVIEW THESE!!!
- { WSAECONNRESET, NNG_ECONNREFUSED },
- { WSAEMSGSIZE, NNG_EINVAL },
- { WSAENETDOWN, NNG_EUNREACHABLE },
- { WSAENETRESET, NNG_ECLOSED },
- { WSAENOBUFS, NNG_ENOMEM },
- { WSAESHUTDOWN, NNG_ECLOSED },
- { WSAEWOULDBLOCK, NNG_EAGAIN },
- { WSAEBADF, NNG_ECLOSED },
- { WSA_INVALID_HANDLE, NNG_ECLOSED },
- { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM },
- { WSA_INVALID_PARAMETER, NNG_EINVAL },
- { WSAEACCES, NNG_EPERM },
- { 0, 0 }, // MUST BE LAST
+ { ERROR_BROKEN_PIPE, NNG_ECLOSED },
+ { ERROR_CONNECTION_REFUSED, NNG_ECONNREFUSED },
+ { ERROR_NOT_CONNECTED, NNG_ECLOSED },
+ { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED },
+ { ERROR_NO_DATA, NNG_ECLOSED },
+#endif
+ // Must be Last!!
+ { 0, 0 },
};
static int
-nni_plat_wsa_last_error(void)
+nni_winsock_error(int werr)
{
- int errnum = WSAGetLastError();
int i;
- if (errnum == 0) {
+ if (werr == 0) {
return (0);
}
+
for (i = 0; nni_plat_wsa_errnos[i].nng_err != 0; i++) {
- if (errnum == nni_plat_wsa_errnos[i].wsa_err) {
+ if (werr == nni_plat_wsa_errnos[i].wsa_err) {
return (nni_plat_wsa_errnos[i].nng_err);
}
}
// Other system errno.
- return (NNG_ESYSERR + errnum);
+ return (NNG_ESYSERR + werr);
}
@@ -140,22 +198,53 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
WSABUF iov[4]; // We never have more than 3 at present
int i;
- DWORD sent = 0;
int rv;
+ DWORD offset;
+ DWORD nsent;
+ DWORD resid;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->send_olpd;
if (cnt > 4) {
return (NNG_EINVAL);
}
- for (i = 0; i < cnt; i++) {
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) {
iov[i].buf = iovs[i].iov_buf;
iov[i].len = iovs[i].iov_len;
}
- rv = WSASend(s->s, iov, cnt, &sent, 0, NULL, NULL);
- if (rv != 0) {
- // XXX: CONVERT WSAGetLastError code.
- return (nni_plat_wsa_last_error());
+ i = 0;
+ while (resid) {
+ flags = 0;
+ rv = WSASend(s->s, &iov[i], cnt, &nsent, flags, olp, NULL);
+ if (rv == SOCKET_ERROR) {
+ if ((rv = WSAGetLastError()) != WSA_IO_PENDING) {
+ return (nni_winsock_error(rv));
+ }
+ flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nsent,
+ TRUE, &flags)) {
+ return (nni_winsock_error(WSAGetLastError()));
+ }
+ }
+
+ if (nsent > resid) {
+ nni_panic("WSASend says it sent too much");
+ }
+
+ resid -= nsent;
+ while (nsent) {
+ if (iov[i].len <= nsent) {
+ nsent -= iov[i].len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].len -= nsent;
+ iov[i].buf += nsent;
+ nsent = 0;
+ }
+ }
}
return (0);
@@ -167,29 +256,39 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
WSABUF iov[4]; // We never have more than 3 at present
int i;
- int offset;
- int resid = 0;
int rv;
+ DWORD offset;
+ DWORD resid;
DWORD nrecv;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->recv_olpd;
if (cnt > 4) {
return (NNG_EINVAL);
}
- for (i = 0; i < cnt; i++) {
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) {
iov[i].buf = iovs[i].iov_buf;
iov[i].len = iovs[i].iov_len;
- resid += iov[i].len;
}
i = 0;
while (resid) {
- rv = WSARecv(s->s, iov, cnt, &nrecv, 0, NULL, NULL);
- if (rv != 0) {
- return (nni_plat_wsa_last_error());
+ flags = 0;
+ rv = WSARecv(s->s, &iov[i], cnt, &nrecv, &flags, olp, NULL);
+ if (rv == SOCKET_ERROR) {
+ if ((rv = WSAGetLastError()) != WSA_IO_PENDING) {
+ return (nni_winsock_error(rv));
+ }
+ flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nrecv,
+ TRUE, &flags)) {
+ return (nni_winsock_error(WSAGetLastError()));
+ }
}
+
if (nrecv > resid) {
- nni_panic("readv says it read too much!");
+ nni_panic("WSARecv says it read too much!");
}
resid -= nrecv;
@@ -227,19 +326,74 @@ nni_plat_tcp_setopts(SOCKET fd)
}
-void
+int
nni_plat_tcp_init(nni_plat_tcpsock *s)
{
s->s = INVALID_SOCKET;
+ return (0);
+}
+
+
+static int
+nni_plat_tcp_open(nni_plat_tcpsock *s)
+{
+ int rv;
+ DWORD nbytes;
+ GUID guid1 = WSAID_CONNECTEX;
+ GUID guid2 = WSAID_ACCEPTEX;
+
+ ZeroMemory(s, sizeof (*s));
+ s->s = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
+ WSA_FLAG_NO_HANDLE_INHERIT|WSA_FLAG_OVERLAPPED);
+ if (s->s == INVALID_SOCKET) {
+ rv = WSAGetLastError();
+ return (nni_winsock_error(rv));
+ }
+
+ if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guid1, sizeof (guid1), &s->connectex, sizeof (s->connectex),
+ &nbytes, NULL, NULL) == SOCKET_ERROR) {
+ nni_panic("failed lookup for ConnectEx function");
+ }
+ if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guid2, sizeof (guid2), &s->acceptex, sizeof (s->acceptex),
+ &nbytes, NULL, NULL) == SOCKET_ERROR) {
+ nni_panic("failed lookup for AcceptEx function");
+ }
+
+ nni_plat_tcp_setopts(s->s);
+
+ return (0);
+}
+
+
+static void
+nni_plat_tcp_close(nni_plat_tcpsock *s)
+{
+ SOCKET fd;
+
+ if ((fd = s->s) != INVALID_SOCKET) {
+ s->s = INVALID_SOCKET;
+ (void) shutdown(fd, SD_BOTH);
+ (void) CancelIoEx((HANDLE) fd, &s->conn_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->recv_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->send_olpd);
+ (void) closesocket(fd);
+ }
}
void
nni_plat_tcp_fini(nni_plat_tcpsock *s)
{
- if (s->s != INVALID_SOCKET) {
- (void) closesocket(s->s);
+ SOCKET fd;
+
+ if ((fd = s->s) != INVALID_SOCKET) {
s->s = INVALID_SOCKET;
+ (void) CancelIoEx((HANDLE) fd, &s->conn_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->recv_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->send_olpd);
+ (void) closesocket(fd);
}
}
@@ -247,9 +401,7 @@ nni_plat_tcp_fini(nni_plat_tcpsock *s)
void
nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
{
- if (s->s != INVALID_SOCKET) {
- (void) shutdown(s->s, SD_BOTH);
- }
+ nni_plat_tcp_close(s);
}
@@ -264,46 +416,39 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
{
int len;
SOCKADDR_STORAGE ss;
+ ULONG yes;
int rv;
- BOOL yes;
len = nni_plat_to_sockaddr(&ss, addr);
if (len < 0) {
return (NNG_EADDRINVAL);
}
- s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0,
- WSA_FLAG_NO_HANDLE_INHERIT);
- if (s->s == INVALID_SOCKET) {
- return (nni_plat_wsa_last_error());
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
+ return (rv);
}
- nni_plat_tcp_setopts(s->s);
-
// Make sure that we use the address exclusively. Windows lets
// others hijack us by default.
yes = 1;
if (setsockopt(s->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes,
sizeof (yes)) == SOCKET_ERROR) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
if (bind(s->s, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
// Listen -- 128 depth is probably sufficient. If it isn't, other
// bad things are going to happen.
if (listen(s->s, 128) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
return (0);
@@ -320,6 +465,10 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
int len;
SOCKADDR_STORAGE ss;
SOCKADDR_STORAGE bss;
+ WSAOVERLAPPED *olp = &s->conn_olpd;
+ BOOL ok;
+ DWORD nbytes;
+ DWORD flags;
int rv;
len = nni_plat_to_sockaddr(&ss, addr);
@@ -327,39 +476,44 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
return (NNG_EADDRINVAL);
}
- s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0,
- WSA_FLAG_NO_HANDLE_INHERIT);
- if (s->s == INVALID_SOCKET) {
- return (nni_plat_wsa_last_error());
- }
-
if (bindaddr != NULL) {
if (bindaddr->s_un.s_family != addr->s_un.s_family) {
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (NNG_EINVAL);
+ return (NNG_EADDRINVAL);
}
if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) {
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
return (NNG_EADDRINVAL);
}
- if (bind(s->s, (struct sockaddr *) &bss, len) < 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
- }
+ } else {
+ ZeroMemory(&bss, sizeof (bss));
+ bss.ss_family = ss.ss_family;
}
- nni_plat_tcp_setopts(s->s);
-
- if (connect(s->s, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
return (rv);
}
+
+ // ConnectEx must always be bound first.
+ if (bind(s->s, (struct sockaddr *) &bss, len) < 0) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+
+ if (s->connectex(s->s, (struct sockaddr *) &ss, len, NULL, 0, NULL,
+ olp)) {
+ // Immediate completion?
+ return (0);
+ }
+ if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) {
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+ nbytes = flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nbytes, TRUE, &flags)) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
return (0);
}
@@ -367,26 +521,32 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
int
nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
{
- SOCKET fd;
- int err;
-
- for (;;) {
- fd = accept(server->s, NULL, NULL);
+ DWORD nbytes;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->conn_olpd;
+ char ainfo[512];
+ int rv;
- if (fd == INVALID_SOCKET) {
- err = WSAGetLastError();
- if ((err == WSAECONNRESET) || (err == WSAEWOULDBLOCK)) {
- continue;
- }
- return (nni_plat_wsa_last_error());
- } else {
- break;
- }
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
+ return (rv);
}
- nni_plat_tcp_setopts(fd);
-
- s->s = fd;
+ // 256 > (sizeof (SOCKADDR_STORAGE) + 16)
+ nbytes = 0;
+ if (s->acceptex(server->s, s->s, ainfo, 0, 256, 256, &nbytes, olp)) {
+ // Immediate completion?
+ return (0);
+ }
+ if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) {
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+ nbytes = flags = 0;
+ if (!WSAGetOverlappedResult(server->s, olp, &nbytes, TRUE, &flags)) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
return (0);
}
diff --git a/src/platform/windows/win_rand.c b/src/platform/windows/win_rand.c
index 1cabb6a0..8bca5927 100644
--- a/src/platform/windows/win_rand.c
+++ b/src/platform/windows/win_rand.c
@@ -17,13 +17,14 @@ void
nni_plat_seed_prng(void *buf, size_t bufsz)
{
unsigned val;
+
// The rand_s routine uses RtlGenRandom to get high quality
// pseudo random numbers (i.e. numbers that should be good enough
// for use with crypto keying.)
while (bufsz > sizeof (val)) {
rand_s(&val);
memcpy(buf, &val, sizeof (val));
- buf = (((char *)buf) + sizeof (val));
+ buf = (((char *) buf) + sizeof (val));
bufsz -= sizeof (val);
}
}
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 4f323616..631883fc 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -125,8 +125,11 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->func = fn;
thr->arg = arg;
- thr->handle = (HANDLE) _beginthreadex(NULL, 0,
- nni_plat_thr_main, thr, 0, NULL);
+ // We could probably even go down to 8k... but crypto for some
+ // protocols might get bigger than this. 1MB is waaay too big.
+ thr->handle = (HANDLE) _beginthreadex(NULL, 16384,
+ nni_plat_thr_main, thr, STACK_SIZE_PARAM_IS_A_RESERVATION,
+ NULL);
if (thr->handle == NULL) {
return (NNG_ENOMEM); // Best guess...
}
@@ -166,6 +169,17 @@ nni_plat_init(int (*helper)(void))
Sleep(1);
}
if (!inited) {
+ WSADATA data;
+ WORD ver;
+ ver = MAKEWORD(2, 2);
+ if (WSAStartup(MAKEWORD(2, 2), &data) != 0) {
+ InterlockedExchange(&initing, 0);
+ if ((LOBYTE(data.wVersion) != 2) ||
+ (HIBYTE(data.wVersion) != 2)) {
+ nni_panic("got back wrong winsock ver");
+ }
+ return (NNG_ENOMEM);
+ }
helper();
inited = 1;
}
@@ -178,6 +192,7 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
+ WSACleanup();
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 8a28dfe5..2ddc74b7 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -190,7 +190,10 @@ nni_ipc_ep_init(void **epp, const char *url, uint16_t proto)
ep->closed = 0;
ep->proto = proto;
ep->rcvmax = 1024 * 1024; // XXX: fix this
- nni_plat_ipc_init(&ep->fd);
+ if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (rv);
+ }
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
@@ -273,12 +276,16 @@ nni_ipc_ep_connect(void *arg, void **pipep)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- nni_plat_ipc_init(&pipe->fd);
+ if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(&pipe->fd, path);
if (rv != 0) {
+ nni_plat_ipc_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
@@ -327,9 +334,13 @@ nni_ipc_ep_accept(void *arg, void **pipep)
}
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- nni_plat_ipc_init(&pipe->fd);
+ if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) {
+ nni_plat_ipc_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 9b8cdb1d..8fcf07e5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -180,7 +180,11 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto)
ep->proto = proto;
ep->ipv4only = 0;
ep->rcvmax = 1024 * 1024; // XXX: fix this
- nni_plat_tcp_init(&ep->fd);
+
+ if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (rv);
+ }
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
@@ -347,7 +351,10 @@ nni_tcp_ep_connect(void *arg, void **pipep)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- nni_plat_tcp_init(&pipe->fd);
+ if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -357,6 +364,7 @@ nni_tcp_ep_connect(void *arg, void **pipep)
bindaddr = lclpart == NULL ? NULL : &lcladdr;
rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr);
if (rv != 0) {
+ nni_plat_tcp_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
@@ -416,9 +424,13 @@ nni_tcp_ep_accept(void *arg, void **pipep)
}
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- nni_plat_tcp_init(&pipe->fd);
+
+ if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ }
if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) {
+ nni_plat_tcp_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}