aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ipc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-05 20:22:36 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-05 20:22:36 -0700
commit8811317e2da3b5a21d6caab0cc0e12aad417edd6 (patch)
tree3ee093b515d3b6d69554bf7913c3626a5605d178 /src/transport/ipc
parent5ee6713c34963ed400c8886213ed2ee53c367c74 (diff)
downloadnng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.gz
nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.bz2
nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.zip
Make ipc work 100% async.
The connect & accept logic for IPC is now fully asynchronous. This will serve as a straight-forward template for TCP. Note that the upper logic still uses a thread to run this "synchronously", but that will be able to be removed once the last transport (TCP) is made fully async. The unified ipcsock is also now separated, and we anticipate being able to remove the posix_sock.c logic shortly. Separating out the endpoint logic from the pipe logic helps makes things clearer, and may faciliate a day where endpoints have multiple addresses (for example with a connect() endpoint that uses a round-robin DNS list and tries to run the entire list in parallel, stopping with the first connection made.) The platform header got a little cleanup while we were here.
Diffstat (limited to 'src/transport/ipc')
-rw-r--r--src/transport/ipc/ipc.c196
1 files changed, 123 insertions, 73 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 6ad9da75..d9ddeb2d 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep;
// nni_ipc_pipe is one end of an IPC connection.
struct nni_ipc_pipe {
const char * addr;
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_pipe * ipp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -47,16 +47,21 @@ struct nni_ipc_pipe {
struct nni_ipc_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_ep * iep;
int closed;
uint16_t proto;
size_t rcvmax;
+ nni_aio aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
+static void nni_ipc_ep_cb(void *);
+
static int
nni_ipc_tran_init(void)
@@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_plat_ipc_shutdown(pipe->isp);
+ nni_plat_ipc_pipe_close(pipe->ipp);
}
@@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
- if (pipe->isp != NULL) {
- nni_plat_ipc_fini(pipe->isp);
+ if (pipe->ipp != NULL) {
+ nni_plat_ipc_pipe_fini(pipe->ipp);
}
if (pipe->rxmsg) {
nni_msg_free(pipe->rxmsg);
@@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *pipe;
int rv;
@@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
goto fail;
}
- if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
if (rv != 0) {
goto fail;
@@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+ pipe->ipp = ipp;
+ pipe->addr = ep->addr;
*pipep = pipe;
return (0);
@@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
// send it down...
- nni_plat_ipc_send(pipe->isp, aio);
+ nni_plat_ipc_pipe_send(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
- nni_plat_ipc_recv(pipe->isp, aio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- nni_plat_ipc_send(pipe->isp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_send(pipe->isp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
}
+static void
+nni_ipc_ep_fini(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_plat_ipc_ep_fini(ep->iep);
+ nni_aio_fini(&ep->aio);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+
static int
nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_ipc_ep *ep;
int rv;
- if (strlen(url) > NNG_MAXADDRLEN-1) {
+ if ((strlen(url) > NNG_MAXADDRLEN-1) ||
+ (strncmp(url, "ipc://", strlen("ipc://")) != 0)) {
return (NNG_EADDRINVAL);
}
+
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
+ ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
+ ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) {
+ nni_ipc_ep_fini(ep);
+ return (rv);
+ }
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
-
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
@@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
static void
-nni_ipc_ep_fini(void *arg)
+nni_ipc_ep_close(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_fini(ep->isp);
- NNI_FREE_STRUCT(ep);
+ nni_plat_ipc_ep_close(ep->iep);
}
-static void
-nni_ipc_ep_close(void *arg)
+static int
+nni_ipc_ep_bind(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_shutdown(ep->isp);
+ return (nni_plat_ipc_ep_listen(ep->iep));
}
-static int
-nni_ipc_ep_connect_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_finish(nni_ipc_ep *ep)
{
- nni_ipc_ep *ep = arg;
+ nni_aio *aio = ep->user_aio;
nni_ipc_pipe *pipe;
int rv;
- const char *path;
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
+ if ((aio = ep->user_aio) == NULL) {
+ return;
+ }
+ ep->user_aio = NULL;
+ if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ goto done;
}
- path = ep->addr + strlen("ipc://");
+ NNI_ASSERT(ep->aio.a_pipe != NULL);
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
+ // Attempt to allocate the parent pipe. If this fails we'll
+ // drop the connection (ENOMEM probably).
+ if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) {
+ nni_plat_ipc_pipe_fini(ep->aio.a_pipe);
+ goto done;
}
+ aio->a_pipe = pipe;
- rv = nni_plat_ipc_connect(pipe->isp, path);
- if (rv != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
- }
+done:
+ ep->aio.a_pipe = NULL;
+ nni_aio_finish(aio, rv, 0);
+}
- *pipep = pipe;
- return (0);
+
+static void
+nni_ipc_ep_cb(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ nni_ipc_ep_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_bind(void *arg)
+static void
+nni_ipc_cancel_ep(nni_aio *aio)
+{
+ nni_ipc_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio == aio) {
+ ep->user_aio = NULL;
+ }
+ nni_aio_stop(&ep->aio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
int rv;
- const char *path;
- // We want to strok this, so make a copy. Skip the scheme.
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
- }
- path = ep->addr + strlen("ipc://");
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
- if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) {
- return (rv);
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- return (0);
+
+ nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_accept_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe;
int rv;
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
- }
- if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
+
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- *pipep = pipe;
- return (0);
+
+ nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
@@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = {
};
static nni_tran_ep nni_ipc_ep_ops = {
- .ep_init = nni_ipc_ep_init,
- .ep_fini = nni_ipc_ep_fini,
- .ep_connect_sync = nni_ipc_ep_connect_sync,
- .ep_bind = nni_ipc_ep_bind,
- .ep_accept_sync = nni_ipc_ep_accept_sync,
- .ep_close = nni_ipc_ep_close,
- .ep_setopt = NULL,
- .ep_getopt = NULL,
+ .ep_init = nni_ipc_ep_init,
+ .ep_fini = nni_ipc_ep_fini,
+ .ep_connect = nni_ipc_ep_connect,
+ .ep_bind = nni_ipc_ep_bind,
+ .ep_accept = nni_ipc_ep_accept,
+ .ep_close = nni_ipc_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
};
// This is the IPC transport linkage, and should be the only global