summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-14 13:00:55 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-14 13:00:55 -0800
commitf4ce5a285167e7656037096f77f04ab80a010453 (patch)
tree48a9746e154872e4a01c9dee465aed716af278be /src
parentb639e4d3643b8245b77bc8707a3a864221fad195 (diff)
downloadnng-f4ce5a285167e7656037096f77f04ab80a010453.tar.gz
nng-f4ce5a285167e7656037096f77f04ab80a010453.tar.bz2
nng-f4ce5a285167e7656037096f77f04ab80a010453.zip
Windows TCP now working.
There are lots of changes here, mostly stuff we did in support of Windows TCP. However, there are some bugs that were fixed, and we added some new error codes, and generalized the handling of some failures during accept. Windows IPC (NamedPipes) is still missing.
Diffstat (limited to 'src')
-rw-r--r--src/core/clock.h2
-rw-r--r--src/core/endpt.c30
-rw-r--r--src/core/platform.h4
-rw-r--r--src/nng.c9
-rw-r--r--src/nng.h7
-rw-r--r--src/platform/posix/posix_debug.c61
-rw-r--r--src/platform/posix/posix_ipc.c7
-rw-r--r--src/platform/posix/posix_net.c34
-rw-r--r--src/platform/posix/posix_thread.c23
-rw-r--r--src/platform/windows/win_debug.c26
-rw-r--r--src/platform/windows/win_impl.h13
-rw-r--r--src/platform/windows/win_ipc.c11
-rw-r--r--src/platform/windows/win_net.c358
-rw-r--r--src/platform/windows/win_rand.c3
-rw-r--r--src/platform/windows/win_thread.c19
-rw-r--r--src/transport/ipc/ipc.c17
-rw-r--r--src/transport/tcp/tcp.c18
17 files changed, 444 insertions, 198 deletions
diff --git a/src/core/clock.h b/src/core/clock.h
index c46adfbe..3b607322 100644
--- a/src/core/clock.h
+++ b/src/core/clock.h
@@ -16,4 +16,4 @@ extern nni_time nni_clock(void);
extern void nni_usleep(nni_duration usec);
-#endif // CORE_CLOCK_H \ No newline at end of file
+#endif // CORE_CLOCK_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 554d9a36..c4673376 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -297,16 +297,32 @@ nni_listener(void *arg)
if (((rv = nni_ep_accept(ep, &pipe)) == 0) &&
((rv = nni_pipe_start(pipe)) == 0)) {
+ // Success! Loop around for the next one.
continue;
}
- if (rv == NNG_ECLOSED) {
+
+ switch (rv) {
+ case NNG_ECLOSED:
+ // This indicates the listening socket got closed.
+ // We just bail.
+ return;
+
+ case NNG_ECONNABORTED:
+ case NNG_ECONNRESET:
+ // These are remote conditions, no cool down.
+ cooldown = 0;
+ break;
+ case NNG_ENOMEM:
+ // We're running low on memory, so its best to wait
+ // a whole second to give the system a chance to
+ // recover memory.
+ cooldown = 1000000;
+ break;
+ default:
+ // Other cases we sleep just a tiny bit to avoid
+ // burning the cpu (e.g. out of files).
+ cooldown = 1000; // 1 msec
break;
- }
- cooldown = 1000; // 1 ms cooldown
- if (rv == NNG_ENOMEM) {
- // For out of memory, we need to give more
- // time for the system to reclaim resources.
- cooldown = 100000; // 100ms
}
cooldown += nni_clock();
nni_mtx_lock(mx);
diff --git a/src/core/platform.h b/src/core/platform.h
index bf8a52dc..8d19a7c8 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -170,7 +170,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// nni_plat_tcp_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern void nni_plat_tcp_init(nni_plat_tcpsock *);
+extern int nni_plat_tcp_init(nni_plat_tcpsock *);
// nni_plat_tcp_fini just closes a TCP socket, and releases any related
// resources.
@@ -211,7 +211,7 @@ extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_ipc_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern void nni_plat_ipc_init(nni_plat_ipcsock *);
+extern int nni_plat_ipc_init(nni_plat_ipcsock *);
// nni_plat_ipc_fini just closes an IPC socket, and releases any related
// resources.
diff --git a/src/nng.c b/src/nng.c
index d550c974..ec221fb7 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -199,6 +199,15 @@ nng_strerror(int num)
case NNG_EPERM:
return ("Permission denied");
+
+ case NNG_EMSGSIZE:
+ return ("Message too large");
+
+ case NNG_ECONNRESET:
+ return ("Connection reset");
+
+ case NNG_ECONNABORTED:
+ return ("Connection aborted");
}
if (num & NNG_ESYSERR) {
diff --git a/src/nng.h b/src/nng.h
index bf7bfa7b..cba6a596 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -38,8 +38,8 @@ extern "C" {
#endif // NNG_SHARED_LIB
#else
#define NNG_DECL extern
-#endif // _WIN32 && !NNG_STATIC_LIB
-#endif // NNG_DECL
+#endif // _WIN32 && !NNG_STATIC_LIB
+#endif // NNG_DECL
// Types common to nng.
typedef struct nng_socket nng_socket;
@@ -388,6 +388,9 @@ NNG_DECL int nng_device(nng_socket *, nng_socket *);
#define NNG_EUNREACHABLE (14)
#define NNG_EADDRINVAL (15)
#define NNG_EPERM (16)
+#define NNG_EMSGSIZE (17)
+#define NNG_ECONNABORTED (18)
+#define NNG_ECONNRESET (19)
// NNG_SYSERR is a special code, which allows us to wrap errors from the
// underlyuing operating system. We generally prefer to map errors to one
diff --git a/src/platform/posix/posix_debug.c b/src/platform/posix/posix_debug.c
index 71006b25..6199df1d 100644
--- a/src/platform/posix/posix_debug.c
+++ b/src/platform/posix/posix_debug.c
@@ -40,8 +40,6 @@ nni_plat_strerror(int errnum)
}
-#define NNI_ERR(x, y) { x, y },
-
// There are of course other errors than these, but these are the ones
// that we might reasonably expect and want to handle "cleanly". Most of
// the others should be handled by the system error code. Note that EFAULT
@@ -55,35 +53,36 @@ static struct {
int nng_err;
}
nni_plat_errnos[] = {
- NNI_ERR(EINTR, NNG_EINTR)
- NNI_ERR(EINVAL, NNG_EINVAL)
- NNI_ERR(ENOMEM, NNG_ENOMEM)
- NNI_ERR(EACCES, NNG_EPERM)
- NNI_ERR(EADDRINUSE, NNG_EADDRINUSE)
- NNI_ERR(EADDRNOTAVAIL, NNG_EADDRINVAL)
- NNI_ERR(EAFNOSUPPORT, NNG_ENOTSUP)
- NNI_ERR(EAGAIN, NNG_EAGAIN)
- NNI_ERR(EBADF, NNG_ECLOSED)
- NNI_ERR(EBUSY, NNG_EBUSY)
- NNI_ERR(ECONNABORTED, NNG_ECLOSED)
- NNI_ERR(ECONNREFUSED, NNG_ECONNREFUSED)
- NNI_ERR(ECONNRESET, NNG_ECLOSED)
- NNI_ERR(EHOSTUNREACH, NNG_EUNREACHABLE)
- NNI_ERR(ENETUNREACH, NNG_EUNREACHABLE)
- NNI_ERR(ENAMETOOLONG, NNG_EINVAL)
- NNI_ERR(ENOENT, NNG_ENOENT)
- NNI_ERR(ENOBUFS, NNG_ENOMEM)
- NNI_ERR(ENOPROTOOPT, NNG_ENOTSUP)
- NNI_ERR(ENOSYS, NNG_ENOTSUP)
- NNI_ERR(ENOTSUP, NNG_ENOTSUP)
- NNI_ERR(EPERM, NNG_EPERM)
- NNI_ERR(EPIPE, NNG_ECLOSED)
- NNI_ERR(EPROTO, NNG_EPROTO)
- NNI_ERR(EPROTONOSUPPORT, NNG_ENOTSUP)
- NNI_ERR(ETIME, NNG_ETIMEDOUT)
- NNI_ERR(ETIMEDOUT, NNG_ETIMEDOUT)
- NNI_ERR(EWOULDBLOCK, NNG_EAGAIN)
- NNI_ERR(0, 0) // must be last
+ { EINTR, NNG_EINTR },
+ { EINVAL, NNG_EINVAL },
+ { ENOMEM, NNG_ENOMEM },
+ { EACCES, NNG_EPERM },
+ { EADDRINUSE, NNG_EADDRINUSE },
+ { EADDRNOTAVAIL, NNG_EADDRINVAL },
+ { EAFNOSUPPORT, NNG_ENOTSUP },
+ { EAGAIN, NNG_EAGAIN },
+ { EBADF, NNG_ECLOSED },
+ { EBUSY, NNG_EBUSY },
+ { ECONNABORTED, NNG_ECONNABORTED },
+ { ECONNREFUSED, NNG_ECONNREFUSED },
+ { ECONNRESET, NNG_ECONNRESET },
+ { EHOSTUNREACH, NNG_EUNREACHABLE },
+ { ENETUNREACH, NNG_EUNREACHABLE },
+ { ENAMETOOLONG, NNG_EINVAL },
+ { ENOENT, NNG_ENOENT },
+ { ENOBUFS, NNG_ENOMEM },
+ { ENOPROTOOPT, NNG_ENOTSUP },
+ { ENOSYS, NNG_ENOTSUP },
+ { ENOTSUP, NNG_ENOTSUP },
+ { EPERM, NNG_EPERM },
+ { EPIPE, NNG_ECLOSED },
+ { EPROTO, NNG_EPROTO },
+ { EPROTONOSUPPORT, NNG_ENOTSUP },
+ { ETIME, NNG_ETIMEDOUT },
+ { ETIMEDOUT, NNG_ETIMEDOUT },
+ { EWOULDBLOCK, NNG_EAGAIN },
+ // must be last
+ { 0, 0 },
};
int
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index 8c40397a..6044280e 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -73,7 +73,7 @@ nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = writev(s->fd, iov, cnt);
+ rv = writev(s->fd, &iov[i], cnt);
if (rv < 0) {
if (rv == EINTR) {
continue;
@@ -121,7 +121,7 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
}
i = 0;
while (resid) {
- rv = readv(s->fd, iov, cnt);
+ rv = readv(s->fd, &iov[i], cnt);
if (rv < 0) {
if (errno == EINTR) {
continue;
@@ -171,10 +171,11 @@ nni_plat_ipc_setopts(int fd)
}
-void
+int
nni_plat_ipc_init(nni_plat_ipcsock *s)
{
s->fd = -1;
+ return (0);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 76a40259..f9720b66 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -23,6 +23,12 @@
#include <unistd.h>
#include <netdb.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_TCP_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_TCP_SOCKTYPE SOCK_STREAM
+#endif
+
static int
nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
{
@@ -128,7 +134,7 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = writev(s->fd, iov, cnt);
+ rv = writev(s->fd, &iov[i], cnt);
if (rv < 0) {
if (rv == EINTR) {
continue;
@@ -177,7 +183,7 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
i = 0;
while (resid) {
- rv = readv(s->fd, iov, cnt);
+ rv = readv(s->fd, &iov[i], cnt);
if (rv < 0) {
if (errno == EINTR) {
continue;
@@ -233,10 +239,11 @@ nni_plat_tcp_setopts(int fd)
}
-void
+int
nni_plat_tcp_init(nni_plat_tcpsock *s)
{
s->fd = -1;
+ return (0);
}
@@ -282,12 +289,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
return (NNG_EADDRINVAL);
}
-#ifdef SOCK_CLOEXEC
- fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
-#else
- fd = socket(ss.ss_family, SOCK_STREAM, 0);
-#endif
- if (fd < 0) {
+ if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
@@ -330,12 +332,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
return (NNG_EADDRINVAL);
}
-#ifdef SOCK_CLOEXEC
- fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
-#else
- fd = socket(ss.ss_family, SOCK_STREAM, 0);
-#endif
- if (fd < 0) {
+ if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
@@ -381,13 +378,6 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
#endif
if (fd < 0) {
- if ((errno == EINTR) || (errno == ECONNABORTED)) {
- // These are not fatal errors, keep trying
- continue;
- }
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
- continue;
- }
return (nni_plat_errno(errno));
} else {
break;
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index a429e071..7b4dbbdc 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -28,6 +28,7 @@ static int nni_plat_forked = 0;
pthread_condattr_t nni_cvattr;
pthread_mutexattr_t nni_mxattr;
+static pthread_attr_t nni_pthread_attr;
// We open a /dev/null file descriptor so that we can dup2() it to
// cause MacOS X to wakeup. This gives us a "safe" close semantic.
@@ -189,7 +190,8 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->arg = arg;
// POSIX wants functions to return a void *, but we don't care.
- rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
+ rv = pthread_create(&thr->tid, &nni_pthread_attr,
+ nni_plat_thr_main, thr);
if (rv != 0) {
//nni_printf("pthread_create: %s", strerror(rv));
return (NNG_ENOMEM);
@@ -263,9 +265,27 @@ nni_plat_init(int (*helper)(void))
return (NNG_ENOMEM);
}
+ rv = pthread_attr_init(&nni_pthread_attr);
+ if (rv != 0) {
+ pthread_mutex_unlock(&nni_plat_lock);
+ (void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ return (NNG_ENOMEM);
+ }
+
+ // We don't force this, but we want to have it small... we could
+ // probably get by with even just 8k, but Linux usually wants 16k
+ // as a minimum. If this fails, its not fatal, just we won't be
+ // as scalable / thrifty with our use of VM.
+ (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
(void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
@@ -284,6 +304,7 @@ nni_plat_fini(void)
if (nni_plat_inited) {
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
(void) close(nni_plat_devnull);
nni_plat_devnull = -1;
nni_plat_inited = 0;
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c
index 189f5d4c..00fd6493 100644
--- a/src/platform/windows/win_debug.c
+++ b/src/platform/windows/win_debug.c
@@ -40,26 +40,24 @@ nni_plat_strerror(int errnum)
}
-#define NNI_ERR(x, y) { x, y },
-
// Win32 has its own error codes, but these ones it shares with POSIX.
static struct {
int sys_err;
int nng_err;
}
nni_plat_errnos[] = {
- NNI_ERR(ENOENT, NNG_ENOENT)
- NNI_ERR(EINTR, NNG_EINTR)
- NNI_ERR(EINVAL, NNG_EINVAL)
- NNI_ERR(ENOMEM, NNG_ENOMEM)
- NNI_ERR(EACCES, NNG_EPERM)
- NNI_ERR(EAGAIN, NNG_EAGAIN)
- NNI_ERR(EBADF, NNG_ECLOSED)
- NNI_ERR(EBUSY, NNG_EBUSY)
- NNI_ERR(ENAMETOOLONG, NNG_EINVAL)
- NNI_ERR(EPERM, NNG_EPERM)
- NNI_ERR(EPIPE, NNG_ECLOSED)
- NNI_ERR(0, 0) // must be last
+ { ENOENT, NNG_ENOENT },
+ { EINTR, NNG_EINTR },
+ { EINVAL, NNG_EINVAL },
+ { ENOMEM, NNG_ENOMEM },
+ { EACCES, NNG_EPERM },
+ { EAGAIN, NNG_EAGAIN },
+ { EBADF, NNG_ECLOSED },
+ { EBUSY, NNG_EBUSY },
+ { ENAMETOOLONG, NNG_EINVAL },
+ { EPERM, NNG_EPERM },
+ { EPIPE, NNG_ECLOSED },
+ { 0, 0 } // must be last
};
int
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b2714b23..f675f510 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -27,7 +27,16 @@
// elsewhere.
struct nni_plat_tcpsock {
- SOCKET s;
+ SOCKET s;
+
+ WSAOVERLAPPED recv_olpd;
+ WSAOVERLAPPED send_olpd;
+ WSAOVERLAPPED conn_olpd; // Use for both connect and accept
+
+ // We have to lookup some function pointers using ioctls. Winsock,
+ // gotta love it.
+ LPFN_CONNECTEX connectex;
+ LPFN_ACCEPTEX acceptex;
};
struct nni_plat_ipcsock {
@@ -47,7 +56,7 @@ struct nni_plat_mtx {
struct nni_plat_cv {
CONDITION_VARIABLE cv;
- CRITICAL_SECTION *cs;
+ CRITICAL_SECTION * cs;
};
#endif // PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 682eb0a8..f4d4d004 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -115,10 +115,11 @@ nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
}
-void
+int
nni_plat_ipc_init(nni_plat_ipcsock *s)
{
s->p = INVALID_HANDLE_VALUE;
+ return (0);
}
@@ -136,7 +137,7 @@ void
nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
{
if (s->p != INVALID_HANDLE_VALUE) {
-#if 0
+#if 0
(void) shutdown(s->fd, SHUT_RDWR);
// This causes the equivalent of a close. Hopefully waking
// up anything that didn't get the hint with the shutdown.
@@ -209,7 +210,7 @@ nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
return (rv);
}
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
@@ -244,7 +245,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
return (rv);
}
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
@@ -275,7 +276,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
nni_plat_ipc_setopts(fd);
s->fd = fd;
-#endif
+#endif
return (NNG_ENOTSUP);
}
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 3bd34b78..c7a7849d 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -11,46 +11,104 @@
#ifdef PLATFORM_WINDOWS
+#include <stdio.h>
+
+// Windows has infinite numbers of error codes it seems.
static struct {
int wsa_err;
int nng_err;
}
nni_plat_wsa_errnos[] = {
- { WSAECONNABORTED, NNG_ECLOSED },
- { WSAEINTR, NNG_EINTR },
+ { WSA_INVALID_HANDLE, NNG_ECLOSED },
+ { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM },
+ { WSA_INVALID_PARAMETER, NNG_EINVAL },
+ { WSA_OPERATION_ABORTED, NNG_ECLOSED },
+ { WSA_IO_INCOMPLETE, NNG_EAGAIN },
+
+ { WSAEINTR, NNG_EINTR },
+ { WSAEBADF, NNG_ECLOSED },
+ { WSAEACCES, NNG_EPERM },
+ { WSAEFAULT, NNG_ESYSERR + WSAEFAULT },
+ { WSAEWOULDBLOCK, NNG_EAGAIN },
+ { WSAEINPROGRESS, NNG_EAGAIN },
+ { WSAEALREADY, NNG_ESYSERR + WSAEALREADY },
+ { WSAENOTSOCK, NNG_ECLOSED },
+ { WSAEMSGSIZE, NNG_EMSGSIZE },
+ { WSAEPROTOTYPE, NNG_ESYSERR + WSAEPROTOTYPE },
+ { WSAENOPROTOOPT, NNG_ENOTSUP },
+ { WSAEPROTONOSUPPORT, NNG_ENOTSUP },
+ { WSAEPROTONOSUPPORT, NNG_ENOTSUP },
+ { WSAEADDRINUSE, NNG_EADDRINUSE },
+ { WSAEADDRNOTAVAIL, NNG_EADDRINVAL },
+ { WSAENETDOWN, NNG_EUNREACHABLE },
+ { WSAENETUNREACH, NNG_EUNREACHABLE },
+ { WSAECONNABORTED, NNG_ETIMEDOUT },
+ { WSAECONNRESET, NNG_ECLOSED },
+ { WSAENOBUFS, NNG_ENOMEM },
+ { WSAEISCONN, NNG_ESYSERR + WSAEISCONN },
+ { WSAENOTCONN, NNG_ECLOSED },
+ { WSAESHUTDOWN, NNG_ECLOSED },
+ { WSAETOOMANYREFS, NNG_ESYSERR + WSAETOOMANYREFS },
+ { WSAETIMEDOUT, NNG_ETIMEDOUT },
+ { WSAECONNREFUSED, NNG_ECONNREFUSED },
+ { WSAELOOP, NNG_ESYSERR + WSAELOOP },
+ { WSAENAMETOOLONG, NNG_ESYSERR + WSAENAMETOOLONG },
+ { WSAEHOSTDOWN, NNG_EUNREACHABLE },
+ { WSAEHOSTUNREACH, NNG_EUNREACHABLE },
+ { WSAENOTEMPTY, NNG_ESYSERR + WSAENOTEMPTY },
+ { WSAEPROCLIM, NNG_ESYSERR + WSAEPROCLIM },
+ { WSAEUSERS, NNG_ESYSERR + WSAEUSERS },
+ { WSAEDQUOT, NNG_ESYSERR + WSAEDQUOT },
+ { WSAESTALE, NNG_ESYSERR + WSAESTALE },
+ { WSAEREMOTE, NNG_ESYSERR + WSAEREMOTE },
+ { WSASYSNOTREADY, NNG_ESYSERR + WSASYSNOTREADY },
+ { WSAVERNOTSUPPORTED, NNG_ENOTSUP },
+ { WSANOTINITIALISED, NNG_ESYSERR + WSANOTINITIALISED },
+ { WSAEDISCON, NNG_ECLOSED },
+ { WSAENOMORE, NNG_ESYSERR + WSAENOMORE },
+ { WSAECANCELLED, NNG_ESYSERR + WSAECANCELLED },
+ { WSAEINVALIDPROVIDER, NNG_ESYSERR + WSAEINVALIDPROVIDER },
+ { WSAEPROVIDERFAILEDINIT, NNG_ESYSERR + WSAEPROVIDERFAILEDINIT },
+ { WSASYSCALLFAILURE, NNG_ESYSERR + WSASYSCALLFAILURE },
+ { WSASERVICE_NOT_FOUND, NNG_ESYSERR + WSASERVICE_NOT_FOUND },
+ { WSATYPE_NOT_FOUND, NNG_ESYSERR + WSATYPE_NOT_FOUND },
+ { WSA_E_CANCELLED, NNG_ESYSERR + WSA_E_CANCELLED },
+ { WSAEREFUSED, NNG_ESYSERR + WSAEREFUSED },
+ { WSAHOST_NOT_FOUND, NNG_EADDRINVAL },
+ { WSATRY_AGAIN, NNG_EAGAIN },
+ { WSANO_RECOVERY, NNG_ESYSERR + WSANO_RECOVERY },
+ { WSANO_DATA, NNG_EADDRINVAL },
+ // Eliding all the QoS related errors.
+
+#if 0
// REVIEW THESE!!!
- { WSAECONNRESET, NNG_ECONNREFUSED },
- { WSAEMSGSIZE, NNG_EINVAL },
- { WSAENETDOWN, NNG_EUNREACHABLE },
- { WSAENETRESET, NNG_ECLOSED },
- { WSAENOBUFS, NNG_ENOMEM },
- { WSAESHUTDOWN, NNG_ECLOSED },
- { WSAEWOULDBLOCK, NNG_EAGAIN },
- { WSAEBADF, NNG_ECLOSED },
- { WSA_INVALID_HANDLE, NNG_ECLOSED },
- { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM },
- { WSA_INVALID_PARAMETER, NNG_EINVAL },
- { WSAEACCES, NNG_EPERM },
- { 0, 0 }, // MUST BE LAST
+ { ERROR_BROKEN_PIPE, NNG_ECLOSED },
+ { ERROR_CONNECTION_REFUSED, NNG_ECONNREFUSED },
+ { ERROR_NOT_CONNECTED, NNG_ECLOSED },
+ { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED },
+ { ERROR_NO_DATA, NNG_ECLOSED },
+#endif
+ // Must be Last!!
+ { 0, 0 },
};
static int
-nni_plat_wsa_last_error(void)
+nni_winsock_error(int werr)
{
- int errnum = WSAGetLastError();
int i;
- if (errnum == 0) {
+ if (werr == 0) {
return (0);
}
+
for (i = 0; nni_plat_wsa_errnos[i].nng_err != 0; i++) {
- if (errnum == nni_plat_wsa_errnos[i].wsa_err) {
+ if (werr == nni_plat_wsa_errnos[i].wsa_err) {
return (nni_plat_wsa_errnos[i].nng_err);
}
}
// Other system errno.
- return (NNG_ESYSERR + errnum);
+ return (NNG_ESYSERR + werr);
}
@@ -140,22 +198,53 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
WSABUF iov[4]; // We never have more than 3 at present
int i;
- DWORD sent = 0;
int rv;
+ DWORD offset;
+ DWORD nsent;
+ DWORD resid;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->send_olpd;
if (cnt > 4) {
return (NNG_EINVAL);
}
- for (i = 0; i < cnt; i++) {
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) {
iov[i].buf = iovs[i].iov_buf;
iov[i].len = iovs[i].iov_len;
}
- rv = WSASend(s->s, iov, cnt, &sent, 0, NULL, NULL);
- if (rv != 0) {
- // XXX: CONVERT WSAGetLastError code.
- return (nni_plat_wsa_last_error());
+ i = 0;
+ while (resid) {
+ flags = 0;
+ rv = WSASend(s->s, &iov[i], cnt, &nsent, flags, olp, NULL);
+ if (rv == SOCKET_ERROR) {
+ if ((rv = WSAGetLastError()) != WSA_IO_PENDING) {
+ return (nni_winsock_error(rv));
+ }
+ flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nsent,
+ TRUE, &flags)) {
+ return (nni_winsock_error(WSAGetLastError()));
+ }
+ }
+
+ if (nsent > resid) {
+ nni_panic("WSASend says it sent too much");
+ }
+
+ resid -= nsent;
+ while (nsent) {
+ if (iov[i].len <= nsent) {
+ nsent -= iov[i].len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].len -= nsent;
+ iov[i].buf += nsent;
+ nsent = 0;
+ }
+ }
}
return (0);
@@ -167,29 +256,39 @@ nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
WSABUF iov[4]; // We never have more than 3 at present
int i;
- int offset;
- int resid = 0;
int rv;
+ DWORD offset;
+ DWORD resid;
DWORD nrecv;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->recv_olpd;
if (cnt > 4) {
return (NNG_EINVAL);
}
- for (i = 0; i < cnt; i++) {
+ for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) {
iov[i].buf = iovs[i].iov_buf;
iov[i].len = iovs[i].iov_len;
- resid += iov[i].len;
}
i = 0;
while (resid) {
- rv = WSARecv(s->s, iov, cnt, &nrecv, 0, NULL, NULL);
- if (rv != 0) {
- return (nni_plat_wsa_last_error());
+ flags = 0;
+ rv = WSARecv(s->s, &iov[i], cnt, &nrecv, &flags, olp, NULL);
+ if (rv == SOCKET_ERROR) {
+ if ((rv = WSAGetLastError()) != WSA_IO_PENDING) {
+ return (nni_winsock_error(rv));
+ }
+ flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nrecv,
+ TRUE, &flags)) {
+ return (nni_winsock_error(WSAGetLastError()));
+ }
}
+
if (nrecv > resid) {
- nni_panic("readv says it read too much!");
+ nni_panic("WSARecv says it read too much!");
}
resid -= nrecv;
@@ -227,19 +326,74 @@ nni_plat_tcp_setopts(SOCKET fd)
}
-void
+int
nni_plat_tcp_init(nni_plat_tcpsock *s)
{
s->s = INVALID_SOCKET;
+ return (0);
+}
+
+
+static int
+nni_plat_tcp_open(nni_plat_tcpsock *s)
+{
+ int rv;
+ DWORD nbytes;
+ GUID guid1 = WSAID_CONNECTEX;
+ GUID guid2 = WSAID_ACCEPTEX;
+
+ ZeroMemory(s, sizeof (*s));
+ s->s = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
+ WSA_FLAG_NO_HANDLE_INHERIT|WSA_FLAG_OVERLAPPED);
+ if (s->s == INVALID_SOCKET) {
+ rv = WSAGetLastError();
+ return (nni_winsock_error(rv));
+ }
+
+ if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guid1, sizeof (guid1), &s->connectex, sizeof (s->connectex),
+ &nbytes, NULL, NULL) == SOCKET_ERROR) {
+ nni_panic("failed lookup for ConnectEx function");
+ }
+ if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guid2, sizeof (guid2), &s->acceptex, sizeof (s->acceptex),
+ &nbytes, NULL, NULL) == SOCKET_ERROR) {
+ nni_panic("failed lookup for AcceptEx function");
+ }
+
+ nni_plat_tcp_setopts(s->s);
+
+ return (0);
+}
+
+
+static void
+nni_plat_tcp_close(nni_plat_tcpsock *s)
+{
+ SOCKET fd;
+
+ if ((fd = s->s) != INVALID_SOCKET) {
+ s->s = INVALID_SOCKET;
+ (void) shutdown(fd, SD_BOTH);
+ (void) CancelIoEx((HANDLE) fd, &s->conn_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->recv_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->send_olpd);
+ (void) closesocket(fd);
+ }
}
void
nni_plat_tcp_fini(nni_plat_tcpsock *s)
{
- if (s->s != INVALID_SOCKET) {
- (void) closesocket(s->s);
+ SOCKET fd;
+
+ if ((fd = s->s) != INVALID_SOCKET) {
s->s = INVALID_SOCKET;
+ (void) CancelIoEx((HANDLE) fd, &s->conn_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->recv_olpd);
+ (void) CancelIoEx((HANDLE) fd, &s->send_olpd);
+ (void) closesocket(fd);
}
}
@@ -247,9 +401,7 @@ nni_plat_tcp_fini(nni_plat_tcpsock *s)
void
nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
{
- if (s->s != INVALID_SOCKET) {
- (void) shutdown(s->s, SD_BOTH);
- }
+ nni_plat_tcp_close(s);
}
@@ -264,46 +416,39 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
{
int len;
SOCKADDR_STORAGE ss;
+ ULONG yes;
int rv;
- BOOL yes;
len = nni_plat_to_sockaddr(&ss, addr);
if (len < 0) {
return (NNG_EADDRINVAL);
}
- s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0,
- WSA_FLAG_NO_HANDLE_INHERIT);
- if (s->s == INVALID_SOCKET) {
- return (nni_plat_wsa_last_error());
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
+ return (rv);
}
- nni_plat_tcp_setopts(s->s);
-
// Make sure that we use the address exclusively. Windows lets
// others hijack us by default.
yes = 1;
if (setsockopt(s->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes,
sizeof (yes)) == SOCKET_ERROR) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
if (bind(s->s, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
// Listen -- 128 depth is probably sufficient. If it isn't, other
// bad things are going to happen.
if (listen(s->s, 128) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
}
return (0);
@@ -320,6 +465,10 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
int len;
SOCKADDR_STORAGE ss;
SOCKADDR_STORAGE bss;
+ WSAOVERLAPPED *olp = &s->conn_olpd;
+ BOOL ok;
+ DWORD nbytes;
+ DWORD flags;
int rv;
len = nni_plat_to_sockaddr(&ss, addr);
@@ -327,39 +476,44 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
return (NNG_EADDRINVAL);
}
- s->s = WSASocket(ss.ss_family, SOCK_STREAM, 0, NULL, 0,
- WSA_FLAG_NO_HANDLE_INHERIT);
- if (s->s == INVALID_SOCKET) {
- return (nni_plat_wsa_last_error());
- }
-
if (bindaddr != NULL) {
if (bindaddr->s_un.s_family != addr->s_un.s_family) {
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (NNG_EINVAL);
+ return (NNG_EADDRINVAL);
}
if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) {
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
return (NNG_EADDRINVAL);
}
- if (bind(s->s, (struct sockaddr *) &bss, len) < 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
- return (rv);
- }
+ } else {
+ ZeroMemory(&bss, sizeof (bss));
+ bss.ss_family = ss.ss_family;
}
- nni_plat_tcp_setopts(s->s);
-
- if (connect(s->s, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_wsa_last_error();
- (void) closesocket(s->s);
- s->s = INVALID_SOCKET;
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
return (rv);
}
+
+ // ConnectEx must always be bound first.
+ if (bind(s->s, (struct sockaddr *) &bss, len) < 0) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+
+ if (s->connectex(s->s, (struct sockaddr *) &ss, len, NULL, 0, NULL,
+ olp)) {
+ // Immediate completion?
+ return (0);
+ }
+ if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) {
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+ nbytes = flags = 0;
+ if (!WSAGetOverlappedResult(s->s, olp, &nbytes, TRUE, &flags)) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
return (0);
}
@@ -367,26 +521,32 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
int
nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
{
- SOCKET fd;
- int err;
-
- for (;;) {
- fd = accept(server->s, NULL, NULL);
+ DWORD nbytes;
+ DWORD flags;
+ WSAOVERLAPPED *olp = &s->conn_olpd;
+ char ainfo[512];
+ int rv;
- if (fd == INVALID_SOCKET) {
- err = WSAGetLastError();
- if ((err == WSAECONNRESET) || (err == WSAEWOULDBLOCK)) {
- continue;
- }
- return (nni_plat_wsa_last_error());
- } else {
- break;
- }
+ if ((rv = nni_plat_tcp_open(s)) != 0) {
+ return (rv);
}
- nni_plat_tcp_setopts(fd);
-
- s->s = fd;
+ // 256 > (sizeof (SOCKADDR_STORAGE) + 16)
+ nbytes = 0;
+ if (s->acceptex(server->s, s->s, ainfo, 0, 256, 256, &nbytes, olp)) {
+ // Immediate completion?
+ return (0);
+ }
+ if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) {
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
+ nbytes = flags = 0;
+ if (!WSAGetOverlappedResult(server->s, olp, &nbytes, TRUE, &flags)) {
+ rv = WSAGetLastError();
+ nni_plat_tcp_close(s);
+ return (nni_winsock_error(rv));
+ }
return (0);
}
diff --git a/src/platform/windows/win_rand.c b/src/platform/windows/win_rand.c
index 1cabb6a0..8bca5927 100644
--- a/src/platform/windows/win_rand.c
+++ b/src/platform/windows/win_rand.c
@@ -17,13 +17,14 @@ void
nni_plat_seed_prng(void *buf, size_t bufsz)
{
unsigned val;
+
// The rand_s routine uses RtlGenRandom to get high quality
// pseudo random numbers (i.e. numbers that should be good enough
// for use with crypto keying.)
while (bufsz > sizeof (val)) {
rand_s(&val);
memcpy(buf, &val, sizeof (val));
- buf = (((char *)buf) + sizeof (val));
+ buf = (((char *) buf) + sizeof (val));
bufsz -= sizeof (val);
}
}
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 4f323616..631883fc 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -125,8 +125,11 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->func = fn;
thr->arg = arg;
- thr->handle = (HANDLE) _beginthreadex(NULL, 0,
- nni_plat_thr_main, thr, 0, NULL);
+ // We could probably even go down to 8k... but crypto for some
+ // protocols might get bigger than this. 1MB is waaay too big.
+ thr->handle = (HANDLE) _beginthreadex(NULL, 16384,
+ nni_plat_thr_main, thr, STACK_SIZE_PARAM_IS_A_RESERVATION,
+ NULL);
if (thr->handle == NULL) {
return (NNG_ENOMEM); // Best guess...
}
@@ -166,6 +169,17 @@ nni_plat_init(int (*helper)(void))
Sleep(1);
}
if (!inited) {
+ WSADATA data;
+ WORD ver;
+ ver = MAKEWORD(2, 2);
+ if (WSAStartup(MAKEWORD(2, 2), &data) != 0) {
+ InterlockedExchange(&initing, 0);
+ if ((LOBYTE(data.wVersion) != 2) ||
+ (HIBYTE(data.wVersion) != 2)) {
+ nni_panic("got back wrong winsock ver");
+ }
+ return (NNG_ENOMEM);
+ }
helper();
inited = 1;
}
@@ -178,6 +192,7 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
+ WSACleanup();
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 8a28dfe5..2ddc74b7 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -190,7 +190,10 @@ nni_ipc_ep_init(void **epp, const char *url, uint16_t proto)
ep->closed = 0;
ep->proto = proto;
ep->rcvmax = 1024 * 1024; // XXX: fix this
- nni_plat_ipc_init(&ep->fd);
+ if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (rv);
+ }
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
@@ -273,12 +276,16 @@ nni_ipc_ep_connect(void *arg, void **pipep)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- nni_plat_ipc_init(&pipe->fd);
+ if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(&pipe->fd, path);
if (rv != 0) {
+ nni_plat_ipc_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
@@ -327,9 +334,13 @@ nni_ipc_ep_accept(void *arg, void **pipep)
}
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- nni_plat_ipc_init(&pipe->fd);
+ if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) {
+ nni_plat_ipc_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 9b8cdb1d..8fcf07e5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -180,7 +180,11 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto)
ep->proto = proto;
ep->ipv4only = 0;
ep->rcvmax = 1024 * 1024; // XXX: fix this
- nni_plat_tcp_init(&ep->fd);
+
+ if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (rv);
+ }
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
@@ -347,7 +351,10 @@ nni_tcp_ep_connect(void *arg, void **pipep)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- nni_plat_tcp_init(&pipe->fd);
+ if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -357,6 +364,7 @@ nni_tcp_ep_connect(void *arg, void **pipep)
bindaddr = lclpart == NULL ? NULL : &lcladdr;
rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr);
if (rv != 0) {
+ nni_plat_tcp_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}
@@ -416,9 +424,13 @@ nni_tcp_ep_accept(void *arg, void **pipep)
}
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- nni_plat_tcp_init(&pipe->fd);
+
+ if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ }
if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) {
+ nni_plat_tcp_fini(&pipe->fd);
NNI_FREE_STRUCT(pipe);
return (rv);
}