aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ipc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-03-29 19:35:09 -0700
committerGarrett D'Amore <garrett@damore.org>2018-03-30 12:38:56 -0700
commit99907b52872d9fc9c68f1af60299dd8476c0a603 (patch)
treeef465cb6c71efe9b5294069627be57f99a8d9f8f /src/transport/ipc
parent5f69a5ba55058c7f5bfc4d5a250e72c56f31e0eb (diff)
downloadnng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.gz
nng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.bz2
nng-99907b52872d9fc9c68f1af60299dd8476c0a603.zip
fixes #317 TLS, TCP, and IPC should support multiple outstanding ops
fixes #22 Consider using synchronous completions sometimes Transport improvements for IPC, TCP, and TLS. This change does three things. First it permits multiple outstanding receives or sends on the transport. This change is being made to accomodate some other changes in the protocols where it might be advantageous to post send or receives directly against the transport pipe without going through another level of indirection. Second, it changes the normal completions to be performed synchronously. This translates into a rather major performance improvement, reducing latency by some 27%, and thereby improving performance altogether. (This elminates two extra context switches per transaction!) FInally, we can save some extra checks and conditions because we know that completions cannot happen if we don't have a pending operation (we no longer complete out of sequence), and we only call the dosend operation when we have something to send. This can eliminate some pipeline stalls.
Diffstat (limited to 'src/transport/ipc')
-rw-r--r--src/transport/ipc/ipc.c157
1 files changed, 100 insertions, 57 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 9d25ed72..e2bb5f99 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -37,8 +37,8 @@ struct nni_ipc_pipe {
size_t wanttxhead;
size_t wantrxhead;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
+ nni_list recvq;
+ nni_list sendq;
nni_aio *user_negaio;
nni_aio *txaio;
nni_aio *rxaio;
@@ -57,6 +57,8 @@ struct nni_ipc_ep {
nni_mtx mtx;
};
+static void nni_ipc_pipe_dosend(nni_ipc_pipe *, nni_aio *);
+static void nni_ipc_pipe_dorecv(nni_ipc_pipe *);
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
@@ -119,6 +121,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
nni_ipc_pipe_fini(p);
return (rv);
}
+ nni_aio_list_init(&p->sendq);
+ nni_aio_list_init(&p->recvq);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -215,17 +219,16 @@ nni_ipc_pipe_send_cb(void *arg)
size_t n;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_txaio) == NULL) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
- pipe->user_txaio = NULL;
+ // Intentionally we do not queue up another transfer.
+ // There's an excellent chance that the pipe is no longer
+ // usable, with a partial transfer.
+ // The protocol should see this error, and close the
+ // pipe itself, we hope.
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
return;
}
@@ -238,11 +241,18 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
+ nni_aio_list_remove(aio);
+ if (!nni_list_empty(&pipe->sendq)) {
+ // schedule next send
+ nni_ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq));
+ }
nni_mtx_unlock(&pipe->mtx);
+
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
+ nni_aio_set_synch(aio);
nni_aio_finish(aio, 0, n);
}
@@ -257,11 +267,7 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_aio * rxaio = pipe->rxaio;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_rxaio) == NULL) {
- // aio was canceled
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ aio = nni_list_first(&pipe->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
@@ -325,19 +331,28 @@ nni_ipc_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ if (!nni_list_empty(&pipe->recvq)) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
- nni_aio_finish_msg(aio, msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_set_synch(aio);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
return;
recv_error:
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ nni_aio_list_remove(aio);
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ // Intentionally, we do not queue up another receive.
+ // The protocol should notice this error and close the pipe.
nni_mtx_unlock(&pipe->mtx);
+
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
@@ -348,37 +363,36 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_txaio != aio) {
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ // If this is being sent, then cancel the pending transfer.
+ // The callback on the txaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_aio_abort(pipe->txaio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_txaio = NULL;
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_send(void *arg, nni_aio *aio)
+nni_ipc_pipe_dosend(nni_ipc_pipe *pipe, nni_aio *aio)
{
- nni_ipc_pipe *pipe = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- uint64_t len;
- nni_aio * txaio;
- int niov;
- nni_iov iov[3];
+ nni_aio *txaio;
+ nni_msg *msg;
+ int niov;
+ nni_iov iov[3];
+ uint64_t len;
+ // This runs to send the message.
+ msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
-
- pipe->user_txaio = aio;
-
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
@@ -398,43 +412,54 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
-
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
- nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_ipc_cancel_rx(nni_aio *aio, int rv)
+nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
- nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ nni_ipc_pipe *pipe = arg;
nni_mtx_lock(&pipe->mtx);
- if (pipe->user_rxaio != aio) {
+ if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_rxaio = NULL;
+ nni_list_append(&pipe->sendq, aio);
+ if (nni_list_first(&pipe->sendq) == aio) {
+ nni_ipc_pipe_dosend(pipe, aio);
+ }
nni_mtx_unlock(&pipe->mtx);
-
- nni_aio_abort(pipe->rxaio, rv);
- nni_aio_finish_error(aio, rv);
}
static void
-nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = arg;
- nni_aio * rxaio;
- nni_iov iov;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ if (!nni_aio_list_active(aio)) {
nni_mtx_unlock(&pipe->mtx);
return;
}
+ // If receive in progress, then cancel the pending transfer.
+ // The callback on the rxaio will cause the user aio to
+ // be canceled too.
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_aio_abort(pipe->rxaio, rv);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+}
- pipe->user_rxaio = aio;
+static void
+nni_ipc_pipe_dorecv(nni_ipc_pipe *pipe)
+{
+ nni_aio *rxaio;
+ nni_iov iov;
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
@@ -444,6 +469,24 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+}
+
+static void
+nni_ipc_pipe_recv(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ nni_mtx_lock(&pipe->mtx);
+
+ if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
+
+ nni_list_append(&pipe->recvq, aio);
+ if (nni_list_first(&pipe->recvq) == aio) {
+ nni_ipc_pipe_dorecv(pipe);
+ }
nni_mtx_unlock(&pipe->mtx);
}