aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-29 11:33:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-29 11:33:01 -0700
commit36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64 (patch)
treea2ab064810f269fac8e9b0473685a02c5643f5c9 /src/transport
parentd0c0c9969ab5552889f91d09db6dbf6b79f6705c (diff)
downloadnng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.gz
nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.bz2
nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.zip
Pass cancel of IPC and TCP all the way down to POSIX pipedescs.
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/ipc/ipc.c147
-rw-r--r--src/transport/tcp/tcp.c138
2 files changed, 195 insertions, 90 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ada7a87c..7727b375 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -36,6 +36,7 @@ struct nni_ipc_pipe {
nni_aio txaio;
nni_aio rxaio;
nni_msg * rxmsg;
+ nni_mtx mtx;
};
struct nni_ipc_ep {
@@ -72,6 +73,24 @@ nni_ipc_pipe_close(void *arg)
}
+static void
+nni_ipc_pipe_fini(void *arg)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ if (pipe->isp != NULL) {
+ nni_plat_ipc_fini(pipe->isp);
+ }
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
+ nni_mtx_fini(&pipe->mtx);
+ NNI_FREE_STRUCT(pipe);
+}
+
+
static int
nni_ipc_pipe_init(void **argp)
{
@@ -81,41 +100,27 @@ nni_ipc_pipe_init(void **argp)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
if (rv != 0) {
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
if (rv != 0) {
- nni_aio_fini(&pipe->txaio);
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
*argp = pipe;
return (0);
-}
-
-
-static void
-nni_ipc_pipe_fini(void *arg)
-{
- nni_ipc_pipe *pipe = arg;
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
- }
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
+fail:
+ nni_ipc_pipe_fini(pipe);
+ return (rv);
}
@@ -127,21 +132,21 @@ nni_ipc_pipe_send_cb(void *arg)
int rv;
size_t len;
+ nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
pipe->user_txaio = NULL;
if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
- nni_aio_finish(aio, rv, 0);
- return;
+ len = 0;
+ } else {
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
}
-
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
-
- nni_aio_finish(aio, 0, len);
+ nni_aio_finish(aio, rv, len);
+ nni_mtx_unlock(&pipe->mtx);
}
@@ -152,10 +157,11 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_aio *aio;
int rv;
+ nni_mtx_lock(&pipe->mtx);
aio = pipe->user_rxaio;
if (aio == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ // aio was canceled
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -169,6 +175,7 @@ nni_ipc_pipe_recv_cb(void *arg)
}
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -181,6 +188,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
nni_aio_finish(aio, NNG_EPROTO, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -192,12 +200,19 @@ nni_ipc_pipe_recv_cb(void *arg)
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
+ // Note that all IO on this pipe is blocked behind this
+ // allocation. We could possibly look at using a separate
+ // lock for the read side in the future, so that we allow
+ // transmits to proceed normally. In practice this is
+ // unlikely to be much of an issue though.
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -207,14 +222,8 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- rv = nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
- if (rv != 0) {
- pipe->user_rxaio = NULL;
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
- nni_aio_finish(aio, rv, 0);
- return;
- }
+ nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -224,6 +233,21 @@ nni_ipc_pipe_recv_cb(void *arg)
aio->a_msg = pipe->rxmsg;
pipe->rxmsg = NULL;
nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_ipc_cancel_tx(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->txaio);
}
@@ -234,10 +258,17 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio)
nni_msg *msg = aio->a_msg;
uint64_t len;
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+
+ nni_mtx_lock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
pipe->user_txaio = aio;
pipe->txhead[0] = 1; // message type, 1.
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
NNI_PUT64(pipe->txhead + 1, len);
pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
@@ -248,7 +279,23 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- return (nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio));
+ nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+}
+
+
+static void
+nni_ipc_cancel_rx(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_rxaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->rxaio);
}
@@ -257,8 +304,14 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- pipe->user_rxaio = aio;
+ nni_mtx_lock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
+ pipe->user_rxaio = aio;
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
@@ -266,7 +319,9 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
pipe->rxaio.a_niov = 1;
- return (nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio));
+ nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 1c1eabbf..4aecef65 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -35,6 +35,7 @@ struct nni_tcp_pipe {
nni_aio txaio;
nni_aio rxaio;
nni_msg * rxmsg;
+ nni_mtx mtx;
};
struct nni_tcp_ep {
@@ -72,6 +73,24 @@ nni_tcp_pipe_close(void *arg)
}
+static void
+nni_tcp_pipe_fini(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ if (pipe->tsp != NULL) {
+ nni_plat_tcp_fini(pipe->tsp);
+ }
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
+
+ NNI_FREE_STRUCT(pipe);
+}
+
+
static int
nni_tcp_pipe_init(void **argp)
{
@@ -81,39 +100,26 @@ nni_tcp_pipe_init(void **argp)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
if (rv != 0) {
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe);
if (rv != 0) {
- nni_aio_fini(&pipe->txaio);
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
+ goto fail;
}
*argp = pipe;
return (0);
-}
-
-
-static void
-nni_tcp_pipe_fini(void *arg)
-{
- nni_tcp_pipe *pipe = arg;
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
- }
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
+fail:
+ nni_tcp_pipe_fini(pipe);
+ return (rv);
}
@@ -125,23 +131,22 @@ nni_tcp_pipe_send_cb(void *arg)
nni_aio *aio;
size_t len;
+ nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
pipe->user_txaio = NULL;
if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
- nni_aio_finish(aio, rv, 0);
- return;
+ len = 0;
+ } else {
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
}
-
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
-
nni_aio_finish(aio, 0, len);
+ nni_mtx_unlock(&pipe->mtx);
}
@@ -152,10 +157,11 @@ nni_tcp_pipe_recv_cb(void *arg)
nni_aio *aio;
int rv;
+ nni_mtx_lock(&pipe->mtx);
+
aio = pipe->user_rxaio;
if (aio == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -169,6 +175,7 @@ nni_tcp_pipe_recv_cb(void *arg)
}
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -185,12 +192,14 @@ nni_tcp_pipe_recv_cb(void *arg)
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -200,14 +209,8 @@ nni_tcp_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
- if (rv != 0) {
- pipe->user_rxaio = NULL;
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
- nni_aio_finish(aio, rv, 0);
- return;
- }
+ nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -217,6 +220,21 @@ nni_tcp_pipe_recv_cb(void *arg)
aio->a_msg = pipe->rxmsg;
pipe->rxmsg = NULL;
nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_tcp_cancel_tx(nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->txaio);
}
@@ -227,9 +245,17 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
nni_msg *msg = aio->a_msg;
uint64_t len;
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
pipe->user_txaio = aio;
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
NNI_PUT64(pipe->txlen, len);
pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
@@ -240,7 +266,23 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio));
+ nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+}
+
+
+static void
+nni_tcp_cancel_rx(nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_rxaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->rxaio);
}
@@ -249,6 +291,12 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *pipe = arg;
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
pipe->user_rxaio = aio;
NNI_ASSERT(pipe->rxmsg == NULL);
@@ -258,7 +306,9 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
pipe->rxaio.a_niov = 1;
- return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio));
+ nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
}