aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/bus/bus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/bus/bus.c')
-rw-r--r--src/protocol/bus/bus.c100
1 files changed, 41 insertions, 59 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 5755d5a9..8c9ed83c 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -17,8 +17,8 @@
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct nni_bus_pipe nni_bus_pipe;
-typedef struct nni_bus_sock nni_bus_sock;
+typedef struct nni_bus_pipe nni_bus_pipe;
+typedef struct nni_bus_sock nni_bus_sock;
static void nni_bus_sock_getq(nni_bus_sock *);
static void nni_bus_pipe_getq(nni_bus_pipe *);
@@ -33,27 +33,26 @@ static void nni_bus_pipe_putq_cb(void *);
// An nni_bus_sock is our per-socket protocol private structure.
struct nni_bus_sock {
- nni_sock * nsock;
- int raw;
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_sock *nsock;
+ int raw;
+ nni_aio aio_getq;
+ nni_list pipes;
+ nni_mtx mtx;
};
// An nni_bus_pipe is our per-pipe protocol private structure.
struct nni_bus_pipe {
- nni_pipe * npipe;
- nni_bus_sock * psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * npipe;
+ nni_bus_sock *psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio aio_getq;
+ nni_aio aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
-
static void
nni_bus_sock_fini(void *arg)
{
@@ -66,12 +65,11 @@ nni_bus_sock_fini(void *arg)
}
}
-
static int
nni_bus_sock_init(void **sp, nni_sock *nsock)
{
nni_bus_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
@@ -85,7 +83,7 @@ nni_bus_sock_init(void **sp, nni_sock *nsock)
goto fail;
}
psock->nsock = nsock;
- psock->raw = 0;
+ psock->raw = 0;
*sp = psock;
return (0);
@@ -95,7 +93,6 @@ fail:
return (rv);
}
-
static void
nni_bus_sock_open(void *arg)
{
@@ -104,7 +101,6 @@ nni_bus_sock_open(void *arg)
nni_bus_sock_getq(psock);
}
-
static void
nni_bus_pipe_fini(void *arg)
{
@@ -121,12 +117,11 @@ nni_bus_pipe_fini(void *arg)
}
}
-
static int
nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_bus_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -155,7 +150,7 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
fail:
@@ -163,7 +158,6 @@ fail:
return (rv);
}
-
static int
nni_bus_pipe_start(void *arg)
{
@@ -180,7 +174,6 @@ nni_bus_pipe_start(void *arg)
return (0);
}
-
static void
nni_bus_pipe_stop(void *arg)
{
@@ -201,7 +194,6 @@ nni_bus_pipe_stop(void *arg)
nni_mtx_unlock(&ppipe->psock->mtx);
}
-
static void
nni_bus_pipe_getq_cb(void *arg)
{
@@ -218,7 +210,6 @@ nni_bus_pipe_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
static void
nni_bus_pipe_send_cb(void *arg)
{
@@ -235,21 +226,20 @@ nni_bus_pipe_send_cb(void *arg)
nni_bus_pipe_getq(ppipe);
}
-
static void
nni_bus_pipe_recv_cb(void *arg)
{
nni_bus_pipe *ppipe = arg;
nni_bus_sock *psock = ppipe->psock;
- nni_msg *msg;
- uint32_t id;
+ nni_msg * msg;
+ uint32_t id;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
nni_pipe_stop(ppipe->npipe);
return;
}
msg = ppipe->aio_recv.a_msg;
- id = nni_pipe_id(ppipe->npipe);
+ id = nni_pipe_id(ppipe->npipe);
if (nni_msg_prepend_header(msg, &id, 4) != 0) {
// XXX: bump a nomemory stat
@@ -262,7 +252,6 @@ nni_bus_pipe_recv_cb(void *arg)
nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq);
}
-
static void
nni_bus_pipe_putq_cb(void *arg)
{
@@ -279,16 +268,15 @@ nni_bus_pipe_putq_cb(void *arg)
nni_bus_pipe_recv(ppipe);
}
-
static void
nni_bus_sock_getq_cb(void *arg)
{
nni_bus_sock *psock = arg;
nni_bus_pipe *ppipe;
nni_bus_pipe *lpipe;
- nni_msgq *uwq = nni_sock_sendq(psock->nsock);
- nni_msg *msg, *dup;
- uint32_t sender;
+ nni_msgq * uwq = nni_sock_sendq(psock->nsock);
+ nni_msg * msg, *dup;
+ uint32_t sender;
if (nni_aio_result(&psock->aio_getq) != 0) {
return;
@@ -333,33 +321,29 @@ nni_bus_sock_getq_cb(void *arg)
nni_bus_sock_getq(psock);
}
-
static void
nni_bus_sock_getq(nni_bus_sock *psock)
{
nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq);
}
-
static void
nni_bus_pipe_getq(nni_bus_pipe *ppipe)
{
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
}
-
static void
nni_bus_pipe_recv(nni_bus_pipe *ppipe)
{
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static int
nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_bus_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -371,12 +355,11 @@ nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_bus_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -388,29 +371,28 @@ nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_proto_pipe_ops nni_bus_pipe_ops = {
- .pipe_init = nni_bus_pipe_init,
- .pipe_fini = nni_bus_pipe_fini,
- .pipe_start = nni_bus_pipe_start,
- .pipe_stop = nni_bus_pipe_stop,
+ .pipe_init = nni_bus_pipe_init,
+ .pipe_fini = nni_bus_pipe_fini,
+ .pipe_start = nni_bus_pipe_start,
+ .pipe_stop = nni_bus_pipe_stop,
};
static nni_proto_sock_ops nni_bus_sock_ops = {
- .sock_init = nni_bus_sock_init,
- .sock_fini = nni_bus_sock_fini,
- .sock_open = nni_bus_sock_open,
- .sock_setopt = nni_bus_sock_setopt,
- .sock_getopt = nni_bus_sock_getopt,
+ .sock_init = nni_bus_sock_init,
+ .sock_fini = nni_bus_sock_fini,
+ .sock_open = nni_bus_sock_open,
+ .sock_setopt = nni_bus_sock_setopt,
+ .sock_getopt = nni_bus_sock_getopt,
};
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
nni_proto nni_bus_proto = {
- .proto_self = NNG_PROTO_BUS,
- .proto_peer = NNG_PROTO_BUS,
- .proto_name = "bus",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_BUS,
+ .proto_peer = NNG_PROTO_BUS,
+ .proto_name = "bus",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_bus_sock_ops,
.proto_pipe_ops = &nni_bus_pipe_ops,
};