From 795aebbee77bb74d8792df96dfe1aa79ec9548fc Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 10 Jul 2017 15:02:38 -0700 Subject: Give up on uncrustify; switch to clang-format. --- src/protocol/bus/bus.c | 100 ++++++++++-------------- src/protocol/pair/pair.c | 84 +++++++++----------- src/protocol/pipeline/pull.c | 73 ++++++++--------- src/protocol/pipeline/push.c | 83 +++++++++----------- src/protocol/pubsub/pub.c | 95 ++++++++++------------ src/protocol/pubsub/sub.c | 97 ++++++++++------------- src/protocol/reqrep/rep.c | 178 +++++++++++++++++++----------------------- src/protocol/reqrep/req.c | 161 +++++++++++++++++--------------------- src/protocol/survey/respond.c | 161 +++++++++++++++++--------------------- src/protocol/survey/survey.c | 138 ++++++++++++++------------------ 10 files changed, 512 insertions(+), 658 deletions(-) (limited to 'src/protocol') diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 5755d5a9..8c9ed83c 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -17,8 +17,8 @@ // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct nni_bus_pipe nni_bus_pipe; -typedef struct nni_bus_sock nni_bus_sock; +typedef struct nni_bus_pipe nni_bus_pipe; +typedef struct nni_bus_sock nni_bus_sock; static void nni_bus_sock_getq(nni_bus_sock *); static void nni_bus_pipe_getq(nni_bus_pipe *); @@ -33,27 +33,26 @@ static void nni_bus_pipe_putq_cb(void *); // An nni_bus_sock is our per-socket protocol private structure. struct nni_bus_sock { - nni_sock * nsock; - int raw; - nni_aio aio_getq; - nni_list pipes; - nni_mtx mtx; + nni_sock *nsock; + int raw; + nni_aio aio_getq; + nni_list pipes; + nni_mtx mtx; }; // An nni_bus_pipe is our per-pipe protocol private structure. struct nni_bus_pipe { - nni_pipe * npipe; - nni_bus_sock * psock; - nni_msgq * sendq; - nni_list_node node; - nni_aio aio_getq; - nni_aio aio_recv; - nni_aio aio_send; - nni_aio aio_putq; - nni_mtx mtx; + nni_pipe * npipe; + nni_bus_sock *psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio aio_getq; + nni_aio aio_recv; + nni_aio aio_send; + nni_aio aio_putq; + nni_mtx mtx; }; - static void nni_bus_sock_fini(void *arg) { @@ -66,12 +65,11 @@ nni_bus_sock_fini(void *arg) } } - static int nni_bus_sock_init(void **sp, nni_sock *nsock) { nni_bus_sock *psock; - int rv; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); @@ -85,7 +83,7 @@ nni_bus_sock_init(void **sp, nni_sock *nsock) goto fail; } psock->nsock = nsock; - psock->raw = 0; + psock->raw = 0; *sp = psock; return (0); @@ -95,7 +93,6 @@ fail: return (rv); } - static void nni_bus_sock_open(void *arg) { @@ -104,7 +101,6 @@ nni_bus_sock_open(void *arg) nni_bus_sock_getq(psock); } - static void nni_bus_pipe_fini(void *arg) { @@ -121,12 +117,11 @@ nni_bus_pipe_fini(void *arg) } } - static int nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) { nni_bus_pipe *ppipe; - int rv; + int rv; if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); @@ -155,7 +150,7 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) ppipe->npipe = npipe; ppipe->psock = psock; - *pp = ppipe; + *pp = ppipe; return (0); fail: @@ -163,7 +158,6 @@ fail: return (rv); } - static int nni_bus_pipe_start(void *arg) { @@ -180,7 +174,6 @@ nni_bus_pipe_start(void *arg) return (0); } - static void nni_bus_pipe_stop(void *arg) { @@ -201,7 +194,6 @@ nni_bus_pipe_stop(void *arg) nni_mtx_unlock(&ppipe->psock->mtx); } - static void nni_bus_pipe_getq_cb(void *arg) { @@ -218,7 +210,6 @@ nni_bus_pipe_getq_cb(void *arg) nni_pipe_send(ppipe->npipe, &ppipe->aio_send); } - static void nni_bus_pipe_send_cb(void *arg) { @@ -235,21 +226,20 @@ nni_bus_pipe_send_cb(void *arg) nni_bus_pipe_getq(ppipe); } - static void nni_bus_pipe_recv_cb(void *arg) { nni_bus_pipe *ppipe = arg; nni_bus_sock *psock = ppipe->psock; - nni_msg *msg; - uint32_t id; + nni_msg * msg; + uint32_t id; if (nni_aio_result(&ppipe->aio_recv) != 0) { nni_pipe_stop(ppipe->npipe); return; } msg = ppipe->aio_recv.a_msg; - id = nni_pipe_id(ppipe->npipe); + id = nni_pipe_id(ppipe->npipe); if (nni_msg_prepend_header(msg, &id, 4) != 0) { // XXX: bump a nomemory stat @@ -262,7 +252,6 @@ nni_bus_pipe_recv_cb(void *arg) nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq); } - static void nni_bus_pipe_putq_cb(void *arg) { @@ -279,16 +268,15 @@ nni_bus_pipe_putq_cb(void *arg) nni_bus_pipe_recv(ppipe); } - static void nni_bus_sock_getq_cb(void *arg) { nni_bus_sock *psock = arg; nni_bus_pipe *ppipe; nni_bus_pipe *lpipe; - nni_msgq *uwq = nni_sock_sendq(psock->nsock); - nni_msg *msg, *dup; - uint32_t sender; + nni_msgq * uwq = nni_sock_sendq(psock->nsock); + nni_msg * msg, *dup; + uint32_t sender; if (nni_aio_result(&psock->aio_getq) != 0) { return; @@ -333,33 +321,29 @@ nni_bus_sock_getq_cb(void *arg) nni_bus_sock_getq(psock); } - static void nni_bus_sock_getq(nni_bus_sock *psock) { nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq); } - static void nni_bus_pipe_getq(nni_bus_pipe *ppipe) { nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); } - static void nni_bus_pipe_recv(nni_bus_pipe *ppipe) { nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); } - static int nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_bus_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -371,12 +355,11 @@ nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_bus_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -388,29 +371,28 @@ nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - static nni_proto_pipe_ops nni_bus_pipe_ops = { - .pipe_init = nni_bus_pipe_init, - .pipe_fini = nni_bus_pipe_fini, - .pipe_start = nni_bus_pipe_start, - .pipe_stop = nni_bus_pipe_stop, + .pipe_init = nni_bus_pipe_init, + .pipe_fini = nni_bus_pipe_fini, + .pipe_start = nni_bus_pipe_start, + .pipe_stop = nni_bus_pipe_stop, }; static nni_proto_sock_ops nni_bus_sock_ops = { - .sock_init = nni_bus_sock_init, - .sock_fini = nni_bus_sock_fini, - .sock_open = nni_bus_sock_open, - .sock_setopt = nni_bus_sock_setopt, - .sock_getopt = nni_bus_sock_getopt, + .sock_init = nni_bus_sock_init, + .sock_fini = nni_bus_sock_fini, + .sock_open = nni_bus_sock_open, + .sock_setopt = nni_bus_sock_setopt, + .sock_getopt = nni_bus_sock_getopt, }; // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. nni_proto nni_bus_proto = { - .proto_self = NNG_PROTO_BUS, - .proto_peer = NNG_PROTO_BUS, - .proto_name = "bus", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_BUS, + .proto_peer = NNG_PROTO_BUS, + .proto_name = "bus", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_bus_sock_ops, .proto_pipe_ops = &nni_bus_pipe_ops, }; diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index b283377e..30302352 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -16,8 +16,8 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. -typedef struct nni_pair_pipe nni_pair_pipe; -typedef struct nni_pair_sock nni_pair_sock; +typedef struct nni_pair_pipe nni_pair_pipe; +typedef struct nni_pair_sock nni_pair_sock; static void nni_pair_send_cb(void *); static void nni_pair_recv_cb(void *); @@ -27,12 +27,12 @@ static void nni_pair_pipe_fini(void *); // An nni_pair_sock is our per-socket protocol private structure. struct nni_pair_sock { - nni_sock * nsock; - nni_pair_pipe * ppipe; - nni_msgq * uwq; - nni_msgq * urq; - int raw; - nni_mtx mtx; + nni_sock * nsock; + nni_pair_pipe *ppipe; + nni_msgq * uwq; + nni_msgq * urq; + int raw; + nni_mtx mtx; }; // An nni_pair_pipe is our per-pipe protocol private structure. We keep @@ -40,19 +40,19 @@ struct nni_pair_sock { // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. struct nni_pair_pipe { - nni_pipe * npipe; - nni_pair_sock * psock; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_pipe * npipe; + nni_pair_sock *psock; + nni_aio aio_send; + nni_aio aio_recv; + nni_aio aio_getq; + nni_aio aio_putq; }; static int nni_pair_sock_init(void **sp, nni_sock *nsock) { nni_pair_sock *psock; - int rv; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); @@ -63,14 +63,13 @@ nni_pair_sock_init(void **sp, nni_sock *nsock) } psock->nsock = nsock; psock->ppipe = NULL; - psock->raw = 0; - psock->uwq = nni_sock_sendq(nsock); - psock->urq = nni_sock_recvq(nsock); - *sp = psock; + psock->raw = 0; + psock->uwq = nni_sock_sendq(nsock); + psock->urq = nni_sock_recvq(nsock); + *sp = psock; return (0); } - static void nni_pair_sock_fini(void *arg) { @@ -83,12 +82,11 @@ nni_pair_sock_fini(void *arg) } } - static int nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) { nni_pair_pipe *ppipe; - int rv; + int rv; if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); @@ -111,7 +109,7 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) } ppipe->npipe = npipe; ppipe->psock = psock; - *pp = ppipe; + *pp = ppipe; return (0); fail: @@ -119,7 +117,6 @@ fail: return (rv); } - static void nni_pair_pipe_fini(void *arg) { @@ -132,7 +129,6 @@ nni_pair_pipe_fini(void *arg) NNI_FREE_STRUCT(ppipe); } - static int nni_pair_pipe_start(void *arg) { @@ -142,7 +138,7 @@ nni_pair_pipe_start(void *arg) nni_mtx_lock(&psock->mtx); if (psock->ppipe != NULL) { nni_mtx_unlock(&psock->mtx); - return (NNG_EBUSY); // Already have a peer, denied. + return (NNG_EBUSY); // Already have a peer, denied. } psock->ppipe = ppipe; nni_mtx_unlock(&psock->mtx); @@ -155,7 +151,6 @@ nni_pair_pipe_start(void *arg) return (0); } - static void nni_pair_pipe_stop(void *arg) { @@ -174,7 +169,6 @@ nni_pair_pipe_stop(void *arg) nni_mtx_unlock(&psock->mtx); } - static void nni_pair_recv_cb(void *arg) { @@ -191,7 +185,6 @@ nni_pair_recv_cb(void *arg) nni_msgq_aio_put(psock->urq, &ppipe->aio_putq); } - static void nni_pair_putq_cb(void *arg) { @@ -206,7 +199,6 @@ nni_pair_putq_cb(void *arg) nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); } - static void nni_pair_getq_cb(void *arg) { @@ -223,7 +215,6 @@ nni_pair_getq_cb(void *arg) nni_pipe_send(ppipe->npipe, &ppipe->aio_send); } - static void nni_pair_send_cb(void *arg) { @@ -240,12 +231,11 @@ nni_pair_send_cb(void *arg) nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); } - static int nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pair_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -257,12 +247,11 @@ nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pair_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -274,29 +263,28 @@ nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_pair_pipe_ops = { - .pipe_init = nni_pair_pipe_init, - .pipe_fini = nni_pair_pipe_fini, - .pipe_start = nni_pair_pipe_start, - .pipe_stop = nni_pair_pipe_stop, + .pipe_init = nni_pair_pipe_init, + .pipe_fini = nni_pair_pipe_fini, + .pipe_start = nni_pair_pipe_start, + .pipe_stop = nni_pair_pipe_stop, }; static nni_proto_sock_ops nni_pair_sock_ops = { - .sock_init = nni_pair_sock_init, - .sock_fini = nni_pair_sock_fini, - .sock_setopt = nni_pair_sock_setopt, - .sock_getopt = nni_pair_sock_getopt, + .sock_init = nni_pair_sock_init, + .sock_fini = nni_pair_sock_fini, + .sock_setopt = nni_pair_sock_setopt, + .sock_getopt = nni_pair_sock_getopt, }; nni_proto nni_pair_proto = { - .proto_self = NNG_PROTO_PAIR, - .proto_peer = NNG_PROTO_PAIR, - .proto_name = "pair", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_PAIR, + .proto_peer = NNG_PROTO_PAIR, + .proto_name = "pair", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_pair_sock_ops, .proto_pipe_ops = &nni_pair_pipe_ops, }; diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 27fd5478..80ff4245 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -14,8 +14,8 @@ // Pull protocol. The PULL protocol is the "read" side of a pipeline. -typedef struct nni_pull_pipe nni_pull_pipe; -typedef struct nni_pull_sock nni_pull_sock; +typedef struct nni_pull_pipe nni_pull_pipe; +typedef struct nni_pull_sock nni_pull_sock; static void nni_pull_putq_cb(void *); static void nni_pull_recv_cb(void *); @@ -23,16 +23,16 @@ static void nni_pull_putq(nni_pull_pipe *, nni_msg *); // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { - nni_msgq * urq; - int raw; + nni_msgq *urq; + int raw; }; // An nni_pull_pipe is our per-pipe protocol private structure. struct nni_pull_pipe { - nni_pipe * pipe; - nni_pull_sock * pull; - nni_aio putq_aio; - nni_aio recv_aio; + nni_pipe * pipe; + nni_pull_sock *pull; + nni_aio putq_aio; + nni_aio recv_aio; }; static int @@ -45,12 +45,11 @@ nni_pull_sock_init(void **pullp, nni_sock *sock) } pull->raw = 0; pull->urq = nni_sock_recvq(sock); - *pullp = pull; + *pullp = pull; nni_sock_senderr(sock, NNG_ENOTSUP); return (0); } - static void nni_pull_sock_fini(void *arg) { @@ -61,12 +60,11 @@ nni_pull_sock_fini(void *arg) } } - static int nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pull_pipe *pp; - int rv; + int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); @@ -82,11 +80,10 @@ nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) } pp->pipe = pipe; pp->pull = psock; - *ppp = pp; + *ppp = pp; return (0); } - static void nni_pull_pipe_fini(void *arg) { @@ -99,7 +96,6 @@ nni_pull_pipe_fini(void *arg) } } - static int nni_pull_pipe_start(void *arg) { @@ -111,7 +107,6 @@ nni_pull_pipe_start(void *arg) return (0); } - static void nni_pull_pipe_stop(void *arg) { @@ -121,13 +116,12 @@ nni_pull_pipe_stop(void *arg) nni_aio_stop(&pp->recv_aio); } - static void nni_pull_recv_cb(void *arg) { - nni_pull_pipe *pp = arg; - nni_aio *aio = &pp->recv_aio; - nni_msg *msg; + nni_pull_pipe *pp = arg; + nni_aio * aio = &pp->recv_aio; + nni_msg * msg; if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. @@ -136,17 +130,16 @@ nni_pull_recv_cb(void *arg) } // Got a message... start the put to send it up to the application. - msg = aio->a_msg; + msg = aio->a_msg; aio->a_msg = NULL; nni_pull_putq(pp, msg); } - static void nni_pull_putq_cb(void *arg) { - nni_pull_pipe *pp = arg; - nni_aio *aio = &pp->putq_aio; + nni_pull_pipe *pp = arg; + nni_aio * aio = &pp->putq_aio; if (nni_aio_result(aio) != 0) { // If we failed to put, probably NNG_ECLOSED, nothing else @@ -160,7 +153,6 @@ nni_pull_putq_cb(void *arg) nni_pipe_recv(pp->pipe, &pp->recv_aio); } - // nni_pull_putq schedules a put operation to the user socket (sendup). static void nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg) @@ -172,12 +164,11 @@ nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg) nni_msgq_aio_put(pull->urq, &pp->putq_aio); } - static int nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pull_sock *pull = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -189,12 +180,11 @@ nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pull_sock *pull = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -206,28 +196,27 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_pull_pipe_ops = { - .pipe_init = nni_pull_pipe_init, - .pipe_fini = nni_pull_pipe_fini, - .pipe_start = nni_pull_pipe_start, - .pipe_stop = nni_pull_pipe_stop, + .pipe_init = nni_pull_pipe_init, + .pipe_fini = nni_pull_pipe_fini, + .pipe_start = nni_pull_pipe_start, + .pipe_stop = nni_pull_pipe_stop, }; static nni_proto_sock_ops nni_pull_sock_ops = { - .sock_init = nni_pull_sock_init, - .sock_fini = nni_pull_sock_fini, - .sock_setopt = nni_pull_sock_setopt, - .sock_getopt = nni_pull_sock_getopt, + .sock_init = nni_pull_sock_init, + .sock_fini = nni_pull_sock_fini, + .sock_setopt = nni_pull_sock_setopt, + .sock_getopt = nni_pull_sock_getopt, }; nni_proto nni_pull_proto = { - .proto_self = NNG_PROTO_PULL, - .proto_peer = NNG_PROTO_PUSH, - .proto_name = "pull", - .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_flags = NNI_PROTO_FLAG_RCV, .proto_pipe_ops = &nni_pull_pipe_ops, .proto_sock_ops = &nni_pull_sock_ops, }; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index bbda91da..f1da0a9f 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -16,8 +16,8 @@ // Push distributes fairly, or tries to, by giving messages in round-robin // order. -typedef struct nni_push_pipe nni_push_pipe; -typedef struct nni_push_sock nni_push_sock; +typedef struct nni_push_pipe nni_push_pipe; +typedef struct nni_push_sock nni_push_sock; static void nni_push_send_cb(void *); static void nni_push_recv_cb(void *); @@ -25,20 +25,20 @@ static void nni_push_getq_cb(void *); // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_msgq * uwq; - int raw; - nni_sock * sock; + nni_msgq *uwq; + int raw; + nni_sock *sock; }; // An nni_push_pipe is our per-pipe protocol private structure. struct nni_push_pipe { - nni_pipe * pipe; - nni_push_sock * push; - nni_list_node node; + nni_pipe * pipe; + nni_push_sock *push; + nni_list_node node; - nni_aio aio_recv; - nni_aio aio_send; - nni_aio aio_getq; + nni_aio aio_recv; + nni_aio aio_send; + nni_aio aio_getq; }; static int @@ -49,15 +49,14 @@ nni_push_sock_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - push->raw = 0; + push->raw = 0; push->sock = sock; - push->uwq = nni_sock_sendq(sock); - *pushp = push; + push->uwq = nni_sock_sendq(sock); + *pushp = push; nni_sock_recverr(sock, NNG_ENOTSUP); return (0); } - static void nni_push_sock_fini(void *arg) { @@ -68,7 +67,6 @@ nni_push_sock_fini(void *arg) } } - static void nni_push_pipe_fini(void *arg) { @@ -80,12 +78,11 @@ nni_push_pipe_fini(void *arg) NNI_FREE_STRUCT(pp); } - static int nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_push_pipe *pp; - int rv; + int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); @@ -103,7 +100,7 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) NNI_LIST_NODE_INIT(&pp->node); pp->pipe = pipe; pp->push = psock; - *ppp = pp; + *ppp = pp; return (0); fail: @@ -111,11 +108,10 @@ fail: return (rv); } - static int nni_push_pipe_start(void *arg) { - nni_push_pipe *pp = arg; + nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) { @@ -132,11 +128,10 @@ nni_push_pipe_start(void *arg) return (0); } - static void nni_push_pipe_stop(void *arg) { - nni_push_pipe *pp = arg; + nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; nni_aio_stop(&pp->aio_recv); @@ -144,7 +139,6 @@ nni_push_pipe_stop(void *arg) nni_aio_stop(&pp->aio_getq); } - static void nni_push_recv_cb(void *arg) { @@ -161,11 +155,10 @@ nni_push_recv_cb(void *arg) nni_pipe_recv(pp->pipe, &pp->aio_recv); } - static void nni_push_send_cb(void *arg) { - nni_push_pipe *pp = arg; + nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; if (nni_aio_result(&pp->aio_send) != 0) { @@ -178,12 +171,11 @@ nni_push_send_cb(void *arg) nni_msgq_aio_get(push->uwq, &pp->aio_getq); } - static void nni_push_getq_cb(void *arg) { - nni_push_pipe *pp = arg; - nni_aio *aio = &pp->aio_getq; + nni_push_pipe *pp = arg; + nni_aio * aio = &pp->aio_getq; if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. @@ -192,17 +184,16 @@ nni_push_getq_cb(void *arg) } pp->aio_send.a_msg = aio->a_msg; - aio->a_msg = NULL; + aio->a_msg = NULL; nni_pipe_send(pp->pipe, &pp->aio_send); } - static int nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_push_sock *push = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -214,12 +205,11 @@ nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_push_sock *push = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -231,28 +221,27 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_push_pipe_ops = { - .pipe_init = nni_push_pipe_init, - .pipe_fini = nni_push_pipe_fini, - .pipe_start = nni_push_pipe_start, - .pipe_stop = nni_push_pipe_stop, + .pipe_init = nni_push_pipe_init, + .pipe_fini = nni_push_pipe_fini, + .pipe_start = nni_push_pipe_start, + .pipe_stop = nni_push_pipe_stop, }; static nni_proto_sock_ops nni_push_sock_ops = { - .sock_init = nni_push_sock_init, - .sock_fini = nni_push_sock_fini, - .sock_setopt = nni_push_sock_setopt, - .sock_getopt = nni_push_sock_getopt, + .sock_init = nni_push_sock_init, + .sock_fini = nni_push_sock_fini, + .sock_setopt = nni_push_sock_setopt, + .sock_getopt = nni_push_sock_getopt, }; nni_proto nni_push_proto = { - .proto_self = NNG_PROTO_PUSH, - .proto_peer = NNG_PROTO_PULL, - .proto_name = "push", - .proto_flags = NNI_PROTO_FLAG_SND, + .proto_self = NNG_PROTO_PUSH, + .proto_peer = NNG_PROTO_PULL, + .proto_name = "push", + .proto_flags = NNI_PROTO_FLAG_SND, .proto_pipe_ops = &nni_push_pipe_ops, .proto_sock_ops = &nni_push_sock_ops, }; diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index e3b37f1a..64c2c59d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -17,8 +17,8 @@ // perform sender-side filtering. Its best effort delivery, so anything // that can't receive the message won't get one. -typedef struct nni_pub_pipe nni_pub_pipe; -typedef struct nni_pub_sock nni_pub_sock; +typedef struct nni_pub_pipe nni_pub_pipe; +typedef struct nni_pub_sock nni_pub_sock; static void nni_pub_pipe_recv_cb(void *); static void nni_pub_pipe_send_cb(void *); @@ -29,30 +29,30 @@ static void nni_pub_pipe_fini(void *); // An nni_pub_sock is our per-socket protocol private structure. struct nni_pub_sock { - nni_sock * sock; - nni_msgq * uwq; - int raw; - nni_aio aio_getq; - nni_list pipes; - nni_mtx mtx; + nni_sock *sock; + nni_msgq *uwq; + int raw; + nni_aio aio_getq; + nni_list pipes; + nni_mtx mtx; }; // An nni_pub_pipe is our per-pipe protocol private structure. struct nni_pub_pipe { - nni_pipe * pipe; - nni_pub_sock * pub; - nni_msgq * sendq; - nni_aio aio_getq; - nni_aio aio_send; - nni_aio aio_recv; - nni_list_node node; + nni_pipe * pipe; + nni_pub_sock *pub; + nni_msgq * sendq; + nni_aio aio_getq; + nni_aio aio_send; + nni_aio aio_recv; + nni_list_node node; }; static int nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; - int rv; + int rv; if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); @@ -67,7 +67,7 @@ nni_pub_sock_init(void **pubp, nni_sock *sock) return (rv); } pub->sock = sock; - pub->raw = 0; + pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); pub->uwq = nni_sock_sendq(sock); @@ -77,7 +77,6 @@ nni_pub_sock_init(void **pubp, nni_sock *sock) return (0); } - static void nni_pub_sock_fini(void *arg) { @@ -88,7 +87,6 @@ nni_pub_sock_fini(void *arg) NNI_FREE_STRUCT(pub); } - static void nni_pub_sock_open(void *arg) { @@ -97,7 +95,6 @@ nni_pub_sock_open(void *arg) nni_msgq_aio_get(pub->uwq, &pub->aio_getq); } - static void nni_pub_pipe_fini(void *arg) { @@ -110,12 +107,11 @@ nni_pub_pipe_fini(void *arg) NNI_FREE_STRUCT(pp); } - static int nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pub_pipe *pp; - int rv; + int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); @@ -140,8 +136,8 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) goto fail; } pp->pipe = pipe; - pp->pub = psock; - *ppp = pp; + pp->pub = psock; + *ppp = pp; return (0); fail: @@ -149,11 +145,10 @@ fail: return (rv); } - static int nni_pub_pipe_start(void *arg) { - nni_pub_pipe *pp = arg; + nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) { @@ -170,11 +165,10 @@ nni_pub_pipe_start(void *arg) return (0); } - static void nni_pub_pipe_stop(void *arg) { - nni_pub_pipe *pp = arg; + nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; nni_aio_stop(&pp->aio_getq); @@ -189,23 +183,22 @@ nni_pub_pipe_stop(void *arg) nni_mtx_unlock(&pub->mtx); } - static void nni_pub_sock_getq_cb(void *arg) { nni_pub_sock *pub = arg; - nni_msgq *uwq = pub->uwq; - nni_msg *msg, *dup; + nni_msgq * uwq = pub->uwq; + nni_msg * msg, *dup; nni_pub_pipe *pp; nni_pub_pipe *last; - int rv; + int rv; if (nni_aio_result(&pub->aio_getq) != 0) { return; } - msg = pub->aio_getq.a_msg; + msg = pub->aio_getq.a_msg; pub->aio_getq.a_msg = NULL; nni_mtx_lock(&pub->mtx); @@ -232,7 +225,6 @@ nni_pub_sock_getq_cb(void *arg) nni_msgq_aio_get(uwq, &pub->aio_getq); } - static void nni_pub_pipe_recv_cb(void *arg) { @@ -248,7 +240,6 @@ nni_pub_pipe_recv_cb(void *arg) nni_pipe_recv(pp->pipe, &pp->aio_recv); } - static void nni_pub_pipe_getq_cb(void *arg) { @@ -265,7 +256,6 @@ nni_pub_pipe_getq_cb(void *arg) nni_pipe_send(pp->pipe, &pp->aio_send); } - static void nni_pub_pipe_send_cb(void *arg) { @@ -282,12 +272,11 @@ nni_pub_pipe_send_cb(void *arg) nni_msgq_aio_get(pp->sendq, &pp->aio_getq); } - static int nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pub_sock *pub = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -299,12 +288,11 @@ nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pub_sock *pub = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -316,29 +304,28 @@ nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_pub_pipe_ops = { - .pipe_init = nni_pub_pipe_init, - .pipe_fini = nni_pub_pipe_fini, - .pipe_start = nni_pub_pipe_start, - .pipe_stop = nni_pub_pipe_stop, + .pipe_init = nni_pub_pipe_init, + .pipe_fini = nni_pub_pipe_fini, + .pipe_start = nni_pub_pipe_start, + .pipe_stop = nni_pub_pipe_stop, }; nni_proto_sock_ops nni_pub_sock_ops = { - .sock_init = nni_pub_sock_init, - .sock_fini = nni_pub_sock_fini, - .sock_open = nni_pub_sock_open, - .sock_setopt = nni_pub_sock_setopt, - .sock_getopt = nni_pub_sock_getopt, + .sock_init = nni_pub_sock_init, + .sock_fini = nni_pub_sock_fini, + .sock_open = nni_pub_sock_open, + .sock_setopt = nni_pub_sock_setopt, + .sock_getopt = nni_pub_sock_getopt, }; nni_proto nni_pub_proto = { - .proto_self = NNG_PROTO_PUB, - .proto_peer = NNG_PROTO_SUB, - .proto_name = "pub", - .proto_flags = NNI_PROTO_FLAG_SND, + .proto_self = NNG_PROTO_PUB, + .proto_peer = NNG_PROTO_SUB, + .proto_name = "pub", + .proto_flags = NNI_PROTO_FLAG_SND, .proto_sock_ops = &nni_pub_sock_ops, .proto_pipe_ops = &nni_pub_pipe_ops, }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 09a724e2..bc4de973 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -16,34 +16,34 @@ // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct nni_sub_pipe nni_sub_pipe; -typedef struct nni_sub_sock nni_sub_sock; -typedef struct nni_sub_topic nni_sub_topic; +typedef struct nni_sub_pipe nni_sub_pipe; +typedef struct nni_sub_sock nni_sub_sock; +typedef struct nni_sub_topic nni_sub_topic; static void nni_sub_recv_cb(void *); static void nni_sub_putq_cb(void *); static void nni_sub_pipe_fini(void *); struct nni_sub_topic { - nni_list_node node; - size_t len; - void * buf; + nni_list_node node; + size_t len; + void * buf; }; // An nni_rep_sock is our per-socket protocol private structure. struct nni_sub_sock { - nni_sock * sock; - nni_list topics; - nni_msgq * urq; - int raw; + nni_sock *sock; + nni_list topics; + nni_msgq *urq; + int raw; }; // An nni_rep_pipe is our per-pipe protocol private structure. struct nni_sub_pipe { - nni_pipe * pipe; - nni_sub_sock * sub; - nni_aio aio_recv; - nni_aio aio_putq; + nni_pipe * pipe; + nni_sub_sock *sub; + nni_aio aio_recv; + nni_aio aio_putq; }; static int @@ -56,7 +56,7 @@ nni_sub_sock_init(void **subp, nni_sock *sock) } NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); sub->sock = sock; - sub->raw = 0; + sub->raw = 0; sub->urq = nni_sock_recvq(sock); nni_sock_senderr(sock, NNG_ENOTSUP); @@ -64,11 +64,10 @@ nni_sub_sock_init(void **subp, nni_sock *sock) return (0); } - static void nni_sub_sock_fini(void *arg) { - nni_sub_sock *sub = arg; + nni_sub_sock * sub = arg; nni_sub_topic *topic; while ((topic = nni_list_first(&sub->topics)) != NULL) { @@ -79,12 +78,11 @@ nni_sub_sock_fini(void *arg) NNI_FREE_STRUCT(sub); } - static int nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) { nni_sub_pipe *sp; - int rv; + int rv; if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) { return (NNG_ENOMEM); @@ -95,12 +93,11 @@ nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) return (rv); } sp->pipe = pipe; - sp->sub = ssock; - *spp = sp; + sp->sub = ssock; + *spp = sp; return (0); } - static void nni_sub_pipe_fini(void *arg) { @@ -111,7 +108,6 @@ nni_sub_pipe_fini(void *arg) NNI_FREE_STRUCT(sp); } - static int nni_sub_pipe_start(void *arg) { @@ -121,7 +117,6 @@ nni_sub_pipe_start(void *arg) return (0); } - static void nni_sub_pipe_stop(void *arg) { @@ -131,13 +126,12 @@ nni_sub_pipe_stop(void *arg) nni_aio_stop(&sp->aio_recv); } - static void nni_sub_recv_cb(void *arg) { - nni_sub_pipe *sp = arg; + nni_sub_pipe *sp = arg; nni_sub_sock *sub = sp->sub; - nni_msgq *urq = sub->urq; + nni_msgq * urq = sub->urq; if (nni_aio_result(&sp->aio_recv) != 0) { nni_pipe_stop(sp->pipe); @@ -149,7 +143,6 @@ nni_sub_recv_cb(void *arg) nni_msgq_aio_put(sub->urq, &sp->aio_putq); } - static void nni_sub_putq_cb(void *arg) { @@ -165,7 +158,6 @@ nni_sub_putq_cb(void *arg) nni_pipe_recv(sp->pipe, &sp->aio_recv); } - // For now we maintain subscriptions on a sorted linked list. As we do not // expect to have huge numbers of subscriptions, and as the operation is // really O(n), we think this is acceptable. In the future we might decide @@ -215,12 +207,11 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz) return (0); } - static int nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) { nni_sub_topic *topic; - int rv; + int rv; NNI_LIST_FOREACH (&sub->topics, topic) { if (topic->len >= sz) { @@ -246,12 +237,11 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) return (NNG_ENOENT); } - static int nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_sub_sock *sub = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -269,12 +259,11 @@ nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_sub_sock *sub = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RAW: @@ -286,22 +275,21 @@ nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - static nni_msg * nni_sub_sock_rfilter(void *arg, nni_msg *msg) { - nni_sub_sock *sub = arg; + nni_sub_sock * sub = arg; nni_sub_topic *topic; - char *body; - size_t len; - int match; + char * body; + size_t len; + int match; if (sub->raw) { return (msg); } body = nni_msg_body(msg); - len = nni_msg_len(msg); + len = nni_msg_len(msg); match = 0; // Check to see if the message matches one of our subscriptions. @@ -329,29 +317,28 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_sub_pipe_ops = { - .pipe_init = nni_sub_pipe_init, - .pipe_fini = nni_sub_pipe_fini, - .pipe_start = nni_sub_pipe_start, - .pipe_stop = nni_sub_pipe_stop, + .pipe_init = nni_sub_pipe_init, + .pipe_fini = nni_sub_pipe_fini, + .pipe_start = nni_sub_pipe_start, + .pipe_stop = nni_sub_pipe_stop, }; static nni_proto_sock_ops nni_sub_sock_ops = { - .sock_init = nni_sub_sock_init, - .sock_fini = nni_sub_sock_fini, - .sock_setopt = nni_sub_sock_setopt, - .sock_getopt = nni_sub_sock_getopt, - .sock_rfilter = nni_sub_sock_rfilter, + .sock_init = nni_sub_sock_init, + .sock_fini = nni_sub_sock_fini, + .sock_setopt = nni_sub_sock_setopt, + .sock_getopt = nni_sub_sock_getopt, + .sock_rfilter = nni_sub_sock_rfilter, }; nni_proto nni_sub_proto = { - .proto_self = NNG_PROTO_SUB, - .proto_peer = NNG_PROTO_PUB, - .proto_name = "sub", - .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_self = NNG_PROTO_SUB, + .proto_peer = NNG_PROTO_PUB, + .proto_name = "sub", + .proto_flags = NNI_PROTO_FLAG_RCV, .proto_sock_ops = &nni_sub_sock_ops, .proto_pipe_ops = &nni_sub_pipe_ops, }; diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 7d887b55..cfc83a5b 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -16,9 +16,8 @@ // request-reply pair. This is useful for building RPC servers, for // example. -typedef struct nni_rep_pipe nni_rep_pipe; -typedef struct nni_rep_sock nni_rep_sock; - +typedef struct nni_rep_pipe nni_rep_pipe; +typedef struct nni_rep_sock nni_rep_sock; static void nni_rep_sock_getq_cb(void *); static void nni_rep_pipe_getq_cb(void *); @@ -29,29 +28,29 @@ static void nni_rep_pipe_fini(void *); // An nni_rep_sock is our per-socket protocol private structure. struct nni_rep_sock { - nni_sock * sock; - nni_msgq * uwq; - nni_msgq * urq; - int raw; - int ttl; - nni_idhash pipes; - char * btrace; - size_t btrace_len; - nni_aio aio_getq; - nni_mtx mtx; + nni_sock * sock; + nni_msgq * uwq; + nni_msgq * urq; + int raw; + int ttl; + nni_idhash pipes; + char * btrace; + size_t btrace_len; + nni_aio aio_getq; + nni_mtx mtx; }; // An nni_rep_pipe is our per-pipe protocol private structure. struct nni_rep_pipe { - nni_pipe * pipe; - nni_rep_sock * rep; - nni_msgq * sendq; - uint32_t id; // we have to save it - nni_aio aio_getq; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_putq; - nni_mtx mtx; + nni_pipe * pipe; + nni_rep_sock *rep; + nni_msgq * sendq; + uint32_t id; // we have to save it + nni_aio aio_getq; + nni_aio aio_send; + nni_aio aio_recv; + nni_aio aio_putq; + nni_mtx mtx; }; static void @@ -68,20 +67,19 @@ nni_rep_sock_fini(void *arg) NNI_FREE_STRUCT(rep); } - static int nni_rep_sock_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; - int rv; + int rv; if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } - rep->ttl = 8; // Per RFC - rep->sock = sock; - rep->raw = 0; - rep->btrace = NULL; + rep->ttl = 8; // Per RFC + rep->sock = sock; + rep->raw = 0; + rep->btrace = NULL; rep->btrace_len = 0; if (((rv = nni_mtx_init(&rep->mtx)) != 0) || ((rv = nni_idhash_init(&rep->pipes)) != 0)) { @@ -106,7 +104,6 @@ fail: return (rv); } - static void nni_rep_sock_open(void *arg) { @@ -115,7 +112,6 @@ nni_rep_sock_open(void *arg) nni_msgq_aio_get(rep->uwq, &rep->aio_getq); } - static void nni_rep_sock_close(void *arg) { @@ -124,12 +120,11 @@ nni_rep_sock_close(void *arg) nni_aio_stop(&rep->aio_getq); } - static int nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_rep_pipe *rp; - int rv; + int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); @@ -138,21 +133,25 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) ((rv = nni_mtx_init(&rp->mtx)) != 0)) { goto fail; } - if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) { + if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != + 0) { goto fail; } - if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != 0) { + if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != + 0) { goto fail; } - if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != 0) { + if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != + 0) { goto fail; } - if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != 0) { + if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != + 0) { goto fail; } rp->pipe = pipe; - rp->rep = rsock; - *rpp = rp; + rp->rep = rsock; + *rpp = rp; return (0); fail: @@ -160,7 +159,6 @@ fail: return (rv); } - static void nni_rep_pipe_fini(void *arg) { @@ -175,13 +173,12 @@ nni_rep_pipe_fini(void *arg) NNI_FREE_STRUCT(rp); } - static int nni_rep_pipe_start(void *arg) { - nni_rep_pipe *rp = arg; + nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int rv; + int rv; rp->id = nni_pipe_id(rp->pipe); @@ -197,13 +194,12 @@ nni_rep_pipe_start(void *arg) return (0); } - static void nni_rep_pipe_stop(void *arg) { - nni_rep_pipe *rp = arg; + nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - uint32_t id; + uint32_t id; nni_aio_stop(&rp->aio_getq); nni_aio_stop(&rp->aio_putq); @@ -211,8 +207,8 @@ nni_rep_pipe_stop(void *arg) nni_aio_stop(&rp->aio_recv); nni_mtx_lock(&rp->mtx); - id = rp->id; - rp->id = 0; // makes this idempotent + id = rp->id; + rp->id = 0; // makes this idempotent nni_msgq_close(rp->sendq); nni_mtx_unlock(&rp->mtx); @@ -223,17 +219,16 @@ nni_rep_pipe_stop(void *arg) } } - static void nni_rep_sock_getq_cb(void *arg) { nni_rep_sock *rep = arg; - nni_msgq *uwq = rep->uwq; - nni_msg *msg; - uint8_t *header; - uint32_t id; + nni_msgq * uwq = rep->uwq; + nni_msg * msg; + uint8_t * header; + uint32_t id; nni_rep_pipe *rp; - int rv; + int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -245,7 +240,7 @@ nni_rep_sock_getq_cb(void *arg) return; } - msg = rep->aio_getq.a_msg; + msg = rep->aio_getq.a_msg; rep->aio_getq.a_msg = NULL; // We yank the outgoing pipe id from the header @@ -278,7 +273,6 @@ nni_rep_sock_getq_cb(void *arg) nni_msgq_aio_get(uwq, &rep->aio_getq); } - static void nni_rep_pipe_getq_cb(void *arg) { @@ -295,7 +289,6 @@ nni_rep_pipe_getq_cb(void *arg) nni_pipe_send(rp->pipe, &rp->aio_send); } - static void nni_rep_pipe_send_cb(void *arg) { @@ -311,17 +304,16 @@ nni_rep_pipe_send_cb(void *arg) nni_msgq_aio_get(rp->sendq, &rp->aio_getq); } - static void nni_rep_pipe_recv_cb(void *arg) { - nni_rep_pipe *rp = arg; + nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msg *msg; - int rv; - uint8_t idbuf[4]; - uint8_t *body; - int hops; + nni_msg * msg; + int rv; + uint8_t idbuf[4]; + uint8_t * body; + int hops; if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_stop(rp->pipe); @@ -330,7 +322,7 @@ nni_rep_pipe_recv_cb(void *arg) NNI_PUT32(idbuf, rp->id); - msg = rp->aio_recv.a_msg; + msg = rp->aio_recv.a_msg; rp->aio_recv.a_msg = NULL; // Store the pipe id in the header, first thing. @@ -350,8 +342,8 @@ nni_rep_pipe_recv_cb(void *arg) goto malformed; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_append_header(msg, body, 4); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_append_header(msg, body, 4); if (rv != 0) { // Presumably this is due to out of memory. // We could just discard and try again, but we @@ -376,7 +368,6 @@ malformed: nni_pipe_stop(rp->pipe); } - static void nni_rep_pipe_putq_cb(void *arg) { @@ -392,12 +383,11 @@ nni_rep_pipe_putq_cb(void *arg) nni_pipe_recv(rp->pipe, &rp->aio_recv); } - static int nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_rep_sock *rep = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_MAXTTL: @@ -413,12 +403,11 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_rep_sock *rep = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_MAXTTL: @@ -433,7 +422,6 @@ nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - static nni_msg * nni_rep_sock_sfilter(void *arg, nni_msg *msg) { @@ -458,36 +446,35 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg) if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) { nni_free(rep->btrace, rep->btrace_len); - rep->btrace = NULL; + rep->btrace = NULL; rep->btrace_len = 0; nni_msg_free(msg); return (NULL); } nni_free(rep->btrace, rep->btrace_len); - rep->btrace = NULL; + rep->btrace = NULL; rep->btrace_len = 0; return (msg); } - static nni_msg * nni_rep_sock_rfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; - char *header; - size_t len; + char * header; + size_t len; if (rep->raw) { return (msg); } nni_sock_senderr(rep->sock, 0); - len = nni_msg_header_len(msg); + len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); - rep->btrace = NULL; + rep->btrace = NULL; rep->btrace_len = 0; } if ((rep->btrace = nni_alloc(len)) == NULL) { @@ -500,32 +487,31 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_rep_pipe_ops = { - .pipe_init = nni_rep_pipe_init, - .pipe_fini = nni_rep_pipe_fini, - .pipe_start = nni_rep_pipe_start, - .pipe_stop = nni_rep_pipe_stop, + .pipe_init = nni_rep_pipe_init, + .pipe_fini = nni_rep_pipe_fini, + .pipe_start = nni_rep_pipe_start, + .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { - .sock_init = nni_rep_sock_init, - .sock_fini = nni_rep_sock_fini, - .sock_open = nni_rep_sock_open, - .sock_close = nni_rep_sock_close, - .sock_setopt = nni_rep_sock_setopt, - .sock_getopt = nni_rep_sock_getopt, - .sock_rfilter = nni_rep_sock_rfilter, - .sock_sfilter = nni_rep_sock_sfilter, + .sock_init = nni_rep_sock_init, + .sock_fini = nni_rep_sock_fini, + .sock_open = nni_rep_sock_open, + .sock_close = nni_rep_sock_close, + .sock_setopt = nni_rep_sock_setopt, + .sock_getopt = nni_rep_sock_getopt, + .sock_rfilter = nni_rep_sock_rfilter, + .sock_sfilter = nni_rep_sock_sfilter, }; nni_proto nni_rep_proto = { - .proto_self = NNG_PROTO_REP, - .proto_peer = NNG_PROTO_REQ, - .proto_name = "rep", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_REP, + .proto_peer = NNG_PROTO_REQ, + .proto_name = "rep", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_rep_sock_ops, .proto_pipe_ops = &nni_rep_pipe_ops, }; diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f32fd66f..f77700f5 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -7,9 +7,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include #include #include -#include #include "core/nng_impl.h" @@ -17,8 +17,8 @@ // request-reply pair. This is useful for building RPC clients, for // example. -typedef struct nni_req_pipe nni_req_pipe; -typedef struct nni_req_sock nni_req_sock; +typedef struct nni_req_pipe nni_req_pipe; +typedef struct nni_req_sock nni_req_sock; static void nni_req_resend(nni_req_sock *); static void nni_req_timeout(void *); @@ -26,38 +26,38 @@ static void nni_req_pipe_fini(void *); // An nni_req_sock is our per-socket protocol private structure. struct nni_req_sock { - nni_sock * sock; - nni_msgq * uwq; - nni_msgq * urq; - nni_duration retry; - nni_time resend; - int raw; - int wantw; - nni_msg * reqmsg; + nni_sock * sock; + nni_msgq * uwq; + nni_msgq * urq; + nni_duration retry; + nni_time resend; + int raw; + int wantw; + nni_msg * reqmsg; - nni_req_pipe * pendpipe; + nni_req_pipe *pendpipe; - nni_list readypipes; - nni_list busypipes; + nni_list readypipes; + nni_list busypipes; - nni_timer_node timer; + nni_timer_node timer; - uint32_t nextid; // next id - uint8_t reqid[4]; // outstanding request ID (big endian) - nni_mtx mtx; + uint32_t nextid; // next id + uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; }; // An nni_req_pipe is our per-pipe protocol private structure. struct nni_req_pipe { - nni_pipe * pipe; - nni_req_sock * req; - nni_list_node node; - nni_aio aio_getq; // raw mode only - nni_aio aio_sendraw; // raw mode only - nni_aio aio_sendcooked; // cooked mode only - nni_aio aio_recv; - nni_aio aio_putq; - nni_mtx mtx; + nni_pipe * pipe; + nni_req_sock *req; + nni_list_node node; + nni_aio aio_getq; // raw mode only + nni_aio aio_sendraw; // raw mode only + nni_aio aio_sendcooked; // cooked mode only + nni_aio aio_recv; + nni_aio aio_putq; + nni_mtx mtx; }; static void nni_req_resender(void *); @@ -71,7 +71,7 @@ static int nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; - int rv; + int rv; if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); @@ -88,21 +88,20 @@ nni_req_sock_init(void **reqp, nni_sock *sock) // this is "semi random" start for request IDs. req->nextid = nni_random(); - req->retry = NNI_SECOND * 60; - req->sock = sock; + req->retry = NNI_SECOND * 60; + req->sock = sock; req->reqmsg = NULL; - req->raw = 0; - req->wantw = 0; + req->raw = 0; + req->wantw = 0; req->resend = NNI_TIME_ZERO; req->uwq = nni_sock_sendq(sock); req->urq = nni_sock_recvq(sock); - *reqp = req; + *reqp = req; nni_sock_recverr(sock, NNG_ESTATE); return (0); } - static void nni_req_sock_close(void *arg) { @@ -111,7 +110,6 @@ nni_req_sock_close(void *arg) nni_timer_cancel(&req->timer); } - static void nni_req_sock_fini(void *arg) { @@ -126,12 +124,11 @@ nni_req_sock_fini(void *arg) NNI_FREE_STRUCT(req); } - static int nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_req_pipe *rp; - int rv; + int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); @@ -159,8 +156,8 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) NNI_LIST_NODE_INIT(&rp->node); rp->pipe = pipe; - rp->req = rsock; - *rpp = rp; + rp->req = rsock; + *rpp = rp; return (0); failed: @@ -168,7 +165,6 @@ failed: return (rv); } - static void nni_req_pipe_fini(void *arg) { @@ -185,11 +181,10 @@ nni_req_pipe_fini(void *arg) } } - static int nni_req_pipe_start(void *arg) { - nni_req_pipe *rp = arg; + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { @@ -203,17 +198,15 @@ nni_req_pipe_start(void *arg) } nni_mtx_unlock(&req->mtx); - nni_msgq_aio_get(req->uwq, &rp->aio_getq); nni_pipe_recv(rp->pipe, &rp->aio_recv); return (0); } - static void nni_req_pipe_stop(void *arg) { - nni_req_pipe *rp = arg; + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; nni_aio_stop(&rp->aio_getq); @@ -236,19 +229,18 @@ nni_req_pipe_stop(void *arg) // removing the pipe we sent the last request on... // schedule immediate resend. req->pendpipe = NULL; - req->resend = NNI_TIME_ZERO; - req->wantw = 1; + req->resend = NNI_TIME_ZERO; + req->wantw = 1; nni_req_resend(req); } nni_mtx_unlock(&req->mtx); } - static int nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_req_sock *req = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RESENDTIME: @@ -263,12 +255,11 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_req_sock *req = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_RESENDTIME: @@ -283,7 +274,6 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - // Raw and cooked mode differ in the way they send messages out. // // For cooked mdes, we have a getq callback on the upper write queue, which @@ -303,7 +293,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void nni_req_getq_cb(void *arg) { - nni_req_pipe *rp = arg; + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; // We should be in RAW mode. Cooked mode traffic bypasses @@ -318,13 +308,12 @@ nni_req_getq_cb(void *arg) } rp->aio_sendraw.a_msg = rp->aio_getq.a_msg; - rp->aio_getq.a_msg = NULL; + rp->aio_getq.a_msg = NULL; // Send the message, but use the raw mode aio. nni_pipe_send(rp->pipe, &rp->aio_sendraw); } - static void nni_req_sendraw_cb(void *arg) { @@ -341,11 +330,10 @@ nni_req_sendraw_cb(void *arg) nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq); } - static void nni_req_sendcooked_cb(void *arg) { - nni_req_pipe *rp = arg; + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; if (nni_aio_result(&rp->aio_sendcooked) != 0) { @@ -377,7 +365,6 @@ nni_req_sendcooked_cb(void *arg) nni_mtx_unlock(&req->mtx); } - static void nni_req_putq_cb(void *arg) { @@ -393,19 +380,18 @@ nni_req_putq_cb(void *arg) nni_pipe_recv(rp->pipe, &rp->aio_recv); } - static void nni_req_recv_cb(void *arg) { nni_req_pipe *rp = arg; - nni_msg *msg; + nni_msg * msg; if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_stop(rp->pipe); return; } - msg = rp->aio_recv.a_msg; + msg = rp->aio_recv.a_msg; rp->aio_recv.a_msg = NULL; // We yank 4 bytes of body, and move them to the header. @@ -434,7 +420,6 @@ malformed: nni_pipe_stop(rp->pipe); } - static void nni_req_timeout(void *arg) { @@ -448,12 +433,11 @@ nni_req_timeout(void *arg) nni_mtx_unlock(&req->mtx); } - static void nni_req_resend(nni_req_sock *req) { nni_req_pipe *rp; - nni_msg *msg; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode @@ -470,8 +454,8 @@ nni_req_resend(nni_req_sock *req) // mark that we have a message we want to resend, // in case something comes available. req->wantw = 1; - nni_timer_schedule(&req->timer, - nni_clock() + req->retry); + nni_timer_schedule( + &req->timer, nni_clock() + req->retry); return; } @@ -489,8 +473,8 @@ nni_req_resend(nni_req_sock *req) nni_list_remove(&req->readypipes, rp); nni_list_append(&req->busypipes, rp); - req->pendpipe = rp; - req->resend = nni_clock() + req->retry; + req->pendpipe = rp; + req->resend = nni_clock() + req->retry; rp->aio_sendcooked.a_msg = msg; // Note that because we were ready rather than busy, we @@ -501,12 +485,11 @@ nni_req_resend(nni_req_sock *req) } } - static nni_msg * nni_req_sock_sfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; - uint32_t id; + uint32_t id; if (req->raw) { // No automatic retry, and the request ID must @@ -542,7 +525,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) req->reqmsg = msg; // Schedule for immediate send req->resend = NNI_TIME_ZERO; - req->wantw = 1; + req->wantw = 1; nni_req_resend(req); nni_mtx_unlock(&req->mtx); @@ -553,12 +536,11 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) return (NULL); } - static nni_msg * nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; - nni_msg *rmsg; + nni_msg * rmsg; if (req->raw) { // Pass it unmolested @@ -585,7 +567,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (NULL); } - req->reqmsg = NULL; + req->reqmsg = NULL; req->pendpipe = NULL; nni_mtx_unlock(&req->mtx); @@ -595,31 +577,30 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops nni_req_pipe_ops = { - .pipe_init = nni_req_pipe_init, - .pipe_fini = nni_req_pipe_fini, - .pipe_start = nni_req_pipe_start, - .pipe_stop = nni_req_pipe_stop, + .pipe_init = nni_req_pipe_init, + .pipe_fini = nni_req_pipe_fini, + .pipe_start = nni_req_pipe_start, + .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { - .sock_init = nni_req_sock_init, - .sock_fini = nni_req_sock_fini, - .sock_close = nni_req_sock_close, - .sock_setopt = nni_req_sock_setopt, - .sock_getopt = nni_req_sock_getopt, - .sock_rfilter = nni_req_sock_rfilter, - .sock_sfilter = nni_req_sock_sfilter, + .sock_init = nni_req_sock_init, + .sock_fini = nni_req_sock_fini, + .sock_close = nni_req_sock_close, + .sock_setopt = nni_req_sock_setopt, + .sock_getopt = nni_req_sock_getopt, + .sock_rfilter = nni_req_sock_rfilter, + .sock_sfilter = nni_req_sock_sfilter, }; nni_proto nni_req_proto = { - .proto_self = NNG_PROTO_REQ, - .proto_peer = NNG_PROTO_REP, - .proto_name = "req", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_REQ, + .proto_peer = NNG_PROTO_REP, + .proto_name = "req", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_req_sock_ops, .proto_pipe_ops = &nni_req_pipe_ops, }; diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 73cc4792..6a784738 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -16,8 +16,8 @@ // the surveyor pattern. This is useful for building service discovery, or // voting algorithsm, for example. -typedef struct nni_resp_pipe nni_resp_pipe; -typedef struct nni_resp_sock nni_resp_sock; +typedef struct nni_resp_pipe nni_resp_pipe; +typedef struct nni_resp_sock nni_resp_sock; static void nni_resp_recv_cb(void *); static void nni_resp_putq_cb(void *); @@ -28,31 +28,30 @@ static void nni_resp_pipe_fini(void *); // An nni_resp_sock is our per-socket protocol private structure. struct nni_resp_sock { - nni_sock * nsock; - nni_msgq * urq; - nni_msgq * uwq; - int raw; - int ttl; - nni_idhash pipes; - char * btrace; - size_t btrace_len; - nni_aio aio_getq; - nni_mtx mtx; + nni_sock * nsock; + nni_msgq * urq; + nni_msgq * uwq; + int raw; + int ttl; + nni_idhash pipes; + char * btrace; + size_t btrace_len; + nni_aio aio_getq; + nni_mtx mtx; }; // An nni_resp_pipe is our per-pipe protocol private structure. struct nni_resp_pipe { - nni_pipe * npipe; - nni_resp_sock * psock; - uint32_t id; - nni_msgq * sendq; - nni_aio aio_getq; - nni_aio aio_putq; - nni_aio aio_send; - nni_aio aio_recv; + nni_pipe * npipe; + nni_resp_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; }; - static void nni_resp_sock_fini(void *arg) { @@ -69,23 +68,22 @@ nni_resp_sock_fini(void *arg) } } - static int nni_resp_sock_init(void **pp, nni_sock *nsock) { nni_resp_sock *psock; - int rv; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - psock->ttl = 8; // Per RFC - psock->nsock = nsock; - psock->raw = 0; - psock->btrace = NULL; + psock->ttl = 8; // Per RFC + psock->nsock = nsock; + psock->raw = 0; + psock->btrace = NULL; psock->btrace_len = 0; - psock->urq = nni_sock_recvq(nsock); - psock->uwq = nni_sock_sendq(nsock); + psock->urq = nni_sock_recvq(nsock); + psock->uwq = nni_sock_sendq(nsock); if (((rv = nni_idhash_init(&psock->pipes)) != 0) || ((rv = nni_mtx_init(&psock->mtx)) != 0)) { goto fail; @@ -104,7 +102,6 @@ fail: return (rv); } - static void nni_resp_sock_open(void *arg) { @@ -113,7 +110,6 @@ nni_resp_sock_open(void *arg) nni_msgq_aio_get(psock->uwq, &psock->aio_getq); } - static void nni_resp_sock_close(void *arg) { @@ -122,12 +118,11 @@ nni_resp_sock_close(void *arg) nni_aio_stop(&psock->aio_getq); } - static int nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) { nni_resp_pipe *ppipe; - int rv; + int rv; if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); @@ -154,7 +149,7 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) ppipe->npipe = npipe; ppipe->psock = psock; - *pp = ppipe; + *pp = ppipe; return (0); fail: @@ -162,7 +157,6 @@ fail: return (rv); } - static void nni_resp_pipe_fini(void *arg) { @@ -176,13 +170,12 @@ nni_resp_pipe_fini(void *arg) NNI_FREE_STRUCT(ppipe); } - static int nni_resp_pipe_start(void *arg) { nni_resp_pipe *ppipe = arg; nni_resp_sock *psock = ppipe->psock; - int rv; + int rv; ppipe->id = nni_pipe_id(ppipe->npipe); @@ -199,7 +192,6 @@ nni_resp_pipe_start(void *arg) return (rv); } - static void nni_resp_pipe_stop(void *arg) { @@ -220,7 +212,6 @@ nni_resp_pipe_stop(void *arg) nni_mtx_unlock(&psock->mtx); } - // nni_resp_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad @@ -230,16 +221,16 @@ void nni_resp_sock_getq_cb(void *arg) { nni_resp_sock *psock = arg; - nni_msg *msg; - uint8_t *header; - uint32_t id; + nni_msg * msg; + uint8_t * header; + uint32_t id; nni_resp_pipe *ppipe; - int rv; + int rv; if (nni_aio_result(&psock->aio_getq) != 0) { return; } - msg = psock->aio_getq.a_msg; + msg = psock->aio_getq.a_msg; psock->aio_getq.a_msg = NULL; // We yank the outgoing pipe id from the header @@ -269,7 +260,6 @@ nni_resp_sock_getq_cb(void *arg) nni_mtx_unlock(&psock->mtx); } - void nni_resp_getq_cb(void *arg) { @@ -286,7 +276,6 @@ nni_resp_getq_cb(void *arg) nni_pipe_send(ppipe->npipe, &ppipe->aio_send); } - void nni_resp_send_cb(void *arg) { @@ -302,17 +291,16 @@ nni_resp_send_cb(void *arg) nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); } - static void nni_resp_recv_cb(void *arg) { nni_resp_pipe *ppipe = arg; nni_resp_sock *psock = ppipe->psock; - nni_msgq *urq; - nni_msg *msg; - uint8_t idbuf[4]; - int hops; - int rv; + nni_msgq * urq; + nni_msg * msg; + uint8_t idbuf[4]; + int hops; + int rv; if (nni_aio_result(&ppipe->aio_recv) != 0) { goto error; @@ -322,7 +310,7 @@ nni_resp_recv_cb(void *arg) NNI_PUT32(idbuf, ppipe->id); - msg = ppipe->aio_recv.a_msg; + msg = ppipe->aio_recv.a_msg; ppipe->aio_recv.a_msg = NULL; // Store the pipe id in the header, first thing. @@ -334,7 +322,7 @@ nni_resp_recv_cb(void *arg) // Move backtrace from body to header hops = 0; for (;;) { - int end = 0; + int end = 0; uint8_t *body; if (hops >= psock->ttl) { @@ -346,8 +334,8 @@ nni_resp_recv_cb(void *arg) goto error; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_append_header(msg, body, 4); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_append_header(msg, body, 4); if (rv != 0) { nni_msg_free(msg); goto error; @@ -367,7 +355,6 @@ error: nni_pipe_stop(ppipe->npipe); } - static void nni_resp_putq_cb(void *arg) { @@ -382,13 +369,12 @@ nni_resp_putq_cb(void *arg) nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); } - static int nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_resp_sock *psock = arg; - int rv; - int oldraw; + int rv; + int oldraw; switch (opt) { case NNG_OPT_MAXTTL: @@ -396,7 +382,7 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) break; case NNG_OPT_RAW: oldraw = psock->raw; - rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); if (oldraw != psock->raw) { if (!psock->raw) { nni_sock_senderr(psock->nsock, 0); @@ -411,12 +397,11 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_resp_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_MAXTTL: @@ -431,7 +416,6 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - static nni_msg * nni_resp_sock_sfilter(void *arg, nni_msg *msg) { @@ -454,38 +438,38 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) // drop anything else in the header... nni_msg_trunc_header(msg, nni_msg_header_len(msg)); - if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) { + if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != + 0) { nni_free(psock->btrace, psock->btrace_len); - psock->btrace = NULL; + psock->btrace = NULL; psock->btrace_len = 0; nni_msg_free(msg); return (NULL); } nni_free(psock->btrace, psock->btrace_len); - psock->btrace = NULL; + psock->btrace = NULL; psock->btrace_len = 0; return (msg); } - static nni_msg * nni_resp_sock_rfilter(void *arg, nni_msg *msg) { nni_resp_sock *psock = arg; - char *header; - size_t len; + char * header; + size_t len; if (psock->raw) { return (msg); } nni_sock_senderr(psock->nsock, 0); - len = nni_msg_header_len(msg); + len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (psock->btrace != NULL) { nni_free(psock->btrace, psock->btrace_len); - psock->btrace = NULL; + psock->btrace = NULL; psock->btrace_len = 0; } if ((psock->btrace = nni_alloc(len)) == NULL) { @@ -498,30 +482,29 @@ nni_resp_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - static nni_proto_pipe_ops nni_resp_pipe_ops = { - .pipe_init = nni_resp_pipe_init, - .pipe_fini = nni_resp_pipe_fini, - .pipe_start = nni_resp_pipe_start, - .pipe_stop = nni_resp_pipe_stop, + .pipe_init = nni_resp_pipe_init, + .pipe_fini = nni_resp_pipe_fini, + .pipe_start = nni_resp_pipe_start, + .pipe_stop = nni_resp_pipe_stop, }; static nni_proto_sock_ops nni_resp_sock_ops = { - .sock_init = nni_resp_sock_init, - .sock_fini = nni_resp_sock_fini, - .sock_open = nni_resp_sock_open, - .sock_close = nni_resp_sock_close, - .sock_setopt = nni_resp_sock_setopt, - .sock_getopt = nni_resp_sock_getopt, - .sock_rfilter = nni_resp_sock_rfilter, - .sock_sfilter = nni_resp_sock_sfilter, + .sock_init = nni_resp_sock_init, + .sock_fini = nni_resp_sock_fini, + .sock_open = nni_resp_sock_open, + .sock_close = nni_resp_sock_close, + .sock_setopt = nni_resp_sock_setopt, + .sock_getopt = nni_resp_sock_getopt, + .sock_rfilter = nni_resp_sock_rfilter, + .sock_sfilter = nni_resp_sock_sfilter, }; nni_proto nni_respondent_proto = { - .proto_self = NNG_PROTO_RESPONDENT, - .proto_peer = NNG_PROTO_SURVEYOR, - .proto_name = "respondent", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_RESPONDENT, + .proto_peer = NNG_PROTO_SURVEYOR, + .proto_name = "respondent", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_resp_sock_ops, .proto_pipe_ops = &nni_resp_pipe_ops, }; diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 14d73028..e7d0f3ce 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -15,8 +15,8 @@ // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct nni_surv_pipe nni_surv_pipe; -typedef struct nni_surv_sock nni_surv_sock; +typedef struct nni_surv_pipe nni_surv_pipe; +typedef struct nni_surv_sock nni_surv_sock; static void nni_surv_sock_getq_cb(void *); static void nni_surv_getq_cb(void *); @@ -27,31 +27,31 @@ static void nni_surv_timeout(void *); // An nni_surv_sock is our per-socket protocol private structure. struct nni_surv_sock { - nni_sock * nsock; - nni_duration survtime; - nni_time expire; - int raw; - int closing; - uint32_t nextid; // next id - uint8_t survid[4]; // outstanding request ID (big endian) - nni_list pipes; - nni_aio aio_getq; - nni_timer_node timer; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; + nni_sock * nsock; + nni_duration survtime; + nni_time expire; + int raw; + int closing; + uint32_t nextid; // next id + uint8_t survid[4]; // outstanding request ID (big endian) + nni_list pipes; + nni_aio aio_getq; + nni_timer_node timer; + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx mtx; }; // An nni_surv_pipe is our per-pipe protocol private structure. struct nni_surv_pipe { - nni_pipe * npipe; - nni_surv_sock * psock; - nni_msgq * sendq; - nni_list_node node; - nni_aio aio_getq; - nni_aio aio_putq; - nni_aio aio_send; - nni_aio aio_recv; + nni_pipe * npipe; + nni_surv_sock *psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; }; static void @@ -64,12 +64,11 @@ nni_surv_sock_fini(void *arg) NNI_FREE_STRUCT(psock); } - static int nni_surv_sock_init(void **sp, nni_sock *nsock) { nni_surv_sock *psock; - int rv; + int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); @@ -84,13 +83,13 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); nni_timer_init(&psock->timer, nni_surv_timeout, psock); - psock->nextid = nni_random(); - psock->nsock = nsock; - psock->raw = 0; + psock->nextid = nni_random(); + psock->nsock = nsock; + psock->raw = 0; psock->survtime = NNI_SECOND * 60; - psock->expire = NNI_TIME_ZERO; - psock->uwq = nni_sock_sendq(nsock); - psock->urq = nni_sock_recvq(nsock); + psock->expire = NNI_TIME_ZERO; + psock->uwq = nni_sock_sendq(nsock); + psock->urq = nni_sock_recvq(nsock); *sp = psock; nni_sock_recverr(nsock, NNG_ESTATE); @@ -101,7 +100,6 @@ fail: return (rv); } - static void nni_surv_sock_open(void *arg) { @@ -110,7 +108,6 @@ nni_surv_sock_open(void *arg) nni_msgq_aio_get(psock->uwq, &psock->aio_getq); } - static void nni_surv_sock_close(void *arg) { @@ -120,7 +117,6 @@ nni_surv_sock_close(void *arg) nni_aio_stop(&psock->aio_getq); } - static void nni_surv_pipe_fini(void *arg) { @@ -134,12 +130,11 @@ nni_surv_pipe_fini(void *arg) NNI_FREE_STRUCT(ppipe); } - static int nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) { nni_surv_pipe *ppipe; - int rv; + int rv; if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); @@ -166,7 +161,7 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) } ppipe->npipe = npipe; ppipe->psock = psock; - *pp = ppipe; + *pp = ppipe; return (0); failed: @@ -174,7 +169,6 @@ failed: return (rv); } - static int nni_surv_pipe_start(void *arg) { @@ -190,7 +184,6 @@ nni_surv_pipe_start(void *arg) return (0); } - static void nni_surv_pipe_stop(void *arg) { @@ -210,7 +203,6 @@ nni_surv_pipe_stop(void *arg) nni_mtx_unlock(&psock->mtx); } - static void nni_surv_getq_cb(void *arg) { @@ -227,7 +219,6 @@ nni_surv_getq_cb(void *arg) nni_pipe_send(ppipe->npipe, &ppipe->aio_send); } - static void nni_surv_send_cb(void *arg) { @@ -243,7 +234,6 @@ nni_surv_send_cb(void *arg) nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq); } - static void nni_surv_putq_cb(void *arg) { @@ -259,18 +249,17 @@ nni_surv_putq_cb(void *arg) nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); } - static void nni_surv_recv_cb(void *arg) { nni_surv_pipe *ppipe = arg; - nni_msg *msg; + nni_msg * msg; if (nni_aio_result(&ppipe->aio_recv) != 0) { goto failed; } - msg = ppipe->aio_recv.a_msg; + msg = ppipe->aio_recv.a_msg; ppipe->aio_recv.a_msg = NULL; // We yank 4 bytes of body, and move them to the header. @@ -297,13 +286,12 @@ failed: nni_pipe_stop(ppipe->npipe); } - static int nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_surv_sock *psock = arg; - int rv; - int oldraw; + int rv; + int oldraw; switch (opt) { case NNG_OPT_SURVEYTIME: @@ -311,14 +299,14 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) break; case NNG_OPT_RAW: oldraw = psock->raw; - rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); if (oldraw != psock->raw) { if (psock->raw) { nni_sock_recverr(psock->nsock, 0); } else { nni_sock_recverr(psock->nsock, NNG_ESTATE); } - memset(psock->survid, 0, sizeof (psock->survid)); + memset(psock->survid, 0, sizeof(psock->survid)); nni_timer_cancel(&psock->timer); } break; @@ -328,12 +316,11 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) return (rv); } - static int nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_surv_sock *psock = arg; - int rv; + int rv; switch (opt) { case NNG_OPT_SURVEYTIME: @@ -348,20 +335,19 @@ nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) return (rv); } - static void nni_surv_sock_getq_cb(void *arg) { nni_surv_sock *psock = arg; nni_surv_pipe *ppipe; nni_surv_pipe *last; - nni_msg *msg, *dup; + nni_msg * msg, *dup; if (nni_aio_result(&psock->aio_getq) != 0) { // Should be NNG_ECLOSED. return; } - msg = psock->aio_getq.a_msg; + msg = psock->aio_getq.a_msg; psock->aio_getq.a_msg = NULL; nni_mtx_lock(&psock->mtx); @@ -386,25 +372,23 @@ nni_surv_sock_getq_cb(void *arg) } } - static void nni_surv_timeout(void *arg) { nni_surv_sock *psock = arg; nni_sock_lock(psock->nsock); - memset(psock->survid, 0, sizeof (psock->survid)); + memset(psock->survid, 0, sizeof(psock->survid)); nni_sock_recverr(psock->nsock, NNG_ESTATE); nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT); nni_sock_unlock(psock->nsock); } - static nni_msg * nni_surv_sock_sfilter(void *arg, nni_msg *msg) { nni_surv_sock *psock = arg; - uint32_t id; + uint32_t id; if (psock->raw) { // No automatic retry, and the request ID must @@ -434,12 +418,11 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg) // Clear the error condition. nni_sock_recverr(psock->nsock, 0); - //nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); + // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); return (msg); } - static nni_msg * nni_surv_sock_rfilter(void *arg, nni_msg *msg) { @@ -466,32 +449,31 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - static nni_proto_pipe_ops nni_surv_pipe_ops = { - .pipe_init = nni_surv_pipe_init, - .pipe_fini = nni_surv_pipe_fini, - .pipe_start = nni_surv_pipe_start, - .pipe_stop = nni_surv_pipe_stop, + .pipe_init = nni_surv_pipe_init, + .pipe_fini = nni_surv_pipe_fini, + .pipe_start = nni_surv_pipe_start, + .pipe_stop = nni_surv_pipe_stop, }; static nni_proto_sock_ops nni_surv_sock_ops = { - .sock_init = nni_surv_sock_init, - .sock_fini = nni_surv_sock_fini, - .sock_open = nni_surv_sock_open, - .sock_close = nni_surv_sock_close, - .sock_setopt = nni_surv_sock_setopt, - .sock_getopt = nni_surv_sock_getopt, - .sock_rfilter = nni_surv_sock_rfilter, - .sock_sfilter = nni_surv_sock_sfilter, + .sock_init = nni_surv_sock_init, + .sock_fini = nni_surv_sock_fini, + .sock_open = nni_surv_sock_open, + .sock_close = nni_surv_sock_close, + .sock_setopt = nni_surv_sock_setopt, + .sock_getopt = nni_surv_sock_getopt, + .sock_rfilter = nni_surv_sock_rfilter, + .sock_sfilter = nni_surv_sock_sfilter, }; // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. nni_proto nni_surveyor_proto = { - .proto_self = NNG_PROTO_SURVEYOR, - .proto_peer = NNG_PROTO_RESPONDENT, - .proto_name = "surveyor", - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_self = NNG_PROTO_SURVEYOR, + .proto_peer = NNG_PROTO_RESPONDENT, + .proto_name = "surveyor", + .proto_flags = NNI_PROTO_FLAG_SNDRCV, .proto_sock_ops = &nni_surv_sock_ops, .proto_pipe_ops = &nni_surv_pipe_ops, }; -- cgit v1.2.3-70-g09d2