aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus/bus.c100
-rw-r--r--src/protocol/pair/pair.c84
-rw-r--r--src/protocol/pipeline/pull.c73
-rw-r--r--src/protocol/pipeline/push.c83
-rw-r--r--src/protocol/pubsub/pub.c95
-rw-r--r--src/protocol/pubsub/sub.c97
-rw-r--r--src/protocol/reqrep/rep.c178
-rw-r--r--src/protocol/reqrep/req.c161
-rw-r--r--src/protocol/survey/respond.c161
-rw-r--r--src/protocol/survey/survey.c138
10 files changed, 512 insertions, 658 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 5755d5a9..8c9ed83c 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -17,8 +17,8 @@
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct nni_bus_pipe nni_bus_pipe;
-typedef struct nni_bus_sock nni_bus_sock;
+typedef struct nni_bus_pipe nni_bus_pipe;
+typedef struct nni_bus_sock nni_bus_sock;
static void nni_bus_sock_getq(nni_bus_sock *);
static void nni_bus_pipe_getq(nni_bus_pipe *);
@@ -33,27 +33,26 @@ static void nni_bus_pipe_putq_cb(void *);
// An nni_bus_sock is our per-socket protocol private structure.
struct nni_bus_sock {
- nni_sock * nsock;
- int raw;
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_sock *nsock;
+ int raw;
+ nni_aio aio_getq;
+ nni_list pipes;
+ nni_mtx mtx;
};
// An nni_bus_pipe is our per-pipe protocol private structure.
struct nni_bus_pipe {
- nni_pipe * npipe;
- nni_bus_sock * psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * npipe;
+ nni_bus_sock *psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio aio_getq;
+ nni_aio aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
-
static void
nni_bus_sock_fini(void *arg)
{
@@ -66,12 +65,11 @@ nni_bus_sock_fini(void *arg)
}
}
-
static int
nni_bus_sock_init(void **sp, nni_sock *nsock)
{
nni_bus_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
@@ -85,7 +83,7 @@ nni_bus_sock_init(void **sp, nni_sock *nsock)
goto fail;
}
psock->nsock = nsock;
- psock->raw = 0;
+ psock->raw = 0;
*sp = psock;
return (0);
@@ -95,7 +93,6 @@ fail:
return (rv);
}
-
static void
nni_bus_sock_open(void *arg)
{
@@ -104,7 +101,6 @@ nni_bus_sock_open(void *arg)
nni_bus_sock_getq(psock);
}
-
static void
nni_bus_pipe_fini(void *arg)
{
@@ -121,12 +117,11 @@ nni_bus_pipe_fini(void *arg)
}
}
-
static int
nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_bus_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -155,7 +150,7 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
fail:
@@ -163,7 +158,6 @@ fail:
return (rv);
}
-
static int
nni_bus_pipe_start(void *arg)
{
@@ -180,7 +174,6 @@ nni_bus_pipe_start(void *arg)
return (0);
}
-
static void
nni_bus_pipe_stop(void *arg)
{
@@ -201,7 +194,6 @@ nni_bus_pipe_stop(void *arg)
nni_mtx_unlock(&ppipe->psock->mtx);
}
-
static void
nni_bus_pipe_getq_cb(void *arg)
{
@@ -218,7 +210,6 @@ nni_bus_pipe_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
static void
nni_bus_pipe_send_cb(void *arg)
{
@@ -235,21 +226,20 @@ nni_bus_pipe_send_cb(void *arg)
nni_bus_pipe_getq(ppipe);
}
-
static void
nni_bus_pipe_recv_cb(void *arg)
{
nni_bus_pipe *ppipe = arg;
nni_bus_sock *psock = ppipe->psock;
- nni_msg *msg;
- uint32_t id;
+ nni_msg * msg;
+ uint32_t id;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
nni_pipe_stop(ppipe->npipe);
return;
}
msg = ppipe->aio_recv.a_msg;
- id = nni_pipe_id(ppipe->npipe);
+ id = nni_pipe_id(ppipe->npipe);
if (nni_msg_prepend_header(msg, &id, 4) != 0) {
// XXX: bump a nomemory stat
@@ -262,7 +252,6 @@ nni_bus_pipe_recv_cb(void *arg)
nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq);
}
-
static void
nni_bus_pipe_putq_cb(void *arg)
{
@@ -279,16 +268,15 @@ nni_bus_pipe_putq_cb(void *arg)
nni_bus_pipe_recv(ppipe);
}
-
static void
nni_bus_sock_getq_cb(void *arg)
{
nni_bus_sock *psock = arg;
nni_bus_pipe *ppipe;
nni_bus_pipe *lpipe;
- nni_msgq *uwq = nni_sock_sendq(psock->nsock);
- nni_msg *msg, *dup;
- uint32_t sender;
+ nni_msgq * uwq = nni_sock_sendq(psock->nsock);
+ nni_msg * msg, *dup;
+ uint32_t sender;
if (nni_aio_result(&psock->aio_getq) != 0) {
return;
@@ -333,33 +321,29 @@ nni_bus_sock_getq_cb(void *arg)
nni_bus_sock_getq(psock);
}
-
static void
nni_bus_sock_getq(nni_bus_sock *psock)
{
nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq);
}
-
static void
nni_bus_pipe_getq(nni_bus_pipe *ppipe)
{
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
}
-
static void
nni_bus_pipe_recv(nni_bus_pipe *ppipe)
{
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static int
nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_bus_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -371,12 +355,11 @@ nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_bus_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -388,29 +371,28 @@ nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_proto_pipe_ops nni_bus_pipe_ops = {
- .pipe_init = nni_bus_pipe_init,
- .pipe_fini = nni_bus_pipe_fini,
- .pipe_start = nni_bus_pipe_start,
- .pipe_stop = nni_bus_pipe_stop,
+ .pipe_init = nni_bus_pipe_init,
+ .pipe_fini = nni_bus_pipe_fini,
+ .pipe_start = nni_bus_pipe_start,
+ .pipe_stop = nni_bus_pipe_stop,
};
static nni_proto_sock_ops nni_bus_sock_ops = {
- .sock_init = nni_bus_sock_init,
- .sock_fini = nni_bus_sock_fini,
- .sock_open = nni_bus_sock_open,
- .sock_setopt = nni_bus_sock_setopt,
- .sock_getopt = nni_bus_sock_getopt,
+ .sock_init = nni_bus_sock_init,
+ .sock_fini = nni_bus_sock_fini,
+ .sock_open = nni_bus_sock_open,
+ .sock_setopt = nni_bus_sock_setopt,
+ .sock_getopt = nni_bus_sock_getopt,
};
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
nni_proto nni_bus_proto = {
- .proto_self = NNG_PROTO_BUS,
- .proto_peer = NNG_PROTO_BUS,
- .proto_name = "bus",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_BUS,
+ .proto_peer = NNG_PROTO_BUS,
+ .proto_name = "bus",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_bus_sock_ops,
.proto_pipe_ops = &nni_bus_pipe_ops,
};
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index b283377e..30302352 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -16,8 +16,8 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
-typedef struct nni_pair_pipe nni_pair_pipe;
-typedef struct nni_pair_sock nni_pair_sock;
+typedef struct nni_pair_pipe nni_pair_pipe;
+typedef struct nni_pair_sock nni_pair_sock;
static void nni_pair_send_cb(void *);
static void nni_pair_recv_cb(void *);
@@ -27,12 +27,12 @@ static void nni_pair_pipe_fini(void *);
// An nni_pair_sock is our per-socket protocol private structure.
struct nni_pair_sock {
- nni_sock * nsock;
- nni_pair_pipe * ppipe;
- nni_msgq * uwq;
- nni_msgq * urq;
- int raw;
- nni_mtx mtx;
+ nni_sock * nsock;
+ nni_pair_pipe *ppipe;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ nni_mtx mtx;
};
// An nni_pair_pipe is our per-pipe protocol private structure. We keep
@@ -40,19 +40,19 @@ struct nni_pair_sock {
// pipe. The separate data structure is more like other protocols that do
// manage multiple pipes.
struct nni_pair_pipe {
- nni_pipe * npipe;
- nni_pair_sock * psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_pipe * npipe;
+ nni_pair_sock *psock;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
};
static int
nni_pair_sock_init(void **sp, nni_sock *nsock)
{
nni_pair_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
@@ -63,14 +63,13 @@ nni_pair_sock_init(void **sp, nni_sock *nsock)
}
psock->nsock = nsock;
psock->ppipe = NULL;
- psock->raw = 0;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
- *sp = psock;
+ psock->raw = 0;
+ psock->uwq = nni_sock_sendq(nsock);
+ psock->urq = nni_sock_recvq(nsock);
+ *sp = psock;
return (0);
}
-
static void
nni_pair_sock_fini(void *arg)
{
@@ -83,12 +82,11 @@ nni_pair_sock_fini(void *arg)
}
}
-
static int
nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_pair_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -111,7 +109,7 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
}
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
fail:
@@ -119,7 +117,6 @@ fail:
return (rv);
}
-
static void
nni_pair_pipe_fini(void *arg)
{
@@ -132,7 +129,6 @@ nni_pair_pipe_fini(void *arg)
NNI_FREE_STRUCT(ppipe);
}
-
static int
nni_pair_pipe_start(void *arg)
{
@@ -142,7 +138,7 @@ nni_pair_pipe_start(void *arg)
nni_mtx_lock(&psock->mtx);
if (psock->ppipe != NULL) {
nni_mtx_unlock(&psock->mtx);
- return (NNG_EBUSY); // Already have a peer, denied.
+ return (NNG_EBUSY); // Already have a peer, denied.
}
psock->ppipe = ppipe;
nni_mtx_unlock(&psock->mtx);
@@ -155,7 +151,6 @@ nni_pair_pipe_start(void *arg)
return (0);
}
-
static void
nni_pair_pipe_stop(void *arg)
{
@@ -174,7 +169,6 @@ nni_pair_pipe_stop(void *arg)
nni_mtx_unlock(&psock->mtx);
}
-
static void
nni_pair_recv_cb(void *arg)
{
@@ -191,7 +185,6 @@ nni_pair_recv_cb(void *arg)
nni_msgq_aio_put(psock->urq, &ppipe->aio_putq);
}
-
static void
nni_pair_putq_cb(void *arg)
{
@@ -206,7 +199,6 @@ nni_pair_putq_cb(void *arg)
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static void
nni_pair_getq_cb(void *arg)
{
@@ -223,7 +215,6 @@ nni_pair_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
static void
nni_pair_send_cb(void *arg)
{
@@ -240,12 +231,11 @@ nni_pair_send_cb(void *arg)
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
}
-
static int
nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pair_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -257,12 +247,11 @@ nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pair_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -274,29 +263,28 @@ nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pair_pipe_ops = {
- .pipe_init = nni_pair_pipe_init,
- .pipe_fini = nni_pair_pipe_fini,
- .pipe_start = nni_pair_pipe_start,
- .pipe_stop = nni_pair_pipe_stop,
+ .pipe_init = nni_pair_pipe_init,
+ .pipe_fini = nni_pair_pipe_fini,
+ .pipe_start = nni_pair_pipe_start,
+ .pipe_stop = nni_pair_pipe_stop,
};
static nni_proto_sock_ops nni_pair_sock_ops = {
- .sock_init = nni_pair_sock_init,
- .sock_fini = nni_pair_sock_fini,
- .sock_setopt = nni_pair_sock_setopt,
- .sock_getopt = nni_pair_sock_getopt,
+ .sock_init = nni_pair_sock_init,
+ .sock_fini = nni_pair_sock_fini,
+ .sock_setopt = nni_pair_sock_setopt,
+ .sock_getopt = nni_pair_sock_getopt,
};
nni_proto nni_pair_proto = {
- .proto_self = NNG_PROTO_PAIR,
- .proto_peer = NNG_PROTO_PAIR,
- .proto_name = "pair",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_PAIR,
+ .proto_peer = NNG_PROTO_PAIR,
+ .proto_name = "pair",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_pair_sock_ops,
.proto_pipe_ops = &nni_pair_pipe_ops,
};
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 27fd5478..80ff4245 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -14,8 +14,8 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct nni_pull_pipe nni_pull_pipe;
-typedef struct nni_pull_sock nni_pull_sock;
+typedef struct nni_pull_pipe nni_pull_pipe;
+typedef struct nni_pull_sock nni_pull_sock;
static void nni_pull_putq_cb(void *);
static void nni_pull_recv_cb(void *);
@@ -23,16 +23,16 @@ static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
// An nni_pull_sock is our per-socket protocol private structure.
struct nni_pull_sock {
- nni_msgq * urq;
- int raw;
+ nni_msgq *urq;
+ int raw;
};
// An nni_pull_pipe is our per-pipe protocol private structure.
struct nni_pull_pipe {
- nni_pipe * pipe;
- nni_pull_sock * pull;
- nni_aio putq_aio;
- nni_aio recv_aio;
+ nni_pipe * pipe;
+ nni_pull_sock *pull;
+ nni_aio putq_aio;
+ nni_aio recv_aio;
};
static int
@@ -45,12 +45,11 @@ nni_pull_sock_init(void **pullp, nni_sock *sock)
}
pull->raw = 0;
pull->urq = nni_sock_recvq(sock);
- *pullp = pull;
+ *pullp = pull;
nni_sock_senderr(sock, NNG_ENOTSUP);
return (0);
}
-
static void
nni_pull_sock_fini(void *arg)
{
@@ -61,12 +60,11 @@ nni_pull_sock_fini(void *arg)
}
}
-
static int
nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pull_pipe *pp;
- int rv;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
@@ -82,11 +80,10 @@ nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
pp->pipe = pipe;
pp->pull = psock;
- *ppp = pp;
+ *ppp = pp;
return (0);
}
-
static void
nni_pull_pipe_fini(void *arg)
{
@@ -99,7 +96,6 @@ nni_pull_pipe_fini(void *arg)
}
}
-
static int
nni_pull_pipe_start(void *arg)
{
@@ -111,7 +107,6 @@ nni_pull_pipe_start(void *arg)
return (0);
}
-
static void
nni_pull_pipe_stop(void *arg)
{
@@ -121,13 +116,12 @@ nni_pull_pipe_stop(void *arg)
nni_aio_stop(&pp->recv_aio);
}
-
static void
nni_pull_recv_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio *aio = &pp->recv_aio;
- nni_msg *msg;
+ nni_pull_pipe *pp = arg;
+ nni_aio * aio = &pp->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
@@ -136,17 +130,16 @@ nni_pull_recv_cb(void *arg)
}
// Got a message... start the put to send it up to the application.
- msg = aio->a_msg;
+ msg = aio->a_msg;
aio->a_msg = NULL;
nni_pull_putq(pp, msg);
}
-
static void
nni_pull_putq_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio *aio = &pp->putq_aio;
+ nni_pull_pipe *pp = arg;
+ nni_aio * aio = &pp->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
@@ -160,7 +153,6 @@ nni_pull_putq_cb(void *arg)
nni_pipe_recv(pp->pipe, &pp->recv_aio);
}
-
// nni_pull_putq schedules a put operation to the user socket (sendup).
static void
nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
@@ -172,12 +164,11 @@ nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
nni_msgq_aio_put(pull->urq, &pp->putq_aio);
}
-
static int
nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pull_sock *pull = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -189,12 +180,11 @@ nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pull_sock *pull = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -206,28 +196,27 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pull_pipe_ops = {
- .pipe_init = nni_pull_pipe_init,
- .pipe_fini = nni_pull_pipe_fini,
- .pipe_start = nni_pull_pipe_start,
- .pipe_stop = nni_pull_pipe_stop,
+ .pipe_init = nni_pull_pipe_init,
+ .pipe_fini = nni_pull_pipe_fini,
+ .pipe_start = nni_pull_pipe_start,
+ .pipe_stop = nni_pull_pipe_stop,
};
static nni_proto_sock_ops nni_pull_sock_ops = {
- .sock_init = nni_pull_sock_init,
- .sock_fini = nni_pull_sock_fini,
- .sock_setopt = nni_pull_sock_setopt,
- .sock_getopt = nni_pull_sock_getopt,
+ .sock_init = nni_pull_sock_init,
+ .sock_fini = nni_pull_sock_fini,
+ .sock_setopt = nni_pull_sock_setopt,
+ .sock_getopt = nni_pull_sock_getopt,
};
nni_proto nni_pull_proto = {
- .proto_self = NNG_PROTO_PULL,
- .proto_peer = NNG_PROTO_PUSH,
- .proto_name = "pull",
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_self = NNG_PROTO_PULL,
+ .proto_peer = NNG_PROTO_PUSH,
+ .proto_name = "pull",
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_pipe_ops = &nni_pull_pipe_ops,
.proto_sock_ops = &nni_pull_sock_ops,
};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index bbda91da..f1da0a9f 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -16,8 +16,8 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct nni_push_pipe nni_push_pipe;
-typedef struct nni_push_sock nni_push_sock;
+typedef struct nni_push_pipe nni_push_pipe;
+typedef struct nni_push_sock nni_push_sock;
static void nni_push_send_cb(void *);
static void nni_push_recv_cb(void *);
@@ -25,20 +25,20 @@ static void nni_push_getq_cb(void *);
// An nni_push_sock is our per-socket protocol private structure.
struct nni_push_sock {
- nni_msgq * uwq;
- int raw;
- nni_sock * sock;
+ nni_msgq *uwq;
+ int raw;
+ nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
struct nni_push_pipe {
- nni_pipe * pipe;
- nni_push_sock * push;
- nni_list_node node;
+ nni_pipe * pipe;
+ nni_push_sock *push;
+ nni_list_node node;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_getq;
+ nni_aio aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_getq;
};
static int
@@ -49,15 +49,14 @@ nni_push_sock_init(void **pushp, nni_sock *sock)
if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
return (NNG_ENOMEM);
}
- push->raw = 0;
+ push->raw = 0;
push->sock = sock;
- push->uwq = nni_sock_sendq(sock);
- *pushp = push;
+ push->uwq = nni_sock_sendq(sock);
+ *pushp = push;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
-
static void
nni_push_sock_fini(void *arg)
{
@@ -68,7 +67,6 @@ nni_push_sock_fini(void *arg)
}
}
-
static void
nni_push_pipe_fini(void *arg)
{
@@ -80,12 +78,11 @@ nni_push_pipe_fini(void *arg)
NNI_FREE_STRUCT(pp);
}
-
static int
nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_push_pipe *pp;
- int rv;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
@@ -103,7 +100,7 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
pp->push = psock;
- *ppp = pp;
+ *ppp = pp;
return (0);
fail:
@@ -111,11 +108,10 @@ fail:
return (rv);
}
-
static int
nni_push_pipe_start(void *arg)
{
- nni_push_pipe *pp = arg;
+ nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
@@ -132,11 +128,10 @@ nni_push_pipe_start(void *arg)
return (0);
}
-
static void
nni_push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
+ nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
nni_aio_stop(&pp->aio_recv);
@@ -144,7 +139,6 @@ nni_push_pipe_stop(void *arg)
nni_aio_stop(&pp->aio_getq);
}
-
static void
nni_push_recv_cb(void *arg)
{
@@ -161,11 +155,10 @@ nni_push_recv_cb(void *arg)
nni_pipe_recv(pp->pipe, &pp->aio_recv);
}
-
static void
nni_push_send_cb(void *arg)
{
- nni_push_pipe *pp = arg;
+ nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
if (nni_aio_result(&pp->aio_send) != 0) {
@@ -178,12 +171,11 @@ nni_push_send_cb(void *arg)
nni_msgq_aio_get(push->uwq, &pp->aio_getq);
}
-
static void
nni_push_getq_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_aio *aio = &pp->aio_getq;
+ nni_push_pipe *pp = arg;
+ nni_aio * aio = &pp->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
@@ -192,17 +184,16 @@ nni_push_getq_cb(void *arg)
}
pp->aio_send.a_msg = aio->a_msg;
- aio->a_msg = NULL;
+ aio->a_msg = NULL;
nni_pipe_send(pp->pipe, &pp->aio_send);
}
-
static int
nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_push_sock *push = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -214,12 +205,11 @@ nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_push_sock *push = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -231,28 +221,27 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_push_pipe_ops = {
- .pipe_init = nni_push_pipe_init,
- .pipe_fini = nni_push_pipe_fini,
- .pipe_start = nni_push_pipe_start,
- .pipe_stop = nni_push_pipe_stop,
+ .pipe_init = nni_push_pipe_init,
+ .pipe_fini = nni_push_pipe_fini,
+ .pipe_start = nni_push_pipe_start,
+ .pipe_stop = nni_push_pipe_stop,
};
static nni_proto_sock_ops nni_push_sock_ops = {
- .sock_init = nni_push_sock_init,
- .sock_fini = nni_push_sock_fini,
- .sock_setopt = nni_push_sock_setopt,
- .sock_getopt = nni_push_sock_getopt,
+ .sock_init = nni_push_sock_init,
+ .sock_fini = nni_push_sock_fini,
+ .sock_setopt = nni_push_sock_setopt,
+ .sock_getopt = nni_push_sock_getopt,
};
nni_proto nni_push_proto = {
- .proto_self = NNG_PROTO_PUSH,
- .proto_peer = NNG_PROTO_PULL,
- .proto_name = "push",
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_self = NNG_PROTO_PUSH,
+ .proto_peer = NNG_PROTO_PULL,
+ .proto_name = "push",
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_pipe_ops = &nni_push_pipe_ops,
.proto_sock_ops = &nni_push_sock_ops,
};
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index e3b37f1a..64c2c59d 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -17,8 +17,8 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
+typedef struct nni_pub_pipe nni_pub_pipe;
+typedef struct nni_pub_sock nni_pub_sock;
static void nni_pub_pipe_recv_cb(void *);
static void nni_pub_pipe_send_cb(void *);
@@ -29,30 +29,30 @@ static void nni_pub_pipe_fini(void *);
// An nni_pub_sock is our per-socket protocol private structure.
struct nni_pub_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- int raw;
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_sock *sock;
+ nni_msgq *uwq;
+ int raw;
+ nni_aio aio_getq;
+ nni_list pipes;
+ nni_mtx mtx;
};
// An nni_pub_pipe is our per-pipe protocol private structure.
struct nni_pub_pipe {
- nni_pipe * pipe;
- nni_pub_sock * pub;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_list_node node;
+ nni_pipe * pipe;
+ nni_pub_sock *pub;
+ nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_list_node node;
};
static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
- int rv;
+ int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
@@ -67,7 +67,7 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (rv);
}
pub->sock = sock;
- pub->raw = 0;
+ pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
pub->uwq = nni_sock_sendq(sock);
@@ -77,7 +77,6 @@ nni_pub_sock_init(void **pubp, nni_sock *sock)
return (0);
}
-
static void
nni_pub_sock_fini(void *arg)
{
@@ -88,7 +87,6 @@ nni_pub_sock_fini(void *arg)
NNI_FREE_STRUCT(pub);
}
-
static void
nni_pub_sock_open(void *arg)
{
@@ -97,7 +95,6 @@ nni_pub_sock_open(void *arg)
nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_fini(void *arg)
{
@@ -110,12 +107,11 @@ nni_pub_pipe_fini(void *arg)
NNI_FREE_STRUCT(pp);
}
-
static int
nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pub_pipe *pp;
- int rv;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
@@ -140,8 +136,8 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
goto fail;
}
pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ pp->pub = psock;
+ *ppp = pp;
return (0);
fail:
@@ -149,11 +145,10 @@ fail:
return (rv);
}
-
static int
nni_pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
@@ -170,11 +165,10 @@ nni_pub_pipe_start(void *arg)
return (0);
}
-
static void
nni_pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
nni_aio_stop(&pp->aio_getq);
@@ -189,23 +183,22 @@ nni_pub_pipe_stop(void *arg)
nni_mtx_unlock(&pub->mtx);
}
-
static void
nni_pub_sock_getq_cb(void *arg)
{
nni_pub_sock *pub = arg;
- nni_msgq *uwq = pub->uwq;
- nni_msg *msg, *dup;
+ nni_msgq * uwq = pub->uwq;
+ nni_msg * msg, *dup;
nni_pub_pipe *pp;
nni_pub_pipe *last;
- int rv;
+ int rv;
if (nni_aio_result(&pub->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
+ msg = pub->aio_getq.a_msg;
pub->aio_getq.a_msg = NULL;
nni_mtx_lock(&pub->mtx);
@@ -232,7 +225,6 @@ nni_pub_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &pub->aio_getq);
}
-
static void
nni_pub_pipe_recv_cb(void *arg)
{
@@ -248,7 +240,6 @@ nni_pub_pipe_recv_cb(void *arg)
nni_pipe_recv(pp->pipe, &pp->aio_recv);
}
-
static void
nni_pub_pipe_getq_cb(void *arg)
{
@@ -265,7 +256,6 @@ nni_pub_pipe_getq_cb(void *arg)
nni_pipe_send(pp->pipe, &pp->aio_send);
}
-
static void
nni_pub_pipe_send_cb(void *arg)
{
@@ -282,12 +272,11 @@ nni_pub_pipe_send_cb(void *arg)
nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
}
-
static int
nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -299,12 +288,11 @@ nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pub_sock *pub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -316,29 +304,28 @@ nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+ .pipe_init = nni_pub_pipe_init,
+ .pipe_fini = nni_pub_pipe_fini,
+ .pipe_start = nni_pub_pipe_start,
+ .pipe_stop = nni_pub_pipe_stop,
};
nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+ .sock_init = nni_pub_sock_init,
+ .sock_fini = nni_pub_sock_fini,
+ .sock_open = nni_pub_sock_open,
+ .sock_setopt = nni_pub_sock_setopt,
+ .sock_getopt = nni_pub_sock_getopt,
};
nni_proto nni_pub_proto = {
- .proto_self = NNG_PROTO_PUB,
- .proto_peer = NNG_PROTO_SUB,
- .proto_name = "pub",
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_self = NNG_PROTO_PUB,
+ .proto_peer = NNG_PROTO_SUB,
+ .proto_name = "pub",
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_sock_ops = &nni_pub_sock_ops,
.proto_pipe_ops = &nni_pub_pipe_ops,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 09a724e2..bc4de973 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -16,34 +16,34 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct nni_sub_pipe nni_sub_pipe;
+typedef struct nni_sub_sock nni_sub_sock;
+typedef struct nni_sub_topic nni_sub_topic;
static void nni_sub_recv_cb(void *);
static void nni_sub_putq_cb(void *);
static void nni_sub_pipe_fini(void *);
struct nni_sub_topic {
- nni_list_node node;
- size_t len;
- void * buf;
+ nni_list_node node;
+ size_t len;
+ void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_sub_sock {
- nni_sock * sock;
- nni_list topics;
- nni_msgq * urq;
- int raw;
+ nni_sock *sock;
+ nni_list topics;
+ nni_msgq *urq;
+ int raw;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock * sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+ nni_pipe * pipe;
+ nni_sub_sock *sub;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static int
@@ -56,7 +56,7 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
}
NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
sub->sock = sock;
- sub->raw = 0;
+ sub->raw = 0;
sub->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
@@ -64,11 +64,10 @@ nni_sub_sock_init(void **subp, nni_sock *sock)
return (0);
}
-
static void
nni_sub_sock_fini(void *arg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
while ((topic = nni_list_first(&sub->topics)) != NULL) {
@@ -79,12 +78,11 @@ nni_sub_sock_fini(void *arg)
NNI_FREE_STRUCT(sub);
}
-
static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
- int rv;
+ int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
@@ -95,12 +93,11 @@ nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
return (rv);
}
sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ sp->sub = ssock;
+ *spp = sp;
return (0);
}
-
static void
nni_sub_pipe_fini(void *arg)
{
@@ -111,7 +108,6 @@ nni_sub_pipe_fini(void *arg)
NNI_FREE_STRUCT(sp);
}
-
static int
nni_sub_pipe_start(void *arg)
{
@@ -121,7 +117,6 @@ nni_sub_pipe_start(void *arg)
return (0);
}
-
static void
nni_sub_pipe_stop(void *arg)
{
@@ -131,13 +126,12 @@ nni_sub_pipe_stop(void *arg)
nni_aio_stop(&sp->aio_recv);
}
-
static void
nni_sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ nni_sub_pipe *sp = arg;
nni_sub_sock *sub = sp->sub;
- nni_msgq *urq = sub->urq;
+ nni_msgq * urq = sub->urq;
if (nni_aio_result(&sp->aio_recv) != 0) {
nni_pipe_stop(sp->pipe);
@@ -149,7 +143,6 @@ nni_sub_recv_cb(void *arg)
nni_msgq_aio_put(sub->urq, &sp->aio_putq);
}
-
static void
nni_sub_putq_cb(void *arg)
{
@@ -165,7 +158,6 @@ nni_sub_putq_cb(void *arg)
nni_pipe_recv(sp->pipe, &sp->aio_recv);
}
-
// For now we maintain subscriptions on a sorted linked list. As we do not
// expect to have huge numbers of subscriptions, and as the operation is
// really O(n), we think this is acceptable. In the future we might decide
@@ -215,12 +207,11 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (0);
}
-
static int
nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
{
nni_sub_topic *topic;
- int rv;
+ int rv;
NNI_LIST_FOREACH (&sub->topics, topic) {
if (topic->len >= sz) {
@@ -246,12 +237,11 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
return (NNG_ENOENT);
}
-
static int
nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -269,12 +259,11 @@ nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_sub_sock *sub = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RAW:
@@ -286,22 +275,21 @@ nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock *sub = arg;
+ nni_sub_sock * sub = arg;
nni_sub_topic *topic;
- char *body;
- size_t len;
- int match;
+ char * body;
+ size_t len;
+ int match;
if (sub->raw) {
return (msg);
}
body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ len = nni_msg_len(msg);
match = 0;
// Check to see if the message matches one of our subscriptions.
@@ -329,29 +317,28 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+ .pipe_init = nni_sub_pipe_init,
+ .pipe_fini = nni_sub_pipe_fini,
+ .pipe_start = nni_sub_pipe_start,
+ .pipe_stop = nni_sub_pipe_stop,
};
static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+ .sock_init = nni_sub_sock_init,
+ .sock_fini = nni_sub_sock_fini,
+ .sock_setopt = nni_sub_sock_setopt,
+ .sock_getopt = nni_sub_sock_getopt,
+ .sock_rfilter = nni_sub_sock_rfilter,
};
nni_proto nni_sub_proto = {
- .proto_self = NNG_PROTO_SUB,
- .proto_peer = NNG_PROTO_PUB,
- .proto_name = "sub",
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_self = NNG_PROTO_SUB,
+ .proto_peer = NNG_PROTO_PUB,
+ .proto_name = "sub",
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_sock_ops = &nni_sub_sock_ops,
.proto_pipe_ops = &nni_sub_pipe_ops,
};
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 7d887b55..cfc83a5b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -16,9 +16,8 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct nni_rep_pipe nni_rep_pipe;
-typedef struct nni_rep_sock nni_rep_sock;
-
+typedef struct nni_rep_pipe nni_rep_pipe;
+typedef struct nni_rep_sock nni_rep_sock;
static void nni_rep_sock_getq_cb(void *);
static void nni_rep_pipe_getq_cb(void *);
@@ -29,29 +28,29 @@ static void nni_rep_pipe_fini(void *);
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_rep_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- nni_msgq * urq;
- int raw;
- int ttl;
- nni_idhash pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio aio_getq;
- nni_mtx mtx;
+ nni_sock * sock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ int ttl;
+ nni_idhash pipes;
+ char * btrace;
+ size_t btrace_len;
+ nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_rep_pipe {
- nni_pipe * pipe;
- nni_rep_sock * rep;
- nni_msgq * sendq;
- uint32_t id; // we have to save it
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * pipe;
+ nni_rep_sock *rep;
+ nni_msgq * sendq;
+ uint32_t id; // we have to save it
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
static void
@@ -68,20 +67,19 @@ nni_rep_sock_fini(void *arg)
NNI_FREE_STRUCT(rep);
}
-
static int
nni_rep_sock_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
- int rv;
+ int rv;
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
- rep->ttl = 8; // Per RFC
- rep->sock = sock;
- rep->raw = 0;
- rep->btrace = NULL;
+ rep->ttl = 8; // Per RFC
+ rep->sock = sock;
+ rep->raw = 0;
+ rep->btrace = NULL;
rep->btrace_len = 0;
if (((rv = nni_mtx_init(&rep->mtx)) != 0) ||
((rv = nni_idhash_init(&rep->pipes)) != 0)) {
@@ -106,7 +104,6 @@ fail:
return (rv);
}
-
static void
nni_rep_sock_open(void *arg)
{
@@ -115,7 +112,6 @@ nni_rep_sock_open(void *arg)
nni_msgq_aio_get(rep->uwq, &rep->aio_getq);
}
-
static void
nni_rep_sock_close(void *arg)
{
@@ -124,12 +120,11 @@ nni_rep_sock_close(void *arg)
nni_aio_stop(&rep->aio_getq);
}
-
static int
nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_rep_pipe *rp;
- int rv;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
@@ -138,21 +133,25 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
((rv = nni_mtx_init(&rp->mtx)) != 0)) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) !=
+ 0) {
goto fail;
}
rp->pipe = pipe;
- rp->rep = rsock;
- *rpp = rp;
+ rp->rep = rsock;
+ *rpp = rp;
return (0);
fail:
@@ -160,7 +159,6 @@ fail:
return (rv);
}
-
static void
nni_rep_pipe_fini(void *arg)
{
@@ -175,13 +173,12 @@ nni_rep_pipe_fini(void *arg)
NNI_FREE_STRUCT(rp);
}
-
static int
nni_rep_pipe_start(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int rv;
+ int rv;
rp->id = nni_pipe_id(rp->pipe);
@@ -197,13 +194,12 @@ nni_rep_pipe_start(void *arg)
return (0);
}
-
static void
nni_rep_pipe_stop(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- uint32_t id;
+ uint32_t id;
nni_aio_stop(&rp->aio_getq);
nni_aio_stop(&rp->aio_putq);
@@ -211,8 +207,8 @@ nni_rep_pipe_stop(void *arg)
nni_aio_stop(&rp->aio_recv);
nni_mtx_lock(&rp->mtx);
- id = rp->id;
- rp->id = 0; // makes this idempotent
+ id = rp->id;
+ rp->id = 0; // makes this idempotent
nni_msgq_close(rp->sendq);
nni_mtx_unlock(&rp->mtx);
@@ -223,17 +219,16 @@ nni_rep_pipe_stop(void *arg)
}
}
-
static void
nni_rep_sock_getq_cb(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgq *uwq = rep->uwq;
- nni_msg *msg;
- uint8_t *header;
- uint32_t id;
+ nni_msgq * uwq = rep->uwq;
+ nni_msg * msg;
+ uint8_t * header;
+ uint32_t id;
nni_rep_pipe *rp;
- int rv;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -245,7 +240,7 @@ nni_rep_sock_getq_cb(void *arg)
return;
}
- msg = rep->aio_getq.a_msg;
+ msg = rep->aio_getq.a_msg;
rep->aio_getq.a_msg = NULL;
// We yank the outgoing pipe id from the header
@@ -278,7 +273,6 @@ nni_rep_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &rep->aio_getq);
}
-
static void
nni_rep_pipe_getq_cb(void *arg)
{
@@ -295,7 +289,6 @@ nni_rep_pipe_getq_cb(void *arg)
nni_pipe_send(rp->pipe, &rp->aio_send);
}
-
static void
nni_rep_pipe_send_cb(void *arg)
{
@@ -311,17 +304,16 @@ nni_rep_pipe_send_cb(void *arg)
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
}
-
static void
nni_rep_pipe_recv_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msg *msg;
- int rv;
- uint8_t idbuf[4];
- uint8_t *body;
- int hops;
+ nni_msg * msg;
+ int rv;
+ uint8_t idbuf[4];
+ uint8_t * body;
+ int hops;
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_stop(rp->pipe);
@@ -330,7 +322,7 @@ nni_rep_pipe_recv_cb(void *arg)
NNI_PUT32(idbuf, rp->id);
- msg = rp->aio_recv.a_msg;
+ msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
@@ -350,8 +342,8 @@ nni_rep_pipe_recv_cb(void *arg)
goto malformed;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ end = (body[0] & 0x80) ? 1 : 0;
+ rv = nni_msg_append_header(msg, body, 4);
if (rv != 0) {
// Presumably this is due to out of memory.
// We could just discard and try again, but we
@@ -376,7 +368,6 @@ malformed:
nni_pipe_stop(rp->pipe);
}
-
static void
nni_rep_pipe_putq_cb(void *arg)
{
@@ -392,12 +383,11 @@ nni_rep_pipe_putq_cb(void *arg)
nni_pipe_recv(rp->pipe, &rp->aio_recv);
}
-
static int
nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -413,12 +403,11 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -433,7 +422,6 @@ nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_rep_sock_sfilter(void *arg, nni_msg *msg)
{
@@ -458,36 +446,35 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
return (msg);
}
-
static nni_msg *
nni_rep_sock_rfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
- char *header;
- size_t len;
+ char * header;
+ size_t len;
if (rep->raw) {
return (msg);
}
nni_sock_senderr(rep->sock, 0);
- len = nni_msg_header_len(msg);
+ len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
}
if ((rep->btrace = nni_alloc(len)) == NULL) {
@@ -500,32 +487,31 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_rep_pipe_ops = {
- .pipe_init = nni_rep_pipe_init,
- .pipe_fini = nni_rep_pipe_fini,
- .pipe_start = nni_rep_pipe_start,
- .pipe_stop = nni_rep_pipe_stop,
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_start = nni_rep_pipe_start,
+ .pipe_stop = nni_rep_pipe_stop,
};
static nni_proto_sock_ops nni_rep_sock_ops = {
- .sock_init = nni_rep_sock_init,
- .sock_fini = nni_rep_sock_fini,
- .sock_open = nni_rep_sock_open,
- .sock_close = nni_rep_sock_close,
- .sock_setopt = nni_rep_sock_setopt,
- .sock_getopt = nni_rep_sock_getopt,
- .sock_rfilter = nni_rep_sock_rfilter,
- .sock_sfilter = nni_rep_sock_sfilter,
+ .sock_init = nni_rep_sock_init,
+ .sock_fini = nni_rep_sock_fini,
+ .sock_open = nni_rep_sock_open,
+ .sock_close = nni_rep_sock_close,
+ .sock_setopt = nni_rep_sock_setopt,
+ .sock_getopt = nni_rep_sock_getopt,
+ .sock_rfilter = nni_rep_sock_rfilter,
+ .sock_sfilter = nni_rep_sock_sfilter,
};
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_REP,
+ .proto_peer = NNG_PROTO_REQ,
+ .proto_name = "rep",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_rep_sock_ops,
.proto_pipe_ops = &nni_rep_pipe_ops,
};
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f32fd66f..f77700f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -7,9 +7,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <stdio.h>
#include "core/nng_impl.h"
@@ -17,8 +17,8 @@
// request-reply pair. This is useful for building RPC clients, for
// example.
-typedef struct nni_req_pipe nni_req_pipe;
-typedef struct nni_req_sock nni_req_sock;
+typedef struct nni_req_pipe nni_req_pipe;
+typedef struct nni_req_sock nni_req_sock;
static void nni_req_resend(nni_req_sock *);
static void nni_req_timeout(void *);
@@ -26,38 +26,38 @@ static void nni_req_pipe_fini(void *);
// An nni_req_sock is our per-socket protocol private structure.
struct nni_req_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_duration retry;
- nni_time resend;
- int raw;
- int wantw;
- nni_msg * reqmsg;
+ nni_sock * sock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_duration retry;
+ nni_time resend;
+ int raw;
+ int wantw;
+ nni_msg * reqmsg;
- nni_req_pipe * pendpipe;
+ nni_req_pipe *pendpipe;
- nni_list readypipes;
- nni_list busypipes;
+ nni_list readypipes;
+ nni_list busypipes;
- nni_timer_node timer;
+ nni_timer_node timer;
- uint32_t nextid; // next id
- uint8_t reqid[4]; // outstanding request ID (big endian)
- nni_mtx mtx;
+ uint32_t nextid; // next id
+ uint8_t reqid[4]; // outstanding request ID (big endian)
+ nni_mtx mtx;
};
// An nni_req_pipe is our per-pipe protocol private structure.
struct nni_req_pipe {
- nni_pipe * pipe;
- nni_req_sock * req;
- nni_list_node node;
- nni_aio aio_getq; // raw mode only
- nni_aio aio_sendraw; // raw mode only
- nni_aio aio_sendcooked; // cooked mode only
- nni_aio aio_recv;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * pipe;
+ nni_req_sock *req;
+ nni_list_node node;
+ nni_aio aio_getq; // raw mode only
+ nni_aio aio_sendraw; // raw mode only
+ nni_aio aio_sendcooked; // cooked mode only
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
static void nni_req_resender(void *);
@@ -71,7 +71,7 @@ static int
nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
- int rv;
+ int rv;
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
@@ -88,21 +88,20 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
// this is "semi random" start for request IDs.
req->nextid = nni_random();
- req->retry = NNI_SECOND * 60;
- req->sock = sock;
+ req->retry = NNI_SECOND * 60;
+ req->sock = sock;
req->reqmsg = NULL;
- req->raw = 0;
- req->wantw = 0;
+ req->raw = 0;
+ req->wantw = 0;
req->resend = NNI_TIME_ZERO;
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
- *reqp = req;
+ *reqp = req;
nni_sock_recverr(sock, NNG_ESTATE);
return (0);
}
-
static void
nni_req_sock_close(void *arg)
{
@@ -111,7 +110,6 @@ nni_req_sock_close(void *arg)
nni_timer_cancel(&req->timer);
}
-
static void
nni_req_sock_fini(void *arg)
{
@@ -126,12 +124,11 @@ nni_req_sock_fini(void *arg)
NNI_FREE_STRUCT(req);
}
-
static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
- int rv;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
@@ -159,8 +156,8 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
- rp->req = rsock;
- *rpp = rp;
+ rp->req = rsock;
+ *rpp = rp;
return (0);
failed:
@@ -168,7 +165,6 @@ failed:
return (rv);
}
-
static void
nni_req_pipe_fini(void *arg)
{
@@ -185,11 +181,10 @@ nni_req_pipe_fini(void *arg)
}
}
-
static int
nni_req_pipe_start(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
@@ -203,17 +198,15 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_unlock(&req->mtx);
-
nni_msgq_aio_get(req->uwq, &rp->aio_getq);
nni_pipe_recv(rp->pipe, &rp->aio_recv);
return (0);
}
-
static void
nni_req_pipe_stop(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
nni_aio_stop(&rp->aio_getq);
@@ -236,19 +229,18 @@ nni_req_pipe_stop(void *arg)
// removing the pipe we sent the last request on...
// schedule immediate resend.
req->pendpipe = NULL;
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ req->resend = NNI_TIME_ZERO;
+ req->wantw = 1;
nni_req_resend(req);
}
nni_mtx_unlock(&req->mtx);
}
-
static int
nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_req_sock *req = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
@@ -263,12 +255,11 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_req_sock *req = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
@@ -283,7 +274,6 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// Raw and cooked mode differ in the way they send messages out.
//
// For cooked mdes, we have a getq callback on the upper write queue, which
@@ -303,7 +293,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
nni_req_getq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
// We should be in RAW mode. Cooked mode traffic bypasses
@@ -318,13 +308,12 @@ nni_req_getq_cb(void *arg)
}
rp->aio_sendraw.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ rp->aio_getq.a_msg = NULL;
// Send the message, but use the raw mode aio.
nni_pipe_send(rp->pipe, &rp->aio_sendraw);
}
-
static void
nni_req_sendraw_cb(void *arg)
{
@@ -341,11 +330,10 @@ nni_req_sendraw_cb(void *arg)
nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
}
-
static void
nni_req_sendcooked_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
if (nni_aio_result(&rp->aio_sendcooked) != 0) {
@@ -377,7 +365,6 @@ nni_req_sendcooked_cb(void *arg)
nni_mtx_unlock(&req->mtx);
}
-
static void
nni_req_putq_cb(void *arg)
{
@@ -393,19 +380,18 @@ nni_req_putq_cb(void *arg)
nni_pipe_recv(rp->pipe, &rp->aio_recv);
}
-
static void
nni_req_recv_cb(void *arg)
{
nni_req_pipe *rp = arg;
- nni_msg *msg;
+ nni_msg * msg;
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_stop(rp->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
+ msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// We yank 4 bytes of body, and move them to the header.
@@ -434,7 +420,6 @@ malformed:
nni_pipe_stop(rp->pipe);
}
-
static void
nni_req_timeout(void *arg)
{
@@ -448,12 +433,11 @@ nni_req_timeout(void *arg)
nni_mtx_unlock(&req->mtx);
}
-
static void
nni_req_resend(nni_req_sock *req)
{
nni_req_pipe *rp;
- nni_msg *msg;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
@@ -470,8 +454,8 @@ nni_req_resend(nni_req_sock *req)
// mark that we have a message we want to resend,
// in case something comes available.
req->wantw = 1;
- nni_timer_schedule(&req->timer,
- nni_clock() + req->retry);
+ nni_timer_schedule(
+ &req->timer, nni_clock() + req->retry);
return;
}
@@ -489,8 +473,8 @@ nni_req_resend(nni_req_sock *req)
nni_list_remove(&req->readypipes, rp);
nni_list_append(&req->busypipes, rp);
- req->pendpipe = rp;
- req->resend = nni_clock() + req->retry;
+ req->pendpipe = rp;
+ req->resend = nni_clock() + req->retry;
rp->aio_sendcooked.a_msg = msg;
// Note that because we were ready rather than busy, we
@@ -501,12 +485,11 @@ nni_req_resend(nni_req_sock *req)
}
}
-
static nni_msg *
nni_req_sock_sfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- uint32_t id;
+ uint32_t id;
if (req->raw) {
// No automatic retry, and the request ID must
@@ -542,7 +525,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
req->reqmsg = msg;
// Schedule for immediate send
req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ req->wantw = 1;
nni_req_resend(req);
nni_mtx_unlock(&req->mtx);
@@ -553,12 +536,11 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
return (NULL);
}
-
static nni_msg *
nni_req_sock_rfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- nni_msg *rmsg;
+ nni_msg * rmsg;
if (req->raw) {
// Pass it unmolested
@@ -585,7 +567,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (NULL);
}
- req->reqmsg = NULL;
+ req->reqmsg = NULL;
req->pendpipe = NULL;
nni_mtx_unlock(&req->mtx);
@@ -595,31 +577,30 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_req_pipe_ops = {
- .pipe_init = nni_req_pipe_init,
- .pipe_fini = nni_req_pipe_fini,
- .pipe_start = nni_req_pipe_start,
- .pipe_stop = nni_req_pipe_stop,
+ .pipe_init = nni_req_pipe_init,
+ .pipe_fini = nni_req_pipe_fini,
+ .pipe_start = nni_req_pipe_start,
+ .pipe_stop = nni_req_pipe_stop,
};
static nni_proto_sock_ops nni_req_sock_ops = {
- .sock_init = nni_req_sock_init,
- .sock_fini = nni_req_sock_fini,
- .sock_close = nni_req_sock_close,
- .sock_setopt = nni_req_sock_setopt,
- .sock_getopt = nni_req_sock_getopt,
- .sock_rfilter = nni_req_sock_rfilter,
- .sock_sfilter = nni_req_sock_sfilter,
+ .sock_init = nni_req_sock_init,
+ .sock_fini = nni_req_sock_fini,
+ .sock_close = nni_req_sock_close,
+ .sock_setopt = nni_req_sock_setopt,
+ .sock_getopt = nni_req_sock_getopt,
+ .sock_rfilter = nni_req_sock_rfilter,
+ .sock_sfilter = nni_req_sock_sfilter,
};
nni_proto nni_req_proto = {
- .proto_self = NNG_PROTO_REQ,
- .proto_peer = NNG_PROTO_REP,
- .proto_name = "req",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_REQ,
+ .proto_peer = NNG_PROTO_REP,
+ .proto_name = "req",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_req_sock_ops,
.proto_pipe_ops = &nni_req_pipe_ops,
};
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 73cc4792..6a784738 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -16,8 +16,8 @@
// the surveyor pattern. This is useful for building service discovery, or
// voting algorithsm, for example.
-typedef struct nni_resp_pipe nni_resp_pipe;
-typedef struct nni_resp_sock nni_resp_sock;
+typedef struct nni_resp_pipe nni_resp_pipe;
+typedef struct nni_resp_sock nni_resp_sock;
static void nni_resp_recv_cb(void *);
static void nni_resp_putq_cb(void *);
@@ -28,31 +28,30 @@ static void nni_resp_pipe_fini(void *);
// An nni_resp_sock is our per-socket protocol private structure.
struct nni_resp_sock {
- nni_sock * nsock;
- nni_msgq * urq;
- nni_msgq * uwq;
- int raw;
- int ttl;
- nni_idhash pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio aio_getq;
- nni_mtx mtx;
+ nni_sock * nsock;
+ nni_msgq * urq;
+ nni_msgq * uwq;
+ int raw;
+ int ttl;
+ nni_idhash pipes;
+ char * btrace;
+ size_t btrace_len;
+ nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_resp_pipe is our per-pipe protocol private structure.
struct nni_resp_pipe {
- nni_pipe * npipe;
- nni_resp_sock * psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_pipe * npipe;
+ nni_resp_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
-
static void
nni_resp_sock_fini(void *arg)
{
@@ -69,23 +68,22 @@ nni_resp_sock_fini(void *arg)
}
}
-
static int
nni_resp_sock_init(void **pp, nni_sock *nsock)
{
nni_resp_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- psock->ttl = 8; // Per RFC
- psock->nsock = nsock;
- psock->raw = 0;
- psock->btrace = NULL;
+ psock->ttl = 8; // Per RFC
+ psock->nsock = nsock;
+ psock->raw = 0;
+ psock->btrace = NULL;
psock->btrace_len = 0;
- psock->urq = nni_sock_recvq(nsock);
- psock->uwq = nni_sock_sendq(nsock);
+ psock->urq = nni_sock_recvq(nsock);
+ psock->uwq = nni_sock_sendq(nsock);
if (((rv = nni_idhash_init(&psock->pipes)) != 0) ||
((rv = nni_mtx_init(&psock->mtx)) != 0)) {
goto fail;
@@ -104,7 +102,6 @@ fail:
return (rv);
}
-
static void
nni_resp_sock_open(void *arg)
{
@@ -113,7 +110,6 @@ nni_resp_sock_open(void *arg)
nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
}
-
static void
nni_resp_sock_close(void *arg)
{
@@ -122,12 +118,11 @@ nni_resp_sock_close(void *arg)
nni_aio_stop(&psock->aio_getq);
}
-
static int
nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_resp_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -154,7 +149,7 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
fail:
@@ -162,7 +157,6 @@ fail:
return (rv);
}
-
static void
nni_resp_pipe_fini(void *arg)
{
@@ -176,13 +170,12 @@ nni_resp_pipe_fini(void *arg)
NNI_FREE_STRUCT(ppipe);
}
-
static int
nni_resp_pipe_start(void *arg)
{
nni_resp_pipe *ppipe = arg;
nni_resp_sock *psock = ppipe->psock;
- int rv;
+ int rv;
ppipe->id = nni_pipe_id(ppipe->npipe);
@@ -199,7 +192,6 @@ nni_resp_pipe_start(void *arg)
return (rv);
}
-
static void
nni_resp_pipe_stop(void *arg)
{
@@ -220,7 +212,6 @@ nni_resp_pipe_stop(void *arg)
nni_mtx_unlock(&psock->mtx);
}
-
// nni_resp_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
@@ -230,16 +221,16 @@ void
nni_resp_sock_getq_cb(void *arg)
{
nni_resp_sock *psock = arg;
- nni_msg *msg;
- uint8_t *header;
- uint32_t id;
+ nni_msg * msg;
+ uint8_t * header;
+ uint32_t id;
nni_resp_pipe *ppipe;
- int rv;
+ int rv;
if (nni_aio_result(&psock->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = psock->aio_getq.a_msg;
psock->aio_getq.a_msg = NULL;
// We yank the outgoing pipe id from the header
@@ -269,7 +260,6 @@ nni_resp_sock_getq_cb(void *arg)
nni_mtx_unlock(&psock->mtx);
}
-
void
nni_resp_getq_cb(void *arg)
{
@@ -286,7 +276,6 @@ nni_resp_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
void
nni_resp_send_cb(void *arg)
{
@@ -302,17 +291,16 @@ nni_resp_send_cb(void *arg)
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
}
-
static void
nni_resp_recv_cb(void *arg)
{
nni_resp_pipe *ppipe = arg;
nni_resp_sock *psock = ppipe->psock;
- nni_msgq *urq;
- nni_msg *msg;
- uint8_t idbuf[4];
- int hops;
- int rv;
+ nni_msgq * urq;
+ nni_msg * msg;
+ uint8_t idbuf[4];
+ int hops;
+ int rv;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
goto error;
@@ -322,7 +310,7 @@ nni_resp_recv_cb(void *arg)
NNI_PUT32(idbuf, ppipe->id);
- msg = ppipe->aio_recv.a_msg;
+ msg = ppipe->aio_recv.a_msg;
ppipe->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
@@ -334,7 +322,7 @@ nni_resp_recv_cb(void *arg)
// Move backtrace from body to header
hops = 0;
for (;;) {
- int end = 0;
+ int end = 0;
uint8_t *body;
if (hops >= psock->ttl) {
@@ -346,8 +334,8 @@ nni_resp_recv_cb(void *arg)
goto error;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ end = (body[0] & 0x80) ? 1 : 0;
+ rv = nni_msg_append_header(msg, body, 4);
if (rv != 0) {
nni_msg_free(msg);
goto error;
@@ -367,7 +355,6 @@ error:
nni_pipe_stop(ppipe->npipe);
}
-
static void
nni_resp_putq_cb(void *arg)
{
@@ -382,13 +369,12 @@ nni_resp_putq_cb(void *arg)
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static int
nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_resp_sock *psock = arg;
- int rv;
- int oldraw;
+ int rv;
+ int oldraw;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -396,7 +382,7 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
if (oldraw != psock->raw) {
if (!psock->raw) {
nni_sock_senderr(psock->nsock, 0);
@@ -411,12 +397,11 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_resp_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -431,7 +416,6 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_resp_sock_sfilter(void *arg, nni_msg *msg)
{
@@ -454,38 +438,38 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_trunc_header(msg, nni_msg_header_len(msg));
- if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) {
+ if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) !=
+ 0) {
nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
+ psock->btrace = NULL;
psock->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
+ psock->btrace = NULL;
psock->btrace_len = 0;
return (msg);
}
-
static nni_msg *
nni_resp_sock_rfilter(void *arg, nni_msg *msg)
{
nni_resp_sock *psock = arg;
- char *header;
- size_t len;
+ char * header;
+ size_t len;
if (psock->raw) {
return (msg);
}
nni_sock_senderr(psock->nsock, 0);
- len = nni_msg_header_len(msg);
+ len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (psock->btrace != NULL) {
nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
+ psock->btrace = NULL;
psock->btrace_len = 0;
}
if ((psock->btrace = nni_alloc(len)) == NULL) {
@@ -498,30 +482,29 @@ nni_resp_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
static nni_proto_pipe_ops nni_resp_pipe_ops = {
- .pipe_init = nni_resp_pipe_init,
- .pipe_fini = nni_resp_pipe_fini,
- .pipe_start = nni_resp_pipe_start,
- .pipe_stop = nni_resp_pipe_stop,
+ .pipe_init = nni_resp_pipe_init,
+ .pipe_fini = nni_resp_pipe_fini,
+ .pipe_start = nni_resp_pipe_start,
+ .pipe_stop = nni_resp_pipe_stop,
};
static nni_proto_sock_ops nni_resp_sock_ops = {
- .sock_init = nni_resp_sock_init,
- .sock_fini = nni_resp_sock_fini,
- .sock_open = nni_resp_sock_open,
- .sock_close = nni_resp_sock_close,
- .sock_setopt = nni_resp_sock_setopt,
- .sock_getopt = nni_resp_sock_getopt,
- .sock_rfilter = nni_resp_sock_rfilter,
- .sock_sfilter = nni_resp_sock_sfilter,
+ .sock_init = nni_resp_sock_init,
+ .sock_fini = nni_resp_sock_fini,
+ .sock_open = nni_resp_sock_open,
+ .sock_close = nni_resp_sock_close,
+ .sock_setopt = nni_resp_sock_setopt,
+ .sock_getopt = nni_resp_sock_getopt,
+ .sock_rfilter = nni_resp_sock_rfilter,
+ .sock_sfilter = nni_resp_sock_sfilter,
};
nni_proto nni_respondent_proto = {
- .proto_self = NNG_PROTO_RESPONDENT,
- .proto_peer = NNG_PROTO_SURVEYOR,
- .proto_name = "respondent",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_RESPONDENT,
+ .proto_peer = NNG_PROTO_SURVEYOR,
+ .proto_name = "respondent",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_resp_sock_ops,
.proto_pipe_ops = &nni_resp_pipe_ops,
};
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 14d73028..e7d0f3ce 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -15,8 +15,8 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct nni_surv_pipe nni_surv_pipe;
-typedef struct nni_surv_sock nni_surv_sock;
+typedef struct nni_surv_pipe nni_surv_pipe;
+typedef struct nni_surv_sock nni_surv_sock;
static void nni_surv_sock_getq_cb(void *);
static void nni_surv_getq_cb(void *);
@@ -27,31 +27,31 @@ static void nni_surv_timeout(void *);
// An nni_surv_sock is our per-socket protocol private structure.
struct nni_surv_sock {
- nni_sock * nsock;
- nni_duration survtime;
- nni_time expire;
- int raw;
- int closing;
- uint32_t nextid; // next id
- uint8_t survid[4]; // outstanding request ID (big endian)
- nni_list pipes;
- nni_aio aio_getq;
- nni_timer_node timer;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx mtx;
+ nni_sock * nsock;
+ nni_duration survtime;
+ nni_time expire;
+ int raw;
+ int closing;
+ uint32_t nextid; // next id
+ uint8_t survid[4]; // outstanding request ID (big endian)
+ nni_list pipes;
+ nni_aio aio_getq;
+ nni_timer_node timer;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_mtx mtx;
};
// An nni_surv_pipe is our per-pipe protocol private structure.
struct nni_surv_pipe {
- nni_pipe * npipe;
- nni_surv_sock * psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_pipe * npipe;
+ nni_surv_sock *psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
static void
@@ -64,12 +64,11 @@ nni_surv_sock_fini(void *arg)
NNI_FREE_STRUCT(psock);
}
-
static int
nni_surv_sock_init(void **sp, nni_sock *nsock)
{
nni_surv_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
@@ -84,13 +83,13 @@ nni_surv_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
nni_timer_init(&psock->timer, nni_surv_timeout, psock);
- psock->nextid = nni_random();
- psock->nsock = nsock;
- psock->raw = 0;
+ psock->nextid = nni_random();
+ psock->nsock = nsock;
+ psock->raw = 0;
psock->survtime = NNI_SECOND * 60;
- psock->expire = NNI_TIME_ZERO;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
+ psock->expire = NNI_TIME_ZERO;
+ psock->uwq = nni_sock_sendq(nsock);
+ psock->urq = nni_sock_recvq(nsock);
*sp = psock;
nni_sock_recverr(nsock, NNG_ESTATE);
@@ -101,7 +100,6 @@ fail:
return (rv);
}
-
static void
nni_surv_sock_open(void *arg)
{
@@ -110,7 +108,6 @@ nni_surv_sock_open(void *arg)
nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
}
-
static void
nni_surv_sock_close(void *arg)
{
@@ -120,7 +117,6 @@ nni_surv_sock_close(void *arg)
nni_aio_stop(&psock->aio_getq);
}
-
static void
nni_surv_pipe_fini(void *arg)
{
@@ -134,12 +130,11 @@ nni_surv_pipe_fini(void *arg)
NNI_FREE_STRUCT(ppipe);
}
-
static int
nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_surv_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -166,7 +161,7 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
}
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
failed:
@@ -174,7 +169,6 @@ failed:
return (rv);
}
-
static int
nni_surv_pipe_start(void *arg)
{
@@ -190,7 +184,6 @@ nni_surv_pipe_start(void *arg)
return (0);
}
-
static void
nni_surv_pipe_stop(void *arg)
{
@@ -210,7 +203,6 @@ nni_surv_pipe_stop(void *arg)
nni_mtx_unlock(&psock->mtx);
}
-
static void
nni_surv_getq_cb(void *arg)
{
@@ -227,7 +219,6 @@ nni_surv_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
static void
nni_surv_send_cb(void *arg)
{
@@ -243,7 +234,6 @@ nni_surv_send_cb(void *arg)
nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq);
}
-
static void
nni_surv_putq_cb(void *arg)
{
@@ -259,18 +249,17 @@ nni_surv_putq_cb(void *arg)
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static void
nni_surv_recv_cb(void *arg)
{
nni_surv_pipe *ppipe = arg;
- nni_msg *msg;
+ nni_msg * msg;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
goto failed;
}
- msg = ppipe->aio_recv.a_msg;
+ msg = ppipe->aio_recv.a_msg;
ppipe->aio_recv.a_msg = NULL;
// We yank 4 bytes of body, and move them to the header.
@@ -297,13 +286,12 @@ failed:
nni_pipe_stop(ppipe->npipe);
}
-
static int
nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_surv_sock *psock = arg;
- int rv;
- int oldraw;
+ int rv;
+ int oldraw;
switch (opt) {
case NNG_OPT_SURVEYTIME:
@@ -311,14 +299,14 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
if (oldraw != psock->raw) {
if (psock->raw) {
nni_sock_recverr(psock->nsock, 0);
} else {
nni_sock_recverr(psock->nsock, NNG_ESTATE);
}
- memset(psock->survid, 0, sizeof (psock->survid));
+ memset(psock->survid, 0, sizeof(psock->survid));
nni_timer_cancel(&psock->timer);
}
break;
@@ -328,12 +316,11 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_surv_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_SURVEYTIME:
@@ -348,20 +335,19 @@ nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static void
nni_surv_sock_getq_cb(void *arg)
{
nni_surv_sock *psock = arg;
nni_surv_pipe *ppipe;
nni_surv_pipe *last;
- nni_msg *msg, *dup;
+ nni_msg * msg, *dup;
if (nni_aio_result(&psock->aio_getq) != 0) {
// Should be NNG_ECLOSED.
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = psock->aio_getq.a_msg;
psock->aio_getq.a_msg = NULL;
nni_mtx_lock(&psock->mtx);
@@ -386,25 +372,23 @@ nni_surv_sock_getq_cb(void *arg)
}
}
-
static void
nni_surv_timeout(void *arg)
{
nni_surv_sock *psock = arg;
nni_sock_lock(psock->nsock);
- memset(psock->survid, 0, sizeof (psock->survid));
+ memset(psock->survid, 0, sizeof(psock->survid));
nni_sock_recverr(psock->nsock, NNG_ESTATE);
nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT);
nni_sock_unlock(psock->nsock);
}
-
static nni_msg *
nni_surv_sock_sfilter(void *arg, nni_msg *msg)
{
nni_surv_sock *psock = arg;
- uint32_t id;
+ uint32_t id;
if (psock->raw) {
// No automatic retry, and the request ID must
@@ -434,12 +418,11 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(psock->nsock, 0);
- //nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
+ // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}
-
static nni_msg *
nni_surv_sock_rfilter(void *arg, nni_msg *msg)
{
@@ -466,32 +449,31 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
static nni_proto_pipe_ops nni_surv_pipe_ops = {
- .pipe_init = nni_surv_pipe_init,
- .pipe_fini = nni_surv_pipe_fini,
- .pipe_start = nni_surv_pipe_start,
- .pipe_stop = nni_surv_pipe_stop,
+ .pipe_init = nni_surv_pipe_init,
+ .pipe_fini = nni_surv_pipe_fini,
+ .pipe_start = nni_surv_pipe_start,
+ .pipe_stop = nni_surv_pipe_stop,
};
static nni_proto_sock_ops nni_surv_sock_ops = {
- .sock_init = nni_surv_sock_init,
- .sock_fini = nni_surv_sock_fini,
- .sock_open = nni_surv_sock_open,
- .sock_close = nni_surv_sock_close,
- .sock_setopt = nni_surv_sock_setopt,
- .sock_getopt = nni_surv_sock_getopt,
- .sock_rfilter = nni_surv_sock_rfilter,
- .sock_sfilter = nni_surv_sock_sfilter,
+ .sock_init = nni_surv_sock_init,
+ .sock_fini = nni_surv_sock_fini,
+ .sock_open = nni_surv_sock_open,
+ .sock_close = nni_surv_sock_close,
+ .sock_setopt = nni_surv_sock_setopt,
+ .sock_getopt = nni_surv_sock_getopt,
+ .sock_rfilter = nni_surv_sock_rfilter,
+ .sock_sfilter = nni_surv_sock_sfilter,
};
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
nni_proto nni_surveyor_proto = {
- .proto_self = NNG_PROTO_SURVEYOR,
- .proto_peer = NNG_PROTO_RESPONDENT,
- .proto_name = "surveyor",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_SURVEYOR,
+ .proto_peer = NNG_PROTO_RESPONDENT,
+ .proto_name = "surveyor",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_surv_sock_ops,
.proto_pipe_ops = &nni_surv_pipe_ops,
};