aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/reqrep/req.c2
-rw-r--r--src/protocol/survey/respond.c1
-rw-r--r--src/protocol/survey/survey.c13
3 files changed, 8 insertions, 8 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 5b9671f8..73ef4969 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -79,8 +79,6 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
- // Shut down the resender. We request it to exit by clearing
- // its old value, then kick it.
req->closing = 1;
nni_cv_wake(&req->cv);
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 2891edc1..2359fef1 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -228,7 +228,6 @@ again:
if (rv != 0) {
break;
}
-
// Store the pipe id in the header, first thing.
rv = nni_msg_append_header(msg, idbuf, 4);
if (rv != 0) {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 9def9292..3870a9ef 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -36,8 +36,8 @@ struct nni_surv_pipe {
nni_pipe * npipe;
nni_surv_sock * psock;
nni_msgq * sendq;
- int sigclose;
nni_list_node node;
+ int sigclose;
};
static int
@@ -71,8 +71,7 @@ nni_surv_sock_close(void *arg)
{
nni_surv_sock *psock = arg;
- // Shut down the resender. We request it to exit by clearing
- // its old value, then kick it.
+ // Shut down the resender.
psock->closing = 1;
nni_cv_wake(&psock->cv);
}
@@ -103,8 +102,8 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
ppipe->npipe = npipe;
- ppipe->sigclose = 0;
ppipe->psock = psock;
+ ppipe->sigclose = 0;
*pp = ppipe;
return (0);
}
@@ -146,7 +145,7 @@ nni_surv_pipe_sender(void *arg)
nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
nni_pipe *npipe = ppipe->npipe;
- nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_msgq *uwq = ppipe->sendq;
nni_msgq *urq = nni_sock_recvq(psock->nsock);
nni_mtx *mx = nni_sock_mtx(psock->nsock);
nni_msg *msg;
@@ -206,6 +205,7 @@ nni_surv_pipe_receiver(void *arg)
}
}
nni_msgq_signal(uwq, &ppipe->sigclose);
+ nni_msgq_set_error(ppipe->sendq, NNG_ECLOSED);
nni_pipe_close(npipe);
}
@@ -308,6 +308,7 @@ nni_surv_sock_timeout(void *arg)
{
nni_surv_sock *psock = arg;
nni_mtx *mx = nni_sock_mtx(psock->nsock);
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
nni_mtx_lock(mx);
for (;;) {
@@ -322,6 +323,7 @@ nni_surv_sock_timeout(void *arg)
// so zeroing means that nothing can match.
memset(psock->survid, 0, sizeof (psock->survid));
nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ nni_msgq_set_get_error(urq, NNG_ETIMEDOUT);
}
nni_cv_until(&psock->cv, psock->expire);
}
@@ -362,6 +364,7 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(psock->nsock, 0);
+ nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}