aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey/survey.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/survey/survey.c')
-rw-r--r--src/protocol/survey/survey.c138
1 files changed, 60 insertions, 78 deletions
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 14d73028..e7d0f3ce 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -15,8 +15,8 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct nni_surv_pipe nni_surv_pipe;
-typedef struct nni_surv_sock nni_surv_sock;
+typedef struct nni_surv_pipe nni_surv_pipe;
+typedef struct nni_surv_sock nni_surv_sock;
static void nni_surv_sock_getq_cb(void *);
static void nni_surv_getq_cb(void *);
@@ -27,31 +27,31 @@ static void nni_surv_timeout(void *);
// An nni_surv_sock is our per-socket protocol private structure.
struct nni_surv_sock {
- nni_sock * nsock;
- nni_duration survtime;
- nni_time expire;
- int raw;
- int closing;
- uint32_t nextid; // next id
- uint8_t survid[4]; // outstanding request ID (big endian)
- nni_list pipes;
- nni_aio aio_getq;
- nni_timer_node timer;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx mtx;
+ nni_sock * nsock;
+ nni_duration survtime;
+ nni_time expire;
+ int raw;
+ int closing;
+ uint32_t nextid; // next id
+ uint8_t survid[4]; // outstanding request ID (big endian)
+ nni_list pipes;
+ nni_aio aio_getq;
+ nni_timer_node timer;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_mtx mtx;
};
// An nni_surv_pipe is our per-pipe protocol private structure.
struct nni_surv_pipe {
- nni_pipe * npipe;
- nni_surv_sock * psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_pipe * npipe;
+ nni_surv_sock *psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
static void
@@ -64,12 +64,11 @@ nni_surv_sock_fini(void *arg)
NNI_FREE_STRUCT(psock);
}
-
static int
nni_surv_sock_init(void **sp, nni_sock *nsock)
{
nni_surv_sock *psock;
- int rv;
+ int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
@@ -84,13 +83,13 @@ nni_surv_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
nni_timer_init(&psock->timer, nni_surv_timeout, psock);
- psock->nextid = nni_random();
- psock->nsock = nsock;
- psock->raw = 0;
+ psock->nextid = nni_random();
+ psock->nsock = nsock;
+ psock->raw = 0;
psock->survtime = NNI_SECOND * 60;
- psock->expire = NNI_TIME_ZERO;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
+ psock->expire = NNI_TIME_ZERO;
+ psock->uwq = nni_sock_sendq(nsock);
+ psock->urq = nni_sock_recvq(nsock);
*sp = psock;
nni_sock_recverr(nsock, NNG_ESTATE);
@@ -101,7 +100,6 @@ fail:
return (rv);
}
-
static void
nni_surv_sock_open(void *arg)
{
@@ -110,7 +108,6 @@ nni_surv_sock_open(void *arg)
nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
}
-
static void
nni_surv_sock_close(void *arg)
{
@@ -120,7 +117,6 @@ nni_surv_sock_close(void *arg)
nni_aio_stop(&psock->aio_getq);
}
-
static void
nni_surv_pipe_fini(void *arg)
{
@@ -134,12 +130,11 @@ nni_surv_pipe_fini(void *arg)
NNI_FREE_STRUCT(ppipe);
}
-
static int
nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
nni_surv_pipe *ppipe;
- int rv;
+ int rv;
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
@@ -166,7 +161,7 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
}
ppipe->npipe = npipe;
ppipe->psock = psock;
- *pp = ppipe;
+ *pp = ppipe;
return (0);
failed:
@@ -174,7 +169,6 @@ failed:
return (rv);
}
-
static int
nni_surv_pipe_start(void *arg)
{
@@ -190,7 +184,6 @@ nni_surv_pipe_start(void *arg)
return (0);
}
-
static void
nni_surv_pipe_stop(void *arg)
{
@@ -210,7 +203,6 @@ nni_surv_pipe_stop(void *arg)
nni_mtx_unlock(&psock->mtx);
}
-
static void
nni_surv_getq_cb(void *arg)
{
@@ -227,7 +219,6 @@ nni_surv_getq_cb(void *arg)
nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
}
-
static void
nni_surv_send_cb(void *arg)
{
@@ -243,7 +234,6 @@ nni_surv_send_cb(void *arg)
nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq);
}
-
static void
nni_surv_putq_cb(void *arg)
{
@@ -259,18 +249,17 @@ nni_surv_putq_cb(void *arg)
nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
}
-
static void
nni_surv_recv_cb(void *arg)
{
nni_surv_pipe *ppipe = arg;
- nni_msg *msg;
+ nni_msg * msg;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
goto failed;
}
- msg = ppipe->aio_recv.a_msg;
+ msg = ppipe->aio_recv.a_msg;
ppipe->aio_recv.a_msg = NULL;
// We yank 4 bytes of body, and move them to the header.
@@ -297,13 +286,12 @@ failed:
nni_pipe_stop(ppipe->npipe);
}
-
static int
nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_surv_sock *psock = arg;
- int rv;
- int oldraw;
+ int rv;
+ int oldraw;
switch (opt) {
case NNG_OPT_SURVEYTIME:
@@ -311,14 +299,14 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
if (oldraw != psock->raw) {
if (psock->raw) {
nni_sock_recverr(psock->nsock, 0);
} else {
nni_sock_recverr(psock->nsock, NNG_ESTATE);
}
- memset(psock->survid, 0, sizeof (psock->survid));
+ memset(psock->survid, 0, sizeof(psock->survid));
nni_timer_cancel(&psock->timer);
}
break;
@@ -328,12 +316,11 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_surv_sock *psock = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_SURVEYTIME:
@@ -348,20 +335,19 @@ nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static void
nni_surv_sock_getq_cb(void *arg)
{
nni_surv_sock *psock = arg;
nni_surv_pipe *ppipe;
nni_surv_pipe *last;
- nni_msg *msg, *dup;
+ nni_msg * msg, *dup;
if (nni_aio_result(&psock->aio_getq) != 0) {
// Should be NNG_ECLOSED.
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = psock->aio_getq.a_msg;
psock->aio_getq.a_msg = NULL;
nni_mtx_lock(&psock->mtx);
@@ -386,25 +372,23 @@ nni_surv_sock_getq_cb(void *arg)
}
}
-
static void
nni_surv_timeout(void *arg)
{
nni_surv_sock *psock = arg;
nni_sock_lock(psock->nsock);
- memset(psock->survid, 0, sizeof (psock->survid));
+ memset(psock->survid, 0, sizeof(psock->survid));
nni_sock_recverr(psock->nsock, NNG_ESTATE);
nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT);
nni_sock_unlock(psock->nsock);
}
-
static nni_msg *
nni_surv_sock_sfilter(void *arg, nni_msg *msg)
{
nni_surv_sock *psock = arg;
- uint32_t id;
+ uint32_t id;
if (psock->raw) {
// No automatic retry, and the request ID must
@@ -434,12 +418,11 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(psock->nsock, 0);
- //nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
+ // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}
-
static nni_msg *
nni_surv_sock_rfilter(void *arg, nni_msg *msg)
{
@@ -466,32 +449,31 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
static nni_proto_pipe_ops nni_surv_pipe_ops = {
- .pipe_init = nni_surv_pipe_init,
- .pipe_fini = nni_surv_pipe_fini,
- .pipe_start = nni_surv_pipe_start,
- .pipe_stop = nni_surv_pipe_stop,
+ .pipe_init = nni_surv_pipe_init,
+ .pipe_fini = nni_surv_pipe_fini,
+ .pipe_start = nni_surv_pipe_start,
+ .pipe_stop = nni_surv_pipe_stop,
};
static nni_proto_sock_ops nni_surv_sock_ops = {
- .sock_init = nni_surv_sock_init,
- .sock_fini = nni_surv_sock_fini,
- .sock_open = nni_surv_sock_open,
- .sock_close = nni_surv_sock_close,
- .sock_setopt = nni_surv_sock_setopt,
- .sock_getopt = nni_surv_sock_getopt,
- .sock_rfilter = nni_surv_sock_rfilter,
- .sock_sfilter = nni_surv_sock_sfilter,
+ .sock_init = nni_surv_sock_init,
+ .sock_fini = nni_surv_sock_fini,
+ .sock_open = nni_surv_sock_open,
+ .sock_close = nni_surv_sock_close,
+ .sock_setopt = nni_surv_sock_setopt,
+ .sock_getopt = nni_surv_sock_getopt,
+ .sock_rfilter = nni_surv_sock_rfilter,
+ .sock_sfilter = nni_surv_sock_sfilter,
};
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
nni_proto nni_surveyor_proto = {
- .proto_self = NNG_PROTO_SURVEYOR,
- .proto_peer = NNG_PROTO_RESPONDENT,
- .proto_name = "surveyor",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_SURVEYOR,
+ .proto_peer = NNG_PROTO_RESPONDENT,
+ .proto_name = "surveyor",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_surv_sock_ops,
.proto_pipe_ops = &nni_surv_pipe_ops,
};