diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-12-27 23:13:42 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-12-27 23:13:42 -0800 |
| commit | 92bf6fd4cccc548e14ae826c1d36851f98378da0 (patch) | |
| tree | ca3d35f130e7ec6e51ede51142cc34c5f73ff6ca /src/sp/protocol/bus0/bus.c | |
| parent | 2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739 (diff) | |
| download | nng-92bf6fd4cccc548e14ae826c1d36851f98378da0.tar.gz nng-92bf6fd4cccc548e14ae826c1d36851f98378da0.tar.bz2 nng-92bf6fd4cccc548e14ae826c1d36851f98378da0.zip | |
Bus socket converted to use lmq.
This should give significant performance boosts to anyone using this
protocol. Buffering on both the send and receive side is supported,
with a default buffer size of 16 messages. This should help provide
a reasonable default case for most users.
While here updated the test for bus to much more complete
NUTS style test framework, with increased coverage.
Diffstat (limited to 'src/sp/protocol/bus0/bus.c')
| -rw-r--r-- | src/sp/protocol/bus0/bus.c | 419 |
1 files changed, 257 insertions, 162 deletions
diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c index ab857d72..faa94c13 100644 --- a/src/sp/protocol/bus0/bus.c +++ b/src/sp/protocol/bus0/bus.c @@ -13,6 +13,7 @@ #include "core/nng_impl.h" #include "nng/protocol/bus0/bus.h" +#include <stdio.h> // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order @@ -26,41 +27,35 @@ typedef struct bus0_pipe bus0_pipe; typedef struct bus0_sock bus0_sock; -static void bus0_sock_getq(bus0_sock *); static void bus0_sock_send(void *, nni_aio *); static void bus0_sock_recv(void *, nni_aio *); -static void bus0_pipe_getq(bus0_pipe *); static void bus0_pipe_recv(bus0_pipe *); -static void bus0_sock_getq_cb(void *); -static void bus0_sock_getq_cb_raw(void *); -static void bus0_pipe_getq_cb(void *); static void bus0_pipe_send_cb(void *); static void bus0_pipe_recv_cb(void *); -static void bus0_pipe_putq_cb(void *); // bus0_sock is our per-socket protocol private structure. struct bus0_sock { - nni_aio aio_getq; - nni_list pipes; - nni_mtx mtx; - nni_msgq *uwq; - nni_msgq *urq; - bool raw; + nni_list pipes; + nni_mtx mtx; + nni_pollable can_send; + nni_pollable can_recv; + nni_lmq recv_msgs; + nni_list recv_wait; + int send_buf; + bool raw; }; // bus0_pipe is our per-pipe protocol private structure. struct bus0_pipe { - nni_pipe *npipe; - bus0_sock *psock; - nni_msgq *sendq; + nni_pipe *pipe; + bus0_sock *bus; + nni_lmq send_queue; nni_list_node node; - nni_aio aio_getq; - nni_aio aio_recv; - nni_aio aio_send; - nni_aio aio_putq; - nni_mtx mtx; + bool busy; + nni_aio aio_recv; + nni_aio aio_send; }; static void @@ -68,50 +63,57 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_fini(&s->aio_getq); nni_mtx_fini(&s->mtx); + nni_pollable_fini(&s->can_send); + nni_pollable_fini(&s->can_recv); + nni_lmq_fini(&s->recv_msgs); } static void -bus0_sock_init(void *arg, nni_sock *nsock) +bus0_sock_init(void *arg, nni_sock *ns) { bus0_sock *s = arg; + NNI_ARG_UNUSED(ns); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s); - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + nni_aio_list_init(&s->recv_wait); + nni_pollable_init(&s->can_send); + nni_pollable_init(&s->can_recv); + nni_lmq_init(&s->recv_msgs, 16); + s->send_buf = 16; + s->raw = false; } static void -bus0_sock_init_raw(void *arg, nni_sock *nsock) +bus0_sock_init_raw(void *arg, nni_sock *ns) { bus0_sock *s = arg; - NNI_LIST_INIT(&s->pipes, bus0_pipe, node); - nni_mtx_init(&s->mtx); - nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s); - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + bus0_sock_init(arg, ns); s->raw = true; } static void bus0_sock_open(void *arg) { - bus0_sock *s = arg; - - bus0_sock_getq(s); + NNI_ARG_UNUSED(arg); } static void bus0_sock_close(void *arg) { bus0_sock *s = arg; + nni_aio *aio; - nni_aio_close(&s->aio_getq); + nni_mtx_lock(&s->mtx); + while ((aio = nni_list_first(&s->recv_wait)) != NULL) { + nni_list_remove(&s->recv_wait, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); } static void @@ -119,10 +121,8 @@ bus0_pipe_stop(void *arg) { bus0_pipe *p = arg; - nni_aio_stop(&p->aio_getq); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_putq); } static void @@ -130,33 +130,23 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_fini(&p->aio_getq); nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_msgq_fini(p->sendq); - nni_mtx_fini(&p->mtx); + nni_lmq_fini(&p->send_queue); } static int -bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) +bus0_pipe_init(void *arg, nni_pipe *np, void *s) { bus0_pipe *p = arg; - int rv; + p->pipe = np; + p->bus = s; NNI_LIST_NODE_INIT(&p->node); - nni_mtx_init(&p->mtx); - nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p); nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p); nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p); - nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p); - if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) { - bus0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->send_queue, p->bus->send_buf); - p->npipe = npipe; - p->psock = s; return (0); } @@ -164,10 +154,9 @@ static int bus0_pipe_start(void *arg) { bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_sock *s = p->bus; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) { - // Peer protocol mismatch. + if (nni_pipe_peer(p->pipe) != NNI_PROTO_BUS_V0) { return (NNG_EPROTO); } @@ -176,7 +165,6 @@ bus0_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); bus0_pipe_recv(p); - bus0_pipe_getq(p); return (0); } @@ -185,15 +173,13 @@ static void bus0_pipe_close(void *arg) { bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_sock *s = p->bus; - nni_aio_close(&p->aio_getq); nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_putq); - nni_msgq_close(p->sendq); nni_mtx_lock(&s->mtx); + nni_lmq_flush(&p->send_queue); if (nni_list_active(&s->pipes, p)) { nni_list_remove(&s->pipes, p); } @@ -201,187 +187,278 @@ bus0_pipe_close(void *arg) } static void -bus0_pipe_getq_cb(void *arg) -{ - bus0_pipe *p = arg; - - if (nni_aio_result(&p->aio_getq) != 0) { - // closed? - nni_pipe_close(p->npipe); - return; - } - nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); - nni_aio_set_msg(&p->aio_getq, NULL); - - nni_pipe_send(p->npipe, &p->aio_send); -} - -static void bus0_pipe_send_cb(void *arg) { bus0_pipe *p = arg; + bus0_sock *s = p->bus; + nni_msg *msg; if (nni_aio_result(&p->aio_send) != 0) { // closed? nni_msg_free(nni_aio_get_msg(&p->aio_send)); nni_aio_set_msg(&p->aio_send, NULL); - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } - bus0_pipe_getq(p); + nni_mtx_lock(&s->mtx); + if (nni_lmq_get(&p->send_queue, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&s->mtx); } static void bus0_pipe_recv_cb(void *arg) { - bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->bus; + nni_aio *aio = NULL; nni_msg *msg; if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + nni_mtx_lock(&s->mtx); if (s->raw) { - nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe)); + nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); } - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_aio_set_msg(&p->aio_putq, msg); - nni_aio_set_msg(&p->aio_recv, NULL); - nni_msgq_aio_put(s->urq, &p->aio_putq); + if (!nni_list_empty(&s->recv_wait)) { + aio = nni_list_first(&s->recv_wait); + nni_aio_list_remove(aio); + nni_aio_set_msg(aio, msg); + } else if (nni_lmq_put(&s->recv_msgs, msg) == 0) { + nni_pollable_raise(&s->can_recv); + } else { + // dropped message due to no room + nni_msg_free(msg); + } + nni_mtx_unlock(&s->mtx); + + if (aio != NULL) { + nni_aio_finish_sync(aio, 0, nni_msg_len(msg)); + } + bus0_pipe_recv(p); } static void -bus0_pipe_putq_cb(void *arg) +bus0_pipe_recv(bus0_pipe *p) { - bus0_pipe *p = arg; - - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_putq)); - nni_aio_set_msg(&p->aio_putq, NULL); - nni_pipe_close(p->npipe); - return; - } - - // Wait for another recv. - bus0_pipe_recv(p); + nni_pipe_recv(p->pipe, &p->aio_recv); } static void -bus0_sock_getq_cb(void *arg) +bus0_sock_send(void *arg, nni_aio *aio) { bus0_sock *s = arg; - bus0_pipe *p; - bus0_pipe *lastp; nni_msg *msg; - nni_msg *dup; + bus0_pipe *pipe; + uint32_t sender = 0; + size_t len; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_begin(aio) != 0) { return; } - msg = nni_aio_get_msg(&s->aio_getq); + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); - // We ignore any headers present for cooked mode. - nni_msg_header_clear(msg); + if (s->raw) { + // In raw mode, we look for the message header, to see if it + // is being resent from another pipe (e.g. via a device). + // We don't want to send it back to the originator. + if (nni_msg_header_len(msg) >= sizeof(uint32_t)) { + sender = nni_msg_header_trim_u32(msg); + } + } else { + // In cooked mode just strip the header. + nni_msg_header_clear(msg); + } nni_mtx_lock(&s->mtx); - lastp = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; + NNI_LIST_FOREACH (&s->pipes, pipe) { + + if (s->raw && nni_pipe_id(pipe->pipe) == sender) { + continue; } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + + // if the pipe isn't busy, then send this message direct. + if (!pipe->busy) { + pipe->busy = true; + nni_msg_clone(msg); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); + } else if (!nni_lmq_full(&pipe->send_queue)) { + nni_msg_clone(msg); + nni_lmq_put(&pipe->send_queue, msg); } } nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + nni_aio_finish(aio, 0, len); +} - bus0_sock_getq(s); +static void +bus0_recv_cancel(nng_aio *aio, void *arg, int rv) +{ + bus0_sock *s = arg; + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); } static void -bus0_sock_getq_cb_raw(void *arg) +bus0_sock_recv(void *arg, nni_aio *aio) { bus0_sock *s = arg; - bus0_pipe *p; nni_msg *msg; - uint32_t sender; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_begin(aio) != 0) { return; } - msg = nni_aio_get_msg(&s->aio_getq); - - // The header being present indicates that the message - // was received locally and we are rebroadcasting. (Device - // is doing this probably.) In this case grab the pipe - // ID from the header, so we can exclude it. - if (nni_msg_header_len(msg) >= 4) { - sender = nni_msg_header_trim_u32(msg); - } else { - sender = 0; - } - nni_mtx_lock(&s->mtx); - NNI_LIST_FOREACH (&s->pipes, p) { - if (nni_pipe_id(p->npipe) == sender) { - continue; - } - nni_msg_clone(msg); - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); +again: + if (nni_lmq_empty(&s->recv_msgs)) { + int rv; + if ((rv = nni_aio_schedule(aio, bus0_recv_cancel, s)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } + nni_list_append(&s->recv_wait, aio); + nni_mtx_unlock(&s->mtx); + return; + } + + (void) nni_lmq_get(&s->recv_msgs, &msg); + + if (nni_lmq_empty(&s->recv_msgs)) { + nni_pollable_clear(&s->can_recv); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + nni_aio_set_msg(aio, msg); nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} - bus0_sock_getq(s); +static int +bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t) +{ + bus0_sock *sock = arg; + int fd; + int rv; + nni_mtx_lock(&sock->mtx); + // BUS sockets are *always* writable (best effort) + nni_pollable_raise(&sock->can_send); + rv = nni_pollable_getfd(&sock->can_send, &fd); + nni_mtx_unlock(&sock->mtx); + + if (rv == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); } -static void -bus0_sock_getq(bus0_sock *s) +static int +bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { - nni_msgq_aio_get(s->uwq, &s->aio_getq); + bus0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); } -static void -bus0_pipe_getq(bus0_pipe *p) +static int +bus0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); + bus0_sock *s = arg; + int val; + nni_mtx_lock(&s->mtx); + val = (int) nni_lmq_cap(&s->recv_msgs); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); } -static void -bus0_pipe_recv(bus0_pipe *p) +static int +bus0_sock_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - nni_pipe_recv(p->npipe, &p->aio_recv); + bus0_sock *s = arg; + int val; + nni_mtx_lock(&s->mtx); + val = s->send_buf; + nni_mtx_unlock(&s->mtx); + return (nni_copyout_int(val, buf, szp, t)); } -static void -bus0_sock_send(void *arg, nni_aio *aio) +static int +bus0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { bus0_sock *s = arg; + int val; + int rv; - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + if ((rv = nni_lmq_resize(&s->recv_msgs, (size_t) val)) != 0) { + nni_mtx_unlock(&s->mtx); + return (rv); + } + + nni_mtx_unlock(&s->mtx); + return (0); } -static void -bus0_sock_recv(void *arg, nni_aio *aio) +static int +bus0_sock_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { bus0_sock *s = arg; + bus0_pipe *p; + int val; + int rv; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&s->mtx); + s->send_buf = val; + NNI_LIST_FOREACH (&s->pipes, p) { + // If we fail part way through (should only be ENOMEM), we + // stop short. The others would likely fail for ENOMEM as + // well anyway. There is a weird effect here where the + // buffers may have been set for *some* of the pipes, but + // we have no way to correct partial failure. + if ((rv = nni_lmq_resize(&p->send_queue, (size_t) val)) != 0) { + break; + } + } + nni_mtx_unlock(&s->mtx); + return (rv); } static nni_proto_pipe_ops bus0_pipe_ops = { @@ -394,6 +471,24 @@ static nni_proto_pipe_ops bus0_pipe_ops = { }; static nni_option bus0_sock_options[] = { + { + .o_name = NNG_OPT_SENDFD, + .o_get = bus0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = bus0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = bus0_sock_get_recv_buf_len, + .o_set = bus0_sock_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = bus0_sock_get_send_buf_len, + .o_set = bus0_sock_set_send_buf_len, + }, // terminate list { .o_name = NULL, @@ -441,13 +536,13 @@ static nni_proto bus0_proto_raw = { }; int -nng_bus0_open(nng_socket *sidp) +nng_bus0_open(nng_socket *id) { - return (nni_proto_open(sidp, &bus0_proto)); + return (nni_proto_open(id, &bus0_proto)); } int -nng_bus0_open_raw(nng_socket *sidp) +nng_bus0_open_raw(nng_socket *id) { - return (nni_proto_open(sidp, &bus0_proto_raw)); + return (nni_proto_open(id, &bus0_proto_raw)); } |
