diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/ipc/ipc.c | 196 |
1 files changed, 123 insertions, 73 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6ad9da75..d9ddeb2d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { const char * addr; - nni_plat_ipcsock * isp; + nni_plat_ipc_pipe * ipp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -47,16 +47,21 @@ struct nni_ipc_pipe { struct nni_ipc_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipcsock * isp; + nni_plat_ipc_ep * iep; int closed; uint16_t proto; size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); +static void nni_ipc_ep_cb(void *); + static int nni_ipc_tran_init(void) @@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_shutdown(pipe->isp); + nni_plat_ipc_pipe_close(pipe->ipp); } @@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->isp != NULL) { - nni_plat_ipc_fini(pipe->isp); + if (pipe->ipp != NULL) { + nni_plat_ipc_pipe_fini(pipe->ipp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; int rv; @@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... - nni_plat_ipc_send(pipe->isp, aio); + nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; - nni_plat_ipc_recv(pipe->isp, aio); + nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_ipc_send(pipe->isp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_send(pipe->isp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } @@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) } +static void +nni_ipc_ep_fini(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_plat_ipc_ep_fini(ep->iep); + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_ipc_ep *ep; int rv; - if (strlen(url) > NNG_MAXADDRLEN-1) { + if ((strlen(url) > NNG_MAXADDRLEN-1) || + (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || + ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + nni_ipc_ep_fini(ep); + return (rv); + } ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) static void -nni_ipc_ep_fini(void *arg) +nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_fini(ep->isp); - NNI_FREE_STRUCT(ep); + nni_plat_ipc_ep_close(ep->iep); } -static void -nni_ipc_ep_close(void *arg) +static int +nni_ipc_ep_bind(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_shutdown(ep->isp); + return (nni_plat_ipc_ep_listen(ep->iep)); } -static int -nni_ipc_ep_connect_sync(void *arg, void **pipep) +static void +nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_ipc_ep *ep = arg; + nni_aio *aio = ep->user_aio; nni_ipc_pipe *pipe; int rv; - const char *path; - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } - path = ep->addr + strlen("ipc://"); + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_ipc_pipe_fini(ep->aio.a_pipe); + goto done; } + aio->a_pipe = pipe; - rv = nni_plat_ipc_connect(pipe->isp, path); - if (rv != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} - *pipep = pipe; - return (0); + +static void +nni_ipc_ep_cb(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_ipc_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_bind(void *arg) +static void +nni_ipc_cancel_ep(nni_aio *aio) +{ + nni_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; int rv; - const char *path; - // We want to strok this, so make a copy. Skip the scheme. - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); - } - path = ep->addr + strlen("ipc://"); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; - if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) { - return (rv); + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - return (0); + + nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_accept_sync(void *arg, void **pipep) +static void +nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); - } - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - *pipep = pipe; - return (0); + + nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = { }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect_sync = nni_ipc_ep_connect_sync, - .ep_bind = nni_ipc_ep_bind, - .ep_accept_sync = nni_ipc_ep_accept_sync, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global |
