diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/bus/bus.c | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/bus/bus.c')
| -rw-r--r-- | src/protocol/bus/bus.c | 354 |
1 files changed, 179 insertions, 175 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 6f6def0a..c8d759f2 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -18,251 +18,257 @@ // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct nni_bus_pipe nni_bus_pipe; -typedef struct nni_bus_sock nni_bus_sock; - -static void nni_bus_sock_getq(nni_bus_sock *); -static void nni_bus_pipe_getq(nni_bus_pipe *); -static void nni_bus_pipe_send(nni_bus_pipe *); -static void nni_bus_pipe_recv(nni_bus_pipe *); - -static void nni_bus_sock_getq_cb(void *); -static void nni_bus_pipe_getq_cb(void *); -static void nni_bus_pipe_send_cb(void *); -static void nni_bus_pipe_recv_cb(void *); -static void nni_bus_pipe_putq_cb(void *); - -// An nni_bus_sock is our per-socket protocol private structure. -struct nni_bus_sock { +typedef struct bus_pipe bus_pipe; +typedef struct bus_sock bus_sock; + +static void bus_sock_getq(bus_sock *); +static void bus_pipe_getq(bus_pipe *); +static void bus_pipe_send(bus_pipe *); +static void bus_pipe_recv(bus_pipe *); + +static void bus_sock_getq_cb(void *); +static void bus_pipe_getq_cb(void *); +static void bus_pipe_send_cb(void *); +static void bus_pipe_recv_cb(void *); +static void bus_pipe_putq_cb(void *); + +// A bus_sock is our per-socket protocol private structure. +struct bus_sock { nni_sock *nsock; int raw; - nni_aio aio_getq; + nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; }; -// An nni_bus_pipe is our per-pipe protocol private structure. -struct nni_bus_pipe { +// A bus_pipe is our per-pipe protocol private structure. +struct bus_pipe { nni_pipe * npipe; - nni_bus_sock *psock; + bus_sock * psock; nni_msgq * sendq; nni_list_node node; - nni_aio aio_getq; - nni_aio aio_recv; - nni_aio aio_send; - nni_aio aio_putq; + nni_aio * aio_getq; + nni_aio * aio_recv; + nni_aio * aio_send; + nni_aio * aio_putq; nni_mtx mtx; }; static void -nni_bus_sock_fini(void *arg) +bus_sock_fini(void *arg) { - nni_bus_sock *psock = arg; + bus_sock *s = arg; - nni_aio_stop(&psock->aio_getq); - nni_aio_fini(&psock->aio_getq); - nni_mtx_fini(&psock->mtx); - NNI_FREE_STRUCT(psock); + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); } static int -nni_bus_sock_init(void **sp, nni_sock *nsock) +bus_sock_init(void **sp, nni_sock *nsock) { - nni_bus_sock *psock; + bus_sock *s; + int rv; - if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); - nni_mtx_init(&psock->mtx); - nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); - psock->nsock = nsock; - psock->raw = 0; + NNI_LIST_INIT(&s->pipes, bus_pipe, node); + nni_mtx_init(&s->mtx); + if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { + bus_sock_fini(s); + return (rv); + } + s->nsock = nsock; + s->raw = 0; - *sp = psock; + *sp = s; return (0); } static void -nni_bus_sock_open(void *arg) +bus_sock_open(void *arg) { - nni_bus_sock *psock = arg; + bus_sock *s = arg; - nni_bus_sock_getq(psock); + bus_sock_getq(s); } static void -nni_bus_sock_close(void *arg) +bus_sock_close(void *arg) { - nni_bus_sock *psock = arg; + bus_sock *s = arg; - nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED); + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -nni_bus_pipe_fini(void *arg) +bus_pipe_fini(void *arg) { - nni_bus_pipe *ppipe = arg; - - nni_aio_fini(&ppipe->aio_getq); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_msgq_fini(ppipe->sendq); - nni_mtx_fini(&ppipe->mtx); - NNI_FREE_STRUCT(ppipe); + bus_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); } static int -nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) +bus_pipe_init(void **pp, nni_pipe *npipe, void *s) { - nni_bus_pipe *ppipe; - int rv; + bus_pipe *p; + int rv; - if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - NNI_FREE_STRUCT(ppipe); + NNI_LIST_NODE_INIT(&p->node); + nni_mtx_init(&p->mtx); + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { + bus_pipe_fini(p); return (rv); } - NNI_LIST_NODE_INIT(&ppipe->node); - nni_mtx_init(&ppipe->mtx); - nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); - nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); - nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); - nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); - - ppipe->npipe = npipe; - ppipe->psock = psock; - *pp = ppipe; + + p->npipe = npipe; + p->psock = s; + *pp = p; return (0); } static int -nni_bus_pipe_start(void *arg) +bus_pipe_start(void *arg) { - nni_bus_pipe *ppipe = arg; - nni_bus_sock *psock = ppipe->psock; + bus_pipe *p = arg; + bus_sock *s = p->psock; - nni_mtx_lock(&psock->mtx); - nni_list_append(&psock->pipes, ppipe); - nni_mtx_unlock(&psock->mtx); + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); - nni_bus_pipe_recv(ppipe); - nni_bus_pipe_getq(ppipe); + bus_pipe_recv(p); + bus_pipe_getq(p); return (0); } static void -nni_bus_pipe_stop(void *arg) +bus_pipe_stop(void *arg) { - nni_bus_pipe *ppipe = arg; - nni_bus_sock *psock = ppipe->psock; + bus_pipe *p = arg; + bus_sock *s = p->psock; - nni_msgq_close(ppipe->sendq); + nni_msgq_close(p->sendq); - nni_aio_stop(&ppipe->aio_getq); - nni_aio_stop(&ppipe->aio_send); - nni_aio_stop(&ppipe->aio_recv); - nni_aio_stop(&ppipe->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); - nni_mtx_lock(&ppipe->psock->mtx); - if (nni_list_active(&psock->pipes, ppipe)) { - nni_list_remove(&psock->pipes, ppipe); + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); } - nni_mtx_unlock(&ppipe->psock->mtx); + nni_mtx_unlock(&s->mtx); } static void -nni_bus_pipe_getq_cb(void *arg) +bus_pipe_getq_cb(void *arg) { - nni_bus_pipe *ppipe = arg; + bus_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { // closed? - nni_pipe_stop(ppipe->npipe); + nni_pipe_stop(p->npipe); return; } - ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg; - ppipe->aio_getq.a_msg = NULL; + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); - nni_pipe_send(ppipe->npipe, &ppipe->aio_send); + nni_pipe_send(p->npipe, p->aio_send); } static void -nni_bus_pipe_send_cb(void *arg) +bus_pipe_send_cb(void *arg) { - nni_bus_pipe *ppipe = arg; + bus_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_send) != 0) { + if (nni_aio_result(p->aio_send) != 0) { // closed? - nni_msg_free(ppipe->aio_send.a_msg); - ppipe->aio_send.a_msg = NULL; - nni_pipe_stop(ppipe->npipe); + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); return; } - nni_bus_pipe_getq(ppipe); + bus_pipe_getq(p); } static void -nni_bus_pipe_recv_cb(void *arg) +bus_pipe_recv_cb(void *arg) { - nni_bus_pipe *ppipe = arg; - nni_bus_sock *psock = ppipe->psock; - nni_msg * msg; + bus_pipe *p = arg; + bus_sock *s = p->psock; + nni_msg * msg; - if (nni_aio_result(&ppipe->aio_recv) != 0) { - nni_pipe_stop(ppipe->npipe); + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); return; } - msg = ppipe->aio_recv.a_msg; + msg = nni_aio_get_msg(p->aio_recv); - if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) { + if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) { // XXX: bump a nomemory stat nni_msg_free(msg); - nni_pipe_stop(ppipe->npipe); + nni_aio_set_msg(p->aio_recv, NULL); + nni_pipe_stop(p->npipe); return; } - ppipe->aio_putq.a_msg = msg; - nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq); + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq); } static void -nni_bus_pipe_putq_cb(void *arg) +bus_pipe_putq_cb(void *arg) { - nni_bus_pipe *ppipe = arg; + bus_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_putq) != 0) { - nni_msg_free(ppipe->aio_putq.a_msg); - ppipe->aio_putq.a_msg = NULL; - nni_pipe_stop(ppipe->npipe); + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); return; } // Wait for another recv. - nni_bus_pipe_recv(ppipe); + bus_pipe_recv(p); } static void -nni_bus_sock_getq_cb(void *arg) +bus_sock_getq_cb(void *arg) { - nni_bus_sock *psock = arg; - nni_bus_pipe *ppipe; - nni_bus_pipe *lpipe; - nni_msgq * uwq = nni_sock_sendq(psock->nsock); - nni_msg * msg; - nni_msg * dup; - uint32_t sender; - - if (nni_aio_result(&psock->aio_getq) != 0) { + bus_sock *s = arg; + bus_pipe *p; + bus_pipe *lastp; + nni_msgq *uwq = nni_sock_sendq(s->nsock); + nni_msg * msg; + nni_msg * dup; + uint32_t sender; + + if (nni_aio_result(s->aio_getq) != 0) { return; } - msg = psock->aio_getq.a_msg; + msg = nni_aio_get_msg(s->aio_getq); // The header being present indicates that the message // was received locally and we are rebroadcasting. (Device @@ -274,103 +280,101 @@ nni_bus_sock_getq_cb(void *arg) sender = 0; } - nni_mtx_lock(&psock->mtx); - lpipe = nni_list_last(&psock->pipes); - NNI_LIST_FOREACH (&psock->pipes, ppipe) { - if (nni_pipe_id(ppipe->npipe) == sender) { + nni_mtx_lock(&s->mtx); + lastp = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (nni_pipe_id(p->npipe) == sender) { continue; } - if (ppipe != lpipe) { + if (p != lastp) { if (nni_msg_dup(&dup, msg) != 0) { continue; } } else { dup = msg; } - if (nni_msgq_tryput(ppipe->sendq, dup) != 0) { + if (nni_msgq_tryput(p->sendq, dup) != 0) { nni_msg_free(dup); } } - nni_mtx_unlock(&psock->mtx); + nni_mtx_unlock(&s->mtx); - if (lpipe == NULL) { + if (lastp == NULL) { nni_msg_free(msg); } - nni_bus_sock_getq(psock); + bus_sock_getq(s); } static void -nni_bus_sock_getq(nni_bus_sock *psock) +bus_sock_getq(bus_sock *s) { - nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq); + nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq); } static void -nni_bus_pipe_getq(nni_bus_pipe *ppipe) +bus_pipe_getq(bus_pipe *p) { - nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); + nni_msgq_aio_get(p->sendq, p->aio_getq); } static void -nni_bus_pipe_recv(nni_bus_pipe *ppipe) +bus_pipe_recv(bus_pipe *p) { - nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static int -nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_bus_sock *psock = arg; - int rv = NNG_ENOTSUP; + bus_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_raw) { - rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); } return (rv); } static int -nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_bus_sock *psock = arg; - int rv = NNG_ENOTSUP; + bus_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_raw) { - rv = nni_getopt_int(&psock->raw, buf, szp); + rv = nni_getopt_int(&s->raw, buf, szp); } return (rv); } -static nni_proto_pipe_ops nni_bus_pipe_ops = { - .pipe_init = nni_bus_pipe_init, - .pipe_fini = nni_bus_pipe_fini, - .pipe_start = nni_bus_pipe_start, - .pipe_stop = nni_bus_pipe_stop, +static nni_proto_pipe_ops bus_pipe_ops = { + .pipe_init = bus_pipe_init, + .pipe_fini = bus_pipe_fini, + .pipe_start = bus_pipe_start, + .pipe_stop = bus_pipe_stop, }; -static nni_proto_sock_ops nni_bus_sock_ops = { - .sock_init = nni_bus_sock_init, - .sock_fini = nni_bus_sock_fini, - .sock_open = nni_bus_sock_open, - .sock_close = nni_bus_sock_close, - .sock_setopt = nni_bus_sock_setopt, - .sock_getopt = nni_bus_sock_getopt, +static nni_proto_sock_ops bus_sock_ops = { + .sock_init = bus_sock_init, + .sock_fini = bus_sock_fini, + .sock_open = bus_sock_open, + .sock_close = bus_sock_close, + .sock_setopt = bus_sock_setopt, + .sock_getopt = bus_sock_getopt, }; -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. -nni_proto nni_bus_proto = { +static nni_proto bus_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNG_PROTO_BUS_V0, "bus" }, .proto_peer = { NNG_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &nni_bus_sock_ops, - .proto_pipe_ops = &nni_bus_pipe_ops, + .proto_sock_ops = &bus_sock_ops, + .proto_pipe_ops = &bus_pipe_ops, }; int nng_bus0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_bus_proto)); + return (nni_proto_open(sidp, &bus_proto)); } |
