aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/transport/ipc/ipc.c157
-rw-r--r--src/transport/tcp/tcp.c172
-rw-r--r--src/transport/tls/tls.c169
3 files changed, 306 insertions, 192 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 9d25ed72..e2bb5f99 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -37,8 +37,8 @@ struct nni_ipc_pipe {
size_t wanttxhead;
size_t wantrxhead;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list recvq;
+ nni_list sendq;
nni_aio *user_negaio;
nni_aio *txaio;
nni_aio *rxaio;
@@ -57,6 +57,8 @@ struct nni_ipc_ep {
nni_mtx mtx;
};
+static void nni_ipc_pipe_dosend(nni_ipc_pipe *, nni_aio *);
+static void nni_ipc_pipe_dorecv(nni_ipc_pipe *);
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
@@ -119,6 +121,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
nni_ipc_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->sendq);
+ nni_aio_list_init(&p->recvq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -215,17 +219,16 @@ nni_ipc_pipe_send_cb(void *arg)
size_t n;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_txaio) == NULL) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- pipe->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -238,11 +241,18 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&pipe->sendq)) {
+ // schedule next send
+ nni_ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq));
+ }
nni_mtx_unlock(&pipe->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -257,11 +267,7 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_aio * rxaio = pipe->rxaio;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_rxaio) == NULL) {
- // aio was canceled
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
@@ -325,19 +331,28 @@ nni_ipc_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ if (!nni_list_empty(&pipe->recvq)) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
- nni_aio_finish_msg(aio, msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_set_synch(aio);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&pipe->mtx);
+
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
@@ -348,37 +363,36 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_aio_abort(pipe->txaio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_txaio = NULL;
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_send(void *arg, nni_aio *aio)
+nni_ipc_pipe_dosend(nni_ipc_pipe *pipe, nni_aio *aio)
{
- nni_ipc_pipe *pipe = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ // This runs to send the message.
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- pipe->user_txaio = aio;
-
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
@@ -398,43 +412,54 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
-
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_ipc_cancel_rx(nni_aio *aio, int rv)
+nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
- nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ nni_ipc_pipe *pipe = arg;
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_rxaio != aio) {
+ if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_rxaio = NULL;
+ nni_list_append(&pipe->sendq, aio);
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_ipc_pipe_dosend(pipe, aio);
+ }
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->rxaio, rv);
- nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&pipe->mtx);
return;
}
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_aio_abort(pipe->rxaio, rv);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+}
- pipe->user_rxaio = aio;
+static void
+nni_ipc_pipe_dorecv(nni_ipc_pipe *pipe)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
@@ -444,6 +469,24 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+}
+
+static void
+nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+
+ nni_list_append(&pipe->recvq, aio);
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4837c8d3..7f819d4f 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -27,8 +27,8 @@ struct nni_tcp_pipe {
uint16_t proto;
size_t rcvmax;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list recvq;
+ nni_list sendq;
nni_aio *user_negaio;
uint8_t txlen[sizeof(uint64_t)];
@@ -57,6 +57,8 @@ struct nni_tcp_ep {
nni_mtx mtx;
};
+static void nni_tcp_pipe_dosend(nni_tcp_pipe *, nni_aio *);
+static void nni_tcp_pipe_dorecv(nni_tcp_pipe *);
static void nni_tcp_pipe_send_cb(void *);
static void nni_tcp_pipe_recv_cb(void *);
static void nni_tcp_pipe_nego_cb(void *);
@@ -119,6 +121,8 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
nni_tcp_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->recvq);
+ nni_aio_list_init(&p->sendq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -213,17 +217,16 @@ nni_tcp_pipe_send_cb(void *arg)
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
- if ((aio = p->user_txaio) == NULL) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- p->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -236,11 +239,18 @@ nni_tcp_pipe_send_cb(void *arg)
return;
}
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&p->sendq)) {
+ // schedule next send
+ nni_tcp_pipe_dosend(p, nni_list_first(&p->sendq));
+ }
nni_mtx_unlock(&p->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -252,17 +262,10 @@ nni_tcp_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
-
- if ((aio = p->user_rxaio) == NULL) {
- // Canceled.
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- rxaio = p->rxaio;
+ aio = nni_list_first(&p->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
goto recv_error;
@@ -310,18 +313,27 @@ nni_tcp_pipe_recv_cb(void *arg)
}
// We read a message completely. Let the user know the good news.
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ if (!nni_list_empty(&p->recvq)) {
+ nni_tcp_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_msg(aio, msg);
+
+ nni_aio_set_synch(aio);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
+
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
@@ -332,45 +344,43 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
- if (p->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_txaio = NULL;
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_aio_abort(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_tcp_pipe_send(void *arg, nni_aio *aio)
+nni_tcp_pipe_dosend(nni_tcp_pipe *p, nni_aio *aio)
{
- nni_tcp_pipe *p = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ // This runs to send the message.
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- p->user_txaio = aio;
-
NNI_PUT64(p->txlen, len);
- niov = 0;
- txaio = p->txaio;
- iov[niov].iov_buf = p->txlen;
- iov[niov].iov_len = sizeof(p->txlen);
+ txaio = p->txaio;
+ niov = 0;
+ iov[0].iov_buf = p->txlen;
+ iov[0].iov_len = sizeof(p->txlen);
niov++;
if (nni_msg_header_len(msg) > 0) {
iov[niov].iov_buf = nni_msg_header(msg);
@@ -383,53 +393,79 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
-
nni_plat_tcp_pipe_send(p->tpp, txaio);
- nni_mtx_unlock(&p->mtx);
}
static void
-nni_tcp_cancel_rx(nni_aio *aio, int rv)
+nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
- nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
+ nni_tcp_pipe *p = arg;
nni_mtx_lock(&p->mtx);
- if (p->user_rxaio != aio) {
+ if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = NULL;
+ nni_list_append(&p->sendq, aio);
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_tcp_pipe_dosend(p, aio);
+ }
nni_mtx_unlock(&p->mtx);
-
- // cancel the underlying operation.
- nni_aio_abort(p->rxaio, rv);
- nni_aio_finish_error(aio, rv);
}
static void
-nni_tcp_pipe_recv(void *arg, nni_aio *aio)
+nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_aio_abort(p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = aio;
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+static void
+nni_tcp_pipe_dorecv(nni_tcp_pipe *p)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(p->rxmsg == NULL);
- // Schedule a read of the TCP header.
+ // Schedule a read of the IPC header.
rxaio = p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+}
+
+static void
+nni_tcp_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_list_append(&p->recvq, aio);
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_tcp_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
}
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 84125c48..610a7f7c 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -32,8 +32,8 @@ struct nni_tls_pipe {
uint16_t proto;
size_t rcvmax;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list sendq;
+ nni_list recvq;
nni_aio *user_negaio;
uint8_t txlen[sizeof(uint64_t)];
@@ -65,6 +65,8 @@ struct nni_tls_ep {
int mode;
};
+static void nni_tls_pipe_dorecv(nni_tls_pipe *);
+static void nni_tls_pipe_dosend(nni_tls_pipe *, nni_aio *);
static void nni_tls_pipe_send_cb(void *);
static void nni_tls_pipe_recv_cb(void *);
static void nni_tls_pipe_nego_cb(void *);
@@ -128,6 +130,8 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp)
nni_tls_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->recvq);
+ nni_aio_list_init(&p->sendq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -222,17 +226,16 @@ nni_tls_pipe_send_cb(void *arg)
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
- if ((aio = p->user_txaio) == NULL) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- p->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -244,12 +247,17 @@ nni_tls_pipe_send_cb(void *arg)
nni_mtx_unlock(&p->mtx);
return;
}
-
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&p->sendq)) {
+ nni_tls_pipe_dosend(p, nni_list_first(&p->sendq));
+ }
nni_mtx_unlock(&p->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -261,17 +269,10 @@ nni_tls_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
-
- if ((aio = p->user_rxaio) == NULL) {
- // Canceled.
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- rxaio = p->rxaio;
+ aio = nni_list_first(&p->recvq);
if ((rv = nni_aio_result(p->rxaio)) != 0) {
goto recv_error;
@@ -320,17 +321,25 @@ nni_tls_pipe_recv_cb(void *arg)
}
// We read a message completely. Let the user know the good news.
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ if (!nni_list_empty(&p->recvq)) {
+ nni_tls_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_msg(aio, msg);
+
+ nni_aio_set_synch(aio);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
@@ -342,43 +351,40 @@ nni_tls_cancel_tx(nni_aio *aio, int rv)
nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
- if (p->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_txaio = NULL;
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_aio_abort(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_tls_pipe_send(void *arg, nni_aio *aio)
+nni_tls_pipe_dosend(nni_tls_pipe *p, nni_aio *aio)
{
- nni_tls_pipe *p = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- p->user_txaio = aio;
-
NNI_PUT64(p->txlen, len);
- niov = 0;
txaio = p->txaio;
+ niov = 0;
iov[niov].iov_buf = p->txlen;
iov[niov].iov_len = sizeof(p->txlen);
niov++;
@@ -395,52 +401,81 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
nni_aio_set_iov(txaio, niov, iov);
nni_tls_send(p->tls, txaio);
- nni_mtx_unlock(&p->mtx);
}
static void
-nni_tls_cancel_rx(nni_aio *aio, int rv)
+nni_tls_pipe_send(void *arg, nni_aio *aio)
{
- nni_tls_pipe *p = nni_aio_get_prov_data(aio);
+ nni_tls_pipe *p = arg;
nni_mtx_lock(&p->mtx);
- if (p->user_rxaio != aio) {
+
+ if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = NULL;
- nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->rxaio, rv);
- nni_aio_finish_error(aio, rv);
+ nni_list_append(&p->sendq, aio);
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_tls_pipe_dosend(p, aio);
+ }
+ nni_mtx_unlock(&p->mtx);
}
static void
-nni_tls_pipe_recv(void *arg, nni_aio *aio)
+nni_tls_cancel_rx(nni_aio *aio, int rv)
{
- nni_tls_pipe *p = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = aio;
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_aio_abort(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+static void
+nni_tls_pipe_dorecv(nni_tls_pipe *p)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(p->rxmsg == NULL);
- // Schedule a read of the TCP header.
- rxaio = p->rxaio;
-
+ // Schedule a read of the IPC header.
+ rxaio = p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
nni_tls_recv(p->tls, rxaio);
+}
+
+static void
+nni_tls_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_tls_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+
+ if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_append(&p->recvq, aio);
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_tls_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
}