aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ipc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-03 20:40:55 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-03 20:40:55 -0700
commita80654e3e0abb7ddbd81a6159dd89933bdec44e7 (patch)
treeee0170b03c2103065d2d606297c93f64cb6c7a92 /src/transport/ipc
parentc1a92ee76a3e9e70ecae4646763bade0c16e4807 (diff)
downloadnng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.gz
nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.tar.bz2
nng-a80654e3e0abb7ddbd81a6159dd89933bdec44e7.zip
IPC & TCP negotiation done using aio. Remove old sync send/recv.
Diffstat (limited to 'src/transport/ipc')
-rw-r--r--src/transport/ipc/ipc.c154
1 files changed, 121 insertions, 33 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 1bb94092..3f882408 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -30,11 +30,17 @@ struct nni_ipc_pipe {
uint8_t txhead[1+sizeof (uint64_t)];
uint8_t rxhead[1+sizeof (uint64_t)];
+ int gottxhead;
+ int gotrxhead;
+ int wanttxhead;
+ int wantrxhead;
nni_aio * user_txaio;
nni_aio * user_rxaio;
+ nni_aio * user_negaio;
nni_aio txaio;
nni_aio rxaio;
+ nni_aio negaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -50,6 +56,7 @@ struct nni_ipc_ep {
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
+static void nni_ipc_pipe_nego_cb(void *);
static int
nni_ipc_tran_init(void)
@@ -80,6 +87,7 @@ nni_ipc_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
+ nni_aio_fini(&pipe->negaio);
if (pipe->isp != NULL) {
nni_plat_ipc_fini(pipe->isp);
}
@@ -92,7 +100,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(nni_ipc_pipe **pipep)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
{
nni_ipc_pipe *pipe;
int rv;
@@ -110,11 +118,18 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep)
if (rv != 0) {
goto fail;
}
-
rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
if (rv != 0) {
goto fail;
}
+ rv = nni_aio_init(&pipe->negaio, nni_ipc_pipe_nego_cb, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ pipe->proto = ep->proto;
+ pipe->rcvmax = ep->rcvmax;
+
*pipep = pipe;
return (0);
@@ -125,6 +140,79 @@ fail:
static void
+nni_ipc_cancel_nego(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_stop(aio);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_ipc_pipe_nego_cb(void *arg)
+{
+ nni_ipc_pipe *pipe = arg;
+ nni_aio *aio = &pipe->negaio;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto done;
+ }
+
+ // We start transmitting before we receive.
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ pipe->gottxhead += nni_aio_count(aio);
+ } else if (pipe->gotrxhead < pipe->wantrxhead) {
+ pipe->gotrxhead += nni_aio_count(aio);
+ }
+
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
+ aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
+ // send it down...
+ nni_plat_ipc_aio_send(pipe->isp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ if (pipe->gotrxhead < pipe->wantrxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
+ aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_plat_ipc_aio_recv(pipe->isp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // We have both sent and received the headers. Lets check the
+ // receive side header.
+ if ((pipe->rxhead[0] != 0) ||
+ (pipe->rxhead[1] != 'S') ||
+ (pipe->rxhead[2] != 'P') ||
+ (pipe->rxhead[3] != 0) ||
+ (pipe->rxhead[6] != 0) ||
+ (pipe->rxhead[7] != 0)) {
+ rv = NNG_EPROTO;
+ goto done;
+ }
+
+ NNI_GET16(&pipe->rxhead[4], pipe->peer);
+
+done:
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
nni_ipc_pipe_send_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
@@ -410,35 +498,40 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
int rv;
nni_iov iov;
uint8_t buf[8];
+ nni_aio aio;
- // First send our header..
- buf[0] = 0;
- buf[1] = 'S';
- buf[2] = 'P';
- buf[3] = 0; // version
- NNI_PUT16(&buf[4], pipe->proto);
- NNI_PUT16(&buf[6], 0);
-
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) {
- return (rv);
- }
+ pipe->txhead[0] = 0;
+ pipe->txhead[1] = 'S';
+ pipe->txhead[2] = 'P';
+ pipe->txhead[3] = 0;
+ NNI_PUT16(&pipe->txhead[4], pipe->proto);
+ NNI_PUT16(&pipe->txhead[6], 0);
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
- if ((buf[0] != 0) || (buf[1] != 'S') ||
- (buf[2] != 'P') || (buf[3] != 0) ||
- (buf[6] != 0) || (buf[7] != 0)) {
- return (NNG_EPROTO);
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_negaio = &aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ pipe->negaio.a_niov = 1;
+ pipe->negaio.a_iov[0].iov_len = 8;
+ pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (NNG_ECLOSED);
}
+ nni_plat_ipc_aio_send(pipe->isp, &pipe->negaio);
+ nni_mtx_unlock(&pipe->mtx);
- NNI_GET16(&buf[4], pipe->peer);
- return (0);
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
+ NNI_ASSERT(pipe->user_negaio == NULL);
+
+ return (rv);
}
@@ -455,12 +548,10 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep)
}
path = ep->addr + strlen("ipc://");
- if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(pipe->isp, path);
if (rv != 0) {
@@ -504,12 +595,9 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep)
nni_ipc_pipe *pipe;
int rv;
- if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
-
if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
nni_ipc_pipe_fini(pipe);
return (rv);