aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt7
-rw-r--r--src/core/aio.c2
-rw-r--r--src/core/platform.h138
-rw-r--r--src/platform/posix/posix_aio.h3
-rw-r--r--src/platform/posix/posix_epdesc.c3
-rw-r--r--src/platform/posix/posix_pollq_poll.c2
-rw-r--r--src/platform/posix/posix_resolv_gai.c46
-rw-r--r--src/platform/posix/posix_tcp.c109
-rw-r--r--src/platform/posix/posix_tcp.h44
-rw-r--r--src/platform/posix/posix_tcpconn.c410
-rw-r--r--src/platform/posix/posix_tcpdial.c234
-rw-r--r--src/platform/posix/posix_tcplisten.c319
-rw-r--r--src/platform/windows/win_impl.h24
-rw-r--r--src/platform/windows/win_io.c152
-rw-r--r--src/platform/windows/win_resolv.c49
-rw-r--r--src/platform/windows/win_tcp.c703
-rw-r--r--src/platform/windows/win_tcp.h67
-rw-r--r--src/platform/windows/win_tcpconn.c391
-rw-r--r--src/platform/windows/win_tcpdial.c228
-rw-r--r--src/platform/windows/win_tcplisten.c312
-rw-r--r--src/platform/windows/win_thread.c8
-rw-r--r--src/supplemental/http/http_api.h4
-rw-r--r--src/supplemental/http/http_client.c129
-rw-r--r--src/supplemental/http/http_conn.c57
-rw-r--r--src/supplemental/http/http_server.c75
-rw-r--r--src/supplemental/tls/mbedtls/tls.c32
-rw-r--r--src/supplemental/tls/none/tls.c9
-rw-r--r--src/supplemental/tls/tls_api.h2
-rw-r--r--src/transport/tcp/tcp.c1016
-rw-r--r--src/transport/tls/tls.c1056
30 files changed, 3748 insertions, 1883 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e9561980..e89f782b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -104,6 +104,9 @@ if (NNG_PLATFORM_POSIX)
platform/posix/posix_resolv_gai.c
platform/posix/posix_sockaddr.c
platform/posix/posix_tcp.c
+ platform/posix/posix_tcpconn.c
+ platform/posix/posix_tcpdial.c
+ platform/posix/posix_tcplisten.c
platform/posix/posix_thread.c
platform/posix/posix_udp.c
)
@@ -133,6 +136,7 @@ if (NNG_PLATFORM_WINDOWS)
platform/windows/win_clock.c
platform/windows/win_debug.c
platform/windows/win_file.c
+ platform/windows/win_io.c
platform/windows/win_iocp.c
platform/windows/win_ipc.c
platform/windows/win_pipe.c
@@ -140,6 +144,9 @@ if (NNG_PLATFORM_WINDOWS)
platform/windows/win_resolv.c
platform/windows/win_sockaddr.c
platform/windows/win_tcp.c
+ platform/windows/win_tcpconn.c
+ platform/windows/win_tcpdial.c
+ platform/windows/win_tcplisten.c
platform/windows/win_thread.c
platform/windows/win_udp.c
)
diff --git a/src/core/aio.c b/src/core/aio.c
index c8517b9b..40638bce 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -378,8 +378,10 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
return (NNG_ECLOSED);
}
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
+
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
diff --git a/src/core/platform.h b/src/core/platform.h
index 607c3827..5045b172 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -209,93 +209,103 @@ extern int nni_plat_ncpu(void);
//
// TCP Support.
//
+typedef struct nni_tcp_conn nni_tcp_conn;
+typedef struct nni_tcp_dialer nni_tcp_dialer;
+typedef struct nni_tcp_listener nni_tcp_listener;
-typedef struct nni_plat_tcp_ep nni_plat_tcp_ep;
-typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe;
+extern void nni_tcp_conn_fini(nni_tcp_conn *);
-// nni_plat_tcp_ep_init creates a new endpoint associated with the local
-// and remote addresses.
-extern int nni_plat_tcp_ep_init(
- nni_plat_tcp_ep **, const nni_sockaddr *, const nni_sockaddr *, int);
+// nni_tcp_dialer_close closes the dialer, which might actually be
+// implemented as a shutdown() call.
+// Further operations on it should return NNG_ECLOSED.
+extern void nni_tcp_conn_close(nni_tcp_conn *);
-// nni_plat_tcp_ep_fini closes the endpoint and releases resources.
-extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *);
-
-// nni_plat_tcp_ep_close closes the endpoint; this might not close the
-// actual underlying socket, but it should call shutdown on it.
-// Further operations on the pipe should return NNG_ECLOSED.
-extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *);
-
-// nni_plat_tcp_listen creates an TCP socket in listening mode, bound
-// to the specified path.
-extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *, nni_sockaddr *);
-
-// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection.
-// An accepted connection will be passed back in the a_pipe member.
-extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *);
-
-// nni_plat_tcp_connect is the client side.
-// An accepted connection will be passed back in the a_pipe member.
-extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *);
-
-// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources
-// associated with it.
-extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *);
-
-// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down.
-// Further operations on the pipe should return NNG_ECLOSED.
-extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *);
-
-// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer.
+// nni_tcp_conn_send sends data in the iov buffers to the peer.
// The platform may modify the iovs.
-extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *);
+extern void nni_tcp_conn_send(nni_tcp_conn *, nni_aio *);
-// nni_plat_tcp_pipe_recv receives data into the buffers provided by the
+// nni_tcp_conn_recv receives data into the buffers provided by the
// I/O vector (iovs). The platform should attempt to scatter the received
// data into the iovs if possible.
//
-// It is an error for the caller to supply any IO vector elements with
-// zero length.
-//
-// It is possible for the TCP reader to return less data than is requested,
+// It is possible for the reader to return less data than is requested,
// in which case the caller is responsible for resubmitting. The platform
-// should not return "zero" data however. (It is an error to attempt to
-// receive zero bytes.) The platform may not modify the I/O vector.
-extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *);
+// must not return "zero" data however. (It is an error to attempt to
+// receive zero bytes.) The platform may modify the iovs.
+extern void nni_tcp_conn_recv(nni_tcp_conn *, nni_aio *);
-// nni_plat_tcp_pipe_peername gets the peer name.
-extern int nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *, nni_sockaddr *);
+// nni_tcp_conn_peername gets the peer name.
+extern int nni_tcp_conn_peername(nni_tcp_conn *, nni_sockaddr *);
-// nni_plat_tcp_pipe_sockname gets the local name.
-extern int nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *, nni_sockaddr *);
+// nni_tcp_conn_sockname gets the local name.
+extern int nni_tcp_conn_sockname(nni_tcp_conn *, nni_sockaddr *);
-// nni_plat_tcp_pipe_set_nodelay sets nodelay, disabling Nagle, according
-// to the parameter. true disables Nagle; false enables Nagle.
-extern int nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *, bool);
+// nni_tcp_conn_set_nodelay indicates that the TCP pipe should send
+// data immediately, without any buffering. (Disable Nagle's algorithm.)
+extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool);
-// nni_plat_tcp_pipe_set_keepalive indicates that the TCP pipe should send
+// nni_tcp_conn_set_keepalive indicates that the TCP pipe should send
// keepalive probes. Tuning of these keepalives is current unsupported.
-extern int nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *, bool);
-
-// nni_plat_tcp_ntop obtains the IP address for the socket (enclosing it
+extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool);
+
+// nni_tcp_listener_init creates a new dialer object.
+extern int nni_tcp_dialer_init(nni_tcp_dialer **);
+
+// nni_tcp_dialer_fini finalizes the dialer, closing it and freeing
+// all resources.
+extern void nni_tcp_dialer_fini(nni_tcp_dialer *);
+
+// nni_tcp_dialer_close closes the dialer.
+// Further operations on it should return NNG_ECLOSED. Any in-progress
+// connection will be aborted.
+extern void nni_tcp_dialer_close(nni_tcp_dialer *);
+
+// nni_tcp_dialer_dial attempts to create an outgoing connection,
+// asynchronously, to the address specified. On success, the first (and only)
+// output will be an nni_tcp_conn * associated with the remote server.
+extern void nni_tcp_dialer_dial(
+ nni_tcp_dialer *, const nni_sockaddr *, nni_aio *);
+
+// nni_tcp_listener_init creates a new listener object, unbound.
+extern int nni_tcp_listener_init(nni_tcp_listener **);
+
+// nni_tcp_listener_fini frees the listener and all associated resources.
+// It implictly closes the listener as well.
+extern void nni_tcp_listener_fini(nni_tcp_listener *);
+
+// nni_tcp_listener_close closes the listener. This will unbind
+// any bound socket, and further operations will result in NNG_ECLOSED.
+extern void nni_tcp_listener_close(nni_tcp_listener *);
+
+// nni_tcp_listener_listen creates the socket in listening mode, bound
+// to the specified address. The address will be updated to reflect
+// the actual address bound (making it possible to bind to port 0 to
+// specify an ephemeral address, and then the actual address can be
+// examined afterwards.)
+extern int nni_tcp_listener_listen(nni_tcp_listener *, nni_sockaddr *);
+
+// nni_tcp_listener_accept accepts in incoming connect, asynchronously.
+// On success, the first (and only) output will be an nni_tcp_conn *
+// associated with the remote peer.
+extern void nni_tcp_listener_accept(nni_tcp_listener *, nni_aio *);
+
+// nni_ntop obtains the IP address for the socket (enclosing it
// in brackets if it is IPv6) and port. Enough space for both must
// be present (48 bytes and 6 bytes each), although if either is NULL then
-// those components are skipped.
-extern int nni_plat_tcp_ntop(const nni_sockaddr *, char *, char *);
+// those components are skipped. This is based on inet_ntop.
+extern int nni_ntop(const nni_sockaddr *, char *, char *);
-// nni_plat_tcp_resolv resolves a TCP name asynchronously. The family
+// nni_tcp_resolv resolves a TCP name asynchronously. The family
// should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The
// first two constrain the name to those families, while the third will
// return names of either family. The passive flag indicates that the
// name will be used for bind(), otherwise the name will be used with
// connect(). The host part may be NULL only if passive is true.
-extern void nni_plat_tcp_resolv(
- const char *, const char *, int, int, nni_aio *);
+extern void nni_tcp_resolv(const char *, const char *, int, int, nni_aio *);
-// nni_plat_udp_resolve is just like nni_plat_tcp_resolve, but looks up
+// nni_udp_resolv is just like nni_tcp_resolv, but looks up
// service names using UDP.
-extern void nni_plat_udp_resolv(
- const char *, const char *, int, int, nni_aio *);
+extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *);
//
// IPC (UNIX Domain Sockets & Named Pipes) Support.
@@ -317,7 +327,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
// Further operations on the pipe should return NNG_ECLOSED.
extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *);
-// nni_plat_tcp_listen creates an IPC socket in listening mode, bound
+// nni_plat_ipc_listen creates an IPC socket in listening mode, bound
// to the specified path.
extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *);
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 83f2f1a8..3b1ce751 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -20,8 +20,8 @@
#include "core/nng_impl.h"
#include "posix_pollq.h"
+#include <sys/stat.h> // needed for musl build
#include <sys/types.h> // needed for mode_t
-#include <sys/stat.h> // needed for musl build
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
@@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *);
extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *);
extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t);
-
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 74c92dbd..d796765a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_epdesc *ed = arg;
+ NNI_ARG_UNUSED(pfd);
+
nni_mtx_lock(&ed->mtx);
if (events & POLLNVAL) {
nni_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
return;
}
- NNI_ASSERT(pfd == ed->pfd);
// Anything else will turn up in accept.
nni_epdesc_doaccept(ed);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 26753df6..ec487de5 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -302,7 +302,7 @@ static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
nni_mtx_lock(&pq->mtx);
- pq->closing = 1;
+ pq->closing = true;
nni_mtx_unlock(&pq->mtx);
nni_plat_pipe_raise(pq->wakewfd);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 8c003362..63d858c2 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -13,6 +13,7 @@
#ifdef NNG_USE_POSIX_RESOLV_GAI
#include "platform/posix/posix_aio.h"
+#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <netdb.h>
@@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
}
void
-nni_plat_tcp_resolv(
+nni_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
-nni_plat_udp_resolv(
+nni_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
@@ -330,6 +331,47 @@ resolv_worker(void *notused)
}
int
+nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
+{
+ const void *ap;
+ uint16_t port;
+ int af;
+ switch (sa->s_family) {
+ case NNG_AF_INET:
+ ap = &sa->s_in.sa_addr;
+ port = sa->s_in.sa_port;
+ af = AF_INET;
+ break;
+ case NNG_AF_INET6:
+ ap = &sa->s_in6.sa_addr;
+ port = sa->s_in6.sa_port;
+ af = AF_INET6;
+ break;
+ default:
+ return (NNG_EINVAL);
+ }
+ if (ipstr != NULL) {
+ if (af == AF_INET6) {
+ size_t l;
+ ipstr[0] = '[';
+ inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN);
+ l = strlen(ipstr);
+ ipstr[l++] = ']';
+ ipstr[l++] = '\0';
+ } else {
+ inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN);
+ }
+ }
+ if (portstr != NULL) {
+#ifdef NNG_LITTLE_ENDIAN
+ port = ((port >> 8) & 0xff) | ((port & 0xff) << 8);
+#endif
+ snprintf(portstr, 6, "%u", port);
+ }
+ return (0);
+}
+
+int
nni_posix_resolv_sysinit(void)
{
nni_mtx_init(&resolv_mtx);
diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c
index cbc7a641..53ea8026 100644
--- a/src/platform/posix/posix_tcp.c
+++ b/src/platform/posix/posix_tcp.c
@@ -26,115 +26,6 @@
#include <unistd.h>
int
-nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa,
- const nni_sockaddr *rsa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_storage ss;
- int len;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
-
- if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) {
- len = nni_posix_nn2sockaddr((void *) &ss, rsa);
- nni_posix_epdesc_set_remote(ed, &ss, len);
- }
- if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) {
- len = nni_posix_nn2sockaddr((void *) &ss, lsa);
- nni_posix_epdesc_set_local(ed, &ss, len);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa)
-{
- int rv;
- rv = nni_posix_epdesc_listen((void *) ep);
- if ((rv == 0) && (bsa != NULL)) {
- rv = nni_posix_epdesc_sockname((void *) ep, bsa);
- }
- return (rv);
-}
-
-void
-nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa)
-{
- return (nni_posix_pipedesc_peername((void *) p, sa));
-}
-
-int
-nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa)
-{
- return (nni_posix_pipedesc_sockname((void *) p, sa));
-}
-
-int
-nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v)
-{
- return (nni_posix_pipedesc_set_keepalive((void *) p, v));
-}
-
-int
-nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v)
-{
- return (nni_posix_pipedesc_set_nodelay((void *) p, v));
-}
-
-int
nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
{
const void *ap;
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
new file mode 100644
index 00000000..aefce7f7
--- /dev/null
+++ b/src/platform/posix/posix_tcp.h
@@ -0,0 +1,44 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+struct nni_tcp_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_tcp_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_tcp_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_tcp_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *);
+extern void nni_posix_tcp_conn_start(nni_tcp_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
new file mode 100644
index 00000000..30525648
--- /dev/null
+++ b/src/platform/posix/posix_tcpconn.c
@@ -0,0 +1,410 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_tcp.h"
+
+static void
+tcp_conn_dowrite(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+tcp_conn_doread(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_tcp_conn_close(nni_tcp_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_tcp_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_tcp_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ tcp_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ tcp_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ tcp_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ tcp_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ struct sockaddr_storage ss;
+ socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (nni_posix_sockaddr2nn(sa, &ss));
+}
+
+int
+nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ struct sockaddr_storage ss;
+ socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (nni_posix_sockaddr2nn(sa, &ss));
+}
+
+int
+nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep)
+{
+ int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (0);
+}
+
+int
+nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay)
+{
+
+ int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(c->pfd);
+
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (0);
+}
+
+int
+nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_tcp_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_tcp_conn_start(nni_tcp_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c);
+}
+
+void
+nni_tcp_conn_fini(nni_tcp_conn *c)
+{
+ nni_tcp_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ c->pfd = NULL;
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
new file mode 100644
index 00000000..7f627361
--- /dev/null
+++ b/src/platform/posix/posix_tcpdial.c
@@ -0,0 +1,234 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_tcp.h"
+
+// Dialer stuff.
+int
+nni_tcp_dialer_init(nni_tcp_dialer **dp)
+{
+ nni_tcp_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_tcp_dialer_close(nni_tcp_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_tcp_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_tcp_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+tcp_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_dialer *d = nni_aio_get_prov_data(aio);
+ nni_tcp_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_tcp_conn_fini(c);
+}
+
+static void
+tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_tcp_conn * c = arg;
+ nni_tcp_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_tcp_conn_close(c);
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_tcp_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ rv = nni_plat_errno(errno);
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
new file mode 100644
index 00000000..cdcbecd9
--- /dev/null
+++ b/src/platform/posix/posix_tcplisten.c
@@ -0,0 +1,319 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_tcp.h"
+
+int
+nni_tcp_listener_init(nni_tcp_listener **lp)
+{
+ nni_tcp_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+tcp_listener_doclose(nni_tcp_listener *l)
+{
+ nni_aio *aio;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+}
+
+void
+nni_tcp_listener_close(nni_tcp_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ tcp_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+tcp_listener_doaccept(nni_tcp_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_tcp_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_tcp_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_tcp_listener *l = arg;
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ tcp_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ NNI_ASSERT(pfd == l->pfd);
+
+ // Anything else will turn up in accept.
+ tcp_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+tcp_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (nni_plat_errno(errno));
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows
+// SO_REUSEADDR, which is almost completely different (and wrong!) from
+// traditional SO_REUSEADDR.
+#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
+ {
+ int on = 1;
+ // If for some reason this doesn't work, it's probably ok.
+ // Second bind will fail.
+ (void) setsockopt(
+ fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ }
+#endif
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ // Listen -- 128 depth is probably sufficient. If it isn't, other
+ // bad things are going to happen.
+ if (listen(fd, 128) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ // Lets get the bound sockname, and pass that back to the caller.
+ // This permits ephemeral port binding to work.
+ // If this fails for some reason, we just don't update the
+ // sockaddr structure. This is kind of suboptimal, but failures
+ // here should never occur.
+ len = sizeof(ss);
+ (void) getsockname(fd, (void *) &ss, &len);
+ (void) nni_posix_sockaddr2nn(sa, &ss);
+
+ nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ tcp_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ tcp_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 263a322b..93e45423 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -80,6 +80,17 @@ struct nni_win_event {
nni_win_event_ops ops;
};
+typedef struct nni_win_io nni_win_io;
+typedef void (*nni_win_io_cb)(nni_win_io *, int, size_t);
+
+struct nni_win_io {
+ OVERLAPPED olpd;
+ HANDLE f;
+ void * ptr;
+ nni_aio * aio;
+ nni_win_io_cb cb;
+};
+
struct nni_plat_flock {
HANDLE h;
};
@@ -94,6 +105,13 @@ extern void nni_win_event_complete(nni_win_event *, int);
extern int nni_win_iocp_register(HANDLE);
+extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET);
+extern void nni_win_tcp_conn_set_addrs(
+ nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *);
+
+extern int nni_win_io_sysinit(void);
+extern void nni_win_io_sysfini(void);
+
extern int nni_win_iocp_sysinit(void);
extern void nni_win_iocp_sysfini(void);
@@ -109,6 +127,12 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
+extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
+extern void nni_win_io_fini(nni_win_io *);
+extern void nni_win_io_cancel(nni_win_io *);
+
+extern int nni_win_io_register(HANDLE);
+
extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *);
extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *);
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
new file mode 100644
index 00000000..1179b603
--- /dev/null
+++ b/src/platform/windows/win_io.c
@@ -0,0 +1,152 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include <stdio.h>
+
+// Windows IO Completion Port support. We basically create a single
+// IO completion port, then start threads on it. Handles are added
+// to the port on an as needed basis. We use a single IO completion
+// port for pretty much everything.
+
+static int win_io_nthr = 0;
+static HANDLE win_io_h = NULL;
+static nni_thr *win_io_thrs;
+
+static void
+win_io_handler(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+
+ for (;;) {
+ DWORD cnt;
+ BOOL ok;
+ nni_win_io *item;
+ OVERLAPPED *olpd = NULL;
+ ULONG_PTR key = 0;
+ int rv;
+
+ ok = GetQueuedCompletionStatus(
+ win_io_h, &cnt, &key, &olpd, INFINITE);
+
+ if (olpd == NULL) {
+ // Completion port closed...
+ NNI_ASSERT(ok == FALSE);
+ break;
+ }
+
+ item = CONTAINING_RECORD(olpd, nni_win_io, olpd);
+ rv = ok ? 0 : nni_win_error(GetLastError());
+ item->cb(item, rv, (size_t) cnt);
+ }
+}
+
+int
+nni_win_io_register(HANDLE h)
+{
+ if (CreateIoCompletionPort(h, win_io_h, 0, 0) == NULL) {
+ return (nni_win_error(GetLastError()));
+ }
+ return (0);
+}
+
+int
+nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
+{
+ ZeroMemory(&io->olpd, sizeof(io->olpd));
+
+ io->cb = cb;
+ io->ptr = ptr;
+ io->aio = NULL;
+ io->f = f;
+ io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (io->olpd.hEvent == NULL) {
+ return (nni_win_error(GetLastError()));
+ }
+ return (0);
+}
+
+void
+nni_win_io_cancel(nni_win_io *io)
+{
+ if (io->f != INVALID_HANDLE_VALUE) {
+ CancelIoEx(io->f, &io->olpd);
+ }
+}
+
+void
+nni_win_io_fini(nni_win_io *io)
+{
+ if (io->olpd.hEvent != NULL) {
+ CloseHandle((HANDLE) io->olpd.hEvent);
+ }
+}
+
+int
+nni_win_io_sysinit(void)
+{
+ HANDLE h;
+ int i;
+ int rv;
+ int nthr = nni_plat_ncpu() * 2;
+
+ // Limits on the thread count. This is fairly arbitrary.
+ if (nthr < 4) {
+ nthr = 4;
+ }
+ if (nthr > 64) {
+ nthr = 64;
+ }
+ if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ win_io_nthr = nthr;
+
+ h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr);
+ if (h == NULL) {
+ return (nni_win_error(GetLastError()));
+ }
+ win_io_h = h;
+
+ for (i = 0; i < win_io_nthr; i++) {
+ rv = nni_thr_init(&win_io_thrs[i], win_io_handler, NULL);
+ if (rv != 0) {
+ goto fail;
+ }
+ }
+ for (i = 0; i < win_io_nthr; i++) {
+ nni_thr_run(&win_io_thrs[i]);
+ }
+ return (0);
+
+fail:
+ nni_win_io_sysfini();
+ return (rv);
+}
+
+void
+nni_win_io_sysfini(void)
+{
+ int i;
+ HANDLE h;
+
+ if ((h = win_io_h) != NULL) {
+ CloseHandle(h);
+ win_io_h = NULL;
+ }
+ for (i = 0; i < win_io_nthr; i++) {
+ nni_thr_fini(&win_io_thrs[i]);
+ }
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index fb9d6751..e2cd192e 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -69,7 +69,7 @@ resolv_cancel(nni_aio *aio, int rv)
}
static int
-resolv_gai_errno(int rv)
+resolv_errno(int rv)
{
switch (rv) {
case 0:
@@ -116,7 +116,7 @@ resolv_task(resolv_item *item)
hints.ai_family = item->family;
if ((rv = getaddrinfo(item->name, "80", &hints, &results)) != 0) {
- rv = resolv_gai_errno(rv);
+ rv = resolv_errno(rv);
goto done;
}
@@ -246,14 +246,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
}
void
-nni_plat_tcp_resolv(
+nni_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
-nni_plat_udp_resolv(
+nni_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
@@ -302,6 +302,47 @@ resolv_worker(void *notused)
}
int
+nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
+{
+ void * ap;
+ uint16_t port;
+ int af;
+ switch (sa->s_family) {
+ case NNG_AF_INET:
+ ap = (void *) &sa->s_in.sa_addr;
+ port = sa->s_in.sa_port;
+ af = AF_INET;
+ break;
+ case NNG_AF_INET6:
+ ap = (void *) &sa->s_in6.sa_addr;
+ port = sa->s_in6.sa_port;
+ af = AF_INET6;
+ break;
+ default:
+ return (NNG_EINVAL);
+ }
+ if (ipstr != NULL) {
+ if (af == AF_INET6) {
+ size_t l;
+ ipstr[0] = '[';
+ InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN);
+ l = strlen(ipstr);
+ ipstr[l++] = ']';
+ ipstr[l++] = '\0';
+ } else {
+ InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN);
+ }
+ }
+ if (portstr != NULL) {
+#ifdef NNG_LITTLE_ENDIAN
+ port = ((port >> 8) & 0xff) | ((port & 0xff) << 8);
+#endif
+ snprintf(portstr, 6, "%u", port);
+ }
+ return (0);
+}
+
+int
nni_win_resolv_sysinit(void)
{
nni_mtx_init(&resolv_mtx);
diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c
index 33c4a1a5..2ab6be0f 100644
--- a/src/platform/windows/win_tcp.c
+++ b/src/platform/windows/win_tcp.c
@@ -15,709 +15,6 @@
#include <malloc.h>
#include <stdio.h>
-struct nni_plat_tcp_pipe {
- SOCKET s;
- nni_win_event rcv_ev;
- nni_win_event snd_ev;
- SOCKADDR_STORAGE sockname;
- SOCKADDR_STORAGE peername;
-};
-
-struct nni_plat_tcp_ep {
- SOCKET s;
- SOCKET acc_s;
- nni_win_event con_ev;
- nni_win_event acc_ev;
- int started;
- int bound;
-
- SOCKADDR_STORAGE remaddr;
- int remlen;
- SOCKADDR_STORAGE locaddr;
- int loclen;
-
- char buf[512]; // to hold acceptex results
-
- // We have to lookup some function pointers using ioctls. Winsock,
- // gotta love it. Especially I love that asynch accept means that
- // getsockname and getpeername don't work.
- LPFN_CONNECTEX connectex;
- LPFN_ACCEPTEX acceptex;
- LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs;
-};
-
-static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *);
-static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *);
-static void nni_win_tcp_pipe_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_tcp_pipe_ops = {
- .wev_start = nni_win_tcp_pipe_start,
- .wev_finish = nni_win_tcp_pipe_finish,
- .wev_cancel = nni_win_tcp_pipe_cancel,
-};
-
-static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *);
-static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *);
-static void nni_win_tcp_acc_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_tcp_acc_ops = {
- .wev_start = nni_win_tcp_acc_start,
- .wev_finish = nni_win_tcp_acc_finish,
- .wev_cancel = nni_win_tcp_acc_cancel,
-};
-
-static int nni_win_tcp_con_start(nni_win_event *, nni_aio *);
-static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *);
-static void nni_win_tcp_con_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_tcp_con_ops = {
- .wev_start = nni_win_tcp_con_start,
- .wev_finish = nni_win_tcp_con_finish,
- .wev_cancel = nni_win_tcp_con_cancel,
-};
-
-static void
-nni_win_tcp_sockinit(SOCKET s)
-{
- BOOL yes;
- DWORD no;
-
- // Don't inherit the handle (CLOEXEC really).
- SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
-
- no = 0;
- (void) setsockopt(
- s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
-
- // Also disable Nagle. We are careful to group data with WSASend,
- // and latency is king for most of our users. (Consider adding
- // a method to enable this later.)
- yes = 1;
- (void) setsockopt(
- s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes));
-}
-
-static int
-nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
-{
- int rv;
- SOCKET s;
- DWORD niov;
- DWORD flags;
- nni_plat_tcp_pipe *pipe = evt->ptr;
- unsigned i;
- unsigned naiov;
- nni_iov * aiov;
- WSABUF * iov;
-
- nni_aio_get_iov(aio, &naiov, &aiov);
- iov = _malloca(naiov * sizeof(*iov));
-
- // Put the AIOs in Windows form.
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len != 0) {
- iov[niov].buf = aiov[i].iov_buf;
- iov[niov].len = (ULONG) aiov[i].iov_len;
- niov++;
- }
- }
-
- if ((s = pipe->s) == INVALID_SOCKET) {
- _freea(iov);
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
- }
-
- // Note that the IOVs for the event were prepared on entry already.
- // The actual aio's iov array we don't touch.
-
- evt->count = 0;
- flags = 0;
- if (evt == &pipe->snd_ev) {
- rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL);
- } else {
- rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL);
- }
- _freea(iov);
-
- if ((rv == SOCKET_ERROR) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
-}
-
-static void
-nni_win_tcp_pipe_cancel(nni_win_event *evt)
-{
- nni_plat_tcp_pipe *pipe = evt->ptr;
-
- (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd);
-}
-
-static void
-nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio)
-{
- if ((evt->status == 0) && (evt->count == 0)) {
- // Windows sometimes returns a zero read. Convert these
- // into an NNG_ECLOSED. (We are never supposed to come
- // back with zero length read.)
- evt->status = NNG_ECLOSED;
- }
- nni_aio_finish(aio, evt->status, evt->count);
-}
-
-static int
-nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s)
-{
- nni_plat_tcp_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_tcp_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_tcp_pipe_fini(pipe);
- return (rv);
- }
- nni_win_tcp_sockinit(s);
- pipe->s = s;
- *pipep = pipe;
- return (0);
-}
-
-void
-nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->snd_ev, aio);
-}
-
-void
-nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->rcv_ev, aio);
-}
-
-void
-nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe)
-{
- SOCKET s;
-
- nni_win_event_close(&pipe->rcv_ev);
-
- if ((s = pipe->s) != INVALID_SOCKET) {
- pipe->s = INVALID_SOCKET;
- closesocket(s);
- }
-}
-
-int
-nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa)
-{
- if (nni_win_sockaddr2nn(sa, &pipe->peername) < 0) {
- return (NNG_EADDRINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa)
-{
- if (nni_win_sockaddr2nn(sa, &pipe->sockname) < 0) {
- return (NNG_EADDRINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *pipe, bool val)
-{
- BOOL b;
- b = val ? TRUE : FALSE;
- if (setsockopt(pipe->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b,
- sizeof(b)) != 0) {
- return (nni_win_error(WSAGetLastError()));
- }
- return (0);
-}
-
-int
-nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *pipe, bool val)
-{
- BOOL b;
- b = val ? TRUE : FALSE;
- if (setsockopt(pipe->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b,
- sizeof(b)) != 0) {
- return (nni_win_error(WSAGetLastError()));
- }
- return (0);
-}
-
-void
-nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe)
-{
- nni_plat_tcp_pipe_close(pipe);
-
- nni_win_event_fini(&pipe->snd_ev);
- nni_win_event_fini(&pipe->rcv_ev);
- NNI_FREE_STRUCT(pipe);
-}
-
-int
-nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa,
- const nni_sockaddr *rsa, int mode)
-{
- nni_plat_tcp_ep *ep;
- int rv;
- SOCKET s;
- DWORD nbytes;
- GUID guid1 = WSAID_CONNECTEX;
- GUID guid2 = WSAID_ACCEPTEX;
- GUID guid3 = WSAID_GETACCEPTEXSOCKADDRS;
-
- NNI_ARG_UNUSED(mode);
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ZeroMemory(ep, sizeof(*ep));
-
- ep->s = INVALID_SOCKET;
-
- if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) {
- ep->remlen = nni_win_nn2sockaddr(&ep->remaddr, rsa);
- }
- if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) {
- ep->loclen = nni_win_nn2sockaddr(&ep->locaddr, lsa);
- }
-
- // Create a scratch socket for use with ioctl.
- s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
- if (s == INVALID_SOCKET) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
-
- // Look up the function pointer.
- if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1,
- sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes,
- NULL, NULL) == SOCKET_ERROR) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
- if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2,
- sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes,
- NULL, NULL) == SOCKET_ERROR) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
- if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid3,
- sizeof(guid3), &ep->getacceptexsockaddrs,
- sizeof(ep->getacceptexsockaddrs), &nbytes, NULL,
- NULL) == SOCKET_ERROR) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
-
- closesocket(s);
- s = INVALID_SOCKET;
-
- // Now initialize the win events for later use.
- rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep);
- if (rv != 0) {
- goto fail;
- }
-
- *epp = ep;
- return (0);
-
-fail:
- if (s != INVALID_SOCKET) {
- closesocket(s);
- }
- nni_plat_tcp_ep_fini(ep);
- return (rv);
-}
-
-void
-nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep)
-{
- nni_win_event_close(&ep->acc_ev);
- nni_win_event_close(&ep->con_ev);
- if (ep->s != INVALID_SOCKET) {
- closesocket(ep->s);
- ep->s = INVALID_SOCKET;
- }
- if (ep->acc_s != INVALID_SOCKET) {
- closesocket(ep->acc_s);
- }
-}
-
-void
-nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep)
-{
- nni_plat_tcp_ep_close(ep);
- NNI_FREE_STRUCT(ep);
-}
-
-static int
-nni_win_tcp_listen(nni_plat_tcp_ep *ep, nni_sockaddr *bsa)
-{
- int rv;
- BOOL yes;
- SOCKET s;
-
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
- s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP);
- if (s == INVALID_SOCKET) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
-
- nni_win_tcp_sockinit(s);
-
- if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) {
- goto fail;
- }
-
- // Make sure that we use the address exclusively. Windows lets
- // others hijack us by default.
- yes = 1;
-
- rv = setsockopt(
- s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes));
- if (rv != 0) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
- if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
-
- if (bsa != NULL) {
- SOCKADDR_STORAGE bound;
- int len = sizeof(bound);
- rv = getsockname(s, (SOCKADDR *) &bound, &len);
- if (rv != 0) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
- nni_win_sockaddr2nn(bsa, &bound);
- }
-
- if (listen(s, SOMAXCONN) != 0) {
- rv = nni_win_error(GetLastError());
- goto fail;
- }
-
- ep->s = s;
- ep->started = 1;
-
- return (0);
-
-fail:
- if (s != INVALID_SOCKET) {
- closesocket(s);
- }
- return (rv);
-}
-
-int
-nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa)
-{
- int rv;
-
- nni_mtx_lock(&ep->acc_ev.mtx);
- rv = nni_win_tcp_listen(ep, bsa);
- nni_mtx_unlock(&ep->acc_ev.mtx);
- return (rv);
-}
-
-static void
-nni_win_tcp_acc_cancel(nni_win_event *evt)
-{
- nni_plat_tcp_ep *ep = evt->ptr;
- SOCKET s = ep->s;
-
- if (s != INVALID_SOCKET) {
- CancelIoEx((HANDLE) s, &evt->olpd);
- }
-}
-
-static void
-nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_tcp_ep * ep = evt->ptr;
- nni_plat_tcp_pipe *pipe;
- SOCKET s;
- int rv;
- int len1;
- int len2;
- SOCKADDR * sa1;
- SOCKADDR * sa2;
-
- s = ep->acc_s;
- ep->acc_s = INVALID_SOCKET;
-
- if (s == INVALID_SOCKET) {
- return;
- }
-
- if (((rv = evt->status) != 0) ||
- ((rv = nni_win_iocp_register((HANDLE) s)) != 0) ||
- ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
- closesocket(s);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- // Collect the local and peer addresses, because normal getsockname
- // and getpeername don't work with AcceptEx.
- len1 = (int) sizeof(pipe->sockname);
- len2 = (int) sizeof(pipe->peername);
- ep->getacceptexsockaddrs(
- ep->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2);
- NNI_ASSERT(len1 > 0);
- NNI_ASSERT(len1 < (int) sizeof(SOCKADDR_STORAGE));
- NNI_ASSERT(len2 > 0);
- NNI_ASSERT(len2 < (int) sizeof(SOCKADDR_STORAGE));
- memcpy(&pipe->sockname, sa1, len1);
- memcpy(&pipe->peername, sa2, len2);
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static int
-nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_tcp_ep *ep = evt->ptr;
- SOCKET s = ep->s;
- SOCKET acc_s;
- DWORD cnt;
-
- NNI_ARG_UNUSED(aio);
-
- acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
- if (acc_s == INVALID_SOCKET) {
- evt->status = nni_win_error(GetLastError());
- evt->count = 0;
- return (1);
- }
- ep->acc_s = acc_s;
-
- if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) {
- int rv = GetLastError();
- switch (rv) {
- case ERROR_IO_PENDING:
- // Normal asynchronous operation. Wait for
- // completion.
- return (0);
-
- default:
- // Fast-fail (synchronous).
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
-
- // Synch completion right now. I/O completion packet delivered
- // already.
- return (0);
-}
-
-void
-nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->acc_ev, aio);
-}
-
-static void
-nni_win_tcp_con_cancel(nni_win_event *evt)
-{
- nni_plat_tcp_ep *ep = evt->ptr;
- SOCKET s = ep->s;
-
- if (s != INVALID_SOCKET) {
- CancelIoEx((HANDLE) s, &evt->olpd);
- }
-}
-
-static void
-nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_tcp_ep * ep = evt->ptr;
- nni_plat_tcp_pipe *pipe;
- SOCKET s;
- int rv;
- DWORD yes = 1;
- int len;
-
- s = ep->s;
- ep->s = INVALID_SOCKET;
-
- // The socket was already registered with the IOCP.
-
- if (((rv = evt->status) != 0) ||
- ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
- // The new pipe is already fine for us. Discard
- // the old one, since failed to be able to use it.
- closesocket(s);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- (void) setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
- (char *) &yes, sizeof(yes));
-
- // Windows seems to be unable to get peernames for sockets on
- // connect - perhaps because we supplied it already with connectex.
- // Rather than debugging it, just steal the address from the endpoint.
- memcpy(&pipe->peername, &ep->remaddr, ep->remlen);
-
- len = sizeof(pipe->sockname);
- (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len);
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static int
-nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_tcp_ep *ep = evt->ptr;
- SOCKET s;
- SOCKADDR_STORAGE bss;
- int len;
- int rv;
- int family;
-
- NNI_ARG_UNUSED(aio);
-
- if (ep->loclen > 0) {
- family = ep->locaddr.ss_family;
- } else {
- family = ep->remaddr.ss_family;
- }
-
- s = socket(family, SOCK_STREAM, IPPROTO_TCP);
- if (s == INVALID_SOCKET) {
- evt->status = nni_win_error(GetLastError());
- evt->count = 0;
- return (1);
- }
-
- nni_win_tcp_sockinit(s);
-
- // Windows ConnectEx requires the socket to be bound first.
- if (ep->loclen > 0) {
- bss = ep->locaddr;
- len = ep->loclen;
- } else {
- ZeroMemory(&bss, sizeof(bss));
- bss.ss_family = ep->remaddr.ss_family;
- len = ep->remlen;
- }
- if (bind(s, (struct sockaddr *) &bss, len) < 0) {
- evt->status = nni_win_error(GetLastError());
- evt->count = 0;
- closesocket(s);
-
- return (1);
- }
- // Register with the I/O completion port so we can get the
- // events for the next call.
- if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) {
- closesocket(s);
- evt->status = rv;
- evt->count = 0;
- return (1);
- }
-
- ep->s = s;
- if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen,
- NULL, 0, NULL, &evt->olpd)) {
- if ((rv = GetLastError()) != ERROR_IO_PENDING) {
- closesocket(s);
- ep->s = INVALID_SOCKET;
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
- return (0);
-}
-
-extern void
-nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->con_ev, aio);
-}
-
-int
-nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr)
-{
- void * ap;
- uint16_t port;
- int af;
- switch (sa->s_family) {
- case NNG_AF_INET:
- ap = (void *) &sa->s_in.sa_addr;
- port = sa->s_in.sa_port;
- af = AF_INET;
- break;
- case NNG_AF_INET6:
- ap = (void *) &sa->s_in6.sa_addr;
- port = sa->s_in6.sa_port;
- af = AF_INET6;
- break;
- default:
- return (NNG_EINVAL);
- }
- if (ipstr != NULL) {
- if (af == AF_INET6) {
- size_t l;
- ipstr[0] = '[';
- InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN);
- l = strlen(ipstr);
- ipstr[l++] = ']';
- ipstr[l++] = '\0';
- } else {
- InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN);
- }
- }
- if (portstr != NULL) {
-#ifdef NNG_LITTLE_ENDIAN
- port = ((port >> 8) & 0xff) | ((port & 0xff) << 8);
-#endif
- snprintf(portstr, 6, "%u", port);
- }
- return (0);
-}
-
int
nni_win_tcp_sysinit(void)
{
diff --git a/src/platform/windows/win_tcp.h b/src/platform/windows/win_tcp.h
new file mode 100644
index 00000000..7025af81
--- /dev/null
+++ b/src/platform/windows/win_tcp.h
@@ -0,0 +1,67 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_WIN_WINTCP_H
+#define PLATFORM_WIN_WINTCP_H
+
+// This header file is private to the TCP support for Windows.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+struct nni_tcp_conn {
+ SOCKET s;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ SOCKADDR_STORAGE sockname;
+ SOCKADDR_STORAGE peername;
+ nni_tcp_dialer * dialer;
+ nni_tcp_listener *listener;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ char buf[512]; // to hold acceptex results
+ nni_mtx mtx;
+ nni_cv cv;
+};
+
+struct nni_tcp_dialer {
+ LPFN_CONNECTEX connectex; // looked up name via ioctl
+ nni_list aios; // in flight connections
+ bool closed;
+ nni_mtx mtx;
+ nni_reap_item reap;
+};
+
+struct nni_tcp_listener {
+ SOCKET s;
+ nni_list aios;
+ bool closed;
+ bool started;
+ LPFN_ACCEPTEX acceptex;
+ LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs;
+ SOCKADDR_STORAGE ss;
+ nni_mtx mtx;
+ nni_reap_item reap;
+};
+
+extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET);
+extern void nni_win_tcp_conn_set_addrs(
+ nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *);
+
+#endif // NNG_PLATFORM_WINDOWS
+
+#endif // NNG_PLATFORM_WIN_WINTCP_H
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
new file mode 100644
index 00000000..c3a4a5d8
--- /dev/null
+++ b/src/platform/windows/win_tcpconn.c
@@ -0,0 +1,391 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include "win_tcp.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include <malloc.h>
+#include <stdio.h>
+
+static void
+tcp_recv_start(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int rv;
+ DWORD niov;
+ DWORD flags;
+ unsigned i;
+ unsigned naiov;
+ nni_iov *aiov;
+ WSABUF * iov;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_list_remove(&c->recv_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+ iov = _malloca(naiov * sizeof(*iov));
+
+ // Put the AIOs in Windows form.
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iov[niov].buf = aiov[i].iov_buf;
+ iov[niov].len = (ULONG) aiov[i].iov_len;
+ niov++;
+ }
+ }
+
+ flags = 0;
+ rv = WSARecv(c->s, iov, niov, NULL, &flags, &c->recv_io.olpd, NULL);
+ _freea(iov);
+
+ if ((rv == SOCKET_ERROR) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+tcp_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_tcp_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->recv_rv != 0) {
+ rv = c->recv_rv;
+ c->recv_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ tcp_recv_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+tcp_recv_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ c->recv_rv = rv;
+ nni_win_io_cancel(&c->recv_io);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->recv_aios, aio);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ tcp_recv_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_send_start(nni_tcp_conn *c)
+{
+ nni_aio *aio;
+ int rv;
+ DWORD niov;
+ unsigned i;
+ unsigned naiov;
+ nni_iov *aiov;
+ WSABUF * iov;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_list_remove(&c->send_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+
+again:
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+ iov = _malloca(naiov * sizeof(*iov));
+
+ // Put the AIOs in Windows form.
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iov[niov].buf = aiov[i].iov_buf;
+ iov[niov].len = (ULONG) aiov[i].iov_len;
+ niov++;
+ }
+ }
+
+ rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL);
+ _freea(iov);
+
+ if ((rv == SOCKET_ERROR) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+tcp_send_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->send_aios)) {
+ c->send_rv = rv;
+ nni_win_io_cancel(&c->send_io);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+tcp_send_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_tcp_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->send_rv != 0) {
+ rv = c->send_rv;
+ c->send_rv = 0;
+ }
+ nni_aio_list_remove(aio); // should always be at head
+ tcp_send_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+void
+nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->send_aios, aio);
+ if (aio == nni_list_first(&c->send_aios)) {
+ tcp_send_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
+{
+ nni_tcp_conn *c;
+ int rv;
+ BOOL yes;
+ DWORD no;
+
+ // Don't inherit the handle (CLOEXEC really).
+ SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->s = INVALID_SOCKET;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+ c->conn_aio = NULL;
+
+ if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
+ 0) ||
+ ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
+ 0) ||
+ ((rv = nni_win_io_register((HANDLE) s)) != 0)) {
+ nni_tcp_conn_fini(c);
+ return (rv);
+ }
+
+ no = 0;
+ (void) setsockopt(
+ s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
+ yes = 1;
+ (void) setsockopt(
+ s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes));
+
+ c->s = s;
+ *connp = c;
+ return (0);
+}
+
+void
+nni_win_tcp_conn_set_addrs(
+ nni_tcp_conn *c, const SOCKADDR_STORAGE *loc, const SOCKADDR_STORAGE *rem)
+{
+ memcpy(&c->sockname, loc, sizeof(*loc));
+ memcpy(&c->peername, rem, sizeof(*rem));
+}
+
+void
+nni_tcp_conn_close(nni_tcp_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ c->closed = true;
+ if (!nni_list_empty(&c->recv_aios)) {
+ nni_win_io_cancel(&c->recv_io);
+ }
+ if (!nni_list_empty(&c->send_aios)) {
+ nni_win_io_cancel(&c->send_io);
+ }
+ if (c->s != INVALID_SOCKET) {
+ shutdown(c->s, SD_BOTH);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ if (nni_win_sockaddr2nn(sa, &c->peername) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ return (0);
+}
+
+int
+nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
+{
+ if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ return (0);
+}
+
+int
+nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val)
+{
+ BOOL b;
+ b = val ? TRUE : FALSE;
+ if (setsockopt(
+ c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) {
+ return (nni_win_error(WSAGetLastError()));
+ }
+ return (0);
+}
+
+int
+nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val)
+{
+ BOOL b;
+ b = val ? TRUE : FALSE;
+ if (setsockopt(
+ c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) {
+ return (nni_win_error(WSAGetLastError()));
+ }
+ return (0);
+}
+
+void
+nni_tcp_conn_fini(nni_tcp_conn *c)
+{
+ nni_tcp_conn_close(c);
+
+ nni_mtx_lock(&c->mtx);
+ while ((!nni_list_empty(&c->recv_aios)) ||
+ (!nni_list_empty(&c->send_aios))) {
+ nni_cv_wait(&c->cv);
+ nni_mtx_unlock(&c->mtx);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_win_io_fini(&c->recv_io);
+ nni_win_io_fini(&c->send_io);
+ nni_win_io_fini(&c->conn_io);
+
+ if (c->s != INVALID_SOCKET) {
+ closesocket(c->s);
+ }
+ nni_cv_fini(&c->cv);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
new file mode 100644
index 00000000..4a3e9f2f
--- /dev/null
+++ b/src/platform/windows/win_tcpdial.c
@@ -0,0 +1,228 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_tcp.h"
+
+#include <malloc.h>
+#include <stdio.h>
+
+int
+nni_tcp_dialer_init(nni_tcp_dialer **dp)
+{
+ nni_tcp_dialer *d;
+ int rv;
+ SOCKET s;
+ DWORD nbytes;
+ GUID guid = WSAID_CONNECTEX;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ZeroMemory(d, sizeof(*d));
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->aios);
+
+ // Create a scratch socket for use with ioctl.
+ s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
+ if (s == INVALID_SOCKET) {
+ rv = nni_win_error(GetLastError());
+ nni_tcp_dialer_fini(d);
+ return (rv);
+ }
+
+ // Look up the function pointer.
+ if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
+ sizeof(guid), &d->connectex, sizeof(d->connectex), &nbytes,
+ NULL, NULL) == SOCKET_ERROR) {
+ rv = nni_win_error(GetLastError());
+ closesocket(s);
+ nni_tcp_dialer_fini(d);
+ return (rv);
+ }
+
+ closesocket(s);
+
+ *dp = d;
+ return (0);
+}
+
+void
+nni_tcp_dialer_close(nni_tcp_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+
+ NNI_LIST_FOREACH (&d->aios, aio) {
+ nni_tcp_conn *c;
+
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->conn_rv = NNG_ECLOSED;
+ nni_win_io_cancel(&c->conn_io);
+ }
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_close(d);
+ nni_mtx_lock(&d->mtx);
+ if (!nni_list_empty(&d->aios)) {
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&d->reap, (nni_cb) nni_tcp_dialer_fini, d);
+ return;
+ }
+ nni_mtx_unlock(&d->mtx);
+
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+tcp_dial_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_dialer *d = nni_aio_get_prov_data(aio);
+ nni_tcp_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ if (c->conn_rv == 0) {
+ c->conn_rv = rv;
+ }
+ nni_win_io_cancel(&c->conn_io);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+static void
+tcp_dial_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_tcp_conn * c = io->ptr;
+ nni_tcp_dialer *d = c->dialer;
+ nni_aio * aio = c->conn_aio;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&d->mtx);
+ if ((aio = c->conn_aio) == NULL) {
+ // This should never occur.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ c->conn_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_aio_list_remove(aio);
+ if (c->conn_rv != 0) {
+ rv = c->conn_rv;
+ }
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ } else {
+ DWORD yes = 1;
+ (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
+ (char *) &yes, sizeof(yes));
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+void
+nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ SOCKET s;
+ SOCKADDR_STORAGE ss;
+ int len;
+ nni_tcp_conn * c;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((s = socket(ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) {
+ nni_aio_finish_error(aio, nni_win_error(GetLastError()));
+ return;
+ }
+
+ if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) {
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ c->peername = ss;
+
+ // Windows ConnectEx requires the socket to be bound
+ // first. We just bind to an ephemeral address in the
+ // same family.
+ ZeroMemory(&c->sockname, sizeof(c->sockname));
+ c->sockname.ss_family = ss.ss_family;
+ if (bind(s, (SOCKADDR *) &c->sockname, len) < 0) {
+ rv = nni_win_error(GetLastError());
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
+ 0) {
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ c->dialer = d;
+ nni_aio_set_prov_extra(aio, 0, c);
+ if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->conn_aio = aio;
+ nni_aio_list_append(&d->aios, aio);
+
+ // dialing is concurrent.
+ if (!d->connectex(s, (struct sockaddr *) &c->peername, len, NULL, 0,
+ NULL, &c->conn_io.olpd)) {
+ if ((rv = GetLastError()) != ERROR_IO_PENDING) {
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
new file mode 100644
index 00000000..5055c32d
--- /dev/null
+++ b/src/platform/windows/win_tcplisten.c
@@ -0,0 +1,312 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include <malloc.h>
+#include <stdbool.h>
+#include <stdio.h>
+
+#include "win_tcp.h"
+
+// tcp_listener_funcs looks up function pointers we need for advanced accept
+// functionality on Windows. Windows is weird.
+static int
+tcp_listener_funcs(nni_tcp_listener *l)
+{
+ static SRWLOCK lock = SRWLOCK_INIT;
+ static LPFN_ACCEPTEX acceptex;
+ static LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs;
+
+ AcquireSRWLockExclusive(&lock);
+ if (acceptex == NULL) {
+ int rv;
+ DWORD nbytes;
+ GUID guid1 = WSAID_ACCEPTEX;
+ GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS;
+ SOCKET s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
+
+ if (s == INVALID_SOCKET) {
+ rv = nni_win_error(GetLastError());
+ ReleaseSRWLockExclusive(&lock);
+ return (rv);
+ }
+
+ // Look up the function pointer.
+ if ((WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1,
+ sizeof(guid1), &acceptex, sizeof(acceptex), &nbytes,
+ NULL, NULL) == SOCKET_ERROR) ||
+ (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2,
+ sizeof(guid2), &getacceptexsockaddrs,
+ sizeof(getacceptexsockaddrs), &nbytes, NULL,
+ NULL) == SOCKET_ERROR)) {
+ rv = nni_win_error(GetLastError());
+ acceptex = NULL;
+ getacceptexsockaddrs = NULL;
+ ReleaseSRWLockExclusive(&lock);
+ closesocket(s);
+ return (rv);
+ }
+ closesocket(s);
+ }
+ ReleaseSRWLockExclusive(&lock);
+
+ l->acceptex = acceptex;
+ l->getacceptexsockaddrs = getacceptexsockaddrs;
+ return (0);
+}
+
+static void
+tcp_accept_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_tcp_conn * c = io->ptr;
+ nni_tcp_listener *l = c->listener;
+ nni_aio * aio;
+ int len1;
+ int len2;
+ SOCKADDR * sa1;
+ SOCKADDR * sa2;
+ DWORD yes;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&l->mtx);
+ if ((aio = c->conn_aio) == NULL) {
+ // This case should not occur. The situation would indicate
+ // a case where the connection was accepted already.
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ c->conn_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_aio_list_remove(aio);
+ if (c->conn_rv != 0) {
+ rv = c->conn_rv;
+ }
+ nni_mtx_unlock(&l->mtx);
+
+ if (rv != 0) {
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ len1 = (int) sizeof(c->sockname);
+ len2 = (int) sizeof(c->peername);
+ l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2);
+ memcpy(&c->sockname, sa1, len1);
+ memcpy(&c->peername, sa2, len2);
+
+ yes = 1;
+ (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
+ (char *) &yes, sizeof(yes));
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+int
+nni_tcp_listener_init(nni_tcp_listener **lp)
+{
+ nni_tcp_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ZeroMemory(l, sizeof(*l));
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->aios);
+ if ((rv = tcp_listener_funcs(l)) != 0) {
+ nni_tcp_listener_fini(l);
+ return (rv);
+ }
+
+ *lp = l;
+ return (0);
+}
+
+void
+nni_tcp_listener_close(nni_tcp_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ if (!l->closed) {
+ nni_aio *aio;
+ l->closed = true;
+
+ NNI_LIST_FOREACH (&l->aios, aio) {
+ nni_tcp_conn *c;
+
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->conn_rv = NNG_ECLOSED;
+ nni_win_io_cancel(&c->conn_io);
+ }
+ }
+ closesocket(l->s);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_tcp_listener_close(l);
+ nni_mtx_lock(&l->mtx);
+ if (!nni_list_empty(&l->aios)) {
+ nni_mtx_unlock(&l->mtx);
+ nni_reap(&l->reap, (nni_cb) nni_tcp_listener_fini, l);
+ return;
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+int
+nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
+{
+ int rv;
+ BOOL yes;
+ DWORD no;
+ int len;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ if ((len = nni_win_nn2sockaddr(&l->ss, sa)) <= 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EADDRINVAL);
+ }
+ l->s = socket(l->ss.ss_family, SOCK_STREAM, 0);
+ if (l->s == INVALID_SOCKET) {
+ rv = nni_win_error(GetLastError());
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ // Don't inherit the handle (CLOEXEC really).
+ SetHandleInformation((HANDLE) l->s, HANDLE_FLAG_INHERIT, 0);
+
+ no = 0;
+ (void) setsockopt(
+ l->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
+ yes = 1;
+ (void) setsockopt(
+ l->s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes));
+
+ if ((rv = nni_win_io_register((HANDLE) l->s)) != 0) {
+ closesocket(l->s);
+ l->s = INVALID_SOCKET;
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ // Make sure that we use the address exclusively. Windows lets
+ // others hijack us by default.
+ yes = 1;
+ if ((setsockopt(l->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes,
+ sizeof(yes)) != 0) ||
+ (bind(l->s, (SOCKADDR *) &l->ss, len) != 0) ||
+ (getsockname(l->s, (SOCKADDR *) &l->ss, &len) != 0) ||
+ (listen(l->s, SOMAXCONN) != 0)) {
+ rv = nni_win_error(GetLastError());
+ closesocket(l->s);
+ l->s = INVALID_SOCKET;
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+ nni_win_sockaddr2nn(sa, &l->ss);
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static void
+tcp_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_tcp_listener *l = nni_aio_get_prov_data(aio);
+ nni_tcp_conn * c;
+
+ nni_mtx_lock(&l->mtx);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ if (c->conn_rv == 0) {
+ c->conn_rv = rv;
+ }
+ nni_win_io_cancel(&c->conn_io);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
+{
+ SOCKET s;
+ int rv;
+ DWORD cnt;
+ nni_tcp_conn *c;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ // Windows requires us to explicity create the socket before
+ // calling accept on it.
+ if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) {
+ rv = nni_win_error(GetLastError());
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ closesocket(s);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->listener = l;
+ c->conn_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ if (((rv = nni_win_io_init(
+ &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) ||
+ ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&l->mtx);
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&l->aios, aio);
+ if ((!l->acceptex(
+ l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Fast failure (synchronous.)
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&l->mtx);
+ nni_tcp_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 243811a0..52327cc4 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -181,7 +181,8 @@ nni_plat_init(int (*helper)(void))
AcquireSRWLockExclusive(&lock);
if (!plat_inited) {
- if (((rv = nni_win_iocp_sysinit()) != 0) ||
+ if (((rv = nni_win_io_sysinit()) != 0) ||
+ ((rv = nni_win_iocp_sysinit()) != 0) ||
((rv = nni_win_ipc_sysinit()) != 0) ||
((rv = nni_win_tcp_sysinit()) != 0) ||
((rv = nni_win_udp_sysinit()) != 0) ||
@@ -207,6 +208,7 @@ nni_plat_fini(void)
nni_win_udp_sysfini();
nni_win_tcp_sysfini();
nni_win_iocp_sysfini();
+ nni_win_io_sysfini();
WSACleanup();
plat_inited = 0;
}
diff --git a/src/supplemental/http/http_api.h b/src/supplemental/http/http_api.h
index 4b515ca5..cf2c78bf 100644
--- a/src/supplemental/http/http_api.h
+++ b/src/supplemental/http/http_api.h
@@ -67,9 +67,9 @@ extern void *nni_http_conn_get_ctx(nni_http_conn *);
// These initialization functions create stream for HTTP transactions.
// They should only be used by the server or client HTTP implementations,
// and are not for use by other code.
-extern int nni_http_conn_init_tcp(nni_http_conn **, void *);
+extern int nni_http_conn_init_tcp(nni_http_conn **, nni_tcp_conn *);
extern int nni_http_conn_init_tls(
- nni_http_conn **, struct nng_tls_config *, void *);
+ nni_http_conn **, struct nng_tls_config *, nni_tcp_conn *);
extern void nni_http_conn_close(nni_http_conn *);
extern void nni_http_conn_fini(nni_http_conn *);
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index 4034bbff..6842b271 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -20,72 +20,97 @@
#include "http_api.h"
struct nng_http_client {
- nni_list aios;
- nni_mtx mtx;
- bool closed;
- struct nng_tls_config *tls;
- nni_aio * connaio;
- nni_plat_tcp_ep * tep;
+ nni_list aios;
+ nni_mtx mtx;
+ bool closed;
+ bool resolving;
+ nng_tls_config *tls;
+ nni_aio * aio;
+ nng_sockaddr sa;
+ nni_tcp_dialer *dialer;
+ char * host;
+ char * port;
+ nni_url * url;
};
static void
-http_conn_start(nni_http_client *c)
+http_dial_start(nni_http_client *c)
{
- nni_plat_tcp_ep_connect(c->tep, c->connaio);
+ nni_aio *aio;
+
+ if ((aio = nni_list_first(&c->aios)) == NULL) {
+ return;
+ }
+ c->resolving = true;
+ nni_aio_set_input(c->aio, 0, &c->sa);
+ nni_tcp_resolv(c->host, c->port, NNG_AF_UNSPEC, 0, c->aio);
}
static void
-http_conn_done(void *arg)
+http_dial_cb(void *arg)
{
- nni_http_client * c = arg;
- nni_aio * aio;
- int rv;
- nni_plat_tcp_pipe *p;
- nni_http_conn * conn;
+ nni_http_client *c = arg;
+ nni_aio * aio;
+ int rv;
+ nni_tcp_conn * tcp;
+ nni_http_conn * conn;
nni_mtx_lock(&c->mtx);
- rv = nni_aio_result(c->connaio);
- p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL;
+ rv = nni_aio_result(c->aio);
+
if ((aio = nni_list_first(&c->aios)) == NULL) {
- if (p != NULL) {
- nni_plat_tcp_pipe_fini(p);
- }
+ // User abandoned request, and no residuals left.
nni_mtx_unlock(&c->mtx);
+ if ((rv == 0) && !c->resolving) {
+ tcp = nni_aio_get_output(c->aio, 0);
+ nni_tcp_conn_fini(tcp);
+ }
return;
}
- nni_aio_list_remove(aio);
if (rv != 0) {
+ nni_aio_list_remove(aio);
+ http_dial_start(c);
+ nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (c->resolving) {
+ // This was a DNS lookup -- advance to normal TCP connect.
+ c->resolving = false;
+ nni_tcp_dialer_dial(c->dialer, &c->sa, c->aio);
nni_mtx_unlock(&c->mtx);
return;
}
+ nni_aio_list_remove(aio);
+ tcp = nni_aio_get_output(c->aio, 0);
+ NNI_ASSERT(tcp != NULL);
+
if (c->tls != NULL) {
- rv = nni_http_conn_init_tls(&conn, c->tls, p);
+ rv = nni_http_conn_init_tls(&conn, c->tls, tcp);
} else {
- rv = nni_http_conn_init_tcp(&conn, p);
+ rv = nni_http_conn_init_tcp(&conn, tcp);
}
+ http_dial_start(c);
+ nni_mtx_unlock(&c->mtx);
+
if (rv != 0) {
+ // the conn_init function will have already discard tcp.
nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&c->mtx);
return;
}
nni_aio_set_output(aio, 0, conn);
nni_aio_finish(aio, 0, 0);
-
- if (!nni_list_empty(&c->aios)) {
- http_conn_start(c);
- }
- nni_mtx_unlock(&c->mtx);
}
void
nni_http_client_fini(nni_http_client *c)
{
- nni_aio_fini(c->connaio);
- nni_plat_tcp_ep_fini(c->tep);
+ nni_aio_fini(c->aio);
+ nni_tcp_dialer_fini(c->dialer);
nni_mtx_fini(&c->mtx);
#ifdef NNG_SUPP_TLS
if (c->tls != NULL) {
@@ -100,10 +125,6 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
{
int rv;
nni_http_client *c;
- nni_aio * aio;
- nni_sockaddr sa;
- char * host;
- char * port;
if (strlen(url->u_hostname) == 0) {
// We require a valid hostname.
@@ -118,29 +139,17 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
return (NNG_EADDRINVAL);
}
- // For now we are looking up the address. We would really like
- // to do this later, but we need TcP support for this. One
- // imagines the ability to create a tcp dialer that does the
- // necessary DNS lookups, etc. all asynchronously.
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
- nni_aio_set_input(aio, 0, &sa);
- host = (strlen(url->u_hostname) != 0) ? url->u_hostname : NULL;
- port = (strlen(url->u_port) != 0) ? url->u_port : NULL;
- nni_plat_tcp_resolv(host, port, NNG_AF_UNSPEC, false, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- if (rv != 0) {
- return (rv);
- }
-
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->aios);
+ if (((c->host = nni_strdup(url->u_hostname)) == NULL) ||
+ ((strlen(url->u_port) != 0) &&
+ ((c->port = nni_strdup(url->u_port)) == NULL))) {
+ nni_http_client_fini(c);
+ return (NNG_ENOMEM);
+ }
#ifdef NNG_SUPP_TLS
if ((strcmp(url->u_scheme, "https") == 0) ||
@@ -167,16 +176,12 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
}
#endif
- rv = nni_plat_tcp_ep_init(&c->tep, NULL, &sa, NNI_EP_MODE_DIAL);
- if (rv != 0) {
+ if (((rv = nni_tcp_dialer_init(&c->dialer)) != 0) ||
+ ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0)) {
nni_http_client_fini(c);
return (rv);
}
- if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) {
- nni_http_client_fini(c);
- return (rv);
- }
*cp = c;
return (0);
}
@@ -224,7 +229,7 @@ nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp)
}
static void
-http_connect_cancel(nni_aio *aio, int rv)
+http_dial_cancel(nni_aio *aio, int rv)
{
nni_http_client *c = nni_aio_get_prov_data(aio);
nni_mtx_lock(&c->mtx);
@@ -233,7 +238,7 @@ http_connect_cancel(nni_aio *aio, int rv)
nni_aio_finish_error(aio, rv);
}
if (nni_list_empty(&c->aios)) {
- nni_aio_abort(c->connaio, rv);
+ nni_aio_abort(c->aio, rv);
}
nni_mtx_unlock(&c->mtx);
}
@@ -246,14 +251,14 @@ nni_http_client_connect(nni_http_client *c, nni_aio *aio)
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, http_connect_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, http_dial_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&c->aios, aio);
if (nni_list_first(&c->aios) == aio) {
- http_conn_start(c);
+ http_dial_start(c);
}
nni_mtx_unlock(&c->mtx);
}
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 15d1f776..169918e9 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -699,17 +699,23 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data)
if ((conn = NNI_ALLOC_STRUCT(conn)) == NULL) {
return (NNG_ENOMEM);
}
- conn->rd_bufsz = HTTP_BUFSIZE;
- if ((conn->rd_buf = nni_alloc(conn->rd_bufsz)) == NULL) {
- NNI_FREE_STRUCT(conn);
- return (NNG_ENOMEM);
- }
nni_mtx_init(&conn->mtx);
nni_aio_list_init(&conn->rdq);
nni_aio_list_init(&conn->wrq);
+ if ((conn->rd_buf = nni_alloc(HTTP_BUFSIZE)) == NULL) {
+ nni_http_conn_fini(conn);
+ return (NNG_ENOMEM);
+ }
+ conn->rd_bufsz = HTTP_BUFSIZE;
+
+ if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) ||
+ ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) {
+ nni_http_conn_fini(conn);
+ return (rv);
+ }
+
conn->sock = data;
- conn->rd_bufsz = HTTP_BUFSIZE;
conn->rd = tran->h_read;
conn->wr = tran->h_write;
conn->close = tran->h_close;
@@ -718,12 +724,6 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data)
conn->peer_addr = tran->h_peer_addr;
conn->verified = tran->h_verified;
- if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) ||
- ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) {
- nni_http_conn_fini(conn);
- return (rv);
- }
-
*connp = conn;
return (0);
@@ -737,19 +737,23 @@ nni_http_verified_tcp(void *arg)
}
static nni_http_tran http_tcp_ops = {
- .h_read = (http_read_fn) nni_plat_tcp_pipe_recv,
- .h_write = (http_write_fn) nni_plat_tcp_pipe_send,
- .h_close = (http_close_fn) nni_plat_tcp_pipe_close,
- .h_fini = (http_fini_fn) nni_plat_tcp_pipe_fini,
- .h_sock_addr = (http_addr_fn) nni_plat_tcp_pipe_sockname,
- .h_peer_addr = (http_addr_fn) nni_plat_tcp_pipe_peername,
+ .h_read = (http_read_fn) nni_tcp_conn_recv,
+ .h_write = (http_write_fn) nni_tcp_conn_send,
+ .h_close = (http_close_fn) nni_tcp_conn_close,
+ .h_fini = (http_fini_fn) nni_tcp_conn_fini,
+ .h_sock_addr = (http_addr_fn) nni_tcp_conn_sockname,
+ .h_peer_addr = (http_addr_fn) nni_tcp_conn_peername,
.h_verified = (http_verified_fn) nni_http_verified_tcp,
};
int
-nni_http_conn_init_tcp(nni_http_conn **connp, void *tcp)
+nni_http_conn_init_tcp(nni_http_conn **connp, nni_tcp_conn *tcp)
{
- return (http_init(connp, &http_tcp_ops, tcp));
+ int rv;
+ if ((rv = http_init(connp, &http_tcp_ops, tcp)) != 0) {
+ nni_tcp_conn_fini(tcp);
+ }
+ return (rv);
}
#ifdef NNG_SUPP_TLS
@@ -765,26 +769,29 @@ static nni_http_tran http_tls_ops = {
int
nni_http_conn_init_tls(
- nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp)
+ nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp)
{
nni_tls *tls;
int rv;
if ((rv = nni_tls_init(&tls, cfg, tcp)) != 0) {
- nni_plat_tcp_pipe_fini(tcp);
+ nni_tcp_conn_fini(tcp);
return (rv);
}
- return (http_init(connp, &http_tls_ops, tls));
+ if ((rv = http_init(connp, &http_tls_ops, tls)) != 0) {
+ nni_tls_fini(tls);
+ }
+ return (rv);
}
#else
int
nni_http_conn_init_tls(
- nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp)
+ nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp)
{
NNI_ARG_UNUSED(connp);
NNI_ARG_UNUSED(cfg);
- nni_plat_tcp_pipe_fini(tcp);
+ nni_tcp_conn_fini(tcp);
return (NNG_ENOTSUP);
}
#endif // NNG_SUPP_TLS
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 4a07d544..9d1313b1 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -64,22 +64,22 @@ typedef struct http_error {
} http_error;
struct nng_http_server {
- nng_sockaddr addr;
- nni_list_node node;
- int refcnt;
- int starts;
- nni_list handlers;
- nni_list conns;
- nni_mtx mtx;
- nni_cv cv;
- bool closed;
- nng_tls_config * tls;
- nni_aio * accaio;
- nni_plat_tcp_ep *tep;
- char * port;
- char * hostname;
- nni_list errors;
- nni_mtx errors_mtx;
+ nng_sockaddr addr;
+ nni_list_node node;
+ int refcnt;
+ int starts;
+ nni_list handlers;
+ nni_list conns;
+ nni_mtx mtx;
+ nni_cv cv;
+ bool closed;
+ nng_tls_config * tls;
+ nni_aio * accaio;
+ nni_tcp_listener *listener;
+ char * port;
+ char * hostname;
+ nni_list errors;
+ nni_mtx errors_mtx;
};
int
@@ -682,13 +682,13 @@ http_sconn_cbdone(void *arg)
}
static int
-http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp)
+http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp)
{
http_sconn *sc;
int rv;
if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) {
- nni_plat_tcp_pipe_fini(tcp);
+ nni_tcp_conn_fini(tcp);
return (NNG_ENOMEM);
}
@@ -720,17 +720,17 @@ http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp)
static void
http_server_acccb(void *arg)
{
- nni_http_server * s = arg;
- nni_aio * aio = s->accaio;
- nni_plat_tcp_pipe *tcp;
- http_sconn * sc;
- int rv;
+ nni_http_server *s = arg;
+ nni_aio * aio = s->accaio;
+ nni_tcp_conn * tcp;
+ http_sconn * sc;
+ int rv;
nni_mtx_lock(&s->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
if (!s->closed) {
// try again?
- nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ nni_tcp_listener_accept(s->listener, s->accaio);
}
nni_mtx_unlock(&s->mtx);
return;
@@ -738,14 +738,14 @@ http_server_acccb(void *arg)
tcp = nni_aio_get_output(aio, 0);
if (s->closed) {
// If we're closing, then reject this one.
- nni_plat_tcp_pipe_fini(tcp);
+ nni_tcp_conn_fini(tcp);
nni_mtx_unlock(&s->mtx);
return;
}
if (http_sconn_init(&sc, s, tcp) != 0) {
// The TCP structure is already cleaned up.
// Start another accept attempt.
- nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ nni_tcp_listener_accept(s->listener, s->accaio);
nni_mtx_unlock(&s->mtx);
return;
}
@@ -753,7 +753,7 @@ http_server_acccb(void *arg)
nni_list_append(&s->conns, sc);
nni_http_read_req(sc->conn, sc->req, sc->rxaio);
- nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ nni_tcp_listener_accept(s->listener, s->accaio);
nni_mtx_unlock(&s->mtx);
}
@@ -769,8 +769,8 @@ http_server_fini(nni_http_server *s)
while (!nni_list_empty(&s->conns)) {
nni_cv_wait(&s->cv);
}
- if (s->tep != NULL) {
- nni_plat_tcp_ep_fini(s->tep);
+ if (s->listener != NULL) {
+ nni_tcp_listener_fini(s->listener);
}
while ((h = nni_list_first(&s->handlers)) != NULL) {
nni_list_remove(&s->handlers, h);
@@ -875,7 +875,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url)
return (rv);
}
nni_aio_set_input(aio, 0, &s->addr);
- nni_plat_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio);
+ nni_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio);
nni_aio_wait(aio);
rv = nni_aio_result(aio);
nni_aio_fini(aio);
@@ -921,16 +921,15 @@ static int
http_server_start(nni_http_server *s)
{
int rv;
- rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN);
- if (rv != 0) {
+ if ((rv = nni_tcp_listener_init(&s->listener)) != 0) {
return (rv);
}
- if ((rv = nni_plat_tcp_ep_listen(s->tep, NULL)) != 0) {
- nni_plat_tcp_ep_fini(s->tep);
- s->tep = NULL;
+ if ((rv = nni_tcp_listener_listen(s->listener, &s->addr)) != 0) {
+ nni_tcp_listener_fini(s->listener);
+ s->listener = NULL;
return (rv);
}
- nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ nni_tcp_listener_accept(s->listener, s->accaio);
return (0);
}
@@ -961,8 +960,8 @@ http_server_stop(nni_http_server *s)
s->closed = true;
// Close the TCP endpoint that is listening.
- if (s->tep) {
- nni_plat_tcp_ep_close(s->tep);
+ if (s->listener) {
+ nni_tcp_listener_close(s->listener);
}
// Stopping the server is a hard stop -- it aborts any work
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index cdd226cd..0f4f67cc 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -59,7 +59,7 @@ typedef struct nni_tls_certkey {
} nni_tls_certkey;
struct nni_tls {
- nni_plat_tcp_pipe * tcp;
+ nni_tcp_conn * tcp;
mbedtls_ssl_context ctx;
nng_tls_config * cfg; // kept so we can release it
nni_mtx lk;
@@ -254,14 +254,14 @@ nni_tls_fini(nni_tls *tp)
{
// Shut it all down first.
if (tp->tcp) {
- nni_plat_tcp_pipe_close(tp->tcp);
+ nni_tcp_conn_close(tp->tcp);
}
nni_aio_stop(tp->tcp_send);
nni_aio_stop(tp->tcp_recv);
// And finalize / free everything.
if (tp->tcp) {
- nni_plat_tcp_pipe_fini(tp->tcp);
+ nni_tcp_conn_fini(tp->tcp);
}
nni_aio_fini(tp->tcp_send);
nni_aio_fini(tp->tcp_recv);
@@ -306,7 +306,7 @@ nni_tls_mkerr(int err)
}
int
-nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp)
+nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp)
{
nni_tls *tp;
int rv;
@@ -314,7 +314,7 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp)
// During the handshake, disable Nagle to shorten the
// negotiation. Once things are set up the caller can
// re-enable Nagle if so desired.
- (void) nni_plat_tcp_pipe_set_nodelay(tcp, true);
+ (void) nni_tcp_conn_set_nodelay(tcp, true);
if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) {
return (NNG_ENOMEM);
@@ -387,7 +387,7 @@ nni_tls_fail(nni_tls *tp, int rv)
{
nni_aio *aio;
tp->tls_closed = true;
- nni_plat_tcp_pipe_close(tp->tcp);
+ nni_tcp_conn_close(tp->tcp);
tp->tcp_closed = true;
while ((aio = nni_list_first(&tp->recvs)) != NULL) {
nni_list_remove(&tp->recvs, aio);
@@ -408,7 +408,7 @@ nni_tls_send_cb(void *ctx)
nni_mtx_lock(&tp->lk);
if (nni_aio_result(aio) != 0) {
- nni_plat_tcp_pipe_close(tp->tcp);
+ nni_tcp_conn_close(tp->tcp);
tp->tcp_closed = true;
} else {
size_t n = nni_aio_count(aio);
@@ -421,7 +421,7 @@ nni_tls_send_cb(void *ctx)
iov.iov_len = tp->sendlen;
nni_aio_set_iov(aio, 1, &iov);
nni_aio_set_timeout(aio, NNG_DURATION_INFINITE);
- nni_plat_tcp_pipe_send(tp->tcp, aio);
+ nni_tcp_conn_send(tp->tcp, aio);
nni_mtx_unlock(&tp->lk);
return;
}
@@ -460,7 +460,7 @@ nni_tls_recv_start(nni_tls *tp)
iov.iov_len = NNG_TLS_MAX_RECV_SIZE;
nni_aio_set_iov(aio, 1, &iov);
nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE);
- nni_plat_tcp_pipe_recv(tp->tcp, aio);
+ nni_tcp_conn_recv(tp->tcp, aio);
}
static void
@@ -474,7 +474,7 @@ nni_tls_recv_cb(void *ctx)
if (nni_aio_result(aio) != 0) {
// Close the underlying TCP channel, but permit data we
// already received to continue to be received.
- nni_plat_tcp_pipe_close(tp->tcp);
+ nni_tcp_conn_close(tp->tcp);
tp->tcp_closed = true;
} else {
NNI_ASSERT(tp->recvlen == 0);
@@ -531,7 +531,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len)
iov.iov_len = len;
nni_aio_set_iov(tp->tcp_send, 1, &iov);
nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE);
- nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send);
+ nni_tcp_conn_send(tp->tcp, tp->tcp_send);
return (len);
}
@@ -615,25 +615,25 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio)
int
nni_tls_peername(nni_tls *tp, nni_sockaddr *sa)
{
- return (nni_plat_tcp_pipe_peername(tp->tcp, sa));
+ return (nni_tcp_conn_peername(tp->tcp, sa));
}
int
nni_tls_sockname(nni_tls *tp, nni_sockaddr *sa)
{
- return (nni_plat_tcp_pipe_sockname(tp->tcp, sa));
+ return (nni_tcp_conn_sockname(tp->tcp, sa));
}
int
nni_tls_set_nodelay(nni_tls *tp, bool val)
{
- return (nni_plat_tcp_pipe_set_nodelay(tp->tcp, val));
+ return (nni_tcp_conn_set_nodelay(tp->tcp, val));
}
int
nni_tls_set_keepalive(nni_tls *tp, bool val)
{
- return (nni_plat_tcp_pipe_set_keepalive(tp->tcp, val));
+ return (nni_tcp_conn_set_keepalive(tp->tcp, val));
}
static void
@@ -785,7 +785,7 @@ nni_tls_close(nni_tls *tp)
// connection at this point.
(void) mbedtls_ssl_close_notify(&tp->ctx);
} else {
- nni_plat_tcp_pipe_close(tp->tcp);
+ nni_tcp_conn_close(tp->tcp);
}
nni_mtx_unlock(&tp->lk);
}
diff --git a/src/supplemental/tls/none/tls.c b/src/supplemental/tls/none/tls.c
index 2fdc0c93..d7968758 100644
--- a/src/supplemental/tls/none/tls.c
+++ b/src/supplemental/tls/none/tls.c
@@ -47,7 +47,7 @@ nni_tls_fini(nni_tls *tp)
}
int
-nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp)
+nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp)
{
NNI_ARG_UNUSED(tpp);
NNI_ARG_UNUSED(cfg);
@@ -163,7 +163,8 @@ nng_tls_config_cert_key_file(
return (NNG_ENOTSUP);
}
-int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size)
+int
+nng_tls_config_key(nng_tls_config *cfg, const uint8_t *key, size_t size)
{
NNI_ARG_UNUSED(cfg);
NNI_ARG_UNUSED(key);
@@ -171,14 +172,14 @@ int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size)
return (NNG_ENOTSUP);
}
-int nng_tls_config_pass(nng_tls_config *cfg, const char *pass)
+int
+nng_tls_config_pass(nng_tls_config *cfg, const char *pass)
{
NNI_ARG_UNUSED(cfg);
NNI_ARG_UNUSED(pass);
return (NNG_ENOTSUP);
}
-
int
nng_tls_config_alloc(nng_tls_config **cfgp, nng_tls_mode mode)
{
diff --git a/src/supplemental/tls/tls_api.h b/src/supplemental/tls/tls_api.h
index 8a40bcfb..53dba7fe 100644
--- a/src/supplemental/tls/tls_api.h
+++ b/src/supplemental/tls/tls_api.h
@@ -31,7 +31,7 @@ extern void nni_tls_config_fini(nng_tls_config *);
// the configuration object is created with a hold on it.
extern void nni_tls_config_hold(nng_tls_config *);
-extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_plat_tcp_pipe *);
+extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_tcp_conn *);
extern void nni_tls_close(nni_tls *);
extern void nni_tls_fini(nni_tls *);
extern void nni_tls_send(nni_tls *, nng_aio *);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index e8aa04d0..c323da29 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -17,17 +17,18 @@
// TCP transport. Platform specific TCP operations must be
// supplied as well.
-typedef struct tcp_pipe tcp_pipe;
-typedef struct tcp_ep tcp_ep;
+typedef struct tcptran_pipe tcptran_pipe;
+typedef struct tcptran_dialer tcptran_dialer;
+typedef struct tcptran_listener tcptran_listener;
// tcp_pipe is one end of a TCP connection.
-struct tcp_pipe {
- nni_plat_tcp_pipe *tpp;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- bool nodelay;
- bool keepalive;
+struct tcptran_pipe {
+ nni_tcp_conn *conn;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
+ bool nodelay;
+ bool keepalive;
nni_list recvq;
nni_list sendq;
@@ -46,53 +47,70 @@ struct tcp_pipe {
nni_mtx mtx;
};
-struct tcp_ep {
- nni_plat_tcp_ep *tep;
- uint16_t proto;
- size_t rcvmax;
- bool nodelay;
- bool keepalive;
- nni_aio * aio;
- nni_aio * user_aio;
- nni_url * url;
- nng_sockaddr bsa; // bound addr
- nni_mtx mtx;
+struct tcptran_dialer {
+ nni_tcp_dialer *dialer;
+ uint16_t proto;
+ uint16_t af;
+ size_t rcvmax;
+ bool nodelay;
+ bool keepalive;
+ bool resolving;
+ nng_sockaddr sa;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ nni_url * url;
+ nni_mtx mtx;
};
-static void tcp_pipe_dosend(tcp_pipe *, nni_aio *);
-static void tcp_pipe_dorecv(tcp_pipe *);
-static void tcp_pipe_send_cb(void *);
-static void tcp_pipe_recv_cb(void *);
-static void tcp_pipe_nego_cb(void *);
-static void tcp_ep_cb(void *arg);
+struct tcptran_listener {
+ nni_tcp_listener *listener;
+ uint16_t proto;
+ size_t rcvmax;
+ bool nodelay;
+ bool keepalive;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ nni_url * url;
+ nng_sockaddr sa;
+ nng_sockaddr bsa; // bound addr
+ nni_mtx mtx;
+};
+
+static void tcptran_pipe_send_start(tcptran_pipe *);
+static void tcptran_pipe_recv_start(tcptran_pipe *);
+static void tcptran_pipe_send_cb(void *);
+static void tcptran_pipe_recv_cb(void *);
+static void tcptran_pipe_nego_cb(void *);
+static void tcptran_dialer_cb(void *arg);
+static void tcptran_listener_cb(void *arg);
static int
-tcp_tran_init(void)
+tcptran_init(void)
{
return (0);
}
static void
-tcp_tran_fini(void)
+tcptran_fini(void)
{
}
static void
-tcp_pipe_close(void *arg)
+tcptran_pipe_close(void *arg)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negaio);
- nni_plat_tcp_pipe_close(p->tpp);
+ nni_tcp_conn_close(p->conn);
}
static void
-tcp_pipe_stop(void *arg)
+tcptran_pipe_stop(void *arg)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
@@ -100,62 +118,48 @@ tcp_pipe_stop(void *arg)
}
static void
-tcp_pipe_fini(void *arg)
+tcptran_pipe_fini(void *arg)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negaio);
- if (p->tpp != NULL) {
- nni_plat_tcp_pipe_fini(p->tpp);
- }
- if (p->rxmsg) {
- nni_msg_free(p->rxmsg);
+ if (p->conn != NULL) {
+ nni_tcp_conn_fini(p->conn);
}
-
+ nni_msg_free(p->rxmsg);
NNI_FREE_STRUCT(p);
}
static int
-tcp_pipe_init(tcp_pipe **pipep, tcp_ep *ep, void *tpp)
+tcptran_pipe_init(tcptran_pipe **pipep, void *conn)
{
- tcp_pipe *p;
- int rv;
+ tcptran_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->txaio, tcp_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, tcp_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, tcp_pipe_nego_cb, p)) != 0)) {
- tcp_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, tcptran_pipe_nego_cb, p)) != 0)) {
+ tcptran_pipe_fini(p);
return (rv);
}
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
- p->proto = ep->proto;
- p->rcvmax = ep->rcvmax;
- p->nodelay = ep->nodelay;
- p->keepalive = ep->keepalive;
- p->tpp = tpp;
-
- // We try to set the nodelay and keepalive, but if these fail for
- // some reason, its not really fatal to the communication channel.
- // So ignore the return values.
- (void) nni_plat_tcp_pipe_set_nodelay(tpp, p->nodelay);
- (void) nni_plat_tcp_pipe_set_keepalive(tpp, p->keepalive);
-
- *pipep = p;
+ p->conn = conn;
+ *pipep = p;
return (0);
}
static void
-tcp_cancel_nego(nni_aio *aio, int rv)
+tcptran_pipe_nego_cancel(nni_aio *aio, int rv)
{
- tcp_pipe *p = nni_aio_get_prov_data(aio);
+ tcptran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_negaio != aio) {
@@ -170,11 +174,11 @@ tcp_cancel_nego(nni_aio *aio, int rv)
}
static void
-tcp_pipe_nego_cb(void *arg)
+tcptran_pipe_nego_cb(void *arg)
{
- tcp_pipe *p = arg;
- nni_aio * aio = p->negaio;
- int rv;
+ tcptran_pipe *p = arg;
+ nni_aio * aio = p->negaio;
+ int rv;
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
@@ -194,7 +198,7 @@ tcp_pipe_nego_cb(void *arg)
iov.iov_buf = &p->txlen[p->gottxhead];
// send it down...
nni_aio_set_iov(aio, 1, &iov);
- nni_plat_tcp_pipe_send(p->tpp, aio);
+ nni_tcp_conn_send(p->conn, aio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -203,7 +207,7 @@ tcp_pipe_nego_cb(void *arg)
iov.iov_len = p->wantrxhead - p->gotrxhead;
iov.iov_buf = &p->rxlen[p->gotrxhead];
nni_aio_set_iov(aio, 1, &iov);
- nni_plat_tcp_pipe_recv(p->tpp, aio);
+ nni_tcp_conn_recv(p->conn, aio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -227,14 +231,14 @@ done:
}
static void
-tcp_pipe_send_cb(void *arg)
+tcptran_pipe_send_cb(void *arg)
{
- tcp_pipe *p = arg;
- int rv;
- nni_aio * aio;
- size_t n;
- nni_msg * msg;
- nni_aio * txaio = p->txaio;
+ tcptran_pipe *p = arg;
+ int rv;
+ nni_aio * aio;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
@@ -254,16 +258,14 @@ tcp_pipe_send_cb(void *arg)
n = nni_aio_count(txaio);
nni_aio_iov_advance(txaio, n);
if (nni_aio_iov_count(txaio) > 0) {
- nni_plat_tcp_pipe_send(p->tpp, txaio);
+ nni_tcp_conn_send(p->conn, txaio);
nni_mtx_unlock(&p->mtx);
return;
}
nni_aio_list_remove(aio);
- if (!nni_list_empty(&p->sendq)) {
- // schedule next send
- tcp_pipe_dosend(p, nni_list_first(&p->sendq));
- }
+ tcptran_pipe_send_start(p);
+
nni_mtx_unlock(&p->mtx);
msg = nni_aio_get_msg(aio);
@@ -274,14 +276,14 @@ tcp_pipe_send_cb(void *arg)
}
static void
-tcp_pipe_recv_cb(void *arg)
+tcptran_pipe_recv_cb(void *arg)
{
- tcp_pipe *p = arg;
- nni_aio * aio;
- int rv;
- size_t n;
- nni_msg * msg;
- nni_aio * rxaio = p->rxaio;
+ tcptran_pipe *p = arg;
+ nni_aio * aio;
+ int rv;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
@@ -293,7 +295,7 @@ tcp_pipe_recv_cb(void *arg)
n = nni_aio_count(rxaio);
nni_aio_iov_advance(rxaio, n);
if (nni_aio_iov_count(rxaio) > 0) {
- nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+ nni_tcp_conn_recv(p->conn, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -325,7 +327,7 @@ tcp_pipe_recv_cb(void *arg)
iov.iov_len = (size_t) len;
nni_aio_set_iov(rxaio, 1, &iov);
- nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+ nni_tcp_conn_recv(p->conn, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -336,7 +338,7 @@ tcp_pipe_recv_cb(void *arg)
msg = p->rxmsg;
p->rxmsg = NULL;
if (!nni_list_empty(&p->recvq)) {
- tcp_pipe_dorecv(p);
+ tcptran_pipe_recv_start(p);
}
nni_mtx_unlock(&p->mtx);
@@ -357,9 +359,9 @@ recv_error:
}
static void
-tcp_cancel_tx(nni_aio *aio, int rv)
+tcptran_pipe_send_cancel(nni_aio *aio, int rv)
{
- tcp_pipe *p = nni_aio_get_prov_data(aio);
+ tcptran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
@@ -381,14 +383,19 @@ tcp_cancel_tx(nni_aio *aio, int rv)
}
static void
-tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio)
+tcptran_pipe_send_start(tcptran_pipe *p)
{
+ nni_aio *aio;
nni_aio *txaio;
nni_msg *msg;
int niov;
nni_iov iov[3];
uint64_t len;
+ if ((aio = nni_list_first(&p->sendq)) == NULL) {
+ return;
+ }
+
// This runs to send the message.
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -411,35 +418,35 @@ tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
- nni_plat_tcp_pipe_send(p->tpp, txaio);
+ nni_tcp_conn_send(p->conn, txaio);
}
static void
-tcp_pipe_send(void *arg, nni_aio *aio)
+tcptran_pipe_send(void *arg, nni_aio *aio)
{
- tcp_pipe *p = arg;
- int rv;
+ tcptran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_cancel_tx, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcptran_pipe_send_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
- tcp_pipe_dosend(p, aio);
+ tcptran_pipe_send_start(p);
}
nni_mtx_unlock(&p->mtx);
}
static void
-tcp_cancel_rx(nni_aio *aio, int rv)
+tcptran_pipe_recv_cancel(nni_aio *aio, int rv)
{
- tcp_pipe *p = nni_aio_get_prov_data(aio);
+ tcptran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
@@ -460,7 +467,7 @@ tcp_cancel_rx(nni_aio *aio, int rv)
}
static void
-tcp_pipe_dorecv(tcp_pipe *p)
+tcptran_pipe_recv_start(tcptran_pipe *p)
{
nni_aio *rxaio;
nni_iov iov;
@@ -472,20 +479,20 @@ tcp_pipe_dorecv(tcp_pipe *p)
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
- nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+ nni_tcp_conn_recv(p->conn, rxaio);
}
static void
-tcp_pipe_recv(void *arg, nni_aio *aio)
+tcptran_pipe_recv(void *arg, nni_aio *aio)
{
- tcp_pipe *p = arg;
- int rv;
+ tcptran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_cancel_rx, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcptran_pipe_recv_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -493,75 +500,74 @@ tcp_pipe_recv(void *arg, nni_aio *aio)
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
- tcp_pipe_dorecv(p);
+ tcptran_pipe_recv_start(p);
}
nni_mtx_unlock(&p->mtx);
}
static uint16_t
-tcp_pipe_peer(void *arg)
+tcptran_pipe_peer(void *arg)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
return (p->peer);
}
static int
-tcp_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_pipe * p = arg;
- int rv;
- nni_sockaddr sa;
+ tcptran_pipe *p = arg;
+ int rv;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
- if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) {
+ if ((rv = nni_tcp_conn_sockname(p->conn, &sa)) == 0) {
rv = nni_copyout_sockaddr(&sa, v, szp, t);
}
return (rv);
}
static int
-tcp_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_pipe * p = arg;
- int rv;
- nni_sockaddr sa;
+ tcptran_pipe *p = arg;
+ int rv;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
- if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) {
+ if ((rv = nni_tcp_conn_peername(p->conn, &sa)) == 0) {
rv = nni_copyout_sockaddr(&sa, v, szp, t);
}
return (rv);
}
static int
-tcp_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
return (nni_copyout_bool(p->keepalive, v, szp, t));
}
static int
-tcp_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_pipe *p = arg;
+ tcptran_pipe *p = arg;
return (nni_copyout_bool(p->nodelay, v, szp, t));
}
-// Note that the url *must* be in a modifiable buffer.
static void
-tcp_pipe_start(void *arg, nni_aio *aio)
+tcptran_pipe_start(void *arg, nni_aio *aio)
{
- tcp_pipe *p = arg;
- nni_aio * negaio;
- nni_iov iov;
- int rv;
+ tcptran_pipe *p = arg;
+ nni_aio * negaio;
+ nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_cancel_nego, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcptran_pipe_nego_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -582,35 +588,39 @@ tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_plat_tcp_pipe_send(p->tpp, negaio);
+ nni_tcp_conn_send(p->conn, negaio);
nni_mtx_unlock(&p->mtx);
}
static void
-tcp_ep_fini(void *arg)
+tcptran_dialer_fini(void *arg)
{
- tcp_ep *ep = arg;
+ tcptran_dialer *d = arg;
- nni_aio_stop(ep->aio);
- if (ep->tep != NULL) {
- nni_plat_tcp_ep_fini(ep->tep);
+ nni_aio_stop(d->aio);
+ if (d->dialer != NULL) {
+ nni_tcp_dialer_fini(d->dialer);
}
- nni_aio_fini(ep->aio);
- nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
+ nni_aio_fini(d->aio);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+tcptran_dialer_close(void *arg)
+{
+ tcptran_dialer *d = arg;
+
+ nni_aio_close(d->aio);
+ nni_tcp_dialer_close(d->dialer);
}
static int
-tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+tcptran_dialer_init(void **dp, nni_url *url, nni_sock *sock)
{
- tcp_ep * ep;
- int rv;
- char * host;
- char * serv;
- nni_sockaddr rsa, lsa;
- nni_aio * aio;
- int passive;
- uint16_t af;
+ tcptran_dialer *d;
+ int rv;
+ uint16_t af;
if (strcmp(url->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -627,345 +637,543 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_EADDRINVAL);
}
if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
- (url->u_query != NULL)) {
+ (url->u_query != NULL) || (strlen(url->u_hostname) == 0) ||
+ (strlen(url->u_port) == 0)) {
return (NNG_EADDRINVAL);
}
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
}
+ nni_mtx_init(&d->mtx);
- if (strlen(url->u_hostname) == 0) {
- host = NULL;
- } else {
- host = url->u_hostname;
+ if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) ||
+ ((rv = nni_aio_init(&d->aio, tcptran_dialer_cb, d)) != 0)) {
+ tcptran_dialer_fini(d);
+ return (rv);
}
- if (strlen(url->u_port) == 0) {
- serv = NULL;
- } else {
- serv = url->u_port;
- }
- // XXX: arguably we could defer this part to the point we do a bind
- // or connect!
- if (mode == NNI_EP_MODE_DIAL) {
- passive = 0;
- lsa.s_family = af;
- nni_aio_set_input(aio, 0, &rsa);
- if ((host == NULL) || (serv == NULL)) {
- nni_aio_fini(aio);
- return (NNG_EADDRINVAL);
+ d->url = url;
+ d->proto = nni_sock_proto_id(sock);
+ d->nodelay = true;
+ d->keepalive = false;
+ d->af = af;
+
+ *dp = d;
+ return (0);
+}
+
+static void
+tcptran_dialer_cb(void *arg)
+{
+ tcptran_dialer *d = arg;
+ tcptran_pipe * p;
+ nni_tcp_conn * conn;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = d->user_aio;
+ rv = nni_aio_result(d->aio);
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&d->mtx);
+ if ((rv == 0) && !d->resolving) {
+ conn = nni_aio_get_output(d->aio, 0);
+ nni_tcp_conn_fini(conn);
}
- } else {
- passive = 1;
- rsa.s_family = af;
- nni_aio_set_input(aio, 0, &lsa);
+ return;
}
- nni_plat_tcp_resolv(host, serv, af, passive, aio);
- nni_aio_wait(aio);
- if ((rv = nni_aio_result(aio)) != 0) {
- nni_aio_fini(aio);
- return (rv);
+ if (rv != 0) {
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- nni_aio_fini(aio);
+ if (d->resolving) {
+ // Name resolution complete. Now go to next step.
+ d->resolving = false;
+ nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ d->user_aio = NULL;
+ conn = nni_aio_get_output(d->aio, 0);
+ NNI_ASSERT(conn != NULL);
+ if ((rv = tcptran_pipe_init(&p, conn)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_tcp_conn_fini(conn);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- nni_mtx_init(&ep->mtx);
- ep->url = url;
- if ((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) {
- tcp_ep_fini(ep);
- return (rv);
+ p->proto = d->proto;
+ p->rcvmax = d->rcvmax;
+ p->nodelay = d->nodelay;
+ p->keepalive = d->keepalive;
+ nni_mtx_unlock(&d->mtx);
+
+ (void) nni_tcp_conn_set_nodelay(conn, p->nodelay);
+ (void) nni_tcp_conn_set_keepalive(conn, p->keepalive);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+tcptran_dialer_cancel(nni_aio *aio, int rv)
+{
+ tcptran_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->user_aio != aio) {
+ nni_mtx_unlock(&d->mtx);
+ return;
}
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->mtx);
- if ((rv = nni_aio_init(&ep->aio, tcp_ep_cb, ep)) != 0) {
- tcp_ep_fini(ep);
- return (rv);
+ nni_aio_abort(d->aio, rv);
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+tcptran_dialer_connect(void *arg, nni_aio *aio)
+{
+ tcptran_dialer *d = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
- ep->proto = nni_sock_proto_id(sock);
- ep->nodelay = true;
- ep->keepalive = false;
+ nni_mtx_lock(&d->mtx);
+ NNI_ASSERT(d->user_aio == NULL);
- *epp = ep;
- return (0);
+ if ((rv = nni_aio_schedule(aio, tcptran_dialer_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ d->user_aio = aio;
+
+ d->resolving = true;
+
+ // Start the name resolution. Callback will see resolving, and then
+ // switch to doing actual connect.
+ nni_aio_set_input(d->aio, 0, &d->sa);
+ nni_tcp_resolv(d->url->u_hostname, d->url->u_port, d->af, 0, d->aio);
+ nni_mtx_unlock(&d->mtx);
}
static int
-tcp_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+tcptran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ tcptran_dialer *d = arg;
+
+ return (nni_copyout_str(d->url->u_rawurl, v, szp, t));
}
static int
-tcp_listener_init(void **epp, nni_url *url, nni_sock *sock)
+tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ tcptran_dialer *d = arg;
+ return (nni_copyout_size(d->rcvmax, v, szp, t));
}
-static void
-tcp_ep_close(void *arg)
+static int
+tcptran_dialer_set_recvmaxsz(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
-
- nni_aio_close(ep->aio);
+ tcptran_dialer *d = arg;
+ size_t val;
+ int rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->rcvmax = val;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
- nni_mtx_lock(&ep->mtx);
- nni_plat_tcp_ep_close(ep->tep);
- nni_mtx_unlock(&ep->mtx);
+static int
+tcptran_dialer_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ tcptran_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_bool(d->nodelay, v, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
}
static int
-tcp_ep_bind(void *arg)
+tcptran_dialer_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
- int rv;
+ tcptran_dialer *d = arg;
+ bool val;
+ int rv;
+ if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->nodelay = val;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
- nni_mtx_lock(&ep->mtx);
- rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa);
- nni_mtx_unlock(&ep->mtx);
+static int
+tcptran_dialer_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ tcptran_dialer *d = arg;
+ return (nni_copyout_bool(d->keepalive, v, szp, t));
+}
+static int
+tcptran_dialer_set_keepalive(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ tcptran_dialer *d = arg;
+ bool val;
+ int rv;
+ if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->keepalive = val;
+ nni_mtx_unlock(&d->mtx);
+ }
return (rv);
}
static void
-tcp_ep_finish(tcp_ep *ep)
+tcptran_listener_fini(void *arg)
{
- nni_aio * aio;
- int rv;
- tcp_pipe *pipe = NULL;
+ tcptran_listener *l = arg;
- if ((rv = nni_aio_result(ep->aio)) != 0) {
- goto done;
+ nni_aio_stop(l->aio);
+ if (l->listener != NULL) {
+ nni_tcp_listener_fini(l->listener);
+ }
+ nni_aio_fini(l->aio);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+static int
+tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock)
+{
+ tcptran_listener *l;
+ int rv;
+ char * host;
+ nni_aio * aio;
+ uint16_t af;
+
+ if (strcmp(url->u_scheme, "tcp") == 0) {
+ af = NNG_AF_UNSPEC;
+ } else if (strcmp(url->u_scheme, "tcp4") == 0) {
+ af = NNG_AF_INET;
+ } else if (strcmp(url->u_scheme, "tcp6") == 0) {
+ af = NNG_AF_INET6;
+ } else {
+ return (NNG_EADDRINVAL);
}
- NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
- // Attempt to allocate the parent pipe. If this fails we'll
- // drop the connection (ENOMEM probably).
- rv = tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
+ // Check for invalid URL components.
+ if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
+ (url->u_query != NULL)) {
+ return (NNG_EADDRINVAL);
+ }
-done:
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ l->url = url;
- if ((aio != NULL) && (rv == 0)) {
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- return;
+ if (strlen(url->u_hostname) == 0) {
+ host = NULL;
+ } else {
+ host = url->u_hostname;
}
- if (pipe != NULL) {
- tcp_pipe_fini(pipe);
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ tcptran_listener_fini(l);
+ return (rv);
}
- if (aio != NULL) {
- NNI_ASSERT(rv != 0);
- nni_aio_finish_error(aio, rv);
+
+ // XXX: We are doing lookup at listener initialization. There is
+ // a valid argument that this should be done at bind time, but that
+ // would require making bind asynchronous. In some ways this would
+ // be worse than the cost of just waiting here. We always recommend
+ // using local IP addresses rather than names when possible.
+
+ nni_aio_set_input(aio, 0, &l->sa);
+
+ nni_tcp_resolv(host, url->u_port, af, 1, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+
+ if (rv != 0) {
+ tcptran_listener_fini(l);
+ return (rv);
+ }
+
+ if ((rv = nni_tcp_listener_init(&l->listener)) != 0) {
+ tcptran_listener_fini(l);
+ return (rv);
+ }
+
+ if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) {
+ tcptran_listener_fini(l);
+ return (rv);
}
+ l->proto = nni_sock_proto_id(sock);
+ l->nodelay = true;
+ l->keepalive = false;
+ l->bsa = l->sa;
+
+ *lp = l;
+ return (0);
}
static void
-tcp_ep_cb(void *arg)
+tcptran_listener_close(void *arg)
{
- tcp_ep *ep = arg;
+ tcptran_listener *l = arg;
- nni_mtx_lock(&ep->mtx);
- tcp_ep_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_close(l->aio);
+ nni_tcp_listener_close(l->listener);
}
-static void
-tcp_cancel_ep(nni_aio *aio, int rv)
+static int
+tcptran_listener_bind(void *arg)
{
- tcp_ep *ep = nni_aio_get_prov_data(aio);
+ tcptran_listener *l = arg;
+ int rv;
- nni_mtx_lock(&ep->mtx);
- if (ep->user_aio != aio) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- ep->user_aio = NULL;
- nni_mtx_unlock(&ep->mtx);
+ l->bsa = l->sa;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_tcp_listener_listen(l->listener, &l->bsa);
+ nni_mtx_unlock(&l->mtx);
- nni_aio_abort(ep->aio, rv);
- nni_aio_finish_error(aio, rv);
+ return (rv);
}
static void
-tcp_ep_accept(void *arg, nni_aio *aio)
+tcptran_listener_cb(void *arg)
{
- tcp_ep *ep = arg;
- int rv;
+ tcptran_listener *l = arg;
+ nni_aio * aio;
+ int rv;
+ tcptran_pipe * p = NULL;
+ nni_tcp_conn * conn;
+
+ nni_mtx_lock(&l->mtx);
+ rv = nni_aio_result(l->aio);
+ aio = l->user_aio;
+ l->user_aio = NULL;
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&l->mtx);
+ if (rv == 0) {
+ conn = nni_aio_get_output(l->aio, 0);
+ nni_tcp_conn_fini(conn);
+ }
+ return;
+ }
- if (nni_aio_begin(aio) != 0) {
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ conn = nni_aio_get_output(l->aio, 0);
+
+ NNI_ASSERT(conn != NULL);
+ if ((rv = tcptran_pipe_init(&p, conn)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_tcp_conn_fini(conn);
nni_aio_finish_error(aio, rv);
return;
}
- ep->user_aio = aio;
- nni_plat_tcp_ep_accept(ep->tep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
+ p->proto = l->proto;
+ p->rcvmax = l->rcvmax;
+ p->nodelay = l->nodelay;
+ p->keepalive = l->keepalive;
+ nni_mtx_unlock(&l->mtx);
+
+ (void) nni_tcp_conn_set_nodelay(conn, p->nodelay);
+ (void) nni_tcp_conn_set_keepalive(conn, p->keepalive);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
-tcp_ep_connect(void *arg, nni_aio *aio)
+tcptran_listener_cancel(nni_aio *aio, int rv)
{
- tcp_ep *ep = arg;
- int rv;
+ tcptran_listener *l = nni_aio_get_prov_data(aio);
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_mtx_lock(&l->mtx);
+ if (l->user_aio != aio) {
+ nni_mtx_unlock(&l->mtx);
return;
}
- ep->user_aio = aio;
+ l->user_aio = NULL;
+ nni_mtx_unlock(&l->mtx);
- nni_plat_tcp_ep_connect(ep->tep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_abort(l->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
-static int
-tcp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+static void
+tcptran_listener_accept(void *arg, nni_aio *aio)
{
- tcp_ep *ep = arg;
- size_t val;
- int rv;
- if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->rcvmax = val;
- nni_mtx_unlock(&ep->mtx);
+ tcptran_listener *l = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
- return (rv);
-}
+ nni_mtx_lock(&l->mtx);
+ NNI_ASSERT(l->user_aio == NULL);
-static int
-tcp_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
-{
- return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
+ if ((rv = nni_aio_schedule(aio, tcptran_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ l->user_aio = aio;
+
+ nni_tcp_listener_accept(l->listener, l->aio);
+ nni_mtx_unlock(&l->mtx);
}
static int
-tcp_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t)
+tcptran_listener_set_nodelay(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
- bool val;
- int rv;
+ tcptran_listener *l = arg;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->nodelay = val;
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ l->nodelay = val;
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
static int
-tcp_ep_chk_bool(const void *v, size_t sz, nni_opt_type t)
+tcptran_listener_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- return (nni_copyin_bool(NULL, v, sz, t));
+ tcptran_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->nodelay, v, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
}
static int
-tcp_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_listener_set_recvmaxsz(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_bool(ep->nodelay, v, szp, t);
- nni_mtx_unlock(&ep->mtx);
+ tcptran_listener *l = arg;
+ size_t val;
+ int rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->rcvmax = val;
+ nni_mtx_unlock(&l->mtx);
+ }
return (rv);
}
static int
-tcp_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t)
+tcptran_listener_set_keepalive(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
- bool val;
- int rv;
+ tcptran_listener *l = arg;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->keepalive = val;
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ l->keepalive = val;
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
static int
-tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_listener_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_bool(ep->keepalive, v, szp, t);
- nni_mtx_unlock(&ep->mtx);
+ tcptran_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->keepalive, v, szp, t);
+ nni_mtx_unlock(&l->mtx);
return (rv);
}
static int
-tcp_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_ep *ep = arg;
+ tcptran_listener *l = arg;
+ char ustr[128];
+ char ipstr[48]; // max for IPv6 addresses including []
+ char portstr[6]; // max for 16-bit port
- return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
+ nni_ntop(&l->bsa, ipstr, portstr);
+ snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr);
+ return (nni_copyout_str(ustr, v, szp, t));
}
static int
-tcp_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tcp_ep *ep = arg;
- char ustr[128];
- char ipstr[48]; // max for IPv6 addresses including []
- char portstr[6]; // max for 16-bit port
+ tcptran_listener *l = arg;
+ return (nni_copyout_size(l->rcvmax, v, szp, t));
+}
- nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr);
- snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr);
- return (nni_copyout_str(ustr, v, szp, t));
+static int
+tcptran_check_bool(const void *v, size_t sz, nni_opt_type t)
+{
+ return (nni_copyin_bool(NULL, v, sz, t));
}
static int
-tcp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcptran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
{
- tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_size(ep->rcvmax, v, szp, t);
- nni_mtx_unlock(&ep->mtx);
- return (rv);
+ return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
}
-static nni_tran_option tcp_pipe_options[] = {
+static nni_tran_option tcptran_pipe_options[] = {
{
.o_name = NNG_OPT_LOCADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = tcp_pipe_get_locaddr,
+ .o_get = tcptran_pipe_get_locaddr,
},
{
.o_name = NNG_OPT_REMADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = tcp_pipe_get_remaddr,
+ .o_get = tcptran_pipe_get_remaddr,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_pipe_get_keepalive,
+ .o_get = tcptran_pipe_get_keepalive,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_pipe_get_nodelay,
+ .o_get = tcptran_pipe_get_nodelay,
},
// terminate list
{
@@ -973,43 +1181,43 @@ static nni_tran_option tcp_pipe_options[] = {
},
};
-static nni_tran_pipe_ops tcp_pipe_ops = {
- .p_fini = tcp_pipe_fini,
- .p_start = tcp_pipe_start,
- .p_stop = tcp_pipe_stop,
- .p_send = tcp_pipe_send,
- .p_recv = tcp_pipe_recv,
- .p_close = tcp_pipe_close,
- .p_peer = tcp_pipe_peer,
- .p_options = tcp_pipe_options,
+static nni_tran_pipe_ops tcptran_pipe_ops = {
+ .p_fini = tcptran_pipe_fini,
+ .p_start = tcptran_pipe_start,
+ .p_stop = tcptran_pipe_stop,
+ .p_send = tcptran_pipe_send,
+ .p_recv = tcptran_pipe_recv,
+ .p_close = tcptran_pipe_close,
+ .p_peer = tcptran_pipe_peer,
+ .p_options = tcptran_pipe_options,
};
-static nni_tran_option tcp_dialer_options[] = {
+static nni_tran_option tcptran_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = tcp_ep_get_recvmaxsz,
- .o_set = tcp_ep_set_recvmaxsz,
- .o_chk = tcp_ep_chk_recvmaxsz,
+ .o_get = tcptran_dialer_get_recvmaxsz,
+ .o_set = tcptran_dialer_set_recvmaxsz,
+ .o_chk = tcptran_check_recvmaxsz,
},
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tcp_dialer_get_url,
+ .o_get = tcptran_dialer_get_url,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_ep_get_nodelay,
- .o_set = tcp_ep_set_nodelay,
- .o_chk = tcp_ep_chk_bool,
+ .o_get = tcptran_dialer_get_nodelay,
+ .o_set = tcptran_dialer_set_nodelay,
+ .o_chk = tcptran_check_bool,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_ep_get_keepalive,
- .o_set = tcp_ep_set_keepalive,
- .o_chk = tcp_ep_chk_bool,
+ .o_get = tcptran_dialer_get_keepalive,
+ .o_set = tcptran_dialer_set_keepalive,
+ .o_chk = tcptran_check_bool,
},
// terminate list
{
@@ -1017,32 +1225,32 @@ static nni_tran_option tcp_dialer_options[] = {
},
};
-static nni_tran_option tcp_listener_options[] = {
+static nni_tran_option tcptran_listener_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = tcp_ep_get_recvmaxsz,
- .o_set = tcp_ep_set_recvmaxsz,
- .o_chk = tcp_ep_chk_recvmaxsz,
+ .o_get = tcptran_listener_get_recvmaxsz,
+ .o_set = tcptran_listener_set_recvmaxsz,
+ .o_chk = tcptran_check_recvmaxsz,
},
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tcp_listener_get_url,
+ .o_get = tcptran_listener_get_url,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_ep_get_nodelay,
- .o_set = tcp_ep_set_nodelay,
- .o_chk = tcp_ep_chk_bool,
+ .o_get = tcptran_listener_get_nodelay,
+ .o_set = tcptran_listener_set_nodelay,
+ .o_chk = tcptran_check_bool,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tcp_ep_get_keepalive,
- .o_set = tcp_ep_set_keepalive,
- .o_chk = tcp_ep_chk_bool,
+ .o_get = tcptran_listener_get_keepalive,
+ .o_set = tcptran_listener_set_keepalive,
+ .o_chk = tcptran_check_bool,
},
// terminate list
{
@@ -1050,51 +1258,51 @@ static nni_tran_option tcp_listener_options[] = {
},
};
-static nni_tran_dialer_ops tcp_dialer_ops = {
- .d_init = tcp_dialer_init,
- .d_fini = tcp_ep_fini,
- .d_connect = tcp_ep_connect,
- .d_close = tcp_ep_close,
- .d_options = tcp_dialer_options,
+static nni_tran_dialer_ops tcptran_dialer_ops = {
+ .d_init = tcptran_dialer_init,
+ .d_fini = tcptran_dialer_fini,
+ .d_connect = tcptran_dialer_connect,
+ .d_close = tcptran_dialer_close,
+ .d_options = tcptran_dialer_options,
};
-static nni_tran_listener_ops tcp_listener_ops = {
- .l_init = tcp_listener_init,
- .l_fini = tcp_ep_fini,
- .l_bind = tcp_ep_bind,
- .l_accept = tcp_ep_accept,
- .l_close = tcp_ep_close,
- .l_options = tcp_listener_options,
+static nni_tran_listener_ops tcptran_listener_ops = {
+ .l_init = tcptran_listener_init,
+ .l_fini = tcptran_listener_fini,
+ .l_bind = tcptran_listener_bind,
+ .l_accept = tcptran_listener_accept,
+ .l_close = tcptran_listener_close,
+ .l_options = tcptran_listener_options,
};
static nni_tran tcp_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tcp",
- .tran_dialer = &tcp_dialer_ops,
- .tran_listener = &tcp_listener_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_dialer = &tcptran_dialer_ops,
+ .tran_listener = &tcptran_listener_ops,
+ .tran_pipe = &tcptran_pipe_ops,
+ .tran_init = tcptran_init,
+ .tran_fini = tcptran_fini,
};
static nni_tran tcp4_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tcp4",
- .tran_dialer = &tcp_dialer_ops,
- .tran_listener = &tcp_listener_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_dialer = &tcptran_dialer_ops,
+ .tran_listener = &tcptran_listener_ops,
+ .tran_pipe = &tcptran_pipe_ops,
+ .tran_init = tcptran_init,
+ .tran_fini = tcptran_fini,
};
static nni_tran tcp6_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tcp6",
- .tran_dialer = &tcp_dialer_ops,
- .tran_listener = &tcp_listener_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_dialer = &tcptran_dialer_ops,
+ .tran_listener = &tcptran_listener_ops,
+ .tran_pipe = &tcptran_pipe_ops,
+ .tran_init = tcptran_init,
+ .tran_fini = tcptran_fini,
};
int
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 5e1a1e8d..b1bdddb8 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -22,17 +22,19 @@
// supplied as well, and uses the supplemental TLS v1.2 code. It is not
// an accident that this very closely resembles the TCP transport itself.
-typedef struct tls_pipe tls_pipe;
-typedef struct tls_ep tls_ep;
-
-// tls_pipe is one end of a TLS connection.
-struct tls_pipe {
- nni_plat_tcp_pipe *tcp;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- bool nodelay;
- bool keepalive;
+typedef struct tlstran_ep tlstran_ep;
+typedef struct tlstran_dialer tlstran_dialer;
+typedef struct tlstran_listener tlstran_listener;
+typedef struct tlstran_pipe tlstran_pipe;
+
+// tlstran_pipe is one end of a TLS connection.
+struct tlstran_pipe {
+ nni_tls *tls;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
+ bool nodelay;
+ bool keepalive;
nni_list sendq;
nni_list recvq;
@@ -49,47 +51,62 @@ struct tls_pipe {
nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
- nni_tls *tls;
};
-struct tls_ep {
- nni_plat_tcp_ep *tep;
- uint16_t proto;
- size_t rcvmax;
- int authmode;
- nni_aio * aio;
- nni_aio * user_aio;
- nni_mtx mtx;
- nng_tls_config * cfg;
- nng_sockaddr bsa;
- nni_url * url;
- int mode;
- bool nodelay;
- bool keepalive;
+// Stuff that is common to both dialers and listeners.
+struct tlstran_ep {
+ uint16_t proto;
+ size_t rcvmax;
+ bool nodelay;
+ bool keepalive;
+ int authmode;
+ nng_tls_config *cfg;
+ nni_url * url;
+ nni_mtx mtx;
+};
+
+struct tlstran_dialer {
+ tlstran_ep ep; // must be first
+ nni_tcp_dialer *dialer;
+ uint16_t af;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ bool resolving;
+ nng_sockaddr sa;
+};
+
+struct tlstran_listener {
+ tlstran_ep ep; // must be first
+ nni_tcp_listener *listener;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ nng_sockaddr sa;
+ nng_sockaddr bsa; // bound addr
};
-static void tls_pipe_dorecv(tls_pipe *);
-static void tls_pipe_dosend(tls_pipe *, nni_aio *);
-static void tls_pipe_send_cb(void *);
-static void tls_pipe_recv_cb(void *);
-static void tls_pipe_nego_cb(void *);
-static void tls_ep_cb(void *arg);
+static void tlstran_pipe_send_start(tlstran_pipe *);
+static void tlstran_pipe_recv_start(tlstran_pipe *);
+static void tlstran_pipe_send_cb(void *);
+static void tlstran_pipe_recv_cb(void *);
+static void tlstran_pipe_nego_cb(void *);
+static void tlstran_dialer_cb(void *);
+static void tlstran_listener_cb(void *);
static int
-tls_tran_init(void)
+tlstran_init(void)
{
return (0);
}
static void
-tls_tran_fini(void)
+tlstran_fini(void)
{
}
static void
-tls_pipe_close(void *arg)
+tlstran_pipe_close(void *arg)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
@@ -99,9 +116,9 @@ tls_pipe_close(void *arg)
}
static void
-tls_pipe_stop(void *arg)
+tlstran_pipe_stop(void *arg)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
@@ -109,14 +126,13 @@ tls_pipe_stop(void *arg)
}
static void
-tls_pipe_fini(void *arg)
+tlstran_pipe_fini(void *arg)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negaio);
-
if (p->tls != NULL) {
nni_tls_fini(p->tls);
}
@@ -125,41 +141,34 @@ tls_pipe_fini(void *arg)
}
static int
-tls_pipe_init(tls_pipe **pipep, tls_ep *ep, void *tpp)
+tlstran_pipe_init(tlstran_pipe **pipep, nni_tls *tls)
{
- tls_pipe * p;
- nni_plat_tcp_pipe *tcp = tpp;
- int rv;
+ tlstran_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_tls_init(&p->tls, ep->cfg, tcp)) != 0) ||
- ((rv = nni_aio_init(&p->txaio, tls_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, tls_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, tls_pipe_nego_cb, p)) != 0)) {
- tls_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, tlstran_pipe_nego_cb, p)) != 0)) {
+ tlstran_pipe_fini(p);
return (rv);
}
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
- p->proto = ep->proto;
- p->rcvmax = ep->rcvmax;
- p->tcp = tcp;
- p->keepalive = ep->keepalive;
- p->nodelay = ep->nodelay;
-
+ p->tls = tls;
*pipep = p;
return (0);
}
static void
-tls_cancel_nego(nni_aio *aio, int rv)
+tlstran_pipe_cancel_nego(nni_aio *aio, int rv)
{
- tls_pipe *p = nni_aio_get_prov_data(aio);
+ tlstran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_negaio != aio) {
@@ -174,11 +183,11 @@ tls_cancel_nego(nni_aio *aio, int rv)
}
static void
-tls_pipe_nego_cb(void *arg)
+tlstran_pipe_nego_cb(void *arg)
{
- tls_pipe *p = arg;
- nni_aio * aio = p->negaio;
- int rv;
+ tlstran_pipe *p = arg;
+ nni_aio * aio = p->negaio;
+ int rv;
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
@@ -237,14 +246,14 @@ done:
}
static void
-tls_pipe_send_cb(void *arg)
+tlstran_pipe_send_cb(void *arg)
{
- tls_pipe *p = arg;
- int rv;
- nni_aio * aio;
- size_t n;
- nni_msg * msg;
- nni_aio * txaio = p->txaio;
+ tlstran_pipe *p = arg;
+ int rv;
+ nni_aio * aio;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
@@ -269,9 +278,7 @@ tls_pipe_send_cb(void *arg)
return;
}
nni_aio_list_remove(aio);
- if (!nni_list_empty(&p->sendq)) {
- tls_pipe_dosend(p, nni_list_first(&p->sendq));
- }
+ tlstran_pipe_send_start(p);
nni_mtx_unlock(&p->mtx);
msg = nni_aio_get_msg(aio);
@@ -282,14 +289,14 @@ tls_pipe_send_cb(void *arg)
}
static void
-tls_pipe_recv_cb(void *arg)
+tlstran_pipe_recv_cb(void *arg)
{
- tls_pipe *p = arg;
- nni_aio * aio;
- int rv;
- size_t n;
- nni_msg * msg;
- nni_aio * rxaio = p->rxaio;
+ tlstran_pipe *p = arg;
+ nni_aio * aio;
+ int rv;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
@@ -345,7 +352,7 @@ tls_pipe_recv_cb(void *arg)
msg = p->rxmsg;
p->rxmsg = NULL;
if (!nni_list_empty(&p->recvq)) {
- tls_pipe_dorecv(p);
+ tlstran_pipe_recv_start(p);
}
nni_mtx_unlock(&p->mtx);
@@ -365,9 +372,9 @@ recv_error:
}
static void
-tls_cancel_tx(nni_aio *aio, int rv)
+tlstran_pipe_send_cancel(nni_aio *aio, int rv)
{
- tls_pipe *p = nni_aio_get_prov_data(aio);
+ tlstran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
@@ -389,14 +396,19 @@ tls_cancel_tx(nni_aio *aio, int rv)
}
static void
-tls_pipe_dosend(tls_pipe *p, nni_aio *aio)
+tlstran_pipe_send_start(tlstran_pipe *p)
{
nni_aio *txaio;
+ nni_aio *aio;
nni_msg *msg;
int niov;
nni_iov iov[3];
uint64_t len;
+ if ((aio = nni_list_first(&p->sendq)) == NULL) {
+ return;
+ }
+
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -423,31 +435,31 @@ tls_pipe_dosend(tls_pipe *p, nni_aio *aio)
}
static void
-tls_pipe_send(void *arg, nni_aio *aio)
+tlstran_pipe_send(void *arg, nni_aio *aio)
{
- tls_pipe *p = arg;
- int rv;
+ tlstran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tls_cancel_tx, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tlstran_pipe_send_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
- tls_pipe_dosend(p, aio);
+ tlstran_pipe_send_start(p);
}
nni_mtx_unlock(&p->mtx);
}
static void
-tls_cancel_rx(nni_aio *aio, int rv)
+tlstran_pipe_recv_cancel(nni_aio *aio, int rv)
{
- tls_pipe *p = nni_aio_get_prov_data(aio);
+ tlstran_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
@@ -468,7 +480,7 @@ tls_cancel_rx(nni_aio *aio, int rv)
}
static void
-tls_pipe_dorecv(tls_pipe *p)
+tlstran_pipe_recv_start(tlstran_pipe *p)
{
nni_aio *rxaio;
nni_iov iov;
@@ -484,16 +496,16 @@ tls_pipe_dorecv(tls_pipe *p)
}
static void
-tls_pipe_recv(void *arg, nni_aio *aio)
+tlstran_pipe_recv(void *arg, nni_aio *aio)
{
- tls_pipe *p = arg;
- int rv;
+ tlstran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tls_cancel_rx, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tlstran_pipe_recv_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -501,25 +513,25 @@ tls_pipe_recv(void *arg, nni_aio *aio)
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
- tls_pipe_dorecv(p);
+ tlstran_pipe_recv_start(p);
}
nni_mtx_unlock(&p->mtx);
}
static uint16_t
-tls_pipe_peer(void *arg)
+tlstran_pipe_peer(void *arg)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
return (p->peer);
}
static int
-tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_pipe * p = arg;
- int rv;
- nni_sockaddr sa;
+ tlstran_pipe *p = arg;
+ int rv;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_tls_sockname(p->tls, &sa)) == 0) {
@@ -529,11 +541,11 @@ tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_pipe * p = arg;
- int rv;
- nni_sockaddr sa;
+ tlstran_pipe *p = arg;
+ int rv;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_tls_peername(p->tls, &sa)) == 0) {
@@ -543,32 +555,32 @@ tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
return (nni_copyout_bool(p->keepalive, v, szp, t));
}
static int
-tls_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
return (nni_copyout_bool(p->nodelay, v, szp, t));
}
static void
-tls_pipe_start(void *arg, nni_aio *aio)
+tlstran_pipe_start(void *arg, nni_aio *aio)
{
- tls_pipe *p = arg;
- nni_aio * negaio;
- nni_iov iov;
- int rv;
+ tlstran_pipe *p = arg;
+ nni_aio * negaio;
+ nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tls_cancel_nego, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tlstran_pipe_cancel_nego, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -594,35 +606,39 @@ tls_pipe_start(void *arg, nni_aio *aio)
}
static void
-tls_ep_fini(void *arg)
+tlstran_dialer_fini(void *arg)
{
- tls_ep *ep = arg;
+ tlstran_dialer *d = arg;
- nni_aio_stop(ep->aio);
- if (ep->tep != NULL) {
- nni_plat_tcp_ep_fini(ep->tep);
+ nni_aio_stop(d->aio);
+ if (d->dialer != NULL) {
+ nni_tcp_dialer_fini(d->dialer);
}
- if (ep->cfg) {
- nni_tls_config_fini(ep->cfg);
+ nni_aio_fini(d->aio);
+ if (d->ep.cfg != NULL) {
+ nni_tls_config_fini(d->ep.cfg);
}
- nni_aio_fini(ep->aio);
- nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
+ nni_mtx_fini(&d->ep.mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+tlstran_dialer_close(void *arg)
+{
+ tlstran_dialer *d = arg;
+
+ nni_aio_close(d->aio);
+ nni_tcp_dialer_close(d->dialer);
}
static int
-tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+tlstran_dialer_init(void **dp, nni_url *url, nni_sock *sock)
{
- tls_ep * ep;
- int rv;
- char * host;
- char * serv;
- nni_sockaddr rsa, lsa;
- nni_aio * aio;
- int passive;
- nng_tls_mode tlsmode;
- nng_tls_auth_mode authmode;
- uint16_t af;
+ tlstran_dialer *d;
+ int rv;
+ uint16_t af;
+ char * host = url->u_hostname;
+ char * port = url->u_port;
if (strcmp(url->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -639,232 +655,374 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_EADDRINVAL);
}
if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
- (url->u_query != NULL)) {
+ (url->u_query != NULL) || (host == NULL) || (port == NULL) ||
+ (strlen(host) == 0) || (strlen(port) == 0)) {
return (NNG_EADDRINVAL);
}
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_mtx_init(&d->ep.mtx);
+ d->ep.authmode = NNG_TLS_AUTH_MODE_REQUIRED;
+ d->ep.url = url;
+ d->ep.proto = nni_sock_proto_id(sock);
+ d->ep.nodelay = true;
+ d->ep.keepalive = false;
+
+ if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) ||
+ ((rv = nni_tls_config_init(&d->ep.cfg, NNG_TLS_MODE_CLIENT)) !=
+ 0) ||
+ ((rv = nng_tls_config_auth_mode(d->ep.cfg, d->ep.authmode)) !=
+ 0) ||
+ ((rv = nng_tls_config_server_name(d->ep.cfg, host)) != 0) ||
+ ((rv = nni_aio_init(&d->aio, tlstran_dialer_cb, d)) != 0)) {
+ tlstran_dialer_fini(d);
return (rv);
}
+ d->af = af;
- if (strlen(url->u_hostname) == 0) {
- host = NULL;
- } else {
- host = url->u_hostname;
- }
- if (strlen(url->u_port) == 0) {
- serv = NULL;
- } else {
- serv = url->u_port;
- }
-
- if (mode == NNI_EP_MODE_DIAL) {
- passive = 0;
- tlsmode = NNG_TLS_MODE_CLIENT;
- authmode = NNG_TLS_AUTH_MODE_REQUIRED;
- lsa.s_family = af;
- nni_aio_set_input(aio, 0, &rsa);
- if ((host == NULL) || (serv == NULL)) {
- nni_aio_fini(aio);
- return (NNG_EADDRINVAL);
+ *dp = d;
+ return (0);
+}
+
+static void
+tlstran_dialer_cb(void *arg)
+{
+ tlstran_dialer *d = arg;
+ tlstran_pipe * p;
+ nni_tcp_conn * conn;
+ nni_tls * tls;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->ep.mtx);
+ aio = d->user_aio;
+ rv = nni_aio_result(d->aio);
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&d->ep.mtx);
+ if ((rv == 0) && !d->resolving) {
+ conn = nni_aio_get_output(d->aio, 0);
+ nni_tcp_conn_fini(conn);
}
- } else {
- passive = 1;
- tlsmode = NNG_TLS_MODE_SERVER;
- authmode = NNG_TLS_AUTH_MODE_NONE;
- rsa.s_family = af;
- nni_aio_set_input(aio, 0, &lsa);
+ return;
}
- // XXX: arguably we could defer this part to the point we do a bind
- // or connect!
- nni_plat_tcp_resolv(host, serv, af, passive, aio);
- nni_aio_wait(aio);
- if ((rv = nni_aio_result(aio)) != 0) {
- nni_aio_fini(aio);
- return (rv);
+ if (rv != 0) {
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->ep.mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- nni_aio_fini(aio);
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if (d->resolving) {
+ // Name resolution complete. Now go to next step.
+ d->resolving = false;
+ nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio);
+ nni_mtx_unlock(&d->ep.mtx);
+ return;
}
- nni_mtx_init(&ep->mtx);
- ep->url = url;
- ep->keepalive = false;
- ep->nodelay = true;
+ d->user_aio = NULL;
+ conn = nni_aio_get_output(d->aio, 0);
+ NNI_ASSERT(conn != NULL);
- if (((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) ||
- ((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) ||
- ((rv = nng_tls_config_auth_mode(ep->cfg, authmode)) != 0) ||
- ((rv = nni_aio_init(&ep->aio, tls_ep_cb, ep)) != 0)) {
- tls_ep_fini(ep);
- return (rv);
+ if ((rv = nni_tls_init(&tls, d->ep.cfg, conn)) != 0) {
+ nni_mtx_unlock(&d->ep.mtx);
+ nni_tcp_conn_fini(conn);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- if ((tlsmode == NNG_TLS_MODE_CLIENT) && (host != NULL)) {
- if ((rv = nng_tls_config_server_name(ep->cfg, host)) != 0) {
- tls_ep_fini(ep);
- return (rv);
- }
+
+ if ((rv = tlstran_pipe_init(&p, tls)) != 0) {
+ nni_mtx_unlock(&d->ep.mtx);
+ nni_tls_fini(tls);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- ep->proto = nni_sock_proto_id(sock);
- ep->authmode = authmode;
- *epp = ep;
- return (0);
-}
+ p->proto = d->ep.proto;
+ p->rcvmax = d->ep.rcvmax;
+ p->nodelay = d->ep.nodelay;
+ p->keepalive = d->ep.keepalive;
+ nni_mtx_unlock(&d->ep.mtx);
-static int
-tls_dialer_init(void **epp, nni_url *url, nni_sock *sock)
-{
- return (tls_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ (void) nni_tls_set_nodelay(tls, p->nodelay);
+ (void) nni_tls_set_keepalive(tls, p->keepalive);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
-static int
-tls_listener_init(void **epp, nni_url *url, nni_sock *sock)
+static void
+tlstran_dialer_cancel(nni_aio *aio, int rv)
{
- return (tls_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ tlstran_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&d->ep.mtx);
+ if (d->user_aio != aio) {
+ nni_mtx_unlock(&d->ep.mtx);
+ return;
+ }
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->ep.mtx);
+
+ nni_aio_abort(d->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
-tls_ep_close(void *arg)
+tlstran_dialer_connect(void *arg, nni_aio *aio)
{
- tls_ep *ep = arg;
+ tlstran_dialer *d = arg;
+ int rv;
- nni_aio_close(ep->aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&d->ep.mtx);
+ NNI_ASSERT(d->user_aio == NULL);
- nni_mtx_lock(&ep->mtx);
- nni_plat_tcp_ep_close(ep->tep);
- nni_mtx_unlock(&ep->mtx);
+ if ((rv = nni_aio_schedule(aio, tlstran_dialer_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->ep.mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ d->user_aio = aio;
+
+ d->resolving = true;
+
+ // Start the name resolution. Callback will see resolving, and then
+ // switch to doing actual connect.
+ nni_aio_set_input(d->aio, 0, &d->sa);
+ nni_tcp_resolv(
+ d->ep.url->u_hostname, d->ep.url->u_port, d->af, 0, d->aio);
+ nni_mtx_unlock(&d->ep.mtx);
}
-static int
-tls_ep_bind(void *arg)
+static void
+tlstran_listener_fini(void *arg)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_listener *l = arg;
- nni_mtx_lock(&ep->mtx);
- rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa);
- nni_mtx_unlock(&ep->mtx);
-
- return (rv);
+ nni_aio_stop(l->aio);
+ if (l->listener != NULL) {
+ nni_tcp_listener_fini(l->listener);
+ }
+ nni_aio_fini(l->aio);
+ if (l->ep.cfg != NULL) {
+ nni_tls_config_fini(l->ep.cfg);
+ }
+ nni_mtx_fini(&l->ep.mtx);
+ NNI_FREE_STRUCT(l);
}
static void
-tls_ep_finish(tls_ep *ep)
+tlstran_listener_close(void *arg)
{
- nni_aio * aio;
- int rv;
- tls_pipe *pipe = NULL;
+ tlstran_listener *l = arg;
- if ((rv = nni_aio_result(ep->aio)) != 0) {
- goto done;
+ nni_aio_close(l->aio);
+ nni_tcp_listener_close(l->listener);
+}
+
+static int
+tlstran_listener_init(void **lp, nni_url *url, nni_sock *sock)
+{
+ tlstran_listener *l;
+ int rv;
+ nni_aio * aio;
+ uint16_t af;
+ char * host = url->u_hostname;
+ char * port = url->u_port;
+
+ if (strcmp(url->u_scheme, "tls+tcp") == 0) {
+ af = NNG_AF_UNSPEC;
+ } else if (strcmp(url->u_scheme, "tls+tcp4") == 0) {
+ af = NNG_AF_INET;
+ } else if (strcmp(url->u_scheme, "tls+tcp6") == 0) {
+ af = NNG_AF_INET6;
+ } else {
+ return (NNG_EADDRINVAL);
+ }
+
+ // Check for invalid URL components.
+ if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
+ (url->u_query != NULL)) {
+ return (NNG_EADDRINVAL);
}
- NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
- // Attempt to allocate the parent pipe. If this fails we'll
- // drop the connection (ENOMEM probably).
- rv = tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->ep.mtx);
+ l->ep.url = url;
+ l->ep.authmode = NNG_TLS_AUTH_MODE_NONE;
+ l->ep.keepalive = false;
+ l->ep.nodelay = true;
+ l->ep.proto = nni_sock_proto_id(sock);
-done:
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ if (strlen(host) == 0) {
+ host = NULL;
+ }
- if ((aio != NULL) && (rv == 0)) {
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- return;
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ tlstran_listener_fini(l);
+ return (rv);
}
- if (pipe != NULL) {
- tls_pipe_fini(pipe);
+
+ // XXX: We are doing lookup at listener initialization. There is
+ // a valid argument that this should be done at bind time, but that
+ // would require making bind asynchronous. In some ways this would
+ // be worse than the cost of just waiting here. We always recommend
+ // using local IP addresses rather than names when possible.
+
+ nni_aio_set_input(aio, 0, &l->sa);
+
+ nni_tcp_resolv(host, port, af, 1, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+
+ if (rv != 0) {
+ tlstran_listener_fini(l);
+ return (rv);
}
- if (aio != NULL) {
- NNI_ASSERT(rv != 0);
- nni_aio_finish_error(aio, rv);
+
+ if (((rv = nni_tcp_listener_init(&l->listener)) != 0) ||
+ ((rv = nni_tls_config_init(&l->ep.cfg, NNG_TLS_MODE_SERVER)) !=
+ 0) ||
+ ((rv = nng_tls_config_auth_mode(l->ep.cfg, l->ep.authmode)) !=
+ 0) ||
+ ((rv = nni_aio_init(&l->aio, tlstran_listener_cb, l)) != 0)) {
+ tlstran_listener_fini(l);
+ return (rv);
}
+ l->bsa = l->sa;
+
+ *lp = l;
+ return (0);
}
-static void
-tls_ep_cb(void *arg)
+static int
+tlstran_listener_bind(void *arg)
{
- tls_ep *ep = arg;
+ tlstran_listener *l = arg;
+ int rv;
- nni_mtx_lock(&ep->mtx);
- tls_ep_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ l->bsa = l->sa;
+ nni_mtx_lock(&l->ep.mtx);
+ rv = nni_tcp_listener_listen(l->listener, &l->bsa);
+ nni_mtx_unlock(&l->ep.mtx);
+
+ return (rv);
}
static void
-tls_cancel_ep(nni_aio *aio, int rv)
+tlstran_listener_cb(void *arg)
{
- tls_ep *ep = nni_aio_get_prov_data(aio);
+ tlstran_listener *l = arg;
+ nni_aio * aio;
+ int rv;
+ tlstran_pipe * p = NULL;
+ nni_tcp_conn * conn;
+ nni_tls * tls;
+
+ nni_mtx_lock(&l->ep.mtx);
+ rv = nni_aio_result(l->aio);
+ aio = l->user_aio;
+ l->user_aio = NULL;
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&l->ep.mtx);
+ if (rv == 0) {
+ conn = nni_aio_get_output(l->aio, 0);
+ nni_tcp_conn_fini(conn);
+ }
+ return;
+ }
+ if (rv != 0) {
+ nni_mtx_unlock(&l->ep.mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
- nni_mtx_lock(&ep->mtx);
- if (ep->user_aio != aio) {
- nni_mtx_unlock(&ep->mtx);
+ conn = nni_aio_get_output(l->aio, 0);
+ if ((rv = nni_tls_init(&tls, l->ep.cfg, conn)) != 0) {
+ nni_mtx_unlock(&l->ep.mtx);
+ nni_tcp_conn_fini(conn);
+ nni_aio_finish_error(aio, rv);
return;
}
- ep->user_aio = NULL;
- nni_mtx_unlock(&ep->mtx);
+ if ((rv = tlstran_pipe_init(&p, tls)) != 0) {
+ nni_mtx_unlock(&l->ep.mtx);
+ nni_tls_fini(tls);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ p->proto = l->ep.proto;
+ p->rcvmax = l->ep.rcvmax;
+ p->nodelay = l->ep.nodelay;
+ p->keepalive = l->ep.keepalive;
- nni_aio_abort(ep->aio, rv);
- nni_aio_finish_error(aio, rv);
+ (void) nni_tls_set_nodelay(tls, p->nodelay);
+ (void) nni_tls_set_keepalive(tls, p->keepalive);
+
+ nni_mtx_unlock(&l->ep.mtx);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
-tls_ep_accept(void *arg, nni_aio *aio)
+tlstran_listener_cancel(nni_aio *aio, int rv)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_listener *l = nni_aio_get_prov_data(aio);
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_mtx_lock(&l->ep.mtx);
+ if (l->user_aio != aio) {
+ nni_mtx_unlock(&l->ep.mtx);
return;
}
- ep->user_aio = aio;
- nni_plat_tcp_ep_accept(ep->tep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
+ l->user_aio = NULL;
+ nni_mtx_unlock(&l->ep.mtx);
+
+ nni_aio_abort(l->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
-tls_ep_connect(void *arg, nni_aio *aio)
+tlstran_listener_accept(void *arg, nni_aio *aio)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_listener *l = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->ep.mtx);
+ NNI_ASSERT(l->user_aio == NULL);
+
+ if ((rv = nni_aio_schedule(aio, tlstran_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->ep.mtx);
nni_aio_finish_error(aio, rv);
+ return;
}
- ep->user_aio = aio;
- nni_plat_tcp_ep_connect(ep->tep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
-}
+ l->user_aio = aio;
-static int
-tls_ep_chk_bool(const void *v, size_t sz, nni_opt_type t)
-{
- return (nni_copyin_bool(NULL, v, sz, t));
+ nni_tcp_listener_accept(l->listener, l->aio);
+ nni_mtx_unlock(&l->ep.mtx);
}
static int
-tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- bool val;
- int rv;
+ tlstran_ep *ep = arg;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->nodelay = val;
@@ -874,10 +1032,10 @@ tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t)
}
static int
-tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
nni_mtx_lock(&ep->mtx);
rv = nni_copyout_bool(ep->nodelay, v, szp, t);
nni_mtx_unlock(&ep->mtx);
@@ -885,11 +1043,25 @@ tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- bool val;
- int rv;
+ tlstran_ep *ep = arg;
+ size_t val;
+ int rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
+ ep->rcvmax = val;
+ nni_mtx_unlock(&ep->mtx);
+ }
+ return (rv);
+}
+
+static int
+tlstran_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ tlstran_ep *ep = arg;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->keepalive = val;
@@ -899,10 +1071,10 @@ tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t)
}
static int
-tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
nni_mtx_lock(&ep->mtx);
rv = nni_copyout_bool(ep->keepalive, v, szp, t);
nni_mtx_unlock(&ep->mtx);
@@ -910,60 +1082,53 @@ tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_ep *ep = arg;
-
- return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
+ tlstran_ep *ep = arg;
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
-tls_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_check_bool(const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- char ustr[128];
- char ipstr[48]; // max for IPv6 addresses including []
- char portstr[6]; // max for 16-bit port
-
- nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr);
- snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr);
- return (nni_copyout_str(ustr, v, szp, t));
+ return (nni_copyin_bool(NULL, v, sz, t));
}
static int
-tls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_ep *ep = arg;
- size_t val;
- int rv;
+ tlstran_dialer *d = arg;
- if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->rcvmax = val;
- nni_mtx_unlock(&ep->mtx);
- }
- return (rv);
+ return (nni_copyout_str(d->ep.url->u_rawurl, v, szp, t));
}
static int
-tls_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
+tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
+ tlstran_listener *l = arg;
+ char ustr[128];
+ char ipstr[48]; // max for IPv6 addresses including []
+ char portstr[6]; // max for 16-bit port
+
+ nni_mtx_lock(&l->ep.mtx);
+ nni_ntop(&l->bsa, ipstr, portstr);
+ nni_mtx_unlock(&l->ep.mtx);
+ snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr);
+ return (nni_copyout_str(ustr, v, szp, t));
}
static int
-tls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_size(ep->rcvmax, v, szp, t);
- nni_mtx_unlock(&ep->mtx);
- return (rv);
+ return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
}
static int
-tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t)
+tlstran_check_config(const void *data, size_t sz, nni_opt_type t)
{
void *v;
int rv;
@@ -974,9 +1139,9 @@ tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t)
}
static int
-tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t)
+tlstran_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t)
{
- tls_ep * ep = arg;
+ tlstran_ep * ep = arg;
nng_tls_config *cfg, *old;
int rv;
@@ -998,10 +1163,10 @@ tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t)
}
static int
-tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
nni_mtx_lock(&ep->mtx);
rv = nni_copyout_ptr(ep->cfg, v, szp, t);
nni_mtx_unlock(&ep->mtx);
@@ -1009,7 +1174,7 @@ tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t)
+tlstran_check_string(const void *v, size_t sz, nni_opt_type t)
{
if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
return (NNG_EBADTYPE);
@@ -1021,58 +1186,65 @@ tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t)
}
static int
-tls_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
- if ((rv = tls_ep_chk_string(v, sz, t)) == 0) {
+ if ((rv = tlstran_check_string(v, sz, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
rv = nng_tls_config_ca_file(ep->cfg, v);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
static int
-tls_ep_chk_auth_mode(const void *v, size_t sz, nni_opt_type t)
+tlstran_check_auth_mode(const void *v, size_t sz, nni_opt_type t)
{
return (nni_copyin_int(NULL, v, sz, NNG_TLS_AUTH_MODE_NONE,
NNG_TLS_AUTH_MODE_REQUIRED, t));
}
static int
-tls_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- int mode;
- int rv;
+ tlstran_ep *ep = arg;
+ int mode;
+ int rv;
rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE,
NNG_TLS_AUTH_MODE_REQUIRED, t);
if (rv == 0) {
+ nni_mtx_lock(&ep->mtx);
rv = nng_tls_config_auth_mode(ep->cfg, mode);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
static int
-tls_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
- if ((rv = tls_ep_chk_string(v, sz, t)) == 0) {
+ if ((rv = tlstran_check_string(v, sz, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
rv = nng_tls_config_server_name(ep->cfg, v);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
static int
-tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+tlstran_ep_set_cert_key_file(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- tls_ep *ep = arg;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
- if ((rv = tls_ep_chk_string(v, sz, t)) == 0) {
+ if ((rv = tlstran_check_string(v, sz, t)) == 0) {
nni_mtx_lock(&ep->mtx);
rv = nng_tls_config_cert_key_file(ep->cfg, v, NULL);
nni_mtx_unlock(&ep->mtx);
@@ -1081,38 +1253,38 @@ tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t)
}
static int
-tls_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t)
+tlstran_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- tls_pipe *p = arg;
+ tlstran_pipe *p = arg;
return (nni_copyout_bool(nni_tls_verified(p->tls), v, szp, t));
}
-static nni_tran_option tls_pipe_options[] = {
+static nni_tran_option tlstran_pipe_options[] = {
{
.o_name = NNG_OPT_LOCADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = tls_pipe_get_locaddr,
+ .o_get = tlstran_pipe_get_locaddr,
},
{
.o_name = NNG_OPT_REMADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = tls_pipe_get_remaddr,
+ .o_get = tlstran_pipe_get_remaddr,
},
{
.o_name = NNG_OPT_TLS_VERIFIED,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_pipe_get_verified,
+ .o_get = tlstran_pipe_get_verified,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_pipe_get_keepalive,
+ .o_get = tlstran_pipe_get_keepalive,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_pipe_get_nodelay,
+ .o_get = tlstran_pipe_get_nodelay,
},
// terminate list
{
@@ -1120,74 +1292,74 @@ static nni_tran_option tls_pipe_options[] = {
},
};
-static nni_tran_pipe_ops tls_pipe_ops = {
- .p_fini = tls_pipe_fini,
- .p_start = tls_pipe_start,
- .p_stop = tls_pipe_stop,
- .p_send = tls_pipe_send,
- .p_recv = tls_pipe_recv,
- .p_close = tls_pipe_close,
- .p_peer = tls_pipe_peer,
- .p_options = tls_pipe_options,
+static nni_tran_pipe_ops tlstran_pipe_ops = {
+ .p_fini = tlstran_pipe_fini,
+ .p_start = tlstran_pipe_start,
+ .p_stop = tlstran_pipe_stop,
+ .p_send = tlstran_pipe_send,
+ .p_recv = tlstran_pipe_recv,
+ .p_close = tlstran_pipe_close,
+ .p_peer = tlstran_pipe_peer,
+ .p_options = tlstran_pipe_options,
};
-static nni_tran_option tls_dialer_options[] = {
+static nni_tran_option tlstran_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = tls_ep_get_recvmaxsz,
- .o_set = tls_ep_set_recvmaxsz,
- .o_chk = tls_ep_chk_recvmaxsz,
+ .o_get = tlstran_ep_get_recvmaxsz,
+ .o_set = tlstran_ep_set_recvmaxsz,
+ .o_chk = tlstran_check_recvmaxsz,
},
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tls_dialer_get_url,
+ .o_get = tlstran_dialer_get_url,
},
{
.o_name = NNG_OPT_TLS_CONFIG,
.o_type = NNI_TYPE_POINTER,
- .o_get = tls_ep_get_config,
- .o_set = tls_ep_set_config,
- .o_chk = tls_ep_chk_config,
+ .o_get = tlstran_ep_get_config,
+ .o_set = tlstran_ep_set_config,
+ .o_chk = tlstran_check_config,
},
{
.o_name = NNG_OPT_TLS_CERT_KEY_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_cert_key_file,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_cert_key_file,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TLS_CA_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_ca_file,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_ca_file,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TLS_AUTH_MODE,
.o_type = NNI_TYPE_INT32, // enum really
- .o_set = tls_ep_set_auth_mode,
- .o_chk = tls_ep_chk_auth_mode,
+ .o_set = tlstran_ep_set_auth_mode,
+ .o_chk = tlstran_check_auth_mode,
},
{
.o_name = NNG_OPT_TLS_SERVER_NAME,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_server_name,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_server_name,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_ep_get_nodelay,
- .o_set = tls_ep_set_nodelay,
- .o_chk = tls_ep_chk_bool,
+ .o_get = tlstran_ep_get_nodelay,
+ .o_set = tlstran_ep_set_nodelay,
+ .o_chk = tlstran_check_bool,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_ep_get_keepalive,
- .o_set = tls_ep_set_keepalive,
- .o_chk = tls_ep_chk_bool,
+ .o_get = tlstran_ep_get_keepalive,
+ .o_set = tlstran_ep_set_keepalive,
+ .o_chk = tlstran_check_bool,
},
// terminate list
{
@@ -1195,63 +1367,63 @@ static nni_tran_option tls_dialer_options[] = {
},
};
-static nni_tran_option tls_listener_options[] = {
+static nni_tran_option tlstran_listener_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = tls_ep_get_recvmaxsz,
- .o_set = tls_ep_set_recvmaxsz,
- .o_chk = tls_ep_chk_recvmaxsz,
+ .o_get = tlstran_ep_get_recvmaxsz,
+ .o_set = tlstran_ep_set_recvmaxsz,
+ .o_chk = tlstran_check_recvmaxsz,
},
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tls_listener_get_url,
+ .o_get = tlstran_listener_get_url,
},
{
.o_name = NNG_OPT_TLS_CONFIG,
.o_type = NNI_TYPE_POINTER,
- .o_get = tls_ep_get_config,
- .o_set = tls_ep_set_config,
- .o_chk = tls_ep_chk_config,
+ .o_get = tlstran_ep_get_config,
+ .o_set = tlstran_ep_set_config,
+ .o_chk = tlstran_check_config,
},
{
.o_name = NNG_OPT_TLS_CERT_KEY_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_cert_key_file,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_cert_key_file,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TLS_CA_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_ca_file,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_ca_file,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TLS_AUTH_MODE,
.o_type = NNI_TYPE_INT32, // enum really
- .o_set = tls_ep_set_auth_mode,
- .o_chk = tls_ep_chk_auth_mode,
+ .o_set = tlstran_ep_set_auth_mode,
+ .o_chk = tlstran_check_auth_mode,
},
{
.o_name = NNG_OPT_TLS_SERVER_NAME,
.o_type = NNI_TYPE_STRING,
- .o_set = tls_ep_set_server_name,
- .o_chk = tls_ep_chk_string,
+ .o_set = tlstran_ep_set_server_name,
+ .o_chk = tlstran_check_string,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_ep_get_nodelay,
- .o_set = tls_ep_set_nodelay,
- .o_chk = tls_ep_chk_bool,
+ .o_get = tlstran_ep_get_nodelay,
+ .o_set = tlstran_ep_set_nodelay,
+ .o_chk = tlstran_check_bool,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
.o_type = NNI_TYPE_BOOL,
- .o_get = tls_ep_get_keepalive,
- .o_set = tls_ep_set_keepalive,
- .o_chk = tls_ep_chk_bool,
+ .o_get = tlstran_ep_get_keepalive,
+ .o_set = tlstran_ep_set_keepalive,
+ .o_chk = tlstran_check_bool,
},
// terminate list
{
@@ -1259,51 +1431,51 @@ static nni_tran_option tls_listener_options[] = {
},
};
-static nni_tran_dialer_ops tls_dialer_ops = {
- .d_init = tls_dialer_init,
- .d_fini = tls_ep_fini,
- .d_connect = tls_ep_connect,
- .d_close = tls_ep_close,
- .d_options = tls_dialer_options,
+static nni_tran_dialer_ops tlstran_dialer_ops = {
+ .d_init = tlstran_dialer_init,
+ .d_fini = tlstran_dialer_fini,
+ .d_connect = tlstran_dialer_connect,
+ .d_close = tlstran_dialer_close,
+ .d_options = tlstran_dialer_options,
};
-static nni_tran_listener_ops tls_listener_ops = {
- .l_init = tls_listener_init,
- .l_fini = tls_ep_fini,
- .l_bind = tls_ep_bind,
- .l_accept = tls_ep_accept,
- .l_close = tls_ep_close,
- .l_options = tls_listener_options,
+static nni_tran_listener_ops tlstran_listener_ops = {
+ .l_init = tlstran_listener_init,
+ .l_fini = tlstran_listener_fini,
+ .l_bind = tlstran_listener_bind,
+ .l_accept = tlstran_listener_accept,
+ .l_close = tlstran_listener_close,
+ .l_options = tlstran_listener_options,
};
static nni_tran tls_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tls+tcp",
- .tran_dialer = &tls_dialer_ops,
- .tran_listener = &tls_listener_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_dialer = &tlstran_dialer_ops,
+ .tran_listener = &tlstran_listener_ops,
+ .tran_pipe = &tlstran_pipe_ops,
+ .tran_init = tlstran_init,
+ .tran_fini = tlstran_fini,
};
static nni_tran tls4_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tls+tcp4",
- .tran_dialer = &tls_dialer_ops,
- .tran_listener = &tls_listener_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_dialer = &tlstran_dialer_ops,
+ .tran_listener = &tlstran_listener_ops,
+ .tran_pipe = &tlstran_pipe_ops,
+ .tran_init = tlstran_init,
+ .tran_fini = tlstran_fini,
};
static nni_tran tls6_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tls+tcp6",
- .tran_dialer = &tls_dialer_ops,
- .tran_listener = &tls_listener_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_dialer = &tlstran_dialer_ops,
+ .tran_listener = &tlstran_listener_ops,
+ .tran_pipe = &tlstran_pipe_ops,
+ .tran_init = tlstran_init,
+ .tran_fini = tlstran_fini,
};
int