From 869d0eeb20657cd6d2e87d8c4836b086c6be448d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 29 Sep 2017 15:27:08 -0700 Subject: Windows UDP support. This implements the basic UDP functionality for Windows (required for ZeroTier for example). We have also introduced a UDP test suite to validate that this actually works. While here a few Windows compilation warnings / nits were fixed. --- src/CMakeLists.txt | 6 +- src/core/options.c | 2 +- src/platform/windows/win_impl.h | 6 + src/platform/windows/win_net.c | 650 ------------------------------------ src/platform/windows/win_sockaddr.c | 67 ++++ src/platform/windows/win_tcp.c | 624 ++++++++++++++++++++++++++++++++++ src/platform/windows/win_thread.c | 16 +- src/platform/windows/win_udp.c | 303 +++++++++++++++++ 8 files changed, 1011 insertions(+), 663 deletions(-) delete mode 100644 src/platform/windows/win_net.c create mode 100644 src/platform/windows/win_sockaddr.c create mode 100644 src/platform/windows/win_tcp.c create mode 100644 src/platform/windows/win_udp.c (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 17b14d2e..ffcb2b7b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -133,12 +133,14 @@ if (NNG_PLATFORM_WINDOWS) platform/windows/win_debug.c platform/windows/win_iocp.c platform/windows/win_ipc.c - platform/windows/win_net.c platform/windows/win_pipe.c platform/windows/win_rand.c platform/windows/win_resolv.c + platform/windows/win_sockaddr.c + platform/windows/win_tcp.c platform/windows/win_thread.c - ) + platform/windows/win_udp.c + ) endif() if (NNG_ENABLE_ZEROTIER) diff --git a/src/core/options.c b/src/core/options.c index d08ac1cb..aa744642 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -149,7 +149,7 @@ nni_getopt_int(int i, void *val, size_t *sizep) } int -nni_getopt_u64(const uint64_t u, void *val, size_t *sizep) +nni_getopt_u64(uint64_t u, void *val, size_t *sizep) { size_t sz = sizeof(u); diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index d1c5b2a2..c2549266 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -93,9 +93,15 @@ extern void nni_win_ipc_sysfini(void); extern int nni_win_tcp_sysinit(void); extern void nni_win_tcp_sysfini(void); +extern int nni_win_udp_sysinit(void); +extern void nni_win_udp_sysfini(void); + extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); +extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *); +extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *); + #endif // NNG_PLATFORM_WINDOWS #endif // PLATFORM_WIN_IMPL_H diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c deleted file mode 100644 index 07c30e11..00000000 --- a/src/platform/windows/win_net.c +++ /dev/null @@ -1,650 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_WINDOWS - -#include - -struct nni_plat_tcp_pipe { - SOCKET s; - nni_win_event rcv_ev; - nni_win_event snd_ev; -}; - -struct nni_plat_tcp_ep { - SOCKET s; - SOCKET acc_s; - nni_win_event con_ev; - nni_win_event acc_ev; - int started; - int bound; - - SOCKADDR_STORAGE remaddr; - int remlen; - SOCKADDR_STORAGE locaddr; - int loclen; - - char buf[512]; // to hold acceptex results - - // We have to lookup some function pointers using ioctls. Winsock, - // gotta love it. - LPFN_CONNECTEX connectex; - LPFN_ACCEPTEX acceptex; -}; - -static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_pipe_ops = { - .wev_start = nni_win_tcp_pipe_start, - .wev_finish = nni_win_tcp_pipe_finish, - .wev_cancel = nni_win_tcp_pipe_cancel, -}; - -static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_acc_ops = { - .wev_start = nni_win_tcp_acc_start, - .wev_finish = nni_win_tcp_acc_finish, - .wev_cancel = nni_win_tcp_acc_cancel, -}; - -static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_con_ops = { - .wev_start = nni_win_tcp_con_start, - .wev_finish = nni_win_tcp_con_finish, - .wev_cancel = nni_win_tcp_con_cancel, -}; - -static void -nni_win_tcp_sockinit(SOCKET s) -{ - BOOL yes; - DWORD no; - - // Don't inherit the handle (CLOEXEC really). - SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); - - no = 0; - (void) setsockopt( - s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - - // Also disable Nagle. We are careful to group data with WSASend, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - yes = 1; - (void) setsockopt( - s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); -} - -static int -nni_win_tcp_addr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) -{ - SOCKADDR_IN * sin; - SOCKADDR_IN6 *sin6; - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof(*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof(*sin)); - - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof(*sin6)); - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof(*sin6)); - } - return (-1); -} - -static int -nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - int rv; - SOCKET s; - WSABUF iov[4]; - DWORD niov; - DWORD flags; - nni_plat_tcp_pipe *pipe = evt->ptr; - int i; - - NNI_ASSERT(aio->a_niov > 0); - NNI_ASSERT(aio->a_niov <= 4); - NNI_ASSERT(aio->a_iov[0].iov_len > 0); - NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - - niov = aio->a_niov; - - // Put the AIOs in Windows form. - for (i = 0; i < aio->a_niov; i++) { - iov[i].buf = aio->a_iov[i].iov_buf; - iov[i].len = (ULONG) aio->a_iov[i].iov_len; - } - - if ((s = pipe->s) == INVALID_SOCKET) { - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - // Note that the IOVs for the event were prepared on entry already. - // The actual aio's iov array we don't touch. - - evt->count = 0; - flags = 0; - if (evt == &pipe->snd_ev) { - rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); - } else { - rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); - } - - if ((rv == SOCKET_ERROR) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_tcp_pipe_cancel(nni_win_event *evt) -{ - nni_plat_tcp_pipe *pipe = evt->ptr; - - (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); -} - -static void -nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - int rv; - size_t cnt; - - cnt = evt->count; - if ((rv = evt->status) == 0) { - int i; - aio->a_count += cnt; - - while (cnt > 0) { - // If we didn't write the first full iov, - // then we're done for now. Record progress - // and move on. - if (cnt < aio->a_iov[0].iov_len) { - aio->a_iov[0].iov_len -= cnt; - aio->a_iov[0].iov_buf = - (char *) aio->a_iov[0].iov_buf + cnt; - break; - } - - // We consumed the full iov, so just move the - // remaininng ones up, and decrement count handled. - cnt -= aio->a_iov[0].iov_len; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - NNI_ASSERT(aio->a_niov > 0); - aio->a_niov--; - } - - if (aio->a_niov > 0) { - // If we have more to do, submit it! - nni_win_event_resubmit(evt, aio); - return; - } - } - - // All done; hopefully successfully. - nni_aio_finish(aio, rv, aio->a_count); -} - -static int -nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) -{ - nni_plat_tcp_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - nni_win_tcp_sockinit(s); - pipe->s = s; - *pipep = pipe; - return (0); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) -{ - SOCKET s; - - nni_win_event_close(&pipe->rcv_ev); - - if ((s = pipe->s) != INVALID_SOCKET) { - pipe->s = INVALID_SOCKET; - closesocket(s); - } -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) -{ - nni_plat_tcp_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_plat_tcp_ep *ep; - int rv; - SOCKET s; - DWORD nbytes; - GUID guid1 = WSAID_CONNECTEX; - GUID guid2 = WSAID_ACCEPTEX; - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(ep)); - - ep->s = INVALID_SOCKET; - - if (rsa->s_un.s_family != NNG_AF_UNSPEC) { - ep->remlen = nni_win_tcp_addr(&ep->remaddr, rsa); - } - if (lsa->s_un.s_family != NNG_AF_UNSPEC) { - ep->loclen = nni_win_tcp_addr(&ep->locaddr, lsa); - } - - // Create a scratch socket for use with ioctl. - s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - // Look up the function pointer. - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, - sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, - sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - closesocket(s); - s = INVALID_SOCKET; - - // Now initialize the win events for later use. - rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); - if (rv != 0) { - goto fail; - } - rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); - if (rv != 0) { - goto fail; - } - - *epp = ep; - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - nni_plat_tcp_ep_fini(ep); - return (rv); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_win_event_close(&ep->acc_ev); - nni_win_event_close(&ep->con_ev); - if (ep->s != INVALID_SOCKET) { - closesocket(ep->s); - ep->s = INVALID_SOCKET; - } - if (ep->acc_s != INVALID_SOCKET) { - closesocket(ep->acc_s); - } -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_plat_tcp_ep_close(ep); - NNI_FREE_STRUCT(ep); -} - -static int -nni_win_tcp_listen(nni_plat_tcp_ep *ep) -{ - int rv; - BOOL yes; - SOCKET s; - - if (ep->started) { - return (NNG_EBUSY); - } - - s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - nni_win_tcp_sockinit(s); - - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - goto fail; - } - - // Make sure that we use the address exclusively. Windows lets - // others hijack us by default. - yes = 1; - - rv = setsockopt( - s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - if (listen(s, SOMAXCONN) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - ep->s = s; - ep->started = 1; - - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - return (rv); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) -{ - int rv; - - nni_mtx_lock(&ep->acc_ev.mtx); - rv = nni_win_tcp_listen(ep); - nni_mtx_unlock(&ep->acc_ev.mtx); - return (rv); -} - -static void -nni_win_tcp_acc_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - - s = ep->acc_s; - ep->acc_s = INVALID_SOCKET; - - if (s == INVALID_SOCKET) { - return; - } - - if (((rv = evt->status) != 0) || - ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - nni_aio_finish_pipe(aio, pipe); -} - -static int -nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - SOCKET acc_s; - DWORD cnt; - - acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (acc_s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - ep->acc_s = acc_s; - - if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synch completion right now. I/O completion packet delivered - // already. - return (0); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - aio->a_pipe = NULL; - nni_win_event_submit(&ep->acc_ev, aio); -} - -static void -nni_win_tcp_con_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - - s = ep->s; - ep->s = INVALID_SOCKET; - - // The socket was already registered with the IOCP. - - if (((rv = evt->status) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - aio->a_pipe = pipe; - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s; - SOCKADDR_STORAGE bss; - int len; - int rv; - int family; - - if (ep->loclen > 0) { - family = ep->locaddr.ss_family; - } else { - family = ep->remaddr.ss_family; - } - - s = socket(family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - - nni_win_tcp_sockinit(s); - - // Windows ConnectEx requires the socket to be bound first. - if (ep->loclen > 0) { - bss = ep->locaddr; - len = ep->loclen; - } else { - ZeroMemory(&bss, sizeof(bss)); - bss.ss_family = ep->remaddr.ss_family; - len = ep->remlen; - } - if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - closesocket(s); - - return (1); - } - // Register with the I/O completion port so we can get the - // events for the next call. - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - closesocket(s); - evt->status = rv; - evt->count = 0; - return (1); - } - - ep->s = s; - if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, - NULL, 0, NULL, &evt->olpd)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - closesocket(s); - ep->s = INVALID_SOCKET; - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - return (0); -} - -extern void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - aio->a_pipe = NULL; - nni_win_event_submit(&ep->con_ev, aio); -} - -int -nni_win_tcp_sysinit(void) -{ - WSADATA data; - if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { - NNI_ASSERT(LOBYTE(data.wVersion) == 2); - NNI_ASSERT(HIBYTE(data.wVersion) == 2); - return (nni_win_error(GetLastError())); - } - return (0); -} - -void -nni_win_tcp_sysfini(void) -{ - WSACleanup(); -} - -#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_sockaddr.c b/src/platform/windows/win_sockaddr.c new file mode 100644 index 00000000..0fa6dd51 --- /dev/null +++ b/src/platform/windows/win_sockaddr.c @@ -0,0 +1,67 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include + +int +nni_win_nn2sockaddr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) +{ + SOCKADDR_IN * sin; + SOCKADDR_IN6 *sin6; + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof(*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof(*sin)); + + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(sin6, 0, sizeof(*sin6)); + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof(*sin6)); + } + return (-1); +} + +int +nni_win_sockaddr2nn(nni_sockaddr *sa, const SOCKADDR_STORAGE *ss) +{ + SOCKADDR_IN * sin; + SOCKADDR_IN6 *sin6; + + switch (ss->ss_family) { + case PF_INET: + sin = (void *) ss; + sa->s_un.s_in.sa_family = NNG_AF_INET; + sa->s_un.s_in.sa_port = sin->sin_port; + sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; + return (0); + + case PF_INET6: + sin6 = (void *) ss; + sa->s_un.s_in6.sa_family = NNG_AF_INET6; + sa->s_un.s_in6.sa_port = sin6->sin6_port; + memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); + return (0); + } + return (-1); +} + +#endif // NNG_PLATFORM_WINDOWS \ No newline at end of file diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c new file mode 100644 index 00000000..d34ef7a6 --- /dev/null +++ b/src/platform/windows/win_tcp.c @@ -0,0 +1,624 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include + +struct nni_plat_tcp_pipe { + SOCKET s; + nni_win_event rcv_ev; + nni_win_event snd_ev; +}; + +struct nni_plat_tcp_ep { + SOCKET s; + SOCKET acc_s; + nni_win_event con_ev; + nni_win_event acc_ev; + int started; + int bound; + + SOCKADDR_STORAGE remaddr; + int remlen; + SOCKADDR_STORAGE locaddr; + int loclen; + + char buf[512]; // to hold acceptex results + + // We have to lookup some function pointers using ioctls. Winsock, + // gotta love it. + LPFN_CONNECTEX connectex; + LPFN_ACCEPTEX acceptex; +}; + +static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_pipe_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_tcp_pipe_ops = { + .wev_start = nni_win_tcp_pipe_start, + .wev_finish = nni_win_tcp_pipe_finish, + .wev_cancel = nni_win_tcp_pipe_cancel, +}; + +static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_acc_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_tcp_acc_ops = { + .wev_start = nni_win_tcp_acc_start, + .wev_finish = nni_win_tcp_acc_finish, + .wev_cancel = nni_win_tcp_acc_cancel, +}; + +static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_con_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_tcp_con_ops = { + .wev_start = nni_win_tcp_con_start, + .wev_finish = nni_win_tcp_con_finish, + .wev_cancel = nni_win_tcp_con_cancel, +}; + +static void +nni_win_tcp_sockinit(SOCKET s) +{ + BOOL yes; + DWORD no; + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + + // Also disable Nagle. We are careful to group data with WSASend, + // and latency is king for most of our users. (Consider adding + // a method to enable this later.) + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); +} + +static int +nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) +{ + int rv; + SOCKET s; + WSABUF iov[4]; + DWORD niov; + DWORD flags; + nni_plat_tcp_pipe *pipe = evt->ptr; + int i; + + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_niov <= 4); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + niov = aio->a_niov; + + // Put the AIOs in Windows form. + for (i = 0; i < aio->a_niov; i++) { + iov[i].buf = aio->a_iov[i].iov_buf; + iov[i].len = (ULONG) aio->a_iov[i].iov_len; + } + + if ((s = pipe->s) == INVALID_SOCKET) { + evt->status = NNG_ECLOSED; + evt->count = 0; + return (1); + } + + // Note that the IOVs for the event were prepared on entry already. + // The actual aio's iov array we don't touch. + + evt->count = 0; + flags = 0; + if (evt == &pipe->snd_ev) { + rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); + } else { + rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); + } + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = nni_win_error(rv); + evt->count = 0; + return (1); + } + + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); +} + +static void +nni_win_tcp_pipe_cancel(nni_win_event *evt) +{ + nni_plat_tcp_pipe *pipe = evt->ptr; + + (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); +} + +static void +nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) +{ + int rv; + size_t cnt; + + cnt = evt->count; + if ((rv = evt->status) == 0) { + int i; + aio->a_count += cnt; + + while (cnt > 0) { + // If we didn't write the first full iov, + // then we're done for now. Record progress + // and move on. + if (cnt < aio->a_iov[0].iov_len) { + aio->a_iov[0].iov_len -= cnt; + aio->a_iov[0].iov_buf = + (char *) aio->a_iov[0].iov_buf + cnt; + break; + } + + // We consumed the full iov, so just move the + // remaininng ones up, and decrement count handled. + cnt -= aio->a_iov[0].iov_len; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i - 1] = aio->a_iov[i]; + } + NNI_ASSERT(aio->a_niov > 0); + aio->a_niov--; + } + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_event_resubmit(evt, aio); + return; + } + } + + // All done; hopefully successfully. + nni_aio_finish(aio, rv, aio->a_count); +} + +static int +nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) +{ + nni_plat_tcp_pipe *pipe; + int rv; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); + if (rv != 0) { + nni_plat_tcp_pipe_fini(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); + if (rv != 0) { + nni_plat_tcp_pipe_fini(pipe); + return (rv); + } + nni_win_tcp_sockinit(s); + pipe->s = s; + *pipep = pipe; + return (0); +} + +void +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) +{ + nni_win_event_submit(&pipe->snd_ev, aio); +} + +void +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) +{ + nni_win_event_submit(&pipe->rcv_ev, aio); +} + +void +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) +{ + SOCKET s; + + nni_win_event_close(&pipe->rcv_ev); + + if ((s = pipe->s) != INVALID_SOCKET) { + pipe->s = INVALID_SOCKET; + closesocket(s); + } +} + +void +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) +{ + nni_plat_tcp_pipe_close(pipe); + + nni_win_event_fini(&pipe->snd_ev); + nni_win_event_fini(&pipe->rcv_ev); + NNI_FREE_STRUCT(pipe); +} + +int +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, + const nni_sockaddr *rsa, int mode) +{ + nni_plat_tcp_ep *ep; + int rv; + SOCKET s; + DWORD nbytes; + GUID guid1 = WSAID_CONNECTEX; + GUID guid2 = WSAID_ACCEPTEX; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(ep, sizeof(ep)); + + ep->s = INVALID_SOCKET; + + if (rsa->s_un.s_family != NNG_AF_UNSPEC) { + ep->remlen = nni_win_nn2sockaddr(&ep->remaddr, rsa); + } + if (lsa->s_un.s_family != NNG_AF_UNSPEC) { + ep->loclen = nni_win_nn2sockaddr(&ep->locaddr, lsa); + } + + // Create a scratch socket for use with ioctl. + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + goto fail; + } + + // Look up the function pointer. + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, + sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + goto fail; + } + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, + sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + goto fail; + } + closesocket(s); + s = INVALID_SOCKET; + + // Now initialize the win events for later use. + rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); + if (rv != 0) { + goto fail; + } + rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); + if (rv != 0) { + goto fail; + } + + *epp = ep; + return (0); + +fail: + if (s != INVALID_SOCKET) { + closesocket(s); + } + nni_plat_tcp_ep_fini(ep); + return (rv); +} + +void +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) +{ + nni_win_event_close(&ep->acc_ev); + nni_win_event_close(&ep->con_ev); + if (ep->s != INVALID_SOCKET) { + closesocket(ep->s); + ep->s = INVALID_SOCKET; + } + if (ep->acc_s != INVALID_SOCKET) { + closesocket(ep->acc_s); + } +} + +void +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) +{ + nni_plat_tcp_ep_close(ep); + NNI_FREE_STRUCT(ep); +} + +static int +nni_win_tcp_listen(nni_plat_tcp_ep *ep) +{ + int rv; + BOOL yes; + SOCKET s; + + if (ep->started) { + return (NNG_EBUSY); + } + + s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + goto fail; + } + + nni_win_tcp_sockinit(s); + + if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { + goto fail; + } + + // Make sure that we use the address exclusively. Windows lets + // others hijack us by default. + yes = 1; + + rv = setsockopt( + s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); + if (rv != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } + if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } + + if (listen(s, SOMAXCONN) != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } + + ep->s = s; + ep->started = 1; + + return (0); + +fail: + if (s != INVALID_SOCKET) { + closesocket(s); + } + return (rv); +} + +int +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) +{ + int rv; + + nni_mtx_lock(&ep->acc_ev.mtx); + rv = nni_win_tcp_listen(ep); + nni_mtx_unlock(&ep->acc_ev.mtx); + return (rv); +} + +static void +nni_win_tcp_acc_cancel(nni_win_event *evt) +{ + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; + + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); + } +} + +static void +nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep * ep = evt->ptr; + nni_plat_tcp_pipe *pipe; + SOCKET s; + int rv; + + s = ep->acc_s; + ep->acc_s = INVALID_SOCKET; + + if (s == INVALID_SOCKET) { + return; + } + + if (((rv = evt->status) != 0) || + ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || + ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { + closesocket(s); + nni_aio_finish_error(aio, rv); + return; + } + + nni_aio_finish_pipe(aio, pipe); +} + +static int +nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; + SOCKET acc_s; + DWORD cnt; + + acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (acc_s == INVALID_SOCKET) { + evt->status = nni_win_error(GetLastError()); + evt->count = 0; + return (1); + } + ep->acc_s = acc_s; + + if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { + int rv = GetLastError(); + switch (rv) { + case ERROR_IO_PENDING: + // Normal asynchronous operation. Wait for + // completion. + return (0); + + default: + // Fast-fail (synchronous). + evt->status = nni_win_error(rv); + evt->count = 0; + return (1); + } + } + + // Synch completion right now. I/O completion packet delivered + // already. + return (0); +} + +void +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->acc_ev, aio); +} + +static void +nni_win_tcp_con_cancel(nni_win_event *evt) +{ + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; + + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); + } +} + +static void +nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep * ep = evt->ptr; + nni_plat_tcp_pipe *pipe; + SOCKET s; + int rv; + + s = ep->s; + ep->s = INVALID_SOCKET; + + // The socket was already registered with the IOCP. + + if (((rv = evt->status) != 0) || + ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { + // The new pipe is already fine for us. Discard + // the old one, since failed to be able to use it. + closesocket(s); + nni_aio_finish_error(aio, rv); + return; + } + + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); +} + +static int +nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s; + SOCKADDR_STORAGE bss; + int len; + int rv; + int family; + + if (ep->loclen > 0) { + family = ep->locaddr.ss_family; + } else { + family = ep->remaddr.ss_family; + } + + s = socket(family, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + evt->status = nni_win_error(GetLastError()); + evt->count = 0; + return (1); + } + + nni_win_tcp_sockinit(s); + + // Windows ConnectEx requires the socket to be bound first. + if (ep->loclen > 0) { + bss = ep->locaddr; + len = ep->loclen; + } else { + ZeroMemory(&bss, sizeof(bss)); + bss.ss_family = ep->remaddr.ss_family; + len = ep->remlen; + } + if (bind(s, (struct sockaddr *) &bss, len) < 0) { + evt->status = nni_win_error(GetLastError()); + evt->count = 0; + closesocket(s); + + return (1); + } + // Register with the I/O completion port so we can get the + // events for the next call. + if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { + closesocket(s); + evt->status = rv; + evt->count = 0; + return (1); + } + + ep->s = s; + if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, + NULL, 0, NULL, &evt->olpd)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + closesocket(s); + ep->s = INVALID_SOCKET; + evt->status = nni_win_error(rv); + evt->count = 0; + return (1); + } + } + return (0); +} + +extern void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->con_ev, aio); +} + +int +nni_win_tcp_sysinit(void) +{ + WSADATA data; + if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { + NNI_ASSERT(LOBYTE(data.wVersion) == 2); + NNI_ASSERT(HIBYTE(data.wVersion) == 2); + return (nni_win_error(GetLastError())); + } + return (0); +} + +void +nni_win_tcp_sysfini(void) +{ + WSACleanup(); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 12049139..41ba721c 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -154,16 +154,11 @@ nni_plat_init(int (*helper)(void)) AcquireSRWLockExclusive(&lock); if (!plat_inited) { - if ((rv = nni_win_iocp_sysinit()) != 0) { - goto out; - } - if ((rv = nni_win_ipc_sysinit()) != 0) { - goto out; - } - if ((rv = nni_win_tcp_sysinit()) != 0) { - goto out; - } - if ((rv = nni_win_resolv_sysinit()) != 0) { + if (((rv = nni_win_iocp_sysinit()) != 0) || + ((rv = nni_win_ipc_sysinit()) != 0) || + ((rv = nni_win_tcp_sysinit()) != 0) || + ((rv = nni_win_udp_sysinit()) != 0) || + ((rv = nni_win_resolv_sysinit()) != 0)) { goto out; } @@ -182,6 +177,7 @@ nni_plat_fini(void) { nni_win_resolv_sysfini(); nni_win_ipc_sysfini(); + nni_win_udp_sysfini(); nni_win_tcp_sysfini(); nni_win_iocp_sysfini(); WSACleanup(); diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c new file mode 100644 index 00000000..678b4ae7 --- /dev/null +++ b/src/platform/windows/win_udp.c @@ -0,0 +1,303 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +// Silence complaints about inet_addr() +#define _WINSOCK_DEPRECATED_NO_WARNINGS 1 + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include + +struct nni_plat_udp { + SOCKET s; + nni_mtx lk; + nni_win_event rxev; + nni_win_event txev; + SOCKADDR_STORAGE rxsa; + SOCKADDR_STORAGE txsa; + int rxsalen; + int txsalen; +}; + +static int nni_win_udp_start_rx(nni_win_event *, nni_aio *); +static int nni_win_udp_start_tx(nni_win_event *, nni_aio *); +static void nni_win_udp_finish_rx(nni_win_event *, nni_aio *); +static void nni_win_udp_finish_tx(nni_win_event *, nni_aio *); +static void nni_win_udp_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_udp_rxo = { + .wev_start = nni_win_udp_start_rx, + .wev_finish = nni_win_udp_finish_rx, + .wev_cancel = nni_win_udp_cancel, +}; + +static nni_win_event_ops nni_win_udp_txo = { + .wev_start = nni_win_udp_start_tx, + .wev_finish = nni_win_udp_finish_tx, + .wev_cancel = nni_win_udp_cancel, +}; + +// nni_plat_udp_open initializes a UDP socket, binding to the local +// address specified specified in the AIO. The remote address is +// not used. The resulting nni_plat_udp structure is returned in the +// the aio's a_pipe. +int +nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) +{ + nni_plat_udp * u; + SOCKADDR_STORAGE ss; + int sslen; + DWORD no; + int rv; + + if ((sslen = nni_win_nn2sockaddr(&ss, sa)) < 0) { + return (NNG_EADDRINVAL); + } + + if ((u = NNI_ALLOC_STRUCT(u)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&u->lk); + + u->s = socket(ss.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (u->s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_plat_udp_close(u); + return (rv); + } + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) u->s, HANDLE_FLAG_INHERIT, 0); + no = 0; + (void) setsockopt( + u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + + if (((rv = nni_win_event_init(&u->rxev, &nni_win_udp_rxo, u)) != 0) || + ((rv = nni_win_event_init(&u->txev, &nni_win_udp_txo, u)) != 0) || + ((rv = nni_win_iocp_register((HANDLE) u->s)) != 0)) { + nni_plat_udp_close(u); + return (rv); + } + + // Bind the local address + if (bind(u->s, (struct sockaddr *) &ss, sslen) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + nni_plat_udp_close(u); + return (rv); + } + + *udpp = u; + return (rv); +} + +// nni_plat_udp_close closes the underlying UDP socket. +void +nni_plat_udp_close(nni_plat_udp *u) +{ + if (u->s != INVALID_SOCKET) { + closesocket(u->s); + } + nni_win_event_fini(&u->rxev); + nni_win_event_fini(&u->txev); + nni_mtx_fini(&u->lk); + NNI_FREE_STRUCT(u); +} + +// nni_plat_udp_send sends the data in the aio to the the +// destination specified in the nni_aio. The iovs are the +// UDP payload. +void +nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio) +{ + nni_win_event_submit(&u->txev, aio); +} + +// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs +// from the UDP payload. If the UDP payload will not fit, then +// NNG_EMSGSIZE results. +void +nni_plat_udp_recv(nni_plat_udp *u, nni_aio *aio) +{ + nni_win_event_submit(&u->rxev, aio); +} + +static int +nni_win_udp_start_rx(nni_win_event *evt, nni_aio *aio) +{ + int rv; + SOCKET s; + WSABUF iov[4]; + DWORD niov; + DWORD flags; + nni_plat_udp *u = evt->ptr; + + if ((s = u->s) == INVALID_SOCKET) { + evt->status = NNG_ECLOSED; + evt->count = 0; + return (1); + } + + u->rxsalen = sizeof(SOCKADDR_STORAGE); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_niov <= 4); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + niov = aio->a_niov; + + // Put the AIOs in Windows form. + for (int i = 0; i < aio->a_niov; i++) { + iov[i].buf = aio->a_iov[i].iov_buf; + iov[i].len = (ULONG) aio->a_iov[i].iov_len; + } + + // Note that the IOVs for the event were prepared on entry + // already. The actual aio's iov array we don't touch. + + evt->count = 0; + flags = 0; + + rv = WSARecvFrom(u->s, iov, niov, NULL, &flags, + (struct sockaddr *) &u->rxsa, &u->rxsalen, &evt->olpd, NULL); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = nni_win_error(rv); + evt->count = 0; + return (1); + } + + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); +} + +static int +nni_win_udp_start_tx(nni_win_event *evt, nni_aio *aio) +{ + int rv; + SOCKET s; + WSABUF iov[4]; + DWORD niov; + nni_plat_udp *u = evt->ptr; + int salen; + + if ((s = u->s) == INVALID_SOCKET) { + evt->status = NNG_ECLOSED; + evt->count = 0; + return (1); + } + + if ((salen = nni_win_nn2sockaddr(&u->txsa, aio->a_addr)) < 0) { + evt->status = NNG_EADDRINVAL; + evt->count = 0; + return (1); + } + + NNI_ASSERT(aio->a_addr != NULL); + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_niov <= 4); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); + + niov = aio->a_niov; + + // Put the AIOs in Windows form. + for (int i = 0; i < aio->a_niov; i++) { + iov[i].buf = aio->a_iov[i].iov_buf; + iov[i].len = (ULONG) aio->a_iov[i].iov_len; + } + + // Note that the IOVs for the event were prepared on entry + // already. The actual aio's iov array we don't touch. + + evt->count = 0; + + rv = WSASendTo(u->s, iov, niov, NULL, 0, (struct sockaddr *) &u->txsa, + salen, &evt->olpd, NULL); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = nni_win_error(rv); + evt->count = 0; + return (1); + } + + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); +} + +static void +nni_win_udp_cancel(nni_win_event *evt) +{ + nni_plat_udp *u = evt->ptr; + + (void) CancelIoEx((HANDLE) u->s, &evt->olpd); +} + +static void +nni_win_udp_finish_rx(nni_win_event *evt, nni_aio *aio) +{ + int rv; + size_t cnt; + nni_plat_udp *u = evt->ptr; + + cnt = evt->count; + if ((rv = evt->status) == 0) { + // convert address from Windows form... + if (aio->a_addr != NULL) { + if (nni_win_sockaddr2nn(aio->a_addr, &u->rxsa) != 0) { + rv = NNG_EADDRINVAL; + cnt = 0; + } + } + } + + // All done; hopefully successfully. + nni_aio_finish(aio, rv, cnt); +} + +static void +nni_win_udp_finish_tx(nni_win_event *evt, nni_aio *aio) +{ + int rv; + size_t cnt; + + cnt = evt->count; + rv = evt->status; + + nni_aio_finish(aio, rv, cnt); +} + +int +nni_win_udp_sysinit(void) +{ + WSADATA data; + if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { + NNI_ASSERT(LOBYTE(data.wVersion) == 2); + NNI_ASSERT(HIBYTE(data.wVersion) == 2); + return (nni_win_error(GetLastError())); + } + return (0); +} + +void +nni_win_udp_sysfini(void) +{ + WSACleanup(); +} + +#endif // NNG_PLATFORM_WINDOWS -- cgit v1.2.3-70-g09d2