diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/endpt.c | 26 | ||||
| -rw-r--r-- | src/core/endpt.h | 4 | ||||
| -rw-r--r-- | src/core/platform.h | 89 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/transport.c | 1 | ||||
| -rw-r--r-- | src/core/transport.h | 2 | ||||
| -rw-r--r-- | src/nng.h | 39 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 98 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 80 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 176 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 422 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.h | 45 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 3 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 38 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 4 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 336 |
18 files changed, 610 insertions, 765 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9392e423..581d1d6c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -81,7 +81,6 @@ set (NNG_SOURCES platform/posix/posix_config.h platform/posix/posix_aio.h platform/posix/posix_pollq.h - platform/posix/posix_socket.h platform/posix/posix_alloc.c platform/posix/posix_clock.c @@ -94,7 +93,6 @@ set (NNG_SOURCES platform/posix/posix_pollq_poll.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c - platform/posix/posix_socket.c platform/posix/posix_thread.c platform/windows/win_impl.h diff --git a/src/core/endpt.c b/src/core/endpt.c index 20874c90..e5a9a5bb 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -142,7 +142,7 @@ nni_ep_dtor(void *ptr) int -nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) +nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) { nni_tran *tran; nni_ep *ep; @@ -162,6 +162,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) } ep->ep_sock = sock; ep->ep_tran = tran; + ep->ep_mode = mode; // Could safely use strcpy here, but this avoids discussion. (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr); @@ -172,7 +173,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) ep->ep_ops = *tran->tran_ep; - if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) { + if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } @@ -391,7 +392,11 @@ nni_ep_dial(nni_ep *ep, int flags) int rv = 0; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_IDLE) { + if (ep->ep_mode != NNI_EP_MODE_DIAL) { + nni_mtx_unlock(&ep->ep_mtx); + return (NNG_ENOTSUP); + } + if (ep->ep_started) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } @@ -404,14 +409,14 @@ nni_ep_dial(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (rv); } - ep->ep_mode = NNI_EP_MODE_DIAL; + ep->ep_started = 1; if (flags & NNG_FLAG_SYNCH) { nni_mtx_unlock(&ep->ep_mtx); rv = nni_ep_connect_sync(ep); if (rv != 0) { nni_thr_fini(&ep->ep_thr); - ep->ep_mode = NNI_EP_MODE_IDLE; + ep->ep_started = 0; return (rv); } nni_mtx_lock(&ep->ep_mtx); @@ -561,11 +566,14 @@ nni_ep_listen(nni_ep *ep, int flags) int rv = 0; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_IDLE) { + if (ep->ep_mode != NNI_EP_MODE_LISTEN) { + nni_mtx_unlock(&ep->ep_mtx); + return (NNG_ENOTSUP); + } + if (ep->ep_started) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } - if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); @@ -576,14 +584,14 @@ nni_ep_listen(nni_ep *ep, int flags) return (rv); } - ep->ep_mode = NNI_EP_MODE_LISTEN; + ep->ep_started = 1; if (flags & NNG_FLAG_SYNCH) { nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { nni_thr_fini(&ep->ep_thr); - ep->ep_mode = NNI_EP_MODE_IDLE; + ep->ep_started = 0; return (rv); } nni_mtx_lock(&ep->ep_mtx); diff --git a/src/core/endpt.h b/src/core/endpt.h index 3d353cdf..becaa3f4 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -28,6 +28,7 @@ struct nni_ep { char ep_addr[NNG_MAXADDRLEN]; nni_thr ep_thr; int ep_mode; + int ep_started; int ep_closed; // full shutdown int ep_bound; // true if we bound locally nni_mtx ep_mtx; @@ -36,7 +37,6 @@ struct nni_ep { nni_list ep_pipes; }; -#define NNI_EP_MODE_IDLE 0 #define NNI_EP_MODE_DIAL 1 #define NNI_EP_MODE_LISTEN 2 @@ -46,7 +46,7 @@ extern int nni_ep_find(nni_ep **, uint32_t); extern void nni_ep_hold(nni_ep *); extern void nni_ep_rele(nni_ep *); extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create(nni_ep **, nni_sock *, const char *); +extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int); extern void nni_ep_stop(nni_ep *); extern void nni_ep_close(nni_ep *); extern void nni_ep_remove(nni_ep *); diff --git a/src/core/platform.h b/src/core/platform.h index 0dcab40a..d5c6aa8a 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -200,47 +200,47 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); // TCP Support. // -// nni_plat_tcp_init initializes the socket, for example it can -// set underlying file descriptors to -1, etc. -extern int nni_plat_tcp_init(nni_plat_tcpsock **); - -// nni_plat_tcp_fini just closes a TCP socket, and releases any related -// resources. -extern void nni_plat_tcp_fini(nni_plat_tcpsock *); - -// nni_plat_tcp_shutdown performs a shutdown of the socket. For -// BSD sockets, this closes both sides of the TCP connection gracefully, -// but the underlying file descriptor is left open. (This part is critical -// to prevention of close() related races.) -extern void nni_plat_tcp_shutdown(nni_plat_tcpsock *); - -// nni_plat_tcp_listen creates a TCP socket in listening mode, bound -// to the specified address. Note that nni_plat_tcpsock should be defined -// to whatever your platform uses. For most systems its just "int". -extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *); - -// nni_plat_tcp_accept does the accept to accept an inbound connection. -// The tcpsock used for the server will have been set up with the -// nni_plat_tcp_listen. -extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *); - -// nni_plat_tcp_connect is the client side. Two addresses are supplied, -// as the client may specify a local address to which to bind. This -// second address may be NULL to use ephemeral ports, which is the -// usual default. -extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *, - const nni_sockaddr *); - -// nni_plat_tcp_aio_send sends the data to the remote side asynchronously. -// The data to send is stored in the a_iov field of the aio, and the array -// of iovs will never be larger than 4. The platform may modify the iovs, -// or the iov list. -extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *); - -// nni_plat_tcp_aio_recv recvs data into the buffers provided by the -// iovs. The implementation does not return until the iovs are completely -// full, or an error condition occurs. -extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); +typedef struct nni_plat_tcp_ep nni_plat_tcp_ep; +typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe; + +// nni_plat_tcp_ep_init creates a new endpoint associated with the url. +extern int nni_plat_tcp_ep_init(nni_plat_tcp_ep **, const char *, int); + +// nni_plat_tcp_ep_fini closes the endpoint and releases resources. +extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *); + +// nni_plat_tcp_ep_close closes the endpoint; this might not close the +// actual underlying socket, but it should call shutdown on it. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *); + +// nni_plat_tcp_listen creates an TCP socket in listening mode, bound +// to the specified path. +extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *); + +// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *); + +// nni_plat_tcp_connect is the client side. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *); + +// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources +// associated with it. +extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *); + +// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *); + +// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *); + +// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs. +// The platform may modify the iovs. +extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *); // nni_plat_tcp_resolv resolves a TCP name asynchronously. The family // should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The @@ -258,8 +258,8 @@ extern void nni_plat_tcp_resolv(const char *, const char *, int, int, typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; -// nni_plat_ipc_ep_init creates a new endpoint associated with the path. -extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *); +// nni_plat_ipc_ep_init creates a new endpoint associated with the url. +extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *, int); // nni_plat_ipc_ep_fini closes the endpoint and releases resources. extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); @@ -270,8 +270,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); // nni_plat_tcp_listen creates an IPC socket in listening mode, bound -// to the specified path. Note that nni_plat_ipcsock should be defined -// to whatever your platform uses. For most systems its just "int". +// to the specified path. extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); // nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. diff --git a/src/core/socket.c b/src/core/socket.c index d9d9ae3b..66f5c63b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -775,7 +775,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; - if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { return (rv); } @@ -795,7 +795,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; - if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { return (rv); } diff --git a/src/core/transport.c b/src/core/transport.c index de94596c..ce61f722 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -24,6 +24,7 @@ static nni_tran *transports[] = { NULL }; + nni_tran * nni_tran_find(const char *addr) { diff --git a/src/core/transport.h b/src/core/transport.h index dd4da299..8098e5c2 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -40,7 +40,7 @@ struct nni_tran { struct nni_tran_ep { // ep_init creates a vanilla endpoint. The value created is // used for the first argument for all other endpoint functions. - int (*ep_init)(void **, const char *, nni_sock *); + int (*ep_init)(void **, const char *, nni_sock *, int); // ep_fini frees the resources associated with the endpoint. // The endpoint will already have been closed. @@ -42,13 +42,15 @@ extern "C" { // Types common to nng. typedef uint32_t nng_socket; -typedef uint32_t nng_endpoint; +typedef uint32_t nng_dialer; +typedef uint32_t nng_listener; typedef uint32_t nng_pipe; typedef struct nng_msg nng_msg; typedef struct nng_event nng_event; typedef struct nng_notify nng_notify; typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; +typedef uint32_t nng_endpoint; // XXX: REMOVE ME. // nng_open simply creates a socket of the given class. It returns an // error code on failure, or zero on success. The socket starts in cooked @@ -127,8 +129,13 @@ NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *); #define NNG_EV_ERROR NNG_EV_BIT(2) #define NNG_EV_PIPE_ADD NNG_EV_BIT(3) #define NNG_EV_PIPE_REM NNG_EV_BIT(4) -#define NNG_EV_ENDPT_ADD NNG_EV_BIT(5) -#define NNG_EV_ENDPT_REM NNG_EV_BIT(6) +#define NNG_EV_DIALER_ADD NNG_EV_BIT(5) +#define NNG_EV_DIALER_REM NNG_EV_BIT(6) +#define NNG_EV_LISTENER_ADD NNG_EV_BIT(7) +#define NNG_EV_LISTENER_REM NNG_EV_BIT(8) +// XXX: Remove these. +#define NNG_EV_ENDPT_ADD NNG_EV_DIALER_ADD +#define NNG_EV_ENDPT_REM NNG_EV_DIALER_REM // The following functions return more detailed information about the event. // Some of the values will not make sense for some event types, in which case @@ -145,7 +152,7 @@ NNG_DECL const char *nng_event_reason(nng_event *); // endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to // indicate that a failure setting the socket up should return an error // back to the caller immediately. -NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); +NNG_DECL int nng_listen(nng_socket, const char *, nng_listener *, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time @@ -155,24 +162,28 @@ NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); // dial will be made synchronously, and a failure condition returned back // to the caller. (If the connection is dropped, it will still be // reconnected in the background -- only the initial connect is synchronous.) -NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int); +NNG_DECL int nng_dial(nng_socket, const char *, nng_dialer *, int); -// nng_endpoint_create creates an endpoint on the socket, but does not -// start it either dialing or listening. -NNG_DECL int nng_endpoint_create(nng_socket, const char *, nng_endpoint *); +// nng_dialer_create creates a new dialer, that is not yet started. +NNG_DECL int nng_dialer_create(nng_socket, const char *, nng_dialer *); -// nng_endpoint_dial starts the endpoint dialing. This is only possible if -// the endpoint is not already dialing or listening. -NNG_DECL int nng_endpoint_dial(nng_endpoint, int); +// nng_listener_create creates a new listener, that is not yet started. +NNG_DECL int nng_listener_create(nng_socket, const char *, nng_listener *); -// nng_endpoint_listen starts the endpoint listening. This is only possible if -// the endpoint is not already dialing or listening. -NNG_DECL int nng_endpoint_listen(nng_endpoint, int); +// nng_dialer_start starts the endpoint dialing. This is only possible if +// the dialer is not already dialing. +NNG_DECL int nng_dialer_start(nng_dialer, int); + +// nng_listener_start starts the endpoint listening. This is only possible if +// the listener is not already listening. +NNG_DECL int nng_listener_start(nng_listener, int); // nng_endpoint_close closes the endpoint, shutting down all underlying // connections and releasing all associated resources. It is an error to // refer to the endpoint after this is called. NNG_DECL int nng_endpoint_close(nng_endpoint); +NNG_DECL int nng_dialer_close(nng_dialer); +NNG_DECL int nng_listener_close(nng_listener); // nng_endpoint_setopt sets an option for a specific endpoint. Note // endpoint options may not be altered on a running endpoint. diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 3d703e97..f64e25e9 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -8,11 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" -#include "platform/posix/posix_socket.h" #ifdef PLATFORM_POSIX_EPDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -180,6 +179,30 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) static void +nni_posix_epdesc_doerror(nni_posix_epdesc *ed) +{ + nni_aio *aio; + int rv = 1; + socklen_t sz = sizeof (rv); + + if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == 0) { + return; + } + rv = nni_plat_errno(rv); + + while ((aio = nni_list_first(&ed->acceptq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } + while ((aio = nni_list_first(&ed->connectq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } +} + + +static void nni_posix_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; @@ -191,6 +214,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { (void) unlink(sun->sun_path); } + (void) close(ed->fd); + ed->fd = -1; } while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); @@ -214,7 +239,10 @@ nni_posix_epdesc_cb(void *arg) if (ed->node.revents & POLLOUT) { nni_posix_epdesc_doconnect(ed); } - if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) { + if (ed->node.revents & (POLLERR|POLLHUP)) { + nni_posix_epdesc_doerror(ed); + } + if (ed->node.revents & POLLNVAL) { nni_posix_epdesc_doclose(ed); } ed->node.revents = 0; @@ -241,6 +269,58 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } +static int +nni_posix_epdesc_parseaddr(char *pair, char **hostp, uint16_t *portp) +{ + char *host, *port, *end; + char c; + int val; + + if (pair[0] == '[') { + host = pair+1; + // IP address enclosed ... for IPv6 usually. + if ((end = strchr(host, ']')) == NULL) { + return (NNG_EADDRINVAL); + } + *end = '\0'; + port = end + 1; + if (*port == ':') { + port++; + } else if (port != '\0') { + return (NNG_EADDRINVAL); + } + } else { + host = pair; + port = strchr(host, ':'); + if (port != NULL) { + *port = '\0'; + port++; + } + } + val = 0; + while ((c = *port) != '\0') { + val *= 10; + if ((c >= '0') && (c <= '9')) { + val += (c - '0'); + } else { + return (NNG_EADDRINVAL); + } + if (val > 65535) { + return (NNG_EADDRINVAL); + } + port++; + } + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; + } else { + *hostp = host; + } + // Stash the port in big endian (network) byte order. + NNI_PUT16((uint8_t *) portp, val); + return (0); +} + + int nni_posix_epdesc_listen(nni_posix_epdesc *ed) { @@ -250,6 +330,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) int fd; nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; len = ed->loclen; @@ -274,6 +355,8 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) return (rv); } + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->fd = fd; ed->node.fd = fd; nni_mtx_unlock(&ed->mtx); @@ -349,7 +432,10 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) } } + (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK); + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. @@ -460,7 +546,9 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - // XXX: MORE WORK HERE. + if (ed->fd >= 0) { + (void) close(ed->fd); + } nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index 5bd42190..706ef15f 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -11,7 +11,6 @@ #ifdef PLATFORM_POSIX_IPC #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> #include <stdlib.h> @@ -41,37 +40,40 @@ // We alias nni_posix_pipedesc to nni_plat_ipc_pipe. // We alias nni_posix_epdesc to nni_plat_ipc_ep. -static int -nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) -{ - size_t len; - - memset(sun, 0, sizeof (*sun)); - - // TODO: abstract sockets, including autobind sockets. - len = strlen(path); - if ((len >= sizeof (sun->sun_path)) || (len < 1)) { - return (NNG_EADDRINVAL); - } - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); - sun->sun_family = AF_UNIX; - return (0); -} - - int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { nni_posix_epdesc *ed; int rv; + struct sockaddr_un sun; + const char *path; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); } - url += strlen("ipc://"); // skip the prefix. + path = url + strlen("ipc://"); // skip the prefix. + + // prepare the sockaddr_un + sun.sun_family = AF_UNIX; + if (strlen(url) >= sizeof (sun.sun_path)) { + return (NNG_EADDRINVAL); + } + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { return (rv); } + switch (mode) { + case NNI_EP_MODE_DIAL: + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + break; + case NNI_EP_MODE_LISTEN: + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + break; + default: + nni_posix_epdesc_fini(ed); + return (NNG_EINVAL); + } *epp = (void *) ed; return (0); @@ -98,10 +100,14 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) // will just try to cleanup the old socket. Note that this is not // perfect in all scenarios, so use this with caution. static int -nni_plat_ipc_remove_stale(struct sockaddr_un *sun) +nni_plat_ipc_remove_stale(const char *path) { int fd; int rv; + struct sockaddr_un sun; + + sun.sun_family = AF_UNIX; + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); @@ -113,9 +119,9 @@ nni_plat_ipc_remove_stale(struct sockaddr_un *sun) // then the cleanup will fail. As this is supposed to be an // exceptional case, don't worry. (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (connect(fd, (void *) &sun, sizeof (sun)) < 0) { if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); + (void) unlink(path); } } (void) close(fd); @@ -132,17 +138,11 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; path = nni_posix_epdesc_url(ed); + path += strlen("ipc://"); - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - return (rv); - } - - if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + if ((rv = nni_plat_ipc_remove_stale(path)) != 0) { return (rv); } - - nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); - return (nni_posix_epdesc_listen(ed)); } @@ -150,21 +150,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - const char *path; - nni_posix_epdesc *ed = (void *) ep; - struct sockaddr_un sun; - int rv; - - path = nni_posix_epdesc_url(ed); - - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - nni_aio_finish(aio, rv, 0); - return; - } - - nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); - - nni_posix_epdesc_connect(ed, aio); + nni_posix_epdesc_connect((void *) ep, aio); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 6d0d7c08..26ec3632 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -11,9 +11,9 @@ #ifdef PLATFORM_POSIX_NET #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> @@ -24,101 +24,181 @@ #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> -#include <netdb.h> -// We alias nni_plat_tcpsock to an nni_posix_sock. +static int +nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa) +{ + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof (*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof (*sin)); + + + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(sin6, 0, sizeof (*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof (*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof (*sin6)); + } + return (-1); +} + + +extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); int -nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) { - struct addrinfo hint; - struct addrinfo *ai; - - memset(&hint, 0, sizeof (hint)); - hint.ai_flags = AI_PASSIVE | AI_ADDRCONFIG | AI_NUMERICSERV; - hint.ai_family = PF_UNSPEC; - hint.ai_socktype = SOCK_STREAM; - hint.ai_protocol = IPPROTO_TCP; - if (flags & NNI_FLAG_IPV4ONLY) { - hint.ai_family = PF_INET; + nni_posix_epdesc *ed; + char buf[NNG_MAXADDRLEN]; + int rv; + char *lhost, *rhost; + char *lserv, *rserv; + char *sep; + struct sockaddr_storage ss; + int len; + int passive; + nni_aio aio; + + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); } - if (getaddrinfo(host, "1", &hint, &ai) != 0) { - return (NNG_EADDRINVAL); + // Make a local copy. + snprintf(buf, sizeof (buf), "%s", url); + nni_aio_init(&aio, NULL, NULL); + + if (mode == NNI_EP_MODE_DIAL) { + rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); + if (rv != 0) { + goto done; + } + + // We have to have a remote destination! + if ((rhost == NULL) || (rserv == NULL)) { + rv = NNG_EADDRINVAL; + goto done; + } + } else { + rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); + if (rv != 0) { + goto done; + } + if ((rhost != NULL) || (rserv != NULL)) { + // remotes are nonsensical here. + rv = NNG_EADDRINVAL; + goto done; + } + if (lserv == NULL) { + // missing port to listen on! + rv = NNG_EADDRINVAL; + goto done; + } } - if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) { - freeaddrinfo(ai); - return (NNG_EADDRINVAL); + if ((rserv != NULL) || (rhost != NULL)) { + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_remote(ed, &ss, len); } - freeaddrinfo(ai); + + if ((lserv != NULL) || (lhost != NULL)) { + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_local(ed, &ss, len); + } + *epp = (void *) ed; return (0); + +done: + if (rv != 0) { + nni_posix_epdesc_fini(ed); + } + nni_aio_fini(&aio); + return (rv); } void -nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_recv((void *) s, aio); + nni_posix_epdesc_close((void *) ep); } int -nni_plat_tcp_init(nni_plat_tcpsock **sp) +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) { - nni_posix_sock *s; - int rv; + return (nni_posix_epdesc_listen((void *) ep)); +} - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; - } - return (rv); + +void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + return (nni_posix_epdesc_connect((void *) ep, aio)); } void -nni_plat_tcp_fini(nni_plat_tcpsock *s) +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + return (nni_posix_epdesc_accept((void *) ep, aio)); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) { - nni_posix_sock_shutdown((void *) s); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +void +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) { - return (nni_posix_sock_listen((void *) s, addr)); + nni_posix_pipedesc_close((void *) p); } -// nni_plat_tcp_connect establishes an outbound connection. It the -// bind address is not null, then it will attempt to bind to the local -// address specified first. -int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) +void +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr)); + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +void +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index c3e29c33..c509643e 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -8,10 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" #ifdef PLATFORM_POSIX_PIPEDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -183,6 +183,8 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) if (pd->fd != -1) { // Let any peer know we are closing. (void) shutdown(pd->fd, SHUT_RDWR); + close(pd->fd); + pd->fd = -1; } while ((aio = nni_list_first(&pd->readq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c deleted file mode 100644 index 9e054c42..00000000 --- a/src/platform/posix/posix_socket.c +++ /dev/null @@ -1,422 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef PLATFORM_POSIX_SOCKET -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" - -#include <errno.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/uio.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <sys/un.h> -#include <fcntl.h> -#include <unistd.h> -#include <netdb.h> - -// Solaris/SunOS systems define this, which collides with our symbol -// names. Just undefine it now. -#ifdef sun -#undef sun -#endif - - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -struct nni_posix_sock { - int fd; - char * unlink; // path to unlink at unbind - nni_posix_pipedesc * pd; - int tcpnodelay; -}; - -int -nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in *sin; - struct sockaddr_un *sun; - -#ifdef PF_INET6 - struct sockaddr_in6 *sin6; -#endif - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof (*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof (*sin)); - -#ifdef PF_INET6 - // Not every platform can do IPv6. Amazingly. - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof (*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof (*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof (*sin6)); - -#endif // PF_INET6 - - case NNG_AF_IPC: - sun = (void *) ss; - memset(sun, 0, sizeof (*sun)); - // NB: This logic does not support abstract sockets, which - // have their first byte NULL, and rely on length instead. - // Probably for dealing with abstract sockets we will just - // handle @ specially in the future. - if (strlen(sa->s_un.s_path.sa_path) >= - sizeof (sun->sun_path)) { - return (-1); // caller converts to NNG_EADDRINVAL - } - - sun->sun_family = PF_UNIX; - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", - sa->s_un.s_path.sa_path); - return (sizeof (*sun)); - } - return (-1); -} - - -int -nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) -{ - const struct sockaddr_in *sin; - const struct sockaddr_un *sun; - -#ifdef PF_INET6 - const struct sockaddr_in6 *sin6; -#endif - - memset(sa, 0, sizeof (*sa)); - switch (ss->sa_family) { - case PF_INET: - sin = (const void *) ss; - sa->s_un.s_in.sa_family = NNG_AF_INET; - sa->s_un.s_in.sa_port = sin->sin_port; - sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; - return (0); - -#ifdef PF_INET6 - case PF_INET6: - sin6 = (const void *) ss; - sa->s_un.s_in6.sa_family = NNG_AF_INET6; - sa->s_un.s_in6.sa_port = sin6->sin6_port; - memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - return (0); - -#endif // PF_INET6 - - case PF_UNIX: - // NB: This doesn't handle abstract sockets! - sun = (const void *) ss; - sa->s_un.s_path.sa_family = NNG_AF_IPC; - snprintf(sa->s_un.s_path.sa_path, - sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path); - return (0); - } - return (-1); -} - - -void -nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_send(s->pd, aio); -} - - -void -nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_recv(s->pd, aio); -} - - -static void -nni_posix_sock_setopts_fd(int fd, int tcpnodelay) -{ - int one; - - // Try to ensure that both CLOEXEC is set, and that we don't - // generate SIGPIPE. (Note that SIGPIPE suppression in this way - // only works on BSD systems. Linux wants us to use sendmsg().) - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#if defined(F_SETNOSIGPIPE) - (void) fcntl(fd, F_SETNOSIGPIPE, 1); -#elif defined(SO_NOSIGPIPE) - one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); -#endif - - // Also disable Nagle. We are careful to group data with writev, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - - // It's unclear whether this is safe for UNIX domain sockets. It - // *should* be. - if (tcpnodelay) { - one = 1; - (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, - sizeof (one)); - } -} - - -int -nni_posix_sock_init(nni_posix_sock **sp) -{ - nni_posix_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - s->fd = -1; - *sp = s; - return (0); -} - - -void -nni_posix_sock_fini(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; - } - if (s->pd != NULL) { - nni_posix_pipedesc_fini(s->pd); - } - if (s->unlink != NULL) { - (void) unlink(s->unlink); - nni_free(s->unlink, strlen(s->unlink) + 1); - } - NNI_FREE_STRUCT(s); -} - - -void -nni_posix_sock_shutdown(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); - // This causes the equivalent of a close. Hopefully waking - // up anything that didn't get the hint with the shutdown. - // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); - } - if (s->pd != NULL) { - nni_posix_pipedesc_close(s->pd); - } -} - - -int -nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr) -{ - int len; - struct sockaddr_storage ss; - int rv; - int fd; - - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - if ((saddr->s_un.s_family == NNG_AF_INET) || - (saddr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - // UNIX DOMAIN SOCKETS -- these have names in the file namespace. - // We are going to check to see if there was a name already there. - // If there was, and nothing is listening (ECONNREFUSED), then we - // will just try to cleanup the old socket. Note that this is not - // perfect in all scenarios, so use this with caution. - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - int chkfd; - if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - (void) close(fd); - return (nni_plat_errno(errno)); - } - - // Nonblocking; we don't want to wait for remote server. - (void) fcntl(chkfd, F_SETFL, O_NONBLOCK); - if (connect(chkfd, (struct sockaddr *) &ss, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(saddr->s_un.s_path.sa_path); - } - } - (void) close(chkfd); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - // For IPC, record the path so we unlink it later - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1); - if (s->unlink == NULL) { - (void) close(fd); - (void) unlink(saddr->s_un.s_path.sa_path); - return (NNG_ENOMEM); - } - strcpy(s->unlink, saddr->s_un.s_path.sa_path); - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - s->fd = fd; - - return (0); -} - - -// These functions will need to be removed in the future. They are -// transition functions for now. - -int -nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server) -{ - int fd; - int rv; - - for (;;) { -#ifdef NNG_USE_ACCEPT4 - fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); - if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - fd = accept(server->fd, NULL, NULL); - } -#else - fd = accept(server->fd, NULL, NULL); -#endif - - if (fd < 0) { - return (nni_plat_errno(errno)); - } else { - break; - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -int -nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) -{ - int fd; - int len; - struct sockaddr_storage ss; - struct sockaddr_storage bss; - int rv; - - if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - if ((addr->s_un.s_family == NNG_AF_INET) || - (addr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - if (bindaddr != NULL) { - if (bindaddr->s_un.s_family != addr->s_un.s_family) { - return (NNG_EINVAL); - } - if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) { - return (NNG_EADDRINVAL); - } - if (bind(fd, (struct sockaddr *) &bss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if (connect(fd, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_errno(errno); - // Unix domain sockets return ENOENT when nothing is there. - // Massage this into ECONNREFUSED, to provide more consistent - // behavior. - if (rv == NNG_ENOENT) { - rv = NNG_ECONNREFUSED; - } - (void) close(fd); - return (rv); - } - if (s->pd != NULL) { - // If we had a prior pipedesc hanging around, nuke it. - nni_posix_pipedesc_fini(s->pd); - s->pd = NULL; - } - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - (void) close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -#else - -// Suppress empty symbols warnings in ranlib. -int nni_posix_socket_not_used = 0; - -#endif // PLATFORM_POSIX_SOCKET diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h deleted file mode 100644 index f3fb169c..00000000 --- a/src/platform/posix/posix_socket.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef PLATFORM_POSIX_SOCKET_H -#define PLATFORM_POSIX_SOCKET_H - -// This file provides declarations for comment socket handling functions on -// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all -// work using mostly comment socket handling routines. - -#include "core/nng_impl.h" - -#include "platform/posix/posix_aio.h" - -#include <sys/types.h> -#include <sys/socket.h> - -typedef struct nni_posix_sock nni_posix_sock; - -extern int nni_posix_to_sockaddr(struct sockaddr_storage *, - const nni_sockaddr *); -extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *); -extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *); -extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *); -extern int nni_posix_sock_init(nni_posix_sock **); -extern void nni_posix_sock_fini(nni_posix_sock *); -extern void nni_posix_sock_shutdown(nni_posix_sock *); -extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *); - -// These functions will need to be removed in the future. They are -// transition functions for now. - -extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *); -extern int nni_posix_sock_connect_sync(nni_posix_sock *, - const nni_sockaddr *, const nni_sockaddr *); - -#endif // PLATFORM_POSIX_SOCKET_H diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 4137984f..a2f24f8a 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -297,7 +297,7 @@ nni_plat_init(int (*helper)(void)) // probably get by with even just 8k, but Linux usually wants 16k // as a minimum. If this fails, its not fatal, just we won't be // as scalable / thrifty with our use of VM. - (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); if ((rv = nni_posix_pollq_sysinit()) != 0) { pthread_mutex_unlock(&nni_plat_lock); @@ -316,7 +316,6 @@ nni_plat_init(int (*helper)(void)) pthread_condattr_destroy(&nni_cvattr); pthread_attr_destroy(&nni_pthread_attr); return (rv); - } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 52ed582a..0489ea38 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -49,6 +49,7 @@ struct nni_inproc_ep { char addr[NNG_MAXADDRLEN+1]; int mode; int closed; + int started; nni_list_node node; uint16_t proto; nni_cv cv; @@ -217,7 +218,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_inproc_ep *ep; int rv; @@ -229,8 +230,9 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) return (NNG_ENOMEM); } - ep->mode = NNI_INPROC_EP_IDLE; + ep->mode = mode; ep->closed = 0; + ep->started = 0; ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -365,7 +367,6 @@ nni_inproc_accept_clients(nni_inproc_ep *server) pair->pipes[1]->peer = pair->pipes[0]->proto; pair->pipes[0]->peer = pair->pipes[1]->proto; pair->refcnt = 2; - client->mode = NNI_INPROC_EP_IDLE; // XXX: ?? nni_inproc_conn_finish(caio, 0); nni_inproc_conn_finish(saio, 0); @@ -390,12 +391,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_inproc_ep *server; int rv; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish(aio, NNG_EINVAL, 0); return; } + if (ep->started) { + nni_aio_finish(aio, NNG_EBUSY, 0); + return; + } if (nni_list_active(&ep->clients, ep)) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish(aio, NNG_EBUSY, 0); return; } @@ -421,7 +426,8 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { + if ((server->mode != NNI_EP_MODE_LISTEN) || + (server->started == 0)) { continue; } if (strcmp(server->addr, ep->addr) == 0) { @@ -434,7 +440,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -450,16 +455,21 @@ nni_inproc_ep_bind(void *arg) nni_inproc_ep *srch; nni_list *list = &nni_inproc.servers; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); + if (ep->started) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EBUSY); + } if (ep->closed) { nni_mtx_unlock(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_LISTEN) { + if ((srch->mode != NNI_EP_MODE_LISTEN) || + (!srch->started)) { continue; } if (strcmp(srch->addr, ep->addr) == 0) { @@ -467,7 +477,7 @@ nni_inproc_ep_bind(void *arg) return (NNG_EADDRINUSE); } } - ep->mode = NNI_INPROC_EP_LISTEN; + ep->started = 1; nni_list_append(list, ep); nni_mtx_unlock(&nni_inproc.mx); return (0); @@ -481,10 +491,9 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_pipe *pipe; int rv; - if (ep->mode != NNI_INPROC_EP_LISTEN) { + if (ep->mode != NNI_EP_MODE_LISTEN) { nni_aio_finish(aio, NNG_EINVAL, 0); } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish(aio, rv, 0); return; @@ -494,13 +503,16 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) aio->a_pipe = pipe; // We are already on the master list of servers, thanks to bind. - if (ep->closed) { // This is the only possible error path from the // time we acquired the lock. nni_inproc_conn_finish(aio, NNG_ECLOSED); return; } + if (!ep->started) { + nni_inproc_conn_finish(aio, NNG_ESTATE); + return; + } // Insert us into the pending server aios, and then run the // accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index d9ddeb2d..fcfe44ea 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -493,7 +493,7 @@ nni_ipc_ep_fini(void *arg) static int -nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_ipc_ep *ep; int rv; @@ -508,7 +508,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) } if (((rv = nni_mtx_init(&ep->mtx)) != 0) || ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || - ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) { nni_ipc_ep_fini(ep); return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index a43763e2..237b0c07 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -22,7 +22,7 @@ typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. struct nni_tcp_pipe { const char * addr; - nni_plat_tcpsock * tsp; + nni_plat_tcp_pipe * tpp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -46,17 +46,21 @@ struct nni_tcp_pipe { struct nni_tcp_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcpsock * tsp; + nni_plat_tcp_ep * tep; int closed; uint16_t proto; size_t rcvmax; int ipv4only; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); static void nni_tcp_pipe_nego_cb(void *); +static void nni_tcp_ep_cb(void *arg); static int nni_tcp_tran_init(void) @@ -76,7 +80,7 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_shutdown(pipe->tsp); + nni_plat_tcp_pipe_close(pipe->tpp); } @@ -88,8 +92,8 @@ nni_tcp_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->tsp != NULL) { - nni_plat_tcp_fini(pipe->tsp); + if (pipe->tpp != NULL) { + nni_plat_tcp_pipe_fini(pipe->tpp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +104,7 @@ nni_tcp_pipe_fini(void *arg) static int -nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) +nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *pipe; int rv; @@ -111,9 +115,6 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -128,6 +129,9 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->tpp = tpp; + pipe->addr = ep->addr; + *pipep = pipe; return (0); @@ -175,7 +179,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead]; // send it down... - nni_plat_tcp_aio_send(pipe->tsp, aio); + nni_plat_tcp_pipe_send(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -183,7 +187,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead]; - nni_plat_tcp_aio_recv(pipe->tsp, aio); + nni_plat_tcp_pipe_recv(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -296,7 +300,7 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -353,7 +357,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -392,7 +396,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -431,59 +435,10 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) -{ - nni_tcp_ep *ep; - int rv; - - if (strlen(url) > NNG_MAXADDRLEN-1) { - return (NNG_EADDRINVAL); - } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ep->closed = 0; - ep->proto = nni_sock_proto(sock); - ep->ipv4only = 0; // XXX: FIXME - ep->rcvmax = nni_sock_rcvmaxsz(sock); - - if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); - - *epp = ep; - return (0); -} - - -static void -nni_tcp_ep_fini(void *arg) +nni_tcp_parse_pair(char *pair, char **hostp, char **servp) { - nni_tcp_ep *ep = arg; - - nni_plat_tcp_fini(ep->tsp); - NNI_FREE_STRUCT(ep); -} - - -static void -nni_tcp_ep_close(void *arg) -{ - nni_tcp_ep *ep = arg; - - nni_plat_tcp_shutdown(ep->tsp); -} - - -static int -nni_parseaddr(char *pair, char **hostp, uint16_t *portp) -{ - char *host, *port, *end; + char *host, *serv, *end; char c; - int val; if (pair[0] == '[') { host = pair+1; @@ -492,40 +447,73 @@ nni_parseaddr(char *pair, char **hostp, uint16_t *portp) return (NNG_EADDRINVAL); } *end = '\0'; - port = end + 1; - if (*port == ':') { - port++; - } else if (port != '\0') { + serv = end + 1; + if (*serv == ':') { + serv++; + } else if (serv != '\0') { return (NNG_EADDRINVAL); } } else { host = pair; - port = strchr(host, ':'); - if (port != NULL) { - *port = '\0'; - port++; + serv = strchr(host, ':'); + if (serv != NULL) { + *serv = '\0'; + serv++; } } - val = 0; - while ((c = *port) != '\0') { - val *= 10; - if ((c >= '0') && (c <= '9')) { - val += (c - '0'); + if (hostp != NULL) { + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; } else { - return (NNG_EADDRINVAL); + *hostp = host; } - if (val > 65535) { - return (NNG_EADDRINVAL); + } + if (servp != NULL) { + if (strlen(serv) == 0) { + *servp = NULL; + } else { + *servp = serv; } - port++; } - if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { - *hostp = NULL; + // Stash the port in big endian (network) byte order. + return (0); +} + + +// Note that the url *must* be in a modifiable buffer. +int +nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2, + char **serv2) +{ + char *h1; + int rv; + + if (strncmp(url, "tcp://", strlen("tcp://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("tcp://"); + if ((h1 = strchr(url, ';')) != 0) { + // For these we want the second part first, because + // the "primary" address is the remote address, and the + // "secondary" is the local (bind) address. This is only + // used for dial side. + *h1 = '\0'; + h1++; + if (((rv = nni_tcp_parse_pair(h1, host1, serv1)) != 0) || + ((rv = nni_tcp_parse_pair(url, host2, serv2)) != 0)) { + return (rv); + } } else { - *hostp = host; + if (host2 != NULL) { + *host2 = NULL; + } + if (serv2 != NULL) { + *serv2 = NULL; + } + if ((rv = nni_tcp_parse_pair(url, host1, serv1)) != 0) { + return (rv); + } } - // Stash the port in big endian (network) byte order. - NNI_PUT16((uint8_t *) portp, val); return (0); } @@ -556,11 +544,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } +#if 0 static int nni_tcp_ep_connect_sync(void *arg, void **pipep) { @@ -659,23 +648,162 @@ nni_tcp_ep_bind(void *arg) } +#endif + +static void +nni_tcp_ep_fini(void *arg) +{ + nni_tcp_ep *ep = arg; + + if (ep->tep != NULL) { + nni_plat_tcp_ep_fini(ep->tep); + } + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int -nni_tcp_ep_accept_sync(void *arg, void **pipep) +nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + nni_tcp_ep *ep; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) || + ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) { + nni_tcp_ep_fini(ep); + return (rv); + } + ep->closed = 0; + ep->proto = nni_sock_proto(sock); + ep->rcvmax = nni_sock_rcvmaxsz(sock); + (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + + *epp = ep; + return (0); +} + + +static void +nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; + + nni_plat_tcp_ep_close(ep->tep); +} + + +static int +nni_tcp_ep_bind(void *arg) +{ + nni_tcp_ep *ep = arg; + + return (nni_plat_tcp_ep_listen(ep->tep)); +} + + +static void +nni_tcp_ep_finish(nni_tcp_ep *ep) +{ + nni_aio *aio = ep->user_aio; nni_tcp_pipe *pipe; int rv; - if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) { - return (rv); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { - nni_tcp_pipe_fini(pipe); - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_tcp_pipe_fini(ep->aio.a_pipe); + goto done; } - *pipep = pipe; - return (0); + + aio->a_pipe = pipe; + +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} + + +static void +nni_tcp_ep_cb(void *arg) +{ + nni_tcp_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_tcp_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_cancel_ep(nni_aio *aio) +{ + nni_tcp_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_accept(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_accept(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_connect(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_connect(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -690,14 +818,14 @@ static nni_tran_pipe nni_tcp_pipe_ops = { }; static nni_tran_ep nni_tcp_ep_ops = { - .ep_init = nni_tcp_ep_init, - .ep_fini = nni_tcp_ep_fini, - .ep_connect_sync = nni_tcp_ep_connect_sync, - .ep_bind = nni_tcp_ep_bind, - .ep_accept_sync = nni_tcp_ep_accept_sync, - .ep_close = nni_tcp_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_tcp_ep_init, + .ep_fini = nni_tcp_ep_fini, + .ep_connect = nni_tcp_ep_connect, + .ep_bind = nni_tcp_ep_bind, + .ep_accept = nni_tcp_ep_accept, + .ep_close = nni_tcp_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the TCP transport linkage, and should be the only global |
