diff options
Diffstat (limited to 'src')
30 files changed, 3748 insertions, 1883 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e9561980..e89f782b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -104,6 +104,9 @@ if (NNG_PLATFORM_POSIX) platform/posix/posix_resolv_gai.c platform/posix/posix_sockaddr.c platform/posix/posix_tcp.c + platform/posix/posix_tcpconn.c + platform/posix/posix_tcpdial.c + platform/posix/posix_tcplisten.c platform/posix/posix_thread.c platform/posix/posix_udp.c ) @@ -133,6 +136,7 @@ if (NNG_PLATFORM_WINDOWS) platform/windows/win_clock.c platform/windows/win_debug.c platform/windows/win_file.c + platform/windows/win_io.c platform/windows/win_iocp.c platform/windows/win_ipc.c platform/windows/win_pipe.c @@ -140,6 +144,9 @@ if (NNG_PLATFORM_WINDOWS) platform/windows/win_resolv.c platform/windows/win_sockaddr.c platform/windows/win_tcp.c + platform/windows/win_tcpconn.c + platform/windows/win_tcpdial.c + platform/windows/win_tcplisten.c platform/windows/win_thread.c platform/windows/win_udp.c ) diff --git a/src/core/aio.c b/src/core/aio.c index c8517b9b..40638bce 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -378,8 +378,10 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } + NNI_ASSERT(aio->a_prov_cancel == NULL); aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; + if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } diff --git a/src/core/platform.h b/src/core/platform.h index 607c3827..5045b172 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -209,93 +209,103 @@ extern int nni_plat_ncpu(void); // // TCP Support. // +typedef struct nni_tcp_conn nni_tcp_conn; +typedef struct nni_tcp_dialer nni_tcp_dialer; +typedef struct nni_tcp_listener nni_tcp_listener; -typedef struct nni_plat_tcp_ep nni_plat_tcp_ep; -typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe; +extern void nni_tcp_conn_fini(nni_tcp_conn *); -// nni_plat_tcp_ep_init creates a new endpoint associated with the local -// and remote addresses. -extern int nni_plat_tcp_ep_init( - nni_plat_tcp_ep **, const nni_sockaddr *, const nni_sockaddr *, int); +// nni_tcp_dialer_close closes the dialer, which might actually be +// implemented as a shutdown() call. +// Further operations on it should return NNG_ECLOSED. +extern void nni_tcp_conn_close(nni_tcp_conn *); -// nni_plat_tcp_ep_fini closes the endpoint and releases resources. -extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *); - -// nni_plat_tcp_ep_close closes the endpoint; this might not close the -// actual underlying socket, but it should call shutdown on it. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *); - -// nni_plat_tcp_listen creates an TCP socket in listening mode, bound -// to the specified path. -extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *, nni_sockaddr *); - -// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *); - -// nni_plat_tcp_connect is the client side. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *); - -// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources -// associated with it. -extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *); - -// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *); - -// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer. +// nni_tcp_conn_send sends data in the iov buffers to the peer. // The platform may modify the iovs. -extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *); +extern void nni_tcp_conn_send(nni_tcp_conn *, nni_aio *); -// nni_plat_tcp_pipe_recv receives data into the buffers provided by the +// nni_tcp_conn_recv receives data into the buffers provided by the // I/O vector (iovs). The platform should attempt to scatter the received // data into the iovs if possible. // -// It is an error for the caller to supply any IO vector elements with -// zero length. -// -// It is possible for the TCP reader to return less data than is requested, +// It is possible for the reader to return less data than is requested, // in which case the caller is responsible for resubmitting. The platform -// should not return "zero" data however. (It is an error to attempt to -// receive zero bytes.) The platform may not modify the I/O vector. -extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *); +// must not return "zero" data however. (It is an error to attempt to +// receive zero bytes.) The platform may modify the iovs. +extern void nni_tcp_conn_recv(nni_tcp_conn *, nni_aio *); -// nni_plat_tcp_pipe_peername gets the peer name. -extern int nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *, nni_sockaddr *); +// nni_tcp_conn_peername gets the peer name. +extern int nni_tcp_conn_peername(nni_tcp_conn *, nni_sockaddr *); -// nni_plat_tcp_pipe_sockname gets the local name. -extern int nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *, nni_sockaddr *); +// nni_tcp_conn_sockname gets the local name. +extern int nni_tcp_conn_sockname(nni_tcp_conn *, nni_sockaddr *); -// nni_plat_tcp_pipe_set_nodelay sets nodelay, disabling Nagle, according -// to the parameter. true disables Nagle; false enables Nagle. -extern int nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *, bool); +// nni_tcp_conn_set_nodelay indicates that the TCP pipe should send +// data immediately, without any buffering. (Disable Nagle's algorithm.) +extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool); -// nni_plat_tcp_pipe_set_keepalive indicates that the TCP pipe should send +// nni_tcp_conn_set_keepalive indicates that the TCP pipe should send // keepalive probes. Tuning of these keepalives is current unsupported. -extern int nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *, bool); - -// nni_plat_tcp_ntop obtains the IP address for the socket (enclosing it +extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool); + +// nni_tcp_listener_init creates a new dialer object. +extern int nni_tcp_dialer_init(nni_tcp_dialer **); + +// nni_tcp_dialer_fini finalizes the dialer, closing it and freeing +// all resources. +extern void nni_tcp_dialer_fini(nni_tcp_dialer *); + +// nni_tcp_dialer_close closes the dialer. +// Further operations on it should return NNG_ECLOSED. Any in-progress +// connection will be aborted. +extern void nni_tcp_dialer_close(nni_tcp_dialer *); + +// nni_tcp_dialer_dial attempts to create an outgoing connection, +// asynchronously, to the address specified. On success, the first (and only) +// output will be an nni_tcp_conn * associated with the remote server. +extern void nni_tcp_dialer_dial( + nni_tcp_dialer *, const nni_sockaddr *, nni_aio *); + +// nni_tcp_listener_init creates a new listener object, unbound. +extern int nni_tcp_listener_init(nni_tcp_listener **); + +// nni_tcp_listener_fini frees the listener and all associated resources. +// It implictly closes the listener as well. +extern void nni_tcp_listener_fini(nni_tcp_listener *); + +// nni_tcp_listener_close closes the listener. This will unbind +// any bound socket, and further operations will result in NNG_ECLOSED. +extern void nni_tcp_listener_close(nni_tcp_listener *); + +// nni_tcp_listener_listen creates the socket in listening mode, bound +// to the specified address. The address will be updated to reflect +// the actual address bound (making it possible to bind to port 0 to +// specify an ephemeral address, and then the actual address can be +// examined afterwards.) +extern int nni_tcp_listener_listen(nni_tcp_listener *, nni_sockaddr *); + +// nni_tcp_listener_accept accepts in incoming connect, asynchronously. +// On success, the first (and only) output will be an nni_tcp_conn * +// associated with the remote peer. +extern void nni_tcp_listener_accept(nni_tcp_listener *, nni_aio *); + +// nni_ntop obtains the IP address for the socket (enclosing it // in brackets if it is IPv6) and port. Enough space for both must // be present (48 bytes and 6 bytes each), although if either is NULL then -// those components are skipped. -extern int nni_plat_tcp_ntop(const nni_sockaddr *, char *, char *); +// those components are skipped. This is based on inet_ntop. +extern int nni_ntop(const nni_sockaddr *, char *, char *); -// nni_plat_tcp_resolv resolves a TCP name asynchronously. The family +// nni_tcp_resolv resolves a TCP name asynchronously. The family // should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The // first two constrain the name to those families, while the third will // return names of either family. The passive flag indicates that the // name will be used for bind(), otherwise the name will be used with // connect(). The host part may be NULL only if passive is true. -extern void nni_plat_tcp_resolv( - const char *, const char *, int, int, nni_aio *); +extern void nni_tcp_resolv(const char *, const char *, int, int, nni_aio *); -// nni_plat_udp_resolve is just like nni_plat_tcp_resolve, but looks up +// nni_udp_resolv is just like nni_tcp_resolv, but looks up // service names using UDP. -extern void nni_plat_udp_resolv( - const char *, const char *, int, int, nni_aio *); +extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *); // // IPC (UNIX Domain Sockets & Named Pipes) Support. @@ -317,7 +327,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); // Further operations on the pipe should return NNG_ECLOSED. extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); -// nni_plat_tcp_listen creates an IPC socket in listening mode, bound +// nni_plat_ipc_listen creates an IPC socket in listening mode, bound // to the specified path. extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 83f2f1a8..3b1ce751 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -20,8 +20,8 @@ #include "core/nng_impl.h" #include "posix_pollq.h" +#include <sys/stat.h> // needed for musl build #include <sys/types.h> // needed for mode_t -#include <sys/stat.h> // needed for musl build typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; @@ -48,5 +48,4 @@ extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); extern int nni_posix_epdesc_sockname(nni_posix_epdesc *, nni_sockaddr *); extern int nni_posix_epdesc_set_permissions(nni_posix_epdesc *, mode_t); - #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 74c92dbd..d796765a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -188,13 +188,14 @@ nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; + NNI_ARG_UNUSED(pfd); + nni_mtx_lock(&ed->mtx); if (events & POLLNVAL) { nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); return; } - NNI_ASSERT(pfd == ed->pfd); // Anything else will turn up in accept. nni_epdesc_doaccept(ed); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 26753df6..ec487de5 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -302,7 +302,7 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { nni_mtx_lock(&pq->mtx); - pq->closing = 1; + pq->closing = true; nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 8c003362..63d858c2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -13,6 +13,7 @@ #ifdef NNG_USE_POSIX_RESOLV_GAI #include "platform/posix/posix_aio.h" +#include <arpa/inet.h> #include <ctype.h> #include <errno.h> #include <netdb.h> @@ -274,14 +275,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -330,6 +331,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + const void *ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + inet_ntop(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + inet_ntop(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_posix_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index cbc7a641..53ea8026 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -26,115 +26,6 @@ #include <unistd.h> int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_storage ss; - int len; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, rsa); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - len = nni_posix_nn2sockaddr((void *) &ss, lsa); - nni_posix_epdesc_set_local(ed, &ss, len); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - rv = nni_posix_epdesc_listen((void *) ep); - if ((rv == 0) && (bsa != NULL)) { - rv = nni_posix_epdesc_sockname((void *) ep, bsa); - } - return (rv); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_peername((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *p, nni_sockaddr *sa) -{ - return (nni_posix_pipedesc_sockname((void *) p, sa)); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_keepalive((void *) p, v)); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *p, bool v) -{ - return (nni_posix_pipedesc_set_nodelay((void *) p, v)); -} - -int nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) { const void *ap; diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h new file mode 100644 index 00000000..aefce7f7 --- /dev/null +++ b/src/platform/posix/posix_tcp.h @@ -0,0 +1,44 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +struct nni_tcp_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_tcp_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + nni_mtx mtx; +}; + +extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_conn_start(nni_tcp_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c new file mode 100644 index 00000000..30525648 --- /dev/null +++ b/src/platform/posix/posix_tcpconn.c @@ -0,0 +1,410 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_tcp.h" + +static void +tcp_conn_dowrite(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +tcp_conn_doread(nni_tcp_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_tcp_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + tcp_conn_doread(c); + } + if (events & POLLOUT) { + tcp_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_conn_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + tcp_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + tcp_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getpeername(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + struct sockaddr_storage ss; + socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(c->pfd); + + if (getsockname(fd, (void *) &ss, &sslen) != 0) { + return (nni_plat_errno(errno)); + } + return (nni_posix_sockaddr2nn(sa, &ss)); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) +{ + int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) +{ + + int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(c->pfd); + + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { + return (nni_plat_errno(errno)); + } + return (0); +} + +int +nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +{ + nni_tcp_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_tcp_conn_start(nni_tcp_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c new file mode 100644 index 00000000..7f627361 --- /dev/null +++ b/src/platform/posix/posix_tcpdial.c @@ -0,0 +1,234 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +// Dialer stuff. +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_tcp_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_tcp_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_tcp_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dialer_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_tcp_conn_fini(c); +} + +static void +tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_tcp_conn * c = arg; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_close(c); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_tcp_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + rv = nni_plat_errno(errno); + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c new file mode 100644 index 00000000..cdcbecd9 --- /dev/null +++ b/src/platform/posix/posix_tcplisten.c @@ -0,0 +1,319 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_tcp.h" + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +tcp_listener_doclose(nni_tcp_listener *l) +{ + nni_aio *aio; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_tcp_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_tcp_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_tcp_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + tcp_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + NNI_ASSERT(pfd == l->pfd); + + // Anything else will turn up in accept. + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +tcp_listener_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_mtx_unlock(&l->mtx); + return (nni_plat_errno(errno)); + } + + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + +// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows +// SO_REUSEADDR, which is almost completely different (and wrong!) from +// traditional SO_REUSEADDR. +#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) + { + int on = 1; + // If for some reason this doesn't work, it's probably ok. + // Second bind will fail. + (void) setsockopt( + fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } +#endif + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } + + // Lets get the bound sockname, and pass that back to the caller. + // This permits ephemeral port binding to work. + // If this fails for some reason, we just don't update the + // sockaddr structure. This is kind of suboptimal, but failures + // here should never occur. + len = sizeof(ss); + (void) getsockname(fd, (void *) &ss, &len); + (void) nni_posix_sockaddr2nn(sa, &ss); + + nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + + l->pfd = pfd; + l->started = true; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + tcp_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + tcp_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 263a322b..93e45423 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -80,6 +80,17 @@ struct nni_win_event { nni_win_event_ops ops; }; +typedef struct nni_win_io nni_win_io; +typedef void (*nni_win_io_cb)(nni_win_io *, int, size_t); + +struct nni_win_io { + OVERLAPPED olpd; + HANDLE f; + void * ptr; + nni_aio * aio; + nni_win_io_cb cb; +}; + struct nni_plat_flock { HANDLE h; }; @@ -94,6 +105,13 @@ extern void nni_win_event_complete(nni_win_event *, int); extern int nni_win_iocp_register(HANDLE); +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +extern int nni_win_io_sysinit(void); +extern void nni_win_io_sysfini(void); + extern int nni_win_iocp_sysinit(void); extern void nni_win_iocp_sysfini(void); @@ -109,6 +127,12 @@ extern void nni_win_udp_sysfini(void); extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); +extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *); +extern void nni_win_io_fini(nni_win_io *); +extern void nni_win_io_cancel(nni_win_io *); + +extern int nni_win_io_register(HANDLE); + extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *); extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *); diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c new file mode 100644 index 00000000..1179b603 --- /dev/null +++ b/src/platform/windows/win_io.c @@ -0,0 +1,152 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <stdio.h> + +// Windows IO Completion Port support. We basically create a single +// IO completion port, then start threads on it. Handles are added +// to the port on an as needed basis. We use a single IO completion +// port for pretty much everything. + +static int win_io_nthr = 0; +static HANDLE win_io_h = NULL; +static nni_thr *win_io_thrs; + +static void +win_io_handler(void *arg) +{ + NNI_ARG_UNUSED(arg); + + for (;;) { + DWORD cnt; + BOOL ok; + nni_win_io *item; + OVERLAPPED *olpd = NULL; + ULONG_PTR key = 0; + int rv; + + ok = GetQueuedCompletionStatus( + win_io_h, &cnt, &key, &olpd, INFINITE); + + if (olpd == NULL) { + // Completion port closed... + NNI_ASSERT(ok == FALSE); + break; + } + + item = CONTAINING_RECORD(olpd, nni_win_io, olpd); + rv = ok ? 0 : nni_win_error(GetLastError()); + item->cb(item, rv, (size_t) cnt); + } +} + +int +nni_win_io_register(HANDLE h) +{ + if (CreateIoCompletionPort(h, win_io_h, 0, 0) == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +int +nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) +{ + ZeroMemory(&io->olpd, sizeof(io->olpd)); + + io->cb = cb; + io->ptr = ptr; + io->aio = NULL; + io->f = f; + io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); + if (io->olpd.hEvent == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); +} + +void +nni_win_io_cancel(nni_win_io *io) +{ + if (io->f != INVALID_HANDLE_VALUE) { + CancelIoEx(io->f, &io->olpd); + } +} + +void +nni_win_io_fini(nni_win_io *io) +{ + if (io->olpd.hEvent != NULL) { + CloseHandle((HANDLE) io->olpd.hEvent); + } +} + +int +nni_win_io_sysinit(void) +{ + HANDLE h; + int i; + int rv; + int nthr = nni_plat_ncpu() * 2; + + // Limits on the thread count. This is fairly arbitrary. + if (nthr < 4) { + nthr = 4; + } + if (nthr > 64) { + nthr = 64; + } + if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { + return (NNG_ENOMEM); + } + win_io_nthr = nthr; + + h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr); + if (h == NULL) { + return (nni_win_error(GetLastError())); + } + win_io_h = h; + + for (i = 0; i < win_io_nthr; i++) { + rv = nni_thr_init(&win_io_thrs[i], win_io_handler, NULL); + if (rv != 0) { + goto fail; + } + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_run(&win_io_thrs[i]); + } + return (0); + +fail: + nni_win_io_sysfini(); + return (rv); +} + +void +nni_win_io_sysfini(void) +{ + int i; + HANDLE h; + + if ((h = win_io_h) != NULL) { + CloseHandle(h); + win_io_h = NULL; + } + for (i = 0; i < win_io_nthr; i++) { + nni_thr_fini(&win_io_thrs[i]); + } +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index fb9d6751..e2cd192e 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -69,7 +69,7 @@ resolv_cancel(nni_aio *aio, int rv) } static int -resolv_gai_errno(int rv) +resolv_errno(int rv) { switch (rv) { case 0: @@ -116,7 +116,7 @@ resolv_task(resolv_item *item) hints.ai_family = item->family; if ((rv = getaddrinfo(item->name, "80", &hints, &results)) != 0) { - rv = resolv_gai_errno(rv); + rv = resolv_errno(rv); goto done; } @@ -246,14 +246,14 @@ resolv_ip(const char *host, const char *serv, int passive, int family, } void -nni_plat_tcp_resolv( +nni_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void -nni_plat_udp_resolv( +nni_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); @@ -302,6 +302,47 @@ resolv_worker(void *notused) } int +nni_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) +{ + void * ap; + uint16_t port; + int af; + switch (sa->s_family) { + case NNG_AF_INET: + ap = (void *) &sa->s_in.sa_addr; + port = sa->s_in.sa_port; + af = AF_INET; + break; + case NNG_AF_INET6: + ap = (void *) &sa->s_in6.sa_addr; + port = sa->s_in6.sa_port; + af = AF_INET6; + break; + default: + return (NNG_EINVAL); + } + if (ipstr != NULL) { + if (af == AF_INET6) { + size_t l; + ipstr[0] = '['; + InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); + l = strlen(ipstr); + ipstr[l++] = ']'; + ipstr[l++] = '\0'; + } else { + InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); + } + } + if (portstr != NULL) { +#ifdef NNG_LITTLE_ENDIAN + port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); +#endif + snprintf(portstr, 6, "%u", port); + } + return (0); +} + +int nni_win_resolv_sysinit(void) { nni_mtx_init(&resolv_mtx); diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 33c4a1a5..2ab6be0f 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -15,709 +15,6 @@ #include <malloc.h> #include <stdio.h> -struct nni_plat_tcp_pipe { - SOCKET s; - nni_win_event rcv_ev; - nni_win_event snd_ev; - SOCKADDR_STORAGE sockname; - SOCKADDR_STORAGE peername; -}; - -struct nni_plat_tcp_ep { - SOCKET s; - SOCKET acc_s; - nni_win_event con_ev; - nni_win_event acc_ev; - int started; - int bound; - - SOCKADDR_STORAGE remaddr; - int remlen; - SOCKADDR_STORAGE locaddr; - int loclen; - - char buf[512]; // to hold acceptex results - - // We have to lookup some function pointers using ioctls. Winsock, - // gotta love it. Especially I love that asynch accept means that - // getsockname and getpeername don't work. - LPFN_CONNECTEX connectex; - LPFN_ACCEPTEX acceptex; - LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; -}; - -static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_pipe_ops = { - .wev_start = nni_win_tcp_pipe_start, - .wev_finish = nni_win_tcp_pipe_finish, - .wev_cancel = nni_win_tcp_pipe_cancel, -}; - -static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_acc_ops = { - .wev_start = nni_win_tcp_acc_start, - .wev_finish = nni_win_tcp_acc_finish, - .wev_cancel = nni_win_tcp_acc_cancel, -}; - -static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); -static void nni_win_tcp_con_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_tcp_con_ops = { - .wev_start = nni_win_tcp_con_start, - .wev_finish = nni_win_tcp_con_finish, - .wev_cancel = nni_win_tcp_con_cancel, -}; - -static void -nni_win_tcp_sockinit(SOCKET s) -{ - BOOL yes; - DWORD no; - - // Don't inherit the handle (CLOEXEC really). - SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); - - no = 0; - (void) setsockopt( - s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - - // Also disable Nagle. We are careful to group data with WSASend, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - yes = 1; - (void) setsockopt( - s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); -} - -static int -nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - int rv; - SOCKET s; - DWORD niov; - DWORD flags; - nni_plat_tcp_pipe *pipe = evt->ptr; - unsigned i; - unsigned naiov; - nni_iov * aiov; - WSABUF * iov; - - nni_aio_get_iov(aio, &naiov, &aiov); - iov = _malloca(naiov * sizeof(*iov)); - - // Put the AIOs in Windows form. - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len != 0) { - iov[niov].buf = aiov[i].iov_buf; - iov[niov].len = (ULONG) aiov[i].iov_len; - niov++; - } - } - - if ((s = pipe->s) == INVALID_SOCKET) { - _freea(iov); - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - // Note that the IOVs for the event were prepared on entry already. - // The actual aio's iov array we don't touch. - - evt->count = 0; - flags = 0; - if (evt == &pipe->snd_ev) { - rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); - } else { - rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); - } - _freea(iov); - - if ((rv == SOCKET_ERROR) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_tcp_pipe_cancel(nni_win_event *evt) -{ - nni_plat_tcp_pipe *pipe = evt->ptr; - - (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); -} - -static void -nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - if ((evt->status == 0) && (evt->count == 0)) { - // Windows sometimes returns a zero read. Convert these - // into an NNG_ECLOSED. (We are never supposed to come - // back with zero length read.) - evt->status = NNG_ECLOSED; - } - nni_aio_finish(aio, evt->status, evt->count); -} - -static int -nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) -{ - nni_plat_tcp_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); - if (rv != 0) { - nni_plat_tcp_pipe_fini(pipe); - return (rv); - } - nni_win_tcp_sockinit(s); - pipe->s = s; - *pipep = pipe; - return (0); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) -{ - SOCKET s; - - nni_win_event_close(&pipe->rcv_ev); - - if ((s = pipe->s) != INVALID_SOCKET) { - pipe->s = INVALID_SOCKET; - closesocket(s); - } -} - -int -nni_plat_tcp_pipe_peername(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->peername) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_sockname(nni_plat_tcp_pipe *pipe, nni_sockaddr *sa) -{ - if (nni_win_sockaddr2nn(sa, &pipe->sockname) < 0) { - return (NNG_EADDRINVAL); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_nodelay(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -int -nni_plat_tcp_pipe_set_keepalive(nni_plat_tcp_pipe *pipe, bool val) -{ - BOOL b; - b = val ? TRUE : FALSE; - if (setsockopt(pipe->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, - sizeof(b)) != 0) { - return (nni_win_error(WSAGetLastError())); - } - return (0); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) -{ - nni_plat_tcp_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, - const nni_sockaddr *rsa, int mode) -{ - nni_plat_tcp_ep *ep; - int rv; - SOCKET s; - DWORD nbytes; - GUID guid1 = WSAID_CONNECTEX; - GUID guid2 = WSAID_ACCEPTEX; - GUID guid3 = WSAID_GETACCEPTEXSOCKADDRS; - - NNI_ARG_UNUSED(mode); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(*ep)); - - ep->s = INVALID_SOCKET; - - if ((rsa != NULL) && (rsa->s_family != NNG_AF_UNSPEC)) { - ep->remlen = nni_win_nn2sockaddr(&ep->remaddr, rsa); - } - if ((lsa != NULL) && (lsa->s_family != NNG_AF_UNSPEC)) { - ep->loclen = nni_win_nn2sockaddr(&ep->locaddr, lsa); - } - - // Create a scratch socket for use with ioctl. - s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - // Look up the function pointer. - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, - sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, - sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, - NULL, NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid3, - sizeof(guid3), &ep->getacceptexsockaddrs, - sizeof(ep->getacceptexsockaddrs), &nbytes, NULL, - NULL) == SOCKET_ERROR) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - closesocket(s); - s = INVALID_SOCKET; - - // Now initialize the win events for later use. - rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); - if (rv != 0) { - goto fail; - } - rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); - if (rv != 0) { - goto fail; - } - - *epp = ep; - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - nni_plat_tcp_ep_fini(ep); - return (rv); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_win_event_close(&ep->acc_ev); - nni_win_event_close(&ep->con_ev); - if (ep->s != INVALID_SOCKET) { - closesocket(ep->s); - ep->s = INVALID_SOCKET; - } - if (ep->acc_s != INVALID_SOCKET) { - closesocket(ep->acc_s); - } -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_plat_tcp_ep_close(ep); - NNI_FREE_STRUCT(ep); -} - -static int -nni_win_tcp_listen(nni_plat_tcp_ep *ep, nni_sockaddr *bsa) -{ - int rv; - BOOL yes; - SOCKET s; - - if (ep->started) { - return (NNG_EBUSY); - } - - s = socket(ep->locaddr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - nni_win_tcp_sockinit(s); - - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - goto fail; - } - - // Make sure that we use the address exclusively. Windows lets - // others hijack us by default. - yes = 1; - - rv = setsockopt( - s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - if (bsa != NULL) { - SOCKADDR_STORAGE bound; - int len = sizeof(bound); - rv = getsockname(s, (SOCKADDR *) &bound, &len); - if (rv != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - nni_win_sockaddr2nn(bsa, &bound); - } - - if (listen(s, SOMAXCONN) != 0) { - rv = nni_win_error(GetLastError()); - goto fail; - } - - ep->s = s; - ep->started = 1; - - return (0); - -fail: - if (s != INVALID_SOCKET) { - closesocket(s); - } - return (rv); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep, nng_sockaddr *bsa) -{ - int rv; - - nni_mtx_lock(&ep->acc_ev.mtx); - rv = nni_win_tcp_listen(ep, bsa); - nni_mtx_unlock(&ep->acc_ev.mtx); - return (rv); -} - -static void -nni_win_tcp_acc_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - int len1; - int len2; - SOCKADDR * sa1; - SOCKADDR * sa2; - - s = ep->acc_s; - ep->acc_s = INVALID_SOCKET; - - if (s == INVALID_SOCKET) { - return; - } - - if (((rv = evt->status) != 0) || - ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - // Collect the local and peer addresses, because normal getsockname - // and getpeername don't work with AcceptEx. - len1 = (int) sizeof(pipe->sockname); - len2 = (int) sizeof(pipe->peername); - ep->getacceptexsockaddrs( - ep->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); - NNI_ASSERT(len1 > 0); - NNI_ASSERT(len1 < (int) sizeof(SOCKADDR_STORAGE)); - NNI_ASSERT(len2 > 0); - NNI_ASSERT(len2 < (int) sizeof(SOCKADDR_STORAGE)); - memcpy(&pipe->sockname, sa1, len1); - memcpy(&pipe->peername, sa2, len2); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - SOCKET acc_s; - DWORD cnt; - - NNI_ARG_UNUSED(aio); - - acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (acc_s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - ep->acc_s = acc_s; - - if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synch completion right now. I/O completion packet delivered - // already. - return (0); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->acc_ev, aio); -} - -static void -nni_win_tcp_con_cancel(nni_win_event *evt) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s = ep->s; - - if (s != INVALID_SOCKET) { - CancelIoEx((HANDLE) s, &evt->olpd); - } -} - -static void -nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep * ep = evt->ptr; - nni_plat_tcp_pipe *pipe; - SOCKET s; - int rv; - DWORD yes = 1; - int len; - - s = ep->s; - ep->s = INVALID_SOCKET; - - // The socket was already registered with the IOCP. - - if (((rv = evt->status) != 0) || - ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - - (void) setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, - (char *) &yes, sizeof(yes)); - - // Windows seems to be unable to get peernames for sockets on - // connect - perhaps because we supplied it already with connectex. - // Rather than debugging it, just steal the address from the endpoint. - memcpy(&pipe->peername, &ep->remaddr, ep->remlen); - - len = sizeof(pipe->sockname); - (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len); - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static int -nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_tcp_ep *ep = evt->ptr; - SOCKET s; - SOCKADDR_STORAGE bss; - int len; - int rv; - int family; - - NNI_ARG_UNUSED(aio); - - if (ep->loclen > 0) { - family = ep->locaddr.ss_family; - } else { - family = ep->remaddr.ss_family; - } - - s = socket(family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - return (1); - } - - nni_win_tcp_sockinit(s); - - // Windows ConnectEx requires the socket to be bound first. - if (ep->loclen > 0) { - bss = ep->locaddr; - len = ep->loclen; - } else { - ZeroMemory(&bss, sizeof(bss)); - bss.ss_family = ep->remaddr.ss_family; - len = ep->remlen; - } - if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - closesocket(s); - - return (1); - } - // Register with the I/O completion port so we can get the - // events for the next call. - if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { - closesocket(s); - evt->status = rv; - evt->count = 0; - return (1); - } - - ep->s = s; - if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, - NULL, 0, NULL, &evt->olpd)) { - if ((rv = GetLastError()) != ERROR_IO_PENDING) { - closesocket(s); - ep->s = INVALID_SOCKET; - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - return (0); -} - -extern void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->con_ev, aio); -} - -int -nni_plat_tcp_ntop(const nni_sockaddr *sa, char *ipstr, char *portstr) -{ - void * ap; - uint16_t port; - int af; - switch (sa->s_family) { - case NNG_AF_INET: - ap = (void *) &sa->s_in.sa_addr; - port = sa->s_in.sa_port; - af = AF_INET; - break; - case NNG_AF_INET6: - ap = (void *) &sa->s_in6.sa_addr; - port = sa->s_in6.sa_port; - af = AF_INET6; - break; - default: - return (NNG_EINVAL); - } - if (ipstr != NULL) { - if (af == AF_INET6) { - size_t l; - ipstr[0] = '['; - InetNtopA(af, ap, ipstr + 1, INET6_ADDRSTRLEN); - l = strlen(ipstr); - ipstr[l++] = ']'; - ipstr[l++] = '\0'; - } else { - InetNtopA(af, ap, ipstr, INET6_ADDRSTRLEN); - } - } - if (portstr != NULL) { -#ifdef NNG_LITTLE_ENDIAN - port = ((port >> 8) & 0xff) | ((port & 0xff) << 8); -#endif - snprintf(portstr, 6, "%u", port); - } - return (0); -} - int nni_win_tcp_sysinit(void) { diff --git a/src/platform/windows/win_tcp.h b/src/platform/windows/win_tcp.h new file mode 100644 index 00000000..7025af81 --- /dev/null +++ b/src/platform/windows/win_tcp.h @@ -0,0 +1,67 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_WIN_WINTCP_H +#define PLATFORM_WIN_WINTCP_H + +// This header file is private to the TCP support for Windows. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +struct nni_tcp_conn { + SOCKET s; + nni_win_io recv_io; + nni_win_io send_io; + nni_win_io conn_io; + nni_list recv_aios; + nni_list send_aios; + nni_aio * conn_aio; + SOCKADDR_STORAGE sockname; + SOCKADDR_STORAGE peername; + nni_tcp_dialer * dialer; + nni_tcp_listener *listener; + int recv_rv; + int send_rv; + int conn_rv; + bool closed; + char buf[512]; // to hold acceptex results + nni_mtx mtx; + nni_cv cv; +}; + +struct nni_tcp_dialer { + LPFN_CONNECTEX connectex; // looked up name via ioctl + nni_list aios; // in flight connections + bool closed; + nni_mtx mtx; + nni_reap_item reap; +}; + +struct nni_tcp_listener { + SOCKET s; + nni_list aios; + bool closed; + bool started; + LPFN_ACCEPTEX acceptex; + LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + SOCKADDR_STORAGE ss; + nni_mtx mtx; + nni_reap_item reap; +}; + +extern int nni_win_tcp_conn_init(nni_tcp_conn **, SOCKET); +extern void nni_win_tcp_conn_set_addrs( + nni_tcp_conn *, const SOCKADDR_STORAGE *, const SOCKADDR_STORAGE *); + +#endif // NNG_PLATFORM_WINDOWS + +#endif // NNG_PLATFORM_WIN_WINTCP_H diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c new file mode 100644 index 00000000..c3a4a5d8 --- /dev/null +++ b/src/platform/windows/win_tcpconn.c @@ -0,0 +1,391 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include "win_tcp.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <malloc.h> +#include <stdio.h> + +static void +tcp_recv_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + DWORD flags; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + flags = 0; + rv = WSARecv(c->s, iov, niov, NULL, &flags, &c->recv_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + tcp_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +tcp_recv_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + nni_win_io_cancel(&c->recv_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + tcp_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_start(nni_tcp_conn *c) +{ + nni_aio *aio; + int rv; + DWORD niov; + unsigned i; + unsigned naiov; + nni_iov *aiov; + WSABUF * iov; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } + +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + iov = _malloca(naiov * sizeof(*iov)); + + // Put the AIOs in Windows form. + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iov[niov].buf = aiov[i].iov_buf; + iov[niov].len = (ULONG) aiov[i].iov_len; + niov++; + } + } + + rv = WSASend(c->s, iov, niov, NULL, 0, &c->send_io.olpd, NULL); + _freea(iov); + + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +tcp_send_cancel(nni_aio *aio, int rv) +{ + nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + nni_win_io_cancel(&c->send_io); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +tcp_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_tcp_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); // should always be at head + tcp_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_aio_finish_synch(aio, rv, num); +} + +void +nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + tcp_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) +{ + nni_tcp_conn *c; + int rv; + BOOL yes; + DWORD no; + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->s = INVALID_SOCKET; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + c->conn_aio = NULL; + + if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != + 0) || + ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != + 0) || + ((rv = nni_win_io_register((HANDLE) s)) != 0)) { + nni_tcp_conn_fini(c); + return (rv); + } + + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + c->s = s; + *connp = c; + return (0); +} + +void +nni_win_tcp_conn_set_addrs( + nni_tcp_conn *c, const SOCKADDR_STORAGE *loc, const SOCKADDR_STORAGE *rem) +{ + memcpy(&c->sockname, loc, sizeof(*loc)); + memcpy(&c->peername, rem, sizeof(*rem)); +} + +void +nni_tcp_conn_close(nni_tcp_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + nni_win_io_cancel(&c->recv_io); + } + if (!nni_list_empty(&c->send_aios)) { + nni_win_io_cancel(&c->send_io); + } + if (c->s != INVALID_SOCKET) { + shutdown(c->s, SD_BOTH); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->peername) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) +{ + if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) { + return (NNG_EADDRINVAL); + } + return (0); +} + +int +nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +int +nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val) +{ + BOOL b; + b = val ? TRUE : FALSE; + if (setsockopt( + c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) { + return (nni_win_error(WSAGetLastError())); + } + return (0); +} + +void +nni_tcp_conn_fini(nni_tcp_conn *c) +{ + nni_tcp_conn_close(c); + + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + nni_mtx_unlock(&c->mtx); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->s != INVALID_SOCKET) { + closesocket(c->s); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c new file mode 100644 index 00000000..4a3e9f2f --- /dev/null +++ b/src/platform/windows/win_tcpdial.c @@ -0,0 +1,228 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_tcp.h" + +#include <malloc.h> +#include <stdio.h> + +int +nni_tcp_dialer_init(nni_tcp_dialer **dp) +{ + nni_tcp_dialer *d; + int rv; + SOCKET s; + DWORD nbytes; + GUID guid = WSAID_CONNECTEX; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(d, sizeof(*d)); + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->aios); + + // Create a scratch socket for use with ioctl. + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_tcp_dialer_fini(d); + return (rv); + } + + // Look up the function pointer. + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, + sizeof(guid), &d->connectex, sizeof(d->connectex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + closesocket(s); + nni_tcp_dialer_fini(d); + return (rv); + } + + closesocket(s); + + *dp = d; + return (0); +} + +void +nni_tcp_dialer_close(nni_tcp_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + + NNI_LIST_FOREACH (&d->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_close(d); + nni_mtx_lock(&d->mtx); + if (!nni_list_empty(&d->aios)) { + nni_mtx_unlock(&d->mtx); + nni_reap(&d->reap, (nni_cb) nni_tcp_dialer_fini, d); + return; + } + nni_mtx_unlock(&d->mtx); + + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcp_dial_cancel(nni_aio *aio, int rv) +{ + nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&d->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&d->mtx); +} + +static void +tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_dialer *d = c->dialer; + nni_aio * aio = c->conn_aio; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&d->mtx); + if ((aio = c->conn_aio) == NULL) { + // This should never occur. + nni_mtx_unlock(&d->mtx); + return; + } + + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + } else { + DWORD yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +void +nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + SOCKET s; + SOCKADDR_STORAGE ss; + int len; + nni_tcp_conn * c; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((s = socket(ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + nni_aio_finish_error(aio, nni_win_error(GetLastError())); + return; + } + + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + c->peername = ss; + + // Windows ConnectEx requires the socket to be bound + // first. We just bind to an ephemeral address in the + // same family. + ZeroMemory(&c->sockname, sizeof(c->sockname)); + c->sockname.ss_family = ss.ss_family; + if (bind(s, (SOCKADDR *) &c->sockname, len) < 0) { + rv = nni_win_error(GetLastError()); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != + 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + c->dialer = d; + nni_aio_set_prov_extra(aio, 0, c); + if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + c->conn_aio = aio; + nni_aio_list_append(&d->aios, aio); + + // dialing is concurrent. + if (!d->connectex(s, (struct sockaddr *) &c->peername, len, NULL, 0, + NULL, &c->conn_io.olpd)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&d->mtx); + + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + } + nni_mtx_unlock(&d->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c new file mode 100644 index 00000000..5055c32d --- /dev/null +++ b/src/platform/windows/win_tcplisten.c @@ -0,0 +1,312 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include <malloc.h> +#include <stdbool.h> +#include <stdio.h> + +#include "win_tcp.h" + +// tcp_listener_funcs looks up function pointers we need for advanced accept +// functionality on Windows. Windows is weird. +static int +tcp_listener_funcs(nni_tcp_listener *l) +{ + static SRWLOCK lock = SRWLOCK_INIT; + static LPFN_ACCEPTEX acceptex; + static LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; + + AcquireSRWLockExclusive(&lock); + if (acceptex == NULL) { + int rv; + DWORD nbytes; + GUID guid1 = WSAID_ACCEPTEX; + GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS; + SOCKET s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + ReleaseSRWLockExclusive(&lock); + return (rv); + } + + // Look up the function pointer. + if ((WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, + sizeof(guid1), &acceptex, sizeof(acceptex), &nbytes, + NULL, NULL) == SOCKET_ERROR) || + (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, + sizeof(guid2), &getacceptexsockaddrs, + sizeof(getacceptexsockaddrs), &nbytes, NULL, + NULL) == SOCKET_ERROR)) { + rv = nni_win_error(GetLastError()); + acceptex = NULL; + getacceptexsockaddrs = NULL; + ReleaseSRWLockExclusive(&lock); + closesocket(s); + return (rv); + } + closesocket(s); + } + ReleaseSRWLockExclusive(&lock); + + l->acceptex = acceptex; + l->getacceptexsockaddrs = getacceptexsockaddrs; + return (0); +} + +static void +tcp_accept_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_tcp_conn * c = io->ptr; + nni_tcp_listener *l = c->listener; + nni_aio * aio; + int len1; + int len2; + SOCKADDR * sa1; + SOCKADDR * sa2; + DWORD yes; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&l->mtx); + if ((aio = c->conn_aio) == NULL) { + // This case should not occur. The situation would indicate + // a case where the connection was accepted already. + nni_mtx_unlock(&l->mtx); + return; + } + c->conn_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_aio_list_remove(aio); + if (c->conn_rv != 0) { + rv = c->conn_rv; + } + nni_mtx_unlock(&l->mtx); + + if (rv != 0) { + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + len1 = (int) sizeof(c->sockname); + len2 = (int) sizeof(c->peername); + l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); + memcpy(&c->sockname, sa1, len1); + memcpy(&c->peername, sa2, len2); + + yes = 1; + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *) &yes, sizeof(yes)); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +int +nni_tcp_listener_init(nni_tcp_listener **lp) +{ + nni_tcp_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + ZeroMemory(l, sizeof(*l)); + nni_mtx_init(&l->mtx); + nni_aio_list_init(&l->aios); + if ((rv = tcp_listener_funcs(l)) != 0) { + nni_tcp_listener_fini(l); + return (rv); + } + + *lp = l; + return (0); +} + +void +nni_tcp_listener_close(nni_tcp_listener *l) +{ + nni_mtx_lock(&l->mtx); + if (!l->closed) { + nni_aio *aio; + l->closed = true; + + NNI_LIST_FOREACH (&l->aios, aio) { + nni_tcp_conn *c; + + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->conn_rv = NNG_ECLOSED; + nni_win_io_cancel(&c->conn_io); + } + } + closesocket(l->s); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + nni_mtx_lock(&l->mtx); + if (!nni_list_empty(&l->aios)) { + nni_mtx_unlock(&l->mtx); + nni_reap(&l->reap, (nni_cb) nni_tcp_listener_fini, l); + return; + } + nni_mtx_unlock(&l->mtx); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +int +nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) +{ + int rv; + BOOL yes; + DWORD no; + int len; + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + if ((len = nni_win_nn2sockaddr(&l->ss, sa)) <= 0) { + nni_mtx_unlock(&l->mtx); + return (NNG_EADDRINVAL); + } + l->s = socket(l->ss.ss_family, SOCK_STREAM, 0); + if (l->s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) l->s, HANDLE_FLAG_INHERIT, 0); + + no = 0; + (void) setsockopt( + l->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + yes = 1; + (void) setsockopt( + l->s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); + + if ((rv = nni_win_io_register((HANDLE) l->s)) != 0) { + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // Make sure that we use the address exclusively. Windows lets + // others hijack us by default. + yes = 1; + if ((setsockopt(l->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, + sizeof(yes)) != 0) || + (bind(l->s, (SOCKADDR *) &l->ss, len) != 0) || + (getsockname(l->s, (SOCKADDR *) &l->ss, &len) != 0) || + (listen(l->s, SOMAXCONN) != 0)) { + rv = nni_win_error(GetLastError()); + closesocket(l->s); + l->s = INVALID_SOCKET; + nni_mtx_unlock(&l->mtx); + return (rv); + } + nni_win_sockaddr2nn(sa, &l->ss); + l->started = true; + nni_mtx_unlock(&l->mtx); + return (0); +} + +static void +tcp_accept_cancel(nni_aio *aio, int rv) +{ + nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_conn * c; + + nni_mtx_lock(&l->mtx); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + if (c->conn_rv == 0) { + c->conn_rv = rv; + } + nni_win_io_cancel(&c->conn_io); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + SOCKET s; + int rv; + DWORD cnt; + nni_tcp_conn *c; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + // Windows requires us to explicity create the socket before + // calling accept on it. + if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) { + nni_mtx_unlock(&l->mtx); + closesocket(s); + nni_aio_finish_error(aio, rv); + return; + } + c->listener = l; + c->conn_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + if (((rv = nni_win_io_init( + &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) || + ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&l->aios, aio); + if ((!l->acceptex( + l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Fast failure (synchronous.) + nni_aio_list_remove(aio); + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 243811a0..52327cc4 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -181,7 +181,8 @@ nni_plat_init(int (*helper)(void)) AcquireSRWLockExclusive(&lock); if (!plat_inited) { - if (((rv = nni_win_iocp_sysinit()) != 0) || + if (((rv = nni_win_io_sysinit()) != 0) || + ((rv = nni_win_iocp_sysinit()) != 0) || ((rv = nni_win_ipc_sysinit()) != 0) || ((rv = nni_win_tcp_sysinit()) != 0) || ((rv = nni_win_udp_sysinit()) != 0) || @@ -207,6 +208,7 @@ nni_plat_fini(void) nni_win_udp_sysfini(); nni_win_tcp_sysfini(); nni_win_iocp_sysfini(); + nni_win_io_sysfini(); WSACleanup(); plat_inited = 0; } diff --git a/src/supplemental/http/http_api.h b/src/supplemental/http/http_api.h index 4b515ca5..cf2c78bf 100644 --- a/src/supplemental/http/http_api.h +++ b/src/supplemental/http/http_api.h @@ -67,9 +67,9 @@ extern void *nni_http_conn_get_ctx(nni_http_conn *); // These initialization functions create stream for HTTP transactions. // They should only be used by the server or client HTTP implementations, // and are not for use by other code. -extern int nni_http_conn_init_tcp(nni_http_conn **, void *); +extern int nni_http_conn_init_tcp(nni_http_conn **, nni_tcp_conn *); extern int nni_http_conn_init_tls( - nni_http_conn **, struct nng_tls_config *, void *); + nni_http_conn **, struct nng_tls_config *, nni_tcp_conn *); extern void nni_http_conn_close(nni_http_conn *); extern void nni_http_conn_fini(nni_http_conn *); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 4034bbff..6842b271 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -20,72 +20,97 @@ #include "http_api.h" struct nng_http_client { - nni_list aios; - nni_mtx mtx; - bool closed; - struct nng_tls_config *tls; - nni_aio * connaio; - nni_plat_tcp_ep * tep; + nni_list aios; + nni_mtx mtx; + bool closed; + bool resolving; + nng_tls_config *tls; + nni_aio * aio; + nng_sockaddr sa; + nni_tcp_dialer *dialer; + char * host; + char * port; + nni_url * url; }; static void -http_conn_start(nni_http_client *c) +http_dial_start(nni_http_client *c) { - nni_plat_tcp_ep_connect(c->tep, c->connaio); + nni_aio *aio; + + if ((aio = nni_list_first(&c->aios)) == NULL) { + return; + } + c->resolving = true; + nni_aio_set_input(c->aio, 0, &c->sa); + nni_tcp_resolv(c->host, c->port, NNG_AF_UNSPEC, 0, c->aio); } static void -http_conn_done(void *arg) +http_dial_cb(void *arg) { - nni_http_client * c = arg; - nni_aio * aio; - int rv; - nni_plat_tcp_pipe *p; - nni_http_conn * conn; + nni_http_client *c = arg; + nni_aio * aio; + int rv; + nni_tcp_conn * tcp; + nni_http_conn * conn; nni_mtx_lock(&c->mtx); - rv = nni_aio_result(c->connaio); - p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL; + rv = nni_aio_result(c->aio); + if ((aio = nni_list_first(&c->aios)) == NULL) { - if (p != NULL) { - nni_plat_tcp_pipe_fini(p); - } + // User abandoned request, and no residuals left. nni_mtx_unlock(&c->mtx); + if ((rv == 0) && !c->resolving) { + tcp = nni_aio_get_output(c->aio, 0); + nni_tcp_conn_fini(tcp); + } return; } - nni_aio_list_remove(aio); if (rv != 0) { + nni_aio_list_remove(aio); + http_dial_start(c); + nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); + return; + } + + if (c->resolving) { + // This was a DNS lookup -- advance to normal TCP connect. + c->resolving = false; + nni_tcp_dialer_dial(c->dialer, &c->sa, c->aio); nni_mtx_unlock(&c->mtx); return; } + nni_aio_list_remove(aio); + tcp = nni_aio_get_output(c->aio, 0); + NNI_ASSERT(tcp != NULL); + if (c->tls != NULL) { - rv = nni_http_conn_init_tls(&conn, c->tls, p); + rv = nni_http_conn_init_tls(&conn, c->tls, tcp); } else { - rv = nni_http_conn_init_tcp(&conn, p); + rv = nni_http_conn_init_tcp(&conn, tcp); } + http_dial_start(c); + nni_mtx_unlock(&c->mtx); + if (rv != 0) { + // the conn_init function will have already discard tcp. nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&c->mtx); return; } nni_aio_set_output(aio, 0, conn); nni_aio_finish(aio, 0, 0); - - if (!nni_list_empty(&c->aios)) { - http_conn_start(c); - } - nni_mtx_unlock(&c->mtx); } void nni_http_client_fini(nni_http_client *c) { - nni_aio_fini(c->connaio); - nni_plat_tcp_ep_fini(c->tep); + nni_aio_fini(c->aio); + nni_tcp_dialer_fini(c->dialer); nni_mtx_fini(&c->mtx); #ifdef NNG_SUPP_TLS if (c->tls != NULL) { @@ -100,10 +125,6 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) { int rv; nni_http_client *c; - nni_aio * aio; - nni_sockaddr sa; - char * host; - char * port; if (strlen(url->u_hostname) == 0) { // We require a valid hostname. @@ -118,29 +139,17 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) return (NNG_EADDRINVAL); } - // For now we are looking up the address. We would really like - // to do this later, but we need TcP support for this. One - // imagines the ability to create a tcp dialer that does the - // necessary DNS lookups, etc. all asynchronously. - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_input(aio, 0, &sa); - host = (strlen(url->u_hostname) != 0) ? url->u_hostname : NULL; - port = (strlen(url->u_port) != 0) ? url->u_port : NULL; - nni_plat_tcp_resolv(host, port, NNG_AF_UNSPEC, false, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - if (rv != 0) { - return (rv); - } - if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&c->mtx); nni_aio_list_init(&c->aios); + if (((c->host = nni_strdup(url->u_hostname)) == NULL) || + ((strlen(url->u_port) != 0) && + ((c->port = nni_strdup(url->u_port)) == NULL))) { + nni_http_client_fini(c); + return (NNG_ENOMEM); + } #ifdef NNG_SUPP_TLS if ((strcmp(url->u_scheme, "https") == 0) || @@ -167,16 +176,12 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) } #endif - rv = nni_plat_tcp_ep_init(&c->tep, NULL, &sa, NNI_EP_MODE_DIAL); - if (rv != 0) { + if (((rv = nni_tcp_dialer_init(&c->dialer)) != 0) || + ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0)) { nni_http_client_fini(c); return (rv); } - if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) { - nni_http_client_fini(c); - return (rv); - } *cp = c; return (0); } @@ -224,7 +229,7 @@ nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp) } static void -http_connect_cancel(nni_aio *aio, int rv) +http_dial_cancel(nni_aio *aio, int rv) { nni_http_client *c = nni_aio_get_prov_data(aio); nni_mtx_lock(&c->mtx); @@ -233,7 +238,7 @@ http_connect_cancel(nni_aio *aio, int rv) nni_aio_finish_error(aio, rv); } if (nni_list_empty(&c->aios)) { - nni_aio_abort(c->connaio, rv); + nni_aio_abort(c->aio, rv); } nni_mtx_unlock(&c->mtx); } @@ -246,14 +251,14 @@ nni_http_client_connect(nni_http_client *c, nni_aio *aio) return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, http_connect_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, http_dial_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&c->aios, aio); if (nni_list_first(&c->aios) == aio) { - http_conn_start(c); + http_dial_start(c); } nni_mtx_unlock(&c->mtx); } diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 15d1f776..169918e9 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -699,17 +699,23 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data) if ((conn = NNI_ALLOC_STRUCT(conn)) == NULL) { return (NNG_ENOMEM); } - conn->rd_bufsz = HTTP_BUFSIZE; - if ((conn->rd_buf = nni_alloc(conn->rd_bufsz)) == NULL) { - NNI_FREE_STRUCT(conn); - return (NNG_ENOMEM); - } nni_mtx_init(&conn->mtx); nni_aio_list_init(&conn->rdq); nni_aio_list_init(&conn->wrq); + if ((conn->rd_buf = nni_alloc(HTTP_BUFSIZE)) == NULL) { + nni_http_conn_fini(conn); + return (NNG_ENOMEM); + } + conn->rd_bufsz = HTTP_BUFSIZE; + + if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || + ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { + nni_http_conn_fini(conn); + return (rv); + } + conn->sock = data; - conn->rd_bufsz = HTTP_BUFSIZE; conn->rd = tran->h_read; conn->wr = tran->h_write; conn->close = tran->h_close; @@ -718,12 +724,6 @@ http_init(nni_http_conn **connp, nni_http_tran *tran, void *data) conn->peer_addr = tran->h_peer_addr; conn->verified = tran->h_verified; - if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || - ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { - nni_http_conn_fini(conn); - return (rv); - } - *connp = conn; return (0); @@ -737,19 +737,23 @@ nni_http_verified_tcp(void *arg) } static nni_http_tran http_tcp_ops = { - .h_read = (http_read_fn) nni_plat_tcp_pipe_recv, - .h_write = (http_write_fn) nni_plat_tcp_pipe_send, - .h_close = (http_close_fn) nni_plat_tcp_pipe_close, - .h_fini = (http_fini_fn) nni_plat_tcp_pipe_fini, - .h_sock_addr = (http_addr_fn) nni_plat_tcp_pipe_sockname, - .h_peer_addr = (http_addr_fn) nni_plat_tcp_pipe_peername, + .h_read = (http_read_fn) nni_tcp_conn_recv, + .h_write = (http_write_fn) nni_tcp_conn_send, + .h_close = (http_close_fn) nni_tcp_conn_close, + .h_fini = (http_fini_fn) nni_tcp_conn_fini, + .h_sock_addr = (http_addr_fn) nni_tcp_conn_sockname, + .h_peer_addr = (http_addr_fn) nni_tcp_conn_peername, .h_verified = (http_verified_fn) nni_http_verified_tcp, }; int -nni_http_conn_init_tcp(nni_http_conn **connp, void *tcp) +nni_http_conn_init_tcp(nni_http_conn **connp, nni_tcp_conn *tcp) { - return (http_init(connp, &http_tcp_ops, tcp)); + int rv; + if ((rv = http_init(connp, &http_tcp_ops, tcp)) != 0) { + nni_tcp_conn_fini(tcp); + } + return (rv); } #ifdef NNG_SUPP_TLS @@ -765,26 +769,29 @@ static nni_http_tran http_tls_ops = { int nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp) + nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) { nni_tls *tls; int rv; if ((rv = nni_tls_init(&tls, cfg, tcp)) != 0) { - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (rv); } - return (http_init(connp, &http_tls_ops, tls)); + if ((rv = http_init(connp, &http_tls_ops, tls)) != 0) { + nni_tls_fini(tls); + } + return (rv); } #else int nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, void *tcp) + nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) { NNI_ARG_UNUSED(connp); NNI_ARG_UNUSED(cfg); - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (NNG_ENOTSUP); } #endif // NNG_SUPP_TLS diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 4a07d544..9d1313b1 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -64,22 +64,22 @@ typedef struct http_error { } http_error; struct nng_http_server { - nng_sockaddr addr; - nni_list_node node; - int refcnt; - int starts; - nni_list handlers; - nni_list conns; - nni_mtx mtx; - nni_cv cv; - bool closed; - nng_tls_config * tls; - nni_aio * accaio; - nni_plat_tcp_ep *tep; - char * port; - char * hostname; - nni_list errors; - nni_mtx errors_mtx; + nng_sockaddr addr; + nni_list_node node; + int refcnt; + int starts; + nni_list handlers; + nni_list conns; + nni_mtx mtx; + nni_cv cv; + bool closed; + nng_tls_config * tls; + nni_aio * accaio; + nni_tcp_listener *listener; + char * port; + char * hostname; + nni_list errors; + nni_mtx errors_mtx; }; int @@ -682,13 +682,13 @@ http_sconn_cbdone(void *arg) } static int -http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp) +http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp) { http_sconn *sc; int rv; if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) { - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); return (NNG_ENOMEM); } @@ -720,17 +720,17 @@ http_sconn_init(http_sconn **scp, nni_http_server *s, nni_plat_tcp_pipe *tcp) static void http_server_acccb(void *arg) { - nni_http_server * s = arg; - nni_aio * aio = s->accaio; - nni_plat_tcp_pipe *tcp; - http_sconn * sc; - int rv; + nni_http_server *s = arg; + nni_aio * aio = s->accaio; + nni_tcp_conn * tcp; + http_sconn * sc; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_aio_result(aio)) != 0) { if (!s->closed) { // try again? - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); } nni_mtx_unlock(&s->mtx); return; @@ -738,14 +738,14 @@ http_server_acccb(void *arg) tcp = nni_aio_get_output(aio, 0); if (s->closed) { // If we're closing, then reject this one. - nni_plat_tcp_pipe_fini(tcp); + nni_tcp_conn_fini(tcp); nni_mtx_unlock(&s->mtx); return; } if (http_sconn_init(&sc, s, tcp) != 0) { // The TCP structure is already cleaned up. // Start another accept attempt. - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); return; } @@ -753,7 +753,7 @@ http_server_acccb(void *arg) nni_list_append(&s->conns, sc); nni_http_read_req(sc->conn, sc->req, sc->rxaio); - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); } @@ -769,8 +769,8 @@ http_server_fini(nni_http_server *s) while (!nni_list_empty(&s->conns)) { nni_cv_wait(&s->cv); } - if (s->tep != NULL) { - nni_plat_tcp_ep_fini(s->tep); + if (s->listener != NULL) { + nni_tcp_listener_fini(s->listener); } while ((h = nni_list_first(&s->handlers)) != NULL) { nni_list_remove(&s->handlers, h); @@ -875,7 +875,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url) return (rv); } nni_aio_set_input(aio, 0, &s->addr); - nni_plat_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio); + nni_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio); nni_aio_wait(aio); rv = nni_aio_result(aio); nni_aio_fini(aio); @@ -921,16 +921,15 @@ static int http_server_start(nni_http_server *s) { int rv; - rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN); - if (rv != 0) { + if ((rv = nni_tcp_listener_init(&s->listener)) != 0) { return (rv); } - if ((rv = nni_plat_tcp_ep_listen(s->tep, NULL)) != 0) { - nni_plat_tcp_ep_fini(s->tep); - s->tep = NULL; + if ((rv = nni_tcp_listener_listen(s->listener, &s->addr)) != 0) { + nni_tcp_listener_fini(s->listener); + s->listener = NULL; return (rv); } - nni_plat_tcp_ep_accept(s->tep, s->accaio); + nni_tcp_listener_accept(s->listener, s->accaio); return (0); } @@ -961,8 +960,8 @@ http_server_stop(nni_http_server *s) s->closed = true; // Close the TCP endpoint that is listening. - if (s->tep) { - nni_plat_tcp_ep_close(s->tep); + if (s->listener) { + nni_tcp_listener_close(s->listener); } // Stopping the server is a hard stop -- it aborts any work diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index cdd226cd..0f4f67cc 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -59,7 +59,7 @@ typedef struct nni_tls_certkey { } nni_tls_certkey; struct nni_tls { - nni_plat_tcp_pipe * tcp; + nni_tcp_conn * tcp; mbedtls_ssl_context ctx; nng_tls_config * cfg; // kept so we can release it nni_mtx lk; @@ -254,14 +254,14 @@ nni_tls_fini(nni_tls *tp) { // Shut it all down first. if (tp->tcp) { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); } nni_aio_stop(tp->tcp_send); nni_aio_stop(tp->tcp_recv); // And finalize / free everything. if (tp->tcp) { - nni_plat_tcp_pipe_fini(tp->tcp); + nni_tcp_conn_fini(tp->tcp); } nni_aio_fini(tp->tcp_send); nni_aio_fini(tp->tcp_recv); @@ -306,7 +306,7 @@ nni_tls_mkerr(int err) } int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) +nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) { nni_tls *tp; int rv; @@ -314,7 +314,7 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) // During the handshake, disable Nagle to shorten the // negotiation. Once things are set up the caller can // re-enable Nagle if so desired. - (void) nni_plat_tcp_pipe_set_nodelay(tcp, true); + (void) nni_tcp_conn_set_nodelay(tcp, true); if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) { return (NNG_ENOMEM); @@ -387,7 +387,7 @@ nni_tls_fail(nni_tls *tp, int rv) { nni_aio *aio; tp->tls_closed = true; - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; while ((aio = nni_list_first(&tp->recvs)) != NULL) { nni_list_remove(&tp->recvs, aio); @@ -408,7 +408,7 @@ nni_tls_send_cb(void *ctx) nni_mtx_lock(&tp->lk); if (nni_aio_result(aio) != 0) { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; } else { size_t n = nni_aio_count(aio); @@ -421,7 +421,7 @@ nni_tls_send_cb(void *ctx) iov.iov_len = tp->sendlen; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(aio, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_send(tp->tcp, aio); + nni_tcp_conn_send(tp->tcp, aio); nni_mtx_unlock(&tp->lk); return; } @@ -460,7 +460,7 @@ nni_tls_recv_start(nni_tls *tp) iov.iov_len = NNG_TLS_MAX_RECV_SIZE; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_recv(tp->tcp, aio); + nni_tcp_conn_recv(tp->tcp, aio); } static void @@ -474,7 +474,7 @@ nni_tls_recv_cb(void *ctx) if (nni_aio_result(aio) != 0) { // Close the underlying TCP channel, but permit data we // already received to continue to be received. - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); tp->tcp_closed = true; } else { NNI_ASSERT(tp->recvlen == 0); @@ -531,7 +531,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len) iov.iov_len = len; nni_aio_set_iov(tp->tcp_send, 1, &iov); nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE); - nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send); + nni_tcp_conn_send(tp->tcp, tp->tcp_send); return (len); } @@ -615,25 +615,25 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio) int nni_tls_peername(nni_tls *tp, nni_sockaddr *sa) { - return (nni_plat_tcp_pipe_peername(tp->tcp, sa)); + return (nni_tcp_conn_peername(tp->tcp, sa)); } int nni_tls_sockname(nni_tls *tp, nni_sockaddr *sa) { - return (nni_plat_tcp_pipe_sockname(tp->tcp, sa)); + return (nni_tcp_conn_sockname(tp->tcp, sa)); } int nni_tls_set_nodelay(nni_tls *tp, bool val) { - return (nni_plat_tcp_pipe_set_nodelay(tp->tcp, val)); + return (nni_tcp_conn_set_nodelay(tp->tcp, val)); } int nni_tls_set_keepalive(nni_tls *tp, bool val) { - return (nni_plat_tcp_pipe_set_keepalive(tp->tcp, val)); + return (nni_tcp_conn_set_keepalive(tp->tcp, val)); } static void @@ -785,7 +785,7 @@ nni_tls_close(nni_tls *tp) // connection at this point. (void) mbedtls_ssl_close_notify(&tp->ctx); } else { - nni_plat_tcp_pipe_close(tp->tcp); + nni_tcp_conn_close(tp->tcp); } nni_mtx_unlock(&tp->lk); } diff --git a/src/supplemental/tls/none/tls.c b/src/supplemental/tls/none/tls.c index 2fdc0c93..d7968758 100644 --- a/src/supplemental/tls/none/tls.c +++ b/src/supplemental/tls/none/tls.c @@ -47,7 +47,7 @@ nni_tls_fini(nni_tls *tp) } int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_plat_tcp_pipe *tcp) +nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) { NNI_ARG_UNUSED(tpp); NNI_ARG_UNUSED(cfg); @@ -163,7 +163,8 @@ nng_tls_config_cert_key_file( return (NNG_ENOTSUP); } -int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size) +int +nng_tls_config_key(nng_tls_config *cfg, const uint8_t *key, size_t size) { NNI_ARG_UNUSED(cfg); NNI_ARG_UNUSED(key); @@ -171,14 +172,14 @@ int nng_tls_config_key(nng_tls_config *cfg, const uint8_t * key, size_t size) return (NNG_ENOTSUP); } -int nng_tls_config_pass(nng_tls_config *cfg, const char *pass) +int +nng_tls_config_pass(nng_tls_config *cfg, const char *pass) { NNI_ARG_UNUSED(cfg); NNI_ARG_UNUSED(pass); return (NNG_ENOTSUP); } - int nng_tls_config_alloc(nng_tls_config **cfgp, nng_tls_mode mode) { diff --git a/src/supplemental/tls/tls_api.h b/src/supplemental/tls/tls_api.h index 8a40bcfb..53dba7fe 100644 --- a/src/supplemental/tls/tls_api.h +++ b/src/supplemental/tls/tls_api.h @@ -31,7 +31,7 @@ extern void nni_tls_config_fini(nng_tls_config *); // the configuration object is created with a hold on it. extern void nni_tls_config_hold(nng_tls_config *); -extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_plat_tcp_pipe *); +extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_tcp_conn *); extern void nni_tls_close(nni_tls *); extern void nni_tls_fini(nni_tls *); extern void nni_tls_send(nni_tls *, nng_aio *); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e8aa04d0..c323da29 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -17,17 +17,18 @@ // TCP transport. Platform specific TCP operations must be // supplied as well. -typedef struct tcp_pipe tcp_pipe; -typedef struct tcp_ep tcp_ep; +typedef struct tcptran_pipe tcptran_pipe; +typedef struct tcptran_dialer tcptran_dialer; +typedef struct tcptran_listener tcptran_listener; // tcp_pipe is one end of a TCP connection. -struct tcp_pipe { - nni_plat_tcp_pipe *tpp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; +struct tcptran_pipe { + nni_tcp_conn *conn; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; nni_list recvq; nni_list sendq; @@ -46,53 +47,70 @@ struct tcp_pipe { nni_mtx mtx; }; -struct tcp_ep { - nni_plat_tcp_ep *tep; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; - nni_aio * aio; - nni_aio * user_aio; - nni_url * url; - nng_sockaddr bsa; // bound addr - nni_mtx mtx; +struct tcptran_dialer { + nni_tcp_dialer *dialer; + uint16_t proto; + uint16_t af; + size_t rcvmax; + bool nodelay; + bool keepalive; + bool resolving; + nng_sockaddr sa; + nni_aio * aio; + nni_aio * user_aio; + nni_url * url; + nni_mtx mtx; }; -static void tcp_pipe_dosend(tcp_pipe *, nni_aio *); -static void tcp_pipe_dorecv(tcp_pipe *); -static void tcp_pipe_send_cb(void *); -static void tcp_pipe_recv_cb(void *); -static void tcp_pipe_nego_cb(void *); -static void tcp_ep_cb(void *arg); +struct tcptran_listener { + nni_tcp_listener *listener; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; + nni_aio * aio; + nni_aio * user_aio; + nni_url * url; + nng_sockaddr sa; + nng_sockaddr bsa; // bound addr + nni_mtx mtx; +}; + +static void tcptran_pipe_send_start(tcptran_pipe *); +static void tcptran_pipe_recv_start(tcptran_pipe *); +static void tcptran_pipe_send_cb(void *); +static void tcptran_pipe_recv_cb(void *); +static void tcptran_pipe_nego_cb(void *); +static void tcptran_dialer_cb(void *arg); +static void tcptran_listener_cb(void *arg); static int -tcp_tran_init(void) +tcptran_init(void) { return (0); } static void -tcp_tran_fini(void) +tcptran_fini(void) { } static void -tcp_pipe_close(void *arg) +tcptran_pipe_close(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_close(p->rxaio); nni_aio_close(p->txaio); nni_aio_close(p->negaio); - nni_plat_tcp_pipe_close(p->tpp); + nni_tcp_conn_close(p->conn); } static void -tcp_pipe_stop(void *arg) +tcptran_pipe_stop(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); @@ -100,62 +118,48 @@ tcp_pipe_stop(void *arg) } static void -tcp_pipe_fini(void *arg) +tcptran_pipe_fini(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); nni_aio_fini(p->negaio); - if (p->tpp != NULL) { - nni_plat_tcp_pipe_fini(p->tpp); - } - if (p->rxmsg) { - nni_msg_free(p->rxmsg); + if (p->conn != NULL) { + nni_tcp_conn_fini(p->conn); } - + nni_msg_free(p->rxmsg); NNI_FREE_STRUCT(p); } static int -tcp_pipe_init(tcp_pipe **pipep, tcp_ep *ep, void *tpp) +tcptran_pipe_init(tcptran_pipe **pipep, void *conn) { - tcp_pipe *p; - int rv; + tcptran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tcp_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tcp_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, tcp_pipe_nego_cb, p)) != 0)) { - tcp_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, tcptran_pipe_nego_cb, p)) != 0)) { + tcptran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->nodelay = ep->nodelay; - p->keepalive = ep->keepalive; - p->tpp = tpp; - - // We try to set the nodelay and keepalive, but if these fail for - // some reason, its not really fatal to the communication channel. - // So ignore the return values. - (void) nni_plat_tcp_pipe_set_nodelay(tpp, p->nodelay); - (void) nni_plat_tcp_pipe_set_keepalive(tpp, p->keepalive); - - *pipep = p; + p->conn = conn; + *pipep = p; return (0); } static void -tcp_cancel_nego(nni_aio *aio, int rv) +tcptran_pipe_nego_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -170,11 +174,11 @@ tcp_cancel_nego(nni_aio *aio, int rv) } static void -tcp_pipe_nego_cb(void *arg) +tcptran_pipe_nego_cb(void *arg) { - tcp_pipe *p = arg; - nni_aio * aio = p->negaio; - int rv; + tcptran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -194,7 +198,7 @@ tcp_pipe_nego_cb(void *arg) iov.iov_buf = &p->txlen[p->gottxhead]; // send it down... nni_aio_set_iov(aio, 1, &iov); - nni_plat_tcp_pipe_send(p->tpp, aio); + nni_tcp_conn_send(p->conn, aio); nni_mtx_unlock(&p->mtx); return; } @@ -203,7 +207,7 @@ tcp_pipe_nego_cb(void *arg) iov.iov_len = p->wantrxhead - p->gotrxhead; iov.iov_buf = &p->rxlen[p->gotrxhead]; nni_aio_set_iov(aio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, aio); + nni_tcp_conn_recv(p->conn, aio); nni_mtx_unlock(&p->mtx); return; } @@ -227,14 +231,14 @@ done: } static void -tcp_pipe_send_cb(void *arg) +tcptran_pipe_send_cb(void *arg) { - tcp_pipe *p = arg; - int rv; - nni_aio * aio; - size_t n; - nni_msg * msg; - nni_aio * txaio = p->txaio; + tcptran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -254,16 +258,14 @@ tcp_pipe_send_cb(void *arg) n = nni_aio_count(txaio); nni_aio_iov_advance(txaio, n); if (nni_aio_iov_count(txaio) > 0) { - nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_tcp_conn_send(p->conn, txaio); nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&p->sendq)) { - // schedule next send - tcp_pipe_dosend(p, nni_list_first(&p->sendq)); - } + tcptran_pipe_send_start(p); + nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); @@ -274,14 +276,14 @@ tcp_pipe_send_cb(void *arg) } static void -tcp_pipe_recv_cb(void *arg) +tcptran_pipe_recv_cb(void *arg) { - tcp_pipe *p = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = p->rxaio; + tcptran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -293,7 +295,7 @@ tcp_pipe_recv_cb(void *arg) n = nni_aio_count(rxaio); nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) > 0) { - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -325,7 +327,7 @@ tcp_pipe_recv_cb(void *arg) iov.iov_len = (size_t) len; nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -336,7 +338,7 @@ tcp_pipe_recv_cb(void *arg) msg = p->rxmsg; p->rxmsg = NULL; if (!nni_list_empty(&p->recvq)) { - tcp_pipe_dorecv(p); + tcptran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); @@ -357,9 +359,9 @@ recv_error: } static void -tcp_cancel_tx(nni_aio *aio, int rv) +tcptran_pipe_send_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -381,14 +383,19 @@ tcp_cancel_tx(nni_aio *aio, int rv) } static void -tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio) +tcptran_pipe_send_start(tcptran_pipe *p) { + nni_aio *aio; nni_aio *txaio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + // This runs to send the message. msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -411,35 +418,35 @@ tcp_pipe_dosend(tcp_pipe *p, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_tcp_conn_send(p->conn, txaio); } static void -tcp_pipe_send(void *arg, nni_aio *aio) +tcptran_pipe_send(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - int rv; + tcptran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_tx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_send_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { - tcp_pipe_dosend(p, aio); + tcptran_pipe_send_start(p); } nni_mtx_unlock(&p->mtx); } static void -tcp_cancel_rx(nni_aio *aio, int rv) +tcptran_pipe_recv_cancel(nni_aio *aio, int rv) { - tcp_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -460,7 +467,7 @@ tcp_cancel_rx(nni_aio *aio, int rv) } static void -tcp_pipe_dorecv(tcp_pipe *p) +tcptran_pipe_recv_start(tcptran_pipe *p) { nni_aio *rxaio; nni_iov iov; @@ -472,20 +479,20 @@ tcp_pipe_dorecv(tcp_pipe *p) iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_tcp_conn_recv(p->conn, rxaio); } static void -tcp_pipe_recv(void *arg, nni_aio *aio) +tcptran_pipe_recv(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - int rv; + tcptran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_rx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_recv_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -493,75 +500,74 @@ tcp_pipe_recv(void *arg, nni_aio *aio) nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { - tcp_pipe_dorecv(p); + tcptran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); } static uint16_t -tcp_pipe_peer(void *arg) +tcptran_pipe_peer(void *arg) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (p->peer); } static int -tcp_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe * p = arg; - int rv; - nni_sockaddr sa; + tcptran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); - if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) { + if ((rv = nni_tcp_conn_sockname(p->conn, &sa)) == 0) { rv = nni_copyout_sockaddr(&sa, v, szp, t); } return (rv); } static int -tcp_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe * p = arg; - int rv; - nni_sockaddr sa; + tcptran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); - if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) { + if ((rv = nni_tcp_conn_peername(p->conn, &sa)) == 0) { rv = nni_copyout_sockaddr(&sa, v, szp, t); } return (rv); } static int -tcp_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (nni_copyout_bool(p->keepalive, v, szp, t)); } static int -tcp_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_pipe *p = arg; + tcptran_pipe *p = arg; return (nni_copyout_bool(p->nodelay, v, szp, t)); } -// Note that the url *must* be in a modifiable buffer. static void -tcp_pipe_start(void *arg, nni_aio *aio) +tcptran_pipe_start(void *arg, nni_aio *aio) { - tcp_pipe *p = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + tcptran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tcp_cancel_nego, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tcptran_pipe_nego_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -582,35 +588,39 @@ tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_plat_tcp_pipe_send(p->tpp, negaio); + nni_tcp_conn_send(p->conn, negaio); nni_mtx_unlock(&p->mtx); } static void -tcp_ep_fini(void *arg) +tcptran_dialer_fini(void *arg) { - tcp_ep *ep = arg; + tcptran_dialer *d = arg; - nni_aio_stop(ep->aio); - if (ep->tep != NULL) { - nni_plat_tcp_ep_fini(ep->tep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_tcp_dialer_fini(d->dialer); } - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_aio_fini(d->aio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +tcptran_dialer_close(void *arg) +{ + tcptran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_tcp_dialer_close(d->dialer); } static int -tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +tcptran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - tcp_ep * ep; - int rv; - char * host; - char * serv; - nni_sockaddr rsa, lsa; - nni_aio * aio; - int passive; - uint16_t af; + tcptran_dialer *d; + int rv; + uint16_t af; if (strcmp(url->u_scheme, "tcp") == 0) { af = NNG_AF_UNSPEC; @@ -627,345 +637,543 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_EADDRINVAL); } if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || - (url->u_query != NULL)) { + (url->u_query != NULL) || (strlen(url->u_hostname) == 0) || + (strlen(url->u_port) == 0)) { return (NNG_EADDRINVAL); } - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); } + nni_mtx_init(&d->mtx); - if (strlen(url->u_hostname) == 0) { - host = NULL; - } else { - host = url->u_hostname; + if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) || + ((rv = nni_aio_init(&d->aio, tcptran_dialer_cb, d)) != 0)) { + tcptran_dialer_fini(d); + return (rv); } - if (strlen(url->u_port) == 0) { - serv = NULL; - } else { - serv = url->u_port; - } - // XXX: arguably we could defer this part to the point we do a bind - // or connect! - if (mode == NNI_EP_MODE_DIAL) { - passive = 0; - lsa.s_family = af; - nni_aio_set_input(aio, 0, &rsa); - if ((host == NULL) || (serv == NULL)) { - nni_aio_fini(aio); - return (NNG_EADDRINVAL); + d->url = url; + d->proto = nni_sock_proto_id(sock); + d->nodelay = true; + d->keepalive = false; + d->af = af; + + *dp = d; + return (0); +} + +static void +tcptran_dialer_cb(void *arg) +{ + tcptran_dialer *d = arg; + tcptran_pipe * p; + nni_tcp_conn * conn; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->mtx); + if ((rv == 0) && !d->resolving) { + conn = nni_aio_get_output(d->aio, 0); + nni_tcp_conn_fini(conn); } - } else { - passive = 1; - rsa.s_family = af; - nni_aio_set_input(aio, 0, &lsa); + return; } - nni_plat_tcp_resolv(host, serv, af, passive, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); - return (rv); + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; } - nni_aio_fini(aio); + if (d->resolving) { + // Name resolution complete. Now go to next step. + d->resolving = false; + nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->mtx); + return; + } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); + if ((rv = tcptran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; } - nni_mtx_init(&ep->mtx); - ep->url = url; - if ((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) { - tcp_ep_fini(ep); - return (rv); + p->proto = d->proto; + p->rcvmax = d->rcvmax; + p->nodelay = d->nodelay; + p->keepalive = d->keepalive; + nni_mtx_unlock(&d->mtx); + + (void) nni_tcp_conn_set_nodelay(conn, p->nodelay); + (void) nni_tcp_conn_set_keepalive(conn, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +tcptran_dialer_cancel(nni_aio *aio, int rv) +{ + tcptran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->mtx); + return; } + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); - if ((rv = nni_aio_init(&ep->aio, tcp_ep_cb, ep)) != 0) { - tcp_ep_fini(ep); - return (rv); + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); +} + +static void +tcptran_dialer_connect(void *arg, nni_aio *aio) +{ + tcptran_dialer *d = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; } - ep->proto = nni_sock_proto_id(sock); - ep->nodelay = true; - ep->keepalive = false; + nni_mtx_lock(&d->mtx); + NNI_ASSERT(d->user_aio == NULL); - *epp = ep; - return (0); + if ((rv = nni_aio_schedule(aio, tcptran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; + + d->resolving = true; + + // Start the name resolution. Callback will see resolving, and then + // switch to doing actual connect. + nni_aio_set_input(d->aio, 0, &d->sa); + nni_tcp_resolv(d->url->u_hostname, d->url->u_port, d->af, 0, d->aio); + nni_mtx_unlock(&d->mtx); } static int -tcp_dialer_init(void **epp, nni_url *url, nni_sock *sock) +tcptran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + tcptran_dialer *d = arg; + + return (nni_copyout_str(d->url->u_rawurl, v, szp, t)); } static int -tcp_listener_init(void **epp, nni_url *url, nni_sock *sock) +tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + tcptran_dialer *d = arg; + return (nni_copyout_size(d->rcvmax, v, szp, t)); } -static void -tcp_ep_close(void *arg) +static int +tcptran_dialer_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - - nni_aio_close(ep->aio); + tcptran_dialer *d = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->rcvmax = val; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} - nni_mtx_lock(&ep->mtx); - nni_plat_tcp_ep_close(ep->tep); - nni_mtx_unlock(&ep->mtx); +static int +tcptran_dialer_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tcptran_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_bool(d->nodelay, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); } static int -tcp_ep_bind(void *arg) +tcptran_dialer_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; + tcptran_dialer *d = arg; + bool val; + int rv; + if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->nodelay = val; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} - nni_mtx_lock(&ep->mtx); - rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa); - nni_mtx_unlock(&ep->mtx); +static int +tcptran_dialer_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tcptran_dialer *d = arg; + return (nni_copyout_bool(d->keepalive, v, szp, t)); +} +static int +tcptran_dialer_set_keepalive( + void *arg, const void *v, size_t sz, nni_opt_type t) +{ + tcptran_dialer *d = arg; + bool val; + int rv; + if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->keepalive = val; + nni_mtx_unlock(&d->mtx); + } return (rv); } static void -tcp_ep_finish(tcp_ep *ep) +tcptran_listener_fini(void *arg) { - nni_aio * aio; - int rv; - tcp_pipe *pipe = NULL; + tcptran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_tcp_listener_fini(l->listener); + } + nni_aio_fini(l->aio); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static int +tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + tcptran_listener *l; + int rv; + char * host; + nni_aio * aio; + uint16_t af; + + if (strcmp(url->u_scheme, "tcp") == 0) { + af = NNG_AF_UNSPEC; + } else if (strcmp(url->u_scheme, "tcp4") == 0) { + af = NNG_AF_INET; + } else if (strcmp(url->u_scheme, "tcp6") == 0) { + af = NNG_AF_INET6; + } else { + return (NNG_EADDRINVAL); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } -done: - aio = ep->user_aio; - ep->user_aio = NULL; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + l->url = url; - if ((aio != NULL) && (rv == 0)) { - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if (strlen(url->u_hostname) == 0) { + host = NULL; + } else { + host = url->u_hostname; } - if (pipe != NULL) { - tcp_pipe_fini(pipe); + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + tcptran_listener_fini(l); + return (rv); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + // XXX: We are doing lookup at listener initialization. There is + // a valid argument that this should be done at bind time, but that + // would require making bind asynchronous. In some ways this would + // be worse than the cost of just waiting here. We always recommend + // using local IP addresses rather than names when possible. + + nni_aio_set_input(aio, 0, &l->sa); + + nni_tcp_resolv(host, url->u_port, af, 1, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + + if (rv != 0) { + tcptran_listener_fini(l); + return (rv); + } + + if ((rv = nni_tcp_listener_init(&l->listener)) != 0) { + tcptran_listener_fini(l); + return (rv); + } + + if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) { + tcptran_listener_fini(l); + return (rv); } + l->proto = nni_sock_proto_id(sock); + l->nodelay = true; + l->keepalive = false; + l->bsa = l->sa; + + *lp = l; + return (0); } static void -tcp_ep_cb(void *arg) +tcptran_listener_close(void *arg) { - tcp_ep *ep = arg; + tcptran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - tcp_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + nni_aio_close(l->aio); + nni_tcp_listener_close(l->listener); } -static void -tcp_cancel_ep(nni_aio *aio, int rv) +static int +tcptran_listener_bind(void *arg) { - tcp_ep *ep = nni_aio_get_prov_data(aio); + tcptran_listener *l = arg; + int rv; - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); - return; - } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); + l->bsa = l->sa; + nni_mtx_lock(&l->mtx); + rv = nni_tcp_listener_listen(l->listener, &l->bsa); + nni_mtx_unlock(&l->mtx); - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + return (rv); } static void -tcp_ep_accept(void *arg, nni_aio *aio) +tcptran_listener_cb(void *arg) { - tcp_ep *ep = arg; - int rv; + tcptran_listener *l = arg; + nni_aio * aio; + int rv; + tcptran_pipe * p = NULL; + nni_tcp_conn * conn; + + nni_mtx_lock(&l->mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_tcp_conn_fini(conn); + } + return; + } - if (nni_aio_begin(aio) != 0) { + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + + NNI_ASSERT(conn != NULL); + if ((rv = tcptran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_tcp_conn_fini(conn); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + p->proto = l->proto; + p->rcvmax = l->rcvmax; + p->nodelay = l->nodelay; + p->keepalive = l->keepalive; + nni_mtx_unlock(&l->mtx); + + (void) nni_tcp_conn_set_nodelay(conn, p->nodelay); + (void) nni_tcp_conn_set_keepalive(conn, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void -tcp_ep_connect(void *arg, nni_aio *aio) +tcptran_listener_cancel(nni_aio *aio, int rv) { - tcp_ep *ep = arg; - int rv; + tcptran_listener *l = nni_aio_get_prov_data(aio); - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_schedule(aio, tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->mtx); return; } - ep->user_aio = aio; + l->user_aio = NULL; + nni_mtx_unlock(&l->mtx); - nni_plat_tcp_ep_connect(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } -static int -tcp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +static void +tcptran_listener_accept(void *arg, nni_aio *aio) { - tcp_ep *ep = arg; - size_t val; - int rv; - if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; } - return (rv); -} + nni_mtx_lock(&l->mtx); + NNI_ASSERT(l->user_aio == NULL); -static int -tcp_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t) -{ - return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); + if ((rv = nni_aio_schedule(aio, tcptran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + l->user_aio = aio; + + nni_tcp_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->mtx); } static int -tcp_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) +tcptran_listener_set_nodelay( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - bool val; - int rv; + tcptran_listener *l = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->nodelay = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->nodelay = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -tcp_ep_chk_bool(const void *v, size_t sz, nni_opt_type t) +tcptran_listener_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (nni_copyin_bool(NULL, v, sz, t)); + tcptran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->nodelay, v, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int -tcp_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_bool(ep->nodelay, v, szp, t); - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->rcvmax = val; + nni_mtx_unlock(&l->mtx); + } return (rv); } static int -tcp_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +tcptran_listener_set_keepalive( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - bool val; - int rv; + tcptran_listener *l = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->keepalive = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->keepalive = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_bool(ep->keepalive, v, szp, t); - nni_mtx_unlock(&ep->mtx); + tcptran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->keepalive, v, szp, t); + nni_mtx_unlock(&l->mtx); return (rv); } static int -tcp_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; + tcptran_listener *l = arg; + char ustr[128]; + char ipstr[48]; // max for IPv6 addresses including [] + char portstr[6]; // max for 16-bit port - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); + nni_ntop(&l->bsa, ipstr, portstr); + snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr); + return (nni_copyout_str(ustr, v, szp, t)); } static int -tcp_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - tcp_ep *ep = arg; - char ustr[128]; - char ipstr[48]; // max for IPv6 addresses including [] - char portstr[6]; // max for 16-bit port + tcptran_listener *l = arg; + return (nni_copyout_size(l->rcvmax, v, szp, t)); +} - nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); - snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr); - return (nni_copyout_str(ustr, v, szp, t)); +static int +tcptran_check_bool(const void *v, size_t sz, nni_opt_type t) +{ + return (nni_copyin_bool(NULL, v, sz, t)); } static int -tcp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +tcptran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { - tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, v, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } -static nni_tran_option tcp_pipe_options[] = { +static nni_tran_option tcptran_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tcp_pipe_get_locaddr, + .o_get = tcptran_pipe_get_locaddr, }, { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tcp_pipe_get_remaddr, + .o_get = tcptran_pipe_get_remaddr, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_pipe_get_keepalive, + .o_get = tcptran_pipe_get_keepalive, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_pipe_get_nodelay, + .o_get = tcptran_pipe_get_nodelay, }, // terminate list { @@ -973,43 +1181,43 @@ static nni_tran_option tcp_pipe_options[] = { }, }; -static nni_tran_pipe_ops tcp_pipe_ops = { - .p_fini = tcp_pipe_fini, - .p_start = tcp_pipe_start, - .p_stop = tcp_pipe_stop, - .p_send = tcp_pipe_send, - .p_recv = tcp_pipe_recv, - .p_close = tcp_pipe_close, - .p_peer = tcp_pipe_peer, - .p_options = tcp_pipe_options, +static nni_tran_pipe_ops tcptran_pipe_ops = { + .p_fini = tcptran_pipe_fini, + .p_start = tcptran_pipe_start, + .p_stop = tcptran_pipe_stop, + .p_send = tcptran_pipe_send, + .p_recv = tcptran_pipe_recv, + .p_close = tcptran_pipe_close, + .p_peer = tcptran_pipe_peer, + .p_options = tcptran_pipe_options, }; -static nni_tran_option tcp_dialer_options[] = { +static nni_tran_option tcptran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tcp_ep_get_recvmaxsz, - .o_set = tcp_ep_set_recvmaxsz, - .o_chk = tcp_ep_chk_recvmaxsz, + .o_get = tcptran_dialer_get_recvmaxsz, + .o_set = tcptran_dialer_set_recvmaxsz, + .o_chk = tcptran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tcp_dialer_get_url, + .o_get = tcptran_dialer_get_url, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_nodelay, - .o_set = tcp_ep_set_nodelay, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_dialer_get_nodelay, + .o_set = tcptran_dialer_set_nodelay, + .o_chk = tcptran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_keepalive, - .o_set = tcp_ep_set_keepalive, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_dialer_get_keepalive, + .o_set = tcptran_dialer_set_keepalive, + .o_chk = tcptran_check_bool, }, // terminate list { @@ -1017,32 +1225,32 @@ static nni_tran_option tcp_dialer_options[] = { }, }; -static nni_tran_option tcp_listener_options[] = { +static nni_tran_option tcptran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tcp_ep_get_recvmaxsz, - .o_set = tcp_ep_set_recvmaxsz, - .o_chk = tcp_ep_chk_recvmaxsz, + .o_get = tcptran_listener_get_recvmaxsz, + .o_set = tcptran_listener_set_recvmaxsz, + .o_chk = tcptran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tcp_listener_get_url, + .o_get = tcptran_listener_get_url, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_nodelay, - .o_set = tcp_ep_set_nodelay, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_listener_get_nodelay, + .o_set = tcptran_listener_set_nodelay, + .o_chk = tcptran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tcp_ep_get_keepalive, - .o_set = tcp_ep_set_keepalive, - .o_chk = tcp_ep_chk_bool, + .o_get = tcptran_listener_get_keepalive, + .o_set = tcptran_listener_set_keepalive, + .o_chk = tcptran_check_bool, }, // terminate list { @@ -1050,51 +1258,51 @@ static nni_tran_option tcp_listener_options[] = { }, }; -static nni_tran_dialer_ops tcp_dialer_ops = { - .d_init = tcp_dialer_init, - .d_fini = tcp_ep_fini, - .d_connect = tcp_ep_connect, - .d_close = tcp_ep_close, - .d_options = tcp_dialer_options, +static nni_tran_dialer_ops tcptran_dialer_ops = { + .d_init = tcptran_dialer_init, + .d_fini = tcptran_dialer_fini, + .d_connect = tcptran_dialer_connect, + .d_close = tcptran_dialer_close, + .d_options = tcptran_dialer_options, }; -static nni_tran_listener_ops tcp_listener_ops = { - .l_init = tcp_listener_init, - .l_fini = tcp_ep_fini, - .l_bind = tcp_ep_bind, - .l_accept = tcp_ep_accept, - .l_close = tcp_ep_close, - .l_options = tcp_listener_options, +static nni_tran_listener_ops tcptran_listener_ops = { + .l_init = tcptran_listener_init, + .l_fini = tcptran_listener_fini, + .l_bind = tcptran_listener_bind, + .l_accept = tcptran_listener_accept, + .l_close = tcptran_listener_close, + .l_options = tcptran_listener_options, }; static nni_tran tcp_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; static nni_tran tcp4_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp4", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; static nni_tran tcp6_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp6", - .tran_dialer = &tcp_dialer_ops, - .tran_listener = &tcp_listener_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_dialer = &tcptran_dialer_ops, + .tran_listener = &tcptran_listener_ops, + .tran_pipe = &tcptran_pipe_ops, + .tran_init = tcptran_init, + .tran_fini = tcptran_fini, }; int diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 5e1a1e8d..b1bdddb8 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -22,17 +22,19 @@ // supplied as well, and uses the supplemental TLS v1.2 code. It is not // an accident that this very closely resembles the TCP transport itself. -typedef struct tls_pipe tls_pipe; -typedef struct tls_ep tls_ep; - -// tls_pipe is one end of a TLS connection. -struct tls_pipe { - nni_plat_tcp_pipe *tcp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool nodelay; - bool keepalive; +typedef struct tlstran_ep tlstran_ep; +typedef struct tlstran_dialer tlstran_dialer; +typedef struct tlstran_listener tlstran_listener; +typedef struct tlstran_pipe tlstran_pipe; + +// tlstran_pipe is one end of a TLS connection. +struct tlstran_pipe { + nni_tls *tls; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; nni_list sendq; nni_list recvq; @@ -49,47 +51,62 @@ struct tls_pipe { nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; - nni_tls *tls; }; -struct tls_ep { - nni_plat_tcp_ep *tep; - uint16_t proto; - size_t rcvmax; - int authmode; - nni_aio * aio; - nni_aio * user_aio; - nni_mtx mtx; - nng_tls_config * cfg; - nng_sockaddr bsa; - nni_url * url; - int mode; - bool nodelay; - bool keepalive; +// Stuff that is common to both dialers and listeners. +struct tlstran_ep { + uint16_t proto; + size_t rcvmax; + bool nodelay; + bool keepalive; + int authmode; + nng_tls_config *cfg; + nni_url * url; + nni_mtx mtx; +}; + +struct tlstran_dialer { + tlstran_ep ep; // must be first + nni_tcp_dialer *dialer; + uint16_t af; + nni_aio * aio; + nni_aio * user_aio; + bool resolving; + nng_sockaddr sa; +}; + +struct tlstran_listener { + tlstran_ep ep; // must be first + nni_tcp_listener *listener; + nni_aio * aio; + nni_aio * user_aio; + nng_sockaddr sa; + nng_sockaddr bsa; // bound addr }; -static void tls_pipe_dorecv(tls_pipe *); -static void tls_pipe_dosend(tls_pipe *, nni_aio *); -static void tls_pipe_send_cb(void *); -static void tls_pipe_recv_cb(void *); -static void tls_pipe_nego_cb(void *); -static void tls_ep_cb(void *arg); +static void tlstran_pipe_send_start(tlstran_pipe *); +static void tlstran_pipe_recv_start(tlstran_pipe *); +static void tlstran_pipe_send_cb(void *); +static void tlstran_pipe_recv_cb(void *); +static void tlstran_pipe_nego_cb(void *); +static void tlstran_dialer_cb(void *); +static void tlstran_listener_cb(void *); static int -tls_tran_init(void) +tlstran_init(void) { return (0); } static void -tls_tran_fini(void) +tlstran_fini(void) { } static void -tls_pipe_close(void *arg) +tlstran_pipe_close(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_close(p->rxaio); nni_aio_close(p->txaio); @@ -99,9 +116,9 @@ tls_pipe_close(void *arg) } static void -tls_pipe_stop(void *arg) +tlstran_pipe_stop(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); @@ -109,14 +126,13 @@ tls_pipe_stop(void *arg) } static void -tls_pipe_fini(void *arg) +tlstran_pipe_fini(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); nni_aio_fini(p->negaio); - if (p->tls != NULL) { nni_tls_fini(p->tls); } @@ -125,41 +141,34 @@ tls_pipe_fini(void *arg) } static int -tls_pipe_init(tls_pipe **pipep, tls_ep *ep, void *tpp) +tlstran_pipe_init(tlstran_pipe **pipep, nni_tls *tls) { - tls_pipe * p; - nni_plat_tcp_pipe *tcp = tpp; - int rv; + tlstran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_tls_init(&p->tls, ep->cfg, tcp)) != 0) || - ((rv = nni_aio_init(&p->txaio, tls_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tls_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, tls_pipe_nego_cb, p)) != 0)) { - tls_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, tlstran_pipe_nego_cb, p)) != 0)) { + tlstran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->tcp = tcp; - p->keepalive = ep->keepalive; - p->nodelay = ep->nodelay; - + p->tls = tls; *pipep = p; return (0); } static void -tls_cancel_nego(nni_aio *aio, int rv) +tlstran_pipe_cancel_nego(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -174,11 +183,11 @@ tls_cancel_nego(nni_aio *aio, int rv) } static void -tls_pipe_nego_cb(void *arg) +tlstran_pipe_nego_cb(void *arg) { - tls_pipe *p = arg; - nni_aio * aio = p->negaio; - int rv; + tlstran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -237,14 +246,14 @@ done: } static void -tls_pipe_send_cb(void *arg) +tlstran_pipe_send_cb(void *arg) { - tls_pipe *p = arg; - int rv; - nni_aio * aio; - size_t n; - nni_msg * msg; - nni_aio * txaio = p->txaio; + tlstran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -269,9 +278,7 @@ tls_pipe_send_cb(void *arg) return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&p->sendq)) { - tls_pipe_dosend(p, nni_list_first(&p->sendq)); - } + tlstran_pipe_send_start(p); nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); @@ -282,14 +289,14 @@ tls_pipe_send_cb(void *arg) } static void -tls_pipe_recv_cb(void *arg) +tlstran_pipe_recv_cb(void *arg) { - tls_pipe *p = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = p->rxaio; + tlstran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -345,7 +352,7 @@ tls_pipe_recv_cb(void *arg) msg = p->rxmsg; p->rxmsg = NULL; if (!nni_list_empty(&p->recvq)) { - tls_pipe_dorecv(p); + tlstran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); @@ -365,9 +372,9 @@ recv_error: } static void -tls_cancel_tx(nni_aio *aio, int rv) +tlstran_pipe_send_cancel(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -389,14 +396,19 @@ tls_cancel_tx(nni_aio *aio, int rv) } static void -tls_pipe_dosend(tls_pipe *p, nni_aio *aio) +tlstran_pipe_send_start(tlstran_pipe *p) { nni_aio *txaio; + nni_aio *aio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -423,31 +435,31 @@ tls_pipe_dosend(tls_pipe *p, nni_aio *aio) } static void -tls_pipe_send(void *arg, nni_aio *aio) +tlstran_pipe_send(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - int rv; + tlstran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_tx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_send_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { - tls_pipe_dosend(p, aio); + tlstran_pipe_send_start(p); } nni_mtx_unlock(&p->mtx); } static void -tls_cancel_rx(nni_aio *aio, int rv) +tlstran_pipe_recv_cancel(nni_aio *aio, int rv) { - tls_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -468,7 +480,7 @@ tls_cancel_rx(nni_aio *aio, int rv) } static void -tls_pipe_dorecv(tls_pipe *p) +tlstran_pipe_recv_start(tlstran_pipe *p) { nni_aio *rxaio; nni_iov iov; @@ -484,16 +496,16 @@ tls_pipe_dorecv(tls_pipe *p) } static void -tls_pipe_recv(void *arg, nni_aio *aio) +tlstran_pipe_recv(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - int rv; + tlstran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_rx, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_recv_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -501,25 +513,25 @@ tls_pipe_recv(void *arg, nni_aio *aio) nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { - tls_pipe_dorecv(p); + tlstran_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); } static uint16_t -tls_pipe_peer(void *arg) +tlstran_pipe_peer(void *arg) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (p->peer); } static int -tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe * p = arg; - int rv; - nni_sockaddr sa; + tlstran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_sockname(p->tls, &sa)) == 0) { @@ -529,11 +541,11 @@ tls_pipe_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe * p = arg; - int rv; - nni_sockaddr sa; + tlstran_pipe *p = arg; + int rv; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_peername(p->tls, &sa)) == 0) { @@ -543,32 +555,32 @@ tls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(p->keepalive, v, szp, t)); } static int -tls_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(p->nodelay, v, szp, t)); } static void -tls_pipe_start(void *arg, nni_aio *aio) +tlstran_pipe_start(void *arg, nni_aio *aio) { - tls_pipe *p = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + tlstran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, tls_cancel_nego, p)) != 0) { + if ((rv = nni_aio_schedule(aio, tlstran_pipe_cancel_nego, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; @@ -594,35 +606,39 @@ tls_pipe_start(void *arg, nni_aio *aio) } static void -tls_ep_fini(void *arg) +tlstran_dialer_fini(void *arg) { - tls_ep *ep = arg; + tlstran_dialer *d = arg; - nni_aio_stop(ep->aio); - if (ep->tep != NULL) { - nni_plat_tcp_ep_fini(ep->tep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_tcp_dialer_fini(d->dialer); } - if (ep->cfg) { - nni_tls_config_fini(ep->cfg); + nni_aio_fini(d->aio); + if (d->ep.cfg != NULL) { + nni_tls_config_fini(d->ep.cfg); } - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_mtx_fini(&d->ep.mtx); + NNI_FREE_STRUCT(d); +} + +static void +tlstran_dialer_close(void *arg) +{ + tlstran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_tcp_dialer_close(d->dialer); } static int -tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +tlstran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - tls_ep * ep; - int rv; - char * host; - char * serv; - nni_sockaddr rsa, lsa; - nni_aio * aio; - int passive; - nng_tls_mode tlsmode; - nng_tls_auth_mode authmode; - uint16_t af; + tlstran_dialer *d; + int rv; + uint16_t af; + char * host = url->u_hostname; + char * port = url->u_port; if (strcmp(url->u_scheme, "tls+tcp") == 0) { af = NNG_AF_UNSPEC; @@ -639,232 +655,374 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_EADDRINVAL); } if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || - (url->u_query != NULL)) { + (url->u_query != NULL) || (host == NULL) || (port == NULL) || + (strlen(host) == 0) || (strlen(port) == 0)) { return (NNG_EADDRINVAL); } + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + nni_mtx_init(&d->ep.mtx); + d->ep.authmode = NNG_TLS_AUTH_MODE_REQUIRED; + d->ep.url = url; + d->ep.proto = nni_sock_proto_id(sock); + d->ep.nodelay = true; + d->ep.keepalive = false; + + if (((rv = nni_tcp_dialer_init(&d->dialer)) != 0) || + ((rv = nni_tls_config_init(&d->ep.cfg, NNG_TLS_MODE_CLIENT)) != + 0) || + ((rv = nng_tls_config_auth_mode(d->ep.cfg, d->ep.authmode)) != + 0) || + ((rv = nng_tls_config_server_name(d->ep.cfg, host)) != 0) || + ((rv = nni_aio_init(&d->aio, tlstran_dialer_cb, d)) != 0)) { + tlstran_dialer_fini(d); return (rv); } + d->af = af; - if (strlen(url->u_hostname) == 0) { - host = NULL; - } else { - host = url->u_hostname; - } - if (strlen(url->u_port) == 0) { - serv = NULL; - } else { - serv = url->u_port; - } - - if (mode == NNI_EP_MODE_DIAL) { - passive = 0; - tlsmode = NNG_TLS_MODE_CLIENT; - authmode = NNG_TLS_AUTH_MODE_REQUIRED; - lsa.s_family = af; - nni_aio_set_input(aio, 0, &rsa); - if ((host == NULL) || (serv == NULL)) { - nni_aio_fini(aio); - return (NNG_EADDRINVAL); + *dp = d; + return (0); +} + +static void +tlstran_dialer_cb(void *arg) +{ + tlstran_dialer *d = arg; + tlstran_pipe * p; + nni_tcp_conn * conn; + nni_tls * tls; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->ep.mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->ep.mtx); + if ((rv == 0) && !d->resolving) { + conn = nni_aio_get_output(d->aio, 0); + nni_tcp_conn_fini(conn); } - } else { - passive = 1; - tlsmode = NNG_TLS_MODE_SERVER; - authmode = NNG_TLS_AUTH_MODE_NONE; - rsa.s_family = af; - nni_aio_set_input(aio, 0, &lsa); + return; } - // XXX: arguably we could defer this part to the point we do a bind - // or connect! - nni_plat_tcp_resolv(host, serv, af, passive, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); - return (rv); + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->ep.mtx); + nni_aio_finish_error(aio, rv); + return; } - nni_aio_fini(aio); - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + if (d->resolving) { + // Name resolution complete. Now go to next step. + d->resolving = false; + nni_tcp_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->ep.mtx); + return; } - nni_mtx_init(&ep->mtx); - ep->url = url; - ep->keepalive = false; - ep->nodelay = true; + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); - if (((rv = nni_plat_tcp_ep_init(&ep->tep, &lsa, &rsa, mode)) != 0) || - ((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) || - ((rv = nng_tls_config_auth_mode(ep->cfg, authmode)) != 0) || - ((rv = nni_aio_init(&ep->aio, tls_ep_cb, ep)) != 0)) { - tls_ep_fini(ep); - return (rv); + if ((rv = nni_tls_init(&tls, d->ep.cfg, conn)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; } - if ((tlsmode == NNG_TLS_MODE_CLIENT) && (host != NULL)) { - if ((rv = nng_tls_config_server_name(ep->cfg, host)) != 0) { - tls_ep_fini(ep); - return (rv); - } + + if ((rv = tlstran_pipe_init(&p, tls)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_tls_fini(tls); + nni_aio_finish_error(aio, rv); + return; } - ep->proto = nni_sock_proto_id(sock); - ep->authmode = authmode; - *epp = ep; - return (0); -} + p->proto = d->ep.proto; + p->rcvmax = d->ep.rcvmax; + p->nodelay = d->ep.nodelay; + p->keepalive = d->ep.keepalive; + nni_mtx_unlock(&d->ep.mtx); -static int -tls_dialer_init(void **epp, nni_url *url, nni_sock *sock) -{ - return (tls_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + (void) nni_tls_set_nodelay(tls, p->nodelay); + (void) nni_tls_set_keepalive(tls, p->keepalive); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } -static int -tls_listener_init(void **epp, nni_url *url, nni_sock *sock) +static void +tlstran_dialer_cancel(nni_aio *aio, int rv) { - return (tls_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + tlstran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->ep.mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->ep.mtx); + return; + } + d->user_aio = NULL; + nni_mtx_unlock(&d->ep.mtx); + + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); } static void -tls_ep_close(void *arg) +tlstran_dialer_connect(void *arg, nni_aio *aio) { - tls_ep *ep = arg; + tlstran_dialer *d = arg; + int rv; - nni_aio_close(ep->aio); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&d->ep.mtx); + NNI_ASSERT(d->user_aio == NULL); - nni_mtx_lock(&ep->mtx); - nni_plat_tcp_ep_close(ep->tep); - nni_mtx_unlock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, tlstran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->ep.mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; + + d->resolving = true; + + // Start the name resolution. Callback will see resolving, and then + // switch to doing actual connect. + nni_aio_set_input(d->aio, 0, &d->sa); + nni_tcp_resolv( + d->ep.url->u_hostname, d->ep.url->u_port, d->af, 0, d->aio); + nni_mtx_unlock(&d->ep.mtx); } -static int -tls_ep_bind(void *arg) +static void +tlstran_listener_fini(void *arg) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - rv = nni_plat_tcp_ep_listen(ep->tep, &ep->bsa); - nni_mtx_unlock(&ep->mtx); - - return (rv); + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_tcp_listener_fini(l->listener); + } + nni_aio_fini(l->aio); + if (l->ep.cfg != NULL) { + nni_tls_config_fini(l->ep.cfg); + } + nni_mtx_fini(&l->ep.mtx); + NNI_FREE_STRUCT(l); } static void -tls_ep_finish(tls_ep *ep) +tlstran_listener_close(void *arg) { - nni_aio * aio; - int rv; - tls_pipe *pipe = NULL; + tlstran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_close(l->aio); + nni_tcp_listener_close(l->listener); +} + +static int +tlstran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + tlstran_listener *l; + int rv; + nni_aio * aio; + uint16_t af; + char * host = url->u_hostname; + char * port = url->u_port; + + if (strcmp(url->u_scheme, "tls+tcp") == 0) { + af = NNG_AF_UNSPEC; + } else if (strcmp(url->u_scheme, "tls+tcp4") == 0) { + af = NNG_AF_INET; + } else if (strcmp(url->u_scheme, "tls+tcp6") == 0) { + af = NNG_AF_INET6; + } else { + return (NNG_EADDRINVAL); + } + + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->ep.mtx); + l->ep.url = url; + l->ep.authmode = NNG_TLS_AUTH_MODE_NONE; + l->ep.keepalive = false; + l->ep.nodelay = true; + l->ep.proto = nni_sock_proto_id(sock); -done: - aio = ep->user_aio; - ep->user_aio = NULL; + if (strlen(host) == 0) { + host = NULL; + } - if ((aio != NULL) && (rv == 0)) { - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + tlstran_listener_fini(l); + return (rv); } - if (pipe != NULL) { - tls_pipe_fini(pipe); + + // XXX: We are doing lookup at listener initialization. There is + // a valid argument that this should be done at bind time, but that + // would require making bind asynchronous. In some ways this would + // be worse than the cost of just waiting here. We always recommend + // using local IP addresses rather than names when possible. + + nni_aio_set_input(aio, 0, &l->sa); + + nni_tcp_resolv(host, port, af, 1, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + + if (rv != 0) { + tlstran_listener_fini(l); + return (rv); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + if (((rv = nni_tcp_listener_init(&l->listener)) != 0) || + ((rv = nni_tls_config_init(&l->ep.cfg, NNG_TLS_MODE_SERVER)) != + 0) || + ((rv = nng_tls_config_auth_mode(l->ep.cfg, l->ep.authmode)) != + 0) || + ((rv = nni_aio_init(&l->aio, tlstran_listener_cb, l)) != 0)) { + tlstran_listener_fini(l); + return (rv); } + l->bsa = l->sa; + + *lp = l; + return (0); } -static void -tls_ep_cb(void *arg) +static int +tlstran_listener_bind(void *arg) { - tls_ep *ep = arg; + tlstran_listener *l = arg; + int rv; - nni_mtx_lock(&ep->mtx); - tls_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + l->bsa = l->sa; + nni_mtx_lock(&l->ep.mtx); + rv = nni_tcp_listener_listen(l->listener, &l->bsa); + nni_mtx_unlock(&l->ep.mtx); + + return (rv); } static void -tls_cancel_ep(nni_aio *aio, int rv) +tlstran_listener_cb(void *arg) { - tls_ep *ep = nni_aio_get_prov_data(aio); + tlstran_listener *l = arg; + nni_aio * aio; + int rv; + tlstran_pipe * p = NULL; + nni_tcp_conn * conn; + nni_tls * tls; + + nni_mtx_lock(&l->ep.mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->ep.mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_tcp_conn_fini(conn); + } + return; + } + if (rv != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_aio_finish_error(aio, rv); + return; + } - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + if ((rv = nni_tls_init(&tls, l->ep.cfg, conn)) != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_tcp_conn_fini(conn); + nni_aio_finish_error(aio, rv); return; } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); + if ((rv = tlstran_pipe_init(&p, tls)) != 0) { + nni_mtx_unlock(&l->ep.mtx); + nni_tls_fini(tls); + nni_aio_finish_error(aio, rv); + return; + } + p->proto = l->ep.proto; + p->rcvmax = l->ep.rcvmax; + p->nodelay = l->ep.nodelay; + p->keepalive = l->ep.keepalive; - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + (void) nni_tls_set_nodelay(tls, p->nodelay); + (void) nni_tls_set_keepalive(tls, p->keepalive); + + nni_mtx_unlock(&l->ep.mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void -tls_ep_accept(void *arg, nni_aio *aio) +tlstran_listener_cancel(nni_aio *aio, int rv) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = nni_aio_get_prov_data(aio); - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->ep.mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->ep.mtx); return; } - ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); + l->user_aio = NULL; + nni_mtx_unlock(&l->ep.mtx); + + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } static void -tls_ep_connect(void *arg, nni_aio *aio) +tlstran_listener_accept(void *arg, nni_aio *aio) { - tls_ep *ep = arg; - int rv; + tlstran_listener *l = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->ep.mtx); + NNI_ASSERT(l->user_aio == NULL); + + if ((rv = nni_aio_schedule(aio, tlstran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->ep.mtx); nni_aio_finish_error(aio, rv); + return; } - ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, ep->aio); - nni_mtx_unlock(&ep->mtx); -} + l->user_aio = aio; -static int -tls_ep_chk_bool(const void *v, size_t sz, nni_opt_type t) -{ - return (nni_copyin_bool(NULL, v, sz, t)); + nni_tcp_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->ep.mtx); } static int -tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - bool val; - int rv; + tlstran_ep *ep = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->nodelay = val; @@ -874,10 +1032,10 @@ tls_ep_set_nodelay(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_bool(ep->nodelay, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -885,11 +1043,25 @@ tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - bool val; - int rv; + tlstran_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static int +tlstran_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + tlstran_ep *ep = arg; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->keepalive = val; @@ -899,10 +1071,10 @@ tls_ep_set_keepalive(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_bool(ep->keepalive, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -910,60 +1082,53 @@ tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); + tlstran_ep *ep = arg; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int -tls_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_check_bool(const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - char ustr[128]; - char ipstr[48]; // max for IPv6 addresses including [] - char portstr[6]; // max for 16-bit port - - nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); - snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr); - return (nni_copyout_str(ustr, v, szp, t)); + return (nni_copyin_bool(NULL, v, sz, t)); } static int -tls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - size_t val; - int rv; + tlstran_dialer *d = arg; - if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); - } - return (rv); + return (nni_copyout_str(d->ep.url->u_rawurl, v, szp, t)); } static int -tls_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t) +tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); + tlstran_listener *l = arg; + char ustr[128]; + char ipstr[48]; // max for IPv6 addresses including [] + char portstr[6]; // max for 16-bit port + + nni_mtx_lock(&l->ep.mtx); + nni_ntop(&l->bsa, ipstr, portstr); + nni_mtx_unlock(&l->ep.mtx); + snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr); + return (nni_copyout_str(ustr, v, szp, t)); } static int -tls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, v, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } static int -tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t) +tlstran_check_config(const void *data, size_t sz, nni_opt_type t) { void *v; int rv; @@ -974,9 +1139,9 @@ tls_ep_chk_config(const void *data, size_t sz, nni_opt_type t) } static int -tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) +tlstran_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) { - tls_ep * ep = arg; + tlstran_ep * ep = arg; nng_tls_config *cfg, *old; int rv; @@ -998,10 +1163,10 @@ tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) } static int -tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_ptr(ep->cfg, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -1009,7 +1174,7 @@ tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t) +tlstran_check_string(const void *v, size_t sz, nni_opt_type t) { if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) { return (NNG_EBADTYPE); @@ -1021,58 +1186,65 @@ tls_ep_chk_string(const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_ca_file(ep->cfg, v); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_chk_auth_mode(const void *v, size_t sz, nni_opt_type t) +tlstran_check_auth_mode(const void *v, size_t sz, nni_opt_type t) { return (nni_copyin_int(NULL, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t)); } static int -tls_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int mode; - int rv; + tlstran_ep *ep = arg; + int mode; + int rv; rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t); if (rv == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_auth_mode(ep->cfg, mode); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_server_name(void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_server_name(ep->cfg, v); + nni_mtx_unlock(&ep->mtx); } return (rv); } static int -tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) +tlstran_ep_set_cert_key_file( + void *arg, const void *v, size_t sz, nni_opt_type t) { - tls_ep *ep = arg; - int rv; + tlstran_ep *ep = arg; + int rv; - if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + if ((rv = tlstran_check_string(v, sz, t)) == 0) { nni_mtx_lock(&ep->mtx); rv = nng_tls_config_cert_key_file(ep->cfg, v, NULL); nni_mtx_unlock(&ep->mtx); @@ -1081,38 +1253,38 @@ tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) } static int -tls_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t) +tlstran_pipe_get_verified(void *arg, void *v, size_t *szp, nni_opt_type t) { - tls_pipe *p = arg; + tlstran_pipe *p = arg; return (nni_copyout_bool(nni_tls_verified(p->tls), v, szp, t)); } -static nni_tran_option tls_pipe_options[] = { +static nni_tran_option tlstran_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tls_pipe_get_locaddr, + .o_get = tlstran_pipe_get_locaddr, }, { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = tls_pipe_get_remaddr, + .o_get = tlstran_pipe_get_remaddr, }, { .o_name = NNG_OPT_TLS_VERIFIED, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_verified, + .o_get = tlstran_pipe_get_verified, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_keepalive, + .o_get = tlstran_pipe_get_keepalive, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_pipe_get_nodelay, + .o_get = tlstran_pipe_get_nodelay, }, // terminate list { @@ -1120,74 +1292,74 @@ static nni_tran_option tls_pipe_options[] = { }, }; -static nni_tran_pipe_ops tls_pipe_ops = { - .p_fini = tls_pipe_fini, - .p_start = tls_pipe_start, - .p_stop = tls_pipe_stop, - .p_send = tls_pipe_send, - .p_recv = tls_pipe_recv, - .p_close = tls_pipe_close, - .p_peer = tls_pipe_peer, - .p_options = tls_pipe_options, +static nni_tran_pipe_ops tlstran_pipe_ops = { + .p_fini = tlstran_pipe_fini, + .p_start = tlstran_pipe_start, + .p_stop = tlstran_pipe_stop, + .p_send = tlstran_pipe_send, + .p_recv = tlstran_pipe_recv, + .p_close = tlstran_pipe_close, + .p_peer = tlstran_pipe_peer, + .p_options = tlstran_pipe_options, }; -static nni_tran_option tls_dialer_options[] = { +static nni_tran_option tlstran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tls_ep_get_recvmaxsz, - .o_set = tls_ep_set_recvmaxsz, - .o_chk = tls_ep_chk_recvmaxsz, + .o_get = tlstran_ep_get_recvmaxsz, + .o_set = tlstran_ep_set_recvmaxsz, + .o_chk = tlstran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tls_dialer_get_url, + .o_get = tlstran_dialer_get_url, }, { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, - .o_get = tls_ep_get_config, - .o_set = tls_ep_set_config, - .o_chk = tls_ep_chk_config, + .o_get = tlstran_ep_get_config, + .o_set = tlstran_ep_set_config, + .o_chk = tlstran_check_config, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_cert_key_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_cert_key_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_CA_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_ca_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_ca_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, .o_type = NNI_TYPE_INT32, // enum really - .o_set = tls_ep_set_auth_mode, - .o_chk = tls_ep_chk_auth_mode, + .o_set = tlstran_ep_set_auth_mode, + .o_chk = tlstran_check_auth_mode, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_server_name, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_server_name, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_nodelay, - .o_set = tls_ep_set_nodelay, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_nodelay, + .o_set = tlstran_ep_set_nodelay, + .o_chk = tlstran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_keepalive, - .o_set = tls_ep_set_keepalive, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_keepalive, + .o_set = tlstran_ep_set_keepalive, + .o_chk = tlstran_check_bool, }, // terminate list { @@ -1195,63 +1367,63 @@ static nni_tran_option tls_dialer_options[] = { }, }; -static nni_tran_option tls_listener_options[] = { +static nni_tran_option tlstran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = tls_ep_get_recvmaxsz, - .o_set = tls_ep_set_recvmaxsz, - .o_chk = tls_ep_chk_recvmaxsz, + .o_get = tlstran_ep_get_recvmaxsz, + .o_set = tlstran_ep_set_recvmaxsz, + .o_chk = tlstran_check_recvmaxsz, }, { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tls_listener_get_url, + .o_get = tlstran_listener_get_url, }, { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, - .o_get = tls_ep_get_config, - .o_set = tls_ep_set_config, - .o_chk = tls_ep_chk_config, + .o_get = tlstran_ep_get_config, + .o_set = tlstran_ep_set_config, + .o_chk = tlstran_check_config, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_cert_key_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_cert_key_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_CA_FILE, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_ca_file, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_ca_file, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, .o_type = NNI_TYPE_INT32, // enum really - .o_set = tls_ep_set_auth_mode, - .o_chk = tls_ep_chk_auth_mode, + .o_set = tlstran_ep_set_auth_mode, + .o_chk = tlstran_check_auth_mode, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, .o_type = NNI_TYPE_STRING, - .o_set = tls_ep_set_server_name, - .o_chk = tls_ep_chk_string, + .o_set = tlstran_ep_set_server_name, + .o_chk = tlstran_check_string, }, { .o_name = NNG_OPT_TCP_NODELAY, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_nodelay, - .o_set = tls_ep_set_nodelay, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_nodelay, + .o_set = tlstran_ep_set_nodelay, + .o_chk = tlstran_check_bool, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, .o_type = NNI_TYPE_BOOL, - .o_get = tls_ep_get_keepalive, - .o_set = tls_ep_set_keepalive, - .o_chk = tls_ep_chk_bool, + .o_get = tlstran_ep_get_keepalive, + .o_set = tlstran_ep_set_keepalive, + .o_chk = tlstran_check_bool, }, // terminate list { @@ -1259,51 +1431,51 @@ static nni_tran_option tls_listener_options[] = { }, }; -static nni_tran_dialer_ops tls_dialer_ops = { - .d_init = tls_dialer_init, - .d_fini = tls_ep_fini, - .d_connect = tls_ep_connect, - .d_close = tls_ep_close, - .d_options = tls_dialer_options, +static nni_tran_dialer_ops tlstran_dialer_ops = { + .d_init = tlstran_dialer_init, + .d_fini = tlstran_dialer_fini, + .d_connect = tlstran_dialer_connect, + .d_close = tlstran_dialer_close, + .d_options = tlstran_dialer_options, }; -static nni_tran_listener_ops tls_listener_ops = { - .l_init = tls_listener_init, - .l_fini = tls_ep_fini, - .l_bind = tls_ep_bind, - .l_accept = tls_ep_accept, - .l_close = tls_ep_close, - .l_options = tls_listener_options, +static nni_tran_listener_ops tlstran_listener_ops = { + .l_init = tlstran_listener_init, + .l_fini = tlstran_listener_fini, + .l_bind = tlstran_listener_bind, + .l_accept = tlstran_listener_accept, + .l_close = tlstran_listener_close, + .l_options = tlstran_listener_options, }; static nni_tran tls_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; static nni_tran tls4_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp4", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; static nni_tran tls6_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tls+tcp6", - .tran_dialer = &tls_dialer_ops, - .tran_listener = &tls_listener_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_dialer = &tlstran_dialer_ops, + .tran_listener = &tlstran_listener_ops, + .tran_pipe = &tlstran_pipe_ops, + .tran_init = tlstran_init, + .tran_fini = tlstran_fini, }; int |
