diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-20 11:02:52 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-20 11:35:40 -0800 |
| commit | b2ba35251986d2754de5f0f274ee7cbf577223e1 (patch) | |
| tree | 556896e5e4a40ad79316ff58a689535390a9a91b | |
| parent | 1a4e60f94dd4febffe3b651b5c800f1b9096f3d9 (diff) | |
| download | nng-b2ba35251986d2754de5f0f274ee7cbf577223e1.tar.gz nng-b2ba35251986d2754de5f0f274ee7cbf577223e1.tar.bz2 nng-b2ba35251986d2754de5f0f274ee7cbf577223e1.zip | |
fixes #1159 inproc could skip message queue
In addition, the message queue code is rather heavy weight, so this may
make the inproc code a bit leaner. It also makes backpressure conditions
much more reliable since it now is completely unbuffered.
| -rw-r--r-- | src/transport/inproc/inproc.c | 234 |
1 files changed, 162 insertions, 72 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index ffc6f002..a8466e78 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -9,8 +9,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdio.h> -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" @@ -19,9 +17,10 @@ // peer to another. The inproc transport is only valid within the same // process. -typedef struct inproc_pair inproc_pair; -typedef struct inproc_pipe inproc_pipe; -typedef struct inproc_ep inproc_ep; +typedef struct inproc_pair inproc_pair; +typedef struct inproc_pipe inproc_pipe; +typedef struct inproc_ep inproc_ep; +typedef struct inproc_queue inproc_queue; typedef struct { nni_mtx mx; @@ -30,22 +29,27 @@ typedef struct { // inproc_pipe represents one half of a connection. struct inproc_pipe { - const char * addr; - inproc_pair *pair; - nni_msgq * rq; - nni_msgq * wq; - nni_pipe * npipe; - uint16_t peer; - uint16_t proto; + const char * addr; + inproc_pair * pair; + inproc_queue *recv_queue; + inproc_queue *send_queue; + nni_pipe * npipe; + uint16_t peer; + uint16_t proto; +}; + +struct inproc_queue { + nni_list readers; + nni_list writers; + nni_mtx lock; + bool closed; }; // inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct inproc_pair { - nni_mtx mx; - int refcnt; - nni_msgq * q[2]; - inproc_pipe *pipes[2]; + nni_atomic_int ref; + inproc_queue queues[2]; }; struct inproc_ep { @@ -81,27 +85,14 @@ inproc_fini(void) nni_mtx_fini(&nni_inproc.mx); } -static void -inproc_pipe_close(void *arg) -{ - inproc_pipe *pipe = arg; - - if (pipe->rq != NULL) { - nni_msgq_close(pipe->rq); - } - if (pipe->wq != NULL) { - nni_msgq_close(pipe->wq); - } -} - // inproc_pair destroy is called when both pipe-ends of the pipe // have been destroyed. static void inproc_pair_destroy(inproc_pair *pair) { - nni_msgq_fini(pair->q[0]); - nni_msgq_fini(pair->q[1]); - nni_mtx_fini(&pair->mx); + for (int i = 0; i < 2; i++) { + nni_mtx_fini(&pair->queues[i].lock); + } NNI_FREE_STRUCT(pair); } @@ -137,18 +128,8 @@ inproc_pipe_fini(void *arg) if ((pair = pipe->pair) != NULL) { // If we are the last peer, then toss the pair structure. - nni_mtx_lock(&pair->mx); - if (pair->pipes[0] == pipe) { - pair->pipes[0] = NULL; - } else if (pair->pipes[1] == pipe) { - pair->pipes[1] = NULL; - } - pair->refcnt--; - if (pair->refcnt == 0) { - nni_mtx_unlock(&pair->mx); + if (nni_atomic_dec_nv(&pair->ref) == 0) { inproc_pair_destroy(pair); - } else { - nni_mtx_unlock(&pair->mx); } } @@ -156,32 +137,137 @@ inproc_pipe_fini(void *arg) } static void +inproc_queue_run_closed(inproc_queue *queue) +{ + nni_aio *aio; + while (((aio = nni_list_first(&queue->readers)) != NULL) || + ((aio = nni_list_first(&queue->writers)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } +} + +static void +inproc_queue_run(inproc_queue *queue) +{ + if (queue->closed) { + inproc_queue_run_closed(queue); + } + for (;;) { + nni_aio *rd; + nni_aio *wr; + nni_msg *msg; + size_t header_len; + uint8_t *header; + + if (((rd = nni_list_first(&queue->readers)) == NULL) || + ((wr = nni_list_first(&queue->writers)) == NULL)) { + return; + } + + msg = nni_aio_get_msg(wr); + NNI_ASSERT(msg != NULL); + + header_len = nni_msg_header_len(msg); + header = nni_msg_header(msg); + + // At this point, we pass success back to the caller. If + // we drop the message for any reason, its accounted on the + // receiver side. + nni_aio_list_remove(wr); + nni_aio_set_msg(wr, NULL); + nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len); + + // TODO: We could check the max receive size here. + + // Now the receive side. First lets make sure we ensure that + // the message headers are inserted into the body, because + // that is what the protocols expect. + // TODO: This would also be the place to do the work to make + // sure we aren't sharing the message once #1156 integrates. + + if (nni_msg_insert(msg, header, header_len) != 0) { + // TODO: bump a dropped statistic + nni_msg_free(msg); + continue; + } + nni_msg_header_clear(msg); + nni_aio_list_remove(rd); + nni_aio_set_msg(rd, msg); + nni_aio_finish(rd, 0, nni_msg_len(msg)); + } +} + +static void +inproc_queue_cancel(nni_aio *aio, void *arg, int rv) +{ + inproc_queue *queue = arg; + + nni_mtx_lock(&queue->lock); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&queue->lock); +} + +static void inproc_pipe_send(void *arg, nni_aio *aio) { - inproc_pipe *pipe = arg; - nni_msg * msg = nni_aio_get_msg(aio); - char * h; - size_t l; - int rv; - - // We need to move any header data to the body, because the other - // side won't know what to do otherwise. - h = nni_msg_header(msg); - l = nni_msg_header_len(msg); - if ((rv = nni_msg_insert(msg, h, l)) != 0) { - nni_aio_finish(aio, rv, nni_aio_count(aio)); + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->send_queue; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); return; } - nni_msg_header_clear(msg); - nni_msgq_aio_put(pipe->wq, aio); + nni_aio_list_append(&queue->writers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); } static void inproc_pipe_recv(void *arg, nni_aio *aio) { - inproc_pipe *pipe = arg; + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->recv_queue; + int rv; - nni_msgq_aio_get(pipe->rq, aio); + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&queue->readers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_close(void *arg) +{ + inproc_pipe *pipe = arg; + inproc_pair *pair = pipe->pair; + + for (int i = 0; i < 2; i++) { + inproc_queue *queue = &pair->queues[i]; + nni_mtx_lock(&queue->lock); + queue->closed = true; + inproc_queue_run_closed(queue); + nni_mtx_unlock(&queue->lock); + } } static uint16_t @@ -265,7 +351,7 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) { nni_aio_list_remove(aio); - if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) { + if ((!ep->listener) && nni_list_empty(&ep->aios)) { nni_list_node_remove(&ep->node); } @@ -336,13 +422,17 @@ inproc_accept_clients(inproc_ep *srv) saio, NNG_ENOMEM, srv, NULL); continue; } - nni_mtx_init(&pair->mx); + for (int i = 0; i < 2; i++) { + nni_aio_list_init(&pair->queues[i].readers); + nni_aio_list_init(&pair->queues[i].writers); + nni_mtx_init(&pair->queues[i].lock); + } + nni_atomic_init(&pair->ref); + nni_atomic_set(&pair->ref, 2); spipe = cpipe = NULL; if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) || - ((rv = inproc_pipe_alloc(&spipe, srv)) != 0) || - ((rv = nni_msgq_init(&pair->q[0], 1)) != 0) || - ((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) { + ((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) { if (cpipe != NULL) { inproc_pipe_fini(cpipe); @@ -356,14 +446,14 @@ inproc_accept_clients(inproc_ep *srv) continue; } - spipe->peer = cpipe->proto; - cpipe->peer = spipe->proto; - pair->pipes[0] = cpipe; - pair->pipes[1] = spipe; - pair->refcnt = 2; - cpipe->pair = spipe->pair = pair; - cpipe->rq = spipe->wq = pair->q[0]; - cpipe->wq = spipe->rq = pair->q[1]; + cpipe->peer = spipe->proto; + spipe->peer = cpipe->proto; + cpipe->pair = pair; + spipe->pair = pair; + cpipe->send_queue = &pair->queues[0]; + cpipe->recv_queue = &pair->queues[1]; + spipe->send_queue = &pair->queues[1]; + spipe->recv_queue = &pair->queues[0]; inproc_conn_finish(caio, 0, cli, cpipe); inproc_conn_finish(saio, 0, srv, spipe); |
