summaryrefslogtreecommitdiff
path: root/src/protocol/pubsub/pub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub/pub.c')
-rw-r--r--src/protocol/pubsub/pub.c95
1 files changed, 41 insertions, 54 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index e3b37f1a..64c2c59d 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -17,8 +17,8 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
+typedef struct nni_pub_pipe nni_pub_pipe;
+typedef struct nni_pub_sock nni_pub_sock;
static void nni_pub_pipe_recv_cb(void *);
static void nni_pub_pipe_send_cb(void *);
@@ -29,30 +29,30 @@ static void nni_pub_pipe_fini(void *);
// An nni_pub_sock is our per-socket protocol private structure.
struct nni_pub_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- int raw;
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_sock *sock;
+ nni_msgq *uwq;
+ int raw;
+ nni_aio aio_getq;
+ nni_list pipes;
+ nni_mtx mtx;
};
// An nni_pub_pipe is our per-pipe protocol private structure.
struct nni_pub_pipe {
- nni_pipe * pipe;
- nni_pub_sock * pub;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_list_node node;
+ nni_pipe * pipe;
+ nni_pub_sock *pub;
+ nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_list_node node;
};
static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
- int rv;
+ int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
@@ -67,7 +67,7 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (rv);
}
pub->sock = sock;
- pub->raw = 0;
+ pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
pub->uwq = nni_sock_sendq(sock);
@@ -77,7 +77,6 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (0);
}
-
static void
nni_pub_sock_fini(void *arg)
{
@@ -88,7 +87,6 @@ nni_pub_sock_fini(void *arg)
NNI_FREE_STRUCT(pub);
}
-
static void
nni_pub_sock_open(void *arg)
{
@@ -97,7 +95,6 @@ nni_pub_sock_open(void *arg)
nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_fini(void *arg)
{
@@ -110,12 +107,11 @@ nni_pub_pipe_fini(void *arg)
NNI_FREE_STRUCT(pp);
}
-
static int
nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pub_pipe *pp;
- int rv;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
@@ -140,8 +136,8 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
goto fail;
}
pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ pp->pub = psock;
+ *ppp = pp;
return (0);
fail:
@@ -149,11 +145,10 @@ fail:
return (rv);
}
-
static int
nni_pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
@@ -170,11 +165,10 @@ nni_pub_pipe_start(void *arg)
return (0);
}
-
static void
nni_pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
nni_aio_stop(&pp->aio_getq);
@@ -189,23 +183,22 @@ nni_pub_pipe_stop(void *arg)
nni_mtx_unlock(&pub->mtx);
}
-
static void
nni_pub_sock_getq_cb(void *arg)
{
nni_pub_sock *pub = arg;
- nni_msgq *uwq = pub->uwq;
- nni_msg *msg, *dup;
+ nni_msgq * uwq = pub->uwq;
+ nni_msg * msg, *dup;
nni_pub_pipe *pp;
nni_pub_pipe *last;
- int rv;
+ int rv;
if (nni_aio_result(&pub->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
+ msg = pub->aio_getq.a_msg;
pub->aio_getq.a_msg = NULL;
nni_mtx_lock(&pub->mtx);
@@ -232,7 +225,6 @@ nni_pub_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_recv_cb(void *arg)
{
@@ -248,7 +240,6 @@ nni_pub_pipe_recv_cb(void *arg)
nni_pipe_recv(pp->pipe, &pp->aio_recv);
}
-
static void
nni_pub_pipe_getq_cb(void *arg)
{
@@ -265,7 +256,6 @@ nni_pub_pipe_getq_cb(void *arg)
nni_pipe_send(pp->pipe, &pp->aio_send);
}
-
static void
nni_pub_pipe_send_cb(void *arg)
{
@@ -282,12 +272,11 @@ nni_pub_pipe_send_cb(void *arg)
nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
}
-
static int
nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -299,12 +288,11 @@ nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -316,29 +304,28 @@ nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+ .pipe_init = nni_pub_pipe_init,
+ .pipe_fini = nni_pub_pipe_fini,
+ .pipe_start = nni_pub_pipe_start,
+ .pipe_stop = nni_pub_pipe_stop,
};
nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+ .sock_init = nni_pub_sock_init,
+ .sock_fini = nni_pub_sock_fini,
+ .sock_open = nni_pub_sock_open,
+ .sock_setopt = nni_pub_sock_setopt,
+ .sock_getopt = nni_pub_sock_getopt,
};
nni_proto nni_pub_proto = {
- .proto_self = NNG_PROTO_PUB,
- .proto_peer = NNG_PROTO_SUB,
- .proto_name = "pub",
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_self = NNG_PROTO_PUB,
+ .proto_peer = NNG_PROTO_SUB,
+ .proto_name = "pub",
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_sock_ops = &nni_pub_sock_ops,
.proto_pipe_ops = &nni_pub_pipe_ops,
};