aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-03-29 19:35:09 -0700
committerGarrett D'Amore <garrett@damore.org>2018-03-30 12:38:56 -0700
commit99907b52872d9fc9c68f1af60299dd8476c0a603 (patch)
treeef465cb6c71efe9b5294069627be57f99a8d9f8f /src
parent5f69a5ba55058c7f5bfc4d5a250e72c56f31e0eb (diff)
downloadnng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.gz
nng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.bz2
nng-99907b52872d9fc9c68f1af60299dd8476c0a603.zip
fixes #317 TLS, TCP, and IPC should support multiple outstanding ops
fixes #22 Consider using synchronous completions sometimes Transport improvements for IPC, TCP, and TLS. This change does three things. First it permits multiple outstanding receives or sends on the transport. This change is being made to accomodate some other changes in the protocols where it might be advantageous to post send or receives directly against the transport pipe without going through another level of indirection. Second, it changes the normal completions to be performed synchronously. This translates into a rather major performance improvement, reducing latency by some 27%, and thereby improving performance altogether. (This elminates two extra context switches per transaction!) FInally, we can save some extra checks and conditions because we know that completions cannot happen if we don't have a pending operation (we no longer complete out of sequence), and we only call the dosend operation when we have something to send. This can eliminate some pipeline stalls.
Diffstat (limited to 'src')
-rw-r--r--src/transport/ipc/ipc.c157
-rw-r--r--src/transport/tcp/tcp.c172
-rw-r--r--src/transport/tls/tls.c169
3 files changed, 306 insertions, 192 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 9d25ed72..e2bb5f99 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -37,8 +37,8 @@ struct nni_ipc_pipe {
size_t wanttxhead;
size_t wantrxhead;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list recvq;
+ nni_list sendq;
nni_aio *user_negaio;
nni_aio *txaio;
nni_aio *rxaio;
@@ -57,6 +57,8 @@ struct nni_ipc_ep {
nni_mtx mtx;
};
+static void nni_ipc_pipe_dosend(nni_ipc_pipe *, nni_aio *);
+static void nni_ipc_pipe_dorecv(nni_ipc_pipe *);
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
@@ -119,6 +121,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
nni_ipc_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->sendq);
+ nni_aio_list_init(&p->recvq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -215,17 +219,16 @@ nni_ipc_pipe_send_cb(void *arg)
size_t n;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_txaio) == NULL) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- pipe->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -238,11 +241,18 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&pipe->sendq)) {
+ // schedule next send
+ nni_ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq));
+ }
nni_mtx_unlock(&pipe->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -257,11 +267,7 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_aio * rxaio = pipe->rxaio;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_rxaio) == NULL) {
- // aio was canceled
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
@@ -325,19 +331,28 @@ nni_ipc_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ if (!nni_list_empty(&pipe->recvq)) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
- nni_aio_finish_msg(aio, msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_set_synch(aio);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&pipe->mtx);
+
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
@@ -348,37 +363,36 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_aio_abort(pipe->txaio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_txaio = NULL;
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_send(void *arg, nni_aio *aio)
+nni_ipc_pipe_dosend(nni_ipc_pipe *pipe, nni_aio *aio)
{
- nni_ipc_pipe *pipe = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ // This runs to send the message.
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- pipe->user_txaio = aio;
-
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
@@ -398,43 +412,54 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
-
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_ipc_cancel_rx(nni_aio *aio, int rv)
+nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
- nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ nni_ipc_pipe *pipe = arg;
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_rxaio != aio) {
+ if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_rxaio = NULL;
+ nni_list_append(&pipe->sendq, aio);
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_ipc_pipe_dosend(pipe, aio);
+ }
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->rxaio, rv);
- nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&pipe->mtx);
return;
}
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_aio_abort(pipe->rxaio, rv);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+}
- pipe->user_rxaio = aio;
+static void
+nni_ipc_pipe_dorecv(nni_ipc_pipe *pipe)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
@@ -444,6 +469,24 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+}
+
+static void
+nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+
+ nni_list_append(&pipe->recvq, aio);
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4837c8d3..7f819d4f 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -27,8 +27,8 @@ struct nni_tcp_pipe {
uint16_t proto;
size_t rcvmax;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list recvq;
+ nni_list sendq;
nni_aio *user_negaio;
uint8_t txlen[sizeof(uint64_t)];
@@ -57,6 +57,8 @@ struct nni_tcp_ep {
nni_mtx mtx;
};
+static void nni_tcp_pipe_dosend(nni_tcp_pipe *, nni_aio *);
+static void nni_tcp_pipe_dorecv(nni_tcp_pipe *);
static void nni_tcp_pipe_send_cb(void *);
static void nni_tcp_pipe_recv_cb(void *);
static void nni_tcp_pipe_nego_cb(void *);
@@ -119,6 +121,8 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
nni_tcp_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->recvq);
+ nni_aio_list_init(&p->sendq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -213,17 +217,16 @@ nni_tcp_pipe_send_cb(void *arg)
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
- if ((aio = p->user_txaio) == NULL) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- p->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -236,11 +239,18 @@ nni_tcp_pipe_send_cb(void *arg)
return;
}
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&p->sendq)) {
+ // schedule next send
+ nni_tcp_pipe_dosend(p, nni_list_first(&p->sendq));
+ }
nni_mtx_unlock(&p->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -252,17 +262,10 @@ nni_tcp_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
-
- if ((aio = p->user_rxaio) == NULL) {
- // Canceled.
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- rxaio = p->rxaio;
+ aio = nni_list_first(&p->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
goto recv_error;
@@ -310,18 +313,27 @@ nni_tcp_pipe_recv_cb(void *arg)
}
// We read a message completely. Let the user know the good news.
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ if (!nni_list_empty(&p->recvq)) {
+ nni_tcp_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_msg(aio, msg);
+
+ nni_aio_set_synch(aio);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
+
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
@@ -332,45 +344,43 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
- if (p->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_txaio = NULL;
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_aio_abort(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_tcp_pipe_send(void *arg, nni_aio *aio)
+nni_tcp_pipe_dosend(nni_tcp_pipe *p, nni_aio *aio)
{
- nni_tcp_pipe *p = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ // This runs to send the message.
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- p->user_txaio = aio;
-
NNI_PUT64(p->txlen, len);
- niov = 0;
- txaio = p->txaio;
- iov[niov].iov_buf = p->txlen;
- iov[niov].iov_len = sizeof(p->txlen);
+ txaio = p->txaio;
+ niov = 0;
+ iov[0].iov_buf = p->txlen;
+ iov[0].iov_len = sizeof(p->txlen);
niov++;
if (nni_msg_header_len(msg) > 0) {
iov[niov].iov_buf = nni_msg_header(msg);
@@ -383,53 +393,79 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
-
nni_plat_tcp_pipe_send(p->tpp, txaio);
- nni_mtx_unlock(&p->mtx);
}
static void
-nni_tcp_cancel_rx(nni_aio *aio, int rv)
+nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
- nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
+ nni_tcp_pipe *p = arg;
nni_mtx_lock(&p->mtx);
- if (p->user_rxaio != aio) {
+ if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = NULL;
+ nni_list_append(&p->sendq, aio);
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_tcp_pipe_dosend(p, aio);
+ }
nni_mtx_unlock(&p->mtx);
-
- // cancel the underlying operation.
- nni_aio_abort(p->rxaio, rv);
- nni_aio_finish_error(aio, rv);
}
static void
-nni_tcp_pipe_recv(void *arg, nni_aio *aio)
+nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_aio_abort(p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = aio;
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+static void
+nni_tcp_pipe_dorecv(nni_tcp_pipe *p)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(p->rxmsg == NULL);
- // Schedule a read of the TCP header.
+ // Schedule a read of the IPC header.
rxaio = p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+}
+
+static void
+nni_tcp_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_list_append(&p->recvq, aio);
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_tcp_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
}
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 84125c48..610a7f7c 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -32,8 +32,8 @@ struct nni_tls_pipe {
uint16_t proto;
size_t rcvmax;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list sendq;
+ nni_list recvq;
nni_aio *user_negaio;
uint8_t txlen[sizeof(uint64_t)];
@@ -65,6 +65,8 @@ struct nni_tls_ep {
int mode;
};
+static void nni_tls_pipe_dorecv(nni_tls_pipe *);
+static void nni_tls_pipe_dosend(nni_tls_pipe *, nni_aio *);
static void nni_tls_pipe_send_cb(void *);
static void nni_tls_pipe_recv_cb(void *);
static void nni_tls_pipe_nego_cb(void *);
@@ -128,6 +130,8 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp)
nni_tls_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->recvq);
+ nni_aio_list_init(&p->sendq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -222,17 +226,16 @@ nni_tls_pipe_send_cb(void *arg)
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
- if ((aio = p->user_txaio) == NULL) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- p->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -244,12 +247,17 @@ nni_tls_pipe_send_cb(void *arg)
nni_mtx_unlock(&p->mtx);
return;
}
-
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&p->sendq)) {
+ nni_tls_pipe_dosend(p, nni_list_first(&p->sendq));
+ }
nni_mtx_unlock(&p->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -261,17 +269,10 @@ nni_tls_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
-
- if ((aio = p->user_rxaio) == NULL) {
- // Canceled.
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- rxaio = p->rxaio;
+ aio = nni_list_first(&p->recvq);
if ((rv = nni_aio_result(p->rxaio)) != 0) {
goto recv_error;
@@ -320,17 +321,25 @@ nni_tls_pipe_recv_cb(void *arg)
}
// We read a message completely. Let the user know the good news.
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ if (!nni_list_empty(&p->recvq)) {
+ nni_tls_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_msg(aio, msg);
+
+ nni_aio_set_synch(aio);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- p->user_rxaio = NULL;
- msg = p->rxmsg;
- p->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
@@ -342,43 +351,40 @@ nni_tls_cancel_tx(nni_aio *aio, int rv)
nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
- if (p->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_txaio = NULL;
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_aio_abort(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_tls_pipe_send(void *arg, nni_aio *aio)
+nni_tls_pipe_dosend(nni_tls_pipe *p, nni_aio *aio)
{
- nni_tls_pipe *p = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
-
- p->user_txaio = aio;
-
NNI_PUT64(p->txlen, len);
- niov = 0;
txaio = p->txaio;
+ niov = 0;
iov[niov].iov_buf = p->txlen;
iov[niov].iov_len = sizeof(p->txlen);
niov++;
@@ -395,52 +401,81 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
nni_aio_set_iov(txaio, niov, iov);
nni_tls_send(p->tls, txaio);
- nni_mtx_unlock(&p->mtx);
}
static void
-nni_tls_cancel_rx(nni_aio *aio, int rv)
+nni_tls_pipe_send(void *arg, nni_aio *aio)
{
- nni_tls_pipe *p = nni_aio_get_prov_data(aio);
+ nni_tls_pipe *p = arg;
nni_mtx_lock(&p->mtx);
- if (p->user_rxaio != aio) {
+
+ if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = NULL;
- nni_mtx_unlock(&p->mtx);
- // cancel the underlying operation.
- nni_aio_abort(p->rxaio, rv);
- nni_aio_finish_error(aio, rv);
+ nni_list_append(&p->sendq, aio);
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_tls_pipe_dosend(p, aio);
+ }
+ nni_mtx_unlock(&p->mtx);
}
static void
-nni_tls_pipe_recv(void *arg, nni_aio *aio)
+nni_tls_cancel_rx(nni_aio *aio, int rv)
{
- nni_tls_pipe *p = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_rxaio = aio;
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_aio_abort(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+static void
+nni_tls_pipe_dorecv(nni_tls_pipe *p)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(p->rxmsg == NULL);
- // Schedule a read of the TCP header.
- rxaio = p->rxaio;
-
+ // Schedule a read of the IPC header.
+ rxaio = p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
nni_tls_recv(p->tls, rxaio);
+}
+
+static void
+nni_tls_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_tls_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+
+ if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_list_append(&p->recvq, aio);
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_tls_pipe_dorecv(p);
+ }
nni_mtx_unlock(&p->mtx);
}