diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
| commit | 374f93a18edca2e0656c337a5b54927169ec31fa (patch) | |
| tree | cbaef995db10cfafd795953be203de744dc688c9 /src/transport/tcp | |
| parent | 6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff) | |
| download | nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2 nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip | |
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit.
The POSIX send/recv logic still uses threads under the hood, but
makes use of the AIO framework for send/recv. This is a key stepping
stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/transport/tcp')
| -rw-r--r-- | src/transport/tcp/tcp.c | 276 |
1 files changed, 223 insertions, 53 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e198a927..f220e49e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -22,21 +22,34 @@ typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. struct nni_tcp_pipe { const char * addr; - nni_plat_tcpsock fd; + nni_plat_tcpsock * tsp; uint16_t peer; uint16_t proto; size_t rcvmax; + + nni_aio * user_txaio; + nni_aio * user_rxaio; + + uint8_t txlen[sizeof (uint64_t)]; + uint8_t rxlen[sizeof (uint64_t)]; + nni_aio txaio; + nni_aio rxaio; + nni_msg * rxmsg; }; struct nni_tcp_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcpsock fd; + nni_plat_tcpsock * tsp; int closed; uint16_t proto; size_t rcvmax; int ipv4only; }; + +static void nni_tcp_pipe_send_cb(void *); +static void nni_tcp_pipe_recv_cb(void *); + static int nni_tcp_tran_init(void) { @@ -55,23 +68,201 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_shutdown(&pipe->fd); + nni_plat_tcp_shutdown(pipe->tsp); +} + + +static int +nni_tcp_pipe_init(void **argp) +{ + nni_tcp_pipe *pipe; + int rv; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); + if (rv != 0) { + nni_plat_tcp_fini(pipe->tsp); + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe); + if (rv != 0) { + nni_aio_fini(&pipe->txaio); + nni_plat_tcp_fini(pipe->tsp); + NNI_FREE_STRUCT(pipe); + } + *argp = pipe; + return (0); } static void -nni_tcp_pipe_destroy(void *arg) +nni_tcp_pipe_fini(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_fini(&pipe->fd); + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + nni_plat_tcp_fini(pipe->tsp); NNI_FREE_STRUCT(pipe); } +static void +nni_tcp_pipe_send_cb(void *arg) +{ + nni_tcp_pipe *pipe = arg; + int rv; + nni_aio *aio; + size_t len; + + if ((aio = pipe->user_txaio) == NULL) { + // This should never ever happen. + NNI_ASSERT(aio != NULL); + return; + } + pipe->user_txaio = NULL; + + if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; + + nni_aio_finish(aio, 0, len); +} + + +static void +nni_tcp_pipe_recv_cb(void *arg) +{ + nni_tcp_pipe *pipe = arg; + nni_aio *aio; + int rv; + + aio = pipe->user_rxaio; + if (aio == NULL) { + // This should never ever happen. + NNI_ASSERT(aio != NULL); + return; + } + + if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + // Error on receive. This has to cause an error back + // to the user. Also, if we had allocated an rxmsg, lets + // toss it. + if (pipe->rxmsg != NULL) { + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + } + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + + // If we don't have a message yet, we were reading the TCP message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (pipe->rxmsg == NULL) { + uint64_t len; + // We should have gotten a message header. + NNI_GET64(pipe->rxlen, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if (len > pipe->rcvmax) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, NNG_EMSGSIZE, 0); + return; + } + + if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. + pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + pipe->rxaio.a_niov = 1; + + rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + if (rv != 0) { + pipe->user_rxaio = NULL; + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + return; + } + + // Otherwise we got a message read completely. Let the user know the + // good news. + pipe->user_rxaio = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; + nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); +} + + +static int +nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) +{ + nni_tcp_pipe *pipe = arg; + nni_msg *msg = aio->a_msg; + uint64_t len; + + pipe->user_txaio = aio; + + len = nni_msg_len(msg) + nni_msg_header_len(msg); + NNI_PUT64(pipe->txlen, len); + + pipe->txaio.a_iov[0].iov_buf = pipe->txlen; + pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen); + pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); + pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); + pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); + pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); + pipe->txaio.a_niov = 3; + + return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio)); +} + + +static int +nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) +{ + nni_tcp_pipe *pipe = arg; + + pipe->user_rxaio = aio; + + NNI_ASSERT(pipe->rxmsg == NULL); + + // Schedule a read of the TCP header. + pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen; + pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); + pipe->rxaio.a_niov = 1; + + return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio)); +} + + static int nni_tcp_pipe_send(void *arg, nni_msg *msg) { +#if 0 nni_tcp_pipe *pipe = arg; uint64_t len; uint8_t buf[sizeof (len)]; @@ -88,10 +279,13 @@ nni_tcp_pipe_send(void *arg, nni_msg *msg) len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len; NNI_PUT64(buf, len); - if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) { + if ((rv = nni_plat_tcp_send(pipe->tsp, iov, 3)) == 0) { nni_msg_free(msg); } return (rv); + +#endif + return (NNG_EINVAL); } @@ -107,7 +301,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp) iov[0].iov_buf = buf; iov[0].iov_len = sizeof (buf); - if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) != 0) { return (rv); } NNI_GET64(buf, len); @@ -122,7 +316,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp) iov[0].iov_len = nng_msg_len(msg); iov[0].iov_buf = nng_msg_body(msg); - if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) == 0) { *msgp = msg; } else { nni_msg_free(msg); @@ -181,7 +375,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) ep->ipv4only = 0; // XXX: FIXME ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) { + if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) { NNI_FREE_STRUCT(ep); return (rv); } @@ -198,7 +392,7 @@ nni_tcp_ep_fini(void *arg) { nni_tcp_ep *ep = arg; - nni_plat_tcp_fini(&ep->fd); + nni_plat_tcp_fini(ep->tsp); NNI_FREE_STRUCT(ep); } @@ -208,7 +402,7 @@ nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; - nni_plat_tcp_shutdown(&ep->fd); + nni_plat_tcp_shutdown(ep->tsp); } @@ -281,13 +475,13 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_tcp_send(pipe->tsp, &iov, 1)) != 0) { return (rv); } iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) { return (rv); } @@ -303,10 +497,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) static int -nni_tcp_ep_connect(void *arg, nni_pipe *npipe) +nni_tcp_ep_connect(void *arg, void *pipearg) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe; + nni_tcp_pipe *pipe = pipearg; char *host; uint16_t port; int flag; @@ -350,13 +544,6 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe) return (rv); } - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); - } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; @@ -364,20 +551,15 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe) remaddr.s_un.s_in.sa_port = port; bindaddr = lclpart == NULL ? NULL : &lcladdr; - rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr); + rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr); if (rv != 0) { - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_shutdown(&pipe->fd); - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + nni_plat_tcp_shutdown(pipe->tsp); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } @@ -406,7 +588,7 @@ nni_tcp_ep_bind(void *arg) } baddr.s_un.s_in.sa_port = port; - if ((rv = nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) { + if ((rv = nni_plat_tcp_listen(ep->tsp, &baddr)) != 0) { return (rv); } return (0); @@ -414,46 +596,34 @@ nni_tcp_ep_bind(void *arg) static int -nni_tcp_ep_accept(void *arg, nni_pipe *npipe) +nni_tcp_ep_accept(void *arg, void *pipearg) { nni_tcp_ep *ep = arg; - nni_tcp_pipe *pipe; + nni_tcp_pipe *pipe = pipearg; int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) { - NNI_FREE_STRUCT(pipe); - } - - if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) { - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_shutdown(&pipe->fd); - nni_plat_tcp_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + nni_plat_tcp_shutdown(pipe->tsp); return (rv); } - nni_pipe_set_tran_data(npipe, pipe); return (0); } static nni_tran_pipe nni_tcp_pipe_ops = { - .pipe_destroy = nni_tcp_pipe_destroy, - .pipe_send = nni_tcp_pipe_send, - .pipe_recv = nni_tcp_pipe_recv, - .pipe_close = nni_tcp_pipe_close, - .pipe_peer = nni_tcp_pipe_peer, - .pipe_getopt = nni_tcp_pipe_getopt, + .p_init = nni_tcp_pipe_init, + .p_fini = nni_tcp_pipe_fini, + .p_aio_send = nni_tcp_pipe_aio_send, + .p_aio_recv = nni_tcp_pipe_aio_recv, + .p_close = nni_tcp_pipe_close, + .p_peer = nni_tcp_pipe_peer, + .p_getopt = nni_tcp_pipe_getopt, }; static nni_tran_ep nni_tcp_ep_ops = { |
