aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey/respond.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/survey/respond.c')
-rw-r--r--src/protocol/survey/respond.c189
1 files changed, 101 insertions, 88 deletions
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 3b23dff8..2891edc1 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -21,7 +21,7 @@ typedef struct nni_resp_sock nni_resp_sock;
// An nni_resp_sock is our per-socket protocol private structure.
struct nni_resp_sock {
- nni_sock * sock;
+ nni_sock * nsock;
int raw;
int ttl;
nni_idhash * pipes;
@@ -31,33 +31,33 @@ struct nni_resp_sock {
// An nni_resp_pipe is our per-pipe protocol private structure.
struct nni_resp_pipe {
- nni_pipe * pipe;
- nni_resp_sock * resp;
+ nni_pipe * npipe;
+ nni_resp_sock * psock;
nni_msgq * sendq;
int sigclose;
};
static int
-nni_resp_sock_init(void **respp, nni_sock *sock)
+nni_resp_sock_init(void **pp, nni_sock *nsock)
{
- nni_resp_sock *resp;
+ nni_resp_sock *psock;
int rv;
- if ((resp = NNI_ALLOC_STRUCT(resp)) == NULL) {
+ if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- resp->ttl = 8; // Per RFC
- resp->sock = sock;
- resp->raw = 0;
- resp->btrace = NULL;
- resp->btrace_len = 0;
- if ((rv = nni_idhash_create(&resp->pipes)) != 0) {
- NNI_FREE_STRUCT(resp);
+ psock->ttl = 8; // Per RFC
+ psock->nsock = nsock;
+ psock->raw = 0;
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
+ if ((rv = nni_idhash_create(&psock->pipes)) != 0) {
+ NNI_FREE_STRUCT(psock);
return (rv);
}
- *respp = resp;
- nni_sock_senderr(sock, NNG_ESTATE);
+ *pp = psock;
+ nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
@@ -65,33 +65,33 @@ nni_resp_sock_init(void **respp, nni_sock *sock)
static void
nni_resp_sock_fini(void *arg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
- nni_idhash_destroy(resp->pipes);
- if (resp->btrace != NULL) {
- nni_free(resp->btrace, resp->btrace_len);
+ nni_idhash_destroy(psock->pipes);
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
}
- NNI_FREE_STRUCT(resp);
+ NNI_FREE_STRUCT(psock);
}
static int
-nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
- nni_resp_pipe *rp;
+ nni_resp_pipe *ppipe;
int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(rp);
+ if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(ppipe);
return (rv);
}
- rp->pipe = pipe;
- rp->resp = rsock;
- rp->sigclose = 0;
- *rpp = rp;
+ ppipe->npipe = npipe;
+ ppipe->psock = psock;
+ ppipe->sigclose = 0;
+ *pp = ppipe;
return (0);
}
@@ -99,30 +99,32 @@ nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
static void
nni_resp_pipe_fini(void *arg)
{
- nni_resp_pipe *rp = arg;
+ nni_resp_pipe *ppipe = arg;
- nni_msgq_fini(rp->sendq);
- NNI_FREE_STRUCT(rp);
+ nni_msgq_fini(ppipe->sendq);
+ NNI_FREE_STRUCT(ppipe);
}
static int
nni_resp_pipe_add(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ int rv;
- return (nni_idhash_insert(resp->pipes, nni_pipe_id(rp->pipe), rp));
+ rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe);
+ return (rv);
}
static void
nni_resp_pipe_rem(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
- nni_idhash_remove(resp->pipes, nni_pipe_id(rp->pipe));
+ nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe));
}
@@ -133,15 +135,15 @@ nni_resp_pipe_rem(void *arg)
static void
nni_resp_sock_send(void *arg)
{
- nni_resp_sock *resp = arg;
- nni_msgq *uwq = nni_sock_sendq(resp->sock);
- nni_mtx *mx = nni_sock_mtx(resp->sock);
+ nni_resp_sock *psock = arg;
+ nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_mtx *mx = nni_sock_mtx(psock->nsock);
nni_msg *msg;
for (;;) {
uint8_t *header;
uint32_t id;
- nni_resp_pipe *rp;
+ nni_resp_pipe *ppipe;
int rv;
if ((rv = nni_msgq_get(uwq, &msg)) != 0) {
@@ -157,13 +159,13 @@ nni_resp_sock_send(void *arg)
nni_msg_trim_header(msg, 4);
nni_mtx_lock(mx);
- if (nni_idhash_find(resp->pipes, id, (void **) &rp) != 0) {
+ if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) {
nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;
}
// Try a non-blocking put to the lower writer.
- rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO);
+ rv = nni_msgq_put_until(ppipe->sendq, msg, NNI_TIME_ZERO);
if (rv != 0) {
// message queue is full, we have no choice but
// to drop it. This should not happen under normal
@@ -178,39 +180,41 @@ nni_resp_sock_send(void *arg)
static void
nni_resp_pipe_send(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
- nni_msgq *sendq = rp->sendq;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ nni_pipe *npipe = ppipe->npipe;
+ nni_msgq *sendq = ppipe->sendq;
nni_msg *msg;
int rv;
for (;;) {
- rv = nni_msgq_get_sig(sendq, &msg, &rp->sigclose);
+ rv = nni_msgq_get_sig(sendq, &msg, &ppipe->sigclose);
if (rv != 0) {
break;
}
- rv = nni_pipe_send(rp->pipe, msg);
+ rv = nni_pipe_send(npipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(nni_sock_recvq(resp->sock), &rp->sigclose);
- nni_pipe_close(rp->pipe);
+ nni_msgq_signal(nni_sock_recvq(psock->nsock), &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
static void
nni_resp_pipe_recv(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
- nni_msgq *urq = nni_sock_recvq(resp->sock);
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
+ nni_pipe *npipe = ppipe->npipe;
nni_msg *msg;
int rv;
uint8_t idbuf[4];
- uint32_t id = nni_pipe_id(rp->pipe);
+ uint32_t id = nni_pipe_id(npipe);
NNI_PUT32(idbuf, id);
@@ -220,7 +224,7 @@ nni_resp_pipe_recv(void *arg)
int hops;
again:
- rv = nni_pipe_recv(rp->pipe, &msg);
+ rv = nni_pipe_recv(npipe, &msg);
if (rv != 0) {
break;
}
@@ -236,7 +240,7 @@ again:
hops = 0;
for (;;) {
int end = 0;
- if (hops >= resp->ttl) {
+ if (hops >= psock->ttl) {
nni_msg_free(msg);
goto again;
}
@@ -258,30 +262,39 @@ again:
}
// Now send it up.
- rv = nni_msgq_put_sig(urq, msg, &rp->sigclose);
+ rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(nni_sock_sendq(resp->sock), &rp->sigclose);
- nni_msgq_signal(rp->sendq, &rp->sigclose);
- nni_pipe_close(rp->pipe);
+ nni_msgq_signal(nni_sock_sendq(psock->nsock), &ppipe->sigclose);
+ nni_msgq_signal(ppipe->sendq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
static int
nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
int rv;
+ int oldraw;
switch (opt) {
case NNG_OPT_MAXTTL:
- rv = nni_setopt_int(&resp->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255);
break;
case NNG_OPT_RAW:
- rv = nni_setopt_int(&resp->raw, buf, sz, 0, 1);
+ oldraw = psock->raw;
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ if (oldraw != psock->raw) {
+ if (!psock->raw) {
+ nni_sock_senderr(psock->nsock, 0);
+ } else {
+ nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ }
+ }
break;
default:
rv = NNG_ENOTSUP;
@@ -293,15 +306,15 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- rv = nni_getopt_int(&resp->ttl, buf, szp);
+ rv = nni_getopt_int(&psock->ttl, buf, szp);
break;
case NNG_OPT_RAW:
- rv = nni_getopt_int(&resp->raw, buf, szp);
+ rv = nni_getopt_int(&psock->raw, buf, szp);
break;
default:
rv = NNG_ENOTSUP;
@@ -313,19 +326,19 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_msg *
nni_resp_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
size_t len;
- if (resp->raw) {
+ if (psock->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(resp->sock, NNG_ESTATE);
+ nni_sock_senderr(psock->nsock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (resp->btrace == NULL) {
+ if (psock->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -333,17 +346,17 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_trunc_header(msg, nni_msg_header_len(msg));
- if (nni_msg_append_header(msg, resp->btrace, resp->btrace_len) != 0) {
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) {
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
return (msg);
}
@@ -351,28 +364,28 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
static nni_msg *
nni_resp_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
char *header;
size_t len;
- if (resp->raw) {
+ if (psock->raw) {
return (msg);
}
- nni_sock_senderr(resp->sock, 0);
+ nni_sock_senderr(psock->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (resp->btrace != NULL) {
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
}
- if ((resp->btrace = nni_alloc(len)) == NULL) {
+ if ((psock->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- resp->btrace_len = len;
- memcpy(resp->btrace, header, len);
+ psock->btrace_len = len;
+ memcpy(psock->btrace, header, len);
nni_msg_trunc_header(msg, len);
return (msg);
}