diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-09 09:59:46 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-16 10:06:50 -0700 |
| commit | b44e20c80c936a29bfeaf964ec94bc62ac0386f5 (patch) | |
| tree | 87b2b5b999046b7f10789d4bae863eeea9354e44 | |
| parent | 05f404b917ddaf9fee70208a796cdf66ee747050 (diff) | |
| download | nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.gz nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.tar.bz2 nng-b44e20c80c936a29bfeaf964ec94bc62ac0386f5.zip | |
fixes #523 dialers could support multiple outstanding dial requests
fixes #179 DNS resolution should be done at connect time
fixes #586 Windows IO completion port work could be better
fixes #339 Windows iocp could use synchronous completions
fixes #280 TCP abstraction improvements
This is a rather monstrous set of changes, which refactors TCP, and
the underlying Windows I/O completion path logic, in order to obtain
a cleaner, simpler API, with support for asynchronous DNS lookups performed
on connect rather than initialization time, the ability to have multiple
connects or accepts pending, as well as fewer extraneous function calls.
The Windows code also benefits from greatly reduced context switching,
fewer lock operations performed, and a reduced number of system calls
on the hot code path. (We use automatic event resetting instead of manual.)
Some dead code was removed as well, and a few potential edge case leaks
on failure paths (in the websocket code) were plugged.
Note that all TCP based transports benefit from this work. The IPC code
on Windows still uses the legacy IOCP for now, as does the UDP code (used
for ZeroTier.) We will be converting those soon too.
36 files changed, 3931 insertions, 2078 deletions
diff --git a/docs/man/nng_tcp.7.adoc b/docs/man/nng_tcp.7.adoc index 250e9399..c0c4a5f0 100644 --- a/docs/man/nng_tcp.7.adoc +++ b/docs/man/nng_tcp.7.adoc @@ -66,12 +66,6 @@ separating the port. For example, the same port 80 on the IPv6 loopback address (`::1`) would be specified as `tcp://[::1]:80`. -NOTE: When using symbolic names, the name is resolved when the -name is first used. _nng_ won't become aware of changes in the -name resolution until restart, -usually. -(This is a bug and will likely be fixed in the future.) - The special value of 0 (`INADDR_ANY`)(((`INADDR_ANY`))) can be used for a listener to indicate that it should listen on all interfaces on the host. diff --git a/docs/man/nng_tls.7.adoc b/docs/man/nng_tls.7.adoc index 61d0b220..43c5a913 100644 --- a/docs/man/nng_tls.7.adoc +++ b/docs/man/nng_tls.7.adoc @@ -88,12 +88,6 @@ separating the port. For example, the same port 4433 on the IPv6 loopback address ('::1') would be specified as `tls+tcp://[::1]:4433`. -NOTE: When using symbolic names, the name is resolved when the -name is first used. _nng_ won't become aware of changes in the -name resolution until restart, -usually. -(This is a bug and will likely be fixed in the future.) - TIP: Certificate validation generally works when using names rather than IP addresses. This transport automatically uses the name supplied in the URL when validating diff --git a/docs/man/nng_ws.7.adoc b/docs/man/nng_ws.7.adoc index bec579ac..1aede210 100644 --- a/docs/man/nng_ws.7.adoc +++ b/docs/man/nng_ws.7.adoc @@ -65,11 +65,6 @@ separating the port. For example, the same path and port on the IPv6 loopback address (`::1`) would be specified as `ws://[::1]/app/pubsub`. -NOTE: When using symbolic names, the name is resolved when the -name is first used. _nng_ won't become aware of changes in the -name resolution until restart, -usually. (This is a bug and will likely be fixed in the future.) - NOTE: The value specified as the host, if any, will also be used in the `Host:` ((HTTP header)) during HTTP negotiation. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e9561980..e89f782b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -104,6 +104,9 @@ if (NNG_PLATFORM_POSIX) platform/posix/posix_resolv_gai.c platform/posix/posix_sockaddr.c platform/posix/posix_tcp.c + platform/posix/posix_tcpconn.c + platform/posix/posix_tcpdial.c + platform/posix/posix_tcplisten.c platform/posix/posix_thread.c platform/posix/posix_udp.c ) @@ -133,6 +136,7 @@ if (NNG_PLATFORM_WINDOWS) platform/windows/win_clock.c platform/windows/win_debug.c platform/windows/win_file.c + platform/windows/win_io.c platform/windows/win_iocp.c platform/windows/win_ipc.c platform/windows/win_pipe.c @@ -140,6 +144,9 @@ if (NNG_PLATFORM_WINDOWS) platform/windows/win_resolv.c platform/windows/win_sockaddr.c platform/windows/win_tcp.c + platform/windows/win_tcpconn.c + platform/windows/win_tcpdial.c + platform/windows/win_tcplisten.c platform/windows/win_thread.c platform/windows/win_udp.c ) diff --git a/src/core/aio.c b/src/core/aio.c index c8517b9b..40638bce 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -378,8 +378,10 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } + NNI_ASSERT(aio->a_prov_cancel == NULL); aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; + if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } diff --git a/src/core/platform.h b/src/core/platform.h index 607c3827..5045b172 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -209,93 +209,103 @@ extern int nni_plat_ncpu(void); // // TCP Support. // +typedef struct nni_tcp_conn nni_tcp_conn; +typedef struct nni_tcp_dialer nni_tcp_dialer; +typedef struct nni_tcp_listener nni_tcp_listener; -typedef struct nni_plat_tcp_ep nni_plat_tcp_ep; -typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe; +extern void nni_tcp_conn_fini(nni_tcp_conn *); -// nni_plat_tcp_ep_init creates a new endpoint associated with the local -// and remote addresses. -extern int nni_plat_tcp_ep_init( - nni_plat_tcp_ep **, const nni_sockaddr *, const nni_sockaddr *, int); +// nni_tcp_dialer_close closes the dialer, which might actually be +// implemented as a shutdown() call. +// Further operations on it should return NNG_ECLOSED. +extern void nni_tcp_conn_close(nni_tcp_conn *); -// nni_plat_tcp_ep_fini closes the endpoint and releases resources. -extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *); - -// nni_plat_tcp_ep_close closes the endpoint; this might not close the -// actual underlying socket, but it should call shutdown on it. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *); - -// nni_plat_tcp_listen creates an TCP socket in listening mode, bound -// to the specified path. -extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *, nni_sockaddr *); - -// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *); - -// nni_plat_tcp_connect is the client side. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *); - -// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources -// associated with it. -extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *); - -// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *); - -// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer. +// nni_tcp_conn_send sends data in the iov buffers to the peer. // The platform may modify the iovs. -extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *); +extern void nni_tcp_conn_send(nni_tcp_conn *, nni_aio *); -// nni_plat_tcp_pipe_recv receives data into the buffers provided by the +// nni_tcp_conn_recv receives data into the buffers provided by the // I/O vector (iovs). The platform should attempt to scatter the received // data into the iovs if possible. // -// It is an error for the caller to supply any IO vector elements with -// zero length. -// -// It is possible for the TCP reader to return less data than is requested, +// It is possible for the reader to return less data than is requested, // in which case the caller is responsible for resubmitting. The platform -// should not return "zero" data however. (It is an error to attempt to -// receive zero bytes.) The platform may not modify the I/O vector. -extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *); +// must not return "zero" data however. (It is an error to attempt to +// receive zero bytes.) The platform may modify the iovs. +extern void nni_tcp_conn_recv(nni_tcp_conn *, nni_aio *); -// nni_plat_tcp_pipe_peername gets the peer name. -extern int nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *, nni_sockaddr *); +// nni_tcp_conn_peername gets the peer name. +extern int nni_tcp_conn_peername(nni_tcp_conn *, nni_sockaddr *); -// nni_plat_tcp_pipe_sockname gets the local name. -extern int nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *, nni_sockaddr *); +// nni_tcp_conn_sockname gets the local name. +extern int nni_tcp_conn_sockname(nni_tcp_conn *, nni_sockaddr *); -// nni_plat_tcp_pipe_set_nodelay sets nodelay, disabling Nagle, according -// to the parameter. true disables Nagle; false enables Nagle. -extern int nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *, bool); +// nni_tcp_conn_set_nodelay indicates that the TCP pipe should send +// data immediately, without any buffering. (Disable Nagle's algorithm.) +extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool); -// nni_plat_tcp_pipe_set_keepalive indicates that the TCP pipe should send +// nni_tcp_conn_set_keepalive indicates that the TCP pipe should send // keepalive probes. Tuning of these keepalives is current unsupported. -extern int nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *, bool); - -// nni_plat_tcp_ntop obtains the IP address for the socket (enclosing it +extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool); + +// nni_tcp_listener_init creates a new dialer object. +extern int nni_tcp_dialer_init(nni_tcp_dialer **); + +// nni_tcp_dialer_fini finalizes the dialer, closing it and freeing +// all resources. +extern void nni_tcp_dialer_fini(nni_tcp_dialer *); + +// nni_tcp_dialer_close closes the dialer. +// Further operations on it should return NNG_ECLOSED. Any in-progress +// connection will be aborted. +extern void nni_tcp_dialer_close(nni_tcp_dialer *); + +// nni_tcp_dialer_dial attempts to create an outgoing connection, +// asynchronously, to the address specified. On success, the first (and only) +// output will be an nni_tcp_conn * associated with the remote server. +extern void nni_tcp_dialer_dial( + nni_tcp_dialer *, const nni_sockaddr *, nni_aio *); + +// nni_tcp_listener_init creates a new listener object, unbound. +extern int nni_tcp_listener_init(nni_tcp_listener **); + +// nni_tcp_listener_fini frees the listener and all associated resources. +// It implictly closes the listener as well. +extern void nni_tcp_listener_fini(nni_tcp_listener *); + +// nni_tcp_listener_close closes the listener. This will unbind +// any bound socket, and further operations will result in NNG_ECLOSED. +extern void nni_tcp_listener_close(nni_tcp_listener *); + +// nni_tcp_listener_listen creates the socket in listening mode, bound +// to the specified address. The address will be updated to reflect +// the actual address bound (making it possible to bind to port 0 to +// specify an ephemeral address, and then the actual address can be +// examined afterwards.) +extern int nni_tcp_listener_listen(nni_tcp_listener *, nni_sockaddr *); + +// nni_tcp_listener_accept accepts in incoming connect, asynchronously. +// On success, the first (and only) output will be an nni_tcp_conn * +// associated with the remote peer. +extern void nni_tcp_listener_accept(nni_tcp_listener *, nni_aio *); + +// nni_ntop obtains the IP address for the socket (enclosing it // in brackets if it is IPv6) and port. Enough space for both must // be present (48 bytes and 6 bytes each), although if either is NULL then -// those components are skipped. -extern int nni_plat_tcp_ntop(const nni_sockaddr *, char *, char *); +// those components are skipped. This is based on inet_ntop. +extern int nni_ntop(const nni_sockaddr *, char *, char *); -// nni_plat_tcp_resolv resolves a TCP name asynchronously. The family +// nni_tcp_resolv resolves a TCP name asynchronously. The family // should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The // first two constrain the name to those families, while the third will // return names of either family. The passive flag indicates that the // name will be used for bind(), otherwise the name will be used with // connect(). The host part may be NULL only if passive is true. -extern void nni_plat_tcp_resolv( - const char *, const char *, int, int, nni_aio *); +extern void nni_tcp_resolv(const char *, const char *, int, int, nni_aio *); -// nni_plat_udp_resolve is just like nni_plat_tcp_resolve, but looks up +// nni_udp_resolv is just like nni_tcp_resolv, but looks up // service names using UDP. -extern void nni_plat_udp_resolv( - const char *, const char *, int, int, nni_aio *); +extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *); // // IPC (UNIX Domain Sockets & Named Pipes) Support. @@ -317,7 +327,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); // Further operations on the pipe should return NNG_ECLOSED. extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); -// nni_plat_tcp_listen creates an IPC socket in listening mode, bound +// nni_plat_ipc_listen creates an IPC socket in listening mode, bound // to the specified path. extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 83f2f1a8..3b1ce751 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -20,8 +20,8 @@ #include "core/nng_impl.h" #include "posix_pollq.h" +#include <sys/stat.h> // needed for musl build #include <sys/types.h> // needed for mode_t -#include <sys/stat.h> // needed for musl build typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; @@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *); extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t); - #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 74c92dbd..d796765a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; + NNI_ARG_UNUSED(pfd); + nni_mtx_lock(&ed->mtx); if (events & POLLNVAL) { nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); return; } - NNI_ASSERT(pfd == ed->pfd); // Anything else will turn up in accept. nni_epdesc_doaccept(ed); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 26753df6..ec487de5 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -302,7 +302,7 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { nni_mtx_lock(&pq->mtx); - pq->closing = 1; + pq->closing = true; nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 8c003362..63d858c2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -13,6 +13,7 @@ #ifdef NNG_USE_POSIX_RESOLV_GAI #include "platform/posix/posix_aio.h" +#include <arpa/inet.h> #include <ctype.h> #include <errno.h> #include <netdb.h> @@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -330,6 +331,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + const void *ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_posix_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index cbc7a641..53ea8026 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -26,115 +26,6 @@ #include <unistd.h> int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_storage ss; - int len; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, rsa); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, lsa); - nni_posix_epdesc_set_local(ed, &ss, len); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - rv = nni_posix_epdesc_listen((void *) ep); - if ((rv == 0) && (bsa != NULL)) { - rv = nni_posix_epdesc_sockname((void *) ep, bsa); - } - return (rv); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_peername((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_sockname((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_keepalive((void *) p, v)); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_nodelay((void *) p, v)); -} - -int nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) { const void *ap; diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h new file mode 100644 index 00000000..aefce7f7 --- /dev/null +++ b/src/platform/posix/posix_tcp.h @@ -0,0 +1,44 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +struct nni_tcp_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_tcp_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + nni_mtx mtx; +}; + +extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_conn_start(nni_tcp_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c new file mode 100644 index 00000000..30525648 --- /dev/null +++ b/src/platform/posix/posix_tcpconn.c @@ -0,0 +1,410 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_tcp.h" + +static void +tcp_conn_dowrite(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +tcp_conn_doread(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_tcp_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + tcp_conn_doread(c); + } + if (events & POLLOUT) { + tcp_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + tcp_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + tcp_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getpeername(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getsockname(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) +{ + int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) +{ + + int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +{ + nni_tcp_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_tcp_conn_start(nni_tcp_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c new file mode 100644 index 00000000..7f627361 --- /dev/null +++ b/src/platform/posix/posix_tcpdial.c @@ -0,0 +1,234 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +// Dialer stuff. +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_tcp_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_tcp_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_tcp_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dialer_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_tcp_conn_fini(c); +} + +static void +tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_tcp_conn * c = arg; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_close(c); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_tcp_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + rv = nni_plat_errno(errno); + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c new file mode 100644 index 00000000..cdcbecd9 --- /dev/null +++ b/src/platform/posix/posix_tcplisten.c @@ -0,0 +1,319 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +tcp_listener_doclose(nni_tcp_listener *l) +{ + nni_aio *aio; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_tcp_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + NNI_ASSERT(pfd == l->pfd); + + // Anything else will turn up in accept. + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_mtx_unlock(&l->mtx); + return (nni_plat_errno(errno)); + } + + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + +// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows +// SO_REUSEADDR, which is almost completely different (and wrong!) from +// traditional SO_REUSEADDR. +#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) + { + int on = 1; + // If for some reason this doesn't work, it's probably ok. + // Second bind will fail. + (void) setsockopt( + fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } +#endif + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Lets get the bound sockname, and pass that back to the caller. + // This permits ephemeral port binding to work. + // If this fails for some reason, we just don't update the + // sockaddr structure. This is kind of suboptimal, but failures + // here should never occur. + len = sizeof(ss); + (void) getsockname(fd, (void *) &ss, &len); + (void) nni_posix_sockaddr2nn(sa, &ss); + + nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + + l->pfd = pfd; + l->started = true; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + tcp_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 263a322b..93e45423 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -80,6 +80,17 @@ struct nni_win_event { nni_win_event_ops ops; }; +typedef struct nni_win_io nni_win_io; +typedef void (*nni_win_io_cb)(nni_win_io *, int, size_t); + +struct nni_win_io { + OVERLAPPED olpd; + HANDLE f; + void * ptr; + nni_aio * aio; + nni_win_io_cb cb; +}; + struct nni_plat_flock { HANDLE h; }; @@ -94,6 +105,13 @@ extern void nni_win_event_complete(nni_win_event *, int); extern int nni_win_iocp_register(HANDLE); +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +extern int nni_win_io_sysinit(void); +extern void nni_win_io_sysfini(void); + extern int nni_win_iocp_sysinit(void); extern void nni_win_iocp_sysfini(void); @@ -109,6 +127,12 @@ extern void nni_win_udp_sysfini(void); extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); +extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *); +extern void nni_win_io_fini(nni_win_io *); +extern void nni_win_io_cancel(nni_win_io *); + +extern int nni_win_io_register(HANDLE); + extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *); extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *); diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c new file mode 100644 index 00000000..1179b603 --- /dev/null +++ b/src/platform/windows/win_io.c @@ -0,0 +1,152 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <stdio.h> + +// Windows IO Completion Port support. We basically create a single +// IO completion port, then start threads on it. Handles are added +// to the port on an as needed basis. We use a single IO completion +// port for pretty much everything. + +static int win_io_nthr = 0; +static HANDLE win_io_h = NULL; +static nni_thr *win_io_thrs; + +static void +win_io_handler(void *arg) +{ + NNI_ARG_UNUSED(arg); + + for (;;) { + DWORD cnt; + BOOL ok; + nni_win_io *item; + OVERLAPPED *olpd = NULL; + ULONG_PTR key = 0; + int rv; + + ok = GetQueuedCompletionStatus( + win_io_h, &cnt, &key, &olpd, INFINITE); + + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; + } + + item = CONTAINING_RECORD(olpd, nni_win_io, olpd); + rv = ok ? 0 : nni_win_error(GetLastError()); + item->cb(item, rv, (size_t) cnt); + } +} + +int +nni_win_io_register(HANDLE h) +{ + if (CreateIoCompletionPort(h, win_io_h, 0, 0) == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +int +nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) +{ + ZeroMemory(&io->olpd, sizeof(io->olpd)); + + io->cb = cb; + io->ptr = ptr; + io->aio = NULL; + io->f = f; + io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (io->olpd.hEvent == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +void +nni_win_io_cancel(nni_win_io *io) +{ + if (io->f != INVALID_HANDLE_VALUE) { + CancelIoEx(io->f, &io->olpd); + } +} + +void +nni_win_io_fini(nni_win_io *io) +{ + if (io->olpd.hEvent != NULL) { + CloseHandle((HANDLE) io->olpd.hEvent); + } +} + +int +nni_win_io_sysinit(void) +{ + HANDLE h; + int i; + int rv; + int nthr = nni_plat_ncpu() * 2; + + // Limits on the thread count. This is fairly arbitrary. + if (nthr < 4) { + nthr = 4; + } + if (nthr > 64) { + nthr = 64; + } + if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { + return (NNG_ENOMEM); + } + win_io_nthr = nthr; + + h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr); + if (h == NULL) { + return (nni_win_error(GetLastError())); + } + win_io_h = h; + + for (i = 0; i < win_io_nthr; i++) { + rv = nni_thr_init(&win_io_thrs[i], win_io_handler, NULL); + if (rv != 0) { + goto fail; + } + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_run(&win_io_thrs[i]); + } + return (0); + +fail: + nni_win_io_sysfini(); + return (rv); +} + +void +nni_win_io_sysfini(void) +{ + int i; + HANDLE h; + + if ((h = win_io_h) != NULL) { + CloseHandle(h); + win_io_h = NULL; + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_fini(&win_io_thrs[i]); + } +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index fb9d6751..e2cd192e 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -69,7 +69,7 @@ resolv_cancel(nni_aio *aio, int rv) } static int -resolv_gai_errno(int rv) +resolv_errno(int rv) { switch (rv) { case 0: @@ -116,7 +116,7 @@ resolv_task(resolv_item *item) hints.ai_family = item->family; if ((rv = getaddrinfo(item->name, "80", &hints, &results)) != 0) { - rv = resolv_gai_errno(rv); + rv = resolv_errno(rv); goto done; } @@ -246,14 +246,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -302,6 +302,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + void * ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = (void *) &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = (void *) &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_win_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 33c4a1a5..2ab6be0f 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -15,709 +15,6 @@ #include <malloc.h> #include <stdio.h> -struct nni_plat_tcp_pipe { - SOCKET s; - nni_win_event rcv_ev; - nni_win_event snd_ev; - SOCKADDR_STORAGE sockname; - SOCKADDR_STORAGE peername; -}; - -struct nni_plat_tcp_ep { - SOCKET s; - SOCKET acc_s; - nni_win_event con_ev; - nni_win_event acc_ev; - int started; - int bound; - - SOCKADDR_STORAGE remaddr; - int remlen; - SOCKADDR_STORAGE locaddr; - int loclen; - - char buf[512]; // to hold acceptex results - - // We have to lookup some function pointers using ioctls. Winsock, - // gotta love it. Especially I love that asynch accept means that - // getsockname and getpeername don't work. - LPFN_CONNECTEX connectex; - LPFN_ACCEPTEX acceptex; - LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; -}; - -static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_pipe_ops = { - .wev_start = nni_win_tcp_pipe_start, - .wev_finish = nni_win_tcp_pipe_finish, - .wev_cancel = nni_win_tcp_pipe_cancel, -}; - -static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_acc_ops = { - .wev_start = nni_win_tcp_acc_start, - .wev_finish = nni_win_tcp_acc_finish, - .wev_cancel = nni_win_tcp_acc_cancel, -}; - -static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_con_ops = { - .wev_start = nni_win_tcp_con_start, - .wev_finish = nni_win_tcp_con_finish, - .wev_cancel = nni_win_tcp_con_cancel, -}; - -static void -nni_win_tcp_sockinit(SOCKET s) -{ - BOOL yes; - DWORD no; - - // Don't inherit the handle (CLOEXEC really). - SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); - - no = 0; - (void) setsockopt( - s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - - // Also disable Nagle. We are careful to group data with WSASend, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - yes = 1; - (void) setsockopt( - s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); -} - -static int -nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - int rv; - SOCKET s; - DWORD niov; - DWORD flags; - nni_plat_tcp_pipe *pipe = evt->ptr; - unsigned i; - unsigned naiov; - nni_iov * aiov; - WSABUF * iov; - - nni_aio_get_iov(aio, &naiov, &aiov); - iov = _malloca(naiov * sizeof(*iov)); - - // Put the AIOs in Windows form. - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len != 0) { - iov[niov].buf = aiov[i].iov_buf; - iov[niov].len = (ULONG) aiov[i].iov_len; - niov++; - } - } - - if ((s = pipe->s) == INVALID_SOCKET) { - _freea(iov); - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - // Note that the IOVs for the event were prepared on entry already. - // The actual aio's iov array we don't touch. - - evt->count = 0; - flags = 0; - if (evt == &pipe->snd_ev) { - rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); - } else { - rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); - } - _freea(iov); - - if ((rv == SOCKET_ERROR) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_tcp_pipe_cancel(nni_win_event *evt) -{ - nni_plat_tcp_pipe *pipe = evt->ptr; - - (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); -} - -static void -nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - if ((evt->status == 0) && (evt->count == 0)) { - // Windows sometimes returns a zero read. Convert these - // into an NNG_ECLOSED. (We are never supposed to come - // back with zero length read.) - evt->status = NNG_ECLOSED; - } - nni_aio_finish(aio, evt->status, evt->count); -} - -static int -nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) -{ - nni_plat_tcp_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - nni_win_tcp_sockinit(s); - pipe->s = s; - *pipep = pipe; - return (0); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) -{ - SOCKET s; - - nni_win_event_close(&pipe->rcv_ev); - - if ((s = pipe->s) != INVALID_SOCKET) { - pipe->s = INVALID_SOCKET; - closesocket(s); - } -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->peername) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->sockname) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) -{ - nni_plat_tcp_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_plat_tcp_ep *ep; - int rv; - SOCKET s; - DWORD nbytes; - GUID guid1 = WSAID_CONNECTEX; - GUID guid2 = WSAID_ACCEPTEX; - GUID guid3 = WSAID_GETACCEPTEXSOCKADDRS; - - NNI_ARG_UNUSED(mode); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(*ep)); - - ep->s = INVALID_SOCKET; - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - ep->remlen = nni_win_nn2sockaddr(&ep->remaddr, rsa); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - ep->loclen = nni_win_nn2sockaddr(&ep->locaddr, lsa); - } - - // Create a scratch socket for use with ioctl. - s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - // Look up the function pointer. - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, - sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, - sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid3, - sizeof(guid3), &ep->getacceptexsockaddrs, - sizeof(ep->getacceptexsockaddrs), &nbytes, NULL, - NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - closesocket(s); - s = INVALID_SOCKET; - - // Now initialize the win events for later use. - rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); - if (rv != 0) { - goto fail; - } - rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); - if (rv != 0) { - goto fail; - } - - *epp = ep; - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - nni_plat_tcp_ep_fini(ep); - return (rv); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_win_event_close(&ep->acc_ev); - nni_win_event_close(&ep->con_ev); - if (ep->s != INVALID_SOCKET) { - closesocket(ep->s); - ep->s = INVALID_SOCKET; - } - if (ep->acc_s != INVALID_SOCKET) { - closesocket(ep->acc_s); - } -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_plat_tcp_ep_close(ep); - NNI_FREE_STRUCT(ep); -} - -static int -nni_win_tcp_listen(nni_plat_tcp_ep *ep, nni_sockaddr *bsa) -{ - int rv; - BOOL yes; - SOCKET s; - - if (ep->started) { - return (NNG_EBUSY); - } - - s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - nni_win_tcp_sockinit(s); - - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - goto fail; - } - - // Make sure that we use the address exclusively. Windows lets - // others hijack us by default. - yes = 1; - - rv = setsockopt( - s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - if (bsa != NULL) { - SOCKADDR_STORAGE bound; - int len = sizeof(bound); - rv = getsockname(s, (SOCKADDR *) &bound, &len); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - nni_win_sockaddr2nn(bsa, &bound); - } - - if (listen(s, SOMAXCONN) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - ep->s = s; - ep->started = 1; - - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - return (rv); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - - nni_mtx_lock(&ep->acc_ev.mtx); - rv = nni_win_tcp_listen(ep, bsa); - nni_mtx_unlock(&ep->acc_ev.mtx); - return (rv); -} - -static void -nni_win_tcp_acc_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - int len1; - int len2; - SOCKADDR * sa1; - SOCKADDR * sa2; - - s = ep->acc_s; - ep->acc_s = INVALID_SOCKET; - - if (s == INVALID_SOCKET) { - return; - } - - if (((rv = evt->status) != 0) || - ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - // Collect the local and peer addresses, because normal getsockname - // and getpeername don't work with AcceptEx. - len1 = (int) sizeof(pipe->sockname); - len2 = (int) sizeof(pipe->peername); - ep->getacceptexsockaddrs( - ep->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); - NNI_ASSERT(len1 > 0); - NNI_ASSERT(len1 < (int) sizeof(SOCKADDR_STORAGE)); - NNI_ASSERT(len2 > 0); - NNI_ASSERT(len2 < (int) sizeof(SOCKADDR_STORAGE)); - memcpy(&pipe->sockname, sa1, len1); - memcpy(&pipe->peername, sa2, len2); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - SOCKET acc_s; - DWORD cnt; - - NNI_ARG_UNUSED(aio); - - acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (acc_s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - ep->acc_s = acc_s; - - if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synch completion right now. I/O completion packet delivered - // already. - return (0); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->acc_ev, aio); -} - -static void -nni_win_tcp_con_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - DWORD yes = 1; - int len; - - s = ep->s; - ep->s = INVALID_SOCKET; - - // The socket was already registered with the IOCP. - - if (((rv = evt->status) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - (void) setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, - (char *) &yes, sizeof(yes)); - - // Windows seems to be unable to get peernames for sockets on - // connect - perhaps because we supplied it already with connectex. - // Rather than debugging it, just steal the address from the endpoint. - memcpy(&pipe->peername, &ep->remaddr, ep->remlen); - - len = sizeof(pipe->sockname); - (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s; - SOCKADDR_STORAGE bss; - int len; - int rv; - int family; - - NNI_ARG_UNUSED(aio); - - if (ep->loclen > 0) { - family = ep->locaddr.ss_family; - } else { - family = ep->remaddr.ss_family; - } - - s = socket(family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - - nni_win_tcp_sockinit(s); - - // Windows ConnectEx requires the socket to be bound first. - if (ep->loclen > 0) { - bss = ep->locaddr; - len = ep->loclen; - } else { - ZeroMemory(&bss, sizeof(bss)); - bss.ss_family = ep->remaddr.ss_family; - len = ep->remlen; - } - if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - closesocket(s); - - return (1); - } - // Register with the I/O completion port so we can get the - // events for the next call. - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - closesocket(s); - evt->status = rv; - evt->count = 0; - return (1); - } - - ep->s = s; - if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, - NULL, 0, NULL, &evt->olpd)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - closesocket(s); - ep->s = INVALID_SOCKET; - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - return (0); -} - -extern void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->con_ev, aio); -} - -int -nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) -{ - void * ap; - uint16_t port; - int af; - switch (sa->s_family) { - case NNG_AF_INET: - ap = (void *) &sa->s_in.sa_addr; - port = sa->s_in.sa_port; - af = AF_INET; - break; - case NNG_AF_INET6: - ap = (void *) &sa->s_in6.sa_addr; - port = sa->s_in6.sa_port; - af = AF_INET6; - break; - default: - return (NNG_EINVAL); - } - if (ipstr != NULL) { - if (af == AF_INET6) { - size_t l; - ipstr[0] = '['; - InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); - l = strlen(ipstr); - ipstr[l++] = ']'; - ipstr[l++] = '\0'; - } else { - InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); - } - } - if (portstr != NULL) { -#ifdef NNG_LITTLE_ENDIAN - port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); -#endif - snprintf(portstr, 6, "%u", port); - } - return (0); -} - int nni_win_tcp_sysinit(void) { diff --git a/src/platform/windows/win_tcp.h b/src/platform/windows/win_tcp.h new file mode 100644 index 00000000..7025af81 --- /dev/null +++ b/src/platform/windows/win_tcp.h @@ -0,0 +1,67 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_WIN_WINTCP_H +#define PLATFORM_WIN_WINTCP_H + +// This header file is private to the TCP support for Windows. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +struct nni_tcp_conn { + SOCKET s; + nni_win_io recv_io; + nni_win_io send_io; + nni_win_io conn_io; + nni_list recv_aios; + nni_list send_aios; + nni_aio * conn_aio; + SOCKADDR_STORAGE sockname; + SOCKADDR_STORAGE peername; + nni_tcp_dialer * dialer; + nni_tcp_listener *listener; + int recv_rv; + int send_rv; + int conn_rv; + bool closed; + char buf[512]; // to hold acceptex results + nni_mtx mtx; + nni_cv cv; +}; + +struct nni_tcp_dialer { + LPFN_CONNECTEX connectex; // looked up name via ioctl + nni_list aios; // in flight connections + bool closed; + nni_mtx mtx; + nni_reap_item reap; +}; + +struct nni_tcp_listener { + SOCKET s; + nni_list aios; + bool closed; + bool started; + LPFN_ACCEPTEX acceptex; + LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + SOCKADDR_STORAGE ss; + nni_mtx mtx; + nni_reap_item reap; +}; + +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +#endif // NNG_PLATFORM_WINDOWS + +#endif // NNG_PLATFORM_WIN_WINTCP_H diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c new file mode 100644 index 00000000..c3a4a5d8 --- /dev/null +++ b/src/platform/windows/win_tcpconn.c @@ -0,0 +1,391 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include "win_tcp.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <malloc.h> +#include <stdio.h> + +static void +tcp_recv_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + DWORD flags; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + flags = 0; + rv = WSARecv(c->s, iov, niov, NULL, &flags, &c->recv_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + tcp_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +tcp_recv_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + nni_win_io_cancel(&c->recv_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + tcp_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } + +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_send_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + nni_win_io_cancel(&c->send_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); // should always be at head + tcp_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_aio_finish_synch(aio, rv, num); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + tcp_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) +{ + nni_tcp_conn *c; + int rv; + BOOL yes; + DWORD no; + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->s = INVALID_SOCKET; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + c->conn_aio = NULL; + + if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != + 0) || + ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != + 0) || + ((rv = nni_win_io_register((HANDLE) s)) != 0)) { + nni_tcp_conn_fini(c); + return (rv); + } + + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + c->s = s; + *connp = c; + return (0); +} + +void +nni_win_tcp_conn_set_addrs( + nni_tcp_conn *c, const SOCKADDR_STORAGE *loc, const SOCKADDR_STORAGE *rem) +{ + memcpy(&c->sockname, loc, sizeof(*loc)); + memcpy(&c->peername, rem, sizeof(*rem)); +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + nni_win_io_cancel(&c->recv_io); + } + if (!nni_list_empty(&c->send_aios)) { + nni_win_io_cancel(&c->send_io); + } + if (c->s != INVALID_SOCKET) { + shutdown(c->s, SD_BOTH); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->peername) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + nni_mtx_unlock(&c->mtx); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->s != INVALID_SOCKET) { + closesocket(c->s); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c new file mode 100644 index 00000000..4a3e9f2f --- /dev/null +++ b/src/platform/windows/win_tcpdial.c @@ -0,0 +1,228 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_tcp.h" + +#include <malloc.h> +#include <stdio.h> + +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + int rv; + SOCKET s; + DWORD nbytes; + GUID guid = WSAID_CONNECTEX; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(d, sizeof(*d)); + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->aios); + + // Create a scratch socket for use with ioctl. + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_tcp_dialer_fini(d); + return (rv); + } + + // Look up the function pointer. + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, + sizeof(guid), &d->connectex, sizeof(d->connectex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + closesocket(s); + nni_tcp_dialer_fini(d); + return (rv); + } + + closesocket(s); + + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + + NNI_LIST_FOREACH (&d->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_lock(&d->mtx); + if (!nni_list_empty(&d->aios)) { + nni_mtx_unlock(&d->mtx); + nni_reap(&d->reap, (nni_cb) nni_tcp_dialer_fini, d); + return; + } + nni_mtx_unlock(&d->mtx); + + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dial_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&d->mtx); +} + +static void +tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio = c->conn_aio; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&d->mtx); + if ((aio = c->conn_aio) == NULL) { + // This should never occur. + nni_mtx_unlock(&d->mtx); + return; + } + + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + } else { + DWORD yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + SOCKET s; + SOCKADDR_STORAGE ss; + int len; + nni_tcp_conn * c; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((s = socket(ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + nni_aio_finish_error(aio, nni_win_error(GetLastError())); + return; + } + + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + c->peername = ss; + + // Windows ConnectEx requires the socket to be bound + // first. We just bind to an ephemeral address in the + // same family. + ZeroMemory(&c->sockname, sizeof(c->sockname)); + c->sockname.ss_family = ss.ss_family; + if (bind(s, (SOCKADDR *) &c->sockname, len) < 0) { + rv = nni_win_error(GetLastError()); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != + 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + c->dialer = d; + nni_aio_set_prov_extra(aio, 0, c); + if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + c->conn_aio = aio; + nni_aio_list_append(&d->aios, aio); + + // dialing is concurrent. + if (!d->connectex(s, (struct sockaddr *) &c->peername, len, NULL, 0, + NULL, &c->conn_io.olpd)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&d->mtx); + + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + } + nni_mtx_unlock(&d->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c new file mode 100644 index 00000000..5055c32d --- /dev/null +++ b/src/platform/windows/win_tcplisten.c @@ -0,0 +1,312 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <malloc.h> +#include <stdbool.h> +#include <stdio.h> + +#include "win_tcp.h" + +// tcp_listener_funcs looks up function pointers we need for advanced accept +// functionality on Windows. Windows is weird. +static int +tcp_listener_funcs(nni_tcp_listener *l) +{ + static SRWLOCK lock = SRWLOCK_INIT; + static LPFN_ACCEPTEX acceptex; + static LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + + AcquireSRWLockExclusive(&lock); + if (acceptex == NULL) { + int rv; + DWORD nbytes; + GUID guid1 = WSAID_ACCEPTEX; + GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS; + SOCKET s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + ReleaseSRWLockExclusive(&lock); + return (rv); + } + + // Look up the function pointer. + if ((WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, + sizeof(guid1), &acceptex, sizeof(acceptex), &nbytes, + NULL, NULL) == SOCKET_ERROR) || + (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, + sizeof(guid2), &getacceptexsockaddrs, + sizeof(getacceptexsockaddrs), &nbytes, NULL, + NULL) == SOCKET_ERROR)) { + rv = nni_win_error(GetLastError()); + acceptex = NULL; + getacceptexsockaddrs = NULL; + ReleaseSRWLockExclusive(&lock); + closesocket(s); + return (rv); + } + closesocket(s); + } + ReleaseSRWLockExclusive(&lock); + + l->acceptex = acceptex; + l->getacceptexsockaddrs = getacceptexsockaddrs; + return (0); +} + +static void +tcp_accept_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_listener *l = c->listener; + nni_aio * aio; + int len1; + int len2; + SOCKADDR * sa1; + SOCKADDR * sa2; + DWORD yes; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&l->mtx); + if ((aio = c->conn_aio) == NULL) { + // This case should not occur. The situation would indicate + // a case where the connection was accepted already. + nni_mtx_unlock(&l->mtx); + return; + } + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&l->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + len1 = (int) sizeof(c->sockname); + len2 = (int) sizeof(c->peername); + l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); + memcpy(&c->sockname, sa1, len1); + memcpy(&c->peername, sa2, len2); + + yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(l, sizeof(*l)); + nni_mtx_init(&l->mtx); + nni_aio_list_init(&l->aios); + if ((rv = tcp_listener_funcs(l)) != 0) { + nni_tcp_listener_fini(l); + return (rv); + } + + *lp = l; + return (0); +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + if (!l->closed) { + nni_aio *aio; + l->closed = true; + + NNI_LIST_FOREACH (&l->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + closesocket(l->s); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + nni_mtx_lock(&l->mtx); + if (!nni_list_empty(&l->aios)) { + nni_mtx_unlock(&l->mtx); + nni_reap(&l->reap, (nni_cb) nni_tcp_listener_fini, l); + return; + } + nni_mtx_unlock(&l->mtx); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + int rv; + BOOL yes; + DWORD no; + int len; + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + if ((len = nni_win_nn2sockaddr(&l->ss, sa)) <= 0) { + nni_mtx_unlock(&l->mtx); + return (NNG_EADDRINVAL); + } + l->s = socket(l->ss.ss_family, SOCK_STREAM, 0); + if (l->s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) l->s, HANDLE_FLAG_INHERIT, 0); + + no = 0; + (void) setsockopt( + l->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + l->s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + if ((rv = nni_win_io_register((HANDLE) l->s)) != 0) { + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Make sure that we use the address exclusively. Windows lets + // others hijack us by default. + yes = 1; + if ((setsockopt(l->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, + sizeof(yes)) != 0) || + (bind(l->s, (SOCKADDR *) &l->ss, len) != 0) || + (getsockname(l->s, (SOCKADDR *) &l->ss, &len) != 0) || + (listen(l->s, SOMAXCONN) != 0)) { + rv = nni_win_error(GetLastError()); + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + nni_win_sockaddr2nn(sa, &l->ss); + l->started = true; + nni_mtx_unlock(&l->mtx); + return (0); +} + +static void +tcp_accept_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&l->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + SOCKET s; + int rv; + DWORD cnt; + nni_tcp_conn *c; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + // Windows requires us to explicity create the socket before + // calling accept on it. + if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_mtx_unlock(&l->mtx); + closesocket(s); + nni_aio_finish_error(aio, rv); + return; + } + c->listener = l; + c->conn_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + if (((rv = nni_win_io_init( + &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) || + ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&l->aios, aio); + if ((!l->acceptex( + l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Fast failure (synchronous.) + nni_aio_list_remove(aio); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 243811a0..52327cc4 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -181,7 +181,8 @@ nni_plat_init(int (*helper)(void)) AcquireSRWLockExclusive(&lock); if (!plat_inited) { - if (((rv = nni_win_iocp_sysinit()) != 0) || + if (((rv = nni_win_io_sysinit()) != 0) || + ((rv = nni_win_iocp_sysinit()) != 0) || ((rv = nni_win_ipc_sysinit()) != 0) || ((rv = nni_win_tcp_sysinit()) != 0) || ((rv = nni_win_udp_sysinit()) != 0) || @@ -207,6 +208,7 @@ nni_plat_fini(void) nni_win_udp_sysfini(); nni_win_tcp_sysfini(); nni_win_iocp_sysfini(); + nni_win_io_sysfini(); WSACleanup(); plat_inited = 0; } diff --git a/src/supplemental/http/http_api.h b/src/supplemental/http/http_api.h index 4b515ca5..cf2c78bf 100644 --- a/src/supplemental/http/http_api.h +++ b/src/supplemental/http/http_api.h @@ -67,9 +67,9 @@ extern void *nni_http_conn_get_ctx(nni_http_conn *); // These initialization functions create stream for HTTP transactions. // They should only be used by the server or client HTTP implementations, // and are not for use by other code. -extern int nni_http_conn_init_tcp(nni_http_conn **, void *); +extern int nni_http_conn_init_tcp(nni_http_conn **, nni_tcp_conn *); extern int nni_http_conn_init_tls( - nni_http_conn **, struct nng_tls_config *, void *); + nni_http_conn **, struct nng_tls_config *, nni_tcp_conn *); extern void nni_http_conn_close(nni_http_conn *); extern void nni_http_conn_fini(nni_http_conn *); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 4034bbff..6842b271 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -20,72 +20,97 @@ #include "http_api.h" struct nng_http_client { - nni_list aios; - nni_mtx mtx; - bool closed; - struct nng_tls_config *tls; - nni_aio * connaio; - nni_plat_tcp_ep * tep; + nni_list aios; + nni_mtx mtx; + bool closed; + bool resolving; + nng_tls_config *tls; + nni_aio * aio; + nng_sockaddr sa; + nni_tcp_dialer *dialer; + char * host; + char * port; + nni_url * url; }; static void -http_conn_start(nni_http_client *c) +http_dial_start(nni_http_client *c) { - nni_plat_tcp_ep_connect(c->tep, c->connaio); + nni_aio *aio; + + if ((aio = nni_list_first(&c->aios)) == NULL) { + return; + } + c->resolving = true; + nni_aio_set_input(c->aio, 0, &c->sa); + nni_tcp_resolv(c->host, c->port, NNG_AF_UNSPEC, 0, c->aio); } static void -http_conn_done(void *arg) +http_dial_cb(void *arg) { - nni_http_client * c = arg; - nni_aio * aio; - int rv; - nni_plat_tcp_pipe *p; - nni_http_conn * conn; + nni_http_client *c = arg; + nni_aio * aio; + int rv; + nni_tcp_conn * tcp; + nni_http_conn * conn; nni_mtx_lock(&c->mtx); - rv = nni_aio_result(c->connaio); - p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL; + rv = nni_aio_result(c->aio); + if ((aio = nni_list_first(&c->aios)) == NULL) { - if (p != NULL) { - nni_plat_tcp_pipe_fini(p); - } + // User abandoned request, and no residuals left. nni_mtx_unlock(&c->mtx); + if ((rv == 0) && !c->resolving) { + tcp = nni_aio_get_output(c->aio, 0); + nni_tcp_conn_fini(tcp); + } return; } - nni_aio_list_remove(aio); if (rv != 0) { + nni_aio_list_remove(aio); + http_dial_start(c); + nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); + return; + } + + if (c->resolving) { + // This was a DNS lookup -- advance to normal TCP connect. + c->resolving = false; + nni_tcp_dialer_dial(c->dialer, &c->sa, c->aio); nni_mtx_unlock(&c->mtx); return; } + nni_aio_list_remove(aio); + tcp = nni_aio_get_output(c->aio, 0); + NNI_ASSERT(tcp != NULL); + if (c->tls != NULL) { - rv = nni_http_conn_init_tls(&conn, c->tls, p); + rv = nni_http_conn_init_tls(&conn, c->tls, tcp); } else { - rv = nni_http_conn_init_tcp(&conn, p); + rv = nni_http_conn_init_tcp(&conn, tcp); } + http_dial_start(c); + nni_mtx_unlock(&c->mtx); + if (rv != 0) { + // the conn_init function will have already discard tcp. nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&c->mtx); return; } nni_aio_set_output(aio, 0, conn); nni_aio_finish(aio, 0, 0); - - if (!nni_list_empty(&c->aios)) { - http_conn_start(c); - } - nni_mtx_unlock(&c->mtx); } void nni_http_client_fini(nni_http_client *c) { - nni_aio_fini(c->connaio); - nni_plat_tcp_ep_fini(c->tep); + nni_aio_fini(c->aio); + nni_tcp_dialer_fini(c->dialer); nni_mtx_fini(&c->mtx); #ifdef NNG_SUPP_TLS if (c->tls != NULL) { @@ -100,10 +125,6 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) { int rv; nni_http_client *c; - nni_aio * aio; - nni_sockaddr sa; - char * host; - char * port; if (strlen(url->u_hostname) == 0) { // We require a valid hostname. @@ -118,29 +139,17 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) return (NNG_EADDRINVAL); } - // For now we are looking up the address. We would really like - // to do this later, but we need TcP support for this. One - // imagines the ability to create a tcp dialer that does the - // necessary DNS lookups, etc. all asynchronously. - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_input(aio, 0, &sa); - host = (strlen(url->u_hostname) != 0) ? url->u_hostname : NULL; - port = (strlen(url->u_port) != 0) ? url->u_port : NULL; - nni_plat_tcp_resolv(host, port, NNG_AF_UNSPEC, false, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - if (rv != 0) { - return (rv); - } - if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&c->mtx); nni_aio_list_init(&c->aios); + if (((c->host = nni_strdup(url->u_hostname)) == NULL) || + ((strlen(url->u_port) != 0) && + ((c->port = nni_strdup(url->u_port)) == NULL))) { + nni_http_client_fini(c); + return (NNG_ENOMEM); + } #ifdef NNG_SUPP_TLS if ((strcmp(url->u_scheme, "https") == 0) || @@ -167,16 +176,12 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) } #endif - rv = nni_plat_tcp_ep_init(&c->tep, NULL, &sa, NNI_EP_MODE_DIAL); - if (rv != 0) { + if (((rv = nni_tcp_dialer_init(&c->dialer)) != 0) || + ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0)) { nni_http_client_fini(c); return (rv); } - if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) { - nni_http_client_fini(c); - return (rv); - } *cp = c; return (0); } @@ -224,7 +229,7 @@ nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp) } static void -http_connect_cancel(nni_aio *aio, int rv) +http_dial_cancel(nni_aio *aio, int rv) { nni_http_client *c = nni_aio_get_prov_data(aio); nni_mtx_lock(&c->mtx); @@ -233,7 +238,7 @@ http_connect_cancel(nni_aio *aio, int rv) nni_aio_finish_error(aio, rv); } if (nni_list_empty(&c->aios)) { - nni_aio_abort(c->connaio, rv); + nni_aio_abort(c->aio, rv); } nni_mtx_unlock(&c->mtx); } @@ -246,14 +251,14 @@ nni_http_client_connect(nni_http_client *c, nni_aio *aio) return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, http_connect_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, http_dial_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&c->aios, aio); if (nni_list_first(&c->aios) == aio) { - http_conn_start(c); + http_dial_start(c); } nni_mtx_unlock(&c->mtx); } diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 15d1f776..169918e9 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -699,17 +699,23 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data) if ((conn = NNI_ALLOC_STRUCT(conn)) == NULL) { return (NNG_ENOMEM); } - conn->rd_bufsz = HTTP_BUFSIZE; - if ((conn->rd_buf = nni_alloc(conn->rd_bufsz)) == NULL) { - NNI_FREE_STRUCT(conn); - return (NNG_ENOMEM); - } nni_mtx_init(&conn->mtx); nni_aio_list_init(&conn->rdq); nni_aio_list_init(&conn->wrq); + if ((conn->rd_buf = nni_alloc(HTTP_BUFSIZE)) == NULL) { + nni_http_conn_fini(conn); + return (NNG_ENOMEM); + } + conn->rd_bufsz = HTTP_BUFSIZE; + + if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || + ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { + nni_http_conn_fini(conn); + return (rv); + } + conn->sock = data; - conn->rd_bufsz = HTTP_BUFSIZE; conn->rd = tran->h_read; conn->wr = tran->h_write; conn->close = tran->h_close; @@ -718,12 +724,6 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data) conn->peer_addr = tran->h_peer_addr; conn->verified = tran->h_verified; - if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || - ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { - nni_http_conn_fini(conn); - return (rv); - } - *connp = conn; return (0); @@ -737,19 +737,23 @@ nni_http_verified_tcp(void *arg) } static nni_http_tran http_tcp_ops = { - .h_read = (http_read_fn) nni_plat_tcp_pipe_recv, - .h_write = (http_write_fn) nni_plat_tcp_pipe_send, - .h_close = (http_close_fn) nni_plat_tcp_pipe_close, - .h_fini = (http_fini_fn) nni_plat_tcp_pipe_fini, - .h_sock_addr = (http_addr_fn) nni_plat_tcp_pipe_sockname, - .h_peer_addr = (http_addr_fn) nni_plat_tcp_pipe_peername, + .h_read = (http_read_fn) nni_tcp_conn_recv, + .h_write = (http_write_fn) nni_tcp_conn_send, + .h_close = (http_close_fn) nni_tcp_conn_close, + .h_fini = (http_fini_fn) nni_tcp_conn_fini, + .h_sock_addr = (http_addr_fn) nni_tcp_conn_sockname, + .h_peer_addr = (http_addr_fn) nni_tcp_conn_peername, .h_verified = (http_verified_fn) nni_http_verified_tcp, }; int -nni_http_conn_init_tcp(nni_http_conn **connp, void *tcp) +nni_http_conn_init_tcp(nni_http_conn **connp, nni_tcp_conn *tcp) { - return (http_init(connp, &http_tcp_ops, tcp)); + int rv; + if ((rv = http_init(connp, &http_tcp_ops, tcp)) != 0) { + nni_tcp_conn_fini(tcp); + } + return (rv); } #ifdef NNG_SUPP_TLS @@ -765,26 +769,29 @@ static nni_http_tran http_tls_ops = { int nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp) + nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) { nni_tls *tls; int rv; if ((rv = nni_tls_init(&tls, cfg, tcp)) != 0) { - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (rv); } - return (http_init(connp, &http_tls_ops, tls)); + if ((rv = http_init(connp, &http_tls_ops, tls)) != 0) { + nni_tls_fini(tls); + } + return (rv); } #else int nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp) + nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) { NNI_ARG_UNUSED(connp); NNI_ARG_UNUSED(cfg); - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (NNG_ENOTSUP); } #endif // NNG_SUPP_TLS diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 4a07d544..9d1313b1 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -64,22 +64,22 @@ typedef struct http_error { } http_error; struct nng_http_server { - nng_sockaddr addr; - nni_list_node node; - int refcnt; - int starts; - nni_list handlers; - nni_list conns; - nni_mtx mtx; - nni_cv cv; - bool closed; - nng_tls_config * tls; - nni_aio * accaio; - nni_plat_tcp_ep *tep; - char * port; - char * hostname; - nni_list errors; - nni_mtx errors_mtx; + nng_sockaddr addr; + nni_list_node node; + int refcnt; + int starts; + nni_list handlers; + nni_list conns; + nni_mtx mtx; + nni_cv cv; + bool closed; + nng_tls_config * tls; + nni_aio * accaio; + nni_tcp_listener *listener; + char * port; + char * hostname; + nni_list errors; + nni_mtx errors_mtx; }; int @@ -682,13 +682,13 @@ http_sconn_cbdone(void *arg) } static int -http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp) +http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp) { http_sconn *sc; int rv; if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) { - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (NNG_ENOMEM); } @@ -720,17 +720,17 @@ http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp) static void http_server_acccb(void *arg) { - nni_http_server * s = arg; - nni_aio * aio = s->accaio; - nni_plat_tcp_pipe *tcp; - http_sconn * sc; - int rv; + nni_http_server *s = arg; + nni_aio * aio = s->accaio; + nni_tcp_conn * tcp; + http_sconn * sc; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_aio_result(aio)) != 0) { if (!s->closed) { // try again? - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); } nni_mtx_unlock(&s->mtx); return; @@ -738,14 +738,14 @@ http_server_acccb(void *arg) tcp = nni_aio_get_output(aio, 0); if (s->closed) { // If we're closing, then reject this one. - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); nni_mtx_unlock(&s->mtx); return; } if (http_sconn_init(&sc, s, tcp) != 0) { // The TCP structure is already cleaned up. // Start another accept attempt. - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); return; } @@ -753,7 +753,7 @@ http_server_acccb(void *arg) nni_list_append(&s->conns, sc); nni_http_read_req(sc->conn, sc->req, sc->rxaio); - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); } @@ -769,8 +769,8 @@ http_server_fini(nni_http_server *s) while (!nni_list_empty(&s->conns)) { nni_cv_wait(&s->cv); } - if (s->tep != NULL) { - nni_plat_tcp_ep_fini(s->tep); + if (s->listener != NULL) { + nni_tcp_listener_fini(s->listener); } while ((h = nni_list_first(&s->handlers)) != NULL) { nni_list_remove(&s->handlers, h); @@ -875,7 +875,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url) return (rv); } nni_aio_set_input(aio, 0, &s->addr); - nni_plat_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio); + nni_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio); nni_aio_wait(aio); rv = nni_aio_result(aio); nni_aio_fini(aio); @@ -921,16 +921,15 @@ static int http_server_start(nni_http_server *s) { int rv; - rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN); - if (rv != 0) { + if ((rv = nni_tcp_listener_init(&s->listener)) != 0) { return (rv); } - if ((rv = nni_plat_tcp_ep_listen(s->tep, NULL)) != 0) { - nni_plat_tcp_ep_fini(s->tep); - s->tep = NULL; + if ((rv = nni_tcp_listener_listen(s->listener, &s->addr)) != 0) { + nni_tcp_listener_fini(s->listener); + s->listener = NULL; return (rv); } - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); return (0); } @@ -961,8 +960,8 @@ http_server_stop(nni_http_server *s) s->closed = true; // Close the TCP endpoint that is listening. - if (s->tep) { - nni_plat_tcp_ep_close(s->tep); + if (s->listener) { + nni_tcp_listener_close(s->listener); } // Stopping the server is a hard stop -- it aborts any work diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index cdd226cd..0f4f67cc 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -59,7 +59,7 @@ typedef struct nni_tls_certkey { } nni_tls_certkey; struct nni_tls { - nni_plat_tcp_pipe * tcp; + nni_tcp_conn * tcp; mbedtls_ssl_context ctx; nng_tls_config * cfg; // kept so we can release it nni_mtx lk; @@ -254,14 +254,14 @@ nni_tls_fini(nni_tls *tp) { // Shut it all down first. if (tp->tcp) { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); } nni_aio_stop(tp->tcp_send); nni_aio_stop(tp->tcp_recv); // And finalize / free everything. if (tp->tcp) { - nni_plat_tcp_pipe_fini(tp->tcp); + nni_tcp_conn_fini(tp->tcp); } nni_aio_fini(tp->tcp_send); nni_aio_fini(tp->tcp_recv); @@ -306,7 +306,7 @@ nni_tls_mkerr(int err) } int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) +nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) { nni_tls *tp; int rv; @@ -314,7 +314,7 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) // During the handshake, disable Nagle to shorten the // negotiation. Once things are set up the caller can // re-enable Nagle if so desired. - (void) nni_plat_tcp_pipe_set_nodelay(tcp, true); + (void) nni_tcp_conn_set_nodelay(tcp, true); if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) { return (NNG_ENOMEM); @@ -387,7 +387,7 @@ nni_tls_fail(nni_tls *tp, int rv) { nni_aio *aio; tp->tls_closed = true; - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; while ((aio = nni_list_first(&tp->recvs)) != NULL) { nni_list_remove(&tp->recvs, aio); @@ -408,7 +408,7 @@ nni_tls_send_cb(void *ctx) nni_mtx_lock(&tp->lk); if (nni_aio_result(aio) != 0) { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; } else { size_t n = nni_aio_count(aio); @@ -421,7 +421,7 @@ nni_tls_send_cb(void *ctx) iov.iov_len = tp->sendlen; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(aio, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_send(tp->tcp, aio); + nni_tcp_conn_send(tp->tcp, aio); nni_mtx_unlock(&tp->lk); return; } @@ -460,7 +460,7 @@ nni_tls_recv_start(nni_tls *tp) iov.iov_len = NNG_TLS_MAX_RECV_SIZE; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_recv(tp->tcp, aio); + nni_tcp_conn_recv(tp->tcp, aio); } static void @@ -474,7 +474,7 @@ nni_tls_recv_cb(void *ctx) if (nni_aio_result(aio) != 0) { // Close the underlying TCP channel, but permit data we // already received to continue to be received. - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; } else { NNI_ASSERT(tp->recvlen == 0); @@ -531,7 +531,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len) iov.iov_len = len; nni_aio_set_iov(tp->tcp_send, 1, &iov); nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send); + nni_tcp_conn_send(tp->tcp, tp->tcp_send); return (len); } @@ -615,25 +615,25 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio) int nni_tls_peername(nni_tls *tp, nni_sockaddr *sa) { - return (nni_plat_tcp_pipe_peername(tp->tcp, sa)); + return (nni_tcp_conn_peername(tp->tcp, sa)); } int nni_tls_sockname(nni_tls *tp, nni_sockaddr *sa) { - return (nni_plat_tcp_pipe_sockname(tp->tcp, sa)); + return (nni_tcp_conn_sockname(tp->tcp, sa)); } int nni_tls_set_nodelay(nni_tls *tp, bool val) { - return (nni_plat_tcp_pipe_set_nodelay(tp->tcp, val)); + return (nni_tcp_conn_set_nodelay(tp->tcp, val)); } int nni_tls_set_keepalive(nni_tls *tp, bool val) { - return (nni_plat_tcp_pipe_set_keepalive(tp->tcp, val)); + return (nni_tcp_conn_set_keepalive(tp->tcp, val)); } static void @@ -785,7 +785,7 @@ nni_tls_close(nni_tls *tp) // connection at this point. (void) mbedtls_ssl_close_notify(&tp->ctx); } else { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); } nni_mtx_unlock(&tp->lk); } diff --git a/src/supplemental/tls/none/tls.c b/src/supplemental/tls/none/tls.c index 2fdc0c93..d7968758 100644 --- a/src/supplemental/tls/none/tls.c +++ b/src/supplemental/tls/none/tls.c @@ -47,7 +47,7 @@ nni_tls_fini(nni_tls *tp) } int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) +nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) { NNI_ARG_UNUSED(tpp); NNI_ARG_UNUSED(cfg); @@ -163,7 +163,8 @@ nng_tls_config_cert_key_file( return (NNG_ENOTSUP); } -int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size) +int +nng_tls_config_key(nng_tls_config *cfg, const uint8_t *key, size_t size) { NNI_ARG_UNUSED(cfg); NNI_ARG_UNUSED(key); @@ -171,14 +172,14 @@ int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size) return (NNG_ENOTSUP); } -int nng_tls_config_pass(nng_tls_config *cfg, const char *pass) +int +nng_tls_config_pass(nng_tls_config *cfg, const char *pass) { NNI_ARG_UNUSED(cfg); NNI_ARG_UNUSED(pass); return (NNG_ENOTSUP); } - int nng_tls_config_alloc(nng_tls_config **cfgp, nng_tls_mode mode) { diff --git a/src/supplemental/tls/tls_api.h b/src/supplemental/tls/tls_api.h index 8a40bcfb..53dba7fe 100644 --- a/src/supplemental/tls/tls_api.h +++ b/src/supplemental/tls/tls_api.h @@ -31,7 +31,7 @@ extern void nni_tls_config_fini(nng_tls_config *); // the configuration object is created with a hold on it. extern void nni_tls_config_hold(nng_tls_config *); -extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_plat_tcp_pipe *); +extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_tcp_conn *); extern void nni_tls_close(nni_tls *); extern void nni_tls_fini(nni_tls *); extern void nni_tls_send(nni_tls *, nng_aio *); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e8aa04d0..c323da29 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -17,17 +17,18 @@ // TCP transport. Platform specific TCP operations must be // supplied as well. -typedef struct tcp_pipe tcp_pipe; -typedef struct tcp_ep tcp_ep; +typedef struct tcptran_pipe tcptran_pipe; +typedef struct tcptran_dialer tcptran_dialer; +typedef struct tcptran_listener tcptran_listener; // tcp_pipe is one end of a TCP connection. -struct tcp_pipe { - nni_plat_tcp_pipe *tpp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; +struct tcptran_pipe { + nni_tcp_conn *conn; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; nni_list recvq; nni_list sendq; @@ -46,53 +47,70 @@ struct tcp_pipe { nni_mtx mtx; }; -struct tcp_ep { - nni_plat_tcp_ep *tep; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; - nni_aio * aio; - nni_aio * user_aio; - nni_url * url; - nng_sockaddr bsa; // bound addr - nni_mtx mtx; +struct tcptran_dialer { + nni_tcp_dialer *dialer; + uint16_t proto; + uint16_t af; + size_t rcvmax; + bool nodelay; + bool keepalive; + bool resolving; + nng_sockaddr sa; + nni_aio * aio; + nni_aio * user_aio; + nni_url * url; + nni_mtx mtx; }; -static void tcp_pipe_dosend(tcp_pipe *, nni_aio *); -static void tcp_pipe_dorecv(tcp_pipe *); -static void tcp_pipe_send_cb(void *); -static void tcp_pipe_recv_cb(void *); -static void tcp_pipe_nego_cb(void *); -static void tcp_ep_cb(void *arg); +struct tcptran_listener { + nni_tcp_listener *listener; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; + nni_aio * aio; + nni_aio * user_aio; + nni_url * url; + nng_sockaddr sa; + nng_sockaddr bsa; // bound addr + nni_mtx mtx; +}; + +static void tcptran_pipe_send_start(tcptran_pipe *); +static void tcptran_pipe_recv_start(tcptran_pipe *); +static void tcptran_pipe_send_cb(void *); +static void tcptran_pipe_recv_cb(void *); +static void tcptran_pipe_nego_cb(void *); +static void tcptran_dialer_cb(void *arg); +static void tcptran_listener_cb(void *arg); static int -tcp_tran_init(void) +tcptran_init(void) { return (0); } static void -tcp_tran_fini(void) +tcptran_fini(void) { } static void -tcp_pipe_close(void *arg) +tcptran_pipe_close(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_close(p->rxaio); nni_aio_close(p->txaio); nni_aio_close(p->negaio); - nni_plat_tcp_pipe_close(p->tpp); + nni_tcp_conn_close(p->conn); } static void -tcp_pipe_stop(void *arg) +tcptran_pipe_stop(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); @@ -100,62 +118,48 @@ tcp_pipe_stop(void *arg) } static void -tcp_pipe_fini(void *arg) +tcptran_pipe_fini(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); nni_aio_fini(p->negaio); - if (p->tpp != NULL) { - nni_plat_tcp_pipe_fini(p->tpp); - } - if (p->rxmsg) { - nni_msg_free(p->rxmsg); + if (p->conn != NULL) { + nni_tcp_conn_fini(p->conn); } - + nni_msg_free(p->rxmsg); NNI_FREE_STRUCT(p); } static int -tcp_pipe_init(tcp_pipe **pipep, tcp_ep *ep, void *tpp) +tcptran_pipe_init(tcptran_pipe **pipep, void *conn) { - tcp_pipe *p; - int rv; + tcptran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tcp_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tcp_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, tcp_pipe_nego_cb, p)) != 0)) { - tcp_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, tcptran_pipe_nego_cb, p)) != 0)) { + tcptran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->nodelay = ep->nodelay; - p->keepalive = ep->keepalive; - p->tpp = tpp; - - // We try to set the nodelay and keepalive, but if these fail for - // some reason, its not really fatal to the communication channel. - // So ignore the return values. - (void) nni_plat_tcp_pipe_set_nodelay(tpp, p->nodelay); - (void) nni_plat_tcp_pipe_set_keepalive(tpp, p->keepalive); - - *pipep = p; + p->conn = conn; + *pipep = p; return (0); } static void -tcp_cancel_nego(nni_aio *aio, int rv) +tcptran_pipe_nego_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -170,11 +174,11 @@ tcp_cancel_nego(nni_aio *aio, int rv) } static void -tcp_pipe_nego_cb(void *arg) +tcptran_pipe_nego_cb(void *arg) { - tcp_pipe *p = arg; - nni_aio * aio = p->negaio; - int rv; + tcptran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -194,7 +198,7 @@ tcp_pipe_nego_cb(void *arg) iov.iov_buf = &p->txlen[p->gottxhead]; // send it down... nni_aio_set_iov(aio, 1, &iov); - nni_plat_tcp_pipe_send(p->tpp, aio); + nni_tcp_conn_send(p->conn, aio); nni_mtx_unlock(&p->mtx); return; } @@ -203,7 +207,7 @@ tcp_pipe_nego_cb(void *arg) iov.iov_len = p->wantrxhead - p->gotrxhead; iov.iov_buf = &p->rxlen[p->gotrxhead]; nni_aio_set_iov(aio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, aio); + nni_tcp_conn_recv(p->conn, aio); nni_mtx_unlock(&p->mtx); return; } @@ -227,14 +231,14 @@ done: } static void -tcp_pipe_send_cb(void *arg) +tcptran_pipe_send_cb(void *arg) { - tcp_pipe *p = arg; - int rv; - nni_aio * aio; - size_t n; - nni_msg * msg; - nni_aio * txaio = p->txaio; + tcptran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -254,16 +258,14 @@ tcp_pipe_send_cb(void *arg) n = nni_aio_count(txaio); nni_aio_iov_advance(txaio, n); if (nni_aio_iov_count(txaio) > 0) { - nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_tcp_conn_send(p->conn, txaio); nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&p->sendq)) { - // schedule next send - tcp_pipe_dosend(p, nni_list_first(&p->sendq)); - } + tcptran_pipe_send_start(p); + nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); @@ -274,14 +276,14 @@ tcp_pipe_send_cb(void *arg) } static void -tcp_pipe_recv_cb(void *arg) +tcptran_pipe_recv_cb(void *arg) { - tcp_pipe *p = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = p->rxaio; + tcptran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -293,7 +295,7 @@ tcp_pipe_recv_cb(void *arg) n = nni_aio_count(rxaio); nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) > 0) { - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -325,7 +327,7 @@ tcp_pipe_recv_cb(void *arg) iov.iov_len = (size_t) len; nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -336,7 +338,7 @@ tcp_pipe_recv_cb(void *arg) msg = p->rxmsg; p->rxmsg = NULL; if (!nni_list_empty(&p->recvq)) { - tcp_pipe_dorecv(p); + tcptran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); @@ -357,9 +359,9 @@ recv_error: } static void -tcp_cancel_tx(nni_aio *aio, int rv) +tcptran_pipe_send_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -381,14 +383,19 @@ tcp_cancel_tx(nni_aio *aio, int rv) } static void -tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio) +tcptran_pipe_send_start(tcptran_pipe *p) { + nni_aio *aio; nni_aio *txaio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + // This runs to send the message. msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -411,35 +418,35 @@ tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_tcp_conn_send(p->conn, txaio); } static void -tcp_pipe_send(void *arg, nni_aio *aio) +tcptran_pipe_send(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - int rv; + tcptran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_tx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_send_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { - tcp_pipe_dosend(p, aio); + tcptran_pipe_send_start(p); } nni_mtx_unlock(&p->mtx); } static void -tcp_cancel_rx(nni_aio *aio, int rv) +tcptran_pipe_recv_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -460,7 +467,7 @@ tcp_cancel_rx(nni_aio *aio, int rv) } static void -tcp_pipe_dorecv(tcp_pipe *p) +tcptran_pipe_recv_start(tcptran_pipe *p) { nni_aio *rxaio; nni_iov iov; @@ -472,20 +479,20 @@ tcp_pipe_dorecv(tcp_pipe *p) iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); } static void -tcp_pipe_recv(void *arg, nni_aio *aio) +tcptran_pipe_recv(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - int rv; + tcptran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_rx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_recv_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -493,75 +500,74 @@ tcp_pipe_recv(void *arg, nni_aio *aio) nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { - tcp_pipe_dorecv(p); + tcptran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); } static uint16_t -tcp_pipe_peer(void *arg) +tcptran_pipe_peer(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (p->peer); } static int -tcp_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe * p = arg; - int rv; - nni_sockaddr sa; + tcptran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); - if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) { + if ((rv = nni_tcp_conn_sockname(p->conn, &sa)) == 0) { rv = nni_copyout_sockaddr(&sa, v, szp, t); } return (rv); } static int -tcp_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe * p = arg; - int rv; - nni_sockaddr sa; + tcptran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); - if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) { + if ((rv = nni_tcp_conn_peername(p->conn, &sa)) == 0) { rv = nni_copyout_sockaddr(&sa, v, szp, t); } return (rv); } static int -tcp_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (nni_copyout_bool(p->keepalive, v, szp, t)); } static int -tcp_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (nni_copyout_bool(p->nodelay, v, szp, t)); } -// Note that the url *must* be in a modifiable buffer. static void -tcp_pipe_start(void *arg, nni_aio *aio) +tcptran_pipe_start(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + tcptran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_nego, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_nego_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -582,35 +588,39 @@ tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_plat_tcp_pipe_send(p->tpp, negaio); + nni_tcp_conn_send(p->conn, negaio); nni_mtx_unlock(&p->mtx); } static void -tcp_ep_fini(void *arg) +tcptran_dialer_fini(void *arg) { - tcp_ep *ep = arg; + tcptran_dialer *d = arg; - nni_aio_stop(ep->aio); - if (ep->tep != NULL) { - nni_plat_tcp_ep_fini(ep->tep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_tcp_dialer_fini(d->dialer); } - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_aio_fini(d->aio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcptran_dialer_close(void *arg) +{ + tcptran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_tcp_dialer_close(d->dialer); } static int -tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +tcptran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - tcp_ep * ep; - int rv; - char * host; - char * serv; - nni_sockaddr rsa, lsa; - nni_aio * aio; - int passive; - uint16_t af; + tcptran_dialer *d; + int rv; + uint16_t af; if (strcmp(url->u_scheme, "tcp") == 0) { af = NNG_AF_UNSPEC; @@ -627,345 +637,543 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_EADDRINVAL); } if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || - (url->u_query != NULL)) { + (url->u_query != NULL) || (strlen(url->u_hostname) == 0) || + (strlen(url->u_port) == 0)) { return (NNG_EADDRINVAL); } - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); } + nni_mtx_init(&d->mtx); - if (strlen(url->u_hostname) == 0) { - host = NULL; - } else { - host = url->u_hostname; + if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) || + ((rv = nni_aio_init(&d->aio, tcptran_dialer_cb, d)) != 0)) { + tcptran_dialer_fini(d); + return (rv); } - if (strlen(url->u_port) == 0) { - serv = NULL; - } else { - serv = url->u_port; - } - // XXX: arguably we could defer this part to the point we do a bind - // or connect! - if (mode == NNI_EP_MODE_DIAL) { - passive = 0; - lsa.s_family = af; - nni_aio_set_input(aio, 0, &rsa); - if ((host == NULL) || (serv == NULL)) { - nni_aio_fini(aio); - return (NNG_EADDRINVAL); + d->url = url; + d->proto = nni_sock_proto_id(sock); + d->nodelay = true; + d->keepalive = false; + d->af = af; + + *dp = d; + return (0); +} + +static void +tcptran_dialer_cb(void *arg) +{ + tcptran_dialer *d = arg; + tcptran_pipe * p; + nni_tcp_conn * conn; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->mtx); + if ((rv == 0) && !d->resolving) { + conn = nni_aio_get_output(d->aio, 0); + nni_tcp_conn_fini(conn); } - } else { - passive = 1; - rsa.s_family = af; - nni_aio_set_input(aio, 0, &lsa); + return; } - nni_plat_tcp_resolv(host, serv, af, passive, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); - return (rv); + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; } - nni_aio_fini(aio); + if (d->resolving) { + // Name resolution complete. Now go to next step. + d->resolving = false; + nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->mtx); + return; + } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); + if ((rv = tcptran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; } - nni_mtx_init(&ep->mtx); - ep->url = url; - if ((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) { - tcp_ep_fini(ep); - return (rv); + p->proto = d->proto; + p->rcvmax = d->rcvmax; + p->nodelay = d->nodelay; + p->keepalive = d->keepalive; + nni_mtx_unlock(&d->mtx); + + (void) nni_tcp_conn_set_nodelay(conn, p->nodelay); + (void) nni_tcp_conn_set_keepalive(conn, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +tcptran_dialer_cancel(nni_aio *aio, int rv) +{ + tcptran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->mtx); + return; } + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); - if ((rv = nni_aio_init(&ep->aio, tcp_ep_cb, ep)) != 0) { - tcp_ep_fini(ep); - return (rv); + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); +} + +static void +tcptran_dialer_connect(void *arg, nni_aio *aio) +{ + tcptran_dialer *d = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; } - ep->proto = nni_sock_proto_id(sock); - ep->nodelay = true; - ep->keepalive = false; + nni_mtx_lock(&d->mtx); + NNI_ASSERT(d->user_aio == NULL); - *epp = ep; - return (0); + if ((rv = nni_aio_schedule(aio, tcptran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; + + d->resolving = true; + + // Start the name resolution. Callback will see resolving, and then + // switch to doing actual connect. + nni_aio_set_input(d->aio, 0, &d->sa); + nni_tcp_resolv(d->url->u_hostname, d->url->u_port, d->af, 0, d->aio); + nni_mtx_unlock(&d->mtx); } static int -tcp_dialer_init(void **epp, nni_url *url, nni_sock *sock) +tcptran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + tcptran_dialer *d = arg; + + return (nni_copyout_str(d->url->u_rawurl, v, szp, t)); } static int -tcp_listener_init(void **epp, nni_url *url, nni_sock *sock) +tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + tcptran_dialer *d = arg; + return (nni_copyout_size(d->rcvmax, v, szp, t)); } -static void -tcp_ep_close(void *arg) +static int +tcptran_dialer_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - - nni_aio_close(ep->aio); + tcptran_dialer *d = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->rcvmax = val; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} - nni_mtx_lock(&ep->mtx); - nni_plat_tcp_ep_close(ep->tep); - nni_mtx_unlock(&ep->mtx); +static int +tcptran_dialer_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tcptran_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_bool(d->nodelay, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); } static int -tcp_ep_bind(void *arg) +tcptran_dialer_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; + tcptran_dialer *d = arg; + bool val; + int rv; + if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->nodelay = val; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} - nni_mtx_lock(&ep->mtx); - rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa); - nni_mtx_unlock(&ep->mtx); +static int +tcptran_dialer_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tcptran_dialer *d = arg; + return (nni_copyout_bool(d->keepalive, v, szp, t)); +} +static int +tcptran_dialer_set_keepalive( + void *arg, const void *v, size_t sz, nni_opt_type t) +{ + tcptran_dialer *d = arg; + bool val; + int rv; + if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->keepalive = val; + nni_mtx_unlock(&d->mtx); + } return (rv); } static void -tcp_ep_finish(tcp_ep *ep) +tcptran_listener_fini(void *arg) { - nni_aio * aio; - int rv; - tcp_pipe *pipe = NULL; + tcptran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_tcp_listener_fini(l->listener); + } + nni_aio_fini(l->aio); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static int +tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + tcptran_listener *l; + int rv; + char * host; + nni_aio * aio; + uint16_t af; + + if (strcmp(url->u_scheme, "tcp") == 0) { + af = NNG_AF_UNSPEC; + } else if (strcmp(url->u_scheme, "tcp4") == 0) { + af = NNG_AF_INET; + } else if (strcmp(url->u_scheme, "tcp6") == 0) { + af = NNG_AF_INET6; + } else { + return (NNG_EADDRINVAL); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } -done: - aio = ep->user_aio; - ep->user_aio = NULL; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + l->url = url; - if ((aio != NULL) && (rv == 0)) { - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if (strlen(url->u_hostname) == 0) { + host = NULL; + } else { + host = url->u_hostname; } - if (pipe != NULL) { - tcp_pipe_fini(pipe); + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + tcptran_listener_fini(l); + return (rv); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + // XXX: We are doing lookup at listener initialization. There is + // a valid argument that this should be done at bind time, but that + // would require making bind asynchronous. In some ways this would + // be worse than the cost of just waiting here. We always recommend + // using local IP addresses rather than names when possible. + + nni_aio_set_input(aio, 0, &l->sa); + + nni_tcp_resolv(host, url->u_port, af, 1, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + + if (rv != 0) { + tcptran_listener_fini(l); + return (rv); + } + + if ((rv = nni_tcp_listener_init(&l->listener)) != 0) { + tcptran_listener_fini(l); + return (rv); + } + + if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) { + tcptran_listener_fini(l); + return (rv); } + l->proto = nni_sock_proto_id(sock); + l->nodelay = true; + l->keepalive = false; + l->bsa = l->sa; + + *lp = l; + return (0); } static void -tcp_ep_cb(void *arg) +tcptran_listener_close(void *arg) { - tcp_ep *ep = arg; + tcptran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - tcp_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + nni_aio_close(l->aio); + nni_tcp_listener_close(l->listener); } -static void -tcp_cancel_ep(nni_aio *aio, int rv) +static int +tcptran_listener_bind(void *arg) { - tcp_ep *ep = nni_aio_get_prov_data(aio); + tcptran_listener *l = arg; + int rv; - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); - return; - } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); + l->bsa = l->sa; + nni_mtx_lock(&l->mtx); + rv = nni_tcp_listener_listen(l->listener, &l->bsa); + nni_mtx_unlock(&l->mtx); - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + return (rv); } static void -tcp_ep_accept(void *arg, nni_aio *aio) +tcptran_listener_cb(void *arg) { - tcp_ep *ep = arg; - int rv; + tcptran_listener *l = arg; + nni_aio * aio; + int rv; + tcptran_pipe * p = NULL; + nni_tcp_conn * conn; + + nni_mtx_lock(&l->mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_tcp_conn_fini(conn); + } + return; + } - if (nni_aio_begin(aio) != 0) { + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + + NNI_ASSERT(conn != NULL); + if ((rv = tcptran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(conn); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + p->proto = l->proto; + p->rcvmax = l->rcvmax; + p->nodelay = l->nodelay; + p->keepalive = l->keepalive; + nni_mtx_unlock(&l->mtx); + + (void) nni_tcp_conn_set_nodelay(conn, p->nodelay); + (void) nni_tcp_conn_set_keepalive(conn, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void -tcp_ep_connect(void *arg, nni_aio *aio) +tcptran_listener_cancel(nni_aio *aio, int rv) { - tcp_ep *ep = arg; - int rv; + tcptran_listener *l = nni_aio_get_prov_data(aio); - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->mtx); return; } - ep->user_aio = aio; + l->user_aio = NULL; + nni_mtx_unlock(&l->mtx); - nni_plat_tcp_ep_connect(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } -static int -tcp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +static void +tcptran_listener_accept(void *arg, nni_aio *aio) { - tcp_ep *ep = arg; - size_t val; - int rv; - if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; } - return (rv); -} + nni_mtx_lock(&l->mtx); + NNI_ASSERT(l->user_aio == NULL); -static int -tcp_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t) -{ - return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); + if ((rv = nni_aio_schedule(aio, tcptran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + l->user_aio = aio; + + nni_tcp_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->mtx); } static int -tcp_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) +tcptran_listener_set_nodelay( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - bool val; - int rv; + tcptran_listener *l = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->nodelay = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->nodelay = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -tcp_ep_chk_bool(const void *v, size_t sz, nni_opt_type t) +tcptran_listener_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (nni_copyin_bool(NULL, v, sz, t)); + tcptran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->nodelay, v, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int -tcp_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_bool(ep->nodelay, v, szp, t); - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->rcvmax = val; + nni_mtx_unlock(&l->mtx); + } return (rv); } static int -tcp_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +tcptran_listener_set_keepalive( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - bool val; - int rv; + tcptran_listener *l = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->keepalive = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->keepalive = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_bool(ep->keepalive, v, szp, t); - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->keepalive, v, szp, t); + nni_mtx_unlock(&l->mtx); return (rv); } static int -tcp_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; + tcptran_listener *l = arg; + char ustr[128]; + char ipstr[48]; // max for IPv6 addresses including [] + char portstr[6]; // max for 16-bit port - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); + nni_ntop(&l->bsa, ipstr, portstr); + snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr); + return (nni_copyout_str(ustr, v, szp, t)); } static int -tcp_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; - char ustr[128]; - char ipstr[48]; // max for IPv6 addresses including [] - char portstr[6]; // max for 16-bit port + tcptran_listener *l = arg; + return (nni_copyout_size(l->rcvmax, v, szp, t)); +} - nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); - snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr); - return (nni_copyout_str(ustr, v, szp, t)); +static int +tcptran_check_bool(const void *v, size_t sz, nni_opt_type t) +{ + return (nni_copyin_bool(NULL, v, sz, t)); } static int -tcp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, v, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } -static nni_tran_option tcp_pipe_options[] = { +static nni_tran_option tcptran_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tcp_pipe_get_locaddr, + .o_get = tcptran_pipe_get_locaddr, }, { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tcp_pipe_get_remaddr, + .o_get = tcptran_pipe_get_remaddr, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_pipe_get_keepalive, + .o_get = tcptran_pipe_get_keepalive, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_pipe_get_nodelay, + .o_get = tcptran_pipe_get_nodelay, }, // terminate list { @@ -973,43 +1181,43 @@ static nni_tran_option tcp_pipe_options[] = { }, }; -static nni_tran_pipe_ops tcp_pipe_ops = { - .p_fini = tcp_pipe_fini, - .p_start = tcp_pipe_start, - .p_stop = tcp_pipe_stop, - .p_send = tcp_pipe_send, - .p_recv = tcp_pipe_recv, - .p_close = tcp_pipe_close, - .p_peer = tcp_pipe_peer, - .p_options = tcp_pipe_options, +static nni_tran_pipe_ops tcptran_pipe_ops = { + .p_fini = tcptran_pipe_fini, + .p_start = tcptran_pipe_start, + .p_stop = tcptran_pipe_stop, + .p_send = tcptran_pipe_send, + .p_recv = tcptran_pipe_recv, + .p_close = tcptran_pipe_close, + .p_peer = tcptran_pipe_peer, + .p_options = tcptran_pipe_options, }; -static nni_tran_option tcp_dialer_options[] = { +static nni_tran_option tcptran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tcp_ep_get_recvmaxsz, - .o_set = tcp_ep_set_recvmaxsz, - .o_chk = tcp_ep_chk_recvmaxsz, + .o_get = tcptran_dialer_get_recvmaxsz, + .o_set = tcptran_dialer_set_recvmaxsz, + .o_chk = tcptran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tcp_dialer_get_url, + .o_get = tcptran_dialer_get_url, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_nodelay, - .o_set = tcp_ep_set_nodelay, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_dialer_get_nodelay, + .o_set = tcptran_dialer_set_nodelay, + .o_chk = tcptran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_keepalive, - .o_set = tcp_ep_set_keepalive, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_dialer_get_keepalive, + .o_set = tcptran_dialer_set_keepalive, + .o_chk = tcptran_check_bool, }, // terminate list { @@ -1017,32 +1225,32 @@ static nni_tran_option tcp_dialer_options[] = { }, }; -static nni_tran_option tcp_listener_options[] = { +static nni_tran_option tcptran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tcp_ep_get_recvmaxsz, - .o_set = tcp_ep_set_recvmaxsz, - .o_chk = tcp_ep_chk_recvmaxsz, + .o_get = tcptran_listener_get_recvmaxsz, + .o_set = tcptran_listener_set_recvmaxsz, + .o_chk = tcptran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tcp_listener_get_url, + .o_get = tcptran_listener_get_url, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_nodelay, - .o_set = tcp_ep_set_nodelay, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_listener_get_nodelay, + .o_set = tcptran_listener_set_nodelay, + .o_chk = tcptran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_keepalive, - .o_set = tcp_ep_set_keepalive, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_listener_get_keepalive, + .o_set = tcptran_listener_set_keepalive, + .o_chk = tcptran_check_bool, }, // terminate list { @@ -1050,51 +1258,51 @@ static nni_tran_option tcp_listener_options[] = { }, }; -static nni_tran_dialer_ops tcp_dialer_ops = { - .d_init = tcp_dialer_init, - .d_fini = tcp_ep_fini, - .d_connect = tcp_ep_connect, - .d_close = tcp_ep_close, - .d_options = tcp_dialer_options, +static nni_tran_dialer_ops tcptran_dialer_ops = { + .d_init = tcptran_dialer_init, + .d_fini = tcptran_dialer_fini, + .d_connect = tcptran_dialer_connect, + .d_close = tcptran_dialer_close, + .d_options = tcptran_dialer_options, }; -static nni_tran_listener_ops tcp_listener_ops = { - .l_init = tcp_listener_init, - .l_fini = tcp_ep_fini, - .l_bind = tcp_ep_bind, - .l_accept = tcp_ep_accept, - .l_close = tcp_ep_close, - .l_options = tcp_listener_options, +static nni_tran_listener_ops tcptran_listener_ops = { + .l_init = tcptran_listener_init, + .l_fini = tcptran_listener_fini, + .l_bind = tcptran_listener_bind, + .l_accept = tcptran_listener_accept, + .l_close = tcptran_listener_close, + .l_options = tcptran_listener_options, }; static nni_tran tcp_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; static nni_tran tcp4_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp4", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; static nni_tran tcp6_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp6", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; int diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 5e1a1e8d..b1bdddb8 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -22,17 +22,19 @@ // supplied as well, and uses the supplemental TLS v1.2 code. It is not // an accident that this very closely resembles the TCP transport itself. -typedef struct tls_pipe tls_pipe; -typedef struct tls_ep tls_ep; - -// tls_pipe is one end of a TLS connection. -struct tls_pipe { - nni_plat_tcp_pipe *tcp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; +typedef struct tlstran_ep tlstran_ep; +typedef struct tlstran_dialer tlstran_dialer; +typedef struct tlstran_listener tlstran_listener; +typedef struct tlstran_pipe tlstran_pipe; + +// tlstran_pipe is one end of a TLS connection. +struct tlstran_pipe { + nni_tls *tls; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; nni_list sendq; nni_list recvq; @@ -49,47 +51,62 @@ struct tls_pipe { nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; - nni_tls *tls; }; -struct tls_ep { - nni_plat_tcp_ep *tep; - uint16_t proto; - size_t rcvmax; - int authmode; - nni_aio * aio; - nni_aio * user_aio; - nni_mtx mtx; - nng_tls_config * cfg; - nng_sockaddr bsa; - nni_url * url; - int mode; - bool nodelay; - bool keepalive; +// Stuff that is common to both dialers and listeners. +struct tlstran_ep { + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; + int authmode; + nng_tls_config *cfg; + nni_url * url; + nni_mtx mtx; +}; + +struct tlstran_dialer { + tlstran_ep ep; // must be first + nni_tcp_dialer *dialer; + uint16_t af; + nni_aio * aio; + nni_aio * user_aio; + bool resolving; + nng_sockaddr sa; +}; + +struct tlstran_listener { + tlstran_ep ep; // must be first + nni_tcp_listener *listener; + nni_aio * aio; + nni_aio * user_aio; + nng_sockaddr sa; + nng_sockaddr bsa; // bound addr }; -static void tls_pipe_dorecv(tls_pipe *); -static void tls_pipe_dosend(tls_pipe *, nni_aio *); -static void tls_pipe_send_cb(void *); -static void tls_pipe_recv_cb(void *); -static void tls_pipe_nego_cb(void *); -static void tls_ep_cb(void *arg); +static void tlstran_pipe_send_start(tlstran_pipe *); +static void tlstran_pipe_recv_start(tlstran_pipe *); +static void tlstran_pipe_send_cb(void *); +static void tlstran_pipe_recv_cb(void *); +static void tlstran_pipe_nego_cb(void *); +static void tlstran_dialer_cb(void *); +static void tlstran_listener_cb(void *); static int -tls_tran_init(void) +tlstran_init(void) { return (0); } static void -tls_tran_fini(void) +tlstran_fini(void) { } static void -tls_pipe_close(void *arg) +tlstran_pipe_close(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_close(p->rxaio); nni_aio_close(p->txaio); @@ -99,9 +116,9 @@ tls_pipe_close(void *arg) } static void -tls_pipe_stop(void *arg) +tlstran_pipe_stop(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); @@ -109,14 +126,13 @@ tls_pipe_stop(void *arg) } static void -tls_pipe_fini(void *arg) +tlstran_pipe_fini(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); nni_aio_fini(p->negaio); - if (p->tls != NULL) { nni_tls_fini(p->tls); } @@ -125,41 +141,34 @@ tls_pipe_fini(void *arg) } static int -tls_pipe_init(tls_pipe **pipep, tls_ep *ep, void *tpp) +tlstran_pipe_init(tlstran_pipe **pipep, nni_tls *tls) { - tls_pipe * p; - nni_plat_tcp_pipe *tcp = tpp; - int rv; + tlstran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_tls_init(&p->tls, ep->cfg, tcp)) != 0) || - ((rv = nni_aio_init(&p->txaio, tls_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tls_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, tls_pipe_nego_cb, p)) != 0)) { - tls_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, tlstran_pipe_nego_cb, p)) != 0)) { + tlstran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->tcp = tcp; - p->keepalive = ep->keepalive; - p->nodelay = ep->nodelay; - + p->tls = tls; *pipep = p; return (0); } static void -tls_cancel_nego(nni_aio *aio, int rv) +tlstran_pipe_cancel_nego(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -174,11 +183,11 @@ tls_cancel_nego(nni_aio *aio, int rv) } static void -tls_pipe_nego_cb(void *arg) +tlstran_pipe_nego_cb(void *arg) { - tls_pipe *p = arg; - nni_aio * aio = p->negaio; - int rv; + tlstran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -237,14 +246,14 @@ done: } static void -tls_pipe_send_cb(void *arg) +tlstran_pipe_send_cb(void *arg) { - tls_pipe *p = arg; - int rv; - nni_aio * aio; - size_t n; - nni_msg * msg; - nni_aio * txaio = p->txaio; + tlstran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -269,9 +278,7 @@ tls_pipe_send_cb(void *arg) return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&p->sendq)) { - tls_pipe_dosend(p, nni_list_first(&p->sendq)); - } + tlstran_pipe_send_start(p); nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); @@ -282,14 +289,14 @@ tls_pipe_send_cb(void *arg) } static void -tls_pipe_recv_cb(void *arg) +tlstran_pipe_recv_cb(void *arg) { - tls_pipe *p = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = p->rxaio; + tlstran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -345,7 +352,7 @@ tls_pipe_recv_cb(void *arg) msg = p->rxmsg; p->rxmsg = NULL; if (!nni_list_empty(&p->recvq)) { - tls_pipe_dorecv(p); + tlstran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); @@ -365,9 +372,9 @@ recv_error: } static void -tls_cancel_tx(nni_aio *aio, int rv) +tlstran_pipe_send_cancel(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -389,14 +396,19 @@ tls_cancel_tx(nni_aio *aio, int rv) } static void -tls_pipe_dosend(tls_pipe *p, nni_aio *aio) +tlstran_pipe_send_start(tlstran_pipe *p) { nni_aio *txaio; + nni_aio *aio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -423,31 +435,31 @@ tls_pipe_dosend(tls_pipe *p, nni_aio *aio) } static void -tls_pipe_send(void *arg, nni_aio *aio) +tlstran_pipe_send(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - int rv; + tlstran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_tx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_send_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { - tls_pipe_dosend(p, aio); + tlstran_pipe_send_start(p); } nni_mtx_unlock(&p->mtx); } static void -tls_cancel_rx(nni_aio *aio, int rv) +tlstran_pipe_recv_cancel(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -468,7 +480,7 @@ tls_cancel_rx(nni_aio *aio, int rv) } static void -tls_pipe_dorecv(tls_pipe *p) +tlstran_pipe_recv_start(tlstran_pipe *p) { nni_aio *rxaio; nni_iov iov; @@ -484,16 +496,16 @@ tls_pipe_dorecv(tls_pipe *p) } static void -tls_pipe_recv(void *arg, nni_aio *aio) +tlstran_pipe_recv(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - int rv; + tlstran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_rx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_recv_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -501,25 +513,25 @@ tls_pipe_recv(void *arg, nni_aio *aio) nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { - tls_pipe_dorecv(p); + tlstran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); } static uint16_t -tls_pipe_peer(void *arg) +tlstran_pipe_peer(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (p->peer); } static int -tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe * p = arg; - int rv; - nni_sockaddr sa; + tlstran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_sockname(p->tls, &sa)) == 0) { @@ -529,11 +541,11 @@ tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe * p = arg; - int rv; - nni_sockaddr sa; + tlstran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_peername(p->tls, &sa)) == 0) { @@ -543,32 +555,32 @@ tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(p->keepalive, v, szp, t)); } static int -tls_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(p->nodelay, v, szp, t)); } static void -tls_pipe_start(void *arg, nni_aio *aio) +tlstran_pipe_start(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + tlstran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_nego, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_cancel_nego, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -594,35 +606,39 @@ tls_pipe_start(void *arg, nni_aio *aio) } static void -tls_ep_fini(void *arg) +tlstran_dialer_fini(void *arg) { - tls_ep *ep = arg; + tlstran_dialer *d = arg; - nni_aio_stop(ep->aio); - if (ep->tep != NULL) { - nni_plat_tcp_ep_fini(ep->tep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_tcp_dialer_fini(d->dialer); } - if (ep->cfg) { - nni_tls_config_fini(ep->cfg); + nni_aio_fini(d->aio); + if (d->ep.cfg != NULL) { + nni_tls_config_fini(d->ep.cfg); } - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_mtx_fini(&d->ep.mtx); + NNI_FREE_STRUCT(d); +} + +static void +tlstran_dialer_close(void *arg) +{ + tlstran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_tcp_dialer_close(d->dialer); } static int -tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +tlstran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - tls_ep * ep; - int rv; - char * host; - char * serv; - nni_sockaddr rsa, lsa; - nni_aio * aio; - int passive; - nng_tls_mode tlsmode; - nng_tls_auth_mode authmode; - uint16_t af; + tlstran_dialer *d; + int rv; + uint16_t af; + char * host = url->u_hostname; + char * port = url->u_port; if (strcmp(url->u_scheme, "tls+tcp") == 0) { af = NNG_AF_UNSPEC; @@ -639,232 +655,374 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_EADDRINVAL); } if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || - (url->u_query != NULL)) { + (url->u_query != NULL) || (host == NULL) || (port == NULL) || + (strlen(host) == 0) || (strlen(port) == 0)) { return (NNG_EADDRINVAL); } + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + nni_mtx_init(&d->ep.mtx); + d->ep.authmode = NNG_TLS_AUTH_MODE_REQUIRED; + d->ep.url = url; + d->ep.proto = nni_sock_proto_id(sock); + d->ep.nodelay = true; + d->ep.keepalive = false; + + if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) || + ((rv = nni_tls_config_init(&d->ep.cfg, NNG_TLS_MODE_CLIENT)) != + 0) || + ((rv = nng_tls_config_auth_mode(d->ep.cfg, d->ep.authmode)) != + 0) || + ((rv = nng_tls_config_server_name(d->ep.cfg, host)) != 0) || + ((rv = nni_aio_init(&d->aio, tlstran_dialer_cb, d)) != 0)) { + tlstran_dialer_fini(d); return (rv); } + d->af = af; - if (strlen(url->u_hostname) == 0) { - host = NULL; - } else { - host = url->u_hostname; - } - if (strlen(url->u_port) == 0) { - serv = NULL; - } else { - serv = url->u_port; - } - - if (mode == NNI_EP_MODE_DIAL) { - passive = 0; - tlsmode = NNG_TLS_MODE_CLIENT; - authmode = NNG_TLS_AUTH_MODE_REQUIRED; - lsa.s_family = af; - nni_aio_set_input(aio, 0, &rsa); - if ((host == NULL) || (serv == NULL)) { - nni_aio_fini(aio); - return (NNG_EADDRINVAL); + *dp = d; + return (0); +} + +static void +tlstran_dialer_cb(void *arg) +{ + tlstran_dialer *d = arg; + tlstran_pipe * p; + nni_tcp_conn * conn; + nni_tls * tls; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->ep.mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->ep.mtx); + if ((rv == 0) && !d->resolving) { + conn = nni_aio_get_output(d->aio, 0); + nni_tcp_conn_fini(conn); } - } else { - passive = 1; - tlsmode = NNG_TLS_MODE_SERVER; - authmode = NNG_TLS_AUTH_MODE_NONE; - rsa.s_family = af; - nni_aio_set_input(aio, 0, &lsa); + return; } - // XXX: arguably we could defer this part to the point we do a bind - // or connect! - nni_plat_tcp_resolv(host, serv, af, passive, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); - return (rv); + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->ep.mtx); + nni_aio_finish_error(aio, rv); + return; } - nni_aio_fini(aio); - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + if (d->resolving) { + // Name resolution complete. Now go to next step. + d->resolving = false; + nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->ep.mtx); + return; } - nni_mtx_init(&ep->mtx); - ep->url = url; - ep->keepalive = false; - ep->nodelay = true; + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); - if (((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) || - ((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) || - ((rv = nng_tls_config_auth_mode(ep->cfg, authmode)) != 0) || - ((rv = nni_aio_init(&ep->aio, tls_ep_cb, ep)) != 0)) { - tls_ep_fini(ep); - return (rv); + if ((rv = nni_tls_init(&tls, d->ep.cfg, conn)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; } - if ((tlsmode == NNG_TLS_MODE_CLIENT) && (host != NULL)) { - if ((rv = nng_tls_config_server_name(ep->cfg, host)) != 0) { - tls_ep_fini(ep); - return (rv); - } + + if ((rv = tlstran_pipe_init(&p, tls)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_tls_fini(tls); + nni_aio_finish_error(aio, rv); + return; } - ep->proto = nni_sock_proto_id(sock); - ep->authmode = authmode; - *epp = ep; - return (0); -} + p->proto = d->ep.proto; + p->rcvmax = d->ep.rcvmax; + p->nodelay = d->ep.nodelay; + p->keepalive = d->ep.keepalive; + nni_mtx_unlock(&d->ep.mtx); -static int -tls_dialer_init(void **epp, nni_url *url, nni_sock *sock) -{ - return (tls_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + (void) nni_tls_set_nodelay(tls, p->nodelay); + (void) nni_tls_set_keepalive(tls, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } -static int -tls_listener_init(void **epp, nni_url *url, nni_sock *sock) +static void +tlstran_dialer_cancel(nni_aio *aio, int rv) { - return (tls_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + tlstran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->ep.mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->ep.mtx); + return; + } + d->user_aio = NULL; + nni_mtx_unlock(&d->ep.mtx); + + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); } static void -tls_ep_close(void *arg) +tlstran_dialer_connect(void *arg, nni_aio *aio) { - tls_ep *ep = arg; + tlstran_dialer *d = arg; + int rv; - nni_aio_close(ep->aio); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&d->ep.mtx); + NNI_ASSERT(d->user_aio == NULL); - nni_mtx_lock(&ep->mtx); - nni_plat_tcp_ep_close(ep->tep); - nni_mtx_unlock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, tlstran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; + + d->resolving = true; + + // Start the name resolution. Callback will see resolving, and then + // switch to doing actual connect. + nni_aio_set_input(d->aio, 0, &d->sa); + nni_tcp_resolv( + d->ep.url->u_hostname, d->ep.url->u_port, d->af, 0, d->aio); + nni_mtx_unlock(&d->ep.mtx); } -static int -tls_ep_bind(void *arg) +static void +tlstran_listener_fini(void *arg) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa); - nni_mtx_unlock(&ep->mtx); - - return (rv); + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_tcp_listener_fini(l->listener); + } + nni_aio_fini(l->aio); + if (l->ep.cfg != NULL) { + nni_tls_config_fini(l->ep.cfg); + } + nni_mtx_fini(&l->ep.mtx); + NNI_FREE_STRUCT(l); } static void -tls_ep_finish(tls_ep *ep) +tlstran_listener_close(void *arg) { - nni_aio * aio; - int rv; - tls_pipe *pipe = NULL; + tlstran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_close(l->aio); + nni_tcp_listener_close(l->listener); +} + +static int +tlstran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + tlstran_listener *l; + int rv; + nni_aio * aio; + uint16_t af; + char * host = url->u_hostname; + char * port = url->u_port; + + if (strcmp(url->u_scheme, "tls+tcp") == 0) { + af = NNG_AF_UNSPEC; + } else if (strcmp(url->u_scheme, "tls+tcp4") == 0) { + af = NNG_AF_INET; + } else if (strcmp(url->u_scheme, "tls+tcp6") == 0) { + af = NNG_AF_INET6; + } else { + return (NNG_EADDRINVAL); + } + + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->ep.mtx); + l->ep.url = url; + l->ep.authmode = NNG_TLS_AUTH_MODE_NONE; + l->ep.keepalive = false; + l->ep.nodelay = true; + l->ep.proto = nni_sock_proto_id(sock); -done: - aio = ep->user_aio; - ep->user_aio = NULL; + if (strlen(host) == 0) { + host = NULL; + } - if ((aio != NULL) && (rv == 0)) { - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + tlstran_listener_fini(l); + return (rv); } - if (pipe != NULL) { - tls_pipe_fini(pipe); + + // XXX: We are doing lookup at listener initialization. There is + // a valid argument that this should be done at bind time, but that + // would require making bind asynchronous. In some ways this would + // be worse than the cost of just waiting here. We always recommend + // using local IP addresses rather than names when possible. + + nni_aio_set_input(aio, 0, &l->sa); + + nni_tcp_resolv(host, port, af, 1, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + + if (rv != 0) { + tlstran_listener_fini(l); + return (rv); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + if (((rv = nni_tcp_listener_init(&l->listener)) != 0) || + ((rv = nni_tls_config_init(&l->ep.cfg, NNG_TLS_MODE_SERVER)) != + 0) || + ((rv = nng_tls_config_auth_mode(l->ep.cfg, l->ep.authmode)) != + 0) || + ((rv = nni_aio_init(&l->aio, tlstran_listener_cb, l)) != 0)) { + tlstran_listener_fini(l); + return (rv); } + l->bsa = l->sa; + + *lp = l; + return (0); } -static void -tls_ep_cb(void *arg) +static int +tlstran_listener_bind(void *arg) { - tls_ep *ep = arg; + tlstran_listener *l = arg; + int rv; - nni_mtx_lock(&ep->mtx); - tls_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + l->bsa = l->sa; + nni_mtx_lock(&l->ep.mtx); + rv = nni_tcp_listener_listen(l->listener, &l->bsa); + nni_mtx_unlock(&l->ep.mtx); + + return (rv); } static void -tls_cancel_ep(nni_aio *aio, int rv) +tlstran_listener_cb(void *arg) { - tls_ep *ep = nni_aio_get_prov_data(aio); + tlstran_listener *l = arg; + nni_aio * aio; + int rv; + tlstran_pipe * p = NULL; + nni_tcp_conn * conn; + nni_tls * tls; + + nni_mtx_lock(&l->ep.mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->ep.mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_tcp_conn_fini(conn); + } + return; + } + if (rv != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_aio_finish_error(aio, rv); + return; + } - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + if ((rv = nni_tls_init(&tls, l->ep.cfg, conn)) != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); return; } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); + if ((rv = tlstran_pipe_init(&p, tls)) != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_tls_fini(tls); + nni_aio_finish_error(aio, rv); + return; + } + p->proto = l->ep.proto; + p->rcvmax = l->ep.rcvmax; + p->nodelay = l->ep.nodelay; + p->keepalive = l->ep.keepalive; - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + (void) nni_tls_set_nodelay(tls, p->nodelay); + (void) nni_tls_set_keepalive(tls, p->keepalive); + + nni_mtx_unlock(&l->ep.mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void -tls_ep_accept(void *arg, nni_aio *aio) +tlstran_listener_cancel(nni_aio *aio, int rv) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = nni_aio_get_prov_data(aio); - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->ep.mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->ep.mtx); return; } - ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + l->user_aio = NULL; + nni_mtx_unlock(&l->ep.mtx); + + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } static void -tls_ep_connect(void *arg, nni_aio *aio) +tlstran_listener_accept(void *arg, nni_aio *aio) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->ep.mtx); + NNI_ASSERT(l->user_aio == NULL); + + if ((rv = nni_aio_schedule(aio, tlstran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->ep.mtx); nni_aio_finish_error(aio, rv); + return; } - ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); -} + l->user_aio = aio; -static int -tls_ep_chk_bool(const void *v, size_t sz, nni_opt_type t) -{ - return (nni_copyin_bool(NULL, v, sz, t)); + nni_tcp_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->ep.mtx); } static int -tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - bool val; - int rv; + tlstran_ep *ep = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->nodelay = val; @@ -874,10 +1032,10 @@ tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_bool(ep->nodelay, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -885,11 +1043,25 @@ tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - bool val; - int rv; + tlstran_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static int +tlstran_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + tlstran_ep *ep = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->keepalive = val; @@ -899,10 +1071,10 @@ tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_bool(ep->keepalive, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -910,60 +1082,53 @@ tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); + tlstran_ep *ep = arg; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int -tls_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_check_bool(const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - char ustr[128]; - char ipstr[48]; // max for IPv6 addresses including [] - char portstr[6]; // max for 16-bit port - - nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); - snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr); - return (nni_copyout_str(ustr, v, szp, t)); + return (nni_copyin_bool(NULL, v, sz, t)); } static int -tls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - size_t val; - int rv; + tlstran_dialer *d = arg; - if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); - } - return (rv); + return (nni_copyout_str(d->ep.url->u_rawurl, v, szp, t)); } static int -tls_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t) +tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); + tlstran_listener *l = arg; + char ustr[128]; + char ipstr[48]; // max for IPv6 addresses including [] + char portstr[6]; // max for 16-bit port + + nni_mtx_lock(&l->ep.mtx); + nni_ntop(&l->bsa, ipstr, portstr); + nni_mtx_unlock(&l->ep.mtx); + snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr); + return (nni_copyout_str(ustr, v, szp, t)); } static int -tls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, v, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } static int -tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t) +tlstran_check_config(const void *data, size_t sz, nni_opt_type t) { void *v; int rv; @@ -974,9 +1139,9 @@ tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t) } static int -tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) +tlstran_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) { - tls_ep * ep = arg; + tlstran_ep * ep = arg; nng_tls_config *cfg, *old; int rv; @@ -998,10 +1163,10 @@ tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) } static int -tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_ptr(ep->cfg, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -1009,7 +1174,7 @@ tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t) +tlstran_check_string(const void *v, size_t sz, nni_opt_type t) { if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) { return (NNG_EBADTYPE); @@ -1021,58 +1186,65 @@ tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_ca_file(ep->cfg, v); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_chk_auth_mode(const void *v, size_t sz, nni_opt_type t) +tlstran_check_auth_mode(const void *v, size_t sz, nni_opt_type t) { return (nni_copyin_int(NULL, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t)); } static int -tls_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int mode; - int rv; + tlstran_ep *ep = arg; + int mode; + int rv; rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t); if (rv == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_auth_mode(ep->cfg, mode); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_server_name(ep->cfg, v); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_cert_key_file( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); rv = nng_tls_config_cert_key_file(ep->cfg, v, NULL); nni_mtx_unlock(&ep->mtx); @@ -1081,38 +1253,38 @@ tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(nni_tls_verified(p->tls), v, szp, t)); } -static nni_tran_option tls_pipe_options[] = { +static nni_tran_option tlstran_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tls_pipe_get_locaddr, + .o_get = tlstran_pipe_get_locaddr, }, { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tls_pipe_get_remaddr, + .o_get = tlstran_pipe_get_remaddr, }, { .o_name = NNG_OPT_TLS_VERIFIED, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_verified, + .o_get = tlstran_pipe_get_verified, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_keepalive, + .o_get = tlstran_pipe_get_keepalive, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_nodelay, + .o_get = tlstran_pipe_get_nodelay, }, // terminate list { @@ -1120,74 +1292,74 @@ static nni_tran_option tls_pipe_options[] = { }, }; -static nni_tran_pipe_ops tls_pipe_ops = { - .p_fini = tls_pipe_fini, - .p_start = tls_pipe_start, - .p_stop = tls_pipe_stop, - .p_send = tls_pipe_send, - .p_recv = tls_pipe_recv, - .p_close = tls_pipe_close, - .p_peer = tls_pipe_peer, - .p_options = tls_pipe_options, +static nni_tran_pipe_ops tlstran_pipe_ops = { + .p_fini = tlstran_pipe_fini, + .p_start = tlstran_pipe_start, + .p_stop = tlstran_pipe_stop, + .p_send = tlstran_pipe_send, + .p_recv = tlstran_pipe_recv, + .p_close = tlstran_pipe_close, + .p_peer = tlstran_pipe_peer, + .p_options = tlstran_pipe_options, }; -static nni_tran_option tls_dialer_options[] = { +static nni_tran_option tlstran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tls_ep_get_recvmaxsz, - .o_set = tls_ep_set_recvmaxsz, - .o_chk = tls_ep_chk_recvmaxsz, + .o_get = tlstran_ep_get_recvmaxsz, + .o_set = tlstran_ep_set_recvmaxsz, + .o_chk = tlstran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tls_dialer_get_url, + .o_get = tlstran_dialer_get_url, }, { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, - .o_get = tls_ep_get_config, - .o_set = tls_ep_set_config, - .o_chk = tls_ep_chk_config, + .o_get = tlstran_ep_get_config, + .o_set = tlstran_ep_set_config, + .o_chk = tlstran_check_config, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_cert_key_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_cert_key_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_CA_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_ca_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_ca_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, .o_type = NNI_TYPE_INT32, // enum really - .o_set = tls_ep_set_auth_mode, - .o_chk = tls_ep_chk_auth_mode, + .o_set = tlstran_ep_set_auth_mode, + .o_chk = tlstran_check_auth_mode, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_server_name, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_server_name, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_nodelay, - .o_set = tls_ep_set_nodelay, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_nodelay, + .o_set = tlstran_ep_set_nodelay, + .o_chk = tlstran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_keepalive, - .o_set = tls_ep_set_keepalive, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_keepalive, + .o_set = tlstran_ep_set_keepalive, + .o_chk = tlstran_check_bool, }, // terminate list { @@ -1195,63 +1367,63 @@ static nni_tran_option tls_dialer_options[] = { }, }; -static nni_tran_option tls_listener_options[] = { +static nni_tran_option tlstran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tls_ep_get_recvmaxsz, - .o_set = tls_ep_set_recvmaxsz, - .o_chk = tls_ep_chk_recvmaxsz, + .o_get = tlstran_ep_get_recvmaxsz, + .o_set = tlstran_ep_set_recvmaxsz, + .o_chk = tlstran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tls_listener_get_url, + .o_get = tlstran_listener_get_url, }, { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, - .o_get = tls_ep_get_config, - .o_set = tls_ep_set_config, - .o_chk = tls_ep_chk_config, + .o_get = tlstran_ep_get_config, + .o_set = tlstran_ep_set_config, + .o_chk = tlstran_check_config, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_cert_key_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_cert_key_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_CA_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_ca_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_ca_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, .o_type = NNI_TYPE_INT32, // enum really - .o_set = tls_ep_set_auth_mode, - .o_chk = tls_ep_chk_auth_mode, + .o_set = tlstran_ep_set_auth_mode, + .o_chk = tlstran_check_auth_mode, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_server_name, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_server_name, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_nodelay, - .o_set = tls_ep_set_nodelay, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_nodelay, + .o_set = tlstran_ep_set_nodelay, + .o_chk = tlstran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_keepalive, - .o_set = tls_ep_set_keepalive, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_keepalive, + .o_set = tlstran_ep_set_keepalive, + .o_chk = tlstran_check_bool, }, // terminate list { @@ -1259,51 +1431,51 @@ static nni_tran_option tls_listener_options[] = { }, }; -static nni_tran_dialer_ops tls_dialer_ops = { - .d_init = tls_dialer_init, - .d_fini = tls_ep_fini, - .d_connect = tls_ep_connect, - .d_close = tls_ep_close, - .d_options = tls_dialer_options, +static nni_tran_dialer_ops tlstran_dialer_ops = { + .d_init = tlstran_dialer_init, + .d_fini = tlstran_dialer_fini, + .d_connect = tlstran_dialer_connect, + .d_close = tlstran_dialer_close, + .d_options = tlstran_dialer_options, }; -static nni_tran_listener_ops tls_listener_ops = { - .l_init = tls_listener_init, - .l_fini = tls_ep_fini, - .l_bind = tls_ep_bind, - .l_accept = tls_ep_accept, - .l_close = tls_ep_close, - .l_options = tls_listener_options, +static nni_tran_listener_ops tlstran_listener_ops = { + .l_init = tlstran_listener_init, + .l_fini = tlstran_listener_fini, + .l_bind = tlstran_listener_bind, + .l_accept = tlstran_listener_accept, + .l_close = tlstran_listener_close, + .l_options = tlstran_listener_options, }; static nni_tran tls_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; static nni_tran tls4_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp4", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; static nni_tran tls6_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp6", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; int diff --git a/tests/compat_tcp.c b/tests/compat_tcp.c index fee271e9..95ceb546 100644 --- a/tests/compat_tcp.c +++ b/tests/compat_tcp.c @@ -119,9 +119,11 @@ int main (int argc, const char *argv[]) rc = nn_connect (sc, "tcp://:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); +#if 0 // NNG does not validate names apriori rc = nn_connect (sc, "tcp://-hostname:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); +#endif rc = nn_connect (sc, "tcp://abc.123.---.#:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); @@ -132,12 +134,14 @@ int main (int argc, const char *argv[]) errno_assert (nn_errno () == EINVAL); #endif +#if 0 // Again NNG is not validating names apriori. rc = nn_connect (sc, "tcp://abc...123:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); rc = nn_connect (sc, "tcp://.123:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); +#endif /* Connect correctly. Do so before binding the peer socket. */ test_connect (sc, socket_address); diff --git a/tests/compat_ws.c b/tests/compat_ws.c index 2c302712..5dcfa277 100644 --- a/tests/compat_ws.c +++ b/tests/compat_ws.c @@ -5,8 +5,8 @@ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved. Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), + Permission is hereby granted, free of charge, to any person obtaining a + copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom @@ -20,8 +20,8 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - IN THE SOFTWARE. + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + DEALINGS IN THE SOFTWARE. */ // This file began life in nanomsg, but we have made some significant changes @@ -30,7 +30,7 @@ // has support for IPv6, and does not support local interface binding. // We have improved the maximum receive size test, and verified that option // setting for the frame type conforms to NNG constraints. - + #include <nanomsg/nn.h> #include <nanomsg/pair.h> #include <nanomsg/ws.h> @@ -41,7 +41,7 @@ static char socket_address[128]; /* Basic tests for WebSocket transport. */ -#if 0 // NNG has no support for text frames. +#if 0 // NNG has no support for text frames. /* test_text() verifies that we drop messages properly when sending invalid UTF-8, but not when we send valid data. */ void test_text () @@ -84,198 +84,200 @@ void test_text () } #endif -int main (int argc, const char *argv[]) +int +main(int argc, const char *argv[]) { - int rc; - int sb; - int sc; - int sb2; - int opt; - size_t sz; - int i; - char any_address[128]; - - test_addr_from (socket_address, "ws", "127.0.0.1", - get_test_port (argc, argv)); - - test_addr_from (any_address, "ws", "*", - get_test_port (argc, argv)); - - /* Try closing bound but unconnected socket. */ - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, any_address); - test_close (sb); - - /* Try closing a TCP socket while it not connected. At the same time - test specifying the local address for the connection. */ - sc = test_socket (AF_SP, NN_PAIR); - test_connect (sc, socket_address); - test_close (sc); - - /* Open the socket anew. */ - sc = test_socket (AF_SP, NN_PAIR); - - /* Check socket options. */ - sz = sizeof (opt); - rc = nn_getsockopt (sc, NN_WS, NN_WS_MSG_TYPE, &opt, &sz); - errno_assert (rc == 0); - nn_assert (sz == sizeof (opt)); - nn_assert (opt == NN_WS_MSG_TYPE_BINARY); - - /* Default port 80 should be assumed if not explicitly declared. */ - rc = nn_connect (sc, "ws://127.0.0.1"); - errno_assert (rc >= 0); - - /* Try using invalid address strings. */ - rc = nn_connect (sc, "ws://*:"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - rc = nn_connect (sc, "ws://*:1000000"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - rc = nn_connect (sc, "ws://*:some_port"); - nn_assert (rc < 0); + int rc; + int sb; + int sc; + int sb2; + int opt; + size_t sz; + int i; + char any_address[128]; + + test_addr_from( + socket_address, "ws", "127.0.0.1", get_test_port(argc, argv)); + + test_addr_from(any_address, "ws", "*", get_test_port(argc, argv)); + + /* Try closing bound but unconnected socket. */ + sb = test_socket(AF_SP, NN_PAIR); + test_bind(sb, any_address); + test_close(sb); + + /* Try closing a TCP socket while it not connected. At the same time + test specifying the local address for the connection. */ + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, socket_address); + test_close(sc); + + /* Open the socket anew. */ + sc = test_socket(AF_SP, NN_PAIR); + + /* Check socket options. */ + sz = sizeof(opt); + rc = nn_getsockopt(sc, NN_WS, NN_WS_MSG_TYPE, &opt, &sz); + errno_assert(rc == 0); + nn_assert(sz == sizeof(opt)); + nn_assert(opt == NN_WS_MSG_TYPE_BINARY); + + /* Default port 80 should be assumed if not explicitly declared. */ + rc = nn_connect(sc, "ws://127.0.0.1"); + errno_assert(rc >= 0); + + /* Try using invalid address strings. */ + rc = nn_connect(sc, "ws://*:"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); + rc = nn_connect(sc, "ws://*:1000000"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); + rc = nn_connect(sc, "ws://*:some_port"); + nn_assert(rc < 0); #if 0 // NNG doesn't support device binding rc = nn_connect (sc, "ws://eth10000;127.0.0.1:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == ENODEV); #endif - rc = nn_bind (sc, "ws://127.0.0.1:"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - rc = nn_bind (sc, "ws://127.0.0.1:1000000"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); + rc = nn_bind(sc, "ws://127.0.0.1:"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); + rc = nn_bind(sc, "ws://127.0.0.1:1000000"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); #if 0 // NNG doesn't support device binding rc = nn_bind (sc, "ws://eth10000:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == ENODEV); #endif - rc = nn_connect (sc, "ws://:5555"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); + rc = nn_connect(sc, "ws://:5555"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); +#if 0 // NNG does not validate names apriori rc = nn_connect (sc, "ws://-hostname.:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); rc = nn_connect (sc, "ws://abc.123.---.#:5555"); nn_assert (rc < 0); errno_assert (nn_errno () == EINVAL); -#if 0 // Nothing wrong with this under NNG. Valid IPv6 URL rc = nn_connect (sc, "ws://[::1]:5555"); nn_assert (rc < 0); + errno_assert(nn_errno() == EINVAL); + rc = nn_connect(sc, "ws://abc...123:5555"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); + rc = nn_connect(sc, "ws://.123:5555"); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); #endif - errno_assert (nn_errno () == EINVAL); - rc = nn_connect (sc, "ws://abc...123:5555"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - rc = nn_connect (sc, "ws://.123:5555"); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - - test_close (sc); - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, socket_address); - sc = test_socket (AF_SP, NN_PAIR); - test_connect (sc, socket_address); - - nn_sleep(100); - /* Ping-pong test. */ - for (i = 0; i != 100; ++i) { - - test_send (sc, "ABC"); - test_recv (sb, "ABC"); - - test_send (sb, "DEF"); - test_recv (sc, "DEF"); - } - - /* Batch transfer test. */ - for (i = 0; i != 100; ++i) { - test_send (sc, "0123456789012345678901234567890123456789"); - } - for (i = 0; i != 100; ++i) { - test_recv (sb, "0123456789012345678901234567890123456789"); - } - - test_close (sc); - test_close (sb); - - /* Test two sockets binding to the same address. */ - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, socket_address); - sb2 = test_socket (AF_SP, NN_PAIR); - - rc = nn_bind (sb2, socket_address); - nn_assert (rc < 0); - errno_assert (nn_errno () == EADDRINUSE); - test_close(sb); - test_close(sb2); - - /* Test that NN_RCVMAXSIZE can be -1, but not lower */ - sb = test_socket (AF_SP, NN_PAIR); - opt = -1; - rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt)); - nn_assert (rc >= 0); - opt = -2; - rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt)); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); - test_close (sb); - - /* Test NN_RCVMAXSIZE limit */ - sb = test_socket (AF_SP, NN_PAIR); - opt = 4; - test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt)); - test_bind (sb, socket_address); - sc = test_socket (AF_SP, NN_PAIR); - test_connect (sc, socket_address); - opt = 1000; - test_setsockopt (sc, NN_SOL_SOCKET, NN_SNDTIMEO, &opt, sizeof (opt)); - nn_assert (opt == 1000); - opt = 1000; - test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVTIMEO, &opt, sizeof (opt)); - nn_assert (opt == 1000); - test_send (sc, "ABC"); - test_recv (sb, "ABC"); - test_send (sc, "ABCD"); - test_recv (sb, "ABCD"); - test_send (sc, "ABCDE"); - test_drop (sb, ETIMEDOUT); - test_close (sc); - - /* Increase the size limit, reconnect, then try sending again. */ - opt = 5; - test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt)); - - sc = test_socket (AF_SP, NN_PAIR); - test_connect (sc, socket_address); - nn_sleep(200); - - test_send (sc, "ABCDE"); - test_recv (sb, "ABCDE"); - test_close (sb); - test_close (sc); + test_close(sc); + + sb = test_socket(AF_SP, NN_PAIR); + test_bind(sb, socket_address); + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, socket_address); + + nn_sleep(100); + /* Ping-pong test. */ + for (i = 0; i != 100; ++i) { + + test_send(sc, "ABC"); + test_recv(sb, "ABC"); + + test_send(sb, "DEF"); + test_recv(sc, "DEF"); + } + + /* Batch transfer test. */ + for (i = 0; i != 100; ++i) { + test_send(sc, "0123456789012345678901234567890123456789"); + } + for (i = 0; i != 100; ++i) { + test_recv(sb, "0123456789012345678901234567890123456789"); + } + + test_close(sc); + test_close(sb); + + /* Test two sockets binding to the same address. */ + sb = test_socket(AF_SP, NN_PAIR); + test_bind(sb, socket_address); + sb2 = test_socket(AF_SP, NN_PAIR); + + rc = nn_bind(sb2, socket_address); + nn_assert(rc < 0); + errno_assert(nn_errno() == EADDRINUSE); + test_close(sb); + test_close(sb2); + + /* Test that NN_RCVMAXSIZE can be -1, but not lower */ + sb = test_socket(AF_SP, NN_PAIR); + opt = -1; + rc = + nn_setsockopt(sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof(opt)); + nn_assert(rc >= 0); + opt = -2; + rc = + nn_setsockopt(sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof(opt)); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); + test_close(sb); + + /* Test NN_RCVMAXSIZE limit */ + sb = test_socket(AF_SP, NN_PAIR); + opt = 4; + test_setsockopt(sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof(opt)); + test_bind(sb, socket_address); + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, socket_address); + opt = 1000; + test_setsockopt(sc, NN_SOL_SOCKET, NN_SNDTIMEO, &opt, sizeof(opt)); + nn_assert(opt == 1000); + opt = 1000; + test_setsockopt(sb, NN_SOL_SOCKET, NN_RCVTIMEO, &opt, sizeof(opt)); + nn_assert(opt == 1000); + test_send(sc, "ABC"); + test_recv(sb, "ABC"); + test_send(sc, "ABCD"); + test_recv(sb, "ABCD"); + test_send(sc, "ABCDE"); + test_drop(sb, ETIMEDOUT); + test_close(sc); + + /* Increase the size limit, reconnect, then try sending again. */ + opt = 5; + test_setsockopt(sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof(opt)); + + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, socket_address); + nn_sleep(200); + + test_send(sc, "ABCDE"); + test_recv(sb, "ABCDE"); + test_close(sb); + test_close(sc); #if 0 // NNG doesn't support text frames. test_text (); #else - opt = NN_WS_MSG_TYPE_TEXT; - rc = nn_setsockopt (sc, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt)); - nn_assert (rc < 0); - errno_assert (nn_errno () == EINVAL); + opt = NN_WS_MSG_TYPE_TEXT; + rc = nn_setsockopt(sc, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof(opt)); + nn_assert(rc < 0); + errno_assert(nn_errno() == EINVAL); - opt = NN_WS_MSG_TYPE_BINARY; - test_setsockopt (sb, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt)); + opt = NN_WS_MSG_TYPE_BINARY; + test_setsockopt(sb, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof(opt)); #endif - /* Test closing a socket that is waiting to connect. */ - sc = test_socket (AF_SP, NN_PAIR); - test_connect (sc, socket_address); - nn_sleep (100); - test_close (sc); + /* Test closing a socket that is waiting to connect. */ + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, socket_address); + nn_sleep(100); + test_close(sc); - return 0; + return 0; } diff --git a/tests/resolv.c b/tests/resolv.c index 1d502eec..90297a9f 100644 --- a/tests/resolv.c +++ b/tests/resolv.c @@ -57,8 +57,7 @@ ip6tostr(void *addr) nng_sockaddr sa; So(nng_aio_alloc(&aio, NULL, NULL) == 0); So(nng_aio_set_input(aio, 0, &sa) == 0); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET6, 1, - aio); + nni_tcp_resolv("localhost", "80", NNG_AF_INET6, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So(sa.s_in6.sa_family == NNG_AF_INET6); @@ -79,7 +78,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("google-public-dns-a.google.com", "80", + nni_tcp_resolv("google-public-dns-a.google.com", "80", NNG_AF_INET, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); @@ -96,7 +95,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, aio); + nni_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So(sa.s_in.sa_family == NNG_AF_INET); @@ -112,7 +111,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, aio); + nni_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So(sa.s_in.sa_family == NNG_AF_INET); @@ -137,7 +136,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio); + nni_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So(sa.s_in6.sa_family == NNG_AF_INET6); @@ -153,7 +152,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, aio); + nni_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == NNG_EADDRINVAL); nng_aio_free(aio); @@ -166,7 +165,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, aio); + nni_tcp_resolv("localhost", "80", NNG_AF_INET, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So(sa.s_in.sa_family == NNG_AF_INET); @@ -184,7 +183,7 @@ TestMain("Resolver", { So(nng_aio_alloc(&aio, NULL, NULL) == 0); nng_aio_set_input(aio, 0, &sa); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, aio); + nni_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, aio); nng_aio_wait(aio); So(nng_aio_result(aio) == 0); So((sa.s_family == NNG_AF_INET) || |
