aboutsummaryrefslogtreecommitdiff
path: root/src/transport/inproc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-20 11:02:52 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-20 11:35:40 -0800
commitb2ba35251986d2754de5f0f274ee7cbf577223e1 (patch)
tree556896e5e4a40ad79316ff58a689535390a9a91b /src/transport/inproc
parent1a4e60f94dd4febffe3b651b5c800f1b9096f3d9 (diff)
downloadnng-b2ba35251986d2754de5f0f274ee7cbf577223e1.tar.gz
nng-b2ba35251986d2754de5f0f274ee7cbf577223e1.tar.bz2
nng-b2ba35251986d2754de5f0f274ee7cbf577223e1.zip
fixes #1159 inproc could skip message queue
In addition, the message queue code is rather heavy weight, so this may make the inproc code a bit leaner. It also makes backpressure conditions much more reliable since it now is completely unbuffered.
Diffstat (limited to 'src/transport/inproc')
-rw-r--r--src/transport/inproc/inproc.c234
1 files changed, 162 insertions, 72 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index ffc6f002..a8466e78 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -9,8 +9,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -19,9 +17,10 @@
// peer to another. The inproc transport is only valid within the same
// process.
-typedef struct inproc_pair inproc_pair;
-typedef struct inproc_pipe inproc_pipe;
-typedef struct inproc_ep inproc_ep;
+typedef struct inproc_pair inproc_pair;
+typedef struct inproc_pipe inproc_pipe;
+typedef struct inproc_ep inproc_ep;
+typedef struct inproc_queue inproc_queue;
typedef struct {
nni_mtx mx;
@@ -30,22 +29,27 @@ typedef struct {
// inproc_pipe represents one half of a connection.
struct inproc_pipe {
- const char * addr;
- inproc_pair *pair;
- nni_msgq * rq;
- nni_msgq * wq;
- nni_pipe * npipe;
- uint16_t peer;
- uint16_t proto;
+ const char * addr;
+ inproc_pair * pair;
+ inproc_queue *recv_queue;
+ inproc_queue *send_queue;
+ nni_pipe * npipe;
+ uint16_t peer;
+ uint16_t proto;
+};
+
+struct inproc_queue {
+ nni_list readers;
+ nni_list writers;
+ nni_mtx lock;
+ bool closed;
};
// inproc_pair represents a pair of pipes. Because we control both
// sides of the pipes, we can allocate and free this in one structure.
struct inproc_pair {
- nni_mtx mx;
- int refcnt;
- nni_msgq * q[2];
- inproc_pipe *pipes[2];
+ nni_atomic_int ref;
+ inproc_queue queues[2];
};
struct inproc_ep {
@@ -81,27 +85,14 @@ inproc_fini(void)
nni_mtx_fini(&nni_inproc.mx);
}
-static void
-inproc_pipe_close(void *arg)
-{
- inproc_pipe *pipe = arg;
-
- if (pipe->rq != NULL) {
- nni_msgq_close(pipe->rq);
- }
- if (pipe->wq != NULL) {
- nni_msgq_close(pipe->wq);
- }
-}
-
// inproc_pair destroy is called when both pipe-ends of the pipe
// have been destroyed.
static void
inproc_pair_destroy(inproc_pair *pair)
{
- nni_msgq_fini(pair->q[0]);
- nni_msgq_fini(pair->q[1]);
- nni_mtx_fini(&pair->mx);
+ for (int i = 0; i < 2; i++) {
+ nni_mtx_fini(&pair->queues[i].lock);
+ }
NNI_FREE_STRUCT(pair);
}
@@ -137,18 +128,8 @@ inproc_pipe_fini(void *arg)
if ((pair = pipe->pair) != NULL) {
// If we are the last peer, then toss the pair structure.
- nni_mtx_lock(&pair->mx);
- if (pair->pipes[0] == pipe) {
- pair->pipes[0] = NULL;
- } else if (pair->pipes[1] == pipe) {
- pair->pipes[1] = NULL;
- }
- pair->refcnt--;
- if (pair->refcnt == 0) {
- nni_mtx_unlock(&pair->mx);
+ if (nni_atomic_dec_nv(&pair->ref) == 0) {
inproc_pair_destroy(pair);
- } else {
- nni_mtx_unlock(&pair->mx);
}
}
@@ -156,32 +137,137 @@ inproc_pipe_fini(void *arg)
}
static void
+inproc_queue_run_closed(inproc_queue *queue)
+{
+ nni_aio *aio;
+ while (((aio = nni_list_first(&queue->readers)) != NULL) ||
+ ((aio = nni_list_first(&queue->writers)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+}
+
+static void
+inproc_queue_run(inproc_queue *queue)
+{
+ if (queue->closed) {
+ inproc_queue_run_closed(queue);
+ }
+ for (;;) {
+ nni_aio *rd;
+ nni_aio *wr;
+ nni_msg *msg;
+ size_t header_len;
+ uint8_t *header;
+
+ if (((rd = nni_list_first(&queue->readers)) == NULL) ||
+ ((wr = nni_list_first(&queue->writers)) == NULL)) {
+ return;
+ }
+
+ msg = nni_aio_get_msg(wr);
+ NNI_ASSERT(msg != NULL);
+
+ header_len = nni_msg_header_len(msg);
+ header = nni_msg_header(msg);
+
+ // At this point, we pass success back to the caller. If
+ // we drop the message for any reason, its accounted on the
+ // receiver side.
+ nni_aio_list_remove(wr);
+ nni_aio_set_msg(wr, NULL);
+ nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len);
+
+ // TODO: We could check the max receive size here.
+
+ // Now the receive side. First lets make sure we ensure that
+ // the message headers are inserted into the body, because
+ // that is what the protocols expect.
+ // TODO: This would also be the place to do the work to make
+ // sure we aren't sharing the message once #1156 integrates.
+
+ if (nni_msg_insert(msg, header, header_len) != 0) {
+ // TODO: bump a dropped statistic
+ nni_msg_free(msg);
+ continue;
+ }
+ nni_msg_header_clear(msg);
+ nni_aio_list_remove(rd);
+ nni_aio_set_msg(rd, msg);
+ nni_aio_finish(rd, 0, nni_msg_len(msg));
+ }
+}
+
+static void
+inproc_queue_cancel(nni_aio *aio, void *arg, int rv)
+{
+ inproc_queue *queue = arg;
+
+ nni_mtx_lock(&queue->lock);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&queue->lock);
+}
+
+static void
inproc_pipe_send(void *arg, nni_aio *aio)
{
- inproc_pipe *pipe = arg;
- nni_msg * msg = nni_aio_get_msg(aio);
- char * h;
- size_t l;
- int rv;
-
- // We need to move any header data to the body, because the other
- // side won't know what to do otherwise.
- h = nni_msg_header(msg);
- l = nni_msg_header_len(msg);
- if ((rv = nni_msg_insert(msg, h, l)) != 0) {
- nni_aio_finish(aio, rv, nni_aio_count(aio));
+ inproc_pipe * pipe = arg;
+ inproc_queue *queue = pipe->send_queue;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&queue->lock);
+ if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ nni_mtx_unlock(&queue->lock);
+ nni_aio_finish_error(aio, rv);
return;
}
- nni_msg_header_clear(msg);
- nni_msgq_aio_put(pipe->wq, aio);
+ nni_aio_list_append(&queue->writers, aio);
+ inproc_queue_run(queue);
+ nni_mtx_unlock(&queue->lock);
}
static void
inproc_pipe_recv(void *arg, nni_aio *aio)
{
- inproc_pipe *pipe = arg;
+ inproc_pipe * pipe = arg;
+ inproc_queue *queue = pipe->recv_queue;
+ int rv;
- nni_msgq_aio_get(pipe->rq, aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&queue->lock);
+ if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ nni_mtx_unlock(&queue->lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&queue->readers, aio);
+ inproc_queue_run(queue);
+ nni_mtx_unlock(&queue->lock);
+}
+
+static void
+inproc_pipe_close(void *arg)
+{
+ inproc_pipe *pipe = arg;
+ inproc_pair *pair = pipe->pair;
+
+ for (int i = 0; i < 2; i++) {
+ inproc_queue *queue = &pair->queues[i];
+ nni_mtx_lock(&queue->lock);
+ queue->closed = true;
+ inproc_queue_run_closed(queue);
+ nni_mtx_unlock(&queue->lock);
+ }
}
static uint16_t
@@ -265,7 +351,7 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
{
nni_aio_list_remove(aio);
- if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) {
+ if ((!ep->listener) && nni_list_empty(&ep->aios)) {
nni_list_node_remove(&ep->node);
}
@@ -336,13 +422,17 @@ inproc_accept_clients(inproc_ep *srv)
saio, NNG_ENOMEM, srv, NULL);
continue;
}
- nni_mtx_init(&pair->mx);
+ for (int i = 0; i < 2; i++) {
+ nni_aio_list_init(&pair->queues[i].readers);
+ nni_aio_list_init(&pair->queues[i].writers);
+ nni_mtx_init(&pair->queues[i].lock);
+ }
+ nni_atomic_init(&pair->ref);
+ nni_atomic_set(&pair->ref, 2);
spipe = cpipe = NULL;
if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) ||
- ((rv = inproc_pipe_alloc(&spipe, srv)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[0], 1)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) {
+ ((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) {
if (cpipe != NULL) {
inproc_pipe_fini(cpipe);
@@ -356,14 +446,14 @@ inproc_accept_clients(inproc_ep *srv)
continue;
}
- spipe->peer = cpipe->proto;
- cpipe->peer = spipe->proto;
- pair->pipes[0] = cpipe;
- pair->pipes[1] = spipe;
- pair->refcnt = 2;
- cpipe->pair = spipe->pair = pair;
- cpipe->rq = spipe->wq = pair->q[0];
- cpipe->wq = spipe->rq = pair->q[1];
+ cpipe->peer = spipe->proto;
+ spipe->peer = cpipe->proto;
+ cpipe->pair = pair;
+ spipe->pair = pair;
+ cpipe->send_queue = &pair->queues[0];
+ cpipe->recv_queue = &pair->queues[1];
+ spipe->send_queue = &pair->queues[1];
+ spipe->recv_queue = &pair->queues[0];
inproc_conn_finish(caio, 0, cli, cpipe);
inproc_conn_finish(saio, 0, srv, spipe);