aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-29 11:33:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-29 11:33:01 -0700
commit36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64 (patch)
treea2ab064810f269fac8e9b0473685a02c5643f5c9 /src
parentd0c0c9969ab5552889f91d09db6dbf6b79f6705c (diff)
downloadnng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.gz
nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.bz2
nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.zip
Pass cancel of IPC and TCP all the way down to POSIX pipedescs.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c2
-rw-r--r--src/core/platform.h8
-rw-r--r--src/platform/posix/posix_aio.h4
-rw-r--r--src/platform/posix/posix_ipc.c8
-rw-r--r--src/platform/posix/posix_net.c8
-rw-r--r--src/platform/posix/posix_poll.c22
-rw-r--r--src/transport/ipc/ipc.c147
-rw-r--r--src/transport/tcp/tcp.c138
8 files changed, 222 insertions, 115 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 96e7c950..6a57ad52 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -116,6 +116,8 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ECANCELED);
}
+ aio->a_result = 0;
+ aio->a_count = 0;
aio->a_prov_cancel = cancel;
aio->a_prov_data = data;
nni_mtx_unlock(&aio->a_lk);
diff --git a/src/core/platform.h b/src/core/platform.h
index 08fcdb1c..7654f730 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -207,12 +207,12 @@ extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
// The data to send is stored in the a_iov field of the aio, and the array
// of iovs will never be larger than 4. The platform may modify the iovs,
// or the iov list.
-extern int nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
+extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
// nni_plat_tcp_aio_recv recvs data into the buffers provided by the
// iovs. The implementation does not return until the iovs are completely
// full, or an error condition occurs.
-extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
+extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
// nni_plat_ipc_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
@@ -244,12 +244,12 @@ extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *);
// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible
// for attempting to send all of the data. The iov count will never be
// larger than 4. The platform may modify the iovs.
-extern int nni_plat_ipc_aio_send(nni_plat_ipcsock *, nni_aio *);
+extern void nni_plat_ipc_aio_send(nni_plat_ipcsock *, nni_aio *);
// nni_plat_ipc_aio_recv recvs data into the buffers provided by the
// iovs. The implementation does not return until the iovs are completely
// full, or an error condition occurs.
-extern int nni_plat_ipc_aio_recv(nni_plat_ipcsock *, nni_aio *);
+extern void nni_plat_ipc_aio_recv(nni_plat_ipcsock *, nni_aio *);
// nni_plat_seed_prng seeds the PRNG subsystem. The specified number
// of bytes of entropy should be stashed. When possible, cryptographic
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index de662ff2..e762bebe 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -24,8 +24,8 @@ extern int nni_posix_pipedesc_sysinit(void);
extern void nni_posix_pipedesc_sysfini(void);
extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
-extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
-extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
+extern void nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
+extern void nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index ba46b41e..44307ca5 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -66,17 +66,17 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path)
}
-int
+void
nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_pipedesc_write(isp->pd, aio));
+ nni_posix_pipedesc_write(isp->pd, aio);
}
-int
+void
nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_pipedesc_read(isp->pd, aio));
+ nni_posix_pipedesc_read(isp->pd, aio);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 5ae9904a..1d44435e 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -170,17 +170,17 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
}
-int
+void
nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_pipedesc_write(s->pd, aio));
+ nni_posix_pipedesc_write(s->pd, aio);
}
-int
+void
nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_pipedesc_read(s->pd, aio));
+ nni_posix_pipedesc_read(s->pd, aio);
}
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index 7fb07917..a30c0953 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -42,6 +42,7 @@ struct nni_posix_pipedesc {
nni_list writeq;
nni_list_node node;
nni_posix_pollq * pq;
+ int nonblocking;
};
// nni_posix_pollq is a work structure used by the poller thread, that keeps
@@ -417,14 +418,17 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
int rv;
nni_posix_pollq *pq = pd->pq;
- // XXX: this should be done only once, after tcp negot. is done
- // or at init if we can get tcp negot. to be async.
- (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
-
nni_mtx_lock(&pq->mtx);
if (pd->fd < 0) {
nni_mtx_unlock(&pq->mtx);
nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
+ return;
+ }
+ // XXX: We really should just make all the FDs nonblocking, but we
+ // need to fix the negotiation phase.
+ if (pd->nonblocking == 0) {
+ (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+ pd->nonblocking = 1;
}
if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
nni_mtx_unlock(&pq->mtx);
@@ -470,6 +474,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
pd->pq = &nni_posix_global_pollq;
pd->fd = fd;
pd->index = 0;
+ pd->nonblocking = 0;
NNI_LIST_INIT(&pd->readq, nni_aio, a_prov_node);
NNI_LIST_INIT(&pd->writeq, nni_aio, a_prov_node);
@@ -567,22 +572,17 @@ nni_posix_pipedesc_sysfini(void)
// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-int
+void
nni_posix_pipedesc_read(nni_posix_pipedesc *pd, nni_aio *aio)
{
- aio->a_count = 0;
-
nni_posix_pipedesc_submit(pd, &pd->readq, aio);
- return (0);
}
-int
+void
nni_posix_pipedesc_write(nni_posix_pipedesc *pd, nni_aio *aio)
{
- aio->a_count = 0;
nni_posix_pipedesc_submit(pd, &pd->writeq, aio);
- return (0);
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ada7a87c..7727b375 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -36,6 +36,7 @@ struct nni_ipc_pipe {
nni_aio txaio;
nni_aio rxaio;
nni_msg * rxmsg;
+ nni_mtx mtx;
};
struct nni_ipc_ep {
@@ -72,6 +73,24 @@ nni_ipc_pipe_close(void *arg)
}
+static void
+nni_ipc_pipe_fini(void *arg)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ if (pipe->isp != NULL) {
+ nni_plat_ipc_fini(pipe->isp);
+ }
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
+ nni_mtx_fini(&pipe->mtx);
+ NNI_FREE_STRUCT(pipe);
+}
+
+
static int
nni_ipc_pipe_init(void **argp)
{
@@ -81,41 +100,27 @@ nni_ipc_pipe_init(void **argp)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
if (rv != 0) {
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
if (rv != 0) {
- nni_aio_fini(&pipe->txaio);
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
*argp = pipe;
return (0);
-}
-
-
-static void
-nni_ipc_pipe_fini(void *arg)
-{
- nni_ipc_pipe *pipe = arg;
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
- }
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_plat_ipc_fini(pipe->isp);
- NNI_FREE_STRUCT(pipe);
+fail:
+ nni_ipc_pipe_fini(pipe);
+ return (rv);
}
@@ -127,21 +132,21 @@ nni_ipc_pipe_send_cb(void *arg)
int rv;
size_t len;
+ nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
pipe->user_txaio = NULL;
if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
- nni_aio_finish(aio, rv, 0);
- return;
+ len = 0;
+ } else {
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
}
-
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
-
- nni_aio_finish(aio, 0, len);
+ nni_aio_finish(aio, rv, len);
+ nni_mtx_unlock(&pipe->mtx);
}
@@ -152,10 +157,11 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_aio *aio;
int rv;
+ nni_mtx_lock(&pipe->mtx);
aio = pipe->user_rxaio;
if (aio == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ // aio was canceled
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -169,6 +175,7 @@ nni_ipc_pipe_recv_cb(void *arg)
}
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -181,6 +188,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
nni_aio_finish(aio, NNG_EPROTO, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -192,12 +200,19 @@ nni_ipc_pipe_recv_cb(void *arg)
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
+ // Note that all IO on this pipe is blocked behind this
+ // allocation. We could possibly look at using a separate
+ // lock for the read side in the future, so that we allow
+ // transmits to proceed normally. In practice this is
+ // unlikely to be much of an issue though.
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -207,14 +222,8 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- rv = nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
- if (rv != 0) {
- pipe->user_rxaio = NULL;
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
- nni_aio_finish(aio, rv, 0);
- return;
- }
+ nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -224,6 +233,21 @@ nni_ipc_pipe_recv_cb(void *arg)
aio->a_msg = pipe->rxmsg;
pipe->rxmsg = NULL;
nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_ipc_cancel_tx(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->txaio);
}
@@ -234,10 +258,17 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio)
nni_msg *msg = aio->a_msg;
uint64_t len;
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+
+ nni_mtx_lock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
pipe->user_txaio = aio;
pipe->txhead[0] = 1; // message type, 1.
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
NNI_PUT64(pipe->txhead + 1, len);
pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
@@ -248,7 +279,23 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- return (nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio));
+ nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+}
+
+
+static void
+nni_ipc_cancel_rx(nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_rxaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->rxaio);
}
@@ -257,8 +304,14 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- pipe->user_rxaio = aio;
+ nni_mtx_lock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
+ pipe->user_rxaio = aio;
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
@@ -266,7 +319,9 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
pipe->rxaio.a_niov = 1;
- return (nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio));
+ nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 1c1eabbf..4aecef65 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -35,6 +35,7 @@ struct nni_tcp_pipe {
nni_aio txaio;
nni_aio rxaio;
nni_msg * rxmsg;
+ nni_mtx mtx;
};
struct nni_tcp_ep {
@@ -72,6 +73,24 @@ nni_tcp_pipe_close(void *arg)
}
+static void
+nni_tcp_pipe_fini(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ if (pipe->tsp != NULL) {
+ nni_plat_tcp_fini(pipe->tsp);
+ }
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
+
+ NNI_FREE_STRUCT(pipe);
+}
+
+
static int
nni_tcp_pipe_init(void **argp)
{
@@ -81,39 +100,26 @@ nni_tcp_pipe_init(void **argp)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
if (rv != 0) {
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
- return (rv);
+ goto fail;
}
rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe);
if (rv != 0) {
- nni_aio_fini(&pipe->txaio);
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
+ goto fail;
}
*argp = pipe;
return (0);
-}
-
-
-static void
-nni_tcp_pipe_fini(void *arg)
-{
- nni_tcp_pipe *pipe = arg;
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
- }
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_plat_tcp_fini(pipe->tsp);
- NNI_FREE_STRUCT(pipe);
+fail:
+ nni_tcp_pipe_fini(pipe);
+ return (rv);
}
@@ -125,23 +131,22 @@ nni_tcp_pipe_send_cb(void *arg)
nni_aio *aio;
size_t len;
+ nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
pipe->user_txaio = NULL;
if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
- nni_aio_finish(aio, rv, 0);
- return;
+ len = 0;
+ } else {
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
}
-
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
-
nni_aio_finish(aio, 0, len);
+ nni_mtx_unlock(&pipe->mtx);
}
@@ -152,10 +157,11 @@ nni_tcp_pipe_recv_cb(void *arg)
nni_aio *aio;
int rv;
+ nni_mtx_lock(&pipe->mtx);
+
aio = pipe->user_rxaio;
if (aio == NULL) {
- // This should never ever happen.
- NNI_ASSERT(aio != NULL);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -169,6 +175,7 @@ nni_tcp_pipe_recv_cb(void *arg)
}
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -185,12 +192,14 @@ nni_tcp_pipe_recv_cb(void *arg)
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
nni_aio_finish(aio, rv, 0);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -200,14 +209,8 @@ nni_tcp_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
- if (rv != 0) {
- pipe->user_rxaio = NULL;
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
- nni_aio_finish(aio, rv, 0);
- return;
- }
+ nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -217,6 +220,21 @@ nni_tcp_pipe_recv_cb(void *arg)
aio->a_msg = pipe->rxmsg;
pipe->rxmsg = NULL;
nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_mtx_unlock(&pipe->mtx);
+}
+
+
+static void
+nni_tcp_cancel_tx(nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->txaio);
}
@@ -227,9 +245,17 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
nni_msg *msg = aio->a_msg;
uint64_t len;
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
+
pipe->user_txaio = aio;
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
NNI_PUT64(pipe->txlen, len);
pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
@@ -240,7 +266,23 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio));
+ nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+}
+
+
+static void
+nni_tcp_cancel_rx(nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = aio->a_prov_data;
+
+ nni_mtx_lock(&pipe->mtx);
+ pipe->user_rxaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
+ // stop the underlying aio ... we don't want a result for it.
+ nni_aio_stop(&pipe->rxaio);
}
@@ -249,6 +291,12 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *pipe = arg;
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
+ }
pipe->user_rxaio = aio;
NNI_ASSERT(pipe->rxmsg == NULL);
@@ -258,7 +306,9 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
pipe->rxaio.a_niov = 1;
- return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio));
+ nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return (0);
}