diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-03 20:40:55 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-03 20:40:55 -0700 |
| commit | a80654e3e0abb7ddbd81a6159dd89933bdec44e7 (patch) | |
| tree | ee0170b03c2103065d2d606297c93f64cb6c7a92 /src/transport/ipc | |
| parent | c1a92ee76a3e9e70ecae4646763bade0c16e4807 (diff) | |
| download | nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.gz nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.bz2 nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.zip | |
IPC & TCP negotiation done using aio. Remove old sync send/recv.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 154 |
1 files changed, 121 insertions, 33 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 1bb94092..3f882408 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -30,11 +30,17 @@ struct nni_ipc_pipe { uint8_t txhead[1+sizeof (uint64_t)]; uint8_t rxhead[1+sizeof (uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; nni_aio * user_txaio; nni_aio * user_rxaio; + nni_aio * user_negaio; nni_aio txaio; nni_aio rxaio; + nni_aio negaio; nni_msg * rxmsg; nni_mtx mtx; }; @@ -50,6 +56,7 @@ struct nni_ipc_ep { static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); +static void nni_ipc_pipe_nego_cb(void *); static int nni_ipc_tran_init(void) @@ -80,6 +87,7 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); + nni_aio_fini(&pipe->negaio); if (pipe->isp != NULL) { nni_plat_ipc_fini(pipe->isp); } @@ -92,7 +100,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) { nni_ipc_pipe *pipe; int rv; @@ -110,11 +118,18 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep) if (rv != 0) { goto fail; } - rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe); if (rv != 0) { goto fail; } + rv = nni_aio_init(&pipe->negaio, nni_ipc_pipe_nego_cb, pipe); + if (rv != 0) { + goto fail; + } + + pipe->proto = ep->proto; + pipe->rcvmax = ep->rcvmax; + *pipep = pipe; return (0); @@ -125,6 +140,79 @@ fail: static void +nni_ipc_cancel_nego(nni_aio *aio) +{ + nni_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_stop(aio); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_ipc_pipe_nego_cb(void *arg) +{ + nni_ipc_pipe *pipe = arg; + nni_aio *aio = &pipe->negaio; + int rv; + + nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + goto done; + } + + // We start transmitting before we receive. + if (pipe->gottxhead < pipe->wanttxhead) { + pipe->gottxhead += nni_aio_count(aio); + } else if (pipe->gotrxhead < pipe->wantrxhead) { + pipe->gotrxhead += nni_aio_count(aio); + } + + if (pipe->gottxhead < pipe->wanttxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; + aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; + // send it down... + nni_plat_ipc_aio_send(pipe->isp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + if (pipe->gotrxhead < pipe->wantrxhead) { + aio->a_niov = 1; + aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; + aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_plat_ipc_aio_recv(pipe->isp, aio); + nni_mtx_unlock(&pipe->mtx); + return; + } + // We have both sent and received the headers. Lets check the + // receive side header. + if ((pipe->rxhead[0] != 0) || + (pipe->rxhead[1] != 'S') || + (pipe->rxhead[2] != 'P') || + (pipe->rxhead[3] != 0) || + (pipe->rxhead[6] != 0) || + (pipe->rxhead[7] != 0)) { + rv = NNG_EPROTO; + goto done; + } + + NNI_GET16(&pipe->rxhead[4], pipe->peer); + +done: + if ((aio = pipe->user_negaio) != NULL) { + pipe->user_negaio = NULL; + nni_aio_finish(aio, rv, 0); + } + nni_mtx_unlock(&pipe->mtx); +} + + +static void nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; @@ -410,35 +498,40 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe) int rv; nni_iov iov; uint8_t buf[8]; + nni_aio aio; - // First send our header.. - buf[0] = 0; - buf[1] = 'S'; - buf[2] = 'P'; - buf[3] = 0; // version - NNI_PUT16(&buf[4], pipe->proto); - NNI_PUT16(&buf[6], 0); - - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) { - return (rv); - } + pipe->txhead[0] = 0; + pipe->txhead[1] = 'S'; + pipe->txhead[2] = 'P'; + pipe->txhead[3] = 0; + NNI_PUT16(&pipe->txhead[4], pipe->proto); + NNI_PUT16(&pipe->txhead[6], 0); - iov.iov_buf = buf; - iov.iov_len = 8; - if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); - if ((buf[0] != 0) || (buf[1] != 'S') || - (buf[2] != 'P') || (buf[3] != 0) || - (buf[6] != 0) || (buf[7] != 0)) { - return (NNG_EPROTO); + nni_mtx_lock(&pipe->mtx); + pipe->user_negaio = &aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; + pipe->negaio.a_iov[0].iov_len = 8; + pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe); + if (rv != 0) { + nni_mtx_unlock(&pipe->mtx); + return (NNG_ECLOSED); } + nni_plat_ipc_aio_send(pipe->isp, &pipe->negaio); + nni_mtx_unlock(&pipe->mtx); - NNI_GET16(&buf[4], pipe->peer); - return (0); + nni_aio_wait(&aio); + rv = nni_aio_result(&aio); + nni_aio_fini(&aio); + NNI_ASSERT(pipe->user_negaio == NULL); + + return (rv); } @@ -455,12 +548,10 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep) } path = ep->addr + strlen("ipc://"); - if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; rv = nni_plat_ipc_connect(pipe->isp, path); if (rv != 0) { @@ -504,12 +595,9 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep) nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe)) != 0) { + if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { return (rv); } - pipe->proto = ep->proto; - pipe->rcvmax = ep->rcvmax; - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { nni_ipc_pipe_fini(pipe); return (rv); |
