aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/endpt.c26
-rw-r--r--src/core/endpt.h4
-rw-r--r--src/core/platform.h89
-rw-r--r--src/core/socket.c4
-rw-r--r--src/core/transport.c1
-rw-r--r--src/core/transport.h2
-rw-r--r--src/nng.h39
-rw-r--r--src/platform/posix/posix_epdesc.c98
-rw-r--r--src/platform/posix/posix_ipc.c80
-rw-r--r--src/platform/posix/posix_net.c176
-rw-r--r--src/platform/posix/posix_pipedesc.c6
-rw-r--r--src/platform/posix/posix_socket.c422
-rw-r--r--src/platform/posix/posix_socket.h45
-rw-r--r--src/platform/posix/posix_thread.c3
-rw-r--r--src/transport/inproc/inproc.c38
-rw-r--r--src/transport/ipc/ipc.c4
-rw-r--r--src/transport/tcp/tcp.c336
18 files changed, 610 insertions, 765 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 9392e423..581d1d6c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -81,7 +81,6 @@ set (NNG_SOURCES
platform/posix/posix_config.h
platform/posix/posix_aio.h
platform/posix/posix_pollq.h
- platform/posix/posix_socket.h
platform/posix/posix_alloc.c
platform/posix/posix_clock.c
@@ -94,7 +93,6 @@ set (NNG_SOURCES
platform/posix/posix_pollq_poll.c
platform/posix/posix_rand.c
platform/posix/posix_resolv_gai.c
- platform/posix/posix_socket.c
platform/posix/posix_thread.c
platform/windows/win_impl.h
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 20874c90..e5a9a5bb 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -142,7 +142,7 @@ nni_ep_dtor(void *ptr)
int
-nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
+nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
{
nni_tran *tran;
nni_ep *ep;
@@ -162,6 +162,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
}
ep->ep_sock = sock;
ep->ep_tran = tran;
+ ep->ep_mode = mode;
// Could safely use strcpy here, but this avoids discussion.
(void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr);
@@ -172,7 +173,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
ep->ep_ops = *tran->tran_ep;
- if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) {
+ if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) {
nni_objhash_unref(nni_eps, id);
return (rv);
}
@@ -391,7 +392,11 @@ nni_ep_dial(nni_ep *ep, int flags)
int rv = 0;
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ if (ep->ep_mode != NNI_EP_MODE_DIAL) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (NNG_ENOTSUP);
+ }
+ if (ep->ep_started) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
@@ -404,14 +409,14 @@ nni_ep_dial(nni_ep *ep, int flags)
nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
- ep->ep_mode = NNI_EP_MODE_DIAL;
+ ep->ep_started = 1;
if (flags & NNG_FLAG_SYNCH) {
nni_mtx_unlock(&ep->ep_mtx);
rv = nni_ep_connect_sync(ep);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
- ep->ep_mode = NNI_EP_MODE_IDLE;
+ ep->ep_started = 0;
return (rv);
}
nni_mtx_lock(&ep->ep_mtx);
@@ -561,11 +566,14 @@ nni_ep_listen(nni_ep *ep, int flags)
int rv = 0;
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (NNG_ENOTSUP);
+ }
+ if (ep->ep_started) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
-
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
@@ -576,14 +584,14 @@ nni_ep_listen(nni_ep *ep, int flags)
return (rv);
}
- ep->ep_mode = NNI_EP_MODE_LISTEN;
+ ep->ep_started = 1;
if (flags & NNG_FLAG_SYNCH) {
nni_mtx_unlock(&ep->ep_mtx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
- ep->ep_mode = NNI_EP_MODE_IDLE;
+ ep->ep_started = 0;
return (rv);
}
nni_mtx_lock(&ep->ep_mtx);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 3d353cdf..becaa3f4 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -28,6 +28,7 @@ struct nni_ep {
char ep_addr[NNG_MAXADDRLEN];
nni_thr ep_thr;
int ep_mode;
+ int ep_started;
int ep_closed; // full shutdown
int ep_bound; // true if we bound locally
nni_mtx ep_mtx;
@@ -36,7 +37,6 @@ struct nni_ep {
nni_list ep_pipes;
};
-#define NNI_EP_MODE_IDLE 0
#define NNI_EP_MODE_DIAL 1
#define NNI_EP_MODE_LISTEN 2
@@ -46,7 +46,7 @@ extern int nni_ep_find(nni_ep **, uint32_t);
extern void nni_ep_hold(nni_ep *);
extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create(nni_ep **, nni_sock *, const char *);
+extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int);
extern void nni_ep_stop(nni_ep *);
extern void nni_ep_close(nni_ep *);
extern void nni_ep_remove(nni_ep *);
diff --git a/src/core/platform.h b/src/core/platform.h
index 0dcab40a..d5c6aa8a 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -200,47 +200,47 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// TCP Support.
//
-// nni_plat_tcp_init initializes the socket, for example it can
-// set underlying file descriptors to -1, etc.
-extern int nni_plat_tcp_init(nni_plat_tcpsock **);
-
-// nni_plat_tcp_fini just closes a TCP socket, and releases any related
-// resources.
-extern void nni_plat_tcp_fini(nni_plat_tcpsock *);
-
-// nni_plat_tcp_shutdown performs a shutdown of the socket. For
-// BSD sockets, this closes both sides of the TCP connection gracefully,
-// but the underlying file descriptor is left open. (This part is critical
-// to prevention of close() related races.)
-extern void nni_plat_tcp_shutdown(nni_plat_tcpsock *);
-
-// nni_plat_tcp_listen creates a TCP socket in listening mode, bound
-// to the specified address. Note that nni_plat_tcpsock should be defined
-// to whatever your platform uses. For most systems its just "int".
-extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *);
-
-// nni_plat_tcp_accept does the accept to accept an inbound connection.
-// The tcpsock used for the server will have been set up with the
-// nni_plat_tcp_listen.
-extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *);
-
-// nni_plat_tcp_connect is the client side. Two addresses are supplied,
-// as the client may specify a local address to which to bind. This
-// second address may be NULL to use ephemeral ports, which is the
-// usual default.
-extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
- const nni_sockaddr *);
-
-// nni_plat_tcp_aio_send sends the data to the remote side asynchronously.
-// The data to send is stored in the a_iov field of the aio, and the array
-// of iovs will never be larger than 4. The platform may modify the iovs,
-// or the iov list.
-extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
-
-// nni_plat_tcp_aio_recv recvs data into the buffers provided by the
-// iovs. The implementation does not return until the iovs are completely
-// full, or an error condition occurs.
-extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
+typedef struct nni_plat_tcp_ep nni_plat_tcp_ep;
+typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe;
+
+// nni_plat_tcp_ep_init creates a new endpoint associated with the url.
+extern int nni_plat_tcp_ep_init(nni_plat_tcp_ep **, const char *, int);
+
+// nni_plat_tcp_ep_fini closes the endpoint and releases resources.
+extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_ep_close closes the endpoint; this might not close the
+// actual underlying socket, but it should call shutdown on it.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_listen creates an TCP socket in listening mode, bound
+// to the specified path.
+extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection.
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *);
+
+// nni_plat_tcp_connect is the client side.
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *);
+
+// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources
+// associated with it.
+extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *);
+
+// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *);
+
+// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer.
+// The platform may modify the iovs.
+extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *);
+
+// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs.
+// The platform may modify the iovs.
+extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *);
// nni_plat_tcp_resolv resolves a TCP name asynchronously. The family
// should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The
@@ -258,8 +258,8 @@ extern void nni_plat_tcp_resolv(const char *, const char *, int, int,
typedef struct nni_plat_ipc_ep nni_plat_ipc_ep;
typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe;
-// nni_plat_ipc_ep_init creates a new endpoint associated with the path.
-extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *);
+// nni_plat_ipc_ep_init creates a new endpoint associated with the url.
+extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *, int);
// nni_plat_ipc_ep_fini closes the endpoint and releases resources.
extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
@@ -270,8 +270,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *);
// nni_plat_tcp_listen creates an IPC socket in listening mode, bound
-// to the specified path. Note that nni_plat_ipcsock should be defined
-// to whatever your platform uses. For most systems its just "int".
+// to the specified path.
extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *);
// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection.
diff --git a/src/core/socket.c b/src/core/socket.c
index d9d9ae3b..66f5c63b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -775,7 +775,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
- if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
return (rv);
}
@@ -795,7 +795,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
- if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
return (rv);
}
diff --git a/src/core/transport.c b/src/core/transport.c
index de94596c..ce61f722 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -24,6 +24,7 @@ static nni_tran *transports[] = {
NULL
};
+
nni_tran *
nni_tran_find(const char *addr)
{
diff --git a/src/core/transport.h b/src/core/transport.h
index dd4da299..8098e5c2 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -40,7 +40,7 @@ struct nni_tran {
struct nni_tran_ep {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_init)(void **, const char *, nni_sock *);
+ int (*ep_init)(void **, const char *, nni_sock *, int);
// ep_fini frees the resources associated with the endpoint.
// The endpoint will already have been closed.
diff --git a/src/nng.h b/src/nng.h
index 208df8e9..e9c5e32e 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -42,13 +42,15 @@ extern "C" {
// Types common to nng.
typedef uint32_t nng_socket;
-typedef uint32_t nng_endpoint;
+typedef uint32_t nng_dialer;
+typedef uint32_t nng_listener;
typedef uint32_t nng_pipe;
typedef struct nng_msg nng_msg;
typedef struct nng_event nng_event;
typedef struct nng_notify nng_notify;
typedef struct nng_snapshot nng_snapshot;
typedef struct nng_stat nng_stat;
+typedef uint32_t nng_endpoint; // XXX: REMOVE ME.
// nng_open simply creates a socket of the given class. It returns an
// error code on failure, or zero on success. The socket starts in cooked
@@ -127,8 +129,13 @@ NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *);
#define NNG_EV_ERROR NNG_EV_BIT(2)
#define NNG_EV_PIPE_ADD NNG_EV_BIT(3)
#define NNG_EV_PIPE_REM NNG_EV_BIT(4)
-#define NNG_EV_ENDPT_ADD NNG_EV_BIT(5)
-#define NNG_EV_ENDPT_REM NNG_EV_BIT(6)
+#define NNG_EV_DIALER_ADD NNG_EV_BIT(5)
+#define NNG_EV_DIALER_REM NNG_EV_BIT(6)
+#define NNG_EV_LISTENER_ADD NNG_EV_BIT(7)
+#define NNG_EV_LISTENER_REM NNG_EV_BIT(8)
+// XXX: Remove these.
+#define NNG_EV_ENDPT_ADD NNG_EV_DIALER_ADD
+#define NNG_EV_ENDPT_REM NNG_EV_DIALER_REM
// The following functions return more detailed information about the event.
// Some of the values will not make sense for some event types, in which case
@@ -145,7 +152,7 @@ NNG_DECL const char *nng_event_reason(nng_event *);
// endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to
// indicate that a failure setting the socket up should return an error
// back to the caller immediately.
-NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int);
+NNG_DECL int nng_listen(nng_socket, const char *, nng_listener *, int);
// nng_dial creates a dialing endpoint, with no special options, and
// starts it dialing. Dialers have at most one active connection at a time
@@ -155,24 +162,28 @@ NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int);
// dial will be made synchronously, and a failure condition returned back
// to the caller. (If the connection is dropped, it will still be
// reconnected in the background -- only the initial connect is synchronous.)
-NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int);
+NNG_DECL int nng_dial(nng_socket, const char *, nng_dialer *, int);
-// nng_endpoint_create creates an endpoint on the socket, but does not
-// start it either dialing or listening.
-NNG_DECL int nng_endpoint_create(nng_socket, const char *, nng_endpoint *);
+// nng_dialer_create creates a new dialer, that is not yet started.
+NNG_DECL int nng_dialer_create(nng_socket, const char *, nng_dialer *);
-// nng_endpoint_dial starts the endpoint dialing. This is only possible if
-// the endpoint is not already dialing or listening.
-NNG_DECL int nng_endpoint_dial(nng_endpoint, int);
+// nng_listener_create creates a new listener, that is not yet started.
+NNG_DECL int nng_listener_create(nng_socket, const char *, nng_listener *);
-// nng_endpoint_listen starts the endpoint listening. This is only possible if
-// the endpoint is not already dialing or listening.
-NNG_DECL int nng_endpoint_listen(nng_endpoint, int);
+// nng_dialer_start starts the endpoint dialing. This is only possible if
+// the dialer is not already dialing.
+NNG_DECL int nng_dialer_start(nng_dialer, int);
+
+// nng_listener_start starts the endpoint listening. This is only possible if
+// the listener is not already listening.
+NNG_DECL int nng_listener_start(nng_listener, int);
// nng_endpoint_close closes the endpoint, shutting down all underlying
// connections and releasing all associated resources. It is an error to
// refer to the endpoint after this is called.
NNG_DECL int nng_endpoint_close(nng_endpoint);
+NNG_DECL int nng_dialer_close(nng_dialer);
+NNG_DECL int nng_listener_close(nng_listener);
// nng_endpoint_setopt sets an option for a specific endpoint. Note
// endpoint options may not be altered on a running endpoint.
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 3d703e97..f64e25e9 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -8,11 +8,10 @@
//
#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-#include "platform/posix/posix_socket.h"
#ifdef PLATFORM_POSIX_EPDESC
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
#include <errno.h>
#include <stdlib.h>
@@ -180,6 +179,30 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
static void
+nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
+{
+ nni_aio *aio;
+ int rv = 1;
+ socklen_t sz = sizeof (rv);
+
+ if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == 0) {
+ return;
+ }
+ rv = nni_plat_errno(rv);
+
+ while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ }
+ while ((aio = nni_list_first(&ed->connectq)) != NULL) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ }
+}
+
+
+static void
nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio *aio;
@@ -191,6 +214,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
(void) unlink(sun->sun_path);
}
+ (void) close(ed->fd);
+ ed->fd = -1;
}
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
@@ -214,7 +239,10 @@ nni_posix_epdesc_cb(void *arg)
if (ed->node.revents & POLLOUT) {
nni_posix_epdesc_doconnect(ed);
}
- if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) {
+ if (ed->node.revents & (POLLERR|POLLHUP)) {
+ nni_posix_epdesc_doerror(ed);
+ }
+ if (ed->node.revents & POLLNVAL) {
nni_posix_epdesc_doclose(ed);
}
ed->node.revents = 0;
@@ -241,6 +269,58 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed)
}
+static int
+nni_posix_epdesc_parseaddr(char *pair, char **hostp, uint16_t *portp)
+{
+ char *host, *port, *end;
+ char c;
+ int val;
+
+ if (pair[0] == '[') {
+ host = pair+1;
+ // IP address enclosed ... for IPv6 usually.
+ if ((end = strchr(host, ']')) == NULL) {
+ return (NNG_EADDRINVAL);
+ }
+ *end = '\0';
+ port = end + 1;
+ if (*port == ':') {
+ port++;
+ } else if (port != '\0') {
+ return (NNG_EADDRINVAL);
+ }
+ } else {
+ host = pair;
+ port = strchr(host, ':');
+ if (port != NULL) {
+ *port = '\0';
+ port++;
+ }
+ }
+ val = 0;
+ while ((c = *port) != '\0') {
+ val *= 10;
+ if ((c >= '0') && (c <= '9')) {
+ val += (c - '0');
+ } else {
+ return (NNG_EADDRINVAL);
+ }
+ if (val > 65535) {
+ return (NNG_EADDRINVAL);
+ }
+ port++;
+ }
+ if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
+ *hostp = NULL;
+ } else {
+ *hostp = host;
+ }
+ // Stash the port in big endian (network) byte order.
+ NNI_PUT16((uint8_t *) portp, val);
+ return (0);
+}
+
+
int
nni_posix_epdesc_listen(nni_posix_epdesc *ed)
{
@@ -250,6 +330,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
int fd;
nni_mtx_lock(&ed->mtx);
+
ss = &ed->locaddr;
len = ed->loclen;
@@ -274,6 +355,8 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
return (rv);
}
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
ed->fd = fd;
ed->node.fd = fd;
nni_mtx_unlock(&ed->mtx);
@@ -349,7 +432,10 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
}
}
+ (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK);
+
rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
@@ -460,7 +546,9 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- // XXX: MORE WORK HERE.
+ if (ed->fd >= 0) {
+ (void) close(ed->fd);
+ }
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index 5bd42190..706ef15f 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -11,7 +11,6 @@
#ifdef PLATFORM_POSIX_IPC
#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_socket.h"
#include <errno.h>
#include <stdlib.h>
@@ -41,37 +40,40 @@
// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
// We alias nni_posix_epdesc to nni_plat_ipc_ep.
-static int
-nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path)
-{
- size_t len;
-
- memset(sun, 0, sizeof (*sun));
-
- // TODO: abstract sockets, including autobind sockets.
- len = strlen(path);
- if ((len >= sizeof (sun->sun_path)) || (len < 1)) {
- return (NNG_EADDRINVAL);
- }
- (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path);
- sun->sun_family = AF_UNIX;
- return (0);
-}
-
-
int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url)
+nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode)
{
nni_posix_epdesc *ed;
int rv;
+ struct sockaddr_un sun;
+ const char *path;
if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
return (NNG_EADDRINVAL);
}
- url += strlen("ipc://"); // skip the prefix.
+ path = url + strlen("ipc://"); // skip the prefix.
+
+ // prepare the sockaddr_un
+ sun.sun_family = AF_UNIX;
+ if (strlen(url) >= sizeof (sun.sun_path)) {
+ return (NNG_EADDRINVAL);
+ }
+ snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path);
+
if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) {
return (rv);
}
+ switch (mode) {
+ case NNI_EP_MODE_DIAL:
+ nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun));
+ break;
+ case NNI_EP_MODE_LISTEN:
+ nni_posix_epdesc_set_local(ed, &sun, sizeof (sun));
+ break;
+ default:
+ nni_posix_epdesc_fini(ed);
+ return (NNG_EINVAL);
+ }
*epp = (void *) ed;
return (0);
@@ -98,10 +100,14 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
// will just try to cleanup the old socket. Note that this is not
// perfect in all scenarios, so use this with caution.
static int
-nni_plat_ipc_remove_stale(struct sockaddr_un *sun)
+nni_plat_ipc_remove_stale(const char *path)
{
int fd;
int rv;
+ struct sockaddr_un sun;
+
+ sun.sun_family = AF_UNIX;
+ snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path);
if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
@@ -113,9 +119,9 @@ nni_plat_ipc_remove_stale(struct sockaddr_un *sun)
// then the cleanup will fail. As this is supposed to be an
// exceptional case, don't worry.
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) sun, sizeof (*sun)) < 0) {
+ if (connect(fd, (void *) &sun, sizeof (sun)) < 0) {
if (errno == ECONNREFUSED) {
- (void) unlink(sun->sun_path);
+ (void) unlink(path);
}
}
(void) close(fd);
@@ -132,17 +138,11 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
int rv;
path = nni_posix_epdesc_url(ed);
+ path += strlen("ipc://");
- if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
- return (rv);
- }
-
- if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) {
+ if ((rv = nni_plat_ipc_remove_stale(path)) != 0) {
return (rv);
}
-
- nni_posix_epdesc_set_local(ed, &sun, sizeof (sun));
-
return (nni_posix_epdesc_listen(ed));
}
@@ -150,21 +150,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- const char *path;
- nni_posix_epdesc *ed = (void *) ep;
- struct sockaddr_un sun;
- int rv;
-
- path = nni_posix_epdesc_url(ed);
-
- if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
- nni_aio_finish(aio, rv, 0);
- return;
- }
-
- nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun));
-
- nni_posix_epdesc_connect(ed, aio);
+ nni_posix_epdesc_connect((void *) ep, aio);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 6d0d7c08..26ec3632 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -11,9 +11,9 @@
#ifdef PLATFORM_POSIX_NET
#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_socket.h"
#include <errno.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
@@ -24,101 +24,181 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
-#include <netdb.h>
-// We alias nni_plat_tcpsock to an nni_posix_sock.
+static int
+nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
+{
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+
+ switch (sa->s_un.s_family) {
+ case NNG_AF_INET:
+ sin = (void *) ss;
+ memset(sin, 0, sizeof (*sin));
+ sin->sin_family = PF_INET;
+ sin->sin_port = sa->s_un.s_in.sa_port;
+ sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
+ return (sizeof (*sin));
+
+
+ case NNG_AF_INET6:
+ sin6 = (void *) ss;
+ memset(sin6, 0, sizeof (*sin6));
+#ifdef SIN6_LEN
+ sin6->sin6_len = sizeof (*sin6);
+#endif
+ sin6->sin6_family = PF_INET6;
+ sin6->sin6_port = sa->s_un.s_in6.sa_port;
+ memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
+ return (sizeof (*sin6));
+ }
+ return (-1);
+}
+
+
+extern int nni_tcp_parse_url(char *, char **, char **, char **, char **);
int
-nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
+nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode)
{
- struct addrinfo hint;
- struct addrinfo *ai;
-
- memset(&hint, 0, sizeof (hint));
- hint.ai_flags = AI_PASSIVE | AI_ADDRCONFIG | AI_NUMERICSERV;
- hint.ai_family = PF_UNSPEC;
- hint.ai_socktype = SOCK_STREAM;
- hint.ai_protocol = IPPROTO_TCP;
- if (flags & NNI_FLAG_IPV4ONLY) {
- hint.ai_family = PF_INET;
+ nni_posix_epdesc *ed;
+ char buf[NNG_MAXADDRLEN];
+ int rv;
+ char *lhost, *rhost;
+ char *lserv, *rserv;
+ char *sep;
+ struct sockaddr_storage ss;
+ int len;
+ int passive;
+ nni_aio aio;
+
+ if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) {
+ return (rv);
}
- if (getaddrinfo(host, "1", &hint, &ai) != 0) {
- return (NNG_EADDRINVAL);
+ // Make a local copy.
+ snprintf(buf, sizeof (buf), "%s", url);
+ nni_aio_init(&aio, NULL, NULL);
+
+ if (mode == NNI_EP_MODE_DIAL) {
+ rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv);
+ if (rv != 0) {
+ goto done;
+ }
+
+ // We have to have a remote destination!
+ if ((rhost == NULL) || (rserv == NULL)) {
+ rv = NNG_EADDRINVAL;
+ goto done;
+ }
+ } else {
+ rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv);
+ if (rv != 0) {
+ goto done;
+ }
+ if ((rhost != NULL) || (rserv != NULL)) {
+ // remotes are nonsensical here.
+ rv = NNG_EADDRINVAL;
+ goto done;
+ }
+ if (lserv == NULL) {
+ // missing port to listen on!
+ rv = NNG_EADDRINVAL;
+ goto done;
+ }
}
- if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) {
- freeaddrinfo(ai);
- return (NNG_EADDRINVAL);
+ if ((rserv != NULL) || (rhost != NULL)) {
+ nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio);
+ nni_aio_wait(&aio);
+ if ((rv = nni_aio_result(&aio)) != 0) {
+ goto done;
+ }
+ len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]);
+ nni_posix_epdesc_set_remote(ed, &ss, len);
}
- freeaddrinfo(ai);
+
+ if ((lserv != NULL) || (lhost != NULL)) {
+ nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio);
+ nni_aio_wait(&aio);
+ if ((rv = nni_aio_result(&aio)) != 0) {
+ goto done;
+ }
+ len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]);
+ nni_posix_epdesc_set_local(ed, &ss, len);
+ }
+ *epp = (void *) ed;
return (0);
+
+done:
+ if (rv != 0) {
+ nni_posix_epdesc_fini(ed);
+ }
+ nni_aio_fini(&aio);
+ return (rv);
}
void
-nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
+nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep)
{
- nni_posix_sock_aio_send((void *) s, aio);
+ nni_posix_epdesc_fini((void *) ep);
}
void
-nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
+nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep)
{
- nni_posix_sock_aio_recv((void *) s, aio);
+ nni_posix_epdesc_close((void *) ep);
}
int
-nni_plat_tcp_init(nni_plat_tcpsock **sp)
+nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep)
{
- nni_posix_sock *s;
- int rv;
+ return (nni_posix_epdesc_listen((void *) ep));
+}
- if ((rv = nni_posix_sock_init(&s)) == 0) {
- *sp = (void *) s;
- }
- return (rv);
+
+void
+nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio)
+{
+ return (nni_posix_epdesc_connect((void *) ep, aio));
}
void
-nni_plat_tcp_fini(nni_plat_tcpsock *s)
+nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio)
{
- nni_posix_sock_fini((void *) s);
+ return (nni_posix_epdesc_accept((void *) ep, aio));
}
void
-nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
+nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p)
{
- nni_posix_sock_shutdown((void *) s);
+ nni_posix_pipedesc_fini((void *) p);
}
-int
-nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
+void
+nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p)
{
- return (nni_posix_sock_listen((void *) s, addr));
+ nni_posix_pipedesc_close((void *) p);
}
-// nni_plat_tcp_connect establishes an outbound connection. It the
-// bind address is not null, then it will attempt to bind to the local
-// address specified first.
-int
-nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
- const nni_sockaddr *bindaddr)
+void
+nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio)
{
- return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr));
+ nni_posix_pipedesc_send((void *) p, aio);
}
-int
-nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
+void
+nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio)
{
- return (nni_posix_sock_accept_sync((void *) s, (void *) server));
+ nni_posix_pipedesc_recv((void *) p, aio);
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index c3e29c33..c509643e 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -8,10 +8,10 @@
//
#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
#ifdef PLATFORM_POSIX_PIPEDESC
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
#include <errno.h>
#include <stdlib.h>
@@ -183,6 +183,8 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
if (pd->fd != -1) {
// Let any peer know we are closing.
(void) shutdown(pd->fd, SHUT_RDWR);
+ close(pd->fd);
+ pd->fd = -1;
}
while ((aio = nni_list_first(&pd->readq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c
deleted file mode 100644
index 9e054c42..00000000
--- a/src/platform/posix/posix_socket.c
+++ /dev/null
@@ -1,422 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef PLATFORM_POSIX_SOCKET
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_socket.h"
-
-#include <errno.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <arpa/inet.h>
-#include <sys/un.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <netdb.h>
-
-// Solaris/SunOS systems define this, which collides with our symbol
-// names. Just undefine it now.
-#ifdef sun
-#undef sun
-#endif
-
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_posix_sock {
- int fd;
- char * unlink; // path to unlink at unbind
- nni_posix_pipedesc * pd;
- int tcpnodelay;
-};
-
-int
-nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
-{
- struct sockaddr_in *sin;
- struct sockaddr_un *sun;
-
-#ifdef PF_INET6
- struct sockaddr_in6 *sin6;
-#endif
-
- switch (sa->s_un.s_family) {
- case NNG_AF_INET:
- sin = (void *) ss;
- memset(sin, 0, sizeof (*sin));
- sin->sin_family = PF_INET;
- sin->sin_port = sa->s_un.s_in.sa_port;
- sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
- return (sizeof (*sin));
-
-#ifdef PF_INET6
- // Not every platform can do IPv6. Amazingly.
- case NNG_AF_INET6:
- sin6 = (void *) ss;
- memset(sin6, 0, sizeof (*sin6));
-#ifdef SIN6_LEN
- sin6->sin6_len = sizeof (*sin6);
-#endif
- sin6->sin6_family = PF_INET6;
- sin6->sin6_port = sa->s_un.s_in6.sa_port;
- memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
- return (sizeof (*sin6));
-
-#endif // PF_INET6
-
- case NNG_AF_IPC:
- sun = (void *) ss;
- memset(sun, 0, sizeof (*sun));
- // NB: This logic does not support abstract sockets, which
- // have their first byte NULL, and rely on length instead.
- // Probably for dealing with abstract sockets we will just
- // handle @ specially in the future.
- if (strlen(sa->s_un.s_path.sa_path) >=
- sizeof (sun->sun_path)) {
- return (-1); // caller converts to NNG_EADDRINVAL
- }
-
- sun->sun_family = PF_UNIX;
- (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s",
- sa->s_un.s_path.sa_path);
- return (sizeof (*sun));
- }
- return (-1);
-}
-
-
-int
-nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss)
-{
- const struct sockaddr_in *sin;
- const struct sockaddr_un *sun;
-
-#ifdef PF_INET6
- const struct sockaddr_in6 *sin6;
-#endif
-
- memset(sa, 0, sizeof (*sa));
- switch (ss->sa_family) {
- case PF_INET:
- sin = (const void *) ss;
- sa->s_un.s_in.sa_family = NNG_AF_INET;
- sa->s_un.s_in.sa_port = sin->sin_port;
- sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr;
- return (0);
-
-#ifdef PF_INET6
- case PF_INET6:
- sin6 = (const void *) ss;
- sa->s_un.s_in6.sa_family = NNG_AF_INET6;
- sa->s_un.s_in6.sa_port = sin6->sin6_port;
- memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
- return (0);
-
-#endif // PF_INET6
-
- case PF_UNIX:
- // NB: This doesn't handle abstract sockets!
- sun = (const void *) ss;
- sa->s_un.s_path.sa_family = NNG_AF_IPC;
- snprintf(sa->s_un.s_path.sa_path,
- sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path);
- return (0);
- }
- return (-1);
-}
-
-
-void
-nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio)
-{
- nni_posix_pipedesc_send(s->pd, aio);
-}
-
-
-void
-nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio)
-{
- nni_posix_pipedesc_recv(s->pd, aio);
-}
-
-
-static void
-nni_posix_sock_setopts_fd(int fd, int tcpnodelay)
-{
- int one;
-
- // Try to ensure that both CLOEXEC is set, and that we don't
- // generate SIGPIPE. (Note that SIGPIPE suppression in this way
- // only works on BSD systems. Linux wants us to use sendmsg().)
- (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
-#if defined(F_SETNOSIGPIPE)
- (void) fcntl(fd, F_SETNOSIGPIPE, 1);
-#elif defined(SO_NOSIGPIPE)
- one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one));
-#endif
-
- // Also disable Nagle. We are careful to group data with writev,
- // and latency is king for most of our users. (Consider adding
- // a method to enable this later.)
-
- // It's unclear whether this is safe for UNIX domain sockets. It
- // *should* be.
- if (tcpnodelay) {
- one = 1;
- (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one,
- sizeof (one));
- }
-}
-
-
-int
-nni_posix_sock_init(nni_posix_sock **sp)
-{
- nni_posix_sock *s;
-
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
- s->fd = -1;
- *sp = s;
- return (0);
-}
-
-
-void
-nni_posix_sock_fini(nni_posix_sock *s)
-{
- if (s->fd != -1) {
- (void) close(s->fd);
- s->fd = -1;
- }
- if (s->pd != NULL) {
- nni_posix_pipedesc_fini(s->pd);
- }
- if (s->unlink != NULL) {
- (void) unlink(s->unlink);
- nni_free(s->unlink, strlen(s->unlink) + 1);
- }
- NNI_FREE_STRUCT(s);
-}
-
-
-void
-nni_posix_sock_shutdown(nni_posix_sock *s)
-{
- if (s->fd != -1) {
- (void) shutdown(s->fd, SHUT_RDWR);
- // This causes the equivalent of a close. Hopefully waking
- // up anything that didn't get the hint with the shutdown.
- // (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, s->fd);
- }
- if (s->pd != NULL) {
- nni_posix_pipedesc_close(s->pd);
- }
-}
-
-
-int
-nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr)
-{
- int len;
- struct sockaddr_storage ss;
- int rv;
- int fd;
-
- if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
- if ((saddr->s_un.s_family == NNG_AF_INET) ||
- (saddr->s_un.s_family == NNG_AF_INET6)) {
- s->tcpnodelay = 1;
- }
-
- nni_posix_sock_setopts_fd(fd, s->tcpnodelay);
-
- // UNIX DOMAIN SOCKETS -- these have names in the file namespace.
- // We are going to check to see if there was a name already there.
- // If there was, and nothing is listening (ECONNREFUSED), then we
- // will just try to cleanup the old socket. Note that this is not
- // perfect in all scenarios, so use this with caution.
- if ((saddr->s_un.s_family == NNG_AF_IPC) &&
- (saddr->s_un.s_path.sa_path[0] != 0)) {
- int chkfd;
- if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- (void) close(fd);
- return (nni_plat_errno(errno));
- }
-
- // Nonblocking; we don't want to wait for remote server.
- (void) fcntl(chkfd, F_SETFL, O_NONBLOCK);
- if (connect(chkfd, (struct sockaddr *) &ss, len) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(saddr->s_un.s_path.sa_path);
- }
- }
- (void) close(chkfd);
- }
-
- if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
-
- // For IPC, record the path so we unlink it later
- if ((saddr->s_un.s_family == NNG_AF_IPC) &&
- (saddr->s_un.s_path.sa_path[0] != 0)) {
- s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1);
- if (s->unlink == NULL) {
- (void) close(fd);
- (void) unlink(saddr->s_un.s_path.sa_path);
- return (NNG_ENOMEM);
- }
- strcpy(s->unlink, saddr->s_un.s_path.sa_path);
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
-
- s->fd = fd;
-
- return (0);
-}
-
-
-// These functions will need to be removed in the future. They are
-// transition functions for now.
-
-int
-nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server)
-{
- int fd;
- int rv;
-
- for (;;) {
-#ifdef NNG_USE_ACCEPT4
- fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC);
- if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- fd = accept(server->fd, NULL, NULL);
- }
-#else
- fd = accept(server->fd, NULL, NULL);
-#endif
-
- if (fd < 0) {
- return (nni_plat_errno(errno));
- } else {
- break;
- }
- }
-
- nni_posix_sock_setopts_fd(fd, s->tcpnodelay);
-
- if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) {
- close(fd);
- return (rv);
- }
- s->fd = fd;
- return (0);
-}
-
-
-int
-nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr,
- const nni_sockaddr *bindaddr)
-{
- int fd;
- int len;
- struct sockaddr_storage ss;
- struct sockaddr_storage bss;
- int rv;
-
- if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- if ((addr->s_un.s_family == NNG_AF_INET) ||
- (addr->s_un.s_family == NNG_AF_INET6)) {
- s->tcpnodelay = 1;
- }
-
- if (bindaddr != NULL) {
- if (bindaddr->s_un.s_family != addr->s_un.s_family) {
- return (NNG_EINVAL);
- }
- if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) {
- return (NNG_EADDRINVAL);
- }
- if (bind(fd, (struct sockaddr *) &bss, len) < 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
- }
-
- nni_posix_sock_setopts_fd(fd, s->tcpnodelay);
-
- if (connect(fd, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_errno(errno);
- // Unix domain sockets return ENOENT when nothing is there.
- // Massage this into ECONNREFUSED, to provide more consistent
- // behavior.
- if (rv == NNG_ENOENT) {
- rv = NNG_ECONNREFUSED;
- }
- (void) close(fd);
- return (rv);
- }
- if (s->pd != NULL) {
- // If we had a prior pipedesc hanging around, nuke it.
- nni_posix_pipedesc_fini(s->pd);
- s->pd = NULL;
- }
- if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) {
- (void) close(fd);
- return (rv);
- }
- s->fd = fd;
- return (0);
-}
-
-
-#else
-
-// Suppress empty symbols warnings in ranlib.
-int nni_posix_socket_not_used = 0;
-
-#endif // PLATFORM_POSIX_SOCKET
diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h
deleted file mode 100644
index f3fb169c..00000000
--- a/src/platform/posix/posix_socket.h
+++ /dev/null
@@ -1,45 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#ifndef PLATFORM_POSIX_SOCKET_H
-#define PLATFORM_POSIX_SOCKET_H
-
-// This file provides declarations for comment socket handling functions on
-// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all
-// work using mostly comment socket handling routines.
-
-#include "core/nng_impl.h"
-
-#include "platform/posix/posix_aio.h"
-
-#include <sys/types.h>
-#include <sys/socket.h>
-
-typedef struct nni_posix_sock nni_posix_sock;
-
-extern int nni_posix_to_sockaddr(struct sockaddr_storage *,
- const nni_sockaddr *);
-extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *);
-extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *);
-extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *);
-extern int nni_posix_sock_init(nni_posix_sock **);
-extern void nni_posix_sock_fini(nni_posix_sock *);
-extern void nni_posix_sock_shutdown(nni_posix_sock *);
-extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *);
-
-// These functions will need to be removed in the future. They are
-// transition functions for now.
-
-extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int);
-extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int);
-extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *);
-extern int nni_posix_sock_connect_sync(nni_posix_sock *,
- const nni_sockaddr *, const nni_sockaddr *);
-
-#endif // PLATFORM_POSIX_SOCKET_H
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 4137984f..a2f24f8a 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -297,7 +297,7 @@ nni_plat_init(int (*helper)(void))
// probably get by with even just 8k, but Linux usually wants 16k
// as a minimum. If this fails, its not fatal, just we won't be
// as scalable / thrifty with our use of VM.
- (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
if ((rv = nni_posix_pollq_sysinit()) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
@@ -316,7 +316,6 @@ nni_plat_init(int (*helper)(void))
pthread_condattr_destroy(&nni_cvattr);
pthread_attr_destroy(&nni_pthread_attr);
return (rv);
-
}
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 52ed582a..0489ea38 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -49,6 +49,7 @@ struct nni_inproc_ep {
char addr[NNG_MAXADDRLEN+1];
int mode;
int closed;
+ int started;
nni_list_node node;
uint16_t proto;
nni_cv cv;
@@ -217,7 +218,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
static int
-nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock)
+nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
{
nni_inproc_ep *ep;
int rv;
@@ -229,8 +230,9 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock)
return (NNG_ENOMEM);
}
- ep->mode = NNI_INPROC_EP_IDLE;
+ ep->mode = mode;
ep->closed = 0;
+ ep->started = 0;
ep->proto = nni_sock_proto(sock);
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -365,7 +367,6 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
pair->pipes[1]->peer = pair->pipes[0]->proto;
pair->pipes[0]->peer = pair->pipes[1]->proto;
pair->refcnt = 2;
- client->mode = NNI_INPROC_EP_IDLE; // XXX: ??
nni_inproc_conn_finish(caio, 0);
nni_inproc_conn_finish(saio, 0);
@@ -390,12 +391,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
nni_inproc_ep *server;
int rv;
- if (ep->mode != NNI_INPROC_EP_IDLE) {
+ if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish(aio, NNG_EINVAL, 0);
return;
}
+ if (ep->started) {
+ nni_aio_finish(aio, NNG_EBUSY, 0);
+ return;
+ }
if (nni_list_active(&ep->clients, ep)) {
- nni_aio_finish(aio, NNG_EINVAL, 0);
+ nni_aio_finish(aio, NNG_EBUSY, 0);
return;
}
@@ -421,7 +426,8 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
- if (server->mode != NNI_INPROC_EP_LISTEN) {
+ if ((server->mode != NNI_EP_MODE_LISTEN) ||
+ (server->started == 0)) {
continue;
}
if (strcmp(server->addr, ep->addr) == 0) {
@@ -434,7 +440,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
return;
}
- ep->mode = NNI_INPROC_EP_DIAL;
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -450,16 +455,21 @@ nni_inproc_ep_bind(void *arg)
nni_inproc_ep *srch;
nni_list *list = &nni_inproc.servers;
- if (ep->mode != NNI_INPROC_EP_IDLE) {
+ if (ep->mode != NNI_EP_MODE_LISTEN) {
return (NNG_EINVAL);
}
nni_mtx_lock(&nni_inproc.mx);
+ if (ep->started) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ return (NNG_EBUSY);
+ }
if (ep->closed) {
nni_mtx_unlock(&nni_inproc.mx);
return (NNG_ECLOSED);
}
NNI_LIST_FOREACH (list, srch) {
- if (srch->mode != NNI_INPROC_EP_LISTEN) {
+ if ((srch->mode != NNI_EP_MODE_LISTEN) ||
+ (!srch->started)) {
continue;
}
if (strcmp(srch->addr, ep->addr) == 0) {
@@ -467,7 +477,7 @@ nni_inproc_ep_bind(void *arg)
return (NNG_EADDRINUSE);
}
}
- ep->mode = NNI_INPROC_EP_LISTEN;
+ ep->started = 1;
nni_list_append(list, ep);
nni_mtx_unlock(&nni_inproc.mx);
return (0);
@@ -481,10 +491,9 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
nni_inproc_pipe *pipe;
int rv;
- if (ep->mode != NNI_INPROC_EP_LISTEN) {
+ if (ep->mode != NNI_EP_MODE_LISTEN) {
nni_aio_finish(aio, NNG_EINVAL, 0);
}
-
if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
nni_aio_finish(aio, rv, 0);
return;
@@ -494,13 +503,16 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
aio->a_pipe = pipe;
// We are already on the master list of servers, thanks to bind.
-
if (ep->closed) {
// This is the only possible error path from the
// time we acquired the lock.
nni_inproc_conn_finish(aio, NNG_ECLOSED);
return;
}
+ if (!ep->started) {
+ nni_inproc_conn_finish(aio, NNG_ESTATE);
+ return;
+ }
// Insert us into the pending server aios, and then run the
// accept list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index d9ddeb2d..fcfe44ea 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -493,7 +493,7 @@ nni_ipc_ep_fini(void *arg)
static int
-nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
+nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
{
nni_ipc_ep *ep;
int rv;
@@ -508,7 +508,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
}
if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
- ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) {
+ ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) {
nni_ipc_ep_fini(ep);
return (rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index a43763e2..237b0c07 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -22,7 +22,7 @@ typedef struct nni_tcp_ep nni_tcp_ep;
// nni_tcp_pipe is one end of a TCP connection.
struct nni_tcp_pipe {
const char * addr;
- nni_plat_tcpsock * tsp;
+ nni_plat_tcp_pipe * tpp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -46,17 +46,21 @@ struct nni_tcp_pipe {
struct nni_tcp_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_tcpsock * tsp;
+ nni_plat_tcp_ep * tep;
int closed;
uint16_t proto;
size_t rcvmax;
int ipv4only;
+ nni_aio aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
static void nni_tcp_pipe_send_cb(void *);
static void nni_tcp_pipe_recv_cb(void *);
static void nni_tcp_pipe_nego_cb(void *);
+static void nni_tcp_ep_cb(void *arg);
static int
nni_tcp_tran_init(void)
@@ -76,7 +80,7 @@ nni_tcp_pipe_close(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_plat_tcp_shutdown(pipe->tsp);
+ nni_plat_tcp_pipe_close(pipe->tpp);
}
@@ -88,8 +92,8 @@ nni_tcp_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
- if (pipe->tsp != NULL) {
- nni_plat_tcp_fini(pipe->tsp);
+ if (pipe->tpp != NULL) {
+ nni_plat_tcp_pipe_fini(pipe->tpp);
}
if (pipe->rxmsg) {
nni_msg_free(pipe->rxmsg);
@@ -100,7 +104,7 @@ nni_tcp_pipe_fini(void *arg)
static int
-nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep)
+nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *pipe;
int rv;
@@ -111,9 +115,6 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep)
if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
goto fail;
}
- if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
if (rv != 0) {
goto fail;
@@ -128,6 +129,9 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep)
}
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+ pipe->tpp = tpp;
+ pipe->addr = ep->addr;
+
*pipep = pipe;
return (0);
@@ -175,7 +179,7 @@ nni_tcp_pipe_nego_cb(void *arg)
aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead];
// send it down...
- nni_plat_tcp_aio_send(pipe->tsp, aio);
+ nni_plat_tcp_pipe_send(pipe->tpp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -183,7 +187,7 @@ nni_tcp_pipe_nego_cb(void *arg)
aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead];
- nni_plat_tcp_aio_recv(pipe->tsp, aio);
+ nni_plat_tcp_pipe_recv(pipe->tpp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -296,7 +300,7 @@ nni_tcp_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -353,7 +357,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio);
+ nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -392,7 +396,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
pipe->rxaio.a_niov = 1;
- nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -431,59 +435,10 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
static int
-nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock)
-{
- nni_tcp_ep *ep;
- int rv;
-
- if (strlen(url) > NNG_MAXADDRLEN-1) {
- return (NNG_EADDRINVAL);
- }
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ep->closed = 0;
- ep->proto = nni_sock_proto(sock);
- ep->ipv4only = 0; // XXX: FIXME
- ep->rcvmax = nni_sock_rcvmaxsz(sock);
-
- if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
-
- (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
-
- *epp = ep;
- return (0);
-}
-
-
-static void
-nni_tcp_ep_fini(void *arg)
+nni_tcp_parse_pair(char *pair, char **hostp, char **servp)
{
- nni_tcp_ep *ep = arg;
-
- nni_plat_tcp_fini(ep->tsp);
- NNI_FREE_STRUCT(ep);
-}
-
-
-static void
-nni_tcp_ep_close(void *arg)
-{
- nni_tcp_ep *ep = arg;
-
- nni_plat_tcp_shutdown(ep->tsp);
-}
-
-
-static int
-nni_parseaddr(char *pair, char **hostp, uint16_t *portp)
-{
- char *host, *port, *end;
+ char *host, *serv, *end;
char c;
- int val;
if (pair[0] == '[') {
host = pair+1;
@@ -492,40 +447,73 @@ nni_parseaddr(char *pair, char **hostp, uint16_t *portp)
return (NNG_EADDRINVAL);
}
*end = '\0';
- port = end + 1;
- if (*port == ':') {
- port++;
- } else if (port != '\0') {
+ serv = end + 1;
+ if (*serv == ':') {
+ serv++;
+ } else if (serv != '\0') {
return (NNG_EADDRINVAL);
}
} else {
host = pair;
- port = strchr(host, ':');
- if (port != NULL) {
- *port = '\0';
- port++;
+ serv = strchr(host, ':');
+ if (serv != NULL) {
+ *serv = '\0';
+ serv++;
}
}
- val = 0;
- while ((c = *port) != '\0') {
- val *= 10;
- if ((c >= '0') && (c <= '9')) {
- val += (c - '0');
+ if (hostp != NULL) {
+ if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
+ *hostp = NULL;
} else {
- return (NNG_EADDRINVAL);
+ *hostp = host;
}
- if (val > 65535) {
- return (NNG_EADDRINVAL);
+ }
+ if (servp != NULL) {
+ if (strlen(serv) == 0) {
+ *servp = NULL;
+ } else {
+ *servp = serv;
}
- port++;
}
- if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
- *hostp = NULL;
+ // Stash the port in big endian (network) byte order.
+ return (0);
+}
+
+
+// Note that the url *must* be in a modifiable buffer.
+int
+nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2,
+ char **serv2)
+{
+ char *h1;
+ int rv;
+
+ if (strncmp(url, "tcp://", strlen("tcp://")) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+ url += strlen("tcp://");
+ if ((h1 = strchr(url, ';')) != 0) {
+ // For these we want the second part first, because
+ // the "primary" address is the remote address, and the
+ // "secondary" is the local (bind) address. This is only
+ // used for dial side.
+ *h1 = '\0';
+ h1++;
+ if (((rv = nni_tcp_parse_pair(h1, host1, serv1)) != 0) ||
+ ((rv = nni_tcp_parse_pair(url, host2, serv2)) != 0)) {
+ return (rv);
+ }
} else {
- *hostp = host;
+ if (host2 != NULL) {
+ *host2 = NULL;
+ }
+ if (serv2 != NULL) {
+ *serv2 = NULL;
+ }
+ if ((rv = nni_tcp_parse_pair(url, host1, serv1)) != 0) {
+ return (rv);
+ }
}
- // Stash the port in big endian (network) byte order.
- NNI_PUT16((uint8_t *) portp, val);
return (0);
}
@@ -556,11 +544,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio);
+ nni_plat_tcp_pipe_send(pipe->tpp, &pipe->negaio);
nni_mtx_unlock(&pipe->mtx);
}
+#if 0
static int
nni_tcp_ep_connect_sync(void *arg, void **pipep)
{
@@ -659,23 +648,162 @@ nni_tcp_ep_bind(void *arg)
}
+#endif
+
+static void
+nni_tcp_ep_fini(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+
+ if (ep->tep != NULL) {
+ nni_plat_tcp_ep_fini(ep->tep);
+ }
+ nni_aio_fini(&ep->aio);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+
static int
-nni_tcp_ep_accept_sync(void *arg, void **pipep)
+nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+ nni_tcp_ep *ep;
+ int rv;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
+ ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) ||
+ ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) {
+ nni_tcp_ep_fini(ep);
+ return (rv);
+ }
+ ep->closed = 0;
+ ep->proto = nni_sock_proto(sock);
+ ep->rcvmax = nni_sock_rcvmaxsz(sock);
+ (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
+
+ *epp = ep;
+ return (0);
+}
+
+
+static void
+nni_tcp_ep_close(void *arg)
{
nni_tcp_ep *ep = arg;
+
+ nni_plat_tcp_ep_close(ep->tep);
+}
+
+
+static int
+nni_tcp_ep_bind(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+
+ return (nni_plat_tcp_ep_listen(ep->tep));
+}
+
+
+static void
+nni_tcp_ep_finish(nni_tcp_ep *ep)
+{
+ nni_aio *aio = ep->user_aio;
nni_tcp_pipe *pipe;
int rv;
- if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) {
- return (rv);
+ if ((aio = ep->user_aio) == NULL) {
+ return;
+ }
+ ep->user_aio = NULL;
+ if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ goto done;
}
+ NNI_ASSERT(ep->aio.a_pipe != NULL);
- if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
- nni_tcp_pipe_fini(pipe);
- return (rv);
+ // Attempt to allocate the parent pipe. If this fails we'll
+ // drop the connection (ENOMEM probably).
+ if ((rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) {
+ nni_plat_tcp_pipe_fini(ep->aio.a_pipe);
+ goto done;
}
- *pipep = pipe;
- return (0);
+
+ aio->a_pipe = pipe;
+
+done:
+ ep->aio.a_pipe = NULL;
+ nni_aio_finish(aio, rv, 0);
+}
+
+
+static void
+nni_tcp_ep_cb(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ nni_tcp_ep_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_tcp_cancel_ep(nni_aio *aio)
+{
+ nni_tcp_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio == aio) {
+ ep->user_aio = NULL;
+ }
+ nni_aio_stop(&ep->aio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_tcp_ep_accept(void *arg, nni_aio *aio)
+{
+ nni_tcp_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
+
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_plat_tcp_ep_accept(ep->tep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_tcp_ep_connect(void *arg, nni_aio *aio)
+{
+ nni_tcp_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
+
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_plat_tcp_ep_connect(ep->tep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
@@ -690,14 +818,14 @@ static nni_tran_pipe nni_tcp_pipe_ops = {
};
static nni_tran_ep nni_tcp_ep_ops = {
- .ep_init = nni_tcp_ep_init,
- .ep_fini = nni_tcp_ep_fini,
- .ep_connect_sync = nni_tcp_ep_connect_sync,
- .ep_bind = nni_tcp_ep_bind,
- .ep_accept_sync = nni_tcp_ep_accept_sync,
- .ep_close = nni_tcp_ep_close,
- .ep_setopt = NULL,
- .ep_getopt = NULL,
+ .ep_init = nni_tcp_ep_init,
+ .ep_fini = nni_tcp_ep_fini,
+ .ep_connect = nni_tcp_ep_connect,
+ .ep_bind = nni_tcp_ep_bind,
+ .ep_accept = nni_tcp_ep_accept,
+ .ep_close = nni_tcp_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
};
// This is the TCP transport linkage, and should be the only global