diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-05 20:22:36 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-05 20:22:36 -0700 |
| commit | 8811317e2da3b5a21d6caab0cc0e12aad417edd6 (patch) | |
| tree | 3ee093b515d3b6d69554bf7913c3626a5605d178 /src/transport/ipc | |
| parent | 5ee6713c34963ed400c8886213ed2ee53c367c74 (diff) | |
| download | nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.gz nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.bz2 nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.zip | |
Make ipc work 100% async.
The connect & accept logic for IPC is now fully asynchronous.
This will serve as a straight-forward template for TCP. Note that
the upper logic still uses a thread to run this "synchronously", but
that will be able to be removed once the last transport (TCP) is made
fully async.
The unified ipcsock is also now separated, and we anticipate being
able to remove the posix_sock.c logic shortly. Separating out the
endpoint logic from the pipe logic helps makes things clearer, and
may faciliate a day where endpoints have multiple addresses (for
example with a connect() endpoint that uses a round-robin DNS list
and tries to run the entire list in parallel, stopping with the first
connection made.)
The platform header got a little cleanup while we were here.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 196 |
1 files changed, 123 insertions, 73 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6ad9da75..d9ddeb2d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { const char * addr; - nni_plat_ipcsock * isp; + nni_plat_ipc_pipe * ipp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -47,16 +47,21 @@ struct nni_ipc_pipe { struct nni_ipc_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipcsock * isp; + nni_plat_ipc_ep * iep; int closed; uint16_t proto; size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); +static void nni_ipc_ep_cb(void *); + static int nni_ipc_tran_init(void) @@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_shutdown(pipe->isp); + nni_plat_ipc_pipe_close(pipe->ipp); } @@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->isp != NULL) { - nni_plat_ipc_fini(pipe->isp); + if (pipe->ipp != NULL) { + nni_plat_ipc_pipe_fini(pipe->ipp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; int rv; @@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... - nni_plat_ipc_send(pipe->isp, aio); + nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; - nni_plat_ipc_recv(pipe->isp, aio); + nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_ipc_send(pipe->isp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_send(pipe->isp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } @@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) } +static void +nni_ipc_ep_fini(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_plat_ipc_ep_fini(ep->iep); + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_ipc_ep *ep; int rv; - if (strlen(url) > NNG_MAXADDRLEN-1) { + if ((strlen(url) > NNG_MAXADDRLEN-1) || + (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || + ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + nni_ipc_ep_fini(ep); + return (rv); + } ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) static void -nni_ipc_ep_fini(void *arg) +nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_fini(ep->isp); - NNI_FREE_STRUCT(ep); + nni_plat_ipc_ep_close(ep->iep); } -static void -nni_ipc_ep_close(void *arg) +static int +nni_ipc_ep_bind(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_shutdown(ep->isp); + return (nni_plat_ipc_ep_listen(ep->iep)); } -static int -nni_ipc_ep_connect_sync(void *arg, void **pipep) +static void +nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_ipc_ep *ep = arg; + nni_aio *aio = ep->user_aio; nni_ipc_pipe *pipe; int rv; - const char *path; - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } - path = ep->addr + strlen("ipc://"); + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_ipc_pipe_fini(ep->aio.a_pipe); + goto done; } + aio->a_pipe = pipe; - rv = nni_plat_ipc_connect(pipe->isp, path); - if (rv != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} - *pipep = pipe; - return (0); + +static void +nni_ipc_ep_cb(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_ipc_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_bind(void *arg) +static void +nni_ipc_cancel_ep(nni_aio *aio) +{ + nni_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; int rv; - const char *path; - // We want to strok this, so make a copy. Skip the scheme. - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); - } - path = ep->addr + strlen("ipc://"); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; - if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) { - return (rv); + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - return (0); + + nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_accept_sync(void *arg, void **pipep) +static void +nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); - } - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - *pipep = pipe; - return (0); + + nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = { }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect_sync = nni_ipc_ep_connect_sync, - .ep_bind = nni_ipc_ep_bind, - .ep_accept_sync = nni_ipc_ep_accept_sync, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global |
