diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-12-25 12:35:54 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-12-25 12:35:54 -0800 |
| commit | beb8eb05310ac945ede357e2e57152f0d71e38ed (patch) | |
| tree | d8447284f89fa6de46008435db1c5363a3767ffd | |
| parent | 2049626be08cd584c3314df7a9ab49c114ba5254 (diff) | |
| download | nng-beb8eb05310ac945ede357e2e57152f0d71e38ed.tar.gz nng-beb8eb05310ac945ede357e2e57152f0d71e38ed.tar.bz2 nng-beb8eb05310ac945ede357e2e57152f0d71e38ed.zip | |
Bus aio's can be inline.
| -rw-r--r-- | src/core/lmq.c | 2 | ||||
| -rw-r--r-- | src/sp/protocol/bus0/bus.c | 121 |
2 files changed, 57 insertions, 66 deletions
diff --git a/src/core/lmq.c b/src/core/lmq.c index 47058fc9..468debbe 100644 --- a/src/core/lmq.c +++ b/src/core/lmq.c @@ -10,7 +10,7 @@ #include "nng_impl.h" // Light-weight message queue. These are derived from our heavy-weight -// message queues, but are less "featureful", but more useful for +// message queues, but are less "featured", but more useful for // performance sensitive contexts. Locking must be done by the caller. int diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c index 9a610ac6..5ec393ba 100644 --- a/src/sp/protocol/bus0/bus.c +++ b/src/sp/protocol/bus0/bus.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -42,7 +42,7 @@ static void bus0_pipe_putq_cb(void *); // bus0_sock is our per-socket protocol private structure. struct bus0_sock { - nni_aio * aio_getq; + nni_aio aio_getq; nni_list pipes; nni_mtx mtx; nni_msgq *uwq; @@ -52,14 +52,14 @@ struct bus0_sock { // bus0_pipe is our per-pipe protocol private structure. struct bus0_pipe { - nni_pipe * npipe; - bus0_sock * psock; - nni_msgq * sendq; + nni_pipe *npipe; + bus0_sock *psock; + nni_msgq *sendq; nni_list_node node; - nni_aio * aio_getq; - nni_aio * aio_recv; - nni_aio * aio_send; - nni_aio * aio_putq; + nni_aio aio_getq; + nni_aio aio_recv; + nni_aio aio_send; + nni_aio aio_putq; nni_mtx mtx; }; @@ -68,7 +68,7 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_free(s->aio_getq); + nni_aio_fini(&s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -76,14 +76,10 @@ static int bus0_sock_init(void *arg, nni_sock *nsock) { bus0_sock *s = arg; - int rv; NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { - bus0_sock_fini(s); - return (rv); - } + nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s); s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); s->raw = false; @@ -95,15 +91,10 @@ static int bus0_sock_init_raw(void *arg, nni_sock *nsock) { bus0_sock *s = arg; - int rv; NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != - 0) { - bus0_sock_fini(s); - return (rv); - } + nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s); s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); s->raw = true; @@ -124,7 +115,7 @@ bus0_sock_close(void *arg) { bus0_sock *s = arg; - nni_aio_close(s->aio_getq); + nni_aio_close(&s->aio_getq); } static void @@ -132,10 +123,10 @@ bus0_pipe_stop(void *arg) { bus0_pipe *p = arg; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_putq); } static void @@ -143,10 +134,10 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_free(p->aio_getq); - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); - nni_aio_free(p->aio_putq); + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_putq); nni_msgq_fini(p->sendq); nni_mtx_fini(&p->mtx); } @@ -159,11 +150,11 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); - if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p); + nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p); + nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p); + if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) { bus0_pipe_fini(p); return (rv); } @@ -200,10 +191,10 @@ bus0_pipe_close(void *arg) bus0_pipe *p = arg; bus0_sock *s = p->psock; - nni_aio_close(p->aio_getq); - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); - nni_aio_close(p->aio_putq); + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_putq); nni_msgq_close(p->sendq); nni_mtx_lock(&s->mtx); @@ -218,15 +209,15 @@ bus0_pipe_getq_cb(void *arg) { bus0_pipe *p = arg; - if (nni_aio_result(p->aio_getq) != 0) { + if (nni_aio_result(&p->aio_getq) != 0) { // closed? nni_pipe_close(p->npipe); return; } - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); - nni_pipe_send(p->npipe, p->aio_send); + nni_pipe_send(p->npipe, &p->aio_send); } static void @@ -234,10 +225,10 @@ bus0_pipe_send_cb(void *arg) { bus0_pipe *p = arg; - if (nni_aio_result(p->aio_send) != 0) { + if (nni_aio_result(&p->aio_send) != 0) { // closed? - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->npipe); return; } @@ -250,22 +241,22 @@ bus0_pipe_recv_cb(void *arg) { bus0_pipe *p = arg; bus0_sock *s = p->psock; - nni_msg * msg; + nni_msg *msg; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); if (s->raw) { nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe)); } nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_aio_set_msg(p->aio_putq, msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msgq_aio_put(s->urq, p->aio_putq); + nni_aio_set_msg(&p->aio_putq, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msgq_aio_put(s->urq, &p->aio_putq); } static void @@ -273,9 +264,9 @@ bus0_pipe_putq_cb(void *arg) { bus0_pipe *p = arg; - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); nni_pipe_close(p->npipe); return; } @@ -290,14 +281,14 @@ bus0_sock_getq_cb(void *arg) bus0_sock *s = arg; bus0_pipe *p; bus0_pipe *lastp; - nni_msg * msg; - nni_msg * dup; + nni_msg *msg; + nni_msg *dup; - if (nni_aio_result(s->aio_getq) != 0) { + if (nni_aio_result(&s->aio_getq) != 0) { return; } - msg = nni_aio_get_msg(s->aio_getq); + msg = nni_aio_get_msg(&s->aio_getq); // We ignore any headers present for cooked mode. nni_msg_header_clear(msg); @@ -328,14 +319,14 @@ bus0_sock_getq_cb_raw(void *arg) { bus0_sock *s = arg; bus0_pipe *p; - nni_msg * msg; + nni_msg *msg; uint32_t sender; - if (nni_aio_result(s->aio_getq) != 0) { + if (nni_aio_result(&s->aio_getq) != 0) { return; } - msg = nni_aio_get_msg(s->aio_getq); + msg = nni_aio_get_msg(&s->aio_getq); // The header being present indicates that the message // was received locally and we are rebroadcasting. (Device @@ -366,19 +357,19 @@ bus0_sock_getq_cb_raw(void *arg) static void bus0_sock_getq(bus0_sock *s) { - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); } static void bus0_pipe_getq(bus0_pipe *p) { - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_msgq_aio_get(p->sendq, &p->aio_getq); } static void bus0_pipe_recv(bus0_pipe *p) { - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); } static void |
