diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-09 09:59:46 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-16 10:06:50 -0700 |
| commit | b44e20c80c936a29bfeaf964ec94bc62ac0386f5 (patch) | |
| tree | 87b2b5b999046b7f10789d4bae863eeea9354e44 /src/platform/posix | |
| parent | 05f404b917ddaf9fee70208a796cdf66ee747050 (diff) | |
| download | nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.gz nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.bz2 nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.zip | |
fixes #523 dialers could support multiple outstanding dial requests
fixes #179 DNS resolution should be done at connect time
fixes #586 Windows IO completion port work could be better
fixes #339 Windows iocp could use synchronous completions
fixes #280 TCP abstraction improvements
This is a rather monstrous set of changes, which refactors TCP, and
the underlying Windows I/O completion path logic, in order to obtain
a cleaner, simpler API, with support for asynchronous DNS lookups performed
on connect rather than initialization time, the ability to have multiple
connects or accepts pending, as well as fewer extraneous function calls.
The Windows code also benefits from greatly reduced context switching,
fewer lock operations performed, and a reduced number of system calls
on the hot code path. (We use automatic event resetting instead of manual.)
Some dead code was removed as well, and a few potential edge case leaks
on failure paths (in the websocket code) were plugged.
Note that all TCP based transports benefit from this work. The IPC code
on Windows still uses the legacy IOCP for now, as does the UDP code (used
for ZeroTier.) We will be converting those soon too.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 46 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.c | 109 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.h | 44 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 410 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpdial.c | 234 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcplisten.c | 319 |
9 files changed, 1055 insertions, 115 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 83f2f1a8..3b1ce751 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -20,8 +20,8 @@ #include "core/nng_impl.h" #include "posix_pollq.h" +#include <sys/stat.h> // needed for musl build #include <sys/types.h> // needed for mode_t -#include <sys/stat.h> // needed for musl build typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; @@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *); extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t); - #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 74c92dbd..d796765a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; + NNI_ARG_UNUSED(pfd); + nni_mtx_lock(&ed->mtx); if (events & POLLNVAL) { nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); return; } - NNI_ASSERT(pfd == ed->pfd); // Anything else will turn up in accept. nni_epdesc_doaccept(ed); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 26753df6..ec487de5 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -302,7 +302,7 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { nni_mtx_lock(&pq->mtx); - pq->closing = 1; + pq->closing = true; nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 8c003362..63d858c2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -13,6 +13,7 @@ #ifdef NNG_USE_POSIX_RESOLV_GAI #include "platform/posix/posix_aio.h" +#include <arpa/inet.h> #include <ctype.h> #include <errno.h> #include <netdb.h> @@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -330,6 +331,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + const void *ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_posix_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index cbc7a641..53ea8026 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -26,115 +26,6 @@ #include <unistd.h> int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_storage ss; - int len; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, rsa); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, lsa); - nni_posix_epdesc_set_local(ed, &ss, len); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - rv = nni_posix_epdesc_listen((void *) ep); - if ((rv == 0) && (bsa != NULL)) { - rv = nni_posix_epdesc_sockname((void *) ep, bsa); - } - return (rv); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_peername((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_sockname((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_keepalive((void *) p, v)); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_nodelay((void *) p, v)); -} - -int nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) { const void *ap; diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h new file mode 100644 index 00000000..aefce7f7 --- /dev/null +++ b/src/platform/posix/posix_tcp.h @@ -0,0 +1,44 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +struct nni_tcp_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_tcp_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + nni_mtx mtx; +}; + +extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_conn_start(nni_tcp_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c new file mode 100644 index 00000000..30525648 --- /dev/null +++ b/src/platform/posix/posix_tcpconn.c @@ -0,0 +1,410 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_tcp.h" + +static void +tcp_conn_dowrite(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +tcp_conn_doread(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_tcp_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + tcp_conn_doread(c); + } + if (events & POLLOUT) { + tcp_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + tcp_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + tcp_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getpeername(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getsockname(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) +{ + int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) +{ + + int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +{ + nni_tcp_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_tcp_conn_start(nni_tcp_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c new file mode 100644 index 00000000..7f627361 --- /dev/null +++ b/src/platform/posix/posix_tcpdial.c @@ -0,0 +1,234 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +// Dialer stuff. +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_tcp_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_tcp_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_tcp_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dialer_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_tcp_conn_fini(c); +} + +static void +tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_tcp_conn * c = arg; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_close(c); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_tcp_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + rv = nni_plat_errno(errno); + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c new file mode 100644 index 00000000..cdcbecd9 --- /dev/null +++ b/src/platform/posix/posix_tcplisten.c @@ -0,0 +1,319 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +tcp_listener_doclose(nni_tcp_listener *l) +{ + nni_aio *aio; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_tcp_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + NNI_ASSERT(pfd == l->pfd); + + // Anything else will turn up in accept. + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_mtx_unlock(&l->mtx); + return (nni_plat_errno(errno)); + } + + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + +// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows +// SO_REUSEADDR, which is almost completely different (and wrong!) from +// traditional SO_REUSEADDR. +#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) + { + int on = 1; + // If for some reason this doesn't work, it's probably ok. + // Second bind will fail. + (void) setsockopt( + fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } +#endif + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Lets get the bound sockname, and pass that back to the caller. + // This permits ephemeral port binding to work. + // If this fails for some reason, we just don't update the + // sockaddr structure. This is kind of suboptimal, but failures + // here should never occur. + len = sizeof(ss); + (void) getsockname(fd, (void *) &ss, &len); + (void) nni_posix_sockaddr2nn(sa, &ss); + + nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + + l->pfd = pfd; + l->started = true; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + tcp_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX |
