aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-07 00:08:24 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-07 00:08:24 -0700
commit3730260da3744b549aaa1fe13946a674f924f63c (patch)
tree902866876ee71246a299370cbe8f6580d758525c /src/core
parent3b19940dfcd5d3585b1fb1dcf7915a748ae67289 (diff)
downloadnng-3730260da3744b549aaa1fe13946a674f924f63c.tar.gz
nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.bz2
nng-3730260da3744b549aaa1fe13946a674f924f63c.zip
TCP asynchronous working now.
It turns out that I had to fix a number of subtle asynchronous handling bugs, but now TCP is fully asynchronous. We need to change the high-level dial and listen interfaces to be async as well. Some of the transport APIs have changed here, and I've elected to change what we expose to consumers as endpoints into seperate dialers and listeners. Under the hood they are the same, but it turns out that its helpful to know the intended use of the endpoint at initialization time. Scalability still occasionally hangs on Linux. Investigation pending.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c26
-rw-r--r--src/core/endpt.h4
-rw-r--r--src/core/platform.h89
-rw-r--r--src/core/socket.c4
-rw-r--r--src/core/transport.c1
-rw-r--r--src/core/transport.h2
6 files changed, 67 insertions, 59 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 20874c90..e5a9a5bb 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -142,7 +142,7 @@ nni_ep_dtor(void *ptr)
int
-nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
+nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
{
nni_tran *tran;
nni_ep *ep;
@@ -162,6 +162,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
}
ep->ep_sock = sock;
ep->ep_tran = tran;
+ ep->ep_mode = mode;
// Could safely use strcpy here, but this avoids discussion.
(void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr);
@@ -172,7 +173,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
ep->ep_ops = *tran->tran_ep;
- if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) {
+ if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) {
nni_objhash_unref(nni_eps, id);
return (rv);
}
@@ -391,7 +392,11 @@ nni_ep_dial(nni_ep *ep, int flags)
int rv = 0;
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ if (ep->ep_mode != NNI_EP_MODE_DIAL) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (NNG_ENOTSUP);
+ }
+ if (ep->ep_started) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
@@ -404,14 +409,14 @@ nni_ep_dial(nni_ep *ep, int flags)
nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
- ep->ep_mode = NNI_EP_MODE_DIAL;
+ ep->ep_started = 1;
if (flags & NNG_FLAG_SYNCH) {
nni_mtx_unlock(&ep->ep_mtx);
rv = nni_ep_connect_sync(ep);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
- ep->ep_mode = NNI_EP_MODE_IDLE;
+ ep->ep_started = 0;
return (rv);
}
nni_mtx_lock(&ep->ep_mtx);
@@ -561,11 +566,14 @@ nni_ep_listen(nni_ep *ep, int flags)
int rv = 0;
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (NNG_ENOTSUP);
+ }
+ if (ep->ep_started) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
-
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
@@ -576,14 +584,14 @@ nni_ep_listen(nni_ep *ep, int flags)
return (rv);
}
- ep->ep_mode = NNI_EP_MODE_LISTEN;
+ ep->ep_started = 1;
if (flags & NNG_FLAG_SYNCH) {
nni_mtx_unlock(&ep->ep_mtx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
- ep->ep_mode = NNI_EP_MODE_IDLE;
+ ep->ep_started = 0;
return (rv);
}
nni_mtx_lock(&ep->ep_mtx);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 3d353cdf..becaa3f4 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -28,6 +28,7 @@ struct nni_ep {
char ep_addr[NNG_MAXADDRLEN];
nni_thr ep_thr;
int ep_mode;
+ int ep_started;
int ep_closed; // full shutdown
int ep_bound; // true if we bound locally
nni_mtx ep_mtx;
@@ -36,7 +37,6 @@ struct nni_ep {
nni_list ep_pipes;
};
-#define NNI_EP_MODE_IDLE 0
#define NNI_EP_MODE_DIAL 1
#define NNI_EP_MODE_LISTEN 2
@@ -46,7 +46,7 @@ extern int nni_ep_find(nni_ep **, uint32_t);
extern void nni_ep_hold(nni_ep *);
extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create(nni_ep **, nni_sock *, const char *);
+extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int);
extern void nni_ep_stop(nni_ep *);
extern void nni_ep_close(nni_ep *);
extern void nni_ep_remove(nni_ep *);
diff --git a/src/core/platform.h b/src/core/platform.h
index 0dcab40a..d5c6aa8a 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -200,47 +200,47 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// TCP Support.
//
-// nni_plat_tcp_init initializes the socket, for example it can
-// set underlying file descriptors to -1, etc.
-extern int nni_plat_tcp_init(nni_plat_tcpsock **);
-
-// nni_plat_tcp_fini just closes a TCP socket, and releases any related
-// resources.
-extern void nni_plat_tcp_fini(nni_plat_tcpsock *);
-
-// nni_plat_tcp_shutdown performs a shutdown of the socket. For
-// BSD sockets, this closes both sides of the TCP connection gracefully,
-// but the underlying file descriptor is left open. (This part is critical
-// to prevention of close() related races.)
-extern void nni_plat_tcp_shutdown(nni_plat_tcpsock *);
-
-// nni_plat_tcp_listen creates a TCP socket in listening mode, bound
-// to the specified address. Note that nni_plat_tcpsock should be defined
-// to whatever your platform uses. For most systems its just "int".
-extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *);
-
-// nni_plat_tcp_accept does the accept to accept an inbound connection.
-// The tcpsock used for the server will have been set up with the
-// nni_plat_tcp_listen.
-extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *);
-
-// nni_plat_tcp_connect is the client side. Two addresses are supplied,
-// as the client may specify a local address to which to bind. This
-// second address may be NULL to use ephemeral ports, which is the
-// usual default.
-extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
- const nni_sockaddr *);
-
-// nni_plat_tcp_aio_send sends the data to the remote side asynchronously.
-// The data to send is stored in the a_iov field of the aio, and the array
-// of iovs will never be larger than 4. The platform may modify the iovs,
-// or the iov list.
-extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
-
-// nni_plat_tcp_aio_recv recvs data into the buffers provided by the
-// iovs. The implementation does not return until the iovs are completely
-// full, or an error condition occurs.
-extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
+typedef struct nni_plat_tcp_ep nni_plat_tcp_ep;
+typedef struct nni_plat_tcp_pipe nni_plat_tcp_pipe;
+
+// nni_plat_tcp_ep_init creates a new endpoint associated with the url.
+extern int nni_plat_tcp_ep_init(nni_plat_tcp_ep **, const char *, int);
+
+// nni_plat_tcp_ep_fini closes the endpoint and releases resources.
+extern void nni_plat_tcp_ep_fini(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_ep_close closes the endpoint; this might not close the
+// actual underlying socket, but it should call shutdown on it.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_tcp_ep_close(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_listen creates an TCP socket in listening mode, bound
+// to the specified path.
+extern int nni_plat_tcp_ep_listen(nni_plat_tcp_ep *);
+
+// nni_plat_tcp_ep_accept starts an accept to receive an incoming connection.
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *, nni_aio *);
+
+// nni_plat_tcp_connect is the client side.
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *, nni_aio *);
+
+// nni_plat_tcp_pipe_fini closes the pipe, and releases all resources
+// associated with it.
+extern void nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *);
+
+// nni_plat_tcp_pipe_close closes the socket, or at least shuts it down.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *);
+
+// nni_plat_tcp_pipe_send sends data in the iov buffers to the peer.
+// The platform may modify the iovs.
+extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *);
+
+// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs.
+// The platform may modify the iovs.
+extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *);
// nni_plat_tcp_resolv resolves a TCP name asynchronously. The family
// should be one of NNG_AF_INET, NNG_AF_INET6, or NNG_AF_UNSPEC. The
@@ -258,8 +258,8 @@ extern void nni_plat_tcp_resolv(const char *, const char *, int, int,
typedef struct nni_plat_ipc_ep nni_plat_ipc_ep;
typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe;
-// nni_plat_ipc_ep_init creates a new endpoint associated with the path.
-extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *);
+// nni_plat_ipc_ep_init creates a new endpoint associated with the url.
+extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *, int);
// nni_plat_ipc_ep_fini closes the endpoint and releases resources.
extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
@@ -270,8 +270,7 @@ extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *);
// nni_plat_tcp_listen creates an IPC socket in listening mode, bound
-// to the specified path. Note that nni_plat_ipcsock should be defined
-// to whatever your platform uses. For most systems its just "int".
+// to the specified path.
extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *);
// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection.
diff --git a/src/core/socket.c b/src/core/socket.c
index d9d9ae3b..66f5c63b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -775,7 +775,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
- if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
return (rv);
}
@@ -795,7 +795,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
- if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
return (rv);
}
diff --git a/src/core/transport.c b/src/core/transport.c
index de94596c..ce61f722 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -24,6 +24,7 @@ static nni_tran *transports[] = {
NULL
};
+
nni_tran *
nni_tran_find(const char *addr)
{
diff --git a/src/core/transport.h b/src/core/transport.h
index dd4da299..8098e5c2 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -40,7 +40,7 @@ struct nni_tran {
struct nni_tran_ep {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_init)(void **, const char *, nni_sock *);
+ int (*ep_init)(void **, const char *, nni_sock *, int);
// ep_fini frees the resources associated with the endpoint.
// The endpoint will already have been closed.