aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/platform.h6
-rw-r--r--src/platform/posix/posix_ipc.c14
-rw-r--r--src/platform/posix/posix_net.c14
-rw-r--r--src/platform/posix/posix_socket.c98
-rw-r--r--src/transport/ipc/ipc.c154
-rw-r--r--src/transport/tcp/tcp.c151
6 files changed, 240 insertions, 197 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index 7654f730..2d83f9b7 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -281,12 +281,6 @@ extern void nni_plat_pipe_clear(int);
// routine.
extern void nni_plat_pipe_close(int, int);
-// XXX: Stuff to REMOVE
-extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int);
-extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
-extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int);
-extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int);
-
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index d211e82f..a3d95428 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -53,20 +53,6 @@ nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path)
}
-int
-nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
-{
- return (nni_posix_sock_send_sync((void *) s, iovs, cnt));
-}
-
-
-int
-nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
-{
- return (nni_posix_sock_recv_sync((void *) s, iovs, cnt));
-}
-
-
void
nni_plat_ipc_aio_send(nni_plat_ipcsock *s, nni_aio *aio)
{
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 4ba67f9c..6d0d7c08 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -56,20 +56,6 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
}
-int
-nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
-{
- return (nni_posix_sock_send_sync((void *) s, iovs, cnt));
-}
-
-
-int
-nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
-{
- return (nni_posix_sock_recv_sync((void *) s, iovs, cnt));
-}
-
-
void
nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
{
diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c
index 32530388..dabad3af 100644
--- a/src/platform/posix/posix_socket.c
+++ b/src/platform/posix/posix_socket.c
@@ -326,104 +326,6 @@ nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr)
// transition functions for now.
int
-nni_posix_sock_send_sync(nni_posix_sock *s, nni_iov *iovs, int cnt)
-{
- struct iovec iov[4]; // We never have more than 3 at present
- int i;
- int offset;
- int resid = 0;
- int rv;
-
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
- iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
- }
-
- i = 0;
- while (resid) {
- rv = writev(s->fd, &iov[i], cnt);
- if (rv < 0) {
- if (rv == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- NNI_ASSERT(rv <= resid);
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
- }
- }
-
- return (0);
-}
-
-
-int
-nni_posix_sock_recv_sync(nni_posix_sock *s, nni_iov *iovs, int cnt)
-{
- struct iovec iov[4]; // We never have more than 3 at present
- int i;
- int offset;
- int resid = 0;
- int rv;
-
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
- iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
- }
-
- i = 0;
- while (resid) {
- rv = readv(s->fd, &iov[i], cnt);
- if (rv < 0) {
- if (errno == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- if (rv == 0) {
- return (NNG_ECLOSED);
- }
- NNI_ASSERT(rv <= resid);
-
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
- }
- }
-
- return (0);
-}
-
-
-int
nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server)
{
int fd;
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 1bb94092..3f882408 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -30,11 +30,17 @@ struct nni_ipc_pipe {
uint8_t txhead[1+sizeof (uint64_t)];
uint8_t rxhead[1+sizeof (uint64_t)];
+ int gottxhead;
+ int gotrxhead;
+ int wanttxhead;
+ int wantrxhead;
nni_aio * user_txaio;
nni_aio * user_rxaio;
+ nni_aio * user_negaio;
nni_aio txaio;
nni_aio rxaio;
+ nni_aio negaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -50,6 +56,7 @@ struct nni_ipc_ep {
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
+static void nni_ipc_pipe_nego_cb(void *);
static int
nni_ipc_tran_init(void)
@@ -80,6 +87,7 @@ nni_ipc_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
+ nni_aio_fini(&pipe->negaio);
if (pipe->isp != NULL) {
nni_plat_ipc_fini(pipe->isp);
}
@@ -92,7 +100,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(nni_ipc_pipe **pipep)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
{
nni_ipc_pipe *pipe;
int rv;
@@ -110,11 +118,18 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep)
if (rv != 0) {
goto fail;
}
-
rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
if (rv != 0) {
goto fail;
}
+ rv = nni_aio_init(&pipe->negaio, nni_ipc_pipe_nego_cb, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ pipe->proto = ep->proto;
+ pipe->rcvmax = ep->rcvmax;
+
*pipep = pipe;
return (0);
@@ -125,6 +140,79 @@ fail:
static void
+nni_ipc_cancel_nego(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_stop(aio);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_ipc_pipe_nego_cb(void *arg)
+{
+ nni_ipc_pipe *pipe = arg;
+ nni_aio *aio = &pipe->negaio;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto done;
+ }
+
+ // We start transmitting before we receive.
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ pipe->gottxhead += nni_aio_count(aio);
+ } else if (pipe->gotrxhead < pipe->wantrxhead) {
+ pipe->gotrxhead += nni_aio_count(aio);
+ }
+
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
+ aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
+ // send it down...
+ nni_plat_ipc_aio_send(pipe->isp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ if (pipe->gotrxhead < pipe->wantrxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
+ aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_plat_ipc_aio_recv(pipe->isp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // We have both sent and received the headers. Lets check the
+ // receive side header.
+ if ((pipe->rxhead[0] != 0) ||
+ (pipe->rxhead[1] != 'S') ||
+ (pipe->rxhead[2] != 'P') ||
+ (pipe->rxhead[3] != 0) ||
+ (pipe->rxhead[6] != 0) ||
+ (pipe->rxhead[7] != 0)) {
+ rv = NNG_EPROTO;
+ goto done;
+ }
+
+ NNI_GET16(&pipe->rxhead[4], pipe->peer);
+
+done:
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
nni_ipc_pipe_send_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
@@ -410,35 +498,40 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
int rv;
nni_iov iov;
uint8_t buf[8];
+ nni_aio aio;
- // First send our header..
- buf[0] = 0;
- buf[1] = 'S';
- buf[2] = 'P';
- buf[3] = 0; // version
- NNI_PUT16(&buf[4], pipe->proto);
- NNI_PUT16(&buf[6], 0);
-
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) {
- return (rv);
- }
+ pipe->txhead[0] = 0;
+ pipe->txhead[1] = 'S';
+ pipe->txhead[2] = 'P';
+ pipe->txhead[3] = 0;
+ NNI_PUT16(&pipe->txhead[4], pipe->proto);
+ NNI_PUT16(&pipe->txhead[6], 0);
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
- if ((buf[0] != 0) || (buf[1] != 'S') ||
- (buf[2] != 'P') || (buf[3] != 0) ||
- (buf[6] != 0) || (buf[7] != 0)) {
- return (NNG_EPROTO);
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_negaio = &aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ pipe->negaio.a_niov = 1;
+ pipe->negaio.a_iov[0].iov_len = 8;
+ pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ rv = nni_aio_start(&aio, nni_ipc_cancel_nego, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (NNG_ECLOSED);
}
+ nni_plat_ipc_aio_send(pipe->isp, &pipe->negaio);
+ nni_mtx_unlock(&pipe->mtx);
- NNI_GET16(&buf[4], pipe->peer);
- return (0);
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
+ NNI_ASSERT(pipe->user_negaio == NULL);
+
+ return (rv);
}
@@ -455,12 +548,10 @@ nni_ipc_ep_connect_sync(void *arg, void **pipep)
}
path = ep->addr + strlen("ipc://");
- if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
rv = nni_plat_ipc_connect(pipe->isp, path);
if (rv != 0) {
@@ -504,12 +595,9 @@ nni_ipc_ep_accept_sync(void *arg, void **pipep)
nni_ipc_pipe *pipe;
int rv;
- if ((rv = nni_ipc_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
-
if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
nni_ipc_pipe_fini(pipe);
return (rv);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 67a49a95..b6520080 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -29,11 +29,17 @@ struct nni_tcp_pipe {
nni_aio * user_txaio;
nni_aio * user_rxaio;
+ nni_aio * user_negaio;
uint8_t txlen[sizeof (uint64_t)];
uint8_t rxlen[sizeof (uint64_t)];
+ int gottxhead;
+ int gotrxhead;
+ int wanttxhead;
+ int wantrxhead;
nni_aio txaio;
nni_aio rxaio;
+ nni_aio negaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -50,6 +56,7 @@ struct nni_tcp_ep {
static void nni_tcp_pipe_send_cb(void *);
static void nni_tcp_pipe_recv_cb(void *);
+static void nni_tcp_pipe_nego_cb(void *);
static int
nni_tcp_tran_init(void)
@@ -80,6 +87,7 @@ nni_tcp_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
+ nni_aio_fini(&pipe->negaio);
if (pipe->tsp != NULL) {
nni_plat_tcp_fini(pipe->tsp);
}
@@ -92,7 +100,7 @@ nni_tcp_pipe_fini(void *arg)
static int
-nni_tcp_pipe_init(nni_tcp_pipe **pipep)
+nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep)
{
nni_tcp_pipe *pipe;
int rv;
@@ -114,6 +122,12 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep)
if (rv != 0) {
goto fail;
}
+ rv = nni_aio_init(&pipe->negaio, nni_tcp_pipe_nego_cb, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+ pipe->proto = ep->proto;
+ pipe->rcvmax = ep->rcvmax;
*pipep = pipe;
return (0);
@@ -124,6 +138,79 @@ fail:
static void
+nni_tcp_cancel_nego(nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_stop(aio);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_tcp_pipe_nego_cb(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_aio *aio = &pipe->negaio;
+ int rv;
+
+ nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto done;
+ }
+
+ // We start transmitting before we receive.
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ pipe->gottxhead += nni_aio_count(aio);
+ } else if (pipe->gotrxhead < pipe->wantrxhead) {
+ pipe->gotrxhead += nni_aio_count(aio);
+ }
+
+ if (pipe->gottxhead < pipe->wanttxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
+ aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead];
+ // send it down...
+ nni_plat_tcp_aio_send(pipe->tsp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ if (pipe->gotrxhead < pipe->wantrxhead) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
+ aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead];
+ nni_plat_tcp_aio_recv(pipe->tsp, aio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // We have both sent and received the headers. Lets check the
+ // receive side header.
+ if ((pipe->rxlen[0] != 0) ||
+ (pipe->rxlen[1] != 'S') ||
+ (pipe->rxlen[2] != 'P') ||
+ (pipe->rxlen[3] != 0) ||
+ (pipe->rxlen[6] != 0) ||
+ (pipe->rxlen[7] != 0)) {
+ rv = NNG_EPROTO;
+ goto done;
+ }
+
+ NNI_GET16(&pipe->rxlen[4], pipe->peer);
+
+done:
+ if ((aio = pipe->user_negaio) != NULL) {
+ pipe->user_negaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ }
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
nni_tcp_pipe_send_cb(void *arg)
{
nni_tcp_pipe *pipe = arg;
@@ -451,35 +538,40 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
int rv;
nni_iov iov;
uint8_t buf[8];
+ nni_aio aio;
- // First send our header..
- buf[0] = 0;
- buf[1] = 'S';
- buf[2] = 'P';
- buf[3] = 0; // version
- NNI_PUT16(&buf[4], pipe->proto);
- NNI_PUT16(&buf[6], 0);
-
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_tcp_send(pipe->tsp, &iov, 1)) != 0) {
- return (rv);
- }
+ pipe->txlen[0] = 0;
+ pipe->txlen[1] = 'S';
+ pipe->txlen[2] = 'P';
+ pipe->txlen[3] = 0;
+ NNI_PUT16(&pipe->txlen[4], pipe->proto);
+ NNI_PUT16(&pipe->txlen[6], 0);
- iov.iov_buf = buf;
- iov.iov_len = 8;
- if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
- if ((buf[0] != 0) || (buf[1] != 'S') ||
- (buf[2] != 'P') || (buf[3] != 0) ||
- (buf[6] != 0) || (buf[7] != 0)) {
- return (NNG_EPROTO);
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_negaio = &aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ pipe->negaio.a_niov = 1;
+ pipe->negaio.a_iov[0].iov_len = 8;
+ pipe->negaio.a_iov[0].iov_buf = &pipe->txlen[0];
+ rv = nni_aio_start(&aio, nni_tcp_cancel_nego, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (NNG_ECLOSED);
}
+ nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio);
+ nni_mtx_unlock(&pipe->mtx);
- NNI_GET16(&buf[4], pipe->peer);
- return (0);
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
+ NNI_ASSERT(pipe->user_negaio == NULL);
+
+ return (rv);
}
@@ -531,11 +623,9 @@ nni_tcp_ep_connect_sync(void *arg, void **pipep)
return (rv);
}
- if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
// Port is in the same place for both v4 and v6.
remaddr.s_un.s_in.sa_port = port;
@@ -594,12 +684,9 @@ nni_tcp_ep_accept_sync(void *arg, void **pipep)
nni_tcp_pipe *pipe;
int rv;
- if ((rv = nni_tcp_pipe_init(&pipe)) != 0) {
+ if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) {
return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
-
if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
nni_tcp_pipe_fini(pipe);