aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_aio.h3
-rw-r--r--src/platform/posix/posix_epdesc.c3
-rw-r--r--src/platform/posix/posix_pollq_poll.c2
-rw-r--r--src/platform/posix/posix_resolv_gai.c46
-rw-r--r--src/platform/posix/posix_tcp.c109
-rw-r--r--src/platform/posix/posix_tcp.h44
-rw-r--r--src/platform/posix/posix_tcpconn.c410
-rw-r--r--src/platform/posix/posix_tcpdial.c234
-rw-r--r--src/platform/posix/posix_tcplisten.c319
9 files changed, 1055 insertions, 115 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 83f2f1a8..3b1ce751 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -20,8 +20,8 @@
#include "core/nng_impl.h"
#include "posix_pollq.h"
+#include <sys/stat.h> // needed for musl build
#include <sys/types.h> // needed for mode_t
-#include <sys/stat.h> // needed for musl build
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
@@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *);
extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *);
extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t);
-
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 74c92dbd..d796765a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_epdesc *ed = arg;
+ NNI_ARG_UNUSED(pfd);
+
nni_mtx_lock(&ed->mtx);
if (events & POLLNVAL) {
nni_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
return;
}
- NNI_ASSERT(pfd == ed->pfd);
// Anything else will turn up in accept.
nni_epdesc_doaccept(ed);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 26753df6..ec487de5 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -302,7 +302,7 @@ static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
nni_mtx_lock(&pq->mtx);
- pq->closing = 1;
+ pq->closing = true;
nni_mtx_unlock(&pq->mtx);
nni_plat_pipe_raise(pq->wakewfd);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 8c003362..63d858c2 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -13,6 +13,7 @@
#ifdef NNG_USE_POSIX_RESOLV_GAI
#include "platform/posix/posix_aio.h"
+#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <netdb.h>
@@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
}
void
-nni_plat_tcp_resolv(
+nni_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
-nni_plat_udp_resolv(
+nni_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
@@ -330,6 +331,47 @@ resolv_worker(void *notused)
}
int
+nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
+{
+ const void *ap;
+ uint16_t port;
+ int af;
+ switch (sa->s_family) {
+ case NNG_AF_INET:
+ ap = &sa->s_in.sa_addr;
+ port = sa->s_in.sa_port;
+ af = AF_INET;
+ break;
+ case NNG_AF_INET6:
+ ap = &sa->s_in6.sa_addr;
+ port = sa->s_in6.sa_port;
+ af = AF_INET6;
+ break;
+ default:
+ return (NNG_EINVAL);
+ }
+ if (ipstr != NULL) {
+ if (af == AF_INET6) {
+ size_t l;
+ ipstr[0] = '[';
+ inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN);
+ l = strlen(ipstr);
+ ipstr[l++] = ']';
+ ipstr[l++] = '\0';
+ } else {
+ inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN);
+ }
+ }
+ if (portstr != NULL) {
+#ifdef NNG_LITTLE_ENDIAN
+ port = ((port >> 8) & 0xff) | ((port & 0xff) << 8);
+#endif
+ snprintf(portstr, 6, "%u", port);
+ }
+ return (0);
+}
+
+int
nni_posix_resolv_sysinit(void)
{
nni_mtx_init(&resolv_mtx);
diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c
index cbc7a641..53ea8026 100644
--- a/src/platform/posix/posix_tcp.c
+++ b/src/platform/posix/posix_tcp.c
@@ -26,115 +26,6 @@
#include <unistd.h>
int
-nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa,
- const nni_sockaddr *rsa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_storage ss;
- int len;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
-
- if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) {
- len = nni_posix_nn2sockaddr((void *) &ss, rsa);
- nni_posix_epdesc_set_remote(ed, &ss, len);
- }
- if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) {
- len = nni_posix_nn2sockaddr((void *) &ss, lsa);
- nni_posix_epdesc_set_local(ed, &ss, len);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa)
-{
- int rv;
- rv = nni_posix_epdesc_listen((void *) ep);
- if ((rv == 0) && (bsa != NULL)) {
- rv = nni_posix_epdesc_sockname((void *) ep, bsa);
- }
- return (rv);
-}
-
-void
-nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa)
-{
- return (nni_posix_pipedesc_peername((void *) p, sa));
-}
-
-int
-nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa)
-{
- return (nni_posix_pipedesc_sockname((void *) p, sa));
-}
-
-int
-nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v)
-{
- return (nni_posix_pipedesc_set_keepalive((void *) p, v));
-}
-
-int
-nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v)
-{
- return (nni_posix_pipedesc_set_nodelay((void *) p, v));
-}
-
-int
nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
{
const void *ap;
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
new file mode 100644
index 00000000..aefce7f7
--- /dev/null
+++ b/src/platform/posix/posix_tcp.h
@@ -0,0 +1,44 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+struct nni_tcp_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_tcp_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_tcp_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_tcp_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *);
+extern void nni_posix_tcp_conn_start(nni_tcp_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
new file mode 100644
index 00000000..30525648
--- /dev/null
+++ b/src/platform/posix/posix_tcpconn.c
@@ -0,0 +1,410 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_tcp.h"
+
+static void
+tcp_conn_dowrite(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+tcp_conn_doread(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_tcp_conn_close(nni_tcp_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_tcp_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_tcp_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ tcp_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ tcp_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ tcp_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ tcp_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ struct sockaddr_storage ss;
+ socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (nni_posix_sockaddr2nn(sa, &ss));
+}
+
+int
+nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ struct sockaddr_storage ss;
+ socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (nni_posix_sockaddr2nn(sa, &ss));
+}
+
+int
+nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep)
+{
+ int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (0);
+}
+
+int
+nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay)
+{
+
+ int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (0);
+}
+
+int
+nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_tcp_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_tcp_conn_start(nni_tcp_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c);
+}
+
+void
+nni_tcp_conn_fini(nni_tcp_conn *c)
+{
+ nni_tcp_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ c->pfd = NULL;
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
new file mode 100644
index 00000000..7f627361
--- /dev/null
+++ b/src/platform/posix/posix_tcpdial.c
@@ -0,0 +1,234 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_tcp.h"
+
+// Dialer stuff.
+int
+nni_tcp_dialer_init(nni_tcp_dialer **dp)
+{
+ nni_tcp_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_tcp_dialer_close(nni_tcp_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_tcp_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_tcp_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+tcp_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_dialer *d = nni_aio_get_prov_data(aio);
+ nni_tcp_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_tcp_conn_fini(c);
+}
+
+static void
+tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_tcp_conn * c = arg;
+ nni_tcp_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_tcp_conn_close(c);
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_tcp_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ rv = nni_plat_errno(errno);
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
new file mode 100644
index 00000000..cdcbecd9
--- /dev/null
+++ b/src/platform/posix/posix_tcplisten.c
@@ -0,0 +1,319 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_tcp.h"
+
+int
+nni_tcp_listener_init(nni_tcp_listener **lp)
+{
+ nni_tcp_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+tcp_listener_doclose(nni_tcp_listener *l)
+{
+ nni_aio *aio;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+}
+
+void
+nni_tcp_listener_close(nni_tcp_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ tcp_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+tcp_listener_doaccept(nni_tcp_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_tcp_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_tcp_listener *l = arg;
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ tcp_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ NNI_ASSERT(pfd == l->pfd);
+
+ // Anything else will turn up in accept.
+ tcp_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+tcp_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (nni_plat_errno(errno));
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows
+// SO_REUSEADDR, which is almost completely different (and wrong!) from
+// traditional SO_REUSEADDR.
+#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
+ {
+ int on = 1;
+ // If for some reason this doesn't work, it's probably ok.
+ // Second bind will fail.
+ (void) setsockopt(
+ fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ }
+#endif
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ // Listen -- 128 depth is probably sufficient. If it isn't, other
+ // bad things are going to happen.
+ if (listen(fd, 128) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ // Lets get the bound sockname, and pass that back to the caller.
+ // This permits ephemeral port binding to work.
+ // If this fails for some reason, we just don't update the
+ // sockaddr structure. This is kind of suboptimal, but failures
+ // here should never occur.
+ len = sizeof(ss);
+ (void) getsockname(fd, (void *) &ss, &len);
+ (void) nni_posix_sockaddr2nn(sa, &ss);
+
+ nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ tcp_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ tcp_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX