aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
commit374f93a18edca2e0656c337a5b54927169ec31fa (patch)
treecbaef995db10cfafd795953be203de744dc688c9 /src/transport/tcp
parent6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff)
downloadnng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz
nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2
nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit. The POSIX send/recv logic still uses threads under the hood, but makes use of the AIO framework for send/recv. This is a key stepping stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/transport/tcp')
-rw-r--r--src/transport/tcp/tcp.c276
1 files changed, 223 insertions, 53 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index e198a927..f220e49e 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -22,21 +22,34 @@ typedef struct nni_tcp_ep nni_tcp_ep;
// nni_tcp_pipe is one end of a TCP connection.
struct nni_tcp_pipe {
const char * addr;
- nni_plat_tcpsock fd;
+ nni_plat_tcpsock * tsp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
+
+ nni_aio * user_txaio;
+ nni_aio * user_rxaio;
+
+ uint8_t txlen[sizeof (uint64_t)];
+ uint8_t rxlen[sizeof (uint64_t)];
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_msg * rxmsg;
};
struct nni_tcp_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_tcpsock fd;
+ nni_plat_tcpsock * tsp;
int closed;
uint16_t proto;
size_t rcvmax;
int ipv4only;
};
+
+static void nni_tcp_pipe_send_cb(void *);
+static void nni_tcp_pipe_recv_cb(void *);
+
static int
nni_tcp_tran_init(void)
{
@@ -55,23 +68,201 @@ nni_tcp_pipe_close(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_plat_tcp_shutdown(&pipe->fd);
+ nni_plat_tcp_shutdown(pipe->tsp);
+}
+
+
+static int
+nni_tcp_pipe_init(void **argp)
+{
+ nni_tcp_pipe *pipe;
+ int rv;
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
+ if (rv != 0) {
+ nni_plat_tcp_fini(pipe->tsp);
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe);
+ if (rv != 0) {
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_tcp_fini(pipe->tsp);
+ NNI_FREE_STRUCT(pipe);
+ }
+ *argp = pipe;
+ return (0);
}
static void
-nni_tcp_pipe_destroy(void *arg)
+nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_plat_tcp_fini(&pipe->fd);
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_tcp_fini(pipe->tsp);
NNI_FREE_STRUCT(pipe);
}
+static void
+nni_tcp_pipe_send_cb(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+ int rv;
+ nni_aio *aio;
+ size_t len;
+
+ if ((aio = pipe->user_txaio) == NULL) {
+ // This should never ever happen.
+ NNI_ASSERT(aio != NULL);
+ return;
+ }
+ pipe->user_txaio = NULL;
+
+ if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
+
+ nni_aio_finish(aio, 0, len);
+}
+
+
+static void
+nni_tcp_pipe_recv_cb(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_aio *aio;
+ int rv;
+
+ aio = pipe->user_rxaio;
+ if (aio == NULL) {
+ // This should never ever happen.
+ NNI_ASSERT(aio != NULL);
+ return;
+ }
+
+ if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ // Error on receive. This has to cause an error back
+ // to the user. Also, if we had allocated an rxmsg, lets
+ // toss it.
+ if (pipe->rxmsg != NULL) {
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ }
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ // If we don't have a message yet, we were reading the TCP message
+ // header, which is just the length. This tells us the size of the
+ // message to allocate and how much more to expect.
+ if (pipe->rxmsg == NULL) {
+ uint64_t len;
+ // We should have gotten a message header.
+ NNI_GET64(pipe->rxlen, len);
+
+ // Make sure the message payload is not too big. If it is
+ // the caller will shut down the pipe.
+ if (len > pipe->rcvmax) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ return;
+ }
+
+ if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ // Submit the rest of the data for a read -- we want to
+ // read the entire message now.
+ pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ pipe->rxaio.a_niov = 1;
+
+ rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ if (rv != 0) {
+ pipe->user_rxaio = NULL;
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ return;
+ }
+
+ // Otherwise we got a message read completely. Let the user know the
+ // good news.
+ pipe->user_rxaio = NULL;
+ aio->a_msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+}
+
+
+static int
+nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_msg *msg = aio->a_msg;
+ uint64_t len;
+
+ pipe->user_txaio = aio;
+
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+ NNI_PUT64(pipe->txlen, len);
+
+ pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
+ pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen);
+ pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
+ pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
+ pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
+ pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
+ pipe->txaio.a_niov = 3;
+
+ return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio));
+}
+
+
+static int
+nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ pipe->user_rxaio = aio;
+
+ NNI_ASSERT(pipe->rxmsg == NULL);
+
+ // Schedule a read of the TCP header.
+ pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen;
+ pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
+ pipe->rxaio.a_niov = 1;
+
+ return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio));
+}
+
+
static int
nni_tcp_pipe_send(void *arg, nni_msg *msg)
{
+#if 0
nni_tcp_pipe *pipe = arg;
uint64_t len;
uint8_t buf[sizeof (len)];
@@ -88,10 +279,13 @@ nni_tcp_pipe_send(void *arg, nni_msg *msg)
len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len;
NNI_PUT64(buf, len);
- if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) {
+ if ((rv = nni_plat_tcp_send(pipe->tsp, iov, 3)) == 0) {
nni_msg_free(msg);
}
return (rv);
+
+#endif
+ return (NNG_EINVAL);
}
@@ -107,7 +301,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
iov[0].iov_buf = buf;
iov[0].iov_len = sizeof (buf);
- if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) != 0) {
return (rv);
}
NNI_GET64(buf, len);
@@ -122,7 +316,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
iov[0].iov_len = nng_msg_len(msg);
iov[0].iov_buf = nng_msg_body(msg);
- if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) == 0) {
*msgp = msg;
} else {
nni_msg_free(msg);
@@ -181,7 +375,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock)
ep->ipv4only = 0; // XXX: FIXME
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) {
+ if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) {
NNI_FREE_STRUCT(ep);
return (rv);
}
@@ -198,7 +392,7 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_plat_tcp_fini(&ep->fd);
+ nni_plat_tcp_fini(ep->tsp);
NNI_FREE_STRUCT(ep);
}
@@ -208,7 +402,7 @@ nni_tcp_ep_close(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_plat_tcp_shutdown(&ep->fd);
+ nni_plat_tcp_shutdown(ep->tsp);
}
@@ -281,13 +475,13 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_send(pipe->tsp, &iov, 1)) != 0) {
return (rv);
}
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) {
return (rv);
}
@@ -303,10 +497,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
static int
-nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
+nni_tcp_ep_connect(void *arg, void *pipearg)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe;
+ nni_tcp_pipe *pipe = pipearg;
char *host;
uint16_t port;
int flag;
@@ -350,13 +544,6 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
return (rv);
}
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -364,20 +551,15 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
remaddr.s_un.s_in.sa_port = port;
bindaddr = lclpart == NULL ? NULL : &lcladdr;
- rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr);
+ rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr);
if (rv != 0) {
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(&pipe->fd);
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
@@ -406,7 +588,7 @@ nni_tcp_ep_bind(void *arg)
}
baddr.s_un.s_in.sa_port = port;
- if ((rv = nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) {
+ if ((rv = nni_plat_tcp_listen(ep->tsp, &baddr)) != 0) {
return (rv);
}
return (0);
@@ -414,46 +596,34 @@ nni_tcp_ep_bind(void *arg)
static int
-nni_tcp_ep_accept(void *arg, nni_pipe *npipe)
+nni_tcp_ep_accept(void *arg, void *pipearg)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe;
+ nni_tcp_pipe *pipe = pipearg;
int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- }
-
- if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) {
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(&pipe->fd);
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
static nni_tran_pipe nni_tcp_pipe_ops = {
- .pipe_destroy = nni_tcp_pipe_destroy,
- .pipe_send = nni_tcp_pipe_send,
- .pipe_recv = nni_tcp_pipe_recv,
- .pipe_close = nni_tcp_pipe_close,
- .pipe_peer = nni_tcp_pipe_peer,
- .pipe_getopt = nni_tcp_pipe_getopt,
+ .p_init = nni_tcp_pipe_init,
+ .p_fini = nni_tcp_pipe_fini,
+ .p_aio_send = nni_tcp_pipe_aio_send,
+ .p_aio_recv = nni_tcp_pipe_aio_recv,
+ .p_close = nni_tcp_pipe_close,
+ .p_peer = nni_tcp_pipe_peer,
+ .p_getopt = nni_tcp_pipe_getopt,
};
static nni_tran_ep nni_tcp_ep_ops = {