diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-09 09:59:46 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-16 10:06:50 -0700 |
| commit | b44e20c80c936a29bfeaf964ec94bc62ac0386f5 (patch) | |
| tree | 87b2b5b999046b7f10789d4bae863eeea9354e44 /src/platform/windows/win_tcpconn.c | |
| parent | 05f404b917ddaf9fee70208a796cdf66ee747050 (diff) | |
| download | nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.gz nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.bz2 nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.zip | |
fixes #523 dialers could support multiple outstanding dial requests
fixes #179 DNS resolution should be done at connect time
fixes #586 Windows IO completion port work could be better
fixes #339 Windows iocp could use synchronous completions
fixes #280 TCP abstraction improvements
This is a rather monstrous set of changes, which refactors TCP, and
the underlying Windows I/O completion path logic, in order to obtain
a cleaner, simpler API, with support for asynchronous DNS lookups performed
on connect rather than initialization time, the ability to have multiple
connects or accepts pending, as well as fewer extraneous function calls.
The Windows code also benefits from greatly reduced context switching,
fewer lock operations performed, and a reduced number of system calls
on the hot code path. (We use automatic event resetting instead of manual.)
Some dead code was removed as well, and a few potential edge case leaks
on failure paths (in the websocket code) were plugged.
Note that all TCP based transports benefit from this work. The IPC code
on Windows still uses the legacy IOCP for now, as does the UDP code (used
for ZeroTier.) We will be converting those soon too.
Diffstat (limited to 'src/platform/windows/win_tcpconn.c')
| -rw-r--r-- | src/platform/windows/win_tcpconn.c | 391 |
1 files changed, 391 insertions, 0 deletions
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c new file mode 100644 index 00000000..c3a4a5d8 --- /dev/null +++ b/src/platform/windows/win_tcpconn.c @@ -0,0 +1,391 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include "win_tcp.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <malloc.h> +#include <stdio.h> + +static void +tcp_recv_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + DWORD flags; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + flags = 0; + rv = WSARecv(c->s, iov, niov, NULL, &flags, &c->recv_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + tcp_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +tcp_recv_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + nni_win_io_cancel(&c->recv_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + tcp_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } + +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_send_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + nni_win_io_cancel(&c->send_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); // should always be at head + tcp_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_aio_finish_synch(aio, rv, num); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + tcp_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) +{ + nni_tcp_conn *c; + int rv; + BOOL yes; + DWORD no; + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->s = INVALID_SOCKET; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + c->conn_aio = NULL; + + if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != + 0) || + ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != + 0) || + ((rv = nni_win_io_register((HANDLE) s)) != 0)) { + nni_tcp_conn_fini(c); + return (rv); + } + + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + c->s = s; + *connp = c; + return (0); +} + +void +nni_win_tcp_conn_set_addrs( + nni_tcp_conn *c, const SOCKADDR_STORAGE *loc, const SOCKADDR_STORAGE *rem) +{ + memcpy(&c->sockname, loc, sizeof(*loc)); + memcpy(&c->peername, rem, sizeof(*rem)); +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + nni_win_io_cancel(&c->recv_io); + } + if (!nni_list_empty(&c->send_aios)) { + nni_win_io_cancel(&c->send_io); + } + if (c->s != INVALID_SOCKET) { + shutdown(c->s, SD_BOTH); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->peername) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + nni_mtx_unlock(&c->mtx); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->s != INVALID_SOCKET) { + closesocket(c->s); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_WINDOWS |
