diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-03 20:40:55 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-03 20:40:55 -0700 |
| commit | a80654e3e0abb7ddbd81a6159dd89933bdec44e7 (patch) | |
| tree | ee0170b03c2103065d2d606297c93f64cb6c7a92 | |
| parent | c1a92ee76a3e9e70ecae4646763bade0c16e4807 (diff) | |
| download | nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.gz nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.bz2 nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.zip | |
IPC & TCP negotiation done using aio. Remove old sync send/recv.
| -rw-r--r-- | src/core/platform.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 14 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 14 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 98 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 154 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 151 |
6 files changed, 240 insertions, 197 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index 7654f730..2d83f9b7 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -281,12 +281,6 @@ extern void nni_plat_pipe_clear(int); // routine. extern void nni_plat_pipe_close(int, int); -// XXX: Stuff to REMOVE -extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int); -extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int); -extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int); -extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int); - // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index d211e82f..a3d95428 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -53,20 +53,6 @@ nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path) } -int -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) -{ - return (nni_posix_sock_send_sync((void *) s, iovs, cnt)); -} - - -int -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) -{ - return (nni_posix_sock_recv_sync((void *) s, iovs, cnt)); -} - - void nni_plat_ipc_aio_send(nni_plat_ipcsock *s, nni_aio *aio) { diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 4ba67f9c..6d0d7c08 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -56,20 +56,6 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) } -int -nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) -{ - return (nni_posix_sock_send_sync((void *) s, iovs, cnt)); -} - - -int -nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) -{ - return (nni_posix_sock_recv_sync((void *) s, iovs, cnt)); -} - - void nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) { diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c index 32530388..dabad3af 100644 --- a/src/platform/posix/posix_socket.c +++ b/src/platform/posix/posix_socket.c @@ -326,104 +326,6 @@ nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr) // transition functions for now. int -nni_posix_sock_send_sync(nni_posix_sock *s, nni_iov *iovs, int cnt) -{ - struct iovec iov[4]; // We never have more than 3 at present - int i; - int offset; - int resid = 0; - int rv; - - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; - iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; - } - - i = 0; - while (resid) { - rv = writev(s->fd, &iov[i], cnt); - if (rv < 0) { - if (rv == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - NNI_ASSERT(rv <= resid); - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } - } - } - - return (0); -} - - -int -nni_posix_sock_recv_sync(nni_posix_sock *s, nni_iov *iovs, int cnt) -{ - struct iovec iov[4]; // We never have more than 3 at present - int i; - int offset; - int resid = 0; - int rv; - - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; - iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; - } - - i = 0; - while (resid) { - rv = readv(s->fd, &iov[i], cnt); - if (rv < 0) { - if (errno == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - if (rv == 0) { - return (NNG_ECLOSED); - } - NNI_ASSERT(rv <= resid); - - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } - } - } - - return (0); -} - - -int nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server) { int fd; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 1bb94092..3f882408 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -30,11 +30,17 @@ struct nni_ipc_pipe { uint8_t txhead[1+sizeof (uint64_t)]; uint8_t rxhead[1+sizeof (uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; nni_aio * user_txaio; nni_aio * user_rxaio; + nni_aio * user_negaio; nni_aio txaio; nni_aio rxaio; + nni_aio negaio; nni_msg * rxmsg; nni_mtx mtx; }; @@ -50,6 +56,7 @@ struct nni_ipc_ep { static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); +static void nni_ipc_pipe_nego_cb(void *); static int nni_ipc_tran_init(void) @@ -80,6 +87,7 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); + nni_aio_fini(&pipe->negaio); if (pipe->isp != NULL) { nni_plat_ipc_fini(pipe->isp); } @@ -92,7 +100,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) { nni_ipc_pipe *pipe; int rv; @@ -110,11 +118,18 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep) if (rv != 0) { goto fail; } - rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe); if (rv != 0) { goto fail; } + rv = nni_aio_init(&pipe->negaio, nni_ipc_pipe_nego_cb, pipe); + if (rv != 0) { + goto fail; + } + + pipe->proto = ep->proto; + pipe->rcvmax = ep->rcvmax; + *pipep = pipe; return (0); @@ -125,6 +140,79 @@ fail: static void +nni_ipc_cancel_nego(nni_aio *aio) +{ + nni_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_stop(aio); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_ipc_pipe_nego_cb(void *arg) +{ + nni_ipc_pipe *pipe = arg; + nni_aio *aio = &pipe->negaio; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + goto done; + } + + // We start transmitting before we receive. + if (pipe->gottxhead < pipe->wanttxhead) { + pipe->gottxhead += nni_aio_count(aio); + } else if (pipe->gotrxhead < pipe->wantrxhead) { + pipe->gotrxhead += nni_aio_count(aio); + } + + if (pipe->gottxhead < pipe->wanttxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; + aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; + // send it down... + nni_plat_ipc_aio_send(pipe->isp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + if (pipe->gotrxhead < pipe->wantrxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; + aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_plat_ipc_aio_recv(pipe->isp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + // We have both sent and received the headers. Lets check the + // receive side header. + if ((pipe->rxhead[0] != 0) || + (pipe->rxhead[1] != 'S') || + (pipe->rxhead[2] != 'P') || + (pipe->rxhead[3] != 0) || + (pipe->rxhead[6] != 0) || + (pipe->rxhead[7] != 0)) { + rv = NNG_EPROTO; + goto done; + } + + NNI_GET16(&pipe->rxhead[4], pipe->peer); + +done: + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_finish(aio, rv, 0); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; @@ -410,35 +498,40 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe) int rv; nni_iov iov; uint8_t buf[8]; + nni_aio aio; - // First send our header.. - buf[0] = 0; - buf[1] = 'S'; - buf[2] = 'P'; - buf[3] = 0; // version - NNI_PUT16(&buf[4], pipe->proto); - NNI_PUT16(&buf[6], 0); - - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) { - return (rv); - } + pipe->txhead[0] = 0; + pipe->txhead[1] = 'S'; + pipe->txhead[2] = 'P'; + pipe->txhead[3] = 0; + NNI_PUT16(&pipe->txhead[4], pipe->proto); + NNI_PUT16(&pipe->txhead[6], 0); - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); - if ((buf[0] != 0) || (buf[1] != 'S') || - (buf[2] != 'P') || (buf[3] != 0) || - (buf[6] != 0) || (buf[7] != 0)) { - return (NNG_EPROTO); + nni_mtx_lock(&pipe->mtx); + pipe->user_negaio = &aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; + pipe->negaio.a_iov[0].iov_len = 8; + pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe); + if (rv != 0) { + nni_mtx_unlock(&pipe->mtx); + return (NNG_ECLOSED); } + nni_plat_ipc_aio_send(pipe->isp, &pipe->negaio); + nni_mtx_unlock(&pipe->mtx); - NNI_GET16(&buf[4], pipe->peer); - return (0); + nni_aio_wait(&aio); + rv = nni_aio_result(&aio); + nni_aio_fini(&aio); + NNI_ASSERT(pipe->user_negaio == NULL); + + return (rv); } @@ -455,12 +548,10 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep) } path = ep->addr + strlen("ipc://"); - if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; rv = nni_plat_ipc_connect(pipe->isp, path); if (rv != 0) { @@ -504,12 +595,9 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep) nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { nni_ipc_pipe_fini(pipe); return (rv); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 67a49a95..b6520080 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -29,11 +29,17 @@ struct nni_tcp_pipe { nni_aio * user_txaio; nni_aio * user_rxaio; + nni_aio * user_negaio; uint8_t txlen[sizeof (uint64_t)]; uint8_t rxlen[sizeof (uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; nni_aio txaio; nni_aio rxaio; + nni_aio negaio; nni_msg * rxmsg; nni_mtx mtx; }; @@ -50,6 +56,7 @@ struct nni_tcp_ep { static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); +static void nni_tcp_pipe_nego_cb(void *); static int nni_tcp_tran_init(void) @@ -80,6 +87,7 @@ nni_tcp_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); + nni_aio_fini(&pipe->negaio); if (pipe->tsp != NULL) { nni_plat_tcp_fini(pipe->tsp); } @@ -92,7 +100,7 @@ nni_tcp_pipe_fini(void *arg) static int -nni_tcp_pipe_init(nni_tcp_pipe **pipep) +nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) { nni_tcp_pipe *pipe; int rv; @@ -114,6 +122,12 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep) if (rv != 0) { goto fail; } + rv = nni_aio_init(&pipe->negaio, nni_tcp_pipe_nego_cb, pipe); + if (rv != 0) { + goto fail; + } + pipe->proto = ep->proto; + pipe->rcvmax = ep->rcvmax; *pipep = pipe; return (0); @@ -124,6 +138,79 @@ fail: static void +nni_tcp_cancel_nego(nni_aio *aio) +{ + nni_tcp_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_stop(aio); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_tcp_pipe_nego_cb(void *arg) +{ + nni_tcp_pipe *pipe = arg; + nni_aio *aio = &pipe->negaio; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + goto done; + } + + // We start transmitting before we receive. + if (pipe->gottxhead < pipe->wanttxhead) { + pipe->gottxhead += nni_aio_count(aio); + } else if (pipe->gotrxhead < pipe->wantrxhead) { + pipe->gotrxhead += nni_aio_count(aio); + } + + if (pipe->gottxhead < pipe->wanttxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; + aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead]; + // send it down... + nni_plat_tcp_aio_send(pipe->tsp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + if (pipe->gotrxhead < pipe->wantrxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; + aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead]; + nni_plat_tcp_aio_recv(pipe->tsp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + // We have both sent and received the headers. Lets check the + // receive side header. + if ((pipe->rxlen[0] != 0) || + (pipe->rxlen[1] != 'S') || + (pipe->rxlen[2] != 'P') || + (pipe->rxlen[3] != 0) || + (pipe->rxlen[6] != 0) || + (pipe->rxlen[7] != 0)) { + rv = NNG_EPROTO; + goto done; + } + + NNI_GET16(&pipe->rxlen[4], pipe->peer); + +done: + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_finish(aio, rv, 0); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void nni_tcp_pipe_send_cb(void *arg) { nni_tcp_pipe *pipe = arg; @@ -451,35 +538,40 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe) int rv; nni_iov iov; uint8_t buf[8]; + nni_aio aio; - // First send our header.. - buf[0] = 0; - buf[1] = 'S'; - buf[2] = 'P'; - buf[3] = 0; // version - NNI_PUT16(&buf[4], pipe->proto); - NNI_PUT16(&buf[6], 0); - - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_tcp_send(pipe->tsp, &iov, 1)) != 0) { - return (rv); - } + pipe->txlen[0] = 0; + pipe->txlen[1] = 'S'; + pipe->txlen[2] = 'P'; + pipe->txlen[3] = 0; + NNI_PUT16(&pipe->txlen[4], pipe->proto); + NNI_PUT16(&pipe->txlen[6], 0); - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); - if ((buf[0] != 0) || (buf[1] != 'S') || - (buf[2] != 'P') || (buf[3] != 0) || - (buf[6] != 0) || (buf[7] != 0)) { - return (NNG_EPROTO); + nni_mtx_lock(&pipe->mtx); + pipe->user_negaio = &aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; + pipe->negaio.a_iov[0].iov_len = 8; + pipe->negaio.a_iov[0].iov_buf = &pipe->txlen[0]; + rv = nni_aio_start(&aio, nni_tcp_cancel_nego, pipe); + if (rv != 0) { + nni_mtx_unlock(&pipe->mtx); + return (NNG_ECLOSED); } + nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio); + nni_mtx_unlock(&pipe->mtx); - NNI_GET16(&buf[4], pipe->peer); - return (0); + nni_aio_wait(&aio); + rv = nni_aio_result(&aio); + nni_aio_fini(&aio); + NNI_ASSERT(pipe->user_negaio == NULL); + + return (rv); } @@ -531,11 +623,9 @@ nni_tcp_ep_connect_sync(void *arg, void **pipep) return (rv); } - if ((rv = nni_tcp_pipe_init(&pipe)) != 0) { + if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; // Port is in the same place for both v4 and v6. remaddr.s_un.s_in.sa_port = port; @@ -594,12 +684,9 @@ nni_tcp_ep_accept_sync(void *arg, void **pipep) nni_tcp_pipe *pipe; int rv; - if ((rv = nni_tcp_pipe_init(&pipe)) != 0) { + if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; - if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { nni_tcp_pipe_fini(pipe); |
