summaryrefslogtreecommitdiff
path: root/src/sp/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-12-25 12:35:54 -0800
committerGarrett D'Amore <garrett@damore.org>2021-12-25 12:35:54 -0800
commitbeb8eb05310ac945ede357e2e57152f0d71e38ed (patch)
treed8447284f89fa6de46008435db1c5363a3767ffd /src/sp/protocol
parent2049626be08cd584c3314df7a9ab49c114ba5254 (diff)
downloadnng-beb8eb05310ac945ede357e2e57152f0d71e38ed.tar.gz
nng-beb8eb05310ac945ede357e2e57152f0d71e38ed.tar.bz2
nng-beb8eb05310ac945ede357e2e57152f0d71e38ed.zip
Bus aio's can be inline.
Diffstat (limited to 'src/sp/protocol')
-rw-r--r--src/sp/protocol/bus0/bus.c121
1 files changed, 56 insertions, 65 deletions
diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c
index 9a610ac6..5ec393ba 100644
--- a/src/sp/protocol/bus0/bus.c
+++ b/src/sp/protocol/bus0/bus.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -42,7 +42,7 @@ static void bus0_pipe_putq_cb(void *);
// bus0_sock is our per-socket protocol private structure.
struct bus0_sock {
- nni_aio * aio_getq;
+ nni_aio aio_getq;
nni_list pipes;
nni_mtx mtx;
nni_msgq *uwq;
@@ -52,14 +52,14 @@ struct bus0_sock {
// bus0_pipe is our per-pipe protocol private structure.
struct bus0_pipe {
- nni_pipe * npipe;
- bus0_sock * psock;
- nni_msgq * sendq;
+ nni_pipe *npipe;
+ bus0_sock *psock;
+ nni_msgq *sendq;
nni_list_node node;
- nni_aio * aio_getq;
- nni_aio * aio_recv;
- nni_aio * aio_send;
- nni_aio * aio_putq;
+ nni_aio aio_getq;
+ nni_aio aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_putq;
nni_mtx mtx;
};
@@ -68,7 +68,7 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_free(s->aio_getq);
+ nni_aio_fini(&s->aio_getq);
nni_mtx_fini(&s->mtx);
}
@@ -76,14 +76,10 @@ static int
bus0_sock_init(void *arg, nni_sock *nsock)
{
bus0_sock *s = arg;
- int rv;
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
- bus0_sock_fini(s);
- return (rv);
- }
+ nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s);
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
s->raw = false;
@@ -95,15 +91,10 @@ static int
bus0_sock_init_raw(void *arg, nni_sock *nsock)
{
bus0_sock *s = arg;
- int rv;
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) !=
- 0) {
- bus0_sock_fini(s);
- return (rv);
- }
+ nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s);
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
s->raw = true;
@@ -124,7 +115,7 @@ bus0_sock_close(void *arg)
{
bus0_sock *s = arg;
- nni_aio_close(s->aio_getq);
+ nni_aio_close(&s->aio_getq);
}
static void
@@ -132,10 +123,10 @@ bus0_pipe_stop(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
+ nni_aio_stop(&p->aio_getq);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_putq);
}
static void
@@ -143,10 +134,10 @@ bus0_pipe_fini(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_free(p->aio_getq);
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
- nni_aio_free(p->aio_putq);
+ nni_aio_fini(&p->aio_getq);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_putq);
nni_msgq_fini(p->sendq);
nni_mtx_fini(&p->mtx);
}
@@ -159,11 +150,11 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
- if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p);
+ nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p);
+ nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p);
+ if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) {
bus0_pipe_fini(p);
return (rv);
}
@@ -200,10 +191,10 @@ bus0_pipe_close(void *arg)
bus0_pipe *p = arg;
bus0_sock *s = p->psock;
- nni_aio_close(p->aio_getq);
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
- nni_aio_close(p->aio_putq);
+ nni_aio_close(&p->aio_getq);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_putq);
nni_msgq_close(p->sendq);
nni_mtx_lock(&s->mtx);
@@ -218,15 +209,15 @@ bus0_pipe_getq_cb(void *arg)
{
bus0_pipe *p = arg;
- if (nni_aio_result(p->aio_getq) != 0) {
+ if (nni_aio_result(&p->aio_getq) != 0) {
// closed?
nni_pipe_close(p->npipe);
return;
}
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
+ nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
+ nni_aio_set_msg(&p->aio_getq, NULL);
- nni_pipe_send(p->npipe, p->aio_send);
+ nni_pipe_send(p->npipe, &p->aio_send);
}
static void
@@ -234,10 +225,10 @@ bus0_pipe_send_cb(void *arg)
{
bus0_pipe *p = arg;
- if (nni_aio_result(p->aio_send) != 0) {
+ if (nni_aio_result(&p->aio_send) != 0) {
// closed?
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->npipe);
return;
}
@@ -250,22 +241,22 @@ bus0_pipe_recv_cb(void *arg)
{
bus0_pipe *p = arg;
bus0_sock *s = p->psock;
- nni_msg * msg;
+ nni_msg *msg;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
+ msg = nni_aio_get_msg(&p->aio_recv);
if (s->raw) {
nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe));
}
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_aio_set_msg(p->aio_putq, msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_msgq_aio_put(s->urq, p->aio_putq);
+ nni_aio_set_msg(&p->aio_putq, msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msgq_aio_put(s->urq, &p->aio_putq);
}
static void
@@ -273,9 +264,9 @@ bus0_pipe_putq_cb(void *arg)
{
bus0_pipe *p = arg;
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_putq));
+ nni_aio_set_msg(&p->aio_putq, NULL);
nni_pipe_close(p->npipe);
return;
}
@@ -290,14 +281,14 @@ bus0_sock_getq_cb(void *arg)
bus0_sock *s = arg;
bus0_pipe *p;
bus0_pipe *lastp;
- nni_msg * msg;
- nni_msg * dup;
+ nni_msg *msg;
+ nni_msg *dup;
- if (nni_aio_result(s->aio_getq) != 0) {
+ if (nni_aio_result(&s->aio_getq) != 0) {
return;
}
- msg = nni_aio_get_msg(s->aio_getq);
+ msg = nni_aio_get_msg(&s->aio_getq);
// We ignore any headers present for cooked mode.
nni_msg_header_clear(msg);
@@ -328,14 +319,14 @@ bus0_sock_getq_cb_raw(void *arg)
{
bus0_sock *s = arg;
bus0_pipe *p;
- nni_msg * msg;
+ nni_msg *msg;
uint32_t sender;
- if (nni_aio_result(s->aio_getq) != 0) {
+ if (nni_aio_result(&s->aio_getq) != 0) {
return;
}
- msg = nni_aio_get_msg(s->aio_getq);
+ msg = nni_aio_get_msg(&s->aio_getq);
// The header being present indicates that the message
// was received locally and we are rebroadcasting. (Device
@@ -366,19 +357,19 @@ bus0_sock_getq_cb_raw(void *arg)
static void
bus0_sock_getq(bus0_sock *s)
{
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
}
static void
bus0_pipe_getq(bus0_pipe *p)
{
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
}
static void
bus0_pipe_recv(bus0_pipe *p)
{
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
}
static void