aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
commit795aebbee77bb74d8792df96dfe1aa79ec9548fc (patch)
tree58c16424c16b9e71cebdceaee4507ab6608f80da /src/protocol/pubsub
parentde90f97167d2df6739db47b2c6aad85f06250270 (diff)
downloadnng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.gz
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.bz2
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.zip
Give up on uncrustify; switch to clang-format.
Diffstat (limited to 'src/protocol/pubsub')
-rw-r--r--src/protocol/pubsub/pub.c95
-rw-r--r--src/protocol/pubsub/sub.c97
2 files changed, 83 insertions, 109 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index e3b37f1a..64c2c59d 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -17,8 +17,8 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
+typedef struct nni_pub_pipe nni_pub_pipe;
+typedef struct nni_pub_sock nni_pub_sock;
static void nni_pub_pipe_recv_cb(void *);
static void nni_pub_pipe_send_cb(void *);
@@ -29,30 +29,30 @@ static void nni_pub_pipe_fini(void *);
// An nni_pub_sock is our per-socket protocol private structure.
struct nni_pub_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- int raw;
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_sock *sock;
+ nni_msgq *uwq;
+ int raw;
+ nni_aio aio_getq;
+ nni_list pipes;
+ nni_mtx mtx;
};
// An nni_pub_pipe is our per-pipe protocol private structure.
struct nni_pub_pipe {
- nni_pipe * pipe;
- nni_pub_sock * pub;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_list_node node;
+ nni_pipe * pipe;
+ nni_pub_sock *pub;
+ nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_list_node node;
};
static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
- int rv;
+ int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
@@ -67,7 +67,7 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (rv);
}
pub->sock = sock;
- pub->raw = 0;
+ pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
pub->uwq = nni_sock_sendq(sock);
@@ -77,7 +77,6 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (0);
}
-
static void
nni_pub_sock_fini(void *arg)
{
@@ -88,7 +87,6 @@ nni_pub_sock_fini(void *arg)
NNI_FREE_STRUCT(pub);
}
-
static void
nni_pub_sock_open(void *arg)
{
@@ -97,7 +95,6 @@ nni_pub_sock_open(void *arg)
nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_fini(void *arg)
{
@@ -110,12 +107,11 @@ nni_pub_pipe_fini(void *arg)
NNI_FREE_STRUCT(pp);
}
-
static int
nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pub_pipe *pp;
- int rv;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
@@ -140,8 +136,8 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
goto fail;
}
pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ pp->pub = psock;
+ *ppp = pp;
return (0);
fail:
@@ -149,11 +145,10 @@ fail:
return (rv);
}
-
static int
nni_pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
@@ -170,11 +165,10 @@ nni_pub_pipe_start(void *arg)
return (0);
}
-
static void
nni_pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
nni_aio_stop(&pp->aio_getq);
@@ -189,23 +183,22 @@ nni_pub_pipe_stop(void *arg)
nni_mtx_unlock(&pub->mtx);
}
-
static void
nni_pub_sock_getq_cb(void *arg)
{
nni_pub_sock *pub = arg;
- nni_msgq *uwq = pub->uwq;
- nni_msg *msg, *dup;
+ nni_msgq * uwq = pub->uwq;
+ nni_msg * msg, *dup;
nni_pub_pipe *pp;
nni_pub_pipe *last;
- int rv;
+ int rv;
if (nni_aio_result(&pub->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
+ msg = pub->aio_getq.a_msg;
pub->aio_getq.a_msg = NULL;
nni_mtx_lock(&pub->mtx);
@@ -232,7 +225,6 @@ nni_pub_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_recv_cb(void *arg)
{
@@ -248,7 +240,6 @@ nni_pub_pipe_recv_cb(void *arg)
nni_pipe_recv(pp->pipe, &pp->aio_recv);
}
-
static void
nni_pub_pipe_getq_cb(void *arg)
{
@@ -265,7 +256,6 @@ nni_pub_pipe_getq_cb(void *arg)
nni_pipe_send(pp->pipe, &pp->aio_send);
}
-
static void
nni_pub_pipe_send_cb(void *arg)
{
@@ -282,12 +272,11 @@ nni_pub_pipe_send_cb(void *arg)
nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
}
-
static int
nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -299,12 +288,11 @@ nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -316,29 +304,28 @@ nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+ .pipe_init = nni_pub_pipe_init,
+ .pipe_fini = nni_pub_pipe_fini,
+ .pipe_start = nni_pub_pipe_start,
+ .pipe_stop = nni_pub_pipe_stop,
};
nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+ .sock_init = nni_pub_sock_init,
+ .sock_fini = nni_pub_sock_fini,
+ .sock_open = nni_pub_sock_open,
+ .sock_setopt = nni_pub_sock_setopt,
+ .sock_getopt = nni_pub_sock_getopt,
};
nni_proto nni_pub_proto = {
- .proto_self = NNG_PROTO_PUB,
- .proto_peer = NNG_PROTO_SUB,
- .proto_name = "pub",
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_self = NNG_PROTO_PUB,
+ .proto_peer = NNG_PROTO_SUB,
+ .proto_name = "pub",
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_sock_ops = &nni_pub_sock_ops,
.proto_pipe_ops = &nni_pub_pipe_ops,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 09a724e2..bc4de973 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -16,34 +16,34 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct nni_sub_pipe nni_sub_pipe;
+typedef struct nni_sub_sock nni_sub_sock;
+typedef struct nni_sub_topic nni_sub_topic;
static void nni_sub_recv_cb(void *);
static void nni_sub_putq_cb(void *);
static void nni_sub_pipe_fini(void *);
struct nni_sub_topic {
- nni_list_node node;
- size_t len;
- void * buf;
+ nni_list_node node;
+ size_t len;
+ void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_sub_sock {
- nni_sock * sock;
- nni_list topics;
- nni_msgq * urq;
- int raw;
+ nni_sock *sock;
+ nni_list topics;
+ nni_msgq *urq;
+ int raw;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock * sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+ nni_pipe * pipe;
+ nni_sub_sock *sub;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static int
@@ -56,7 +56,7 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
}
NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
sub->sock = sock;
- sub->raw = 0;
+ sub->raw = 0;
sub->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
@@ -64,11 +64,10 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
return (0);
}
-
static void
nni_sub_sock_fini(void *arg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
while ((topic = nni_list_first(&sub->topics)) != NULL) {
@@ -79,12 +78,11 @@ nni_sub_sock_fini(void *arg)
NNI_FREE_STRUCT(sub);
}
-
static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
- int rv;
+ int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
@@ -95,12 +93,11 @@ nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
return (rv);
}
sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ sp->sub = ssock;
+ *spp = sp;
return (0);
}
-
static void
nni_sub_pipe_fini(void *arg)
{
@@ -111,7 +108,6 @@ nni_sub_pipe_fini(void *arg)
NNI_FREE_STRUCT(sp);
}
-
static int
nni_sub_pipe_start(void *arg)
{
@@ -121,7 +117,6 @@ nni_sub_pipe_start(void *arg)
return (0);
}
-
static void
nni_sub_pipe_stop(void *arg)
{
@@ -131,13 +126,12 @@ nni_sub_pipe_stop(void *arg)
nni_aio_stop(&sp->aio_recv);
}
-
static void
nni_sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ nni_sub_pipe *sp = arg;
nni_sub_sock *sub = sp->sub;
- nni_msgq *urq = sub->urq;
+ nni_msgq * urq = sub->urq;
if (nni_aio_result(&sp->aio_recv) != 0) {
nni_pipe_stop(sp->pipe);
@@ -149,7 +143,6 @@ nni_sub_recv_cb(void *arg)
nni_msgq_aio_put(sub->urq, &sp->aio_putq);
}
-
static void
nni_sub_putq_cb(void *arg)
{
@@ -165,7 +158,6 @@ nni_sub_putq_cb(void *arg)
nni_pipe_recv(sp->pipe, &sp->aio_recv);
}
-
// For now we maintain subscriptions on a sorted linked list. As we do not
// expect to have huge numbers of subscriptions, and as the operation is
// really O(n), we think this is acceptable. In the future we might decide
@@ -215,12 +207,11 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (0);
}
-
static int
nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
{
nni_sub_topic *topic;
- int rv;
+ int rv;
NNI_LIST_FOREACH (&sub->topics, topic) {
if (topic->len >= sz) {
@@ -246,12 +237,11 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (NNG_ENOENT);
}
-
static int
nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -269,12 +259,11 @@ nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -286,22 +275,21 @@ nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
- char *body;
- size_t len;
- int match;
+ char * body;
+ size_t len;
+ int match;
if (sub->raw) {
return (msg);
}
body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ len = nni_msg_len(msg);
match = 0;
// Check to see if the message matches one of our subscriptions.
@@ -329,29 +317,28 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+ .pipe_init = nni_sub_pipe_init,
+ .pipe_fini = nni_sub_pipe_fini,
+ .pipe_start = nni_sub_pipe_start,
+ .pipe_stop = nni_sub_pipe_stop,
};
static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+ .sock_init = nni_sub_sock_init,
+ .sock_fini = nni_sub_sock_fini,
+ .sock_setopt = nni_sub_sock_setopt,
+ .sock_getopt = nni_sub_sock_getopt,
+ .sock_rfilter = nni_sub_sock_rfilter,
};
nni_proto nni_sub_proto = {
- .proto_self = NNG_PROTO_SUB,
- .proto_peer = NNG_PROTO_PUB,
- .proto_name = "sub",
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_self = NNG_PROTO_SUB,
+ .proto_peer = NNG_PROTO_PUB,
+ .proto_name = "sub",
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_sock_ops = &nni_sub_sock_ops,
.proto_pipe_ops = &nni_sub_pipe_ops,
};