From d6bd9375c828cb67518341122208f86ee37940a2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 10 Jan 2017 23:38:02 -0800 Subject: Initial (untested) bus implementation. --- src/protocol/bus/bus.c | 275 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 src/protocol/bus/bus.c (limited to 'src/protocol') diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c new file mode 100644 index 00000000..a9e402ef --- /dev/null +++ b/src/protocol/bus/bus.c @@ -0,0 +1,275 @@ +// +// Copyright 2017 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Bus protocol. The BUS protocol, each peer sends a message to its peers. +// However, bus protocols do not "forward" (absent a device). So in order +// for each participant to receive the message, each sender must be connected +// to every other node in the network (full mesh). + +typedef struct nni_bus_pipe nni_bus_pipe; +typedef struct nni_bus_sock nni_bus_sock; + +// An nni_bus_sock is our per-socket protocol private structure. +struct nni_bus_sock { + nni_sock * nsock; + int raw; + int closing; + nni_list pipes; +}; + +// An nni_bus_pipe is our per-pipe protocol private structure. +struct nni_bus_pipe { + nni_pipe * npipe; + nni_bus_sock * psock; + nni_msgq * sendq; + nni_list_node node; + int sigclose; +}; + +static int +nni_bus_sock_init(void **sp, nni_sock *nsock) +{ + nni_bus_sock *psock; + int rv; + + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); + psock->nsock = nsock; + psock->raw = 0; + + *sp = psock; + nni_sock_recverr(nsock, NNG_ESTATE); + return (0); +} + + +static void +nni_bus_sock_fini(void *arg) +{ + nni_bus_sock *psock = arg; + + NNI_FREE_STRUCT(psock); +} + + +static int +nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) +{ + nni_bus_pipe *ppipe; + int rv; + + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&ppipe->node); + // This depth could be tunable. + if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + NNI_FREE_STRUCT(ppipe); + return (rv); + } + ppipe->npipe = npipe; + ppipe->psock = psock; + ppipe->sigclose = 0; + *pp = ppipe; + return (0); +} + + +static void +nni_bus_pipe_fini(void *arg) +{ + nni_bus_pipe *ppipe = arg; + + NNI_FREE_STRUCT(ppipe); +} + + +static int +nni_bus_pipe_add(void *arg) +{ + nni_bus_pipe *ppipe = arg; + nni_bus_sock *psock = ppipe->psock; + + nni_list_append(&psock->pipes, ppipe); + return (0); +} + + +static void +nni_bus_pipe_rem(void *arg) +{ + nni_bus_pipe *ppipe = arg; + nni_bus_sock *psock = ppipe->psock; + + nni_list_remove(&psock->pipes, ppipe); +} + + +static void +nni_bus_pipe_sender(void *arg) +{ + nni_bus_pipe *ppipe = arg; + nni_pipe *npipe = ppipe->npipe; + nni_msgq *uwq = ppipe->sendq; + nni_msgq *urq = nni_sock_recvq(ppipe->psock->nsock); + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose); + if (rv != 0) { + break; + } + rv = nni_pipe_send(npipe, msg); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(urq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static void +nni_bus_pipe_receiver(void *arg) +{ + nni_bus_pipe *ppipe = arg; + nni_bus_sock *psock = ppipe->psock; + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_pipe *npipe = ppipe->npipe; + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_pipe_recv(npipe, &msg); + if (rv != 0) { + break; + } + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(uwq, &ppipe->sigclose); + nni_msgq_signal(ppipe->sendq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static int +nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_bus_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_bus_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + rv = nni_getopt_int(&psock->raw, buf, szp); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static void +nni_bus_sock_sender(void *arg) +{ + nni_bus_sock *psock = arg; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msg *msg, *dup; + + for (;;) { + nni_bus_pipe *ppipe; + nni_bus_pipe *last; + int rv; + + if ((rv = nni_msgq_get(uwq, &msg)) != 0) { + break; + } + + nni_mtx_lock(mx); + last = nni_list_last(&psock->pipes); + NNI_LIST_FOREACH (&psock->pipes, ppipe) { + if (ppipe != last) { + rv = nni_msg_dup(&dup, msg); + if (rv != 0) { + continue; + } + } else { + dup = msg; + } + if ((rv = nni_msgq_tryput(ppipe->sendq, dup)) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(mx); + + if (last == NULL) { + nni_msg_free(msg); + } + } +} + + +static nni_proto_pipe_ops nni_bus_pipe_ops = { + .pipe_init = nni_bus_pipe_init, + .pipe_fini = nni_bus_pipe_fini, + .pipe_add = nni_bus_pipe_add, + .pipe_rem = nni_bus_pipe_rem, + .pipe_worker = { nni_bus_pipe_sender, + nni_bus_pipe_receiver } +}; + +static nni_proto_sock_ops nni_bus_sock_ops = { + .sock_init = nni_bus_sock_init, + .sock_fini = nni_bus_sock_fini, + .sock_setopt = nni_bus_sock_setopt, + .sock_getopt = nni_bus_sock_getopt, + .sock_worker = { nni_bus_sock_sender }, +}; + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +nni_proto nni_bus_proto = { + .proto_self = NNG_PROTO_BUS, + .proto_peer = NNG_PROTO_BUS, + .proto_name = "bus", + .proto_sock_ops = &nni_bus_sock_ops, + .proto_pipe_ops = &nni_bus_pipe_ops, +}; -- cgit v1.2.3-70-g09d2