aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub/sub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
commit795aebbee77bb74d8792df96dfe1aa79ec9548fc (patch)
tree58c16424c16b9e71cebdceaee4507ab6608f80da /src/protocol/pubsub/sub.c
parentde90f97167d2df6739db47b2c6aad85f06250270 (diff)
downloadnng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.gz
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.bz2
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.zip
Give up on uncrustify; switch to clang-format.
Diffstat (limited to 'src/protocol/pubsub/sub.c')
-rw-r--r--src/protocol/pubsub/sub.c97
1 files changed, 42 insertions, 55 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 09a724e2..bc4de973 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -16,34 +16,34 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct nni_sub_pipe nni_sub_pipe;
+typedef struct nni_sub_sock nni_sub_sock;
+typedef struct nni_sub_topic nni_sub_topic;
static void nni_sub_recv_cb(void *);
static void nni_sub_putq_cb(void *);
static void nni_sub_pipe_fini(void *);
struct nni_sub_topic {
- nni_list_node node;
- size_t len;
- void * buf;
+ nni_list_node node;
+ size_t len;
+ void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_sub_sock {
- nni_sock * sock;
- nni_list topics;
- nni_msgq * urq;
- int raw;
+ nni_sock *sock;
+ nni_list topics;
+ nni_msgq *urq;
+ int raw;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock * sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+ nni_pipe * pipe;
+ nni_sub_sock *sub;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static int
@@ -56,7 +56,7 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
}
NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
sub->sock = sock;
- sub->raw = 0;
+ sub->raw = 0;
sub->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
@@ -64,11 +64,10 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
return (0);
}
-
static void
nni_sub_sock_fini(void *arg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
while ((topic = nni_list_first(&sub->topics)) != NULL) {
@@ -79,12 +78,11 @@ nni_sub_sock_fini(void *arg)
NNI_FREE_STRUCT(sub);
}
-
static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
- int rv;
+ int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
@@ -95,12 +93,11 @@ nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
return (rv);
}
sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ sp->sub = ssock;
+ *spp = sp;
return (0);
}
-
static void
nni_sub_pipe_fini(void *arg)
{
@@ -111,7 +108,6 @@ nni_sub_pipe_fini(void *arg)
NNI_FREE_STRUCT(sp);
}
-
static int
nni_sub_pipe_start(void *arg)
{
@@ -121,7 +117,6 @@ nni_sub_pipe_start(void *arg)
return (0);
}
-
static void
nni_sub_pipe_stop(void *arg)
{
@@ -131,13 +126,12 @@ nni_sub_pipe_stop(void *arg)
nni_aio_stop(&sp->aio_recv);
}
-
static void
nni_sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ nni_sub_pipe *sp = arg;
nni_sub_sock *sub = sp->sub;
- nni_msgq *urq = sub->urq;
+ nni_msgq * urq = sub->urq;
if (nni_aio_result(&sp->aio_recv) != 0) {
nni_pipe_stop(sp->pipe);
@@ -149,7 +143,6 @@ nni_sub_recv_cb(void *arg)
nni_msgq_aio_put(sub->urq, &sp->aio_putq);
}
-
static void
nni_sub_putq_cb(void *arg)
{
@@ -165,7 +158,6 @@ nni_sub_putq_cb(void *arg)
nni_pipe_recv(sp->pipe, &sp->aio_recv);
}
-
// For now we maintain subscriptions on a sorted linked list. As we do not
// expect to have huge numbers of subscriptions, and as the operation is
// really O(n), we think this is acceptable. In the future we might decide
@@ -215,12 +207,11 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (0);
}
-
static int
nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
{
nni_sub_topic *topic;
- int rv;
+ int rv;
NNI_LIST_FOREACH (&sub->topics, topic) {
if (topic->len >= sz) {
@@ -246,12 +237,11 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (NNG_ENOENT);
}
-
static int
nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -269,12 +259,11 @@ nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -286,22 +275,21 @@ nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
- char *body;
- size_t len;
- int match;
+ char * body;
+ size_t len;
+ int match;
if (sub->raw) {
return (msg);
}
body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ len = nni_msg_len(msg);
match = 0;
// Check to see if the message matches one of our subscriptions.
@@ -329,29 +317,28 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+ .pipe_init = nni_sub_pipe_init,
+ .pipe_fini = nni_sub_pipe_fini,
+ .pipe_start = nni_sub_pipe_start,
+ .pipe_stop = nni_sub_pipe_stop,
};
static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+ .sock_init = nni_sub_sock_init,
+ .sock_fini = nni_sub_sock_fini,
+ .sock_setopt = nni_sub_sock_setopt,
+ .sock_getopt = nni_sub_sock_getopt,
+ .sock_rfilter = nni_sub_sock_rfilter,
};
nni_proto nni_sub_proto = {
- .proto_self = NNG_PROTO_SUB,
- .proto_peer = NNG_PROTO_PUB,
- .proto_name = "sub",
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_self = NNG_PROTO_SUB,
+ .proto_peer = NNG_PROTO_PUB,
+ .proto_name = "sub",
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_sock_ops = &nni_sub_sock_ops,
.proto_pipe_ops = &nni_sub_pipe_ops,
};