From e9c28eed5ffb8da59ba8a6805e85936f04bdb644 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 24 Jan 2021 21:22:13 -0800 Subject: fixes #1419 pairv0 performance work This takes the performance work we did for pairv1, and provides an implementation for pairv0. The upshot should be a nice performance boost for pair v0. --- src/sp/protocol/pair0/pair.c | 478 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 390 insertions(+), 88 deletions(-) (limited to 'src/sp/protocol/pair0/pair.c') diff --git a/src/sp/protocol/pair0/pair.c b/src/sp/protocol/pair0/pair.c index 41f88c7c..c2407d81 100644 --- a/src/sp/protocol/pair0/pair.c +++ b/src/sp/protocol/pair0/pair.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -24,18 +24,24 @@ typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; -static void pair0_send_cb(void *); -static void pair0_recv_cb(void *); -static void pair0_getq_cb(void *); -static void pair0_putq_cb(void *); +static void pair0_pipe_send_cb(void *); +static void pair0_pipe_recv_cb(void *); static void pair0_pipe_fini(void *); +static void pair0_send_sched(pair0_sock *); +static void pair0_pipe_send(pair0_pipe *, nni_msg *); // pair0_sock is our per-socket protocol private structure. struct pair0_sock { - pair0_pipe *ppipe; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; + pair0_pipe * p; + nni_mtx mtx; + nni_lmq wmq; + nni_list waq; + nni_lmq rmq; + nni_list raq; + nni_pollable readable; + nni_pollable writable; + bool rd_ready; // pipe ready for read + bool wr_ready; // pipe ready for write }; // An pair0_pipe is our per-pipe protocol private structure. We keep @@ -43,23 +49,28 @@ struct pair0_sock { // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. struct pair0_pipe { - nni_pipe * npipe; - pair0_sock *psock; + nni_pipe * pipe; + pair0_sock *pair; nni_aio aio_send; nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; }; static int -pair0_sock_init(void *arg, nni_sock *nsock) +pair0_sock_init(void *arg, nni_sock *sock) { pair0_sock *s = arg; + NNI_ARG_UNUSED(sock); nni_mtx_init(&s->mtx); - s->ppipe = NULL; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + + nni_lmq_init(&s->rmq, 0); + nni_lmq_init(&s->wmq, 0); + nni_aio_list_init(&s->raq); + nni_aio_list_init(&s->waq); + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + + s->p = NULL; return (0); } @@ -68,6 +79,8 @@ pair0_sock_fini(void *arg) { pair0_sock *s = arg; + nni_lmq_fini(&s->rmq); + nni_lmq_fini(&s->wmq); nni_mtx_fini(&s->mtx); } @@ -75,11 +88,28 @@ static void pair0_pipe_stop(void *arg) { pair0_pipe *p = arg; + pair0_sock *s = p->pair; + + nni_mtx_lock(&s->mtx); + if (s->p == p) { + s->p = NULL; + if (s->rd_ready) { + nni_msg *m = nni_aio_get_msg(&p->aio_recv); + nni_msg_free(m); + s->rd_ready = false; + } + if (s->wr_ready) { + s->wr_ready = false; + nni_pollable_clear(&s->writable); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + } + nni_mtx_unlock(&s->mtx); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_putq); - nni_aio_stop(&p->aio_getq); } static void @@ -89,48 +119,59 @@ pair0_pipe_fini(void *arg) nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); } static int -pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock) +pair0_pipe_init(void *arg, nni_pipe *pipe, void *pair) { pair0_pipe *p = arg; - nni_aio_init(&p->aio_send, pair0_send_cb, p); - nni_aio_init(&p->aio_recv, pair0_recv_cb, p); - nni_aio_init(&p->aio_getq, pair0_getq_cb, p); - nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + nni_aio_init(&p->aio_send, pair0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pair0_pipe_recv_cb, p); + + p->pipe = pipe; + p->pair = pair; - p->npipe = npipe; - p->psock = psock; return (0); } +static void +pair0_cancel(nni_aio *aio, void *arg, int rv) +{ + pair0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + static int pair0_pipe_start(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; + pair0_sock *s = p->pair; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) { + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PAIR_V0) { // Peer protocol mismatch. return (NNG_EPROTO); } nni_mtx_lock(&s->mtx); - if (s->ppipe != NULL) { + if (s->p != NULL) { nni_mtx_unlock(&s->mtx); return (NNG_EBUSY); // Already have a peer, denied. } - s->ppipe = p; + s->p = p; + s->rd_ready = false; nni_mtx_unlock(&s->mtx); - // Schedule a getq on the upper, and a read from the pipe. - // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, &p->aio_getq); - nni_pipe_recv(p->npipe, &p->aio_recv); + pair0_send_sched(s); + + // And the pipe read of course. + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -139,83 +180,117 @@ static void pair0_pipe_close(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_putq); - nni_aio_close(&p->aio_getq); - - nni_mtx_lock(&s->mtx); - if (s->ppipe == p) { - s->ppipe = NULL; - } - nni_mtx_unlock(&s->mtx); } static void -pair0_recv_cb(void *arg) +pair0_pipe_recv_cb(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; + pair0_sock *s = p->pair; nni_msg * msg; + nni_aio * a; if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } msg = nni_aio_get_msg(&p->aio_recv); - nni_aio_set_msg(&p->aio_putq, msg); - nni_aio_set_msg(&p->aio_recv, NULL); + // Store the pipe ID. + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, &p->aio_putq); -} - -static void -pair0_putq_cb(void *arg) -{ - pair0_pipe *p = arg; + nni_mtx_lock(&s->mtx); - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_putq)); - nni_aio_set_msg(&p->aio_putq, NULL); - nni_pipe_close(p->npipe); + // if anyone is blocking, then the lmq will be empty, and + // we should deliver it there. + if ((a = nni_list_first(&s->raq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_set_msg(a, msg); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_sync(a, 0, nni_msg_len(msg)); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + + // maybe we have room in the rmq? + if (!nni_lmq_full(&s->rmq)) { + nni_lmq_putq(&s->rmq, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); + } else { + s->rd_ready = true; + } + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); } static void -pair0_getq_cb(void *arg) +pair0_send_sched(pair0_sock *s) { - pair0_pipe *p = arg; + pair0_pipe *p; + nni_msg * m; + nni_aio * a = NULL; + size_t l = 0; - if (nni_aio_result(&p->aio_getq) != 0) { - nni_pipe_close(p->npipe); + nni_mtx_lock(&s->mtx); + + if ((p = s->p) == NULL) { + nni_mtx_unlock(&s->mtx); return; } - nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); - nni_aio_set_msg(&p->aio_getq, NULL); - nni_pipe_send(p->npipe, &p->aio_send); + s->wr_ready = true; + + // if message waiting in buffered queue, then we prefer that. + if (nni_lmq_getq(&s->wmq, &m) == 0) { + pair0_pipe_send(p, m); + + if ((a = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wmq, m); + } + + } else if ((a = nni_list_first(&s->waq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + pair0_pipe_send(p, m); + } + + // if we were blocked before, but not now, update. + if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) { + nni_pollable_raise(&s->writable); + } + + nni_mtx_unlock(&s->mtx); + + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } } static void -pair0_send_cb(void *arg) +pair0_pipe_send_cb(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; if (nni_aio_result(&p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(&p->aio_send)); nni_aio_set_msg(&p->aio_send, NULL); - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } - nni_msgq_aio_get(s->uwq, &p->aio_getq); + pair0_send_sched(p->pair); } static void @@ -227,25 +302,259 @@ pair0_sock_open(void *arg) static void pair0_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pair0_sock *s = arg; + nni_aio * a; + nni_msg * m; + nni_mtx_lock(&s->mtx); + while (((a = nni_list_first(&s->raq)) != NULL) || + ((a = nni_list_first(&s->waq)) != NULL)) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + while ((nni_lmq_getq(&s->rmq, &m) == 0) || + (nni_lmq_getq(&s->wmq, &m) == 0)) { + nni_msg_free(m); + } + nni_mtx_unlock(&s->mtx); +} + +static void +pair0_pipe_send(pair0_pipe *p, nni_msg *m) +{ + pair0_sock *s = p->pair; + // assumption: we have unique access to the message at this point. + NNI_ASSERT(!nni_msg_shared(m)); + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + s->wr_ready = false; } static void pair0_sock_send(void *arg, nni_aio *aio) { pair0_sock *s = arg; + nni_msg * m; + size_t len; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + m = nni_aio_get_msg(aio); + len = nni_msg_len(m); + + nni_mtx_lock(&s->mtx); + if (s->wr_ready) { + pair0_pipe *p = s->p; + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + pair0_pipe_send(p, m); + nni_mtx_unlock(&s->mtx); + return; + } + + // Can we maybe queue it. + if (nni_lmq_putq(&s->wmq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); + return; + } + nni_aio_list_append(&s->waq, aio); + nni_mtx_unlock(&s->mtx); } static void pair0_sock_recv(void *arg, nni_aio *aio) { pair0_sock *s = arg; + pair0_pipe *p; + nni_msg * m; + int rv; - nni_msgq_aio_get(s->urq, aio); + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); + p = s->p; + + // Buffered read. If there is a message waiting for us, pick + // it up. We might need to post another read request as well. + if (nni_lmq_getq(&s->rmq, &m) == 0) { + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_lmq_putq(&s->rmq, m); + nni_pipe_recv(p->pipe, &p->aio_recv); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + // Unbuffered -- but waiting. + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_pollable_clear(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_list_append(&s->raq, aio); + } + nni_mtx_unlock(&s->mtx); } +static int +pair0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->wmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wmq)) { + nni_pollable_raise(&s->writable); + } else if (!s->wr_ready) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->wmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair0_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->rmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_empty(&s->rmq)) { + nni_pollable_raise(&s->readable); + } else if (!s->rd_ready) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair0_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->rmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +pair0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static nni_option pair0_sock_options[] = { + { + .o_name = NNG_OPT_RECVFD, + .o_get = pair0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = pair0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = pair0_get_send_buf_len, + .o_set = pair0_set_send_buf_len, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = pair0_get_recv_buf_len, + .o_set = pair0_set_recv_buf_len, + }, + // terminate list + { + .o_name = NULL, + }, +}; + static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_size = sizeof(pair0_pipe), .pipe_init = pair0_pipe_init, @@ -255,13 +564,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_stop = pair0_pipe_stop, }; -static nni_option pair0_sock_options[] = { - // terminate list - { - .o_name = NULL, - } -}; - static nni_proto_sock_ops pair0_sock_ops = { .sock_size = sizeof(pair0_sock), .sock_init = pair0_sock_init, @@ -293,13 +595,13 @@ static nni_proto pair0_proto_raw = { }; int -nng_pair0_open(nng_socket *sidp) +nng_pair0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &pair0_proto)); + return (nni_proto_open(sock, &pair0_proto)); } int -nng_pair0_open_raw(nng_socket *sidp) +nng_pair0_open_raw(nng_socket *sock) { - return (nni_proto_open(sidp, &pair0_proto_raw)); + return (nni_proto_open(sock, &pair0_proto_raw)); } -- cgit v1.2.3-70-g09d2