From b44e20c80c936a29bfeaf964ec94bc62ac0386f5 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 9 Jul 2018 09:59:46 -0700 Subject: fixes #523 dialers could support multiple outstanding dial requests fixes #179 DNS resolution should be done at connect time fixes #586 Windows IO completion port work could be better fixes #339 Windows iocp could use synchronous completions fixes #280 TCP abstraction improvements This is a rather monstrous set of changes, which refactors TCP, and the underlying Windows I/O completion path logic, in order to obtain a cleaner, simpler API, with support for asynchronous DNS lookups performed on connect rather than initialization time, the ability to have multiple connects or accepts pending, as well as fewer extraneous function calls. The Windows code also benefits from greatly reduced context switching, fewer lock operations performed, and a reduced number of system calls on the hot code path. (We use automatic event resetting instead of manual.) Some dead code was removed as well, and a few potential edge case leaks on failure paths (in the websocket code) were plugged. Note that all TCP based transports benefit from this work. The IPC code on Windows still uses the legacy IOCP for now, as does the UDP code (used for ZeroTier.) We will be converting those soon too. --- src/platform/posix/posix_aio.h | 3 +- src/platform/posix/posix_epdesc.c | 3 +- src/platform/posix/posix_pollq_poll.c | 2 +- src/platform/posix/posix_resolv_gai.c | 46 ++- src/platform/posix/posix_tcp.c | 109 ------ src/platform/posix/posix_tcp.h | 44 +++ src/platform/posix/posix_tcpconn.c | 410 ++++++++++++++++++++ src/platform/posix/posix_tcpdial.c | 234 +++++++++++ src/platform/posix/posix_tcplisten.c | 319 +++++++++++++++ src/platform/windows/win_impl.h | 24 ++ src/platform/windows/win_io.c | 152 ++++++++ src/platform/windows/win_resolv.c | 49 ++- src/platform/windows/win_tcp.c | 703 ---------------------------------- src/platform/windows/win_tcp.h | 67 ++++ src/platform/windows/win_tcpconn.c | 391 +++++++++++++++++++ src/platform/windows/win_tcpdial.c | 228 +++++++++++ src/platform/windows/win_tcplisten.c | 312 +++++++++++++++ src/platform/windows/win_thread.c | 8 +- 18 files changed, 2279 insertions(+), 825 deletions(-) create mode 100644 src/platform/posix/posix_tcp.h create mode 100644 src/platform/posix/posix_tcpconn.c create mode 100644 src/platform/posix/posix_tcpdial.c create mode 100644 src/platform/posix/posix_tcplisten.c create mode 100644 src/platform/windows/win_io.c create mode 100644 src/platform/windows/win_tcp.h create mode 100644 src/platform/windows/win_tcpconn.c create mode 100644 src/platform/windows/win_tcpdial.c create mode 100644 src/platform/windows/win_tcplisten.c (limited to 'src/platform') diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 83f2f1a8..3b1ce751 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -20,8 +20,8 @@ #include "core/nng_impl.h" #include "posix_pollq.h" +#include // needed for musl build #include // needed for mode_t -#include // needed for musl build typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; @@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *); extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t); - #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 74c92dbd..d796765a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; + NNI_ARG_UNUSED(pfd); + nni_mtx_lock(&ed->mtx); if (events & POLLNVAL) { nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); return; } - NNI_ASSERT(pfd == ed->pfd); // Anything else will turn up in accept. nni_epdesc_doaccept(ed); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 26753df6..ec487de5 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -302,7 +302,7 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { nni_mtx_lock(&pq->mtx); - pq->closing = 1; + pq->closing = true; nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 8c003362..63d858c2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -13,6 +13,7 @@ #ifdef NNG_USE_POSIX_RESOLV_GAI #include "platform/posix/posix_aio.h" +#include #include #include #include @@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -329,6 +330,47 @@ resolv_worker(void *notused) nni_mtx_unlock(&resolv_mtx); } +int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + const void *ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + int nni_posix_resolv_sysinit(void) { diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index cbc7a641..53ea8026 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -25,115 +25,6 @@ #include #include -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_storage ss; - int len; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, rsa); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, lsa); - nni_posix_epdesc_set_local(ed, &ss, len); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - rv = nni_posix_epdesc_listen((void *) ep); - if ((rv == 0) && (bsa != NULL)) { - rv = nni_posix_epdesc_sockname((void *) ep, bsa); - } - return (rv); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_peername((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_sockname((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_keepalive((void *) p, v)); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_nodelay((void *) p, v)); -} - int nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) { diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h new file mode 100644 index 00000000..aefce7f7 --- /dev/null +++ b/src/platform/posix/posix_tcp.h @@ -0,0 +1,44 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +struct nni_tcp_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_tcp_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + nni_mtx mtx; +}; + +extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_conn_start(nni_tcp_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c new file mode 100644 index 00000000..30525648 --- /dev/null +++ b/src/platform/posix/posix_tcpconn.c @@ -0,0 +1,410 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef NNG_HAVE_ALLOCA +#include +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_tcp.h" + +static void +tcp_conn_dowrite(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +tcp_conn_doread(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_tcp_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + tcp_conn_doread(c); + } + if (events & POLLOUT) { + tcp_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + tcp_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + tcp_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getpeername(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getsockname(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) +{ + int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) +{ + + int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +{ + nni_tcp_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_tcp_conn_start(nni_tcp_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c new file mode 100644 index 00000000..7f627361 --- /dev/null +++ b/src/platform/posix/posix_tcpdial.c @@ -0,0 +1,234 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +// Dialer stuff. +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_tcp_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_tcp_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_tcp_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dialer_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_tcp_conn_fini(c); +} + +static void +tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_tcp_conn * c = arg; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_close(c); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_tcp_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + rv = nni_plat_errno(errno); + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c new file mode 100644 index 00000000..cdcbecd9 --- /dev/null +++ b/src/platform/posix/posix_tcplisten.c @@ -0,0 +1,319 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +tcp_listener_doclose(nni_tcp_listener *l) +{ + nni_aio *aio; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_tcp_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + NNI_ASSERT(pfd == l->pfd); + + // Anything else will turn up in accept. + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_mtx_unlock(&l->mtx); + return (nni_plat_errno(errno)); + } + + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + +// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows +// SO_REUSEADDR, which is almost completely different (and wrong!) from +// traditional SO_REUSEADDR. +#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) + { + int on = 1; + // If for some reason this doesn't work, it's probably ok. + // Second bind will fail. + (void) setsockopt( + fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } +#endif + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Lets get the bound sockname, and pass that back to the caller. + // This permits ephemeral port binding to work. + // If this fails for some reason, we just don't update the + // sockaddr structure. This is kind of suboptimal, but failures + // here should never occur. + len = sizeof(ss); + (void) getsockname(fd, (void *) &ss, &len); + (void) nni_posix_sockaddr2nn(sa, &ss); + + nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + + l->pfd = pfd; + l->started = true; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + tcp_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 263a322b..93e45423 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -80,6 +80,17 @@ struct nni_win_event { nni_win_event_ops ops; }; +typedef struct nni_win_io nni_win_io; +typedef void (*nni_win_io_cb)(nni_win_io *, int, size_t); + +struct nni_win_io { + OVERLAPPED olpd; + HANDLE f; + void * ptr; + nni_aio * aio; + nni_win_io_cb cb; +}; + struct nni_plat_flock { HANDLE h; }; @@ -94,6 +105,13 @@ extern void nni_win_event_complete(nni_win_event *, int); extern int nni_win_iocp_register(HANDLE); +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +extern int nni_win_io_sysinit(void); +extern void nni_win_io_sysfini(void); + extern int nni_win_iocp_sysinit(void); extern void nni_win_iocp_sysfini(void); @@ -109,6 +127,12 @@ extern void nni_win_udp_sysfini(void); extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); +extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *); +extern void nni_win_io_fini(nni_win_io *); +extern void nni_win_io_cancel(nni_win_io *); + +extern int nni_win_io_register(HANDLE); + extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *); extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *); diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c new file mode 100644 index 00000000..1179b603 --- /dev/null +++ b/src/platform/windows/win_io.c @@ -0,0 +1,152 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include + +// Windows IO Completion Port support. We basically create a single +// IO completion port, then start threads on it. Handles are added +// to the port on an as needed basis. We use a single IO completion +// port for pretty much everything. + +static int win_io_nthr = 0; +static HANDLE win_io_h = NULL; +static nni_thr *win_io_thrs; + +static void +win_io_handler(void *arg) +{ + NNI_ARG_UNUSED(arg); + + for (;;) { + DWORD cnt; + BOOL ok; + nni_win_io *item; + OVERLAPPED *olpd = NULL; + ULONG_PTR key = 0; + int rv; + + ok = GetQueuedCompletionStatus( + win_io_h, &cnt, &key, &olpd, INFINITE); + + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; + } + + item = CONTAINING_RECORD(olpd, nni_win_io, olpd); + rv = ok ? 0 : nni_win_error(GetLastError()); + item->cb(item, rv, (size_t) cnt); + } +} + +int +nni_win_io_register(HANDLE h) +{ + if (CreateIoCompletionPort(h, win_io_h, 0, 0) == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +int +nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) +{ + ZeroMemory(&io->olpd, sizeof(io->olpd)); + + io->cb = cb; + io->ptr = ptr; + io->aio = NULL; + io->f = f; + io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (io->olpd.hEvent == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +void +nni_win_io_cancel(nni_win_io *io) +{ + if (io->f != INVALID_HANDLE_VALUE) { + CancelIoEx(io->f, &io->olpd); + } +} + +void +nni_win_io_fini(nni_win_io *io) +{ + if (io->olpd.hEvent != NULL) { + CloseHandle((HANDLE) io->olpd.hEvent); + } +} + +int +nni_win_io_sysinit(void) +{ + HANDLE h; + int i; + int rv; + int nthr = nni_plat_ncpu() * 2; + + // Limits on the thread count. This is fairly arbitrary. + if (nthr < 4) { + nthr = 4; + } + if (nthr > 64) { + nthr = 64; + } + if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { + return (NNG_ENOMEM); + } + win_io_nthr = nthr; + + h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr); + if (h == NULL) { + return (nni_win_error(GetLastError())); + } + win_io_h = h; + + for (i = 0; i < win_io_nthr; i++) { + rv = nni_thr_init(&win_io_thrs[i], win_io_handler, NULL); + if (rv != 0) { + goto fail; + } + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_run(&win_io_thrs[i]); + } + return (0); + +fail: + nni_win_io_sysfini(); + return (rv); +} + +void +nni_win_io_sysfini(void) +{ + int i; + HANDLE h; + + if ((h = win_io_h) != NULL) { + CloseHandle(h); + win_io_h = NULL; + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_fini(&win_io_thrs[i]); + } +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index fb9d6751..e2cd192e 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -69,7 +69,7 @@ resolv_cancel(nni_aio *aio, int rv) } static int -resolv_gai_errno(int rv) +resolv_errno(int rv) { switch (rv) { case 0: @@ -116,7 +116,7 @@ resolv_task(resolv_item *item) hints.ai_family = item->family; if ((rv = getaddrinfo(item->name, "80", &hints, &results)) != 0) { - rv = resolv_gai_errno(rv); + rv = resolv_errno(rv); goto done; } @@ -246,14 +246,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -301,6 +301,47 @@ resolv_worker(void *notused) nni_mtx_unlock(&resolv_mtx); } +int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + void * ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = (void *) &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = (void *) &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + int nni_win_resolv_sysinit(void) { diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 33c4a1a5..2ab6be0f 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -15,709 +15,6 @@ #include #include -struct nni_plat_tcp_pipe { - SOCKET s; - nni_win_event rcv_ev; - nni_win_event snd_ev; - SOCKADDR_STORAGE sockname; - SOCKADDR_STORAGE peername; -}; - -struct nni_plat_tcp_ep { - SOCKET s; - SOCKET acc_s; - nni_win_event con_ev; - nni_win_event acc_ev; - int started; - int bound; - - SOCKADDR_STORAGE remaddr; - int remlen; - SOCKADDR_STORAGE locaddr; - int loclen; - - char buf[512]; // to hold acceptex results - - // We have to lookup some function pointers using ioctls. Winsock, - // gotta love it. Especially I love that asynch accept means that - // getsockname and getpeername don't work. - LPFN_CONNECTEX connectex; - LPFN_ACCEPTEX acceptex; - LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; -}; - -static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_pipe_ops = { - .wev_start = nni_win_tcp_pipe_start, - .wev_finish = nni_win_tcp_pipe_finish, - .wev_cancel = nni_win_tcp_pipe_cancel, -}; - -static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_acc_ops = { - .wev_start = nni_win_tcp_acc_start, - .wev_finish = nni_win_tcp_acc_finish, - .wev_cancel = nni_win_tcp_acc_cancel, -}; - -static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_con_ops = { - .wev_start = nni_win_tcp_con_start, - .wev_finish = nni_win_tcp_con_finish, - .wev_cancel = nni_win_tcp_con_cancel, -}; - -static void -nni_win_tcp_sockinit(SOCKET s) -{ - BOOL yes; - DWORD no; - - // Don't inherit the handle (CLOEXEC really). - SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); - - no = 0; - (void) setsockopt( - s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - - // Also disable Nagle. We are careful to group data with WSASend, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - yes = 1; - (void) setsockopt( - s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); -} - -static int -nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - int rv; - SOCKET s; - DWORD niov; - DWORD flags; - nni_plat_tcp_pipe *pipe = evt->ptr; - unsigned i; - unsigned naiov; - nni_iov * aiov; - WSABUF * iov; - - nni_aio_get_iov(aio, &naiov, &aiov); - iov = _malloca(naiov * sizeof(*iov)); - - // Put the AIOs in Windows form. - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len != 0) { - iov[niov].buf = aiov[i].iov_buf; - iov[niov].len = (ULONG) aiov[i].iov_len; - niov++; - } - } - - if ((s = pipe->s) == INVALID_SOCKET) { - _freea(iov); - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - // Note that the IOVs for the event were prepared on entry already. - // The actual aio's iov array we don't touch. - - evt->count = 0; - flags = 0; - if (evt == &pipe->snd_ev) { - rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); - } else { - rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); - } - _freea(iov); - - if ((rv == SOCKET_ERROR) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_tcp_pipe_cancel(nni_win_event *evt) -{ - nni_plat_tcp_pipe *pipe = evt->ptr; - - (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); -} - -static void -nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - if ((evt->status == 0) && (evt->count == 0)) { - // Windows sometimes returns a zero read. Convert these - // into an NNG_ECLOSED. (We are never supposed to come - // back with zero length read.) - evt->status = NNG_ECLOSED; - } - nni_aio_finish(aio, evt->status, evt->count); -} - -static int -nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) -{ - nni_plat_tcp_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - nni_win_tcp_sockinit(s); - pipe->s = s; - *pipep = pipe; - return (0); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) -{ - SOCKET s; - - nni_win_event_close(&pipe->rcv_ev); - - if ((s = pipe->s) != INVALID_SOCKET) { - pipe->s = INVALID_SOCKET; - closesocket(s); - } -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->peername) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->sockname) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) -{ - nni_plat_tcp_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_plat_tcp_ep *ep; - int rv; - SOCKET s; - DWORD nbytes; - GUID guid1 = WSAID_CONNECTEX; - GUID guid2 = WSAID_ACCEPTEX; - GUID guid3 = WSAID_GETACCEPTEXSOCKADDRS; - - NNI_ARG_UNUSED(mode); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(*ep)); - - ep->s = INVALID_SOCKET; - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - ep->remlen = nni_win_nn2sockaddr(&ep->remaddr, rsa); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - ep->loclen = nni_win_nn2sockaddr(&ep->locaddr, lsa); - } - - // Create a scratch socket for use with ioctl. - s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - // Look up the function pointer. - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, - sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, - sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid3, - sizeof(guid3), &ep->getacceptexsockaddrs, - sizeof(ep->getacceptexsockaddrs), &nbytes, NULL, - NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - closesocket(s); - s = INVALID_SOCKET; - - // Now initialize the win events for later use. - rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); - if (rv != 0) { - goto fail; - } - rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); - if (rv != 0) { - goto fail; - } - - *epp = ep; - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - nni_plat_tcp_ep_fini(ep); - return (rv); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_win_event_close(&ep->acc_ev); - nni_win_event_close(&ep->con_ev); - if (ep->s != INVALID_SOCKET) { - closesocket(ep->s); - ep->s = INVALID_SOCKET; - } - if (ep->acc_s != INVALID_SOCKET) { - closesocket(ep->acc_s); - } -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_plat_tcp_ep_close(ep); - NNI_FREE_STRUCT(ep); -} - -static int -nni_win_tcp_listen(nni_plat_tcp_ep *ep, nni_sockaddr *bsa) -{ - int rv; - BOOL yes; - SOCKET s; - - if (ep->started) { - return (NNG_EBUSY); - } - - s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - nni_win_tcp_sockinit(s); - - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - goto fail; - } - - // Make sure that we use the address exclusively. Windows lets - // others hijack us by default. - yes = 1; - - rv = setsockopt( - s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - if (bsa != NULL) { - SOCKADDR_STORAGE bound; - int len = sizeof(bound); - rv = getsockname(s, (SOCKADDR *) &bound, &len); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - nni_win_sockaddr2nn(bsa, &bound); - } - - if (listen(s, SOMAXCONN) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - ep->s = s; - ep->started = 1; - - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - return (rv); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - - nni_mtx_lock(&ep->acc_ev.mtx); - rv = nni_win_tcp_listen(ep, bsa); - nni_mtx_unlock(&ep->acc_ev.mtx); - return (rv); -} - -static void -nni_win_tcp_acc_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - int len1; - int len2; - SOCKADDR * sa1; - SOCKADDR * sa2; - - s = ep->acc_s; - ep->acc_s = INVALID_SOCKET; - - if (s == INVALID_SOCKET) { - return; - } - - if (((rv = evt->status) != 0) || - ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - // Collect the local and peer addresses, because normal getsockname - // and getpeername don't work with AcceptEx. - len1 = (int) sizeof(pipe->sockname); - len2 = (int) sizeof(pipe->peername); - ep->getacceptexsockaddrs( - ep->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); - NNI_ASSERT(len1 > 0); - NNI_ASSERT(len1 < (int) sizeof(SOCKADDR_STORAGE)); - NNI_ASSERT(len2 > 0); - NNI_ASSERT(len2 < (int) sizeof(SOCKADDR_STORAGE)); - memcpy(&pipe->sockname, sa1, len1); - memcpy(&pipe->peername, sa2, len2); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - SOCKET acc_s; - DWORD cnt; - - NNI_ARG_UNUSED(aio); - - acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (acc_s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - ep->acc_s = acc_s; - - if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synch completion right now. I/O completion packet delivered - // already. - return (0); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->acc_ev, aio); -} - -static void -nni_win_tcp_con_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - DWORD yes = 1; - int len; - - s = ep->s; - ep->s = INVALID_SOCKET; - - // The socket was already registered with the IOCP. - - if (((rv = evt->status) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - (void) setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, - (char *) &yes, sizeof(yes)); - - // Windows seems to be unable to get peernames for sockets on - // connect - perhaps because we supplied it already with connectex. - // Rather than debugging it, just steal the address from the endpoint. - memcpy(&pipe->peername, &ep->remaddr, ep->remlen); - - len = sizeof(pipe->sockname); - (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s; - SOCKADDR_STORAGE bss; - int len; - int rv; - int family; - - NNI_ARG_UNUSED(aio); - - if (ep->loclen > 0) { - family = ep->locaddr.ss_family; - } else { - family = ep->remaddr.ss_family; - } - - s = socket(family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - - nni_win_tcp_sockinit(s); - - // Windows ConnectEx requires the socket to be bound first. - if (ep->loclen > 0) { - bss = ep->locaddr; - len = ep->loclen; - } else { - ZeroMemory(&bss, sizeof(bss)); - bss.ss_family = ep->remaddr.ss_family; - len = ep->remlen; - } - if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - closesocket(s); - - return (1); - } - // Register with the I/O completion port so we can get the - // events for the next call. - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - closesocket(s); - evt->status = rv; - evt->count = 0; - return (1); - } - - ep->s = s; - if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, - NULL, 0, NULL, &evt->olpd)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - closesocket(s); - ep->s = INVALID_SOCKET; - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - return (0); -} - -extern void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->con_ev, aio); -} - -int -nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) -{ - void * ap; - uint16_t port; - int af; - switch (sa->s_family) { - case NNG_AF_INET: - ap = (void *) &sa->s_in.sa_addr; - port = sa->s_in.sa_port; - af = AF_INET; - break; - case NNG_AF_INET6: - ap = (void *) &sa->s_in6.sa_addr; - port = sa->s_in6.sa_port; - af = AF_INET6; - break; - default: - return (NNG_EINVAL); - } - if (ipstr != NULL) { - if (af == AF_INET6) { - size_t l; - ipstr[0] = '['; - InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); - l = strlen(ipstr); - ipstr[l++] = ']'; - ipstr[l++] = '\0'; - } else { - InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); - } - } - if (portstr != NULL) { -#ifdef NNG_LITTLE_ENDIAN - port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); -#endif - snprintf(portstr, 6, "%u", port); - } - return (0); -} - int nni_win_tcp_sysinit(void) { diff --git a/src/platform/windows/win_tcp.h b/src/platform/windows/win_tcp.h new file mode 100644 index 00000000..7025af81 --- /dev/null +++ b/src/platform/windows/win_tcp.h @@ -0,0 +1,67 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_WIN_WINTCP_H +#define PLATFORM_WIN_WINTCP_H + +// This header file is private to the TCP support for Windows. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +struct nni_tcp_conn { + SOCKET s; + nni_win_io recv_io; + nni_win_io send_io; + nni_win_io conn_io; + nni_list recv_aios; + nni_list send_aios; + nni_aio * conn_aio; + SOCKADDR_STORAGE sockname; + SOCKADDR_STORAGE peername; + nni_tcp_dialer * dialer; + nni_tcp_listener *listener; + int recv_rv; + int send_rv; + int conn_rv; + bool closed; + char buf[512]; // to hold acceptex results + nni_mtx mtx; + nni_cv cv; +}; + +struct nni_tcp_dialer { + LPFN_CONNECTEX connectex; // looked up name via ioctl + nni_list aios; // in flight connections + bool closed; + nni_mtx mtx; + nni_reap_item reap; +}; + +struct nni_tcp_listener { + SOCKET s; + nni_list aios; + bool closed; + bool started; + LPFN_ACCEPTEX acceptex; + LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + SOCKADDR_STORAGE ss; + nni_mtx mtx; + nni_reap_item reap; +}; + +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +#endif // NNG_PLATFORM_WINDOWS + +#endif // NNG_PLATFORM_WIN_WINTCP_H diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c new file mode 100644 index 00000000..c3a4a5d8 --- /dev/null +++ b/src/platform/windows/win_tcpconn.c @@ -0,0 +1,391 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include "win_tcp.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include +#include + +static void +tcp_recv_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + DWORD flags; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + flags = 0; + rv = WSARecv(c->s, iov, niov, NULL, &flags, &c->recv_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + tcp_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +tcp_recv_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + nni_win_io_cancel(&c->recv_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + tcp_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } + +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_send_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + nni_win_io_cancel(&c->send_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); // should always be at head + tcp_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_aio_finish_synch(aio, rv, num); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + tcp_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) +{ + nni_tcp_conn *c; + int rv; + BOOL yes; + DWORD no; + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->s = INVALID_SOCKET; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + c->conn_aio = NULL; + + if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != + 0) || + ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != + 0) || + ((rv = nni_win_io_register((HANDLE) s)) != 0)) { + nni_tcp_conn_fini(c); + return (rv); + } + + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + c->s = s; + *connp = c; + return (0); +} + +void +nni_win_tcp_conn_set_addrs( + nni_tcp_conn *c, const SOCKADDR_STORAGE *loc, const SOCKADDR_STORAGE *rem) +{ + memcpy(&c->sockname, loc, sizeof(*loc)); + memcpy(&c->peername, rem, sizeof(*rem)); +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + nni_win_io_cancel(&c->recv_io); + } + if (!nni_list_empty(&c->send_aios)) { + nni_win_io_cancel(&c->send_io); + } + if (c->s != INVALID_SOCKET) { + shutdown(c->s, SD_BOTH); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->peername) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + nni_mtx_unlock(&c->mtx); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->s != INVALID_SOCKET) { + closesocket(c->s); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c new file mode 100644 index 00000000..4a3e9f2f --- /dev/null +++ b/src/platform/windows/win_tcpdial.c @@ -0,0 +1,228 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_tcp.h" + +#include +#include + +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + int rv; + SOCKET s; + DWORD nbytes; + GUID guid = WSAID_CONNECTEX; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(d, sizeof(*d)); + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->aios); + + // Create a scratch socket for use with ioctl. + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_tcp_dialer_fini(d); + return (rv); + } + + // Look up the function pointer. + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, + sizeof(guid), &d->connectex, sizeof(d->connectex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + closesocket(s); + nni_tcp_dialer_fini(d); + return (rv); + } + + closesocket(s); + + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + + NNI_LIST_FOREACH (&d->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_lock(&d->mtx); + if (!nni_list_empty(&d->aios)) { + nni_mtx_unlock(&d->mtx); + nni_reap(&d->reap, (nni_cb) nni_tcp_dialer_fini, d); + return; + } + nni_mtx_unlock(&d->mtx); + + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dial_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&d->mtx); +} + +static void +tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio = c->conn_aio; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&d->mtx); + if ((aio = c->conn_aio) == NULL) { + // This should never occur. + nni_mtx_unlock(&d->mtx); + return; + } + + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + } else { + DWORD yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + SOCKET s; + SOCKADDR_STORAGE ss; + int len; + nni_tcp_conn * c; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((s = socket(ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + nni_aio_finish_error(aio, nni_win_error(GetLastError())); + return; + } + + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + c->peername = ss; + + // Windows ConnectEx requires the socket to be bound + // first. We just bind to an ephemeral address in the + // same family. + ZeroMemory(&c->sockname, sizeof(c->sockname)); + c->sockname.ss_family = ss.ss_family; + if (bind(s, (SOCKADDR *) &c->sockname, len) < 0) { + rv = nni_win_error(GetLastError()); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != + 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + c->dialer = d; + nni_aio_set_prov_extra(aio, 0, c); + if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + c->conn_aio = aio; + nni_aio_list_append(&d->aios, aio); + + // dialing is concurrent. + if (!d->connectex(s, (struct sockaddr *) &c->peername, len, NULL, 0, + NULL, &c->conn_io.olpd)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&d->mtx); + + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + } + nni_mtx_unlock(&d->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c new file mode 100644 index 00000000..5055c32d --- /dev/null +++ b/src/platform/windows/win_tcplisten.c @@ -0,0 +1,312 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include +#include +#include + +#include "win_tcp.h" + +// tcp_listener_funcs looks up function pointers we need for advanced accept +// functionality on Windows. Windows is weird. +static int +tcp_listener_funcs(nni_tcp_listener *l) +{ + static SRWLOCK lock = SRWLOCK_INIT; + static LPFN_ACCEPTEX acceptex; + static LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + + AcquireSRWLockExclusive(&lock); + if (acceptex == NULL) { + int rv; + DWORD nbytes; + GUID guid1 = WSAID_ACCEPTEX; + GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS; + SOCKET s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + ReleaseSRWLockExclusive(&lock); + return (rv); + } + + // Look up the function pointer. + if ((WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, + sizeof(guid1), &acceptex, sizeof(acceptex), &nbytes, + NULL, NULL) == SOCKET_ERROR) || + (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, + sizeof(guid2), &getacceptexsockaddrs, + sizeof(getacceptexsockaddrs), &nbytes, NULL, + NULL) == SOCKET_ERROR)) { + rv = nni_win_error(GetLastError()); + acceptex = NULL; + getacceptexsockaddrs = NULL; + ReleaseSRWLockExclusive(&lock); + closesocket(s); + return (rv); + } + closesocket(s); + } + ReleaseSRWLockExclusive(&lock); + + l->acceptex = acceptex; + l->getacceptexsockaddrs = getacceptexsockaddrs; + return (0); +} + +static void +tcp_accept_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_listener *l = c->listener; + nni_aio * aio; + int len1; + int len2; + SOCKADDR * sa1; + SOCKADDR * sa2; + DWORD yes; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&l->mtx); + if ((aio = c->conn_aio) == NULL) { + // This case should not occur. The situation would indicate + // a case where the connection was accepted already. + nni_mtx_unlock(&l->mtx); + return; + } + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&l->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + len1 = (int) sizeof(c->sockname); + len2 = (int) sizeof(c->peername); + l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); + memcpy(&c->sockname, sa1, len1); + memcpy(&c->peername, sa2, len2); + + yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(l, sizeof(*l)); + nni_mtx_init(&l->mtx); + nni_aio_list_init(&l->aios); + if ((rv = tcp_listener_funcs(l)) != 0) { + nni_tcp_listener_fini(l); + return (rv); + } + + *lp = l; + return (0); +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + if (!l->closed) { + nni_aio *aio; + l->closed = true; + + NNI_LIST_FOREACH (&l->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + closesocket(l->s); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + nni_mtx_lock(&l->mtx); + if (!nni_list_empty(&l->aios)) { + nni_mtx_unlock(&l->mtx); + nni_reap(&l->reap, (nni_cb) nni_tcp_listener_fini, l); + return; + } + nni_mtx_unlock(&l->mtx); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + int rv; + BOOL yes; + DWORD no; + int len; + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + if ((len = nni_win_nn2sockaddr(&l->ss, sa)) <= 0) { + nni_mtx_unlock(&l->mtx); + return (NNG_EADDRINVAL); + } + l->s = socket(l->ss.ss_family, SOCK_STREAM, 0); + if (l->s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) l->s, HANDLE_FLAG_INHERIT, 0); + + no = 0; + (void) setsockopt( + l->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + l->s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + if ((rv = nni_win_io_register((HANDLE) l->s)) != 0) { + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Make sure that we use the address exclusively. Windows lets + // others hijack us by default. + yes = 1; + if ((setsockopt(l->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, + sizeof(yes)) != 0) || + (bind(l->s, (SOCKADDR *) &l->ss, len) != 0) || + (getsockname(l->s, (SOCKADDR *) &l->ss, &len) != 0) || + (listen(l->s, SOMAXCONN) != 0)) { + rv = nni_win_error(GetLastError()); + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + nni_win_sockaddr2nn(sa, &l->ss); + l->started = true; + nni_mtx_unlock(&l->mtx); + return (0); +} + +static void +tcp_accept_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&l->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + SOCKET s; + int rv; + DWORD cnt; + nni_tcp_conn *c; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + // Windows requires us to explicity create the socket before + // calling accept on it. + if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_mtx_unlock(&l->mtx); + closesocket(s); + nni_aio_finish_error(aio, rv); + return; + } + c->listener = l; + c->conn_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + if (((rv = nni_win_io_init( + &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) || + ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&l->aios, aio); + if ((!l->acceptex( + l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Fast failure (synchronous.) + nni_aio_list_remove(aio); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 243811a0..52327cc4 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -181,7 +181,8 @@ nni_plat_init(int (*helper)(void)) AcquireSRWLockExclusive(&lock); if (!plat_inited) { - if (((rv = nni_win_iocp_sysinit()) != 0) || + if (((rv = nni_win_io_sysinit()) != 0) || + ((rv = nni_win_iocp_sysinit()) != 0) || ((rv = nni_win_ipc_sysinit()) != 0) || ((rv = nni_win_tcp_sysinit()) != 0) || ((rv = nni_win_udp_sysinit()) != 0) || @@ -207,6 +208,7 @@ nni_plat_fini(void) nni_win_udp_sysfini(); nni_win_tcp_sysfini(); nni_win_iocp_sysfini(); + nni_win_io_sysfini(); WSACleanup(); plat_inited = 0; } -- cgit v1.2.3-70-g09d2