aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/bus
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/bus
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/bus')
-rw-r--r--src/protocol/bus/bus.c354
1 files changed, 179 insertions, 175 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 6f6def0a..c8d759f2 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -18,251 +18,257 @@
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct nni_bus_pipe nni_bus_pipe;
-typedef struct nni_bus_sock nni_bus_sock;
-
-static void nni_bus_sock_getq(nni_bus_sock *);
-static void nni_bus_pipe_getq(nni_bus_pipe *);
-static void nni_bus_pipe_send(nni_bus_pipe *);
-static void nni_bus_pipe_recv(nni_bus_pipe *);
-
-static void nni_bus_sock_getq_cb(void *);
-static void nni_bus_pipe_getq_cb(void *);
-static void nni_bus_pipe_send_cb(void *);
-static void nni_bus_pipe_recv_cb(void *);
-static void nni_bus_pipe_putq_cb(void *);
-
-// An nni_bus_sock is our per-socket protocol private structure.
-struct nni_bus_sock {
+typedef struct bus_pipe bus_pipe;
+typedef struct bus_sock bus_sock;
+
+static void bus_sock_getq(bus_sock *);
+static void bus_pipe_getq(bus_pipe *);
+static void bus_pipe_send(bus_pipe *);
+static void bus_pipe_recv(bus_pipe *);
+
+static void bus_sock_getq_cb(void *);
+static void bus_pipe_getq_cb(void *);
+static void bus_pipe_send_cb(void *);
+static void bus_pipe_recv_cb(void *);
+static void bus_pipe_putq_cb(void *);
+
+// A bus_sock is our per-socket protocol private structure.
+struct bus_sock {
nni_sock *nsock;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_bus_pipe is our per-pipe protocol private structure.
-struct nni_bus_pipe {
+// A bus_pipe is our per-pipe protocol private structure.
+struct bus_pipe {
nni_pipe * npipe;
- nni_bus_sock *psock;
+ bus_sock * psock;
nni_msgq * sendq;
nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
+ nni_aio * aio_getq;
+ nni_aio * aio_recv;
+ nni_aio * aio_send;
+ nni_aio * aio_putq;
nni_mtx mtx;
};
static void
-nni_bus_sock_fini(void *arg)
+bus_sock_fini(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_bus_sock_init(void **sp, nni_sock *nsock)
+bus_sock_init(void **sp, nni_sock *nsock)
{
- nni_bus_sock *psock;
+ bus_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
- psock->nsock = nsock;
- psock->raw = 0;
+ NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
+ bus_sock_fini(s);
+ return (rv);
+ }
+ s->nsock = nsock;
+ s->raw = 0;
- *sp = psock;
+ *sp = s;
return (0);
}
static void
-nni_bus_sock_open(void *arg)
+bus_sock_open(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_close(void *arg)
+bus_sock_close(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_bus_pipe_fini(void *arg)
+bus_pipe_fini(void *arg)
{
- nni_bus_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- nni_mtx_fini(&ppipe->mtx);
- NNI_FREE_STRUCT(ppipe);
+ bus_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_bus_pipe *ppipe;
- int rv;
+ bus_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ NNI_LIST_NODE_INIT(&p->node);
+ nni_mtx_init(&p->mtx);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
+ bus_pipe_fini(p);
return (rv);
}
- NNI_LIST_NODE_INIT(&ppipe->node);
- nni_mtx_init(&ppipe->mtx);
- nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
- nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
+
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
return (0);
}
static int
-nni_bus_pipe_start(void *arg)
+bus_pipe_start(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_mtx_lock(&psock->mtx);
- nni_list_append(&psock->pipes, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
- nni_bus_pipe_recv(ppipe);
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_recv(p);
+ bus_pipe_getq(p);
return (0);
}
static void
-nni_bus_pipe_stop(void *arg)
+bus_pipe_stop(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_msgq_close(ppipe->sendq);
+ nni_msgq_close(p->sendq);
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
- nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_mtx_lock(&ppipe->psock->mtx);
- if (nni_list_active(&psock->pipes, ppipe)) {
- nni_list_remove(&psock->pipes, ppipe);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&ppipe->psock->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_bus_pipe_getq_cb(void *arg)
+bus_pipe_getq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
// closed?
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
-nni_bus_pipe_send_cb(void *arg)
+bus_pipe_send_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
+ if (nni_aio_result(p->aio_send) != 0) {
// closed?
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_getq(p);
}
static void
-nni_bus_pipe_recv_cb(void *arg)
+bus_pipe_recv_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
- nni_msg * msg;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
+ nni_msg * msg;
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- msg = ppipe->aio_recv.a_msg;
+ msg = nni_aio_get_msg(p->aio_recv);
- if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) {
+ if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
- nni_pipe_stop(ppipe->npipe);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq);
}
static void
-nni_bus_pipe_putq_cb(void *arg)
+bus_pipe_putq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
// Wait for another recv.
- nni_bus_pipe_recv(ppipe);
+ bus_pipe_recv(p);
}
static void
-nni_bus_sock_getq_cb(void *arg)
+bus_sock_getq_cb(void *arg)
{
- nni_bus_sock *psock = arg;
- nni_bus_pipe *ppipe;
- nni_bus_pipe *lpipe;
- nni_msgq * uwq = nni_sock_sendq(psock->nsock);
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
-
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ bus_sock *s = arg;
+ bus_pipe *p;
+ bus_pipe *lastp;
+ nni_msgq *uwq = nni_sock_sendq(s->nsock);
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = nni_aio_get_msg(s->aio_getq);
// The header being present indicates that the message
// was received locally and we are rebroadcasting. (Device
@@ -274,103 +280,101 @@ nni_bus_sock_getq_cb(void *arg)
sender = 0;
}
- nni_mtx_lock(&psock->mtx);
- lpipe = nni_list_last(&psock->pipes);
- NNI_LIST_FOREACH (&psock->pipes, ppipe) {
- if (nni_pipe_id(ppipe->npipe) == sender) {
+ nni_mtx_lock(&s->mtx);
+ lastp = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (nni_pipe_id(p->npipe) == sender) {
continue;
}
- if (ppipe != lpipe) {
+ if (p != lastp) {
if (nni_msg_dup(&dup, msg) != 0) {
continue;
}
} else {
dup = msg;
}
- if (nni_msgq_tryput(ppipe->sendq, dup) != 0) {
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
- if (lpipe == NULL) {
+ if (lastp == NULL) {
nni_msg_free(msg);
}
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_getq(nni_bus_sock *psock)
+bus_sock_getq(bus_sock *s)
{
- nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq);
+ nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq);
}
static void
-nni_bus_pipe_getq(nni_bus_pipe *ppipe)
+bus_pipe_getq(bus_pipe *p)
{
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_bus_pipe_recv(nni_bus_pipe *ppipe)
+bus_pipe_recv(bus_pipe *p)
{
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_bus_pipe_ops = {
- .pipe_init = nni_bus_pipe_init,
- .pipe_fini = nni_bus_pipe_fini,
- .pipe_start = nni_bus_pipe_start,
- .pipe_stop = nni_bus_pipe_stop,
+static nni_proto_pipe_ops bus_pipe_ops = {
+ .pipe_init = bus_pipe_init,
+ .pipe_fini = bus_pipe_fini,
+ .pipe_start = bus_pipe_start,
+ .pipe_stop = bus_pipe_stop,
};
-static nni_proto_sock_ops nni_bus_sock_ops = {
- .sock_init = nni_bus_sock_init,
- .sock_fini = nni_bus_sock_fini,
- .sock_open = nni_bus_sock_open,
- .sock_close = nni_bus_sock_close,
- .sock_setopt = nni_bus_sock_setopt,
- .sock_getopt = nni_bus_sock_getopt,
+static nni_proto_sock_ops bus_sock_ops = {
+ .sock_init = bus_sock_init,
+ .sock_fini = bus_sock_fini,
+ .sock_open = bus_sock_open,
+ .sock_close = bus_sock_close,
+ .sock_setopt = bus_sock_setopt,
+ .sock_getopt = bus_sock_getopt,
};
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-nni_proto nni_bus_proto = {
+static nni_proto bus_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_BUS_V0, "bus" },
.proto_peer = { NNG_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_bus_sock_ops,
- .proto_pipe_ops = &nni_bus_pipe_ops,
+ .proto_sock_ops = &bus_sock_ops,
+ .proto_pipe_ops = &bus_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_bus_proto));
+ return (nni_proto_open(sidp, &bus_proto));
}