diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/clock.h | 2 | ||||
| -rw-r--r-- | src/core/endpt.c | 30 | ||||
| -rw-r--r-- | src/core/platform.h | 4 | ||||
| -rw-r--r-- | src/nng.c | 9 | ||||
| -rw-r--r-- | src/nng.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_debug.c | 61 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 34 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 23 | ||||
| -rw-r--r-- | src/platform/windows/win_debug.c | 26 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 13 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 11 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 358 | ||||
| -rw-r--r-- | src/platform/windows/win_rand.c | 3 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 19 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 17 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 18 |
17 files changed, 444 insertions, 198 deletions
diff --git a/src/core/clock.h b/src/core/clock.h index c46adfbe..3b607322 100644 --- a/src/core/clock.h +++ b/src/core/clock.h @@ -16,4 +16,4 @@ extern nni_time nni_clock(void); extern void nni_usleep(nni_duration usec); -#endif // CORE_CLOCK_H
\ No newline at end of file +#endif // CORE_CLOCK_H diff --git a/src/core/endpt.c b/src/core/endpt.c index 554d9a36..c4673376 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -297,16 +297,32 @@ nni_listener(void *arg) if (((rv = nni_ep_accept(ep, &pipe)) == 0) && ((rv = nni_pipe_start(pipe)) == 0)) { + // Success! Loop around for the next one. continue; } - if (rv == NNG_ECLOSED) { + + switch (rv) { + case NNG_ECLOSED: + // This indicates the listening socket got closed. + // We just bail. + return; + + case NNG_ECONNABORTED: + case NNG_ECONNRESET: + // These are remote conditions, no cool down. + cooldown = 0; + break; + case NNG_ENOMEM: + // We're running low on memory, so its best to wait + // a whole second to give the system a chance to + // recover memory. + cooldown = 1000000; + break; + default: + // Other cases we sleep just a tiny bit to avoid + // burning the cpu (e.g. out of files). + cooldown = 1000; // 1 msec break; - } - cooldown = 1000; // 1 ms cooldown - if (rv == NNG_ENOMEM) { - // For out of memory, we need to give more - // time for the system to reclaim resources. - cooldown = 100000; // 100ms } cooldown += nni_clock(); nni_mtx_lock(mx); diff --git a/src/core/platform.h b/src/core/platform.h index bf8a52dc..8d19a7c8 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -170,7 +170,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); // nni_plat_tcp_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. -extern void nni_plat_tcp_init(nni_plat_tcpsock *); +extern int nni_plat_tcp_init(nni_plat_tcpsock *); // nni_plat_tcp_fini just closes a TCP socket, and releases any related // resources. @@ -211,7 +211,7 @@ extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int); // nni_plat_ipc_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. -extern void nni_plat_ipc_init(nni_plat_ipcsock *); +extern int nni_plat_ipc_init(nni_plat_ipcsock *); // nni_plat_ipc_fini just closes an IPC socket, and releases any related // resources. @@ -199,6 +199,15 @@ nng_strerror(int num) case NNG_EPERM: return ("Permission denied"); + + case NNG_EMSGSIZE: + return ("Message too large"); + + case NNG_ECONNRESET: + return ("Connection reset"); + + case NNG_ECONNABORTED: + return ("Connection aborted"); } if (num & NNG_ESYSERR) { @@ -38,8 +38,8 @@ extern "C" { #endif // NNG_SHARED_LIB #else #define NNG_DECL extern -#endif // _WIN32 && !NNG_STATIC_LIB -#endif // NNG_DECL +#endif // _WIN32 && !NNG_STATIC_LIB +#endif // NNG_DECL // Types common to nng. typedef struct nng_socket nng_socket; @@ -388,6 +388,9 @@ NNG_DECL int nng_device(nng_socket *, nng_socket *); #define NNG_EUNREACHABLE (14) #define NNG_EADDRINVAL (15) #define NNG_EPERM (16) +#define NNG_EMSGSIZE (17) +#define NNG_ECONNABORTED (18) +#define NNG_ECONNRESET (19) // NNG_SYSERR is a special code, which allows us to wrap errors from the // underlyuing operating system. We generally prefer to map errors to one diff --git a/src/platform/posix/posix_debug.c b/src/platform/posix/posix_debug.c index 71006b25..6199df1d 100644 --- a/src/platform/posix/posix_debug.c +++ b/src/platform/posix/posix_debug.c @@ -40,8 +40,6 @@ nni_plat_strerror(int errnum) } -#define NNI_ERR(x, y) { x, y }, - // There are of course other errors than these, but these are the ones // that we might reasonably expect and want to handle "cleanly". Most of // the others should be handled by the system error code. Note that EFAULT @@ -55,35 +53,36 @@ static struct { int nng_err; } nni_plat_errnos[] = { - NNI_ERR(EINTR, NNG_EINTR) - NNI_ERR(EINVAL, NNG_EINVAL) - NNI_ERR(ENOMEM, NNG_ENOMEM) - NNI_ERR(EACCES, NNG_EPERM) - NNI_ERR(EADDRINUSE, NNG_EADDRINUSE) - NNI_ERR(EADDRNOTAVAIL, NNG_EADDRINVAL) - NNI_ERR(EAFNOSUPPORT, NNG_ENOTSUP) - NNI_ERR(EAGAIN, NNG_EAGAIN) - NNI_ERR(EBADF, NNG_ECLOSED) - NNI_ERR(EBUSY, NNG_EBUSY) - NNI_ERR(ECONNABORTED, NNG_ECLOSED) - NNI_ERR(ECONNREFUSED, NNG_ECONNREFUSED) - NNI_ERR(ECONNRESET, NNG_ECLOSED) - NNI_ERR(EHOSTUNREACH, NNG_EUNREACHABLE) - NNI_ERR(ENETUNREACH, NNG_EUNREACHABLE) - NNI_ERR(ENAMETOOLONG, NNG_EINVAL) - NNI_ERR(ENOENT, NNG_ENOENT) - NNI_ERR(ENOBUFS, NNG_ENOMEM) - NNI_ERR(ENOPROTOOPT, NNG_ENOTSUP) - NNI_ERR(ENOSYS, NNG_ENOTSUP) - NNI_ERR(ENOTSUP, NNG_ENOTSUP) - NNI_ERR(EPERM, NNG_EPERM) - NNI_ERR(EPIPE, NNG_ECLOSED) - NNI_ERR(EPROTO, NNG_EPROTO) - NNI_ERR(EPROTONOSUPPORT, NNG_ENOTSUP) - NNI_ERR(ETIME, NNG_ETIMEDOUT) - NNI_ERR(ETIMEDOUT, NNG_ETIMEDOUT) - NNI_ERR(EWOULDBLOCK, NNG_EAGAIN) - NNI_ERR(0, 0) // must be last + { EINTR, NNG_EINTR }, + { EINVAL, NNG_EINVAL }, + { ENOMEM, NNG_ENOMEM }, + { EACCES, NNG_EPERM }, + { EADDRINUSE, NNG_EADDRINUSE }, + { EADDRNOTAVAIL, NNG_EADDRINVAL }, + { EAFNOSUPPORT, NNG_ENOTSUP }, + { EAGAIN, NNG_EAGAIN }, + { EBADF, NNG_ECLOSED }, + { EBUSY, NNG_EBUSY }, + { ECONNABORTED, NNG_ECONNABORTED }, + { ECONNREFUSED, NNG_ECONNREFUSED }, + { ECONNRESET, NNG_ECONNRESET }, + { EHOSTUNREACH, NNG_EUNREACHABLE }, + { ENETUNREACH, NNG_EUNREACHABLE }, + { ENAMETOOLONG, NNG_EINVAL }, + { ENOENT, NNG_ENOENT }, + { ENOBUFS, NNG_ENOMEM }, + { ENOPROTOOPT, NNG_ENOTSUP }, + { ENOSYS, NNG_ENOTSUP }, + { ENOTSUP, NNG_ENOTSUP }, + { EPERM, NNG_EPERM }, + { EPIPE, NNG_ECLOSED }, + { EPROTO, NNG_EPROTO }, + { EPROTONOSUPPORT, NNG_ENOTSUP }, + { ETIME, NNG_ETIMEDOUT }, + { ETIMEDOUT, NNG_ETIMEDOUT }, + { EWOULDBLOCK, NNG_EAGAIN }, + // must be last + { 0, 0 }, }; int diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index 8c40397a..6044280e 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -73,7 +73,7 @@ nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) i = 0; while (resid) { - rv = writev(s->fd, iov, cnt); + rv = writev(s->fd, &iov[i], cnt); if (rv < 0) { if (rv == EINTR) { continue; @@ -121,7 +121,7 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) } i = 0; while (resid) { - rv = readv(s->fd, iov, cnt); + rv = readv(s->fd, &iov[i], cnt); if (rv < 0) { if (errno == EINTR) { continue; @@ -171,10 +171,11 @@ nni_plat_ipc_setopts(int fd) } -void +int nni_plat_ipc_init(nni_plat_ipcsock *s) { s->fd = -1; + return (0); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 76a40259..f9720b66 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -23,6 +23,12 @@ #include <unistd.h> #include <netdb.h> +#ifdef SOCK_CLOEXEC +#define NNI_TCP_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_TCP_SOCKTYPE SOCK_STREAM +#endif + static int nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) { @@ -128,7 +134,7 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) i = 0; while (resid) { - rv = writev(s->fd, iov, cnt); + rv = writev(s->fd, &iov[i], cnt); if (rv < 0) { if (rv == EINTR) { continue; @@ -177,7 +183,7 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) i = 0; while (resid) { - rv = readv(s->fd, iov, cnt); + rv = readv(s->fd, &iov[i], cnt); if (rv < 0) { if (errno == EINTR) { continue; @@ -233,10 +239,11 @@ nni_plat_tcp_setopts(int fd) } -void +int nni_plat_tcp_init(nni_plat_tcpsock *s) { s->fd = -1; + return (0); } @@ -282,12 +289,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) return (NNG_EADDRINVAL); } -#ifdef SOCK_CLOEXEC - fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); -#else - fd = socket(ss.ss_family, SOCK_STREAM, 0); -#endif - if (fd < 0) { + if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); } @@ -330,12 +332,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, return (NNG_EADDRINVAL); } -#ifdef SOCK_CLOEXEC - fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); -#else - fd = socket(ss.ss_family, SOCK_STREAM, 0); -#endif - if (fd < 0) { + if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); } @@ -381,13 +378,6 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) #endif if (fd < 0) { - if ((errno == EINTR) || (errno == ECONNABORTED)) { - // These are not fatal errors, keep trying - continue; - } - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - continue; - } return (nni_plat_errno(errno)); } else { break; diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index a429e071..7b4dbbdc 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -28,6 +28,7 @@ static int nni_plat_forked = 0; pthread_condattr_t nni_cvattr; pthread_mutexattr_t nni_mxattr; +static pthread_attr_t nni_pthread_attr; // We open a /dev/null file descriptor so that we can dup2() it to // cause MacOS X to wakeup. This gives us a "safe" close semantic. @@ -189,7 +190,8 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->arg = arg; // POSIX wants functions to return a void *, but we don't care. - rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr); + rv = pthread_create(&thr->tid, &nni_pthread_attr, + nni_plat_thr_main, thr); if (rv != 0) { //nni_printf("pthread_create: %s", strerror(rv)); return (NNG_ENOMEM); @@ -263,9 +265,27 @@ nni_plat_init(int (*helper)(void)) return (NNG_ENOMEM); } + rv = pthread_attr_init(&nni_pthread_attr); + if (rv != 0) { + pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); + pthread_mutexattr_destroy(&nni_mxattr); + pthread_condattr_destroy(&nni_cvattr); + return (NNG_ENOMEM); + } + + // We don't force this, but we want to have it small... we could + // probably get by with even just 8k, but Linux usually wants 16k + // as a minimum. If this fails, its not fatal, just we won't be + // as scalable / thrifty with our use of VM. + (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { pthread_mutex_unlock(&nni_plat_lock); (void) close(nni_plat_devnull); + pthread_mutexattr_destroy(&nni_mxattr); + pthread_condattr_destroy(&nni_cvattr); + pthread_attr_destroy(&nni_pthread_attr); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { @@ -284,6 +304,7 @@ nni_plat_fini(void) if (nni_plat_inited) { pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); + pthread_attr_destroy(&nni_pthread_attr); (void) close(nni_plat_devnull); nni_plat_devnull = -1; nni_plat_inited = 0; diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c index 189f5d4c..00fd6493 100644 --- a/src/platform/windows/win_debug.c +++ b/src/platform/windows/win_debug.c @@ -40,26 +40,24 @@ nni_plat_strerror(int errnum) } -#define NNI_ERR(x, y) { x, y }, - // Win32 has its own error codes, but these ones it shares with POSIX. static struct { int sys_err; int nng_err; } nni_plat_errnos[] = { - NNI_ERR(ENOENT, NNG_ENOENT) - NNI_ERR(EINTR, NNG_EINTR) - NNI_ERR(EINVAL, NNG_EINVAL) - NNI_ERR(ENOMEM, NNG_ENOMEM) - NNI_ERR(EACCES, NNG_EPERM) - NNI_ERR(EAGAIN, NNG_EAGAIN) - NNI_ERR(EBADF, NNG_ECLOSED) - NNI_ERR(EBUSY, NNG_EBUSY) - NNI_ERR(ENAMETOOLONG, NNG_EINVAL) - NNI_ERR(EPERM, NNG_EPERM) - NNI_ERR(EPIPE, NNG_ECLOSED) - NNI_ERR(0, 0) // must be last + { ENOENT, NNG_ENOENT }, + { EINTR, NNG_EINTR }, + { EINVAL, NNG_EINVAL }, + { ENOMEM, NNG_ENOMEM }, + { EACCES, NNG_EPERM }, + { EAGAIN, NNG_EAGAIN }, + { EBADF, NNG_ECLOSED }, + { EBUSY, NNG_EBUSY }, + { ENAMETOOLONG, NNG_EINVAL }, + { EPERM, NNG_EPERM }, + { EPIPE, NNG_ECLOSED }, + { 0, 0 } // must be last }; int diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index b2714b23..f675f510 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -27,7 +27,16 @@ // elsewhere. struct nni_plat_tcpsock { - SOCKET s; + SOCKET s; + + WSAOVERLAPPED recv_olpd; + WSAOVERLAPPED send_olpd; + WSAOVERLAPPED conn_olpd; // Use for both connect and accept + + // We have to lookup some function pointers using ioctls. Winsock, + // gotta love it. + LPFN_CONNECTEX connectex; + LPFN_ACCEPTEX acceptex; }; struct nni_plat_ipcsock { @@ -47,7 +56,7 @@ struct nni_plat_mtx { struct nni_plat_cv { CONDITION_VARIABLE cv; - CRITICAL_SECTION *cs; + CRITICAL_SECTION * cs; }; #endif // PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 682eb0a8..f4d4d004 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -115,10 +115,11 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) } -void +int nni_plat_ipc_init(nni_plat_ipcsock *s) { s->p = INVALID_HANDLE_VALUE; + return (0); } @@ -136,7 +137,7 @@ void nni_plat_ipc_shutdown(nni_plat_ipcsock *s) { if (s->p != INVALID_HANDLE_VALUE) { -#if 0 +#if 0 (void) shutdown(s->fd, SHUT_RDWR); // This causes the equivalent of a close. Hopefully waking // up anything that didn't get the hint with the shutdown. @@ -209,7 +210,7 @@ nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) return (rv); } s->fd = fd; -#endif +#endif return (NNG_ENOTSUP); } @@ -244,7 +245,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) return (rv); } s->fd = fd; -#endif +#endif return (NNG_ENOTSUP); } @@ -275,7 +276,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) nni_plat_ipc_setopts(fd); s->fd = fd; -#endif +#endif return (NNG_ENOTSUP); } diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 3bd34b78..c7a7849d 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -11,46 +11,104 @@ #ifdef PLATFORM_WINDOWS +#include <stdio.h> + +// Windows has infinite numbers of error codes it seems. static struct { int wsa_err; int nng_err; } nni_plat_wsa_errnos[] = { - { WSAECONNABORTED, NNG_ECLOSED }, - { WSAEINTR, NNG_EINTR }, + { WSA_INVALID_HANDLE, NNG_ECLOSED }, + { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM }, + { WSA_INVALID_PARAMETER, NNG_EINVAL }, + { WSA_OPERATION_ABORTED, NNG_ECLOSED }, + { WSA_IO_INCOMPLETE, NNG_EAGAIN }, + + { WSAEINTR, NNG_EINTR }, + { WSAEBADF, NNG_ECLOSED }, + { WSAEACCES, NNG_EPERM }, + { WSAEFAULT, NNG_ESYSERR + WSAEFAULT }, + { WSAEWOULDBLOCK, NNG_EAGAIN }, + { WSAEINPROGRESS, NNG_EAGAIN }, + { WSAEALREADY, NNG_ESYSERR + WSAEALREADY }, + { WSAENOTSOCK, NNG_ECLOSED }, + { WSAEMSGSIZE, NNG_EMSGSIZE }, + { WSAEPROTOTYPE, NNG_ESYSERR + WSAEPROTOTYPE }, + { WSAENOPROTOOPT, NNG_ENOTSUP }, + { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, + { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, + { WSAEADDRINUSE, NNG_EADDRINUSE }, + { WSAEADDRNOTAVAIL, NNG_EADDRINVAL }, + { WSAENETDOWN, NNG_EUNREACHABLE }, + { WSAENETUNREACH, NNG_EUNREACHABLE }, + { WSAECONNABORTED, NNG_ETIMEDOUT }, + { WSAECONNRESET, NNG_ECLOSED }, + { WSAENOBUFS, NNG_ENOMEM }, + { WSAEISCONN, NNG_ESYSERR + WSAEISCONN }, + { WSAENOTCONN, NNG_ECLOSED }, + { WSAESHUTDOWN, NNG_ECLOSED }, + { WSAETOOMANYREFS, NNG_ESYSERR + WSAETOOMANYREFS }, + { WSAETIMEDOUT, NNG_ETIMEDOUT }, + { WSAECONNREFUSED, NNG_ECONNREFUSED }, + { WSAELOOP, NNG_ESYSERR + WSAELOOP }, + { WSAENAMETOOLONG, NNG_ESYSERR + WSAENAMETOOLONG }, + { WSAEHOSTDOWN, NNG_EUNREACHABLE }, + { WSAEHOSTUNREACH, NNG_EUNREACHABLE }, + { WSAENOTEMPTY, NNG_ESYSERR + WSAENOTEMPTY }, + { WSAEPROCLIM, NNG_ESYSERR + WSAEPROCLIM }, + { WSAEUSERS, NNG_ESYSERR + WSAEUSERS }, + { WSAEDQUOT, NNG_ESYSERR + WSAEDQUOT }, + { WSAESTALE, NNG_ESYSERR + WSAESTALE }, + { WSAEREMOTE, NNG_ESYSERR + WSAEREMOTE }, + { WSASYSNOTREADY, NNG_ESYSERR + WSASYSNOTREADY }, + { WSAVERNOTSUPPORTED, NNG_ENOTSUP }, + { WSANOTINITIALISED, NNG_ESYSERR + WSANOTINITIALISED }, + { WSAEDISCON, NNG_ECLOSED }, + { WSAENOMORE, NNG_ESYSERR + WSAENOMORE }, + { WSAECANCELLED, NNG_ESYSERR + WSAECANCELLED }, + { WSAEINVALIDPROVIDER, NNG_ESYSERR + WSAEINVALIDPROVIDER }, + { WSAEPROVIDERFAILEDINIT, NNG_ESYSERR + WSAEPROVIDERFAILEDINIT }, + { WSASYSCALLFAILURE, NNG_ESYSERR + WSASYSCALLFAILURE }, + { WSASERVICE_NOT_FOUND, NNG_ESYSERR + WSASERVICE_NOT_FOUND }, + { WSATYPE_NOT_FOUND, NNG_ESYSERR + WSATYPE_NOT_FOUND }, + { WSA_E_CANCELLED, NNG_ESYSERR + WSA_E_CANCELLED }, + { WSAEREFUSED, NNG_ESYSERR + WSAEREFUSED }, + { WSAHOST_NOT_FOUND, NNG_EADDRINVAL }, + { WSATRY_AGAIN, NNG_EAGAIN }, + { WSANO_RECOVERY, NNG_ESYSERR + WSANO_RECOVERY }, + { WSANO_DATA, NNG_EADDRINVAL }, + // Eliding all the QoS related errors. + +#if 0 // REVIEW THESE!!! - { WSAECONNRESET, NNG_ECONNREFUSED }, - { WSAEMSGSIZE, NNG_EINVAL }, - { WSAENETDOWN, NNG_EUNREACHABLE }, - { WSAENETRESET, NNG_ECLOSED }, - { WSAENOBUFS, NNG_ENOMEM }, - { WSAESHUTDOWN, NNG_ECLOSED }, - { WSAEWOULDBLOCK, NNG_EAGAIN }, - { WSAEBADF, NNG_ECLOSED }, - { WSA_INVALID_HANDLE, NNG_ECLOSED }, - { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM }, - { WSA_INVALID_PARAMETER, NNG_EINVAL }, - { WSAEACCES, NNG_EPERM }, - { 0, 0 }, // MUST BE LAST + { ERROR_BROKEN_PIPE, NNG_ECLOSED }, + { ERROR_CONNECTION_REFUSED, NNG_ECONNREFUSED }, + { ERROR_NOT_CONNECTED, NNG_ECLOSED }, + { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED }, + { ERROR_NO_DATA, NNG_ECLOSED }, +#endif + // Must be Last!! + { 0, 0 }, }; static int -nni_plat_wsa_last_error(void) +nni_winsock_error(int werr) { - int errnum = WSAGetLastError(); int i; - if (errnum == 0) { + if (werr == 0) { return (0); } + for (i = 0; nni_plat_wsa_errnos[i].nng_err != 0; i++) { - if (errnum == nni_plat_wsa_errnos[i].wsa_err) { + if (werr == nni_plat_wsa_errnos[i].wsa_err) { return (nni_plat_wsa_errnos[i].nng_err); } } // Other system errno. - return (NNG_ESYSERR + errnum); + return (NNG_ESYSERR + werr); } @@ -140,22 +198,53 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) { WSABUF iov[4]; // We never have more than 3 at present int i; - DWORD sent = 0; int rv; + DWORD offset; + DWORD nsent; + DWORD resid; + DWORD flags; + WSAOVERLAPPED *olp = &s->send_olpd; if (cnt > 4) { return (NNG_EINVAL); } - for (i = 0; i < cnt; i++) { + for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) { iov[i].buf = iovs[i].iov_buf; iov[i].len = iovs[i].iov_len; } - rv = WSASend(s->s, iov, cnt, &sent, 0, NULL, NULL); - if (rv != 0) { - // XXX: CONVERT WSAGetLastError code. - return (nni_plat_wsa_last_error()); + i = 0; + while (resid) { + flags = 0; + rv = WSASend(s->s, &iov[i], cnt, &nsent, flags, olp, NULL); + if (rv == SOCKET_ERROR) { + if ((rv = WSAGetLastError()) != WSA_IO_PENDING) { + return (nni_winsock_error(rv)); + } + flags = 0; + if (!WSAGetOverlappedResult(s->s, olp, &nsent, + TRUE, &flags)) { + return (nni_winsock_error(WSAGetLastError())); + } + } + + if (nsent > resid) { + nni_panic("WSASend says it sent too much"); + } + + resid -= nsent; + while (nsent) { + if (iov[i].len <= nsent) { + nsent -= iov[i].len; + i++; + cnt--; + } else { + iov[i].len -= nsent; + iov[i].buf += nsent; + nsent = 0; + } + } } return (0); @@ -167,29 +256,39 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) { WSABUF iov[4]; // We never have more than 3 at present int i; - int offset; - int resid = 0; int rv; + DWORD offset; + DWORD resid; DWORD nrecv; + DWORD flags; + WSAOVERLAPPED *olp = &s->recv_olpd; if (cnt > 4) { return (NNG_EINVAL); } - for (i = 0; i < cnt; i++) { + for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) { iov[i].buf = iovs[i].iov_buf; iov[i].len = iovs[i].iov_len; - resid += iov[i].len; } i = 0; while (resid) { - rv = WSARecv(s->s, iov, cnt, &nrecv, 0, NULL, NULL); - if (rv != 0) { - return (nni_plat_wsa_last_error()); + flags = 0; + rv = WSARecv(s->s, &iov[i], cnt, &nrecv, &flags, olp, NULL); + if (rv == SOCKET_ERROR) { + if ((rv = WSAGetLastError()) != WSA_IO_PENDING) { + return (nni_winsock_error(rv)); + } + flags = 0; + if (!WSAGetOverlappedResult(s->s, olp, &nrecv, + TRUE, &flags)) { + return (nni_winsock_error(WSAGetLastError())); + } } + if (nrecv > resid) { - nni_panic("readv says it read too much!"); + nni_panic("WSARecv says it read too much!"); } resid -= nrecv; @@ -227,19 +326,74 @@ nni_plat_tcp_setopts(SOCKET fd) } -void +int nni_plat_tcp_init(nni_plat_tcpsock *s) { s->s = INVALID_SOCKET; + return (0); +} + + +static int +nni_plat_tcp_open(nni_plat_tcpsock *s) +{ + int rv; + DWORD nbytes; + GUID guid1 = WSAID_CONNECTEX; + GUID guid2 = WSAID_ACCEPTEX; + + ZeroMemory(s, sizeof (*s)); + s->s = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_NO_HANDLE_INHERIT|WSA_FLAG_OVERLAPPED); + if (s->s == INVALID_SOCKET) { + rv = WSAGetLastError(); + return (nni_winsock_error(rv)); + } + + if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid1, sizeof (guid1), &s->connectex, sizeof (s->connectex), + &nbytes, NULL, NULL) == SOCKET_ERROR) { + nni_panic("failed lookup for ConnectEx function"); + } + if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid2, sizeof (guid2), &s->acceptex, sizeof (s->acceptex), + &nbytes, NULL, NULL) == SOCKET_ERROR) { + nni_panic("failed lookup for AcceptEx function"); + } + + nni_plat_tcp_setopts(s->s); + + return (0); +} + + +static void +nni_plat_tcp_close(nni_plat_tcpsock *s) +{ + SOCKET fd; + + if ((fd = s->s) != INVALID_SOCKET) { + s->s = INVALID_SOCKET; + (void) shutdown(fd, SD_BOTH); + (void) CancelIoEx((HANDLE) fd, &s->conn_olpd); + (void) CancelIoEx((HANDLE) fd, &s->recv_olpd); + (void) CancelIoEx((HANDLE) fd, &s->send_olpd); + (void) closesocket(fd); + } } void nni_plat_tcp_fini(nni_plat_tcpsock *s) { - if (s->s != INVALID_SOCKET) { - (void) closesocket(s->s); + SOCKET fd; + + if ((fd = s->s) != INVALID_SOCKET) { s->s = INVALID_SOCKET; + (void) CancelIoEx((HANDLE) fd, &s->conn_olpd); + (void) CancelIoEx((HANDLE) fd, &s->recv_olpd); + (void) CancelIoEx((HANDLE) fd, &s->send_olpd); + (void) closesocket(fd); } } @@ -247,9 +401,7 @@ nni_plat_tcp_fini(nni_plat_tcpsock *s) void nni_plat_tcp_shutdown(nni_plat_tcpsock *s) { - if (s->s != INVALID_SOCKET) { - (void) shutdown(s->s, SD_BOTH); - } + nni_plat_tcp_close(s); } @@ -264,46 +416,39 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) { int len; SOCKADDR_STORAGE ss; + ULONG yes; int rv; - BOOL yes; len = nni_plat_to_sockaddr(&ss, addr); if (len < 0) { return (NNG_EADDRINVAL); } - s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0, - WSA_FLAG_NO_HANDLE_INHERIT); - if (s->s == INVALID_SOCKET) { - return (nni_plat_wsa_last_error()); + if ((rv = nni_plat_tcp_open(s)) != 0) { + return (rv); } - nni_plat_tcp_setopts(s->s); - // Make sure that we use the address exclusively. Windows lets // others hijack us by default. yes = 1; if (setsockopt(s->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof (yes)) == SOCKET_ERROR) { - rv = nni_plat_wsa_last_error(); - (void) closesocket(s->s); - s->s = INVALID_SOCKET; - return (rv); + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); } if (bind(s->s, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_wsa_last_error(); - (void) closesocket(s->s); - s->s = INVALID_SOCKET; - return (rv); + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); } // Listen -- 128 depth is probably sufficient. If it isn't, other // bad things are going to happen. if (listen(s->s, 128) != 0) { - rv = nni_plat_wsa_last_error(); - (void) closesocket(s->s); - s->s = INVALID_SOCKET; - return (rv); + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); } return (0); @@ -320,6 +465,10 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, int len; SOCKADDR_STORAGE ss; SOCKADDR_STORAGE bss; + WSAOVERLAPPED *olp = &s->conn_olpd; + BOOL ok; + DWORD nbytes; + DWORD flags; int rv; len = nni_plat_to_sockaddr(&ss, addr); @@ -327,39 +476,44 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, return (NNG_EADDRINVAL); } - s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0, - WSA_FLAG_NO_HANDLE_INHERIT); - if (s->s == INVALID_SOCKET) { - return (nni_plat_wsa_last_error()); - } - if (bindaddr != NULL) { if (bindaddr->s_un.s_family != addr->s_un.s_family) { - (void) closesocket(s->s); - s->s = INVALID_SOCKET; - return (NNG_EINVAL); + return (NNG_EADDRINVAL); } if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) { - (void) closesocket(s->s); - s->s = INVALID_SOCKET; return (NNG_EADDRINVAL); } - if (bind(s->s, (struct sockaddr *) &bss, len) < 0) { - rv = nni_plat_wsa_last_error(); - (void) closesocket(s->s); - s->s = INVALID_SOCKET; - return (rv); - } + } else { + ZeroMemory(&bss, sizeof (bss)); + bss.ss_family = ss.ss_family; } - nni_plat_tcp_setopts(s->s); - - if (connect(s->s, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_wsa_last_error(); - (void) closesocket(s->s); - s->s = INVALID_SOCKET; + if ((rv = nni_plat_tcp_open(s)) != 0) { return (rv); } + + // ConnectEx must always be bound first. + if (bind(s->s, (struct sockaddr *) &bss, len) < 0) { + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); + } + + if (s->connectex(s->s, (struct sockaddr *) &ss, len, NULL, 0, NULL, + olp)) { + // Immediate completion? + return (0); + } + if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) { + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); + } + nbytes = flags = 0; + if (!WSAGetOverlappedResult(s->s, olp, &nbytes, TRUE, &flags)) { + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); + } return (0); } @@ -367,26 +521,32 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, int nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) { - SOCKET fd; - int err; - - for (;;) { - fd = accept(server->s, NULL, NULL); + DWORD nbytes; + DWORD flags; + WSAOVERLAPPED *olp = &s->conn_olpd; + char ainfo[512]; + int rv; - if (fd == INVALID_SOCKET) { - err = WSAGetLastError(); - if ((err == WSAECONNRESET) || (err == WSAEWOULDBLOCK)) { - continue; - } - return (nni_plat_wsa_last_error()); - } else { - break; - } + if ((rv = nni_plat_tcp_open(s)) != 0) { + return (rv); } - nni_plat_tcp_setopts(fd); - - s->s = fd; + // 256 > (sizeof (SOCKADDR_STORAGE) + 16) + nbytes = 0; + if (s->acceptex(server->s, s->s, ainfo, 0, 256, 256, &nbytes, olp)) { + // Immediate completion? + return (0); + } + if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) { + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); + } + nbytes = flags = 0; + if (!WSAGetOverlappedResult(server->s, olp, &nbytes, TRUE, &flags)) { + rv = WSAGetLastError(); + nni_plat_tcp_close(s); + return (nni_winsock_error(rv)); + } return (0); } diff --git a/src/platform/windows/win_rand.c b/src/platform/windows/win_rand.c index 1cabb6a0..8bca5927 100644 --- a/src/platform/windows/win_rand.c +++ b/src/platform/windows/win_rand.c @@ -17,13 +17,14 @@ void nni_plat_seed_prng(void *buf, size_t bufsz) { unsigned val; + // The rand_s routine uses RtlGenRandom to get high quality // pseudo random numbers (i.e. numbers that should be good enough // for use with crypto keying.) while (bufsz > sizeof (val)) { rand_s(&val); memcpy(buf, &val, sizeof (val)); - buf = (((char *)buf) + sizeof (val)); + buf = (((char *) buf) + sizeof (val)); bufsz -= sizeof (val); } } diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 4f323616..631883fc 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -125,8 +125,11 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->func = fn; thr->arg = arg; - thr->handle = (HANDLE) _beginthreadex(NULL, 0, - nni_plat_thr_main, thr, 0, NULL); + // We could probably even go down to 8k... but crypto for some + // protocols might get bigger than this. 1MB is waaay too big. + thr->handle = (HANDLE) _beginthreadex(NULL, 16384, + nni_plat_thr_main, thr, STACK_SIZE_PARAM_IS_A_RESERVATION, + NULL); if (thr->handle == NULL) { return (NNG_ENOMEM); // Best guess... } @@ -166,6 +169,17 @@ nni_plat_init(int (*helper)(void)) Sleep(1); } if (!inited) { + WSADATA data; + WORD ver; + ver = MAKEWORD(2, 2); + if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { + InterlockedExchange(&initing, 0); + if ((LOBYTE(data.wVersion) != 2) || + (HIBYTE(data.wVersion) != 2)) { + nni_panic("got back wrong winsock ver"); + } + return (NNG_ENOMEM); + } helper(); inited = 1; } @@ -178,6 +192,7 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { + WSACleanup(); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 8a28dfe5..2ddc74b7 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -190,7 +190,10 @@ nni_ipc_ep_init(void **epp, const char *url, uint16_t proto) ep->closed = 0; ep->proto = proto; ep->rcvmax = 1024 * 1024; // XXX: fix this - nni_plat_ipc_init(&ep->fd); + if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) { + NNI_FREE_STRUCT(ep); + return (rv); + } (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); @@ -273,12 +276,16 @@ nni_ipc_ep_connect(void *arg, void **pipep) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - nni_plat_ipc_init(&pipe->fd); + if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; rv = nni_plat_ipc_connect(&pipe->fd, path); if (rv != 0) { + nni_plat_ipc_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } @@ -327,9 +334,13 @@ nni_ipc_ep_accept(void *arg, void **pipep) } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - nni_plat_ipc_init(&pipe->fd); + if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) { + nni_plat_ipc_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 9b8cdb1d..8fcf07e5 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -180,7 +180,11 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto) ep->proto = proto; ep->ipv4only = 0; ep->rcvmax = 1024 * 1024; // XXX: fix this - nni_plat_tcp_init(&ep->fd); + + if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) { + NNI_FREE_STRUCT(ep); + return (rv); + } (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); @@ -347,7 +351,10 @@ nni_tcp_ep_connect(void *arg, void **pipep) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - nni_plat_tcp_init(&pipe->fd); + if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; @@ -357,6 +364,7 @@ nni_tcp_ep_connect(void *arg, void **pipep) bindaddr = lclpart == NULL ? NULL : &lcladdr; rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr); if (rv != 0) { + nni_plat_tcp_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } @@ -416,9 +424,13 @@ nni_tcp_ep_accept(void *arg, void **pipep) } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - nni_plat_tcp_init(&pipe->fd); + + if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + } if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) { + nni_plat_tcp_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } |
