diff options
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 46 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.c | 109 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.h | 44 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 410 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpdial.c | 234 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcplisten.c | 319 |
9 files changed, 1055 insertions, 115 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 83f2f1a8..3b1ce751 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -20,8 +20,8 @@ #include "core/nng_impl.h" #include "posix_pollq.h" +#include <sys/stat.h> // needed for musl build #include <sys/types.h> // needed for mode_t -#include <sys/stat.h> // needed for musl build typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; @@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *); extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t); - #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 74c92dbd..d796765a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; + NNI_ARG_UNUSED(pfd); + nni_mtx_lock(&ed->mtx); if (events & POLLNVAL) { nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); return; } - NNI_ASSERT(pfd == ed->pfd); // Anything else will turn up in accept. nni_epdesc_doaccept(ed); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 26753df6..ec487de5 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -302,7 +302,7 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { nni_mtx_lock(&pq->mtx); - pq->closing = 1; + pq->closing = true; nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 8c003362..63d858c2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -13,6 +13,7 @@ #ifdef NNG_USE_POSIX_RESOLV_GAI #include "platform/posix/posix_aio.h" +#include <arpa/inet.h> #include <ctype.h> #include <errno.h> #include <netdb.h> @@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -330,6 +331,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + const void *ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_posix_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index cbc7a641..53ea8026 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -26,115 +26,6 @@ #include <unistd.h> int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_storage ss; - int len; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, rsa); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, lsa); - nni_posix_epdesc_set_local(ed, &ss, len); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - rv = nni_posix_epdesc_listen((void *) ep); - if ((rv == 0) && (bsa != NULL)) { - rv = nni_posix_epdesc_sockname((void *) ep, bsa); - } - return (rv); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_peername((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_sockname((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_keepalive((void *) p, v)); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_nodelay((void *) p, v)); -} - -int nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) { const void *ap; diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h new file mode 100644 index 00000000..aefce7f7 --- /dev/null +++ b/src/platform/posix/posix_tcp.h @@ -0,0 +1,44 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +struct nni_tcp_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_tcp_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + nni_mtx mtx; +}; + +extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_conn_start(nni_tcp_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c new file mode 100644 index 00000000..30525648 --- /dev/null +++ b/src/platform/posix/posix_tcpconn.c @@ -0,0 +1,410 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_tcp.h" + +static void +tcp_conn_dowrite(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +tcp_conn_doread(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_tcp_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + tcp_conn_doread(c); + } + if (events & POLLOUT) { + tcp_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + tcp_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + tcp_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getpeername(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getsockname(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) +{ + int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) +{ + + int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +{ + nni_tcp_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_tcp_conn_start(nni_tcp_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c new file mode 100644 index 00000000..7f627361 --- /dev/null +++ b/src/platform/posix/posix_tcpdial.c @@ -0,0 +1,234 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +// Dialer stuff. +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_tcp_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_tcp_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_tcp_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dialer_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_tcp_conn_fini(c); +} + +static void +tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_tcp_conn * c = arg; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_close(c); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_tcp_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + rv = nni_plat_errno(errno); + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c new file mode 100644 index 00000000..cdcbecd9 --- /dev/null +++ b/src/platform/posix/posix_tcplisten.c @@ -0,0 +1,319 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +tcp_listener_doclose(nni_tcp_listener *l) +{ + nni_aio *aio; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_tcp_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + NNI_ASSERT(pfd == l->pfd); + + // Anything else will turn up in accept. + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_mtx_unlock(&l->mtx); + return (nni_plat_errno(errno)); + } + + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + +// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows +// SO_REUSEADDR, which is almost completely different (and wrong!) from +// traditional SO_REUSEADDR. +#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) + { + int on = 1; + // If for some reason this doesn't work, it's probably ok. + // Second bind will fail. + (void) setsockopt( + fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } +#endif + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Lets get the bound sockname, and pass that back to the caller. + // This permits ephemeral port binding to work. + // If this fails for some reason, we just don't update the + // sockaddr structure. This is kind of suboptimal, but failures + // here should never occur. + len = sizeof(ss); + (void) getsockname(fd, (void *) &ss, &len); + (void) nni_posix_sockaddr2nn(sa, &ss); + + nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + + l->pfd = pfd; + l->started = true; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + tcp_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX |
