diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
| commit | 3730260da3744b549aaa1fe13946a674f924f63c (patch) | |
| tree | 902866876ee71246a299370cbe8f6580d758525c | |
| parent | 3b19940dfcd5d3585b1fb1dcf7915a748ae67289 (diff) | |
| download | nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.gz nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.bz2 nng-3730260da3744b549aaa1fe13946a674f924f63c.zip | |
TCP asynchronous working now.
It turns out that I had to fix a number of subtle asynchronous
handling bugs, but now TCP is fully asynchronous. We need to
change the high-level dial and listen interfaces to be async
as well.
Some of the transport APIs have changed here, and I've elected
to change what we expose to consumers as endpoints into seperate
dialers and listeners. Under the hood they are the same, but
it turns out that its helpful to know the intended use of the
endpoint at initialization time.
Scalability still occasionally hangs on Linux. Investigation
pending.
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/endpt.c | 26 | ||||
| -rw-r--r-- | src/core/endpt.h | 4 | ||||
| -rw-r--r-- | src/core/platform.h | 89 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/transport.c | 1 | ||||
| -rw-r--r-- | src/core/transport.h | 2 | ||||
| -rw-r--r-- | src/nng.h | 39 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 98 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 80 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 176 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 422 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.h | 45 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 3 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 38 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 4 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 336 | ||||
| -rw-r--r-- | tests/resolv.c | 14 |
19 files changed, 621 insertions, 768 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9392e423..581d1d6c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -81,7 +81,6 @@ set (NNG_SOURCES platform/posix/posix_config.h platform/posix/posix_aio.h platform/posix/posix_pollq.h - platform/posix/posix_socket.h platform/posix/posix_alloc.c platform/posix/posix_clock.c @@ -94,7 +93,6 @@ set (NNG_SOURCES platform/posix/posix_pollq_poll.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c - platform/posix/posix_socket.c platform/posix/posix_thread.c platform/windows/win_impl.h diff --git a/src/core/endpt.c b/src/core/endpt.c index 20874c90..e5a9a5bb 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -142,7 +142,7 @@ nni_ep_dtor(void *ptr) int -nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) +nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) { nni_tran *tran; nni_ep *ep; @@ -162,6 +162,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) } ep->ep_sock = sock; ep->ep_tran = tran; + ep->ep_mode = mode; // Could safely use strcpy here, but this avoids discussion. (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr); @@ -172,7 +173,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) ep->ep_ops = *tran->tran_ep; - if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) { + if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } @@ -391,7 +392,11 @@ nni_ep_dial(nni_ep *ep, int flags) int rv = 0; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_IDLE) { + if (ep->ep_mode != NNI_EP_MODE_DIAL) { + nni_mtx_unlock(&ep->ep_mtx); + return (NNG_ENOTSUP); + } + if (ep->ep_started) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } @@ -404,14 +409,14 @@ nni_ep_dial(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (rv); } - ep->ep_mode = NNI_EP_MODE_DIAL; + ep->ep_started = 1; if (flags & NNG_FLAG_SYNCH) { nni_mtx_unlock(&ep->ep_mtx); rv = nni_ep_connect_sync(ep); if (rv != 0) { nni_thr_fini(&ep->ep_thr); - ep->ep_mode = NNI_EP_MODE_IDLE; + ep->ep_started = 0; return (rv); } nni_mtx_lock(&ep->ep_mtx); @@ -561,11 +566,14 @@ nni_ep_listen(nni_ep *ep, int flags) int rv = 0; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_IDLE) { + if (ep->ep_mode != NNI_EP_MODE_LISTEN) { + nni_mtx_unlock(&ep->ep_mtx); + return (NNG_ENOTSUP); + } + if (ep->ep_started) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } - if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); @@ -576,14 +584,14 @@ nni_ep_listen(nni_ep *ep, int flags) return (rv); } - ep->ep_mode = NNI_EP_MODE_LISTEN; + ep->ep_started = 1; if (flags & NNG_FLAG_SYNCH) { nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { nni_thr_fini(&ep->ep_thr); - ep->ep_mode = NNI_EP_MODE_IDLE; + ep->ep_started = 0; return (rv); } nni_mtx_lock(&ep->ep_mtx); diff --git a/src/core/endpt.h b/src/core/endpt.h index 3d353cdf..becaa3f4 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -28,6 +28,7 @@ struct nni_ep { char ep_addr[NNG_MAXADDRLEN]; nni_thr ep_thr; int ep_mode; + int ep_started; int ep_closed; // full shutdown int ep_bound; // true if we bound locally nni_mtx ep_mtx; @@ -36,7 +37,6 @@ struct nni_ep { nni_list ep_pipes; }; -#define NNI_EP_MODE_IDLE 0 #define NNI_EP_MODE_DIAL 1 #define NNI_EP_MODE_LISTEN 2 @@ -46,7 +46,7 @@ extern int nni_ep_find(nni_ep **, uint32_t); extern void nni_ep_hold(nni_ep *); extern void nni_ep_rele(nni_ep *); extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create(nni_ep **, nni_sock *, const char *); +extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int); extern void nni_ep_stop(nni_ep *); extern void nni_ep_close(nni_ep *); extern void nni_ep_remove(nni_ep *); diff --git a/src/core/platform.h b/src/core/platform.h index 0dcab40a..d5c6aa8a 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -200,47 +200,47 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); // TCP Support. // -// nni_plat_tcp_init initializes the socket, for example it can -// set underlying file descriptors to -1, etc. -extern int nni_plat_tcp_init(nni_plat_tcpsock **); - -// nni_plat_tcp_fini just closes a TCP socket, and releases any related -// resources. -extern void nni_plat_tcp_fini(nni_plat_tcpsock *); - -// nni_plat_tcp_shutdown performs a shutdown of the socket. For -// BSD sockets, this closes both sides of the TCP connection gracefully, -// but the underlying file descriptor is left open. (This part is critical -// to prevention of close() related races.) -extern void nni_plat_tcp_shutdown(nni_plat_tcpsock *); - -// nni_plat_tcp_listen creates a TCP socket in listening mode, bound -// to the specified address. Note that nni_plat_tcpsock should be defined -// to whatever your platform uses. For most systems its just "int". -extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *); - -// nni_plat_tcp_accept does the accept to accept an inbound connection. -// The tcpsock used for the server will have been set up with the -// nni_plat_tcp_listen. -extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *); - -// nni_plat_tcp_connect is the client side. Two addresses are supplied, -// as the client may specify a local address to which to bind. This -// second address may be NULL to use ephemeral ports, which is the -// usual default. -extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *, - const nni_sockaddr *); - -// nni_plat_tcp_aio_send sends the data to the remote side asynchronously. -// The data to send is stored in the a_iov field of the aio, and the array -// of iovs will never be larger than 4. The platform may modify the iovs, -// or the iov list. -extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *); - -// nni_plat_tcp_aio_recv recvs data into the buffers provided by the -// iovs. The implementation does not return until the iovs are completely -// full, or an error condition occurs. -extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); +typedef struct nni_plat_tcp_ep nni_plat_tcp_ep; +typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe; + +// nni_plat_tcp_ep_init creates a new endpoint associated with the url. +extern int nni_plat_tcp_ep_init(nni_plat_tcp_ep **, const char *, int); + +// nni_plat_tcp_ep_fini closes the endpoint and releases resources. +extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *); + +// nni_plat_tcp_ep_close closes the endpoint; this might not close the +// actual underlying socket, but it should call shutdown on it. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *); + +// nni_plat_tcp_listen creates an TCP socket in listening mode, bound +// to the specified path. +extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *); + +// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *); + +// nni_plat_tcp_connect is the client side. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *); + +// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources +// associated with it. +extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *); + +// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *); + +// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *); + +// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs. +// The platform may modify the iovs. +extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *); // nni_plat_tcp_resolv resolves a TCP name asynchronously. The family // should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The @@ -258,8 +258,8 @@ extern void nni_plat_tcp_resolv(const char *, const char *, int, int, typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; -// nni_plat_ipc_ep_init creates a new endpoint associated with the path. -extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *); +// nni_plat_ipc_ep_init creates a new endpoint associated with the url. +extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *, int); // nni_plat_ipc_ep_fini closes the endpoint and releases resources. extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); @@ -270,8 +270,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); // nni_plat_tcp_listen creates an IPC socket in listening mode, bound -// to the specified path. Note that nni_plat_ipcsock should be defined -// to whatever your platform uses. For most systems its just "int". +// to the specified path. extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); // nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. diff --git a/src/core/socket.c b/src/core/socket.c index d9d9ae3b..66f5c63b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -775,7 +775,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; - if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { return (rv); } @@ -795,7 +795,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; - if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { return (rv); } diff --git a/src/core/transport.c b/src/core/transport.c index de94596c..ce61f722 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -24,6 +24,7 @@ static nni_tran *transports[] = { NULL }; + nni_tran * nni_tran_find(const char *addr) { diff --git a/src/core/transport.h b/src/core/transport.h index dd4da299..8098e5c2 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -40,7 +40,7 @@ struct nni_tran { struct nni_tran_ep { // ep_init creates a vanilla endpoint. The value created is // used for the first argument for all other endpoint functions. - int (*ep_init)(void **, const char *, nni_sock *); + int (*ep_init)(void **, const char *, nni_sock *, int); // ep_fini frees the resources associated with the endpoint. // The endpoint will already have been closed. @@ -42,13 +42,15 @@ extern "C" { // Types common to nng. typedef uint32_t nng_socket; -typedef uint32_t nng_endpoint; +typedef uint32_t nng_dialer; +typedef uint32_t nng_listener; typedef uint32_t nng_pipe; typedef struct nng_msg nng_msg; typedef struct nng_event nng_event; typedef struct nng_notify nng_notify; typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; +typedef uint32_t nng_endpoint; // XXX: REMOVE ME. // nng_open simply creates a socket of the given class. It returns an // error code on failure, or zero on success. The socket starts in cooked @@ -127,8 +129,13 @@ NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *); #define NNG_EV_ERROR NNG_EV_BIT(2) #define NNG_EV_PIPE_ADD NNG_EV_BIT(3) #define NNG_EV_PIPE_REM NNG_EV_BIT(4) -#define NNG_EV_ENDPT_ADD NNG_EV_BIT(5) -#define NNG_EV_ENDPT_REM NNG_EV_BIT(6) +#define NNG_EV_DIALER_ADD NNG_EV_BIT(5) +#define NNG_EV_DIALER_REM NNG_EV_BIT(6) +#define NNG_EV_LISTENER_ADD NNG_EV_BIT(7) +#define NNG_EV_LISTENER_REM NNG_EV_BIT(8) +// XXX: Remove these. +#define NNG_EV_ENDPT_ADD NNG_EV_DIALER_ADD +#define NNG_EV_ENDPT_REM NNG_EV_DIALER_REM // The following functions return more detailed information about the event. // Some of the values will not make sense for some event types, in which case @@ -145,7 +152,7 @@ NNG_DECL const char *nng_event_reason(nng_event *); // endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to // indicate that a failure setting the socket up should return an error // back to the caller immediately. -NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); +NNG_DECL int nng_listen(nng_socket, const char *, nng_listener *, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time @@ -155,24 +162,28 @@ NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); // dial will be made synchronously, and a failure condition returned back // to the caller. (If the connection is dropped, it will still be // reconnected in the background -- only the initial connect is synchronous.) -NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int); +NNG_DECL int nng_dial(nng_socket, const char *, nng_dialer *, int); -// nng_endpoint_create creates an endpoint on the socket, but does not -// start it either dialing or listening. -NNG_DECL int nng_endpoint_create(nng_socket, const char *, nng_endpoint *); +// nng_dialer_create creates a new dialer, that is not yet started. +NNG_DECL int nng_dialer_create(nng_socket, const char *, nng_dialer *); -// nng_endpoint_dial starts the endpoint dialing. This is only possible if -// the endpoint is not already dialing or listening. -NNG_DECL int nng_endpoint_dial(nng_endpoint, int); +// nng_listener_create creates a new listener, that is not yet started. +NNG_DECL int nng_listener_create(nng_socket, const char *, nng_listener *); -// nng_endpoint_listen starts the endpoint listening. This is only possible if -// the endpoint is not already dialing or listening. -NNG_DECL int nng_endpoint_listen(nng_endpoint, int); +// nng_dialer_start starts the endpoint dialing. This is only possible if +// the dialer is not already dialing. +NNG_DECL int nng_dialer_start(nng_dialer, int); + +// nng_listener_start starts the endpoint listening. This is only possible if +// the listener is not already listening. +NNG_DECL int nng_listener_start(nng_listener, int); // nng_endpoint_close closes the endpoint, shutting down all underlying // connections and releasing all associated resources. It is an error to // refer to the endpoint after this is called. NNG_DECL int nng_endpoint_close(nng_endpoint); +NNG_DECL int nng_dialer_close(nng_dialer); +NNG_DECL int nng_listener_close(nng_listener); // nng_endpoint_setopt sets an option for a specific endpoint. Note // endpoint options may not be altered on a running endpoint. diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 3d703e97..f64e25e9 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -8,11 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" -#include "platform/posix/posix_socket.h" #ifdef PLATFORM_POSIX_EPDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -180,6 +179,30 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) static void +nni_posix_epdesc_doerror(nni_posix_epdesc *ed) +{ + nni_aio *aio; + int rv = 1; + socklen_t sz = sizeof (rv); + + if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == 0) { + return; + } + rv = nni_plat_errno(rv); + + while ((aio = nni_list_first(&ed->acceptq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } + while ((aio = nni_list_first(&ed->connectq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } +} + + +static void nni_posix_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; @@ -191,6 +214,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { (void) unlink(sun->sun_path); } + (void) close(ed->fd); + ed->fd = -1; } while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); @@ -214,7 +239,10 @@ nni_posix_epdesc_cb(void *arg) if (ed->node.revents & POLLOUT) { nni_posix_epdesc_doconnect(ed); } - if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) { + if (ed->node.revents & (POLLERR|POLLHUP)) { + nni_posix_epdesc_doerror(ed); + } + if (ed->node.revents & POLLNVAL) { nni_posix_epdesc_doclose(ed); } ed->node.revents = 0; @@ -241,6 +269,58 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } +static int +nni_posix_epdesc_parseaddr(char *pair, char **hostp, uint16_t *portp) +{ + char *host, *port, *end; + char c; + int val; + + if (pair[0] == '[') { + host = pair+1; + // IP address enclosed ... for IPv6 usually. + if ((end = strchr(host, ']')) == NULL) { + return (NNG_EADDRINVAL); + } + *end = '\0'; + port = end + 1; + if (*port == ':') { + port++; + } else if (port != '\0') { + return (NNG_EADDRINVAL); + } + } else { + host = pair; + port = strchr(host, ':'); + if (port != NULL) { + *port = '\0'; + port++; + } + } + val = 0; + while ((c = *port) != '\0') { + val *= 10; + if ((c >= '0') && (c <= '9')) { + val += (c - '0'); + } else { + return (NNG_EADDRINVAL); + } + if (val > 65535) { + return (NNG_EADDRINVAL); + } + port++; + } + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; + } else { + *hostp = host; + } + // Stash the port in big endian (network) byte order. + NNI_PUT16((uint8_t *) portp, val); + return (0); +} + + int nni_posix_epdesc_listen(nni_posix_epdesc *ed) { @@ -250,6 +330,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) int fd; nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; len = ed->loclen; @@ -274,6 +355,8 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) return (rv); } + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->fd = fd; ed->node.fd = fd; nni_mtx_unlock(&ed->mtx); @@ -349,7 +432,10 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) } } + (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK); + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. @@ -460,7 +546,9 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - // XXX: MORE WORK HERE. + if (ed->fd >= 0) { + (void) close(ed->fd); + } nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index 5bd42190..706ef15f 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -11,7 +11,6 @@ #ifdef PLATFORM_POSIX_IPC #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> #include <stdlib.h> @@ -41,37 +40,40 @@ // We alias nni_posix_pipedesc to nni_plat_ipc_pipe. // We alias nni_posix_epdesc to nni_plat_ipc_ep. -static int -nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) -{ - size_t len; - - memset(sun, 0, sizeof (*sun)); - - // TODO: abstract sockets, including autobind sockets. - len = strlen(path); - if ((len >= sizeof (sun->sun_path)) || (len < 1)) { - return (NNG_EADDRINVAL); - } - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); - sun->sun_family = AF_UNIX; - return (0); -} - - int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { nni_posix_epdesc *ed; int rv; + struct sockaddr_un sun; + const char *path; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); } - url += strlen("ipc://"); // skip the prefix. + path = url + strlen("ipc://"); // skip the prefix. + + // prepare the sockaddr_un + sun.sun_family = AF_UNIX; + if (strlen(url) >= sizeof (sun.sun_path)) { + return (NNG_EADDRINVAL); + } + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { return (rv); } + switch (mode) { + case NNI_EP_MODE_DIAL: + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + break; + case NNI_EP_MODE_LISTEN: + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + break; + default: + nni_posix_epdesc_fini(ed); + return (NNG_EINVAL); + } *epp = (void *) ed; return (0); @@ -98,10 +100,14 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) // will just try to cleanup the old socket. Note that this is not // perfect in all scenarios, so use this with caution. static int -nni_plat_ipc_remove_stale(struct sockaddr_un *sun) +nni_plat_ipc_remove_stale(const char *path) { int fd; int rv; + struct sockaddr_un sun; + + sun.sun_family = AF_UNIX; + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); @@ -113,9 +119,9 @@ nni_plat_ipc_remove_stale(struct sockaddr_un *sun) // then the cleanup will fail. As this is supposed to be an // exceptional case, don't worry. (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (connect(fd, (void *) &sun, sizeof (sun)) < 0) { if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); + (void) unlink(path); } } (void) close(fd); @@ -132,17 +138,11 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; path = nni_posix_epdesc_url(ed); + path += strlen("ipc://"); - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - return (rv); - } - - if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + if ((rv = nni_plat_ipc_remove_stale(path)) != 0) { return (rv); } - - nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); - return (nni_posix_epdesc_listen(ed)); } @@ -150,21 +150,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - const char *path; - nni_posix_epdesc *ed = (void *) ep; - struct sockaddr_un sun; - int rv; - - path = nni_posix_epdesc_url(ed); - - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - nni_aio_finish(aio, rv, 0); - return; - } - - nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); - - nni_posix_epdesc_connect(ed, aio); + nni_posix_epdesc_connect((void *) ep, aio); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 6d0d7c08..26ec3632 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -11,9 +11,9 @@ #ifdef PLATFORM_POSIX_NET #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> @@ -24,101 +24,181 @@ #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> -#include <netdb.h> -// We alias nni_plat_tcpsock to an nni_posix_sock. +static int +nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa) +{ + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof (*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof (*sin)); + + + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(sin6, 0, sizeof (*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof (*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof (*sin6)); + } + return (-1); +} + + +extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); int -nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) { - struct addrinfo hint; - struct addrinfo *ai; - - memset(&hint, 0, sizeof (hint)); - hint.ai_flags = AI_PASSIVE | AI_ADDRCONFIG | AI_NUMERICSERV; - hint.ai_family = PF_UNSPEC; - hint.ai_socktype = SOCK_STREAM; - hint.ai_protocol = IPPROTO_TCP; - if (flags & NNI_FLAG_IPV4ONLY) { - hint.ai_family = PF_INET; + nni_posix_epdesc *ed; + char buf[NNG_MAXADDRLEN]; + int rv; + char *lhost, *rhost; + char *lserv, *rserv; + char *sep; + struct sockaddr_storage ss; + int len; + int passive; + nni_aio aio; + + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); } - if (getaddrinfo(host, "1", &hint, &ai) != 0) { - return (NNG_EADDRINVAL); + // Make a local copy. + snprintf(buf, sizeof (buf), "%s", url); + nni_aio_init(&aio, NULL, NULL); + + if (mode == NNI_EP_MODE_DIAL) { + rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); + if (rv != 0) { + goto done; + } + + // We have to have a remote destination! + if ((rhost == NULL) || (rserv == NULL)) { + rv = NNG_EADDRINVAL; + goto done; + } + } else { + rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); + if (rv != 0) { + goto done; + } + if ((rhost != NULL) || (rserv != NULL)) { + // remotes are nonsensical here. + rv = NNG_EADDRINVAL; + goto done; + } + if (lserv == NULL) { + // missing port to listen on! + rv = NNG_EADDRINVAL; + goto done; + } } - if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) { - freeaddrinfo(ai); - return (NNG_EADDRINVAL); + if ((rserv != NULL) || (rhost != NULL)) { + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_remote(ed, &ss, len); } - freeaddrinfo(ai); + + if ((lserv != NULL) || (lhost != NULL)) { + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_local(ed, &ss, len); + } + *epp = (void *) ed; return (0); + +done: + if (rv != 0) { + nni_posix_epdesc_fini(ed); + } + nni_aio_fini(&aio); + return (rv); } void -nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_recv((void *) s, aio); + nni_posix_epdesc_close((void *) ep); } int -nni_plat_tcp_init(nni_plat_tcpsock **sp) +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) { - nni_posix_sock *s; - int rv; + return (nni_posix_epdesc_listen((void *) ep)); +} - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; - } - return (rv); + +void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + return (nni_posix_epdesc_connect((void *) ep, aio)); } void -nni_plat_tcp_fini(nni_plat_tcpsock *s) +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + return (nni_posix_epdesc_accept((void *) ep, aio)); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) { - nni_posix_sock_shutdown((void *) s); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +void +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) { - return (nni_posix_sock_listen((void *) s, addr)); + nni_posix_pipedesc_close((void *) p); } -// nni_plat_tcp_connect establishes an outbound connection. It the -// bind address is not null, then it will attempt to bind to the local -// address specified first. -int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) +void +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr)); + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +void +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index c3e29c33..c509643e 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -8,10 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" #ifdef PLATFORM_POSIX_PIPEDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -183,6 +183,8 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) if (pd->fd != -1) { // Let any peer know we are closing. (void) shutdown(pd->fd, SHUT_RDWR); + close(pd->fd); + pd->fd = -1; } while ((aio = nni_list_first(&pd->readq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c deleted file mode 100644 index 9e054c42..00000000 --- a/src/platform/posix/posix_socket.c +++ /dev/null @@ -1,422 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef PLATFORM_POSIX_SOCKET -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" - -#include <errno.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/uio.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <sys/un.h> -#include <fcntl.h> -#include <unistd.h> -#include <netdb.h> - -// Solaris/SunOS systems define this, which collides with our symbol -// names. Just undefine it now. -#ifdef sun -#undef sun -#endif - - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -struct nni_posix_sock { - int fd; - char * unlink; // path to unlink at unbind - nni_posix_pipedesc * pd; - int tcpnodelay; -}; - -int -nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in *sin; - struct sockaddr_un *sun; - -#ifdef PF_INET6 - struct sockaddr_in6 *sin6; -#endif - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof (*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof (*sin)); - -#ifdef PF_INET6 - // Not every platform can do IPv6. Amazingly. - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof (*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof (*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof (*sin6)); - -#endif // PF_INET6 - - case NNG_AF_IPC: - sun = (void *) ss; - memset(sun, 0, sizeof (*sun)); - // NB: This logic does not support abstract sockets, which - // have their first byte NULL, and rely on length instead. - // Probably for dealing with abstract sockets we will just - // handle @ specially in the future. - if (strlen(sa->s_un.s_path.sa_path) >= - sizeof (sun->sun_path)) { - return (-1); // caller converts to NNG_EADDRINVAL - } - - sun->sun_family = PF_UNIX; - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", - sa->s_un.s_path.sa_path); - return (sizeof (*sun)); - } - return (-1); -} - - -int -nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) -{ - const struct sockaddr_in *sin; - const struct sockaddr_un *sun; - -#ifdef PF_INET6 - const struct sockaddr_in6 *sin6; -#endif - - memset(sa, 0, sizeof (*sa)); - switch (ss->sa_family) { - case PF_INET: - sin = (const void *) ss; - sa->s_un.s_in.sa_family = NNG_AF_INET; - sa->s_un.s_in.sa_port = sin->sin_port; - sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; - return (0); - -#ifdef PF_INET6 - case PF_INET6: - sin6 = (const void *) ss; - sa->s_un.s_in6.sa_family = NNG_AF_INET6; - sa->s_un.s_in6.sa_port = sin6->sin6_port; - memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - return (0); - -#endif // PF_INET6 - - case PF_UNIX: - // NB: This doesn't handle abstract sockets! - sun = (const void *) ss; - sa->s_un.s_path.sa_family = NNG_AF_IPC; - snprintf(sa->s_un.s_path.sa_path, - sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path); - return (0); - } - return (-1); -} - - -void -nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_send(s->pd, aio); -} - - -void -nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_recv(s->pd, aio); -} - - -static void -nni_posix_sock_setopts_fd(int fd, int tcpnodelay) -{ - int one; - - // Try to ensure that both CLOEXEC is set, and that we don't - // generate SIGPIPE. (Note that SIGPIPE suppression in this way - // only works on BSD systems. Linux wants us to use sendmsg().) - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#if defined(F_SETNOSIGPIPE) - (void) fcntl(fd, F_SETNOSIGPIPE, 1); -#elif defined(SO_NOSIGPIPE) - one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); -#endif - - // Also disable Nagle. We are careful to group data with writev, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - - // It's unclear whether this is safe for UNIX domain sockets. It - // *should* be. - if (tcpnodelay) { - one = 1; - (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, - sizeof (one)); - } -} - - -int -nni_posix_sock_init(nni_posix_sock **sp) -{ - nni_posix_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - s->fd = -1; - *sp = s; - return (0); -} - - -void -nni_posix_sock_fini(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; - } - if (s->pd != NULL) { - nni_posix_pipedesc_fini(s->pd); - } - if (s->unlink != NULL) { - (void) unlink(s->unlink); - nni_free(s->unlink, strlen(s->unlink) + 1); - } - NNI_FREE_STRUCT(s); -} - - -void -nni_posix_sock_shutdown(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); - // This causes the equivalent of a close. Hopefully waking - // up anything that didn't get the hint with the shutdown. - // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); - } - if (s->pd != NULL) { - nni_posix_pipedesc_close(s->pd); - } -} - - -int -nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr) -{ - int len; - struct sockaddr_storage ss; - int rv; - int fd; - - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - if ((saddr->s_un.s_family == NNG_AF_INET) || - (saddr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - // UNIX DOMAIN SOCKETS -- these have names in the file namespace. - // We are going to check to see if there was a name already there. - // If there was, and nothing is listening (ECONNREFUSED), then we - // will just try to cleanup the old socket. Note that this is not - // perfect in all scenarios, so use this with caution. - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - int chkfd; - if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - (void) close(fd); - return (nni_plat_errno(errno)); - } - - // Nonblocking; we don't want to wait for remote server. - (void) fcntl(chkfd, F_SETFL, O_NONBLOCK); - if (connect(chkfd, (struct sockaddr *) &ss, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(saddr->s_un.s_path.sa_path); - } - } - (void) close(chkfd); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - // For IPC, record the path so we unlink it later - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1); - if (s->unlink == NULL) { - (void) close(fd); - (void) unlink(saddr->s_un.s_path.sa_path); - return (NNG_ENOMEM); - } - strcpy(s->unlink, saddr->s_un.s_path.sa_path); - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - s->fd = fd; - - return (0); -} - - -// These functions will need to be removed in the future. They are -// transition functions for now. - -int -nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server) -{ - int fd; - int rv; - - for (;;) { -#ifdef NNG_USE_ACCEPT4 - fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); - if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - fd = accept(server->fd, NULL, NULL); - } -#else - fd = accept(server->fd, NULL, NULL); -#endif - - if (fd < 0) { - return (nni_plat_errno(errno)); - } else { - break; - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -int -nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) -{ - int fd; - int len; - struct sockaddr_storage ss; - struct sockaddr_storage bss; - int rv; - - if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - if ((addr->s_un.s_family == NNG_AF_INET) || - (addr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - if (bindaddr != NULL) { - if (bindaddr->s_un.s_family != addr->s_un.s_family) { - return (NNG_EINVAL); - } - if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) { - return (NNG_EADDRINVAL); - } - if (bind(fd, (struct sockaddr *) &bss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if (connect(fd, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_errno(errno); - // Unix domain sockets return ENOENT when nothing is there. - // Massage this into ECONNREFUSED, to provide more consistent - // behavior. - if (rv == NNG_ENOENT) { - rv = NNG_ECONNREFUSED; - } - (void) close(fd); - return (rv); - } - if (s->pd != NULL) { - // If we had a prior pipedesc hanging around, nuke it. - nni_posix_pipedesc_fini(s->pd); - s->pd = NULL; - } - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - (void) close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -#else - -// Suppress empty symbols warnings in ranlib. -int nni_posix_socket_not_used = 0; - -#endif // PLATFORM_POSIX_SOCKET diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h deleted file mode 100644 index f3fb169c..00000000 --- a/src/platform/posix/posix_socket.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef PLATFORM_POSIX_SOCKET_H -#define PLATFORM_POSIX_SOCKET_H - -// This file provides declarations for comment socket handling functions on -// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all -// work using mostly comment socket handling routines. - -#include "core/nng_impl.h" - -#include "platform/posix/posix_aio.h" - -#include <sys/types.h> -#include <sys/socket.h> - -typedef struct nni_posix_sock nni_posix_sock; - -extern int nni_posix_to_sockaddr(struct sockaddr_storage *, - const nni_sockaddr *); -extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *); -extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *); -extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *); -extern int nni_posix_sock_init(nni_posix_sock **); -extern void nni_posix_sock_fini(nni_posix_sock *); -extern void nni_posix_sock_shutdown(nni_posix_sock *); -extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *); - -// These functions will need to be removed in the future. They are -// transition functions for now. - -extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *); -extern int nni_posix_sock_connect_sync(nni_posix_sock *, - const nni_sockaddr *, const nni_sockaddr *); - -#endif // PLATFORM_POSIX_SOCKET_H diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 4137984f..a2f24f8a 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -297,7 +297,7 @@ nni_plat_init(int (*helper)(void)) // probably get by with even just 8k, but Linux usually wants 16k // as a minimum. If this fails, its not fatal, just we won't be // as scalable / thrifty with our use of VM. - (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); if ((rv = nni_posix_pollq_sysinit()) != 0) { pthread_mutex_unlock(&nni_plat_lock); @@ -316,7 +316,6 @@ nni_plat_init(int (*helper)(void)) pthread_condattr_destroy(&nni_cvattr); pthread_attr_destroy(&nni_pthread_attr); return (rv); - } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 52ed582a..0489ea38 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -49,6 +49,7 @@ struct nni_inproc_ep { char addr[NNG_MAXADDRLEN+1]; int mode; int closed; + int started; nni_list_node node; uint16_t proto; nni_cv cv; @@ -217,7 +218,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_inproc_ep *ep; int rv; @@ -229,8 +230,9 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) return (NNG_ENOMEM); } - ep->mode = NNI_INPROC_EP_IDLE; + ep->mode = mode; ep->closed = 0; + ep->started = 0; ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -365,7 +367,6 @@ nni_inproc_accept_clients(nni_inproc_ep *server) pair->pipes[1]->peer = pair->pipes[0]->proto; pair->pipes[0]->peer = pair->pipes[1]->proto; pair->refcnt = 2; - client->mode = NNI_INPROC_EP_IDLE; // XXX: ?? nni_inproc_conn_finish(caio, 0); nni_inproc_conn_finish(saio, 0); @@ -390,12 +391,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_inproc_ep *server; int rv; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish(aio, NNG_EINVAL, 0); return; } + if (ep->started) { + nni_aio_finish(aio, NNG_EBUSY, 0); + return; + } if (nni_list_active(&ep->clients, ep)) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish(aio, NNG_EBUSY, 0); return; } @@ -421,7 +426,8 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { + if ((server->mode != NNI_EP_MODE_LISTEN) || + (server->started == 0)) { continue; } if (strcmp(server->addr, ep->addr) == 0) { @@ -434,7 +440,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -450,16 +455,21 @@ nni_inproc_ep_bind(void *arg) nni_inproc_ep *srch; nni_list *list = &nni_inproc.servers; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); + if (ep->started) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EBUSY); + } if (ep->closed) { nni_mtx_unlock(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_LISTEN) { + if ((srch->mode != NNI_EP_MODE_LISTEN) || + (!srch->started)) { continue; } if (strcmp(srch->addr, ep->addr) == 0) { @@ -467,7 +477,7 @@ nni_inproc_ep_bind(void *arg) return (NNG_EADDRINUSE); } } - ep->mode = NNI_INPROC_EP_LISTEN; + ep->started = 1; nni_list_append(list, ep); nni_mtx_unlock(&nni_inproc.mx); return (0); @@ -481,10 +491,9 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_pipe *pipe; int rv; - if (ep->mode != NNI_INPROC_EP_LISTEN) { + if (ep->mode != NNI_EP_MODE_LISTEN) { nni_aio_finish(aio, NNG_EINVAL, 0); } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish(aio, rv, 0); return; @@ -494,13 +503,16 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) aio->a_pipe = pipe; // We are already on the master list of servers, thanks to bind. - if (ep->closed) { // This is the only possible error path from the // time we acquired the lock. nni_inproc_conn_finish(aio, NNG_ECLOSED); return; } + if (!ep->started) { + nni_inproc_conn_finish(aio, NNG_ESTATE); + return; + } // Insert us into the pending server aios, and then run the // accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index d9ddeb2d..fcfe44ea 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -493,7 +493,7 @@ nni_ipc_ep_fini(void *arg) static int -nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_ipc_ep *ep; int rv; @@ -508,7 +508,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) } if (((rv = nni_mtx_init(&ep->mtx)) != 0) || ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || - ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) { nni_ipc_ep_fini(ep); return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index a43763e2..237b0c07 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -22,7 +22,7 @@ typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. struct nni_tcp_pipe { const char * addr; - nni_plat_tcpsock * tsp; + nni_plat_tcp_pipe * tpp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -46,17 +46,21 @@ struct nni_tcp_pipe { struct nni_tcp_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcpsock * tsp; + nni_plat_tcp_ep * tep; int closed; uint16_t proto; size_t rcvmax; int ipv4only; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); static void nni_tcp_pipe_nego_cb(void *); +static void nni_tcp_ep_cb(void *arg); static int nni_tcp_tran_init(void) @@ -76,7 +80,7 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_shutdown(pipe->tsp); + nni_plat_tcp_pipe_close(pipe->tpp); } @@ -88,8 +92,8 @@ nni_tcp_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->tsp != NULL) { - nni_plat_tcp_fini(pipe->tsp); + if (pipe->tpp != NULL) { + nni_plat_tcp_pipe_fini(pipe->tpp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +104,7 @@ nni_tcp_pipe_fini(void *arg) static int -nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) +nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *pipe; int rv; @@ -111,9 +115,6 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -128,6 +129,9 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->tpp = tpp; + pipe->addr = ep->addr; + *pipep = pipe; return (0); @@ -175,7 +179,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead]; // send it down... - nni_plat_tcp_aio_send(pipe->tsp, aio); + nni_plat_tcp_pipe_send(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -183,7 +187,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead]; - nni_plat_tcp_aio_recv(pipe->tsp, aio); + nni_plat_tcp_pipe_recv(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -296,7 +300,7 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -353,7 +357,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -392,7 +396,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -431,59 +435,10 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) -{ - nni_tcp_ep *ep; - int rv; - - if (strlen(url) > NNG_MAXADDRLEN-1) { - return (NNG_EADDRINVAL); - } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ep->closed = 0; - ep->proto = nni_sock_proto(sock); - ep->ipv4only = 0; // XXX: FIXME - ep->rcvmax = nni_sock_rcvmaxsz(sock); - - if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); - - *epp = ep; - return (0); -} - - -static void -nni_tcp_ep_fini(void *arg) +nni_tcp_parse_pair(char *pair, char **hostp, char **servp) { - nni_tcp_ep *ep = arg; - - nni_plat_tcp_fini(ep->tsp); - NNI_FREE_STRUCT(ep); -} - - -static void -nni_tcp_ep_close(void *arg) -{ - nni_tcp_ep *ep = arg; - - nni_plat_tcp_shutdown(ep->tsp); -} - - -static int -nni_parseaddr(char *pair, char **hostp, uint16_t *portp) -{ - char *host, *port, *end; + char *host, *serv, *end; char c; - int val; if (pair[0] == '[') { host = pair+1; @@ -492,40 +447,73 @@ nni_parseaddr(char *pair, char **hostp, uint16_t *portp) return (NNG_EADDRINVAL); } *end = '\0'; - port = end + 1; - if (*port == ':') { - port++; - } else if (port != '\0') { + serv = end + 1; + if (*serv == ':') { + serv++; + } else if (serv != '\0') { return (NNG_EADDRINVAL); } } else { host = pair; - port = strchr(host, ':'); - if (port != NULL) { - *port = '\0'; - port++; + serv = strchr(host, ':'); + if (serv != NULL) { + *serv = '\0'; + serv++; } } - val = 0; - while ((c = *port) != '\0') { - val *= 10; - if ((c >= '0') && (c <= '9')) { - val += (c - '0'); + if (hostp != NULL) { + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; } else { - return (NNG_EADDRINVAL); + *hostp = host; } - if (val > 65535) { - return (NNG_EADDRINVAL); + } + if (servp != NULL) { + if (strlen(serv) == 0) { + *servp = NULL; + } else { + *servp = serv; } - port++; } - if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { - *hostp = NULL; + // Stash the port in big endian (network) byte order. + return (0); +} + + +// Note that the url *must* be in a modifiable buffer. +int +nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2, + char **serv2) +{ + char *h1; + int rv; + + if (strncmp(url, "tcp://", strlen("tcp://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("tcp://"); + if ((h1 = strchr(url, ';')) != 0) { + // For these we want the second part first, because + // the "primary" address is the remote address, and the + // "secondary" is the local (bind) address. This is only + // used for dial side. + *h1 = '\0'; + h1++; + if (((rv = nni_tcp_parse_pair(h1, host1, serv1)) != 0) || + ((rv = nni_tcp_parse_pair(url, host2, serv2)) != 0)) { + return (rv); + } } else { - *hostp = host; + if (host2 != NULL) { + *host2 = NULL; + } + if (serv2 != NULL) { + *serv2 = NULL; + } + if ((rv = nni_tcp_parse_pair(url, host1, serv1)) != 0) { + return (rv); + } } - // Stash the port in big endian (network) byte order. - NNI_PUT16((uint8_t *) portp, val); return (0); } @@ -556,11 +544,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } +#if 0 static int nni_tcp_ep_connect_sync(void *arg, void **pipep) { @@ -659,23 +648,162 @@ nni_tcp_ep_bind(void *arg) } +#endif + +static void +nni_tcp_ep_fini(void *arg) +{ + nni_tcp_ep *ep = arg; + + if (ep->tep != NULL) { + nni_plat_tcp_ep_fini(ep->tep); + } + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int -nni_tcp_ep_accept_sync(void *arg, void **pipep) +nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + nni_tcp_ep *ep; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) || + ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) { + nni_tcp_ep_fini(ep); + return (rv); + } + ep->closed = 0; + ep->proto = nni_sock_proto(sock); + ep->rcvmax = nni_sock_rcvmaxsz(sock); + (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + + *epp = ep; + return (0); +} + + +static void +nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; + + nni_plat_tcp_ep_close(ep->tep); +} + + +static int +nni_tcp_ep_bind(void *arg) +{ + nni_tcp_ep *ep = arg; + + return (nni_plat_tcp_ep_listen(ep->tep)); +} + + +static void +nni_tcp_ep_finish(nni_tcp_ep *ep) +{ + nni_aio *aio = ep->user_aio; nni_tcp_pipe *pipe; int rv; - if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) { - return (rv); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { - nni_tcp_pipe_fini(pipe); - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_tcp_pipe_fini(ep->aio.a_pipe); + goto done; } - *pipep = pipe; - return (0); + + aio->a_pipe = pipe; + +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} + + +static void +nni_tcp_ep_cb(void *arg) +{ + nni_tcp_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_tcp_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_cancel_ep(nni_aio *aio) +{ + nni_tcp_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_accept(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_accept(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_connect(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_connect(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -690,14 +818,14 @@ static nni_tran_pipe nni_tcp_pipe_ops = { }; static nni_tran_ep nni_tcp_ep_ops = { - .ep_init = nni_tcp_ep_init, - .ep_fini = nni_tcp_ep_fini, - .ep_connect_sync = nni_tcp_ep_connect_sync, - .ep_bind = nni_tcp_ep_bind, - .ep_accept_sync = nni_tcp_ep_accept_sync, - .ep_close = nni_tcp_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_tcp_ep_init, + .ep_fini = nni_tcp_ep_fini, + .ep_connect = nni_tcp_ep_connect, + .ep_bind = nni_tcp_ep_bind, + .ep_accept = nni_tcp_ep_accept, + .ep_close = nni_tcp_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the TCP transport linkage, and should be the only global diff --git a/tests/resolv.c b/tests/resolv.c index 5ace327a..52a598dc 100644 --- a/tests/resolv.c +++ b/tests/resolv.c @@ -35,6 +35,12 @@ ip6str(void *addr) TestMain("TCP Resolver", { nni_init(); +// These work on Darwin, and should work on illumos, but they may +// depend on the local resolver configuration. We elect not to depend +// too much on them, since localhost can be configured weirdly. Notably +// the normal assumptions on Linux do *not* hold true. +#if 0 + Convey("Localhost IPv4 resolves", { nni_aio aio; const char *str; @@ -53,6 +59,7 @@ TestMain("TCP Resolver", { nni_aio_fini(&aio); } ); + Convey("Localhost IPv6 resolves", { nni_aio aio; memset(&aio, 0, sizeof (aio)); @@ -107,13 +114,14 @@ TestMain("TCP Resolver", { nni_aio_fini(&aio); } ); +#endif Convey("Google DNS IPv4 resolves", { nni_aio aio; const char *str; memset(&aio, 0, sizeof (aio)); nni_aio_init(&aio, NULL, NULL); nni_plat_tcp_resolv("google-public-dns-a.google.com", - "80", NNG_AF_INET, 1, &aio); + "80", NNG_AF_INET, 1, &aio); nni_aio_wait(&aio); So(nni_aio_result(&aio) == 0); So(aio.a_naddrs == 1); @@ -130,7 +138,7 @@ TestMain("TCP Resolver", { memset(&aio, 0, sizeof (aio)); nni_aio_init(&aio, NULL, NULL); nni_plat_tcp_resolv("8.8.4.4", - "80", NNG_AF_INET, 1, &aio); + "80", NNG_AF_INET, 1, &aio); nni_aio_wait(&aio); So(nni_aio_result(&aio) == 0); So(aio.a_naddrs == 1); @@ -147,7 +155,7 @@ TestMain("TCP Resolver", { memset(&aio, 0, sizeof (aio)); nni_aio_init(&aio, NULL, NULL); nni_plat_tcp_resolv("8.8.4.4", - "http", NNG_AF_INET, 1, &aio); + "http", NNG_AF_INET, 1, &aio); nni_aio_wait(&aio); So(nni_aio_result(&aio) == 0); So(aio.a_naddrs == 1); |
