aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/ipc/ipc.c196
1 files changed, 123 insertions, 73 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 6ad9da75..d9ddeb2d 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep;
// nni_ipc_pipe is one end of an IPC connection.
struct nni_ipc_pipe {
const char * addr;
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_pipe * ipp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -47,16 +47,21 @@ struct nni_ipc_pipe {
struct nni_ipc_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_ep * iep;
int closed;
uint16_t proto;
size_t rcvmax;
+ nni_aio aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
+static void nni_ipc_ep_cb(void *);
+
static int
nni_ipc_tran_init(void)
@@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_plat_ipc_shutdown(pipe->isp);
+ nni_plat_ipc_pipe_close(pipe->ipp);
}
@@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
- if (pipe->isp != NULL) {
- nni_plat_ipc_fini(pipe->isp);
+ if (pipe->ipp != NULL) {
+ nni_plat_ipc_pipe_fini(pipe->ipp);
}
if (pipe->rxmsg) {
nni_msg_free(pipe->rxmsg);
@@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *pipe;
int rv;
@@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
goto fail;
}
- if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
if (rv != 0) {
goto fail;
@@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+ pipe->ipp = ipp;
+ pipe->addr = ep->addr;
*pipep = pipe;
return (0);
@@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
// send it down...
- nni_plat_ipc_send(pipe->isp, aio);
+ nni_plat_ipc_pipe_send(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
- nni_plat_ipc_recv(pipe->isp, aio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- nni_plat_ipc_send(pipe->isp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_send(pipe->isp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
}
+static void
+nni_ipc_ep_fini(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_plat_ipc_ep_fini(ep->iep);
+ nni_aio_fini(&ep->aio);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+
static int
nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_ipc_ep *ep;
int rv;
- if (strlen(url) > NNG_MAXADDRLEN-1) {
+ if ((strlen(url) > NNG_MAXADDRLEN-1) ||
+ (strncmp(url, "ipc://", strlen("ipc://")) != 0)) {
return (NNG_EADDRINVAL);
}
+
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
+ ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
+ ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) {
+ nni_ipc_ep_fini(ep);
+ return (rv);
+ }
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
-
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
@@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
static void
-nni_ipc_ep_fini(void *arg)
+nni_ipc_ep_close(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_fini(ep->isp);
- NNI_FREE_STRUCT(ep);
+ nni_plat_ipc_ep_close(ep->iep);
}
-static void
-nni_ipc_ep_close(void *arg)
+static int
+nni_ipc_ep_bind(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_shutdown(ep->isp);
+ return (nni_plat_ipc_ep_listen(ep->iep));
}
-static int
-nni_ipc_ep_connect_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_finish(nni_ipc_ep *ep)
{
- nni_ipc_ep *ep = arg;
+ nni_aio *aio = ep->user_aio;
nni_ipc_pipe *pipe;
int rv;
- const char *path;
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
+ if ((aio = ep->user_aio) == NULL) {
+ return;
+ }
+ ep->user_aio = NULL;
+ if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ goto done;
}
- path = ep->addr + strlen("ipc://");
+ NNI_ASSERT(ep->aio.a_pipe != NULL);
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
+ // Attempt to allocate the parent pipe. If this fails we'll
+ // drop the connection (ENOMEM probably).
+ if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) {
+ nni_plat_ipc_pipe_fini(ep->aio.a_pipe);
+ goto done;
}
+ aio->a_pipe = pipe;
- rv = nni_plat_ipc_connect(pipe->isp, path);
- if (rv != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
- }
+done:
+ ep->aio.a_pipe = NULL;
+ nni_aio_finish(aio, rv, 0);
+}
- *pipep = pipe;
- return (0);
+
+static void
+nni_ipc_ep_cb(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ nni_ipc_ep_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_bind(void *arg)
+static void
+nni_ipc_cancel_ep(nni_aio *aio)
+{
+ nni_ipc_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio == aio) {
+ ep->user_aio = NULL;
+ }
+ nni_aio_stop(&ep->aio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
int rv;
- const char *path;
- // We want to strok this, so make a copy. Skip the scheme.
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
- }
- path = ep->addr + strlen("ipc://");
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
- if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) {
- return (rv);
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- return (0);
+
+ nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_accept_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe;
int rv;
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
- }
- if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
+
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- *pipep = pipe;
- return (0);
+
+ nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
@@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = {
};
static nni_tran_ep nni_ipc_ep_ops = {
- .ep_init = nni_ipc_ep_init,
- .ep_fini = nni_ipc_ep_fini,
- .ep_connect_sync = nni_ipc_ep_connect_sync,
- .ep_bind = nni_ipc_ep_bind,
- .ep_accept_sync = nni_ipc_ep_accept_sync,
- .ep_close = nni_ipc_ep_close,
- .ep_setopt = NULL,
- .ep_getopt = NULL,
+ .ep_init = nni_ipc_ep_init,
+ .ep_fini = nni_ipc_ep_fini,
+ .ep_connect = nni_ipc_ep_connect,
+ .ep_bind = nni_ipc_ep_bind,
+ .ep_accept = nni_ipc_ep_accept,
+ .ep_close = nni_ipc_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
};
// This is the IPC transport linkage, and should be the only global