aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ipc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-13 21:10:03 -0800
committerGarrett D'Amore <garrett@damore.org>2017-11-13 21:10:03 -0800
commite8694d15d0a108895bf869f292d59e11d834361e (patch)
treed87b8d396953fee653fbcbee92521395d0cec1fe /src/transport/ipc
parentac6019bfabac887274fb9d8b2a167df940ba6121 (diff)
downloadnng-e8694d15d0a108895bf869f292d59e11d834361e.tar.gz
nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.bz2
nng-e8694d15d0a108895bf869f292d59e11d834361e.zip
fixes #154 underlyng TCP & IPC transports should support partial recv/send
fixes #155 POSIX TCP & IPC could avoid a lot of context switches
Diffstat (limited to 'src/transport/ipc')
-rw-r--r--src/transport/ipc/ipc.c150
1 files changed, 104 insertions, 46 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c13dbd34..32ff1c0e 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -207,25 +207,52 @@ nni_ipc_pipe_send_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
nni_aio * aio;
+ nni_aio * txaio = pipe->txaio;
+ nni_msg * msg;
int rv;
- size_t len;
+ size_t n;
nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_txaio = NULL;
- if ((rv = nni_aio_result(pipe->txaio)) != 0) {
- len = 0;
- } else {
- nni_msg *msg = nni_aio_get_msg(aio);
- len = nni_msg_len(msg);
- nni_msg_free(msg);
+
+ if ((rv = nni_aio_result(txaio)) != 0) {
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+ msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ n = nni_aio_count(txaio);
+ while (n) {
+ NNI_ASSERT(txaio->a_niov != 0);
+ if (txaio->a_iov[0].iov_len > n) {
+ txaio->a_iov[0].iov_len -= n;
+ txaio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= txaio->a_iov[0].iov_len;
+ for (int i = 0; i < txaio->a_niov; i++) {
+ txaio->a_iov[i] = txaio->a_iov[i + 1];
+ }
+ txaio->a_niov--;
+ }
+ if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_plat_ipc_pipe_send(pipe->ipp, txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
}
- nni_aio_finish(aio, rv, len);
+
nni_mtx_unlock(&pipe->mtx);
+ msg = nni_aio_get_msg(aio);
+ n = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, n);
}
static void
@@ -234,7 +261,9 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_ipc_pipe *pipe = arg;
nni_aio * aio;
int rv;
+ size_t n;
nni_msg * msg;
+ nni_aio * rxaio = pipe->rxaio;
nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_rxaio) == NULL) {
@@ -243,32 +272,45 @@ nni_ipc_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
- if (pipe->rxmsg != NULL) {
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
+ goto recv_error;
+ }
+
+ n = nni_aio_count(rxaio);
+ while (n) {
+ NNI_ASSERT(rxaio->a_niov != 0);
+ if (rxaio->a_iov[0].iov_len > n) {
+ rxaio->a_iov[0].iov_len -= n;
+ rxaio->a_iov[0].iov_buf += n;
+ break;
}
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
+ n -= rxaio->a_iov[0].iov_len;
+ for (int i = 0; i < rxaio->a_niov; i++) {
+ rxaio->a_iov[i] = rxaio->a_iov[i + 1];
+ }
+ rxaio->a_niov--;
+ }
+
+ // Was this a partial read? If so then resubmit for the rest.
+ if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
- // If we don't have a message yet, we were reading the TCP message
+ // If we don't have a message yet, we were reading the message
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (pipe->rxmsg == NULL) {
uint64_t len;
- nni_aio *rxaio;
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
- nni_aio_finish_error(aio, NNG_EPROTO);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ rv = NNG_EPROTO;
+ goto recv_error;
}
// We should have gotten a message header.
@@ -277,10 +319,8 @@ nni_ipc_pipe_recv_cb(void *arg)
// Make sure the message payload is not too big. If it is
// the caller will shut down the pipe.
if (len > pipe->rcvmax) {
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, NNG_EMSGSIZE);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ rv = NNG_EMSGSIZE;
+ goto recv_error;
}
// Note that all IO on this pipe is blocked behind this
@@ -289,22 +329,20 @@ nni_ipc_pipe_recv_cb(void *arg)
// transmits to proceed normally. In practice this is
// unlikely to be much of an issue though.
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ goto recv_error;
}
- // Submit the rest of the data for a read -- we want to
- // read the entire message now.
- rxaio = pipe->rxaio;
- rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- rxaio->a_niov = 1;
+ if (len != 0) {
+ // Submit the rest of the data for a read -- we want to
+ // read the entire message now.
+ rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ rxaio->a_iov[0].iov_len = (size_t) len;
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
}
// Otherwise we got a message read completely. Let the user know the
@@ -312,8 +350,18 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->user_rxaio = NULL;
msg = pipe->rxmsg;
pipe->rxmsg = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
nni_aio_finish_msg(aio, msg);
+ return;
+
+recv_error:
+ pipe->user_rxaio = NULL;
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
nni_mtx_unlock(&pipe->mtx);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -340,6 +388,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
nni_aio * txaio;
+ int niov;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -354,14 +403,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- txaio = pipe->txaio;
- txaio->a_iov[0].iov_buf = pipe->txhead;
- txaio->a_iov[0].iov_len = sizeof(pipe->txhead);
- txaio->a_iov[1].iov_buf = nni_msg_header(msg);
- txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
- txaio->a_iov[2].iov_buf = nni_msg_body(msg);
- txaio->a_iov[2].iov_len = nni_msg_len(msg);
- txaio->a_niov = 3;
+ txaio = pipe->txaio;
+ niov = 0;
+ txaio->a_iov[niov].iov_buf = pipe->txhead;
+ txaio->a_iov[niov].iov_len = sizeof(pipe->txhead);
+ niov++;
+ if (nni_msg_header_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ niov++;
+ }
+ if (nni_msg_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ niov++;
+ }
+ txaio->a_niov = niov;
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
@@ -625,7 +682,8 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report either.
+ // If we can't start, then its dying and we can't report
+ // either.
if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
return;