aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/req.c25
1 files changed, 17 insertions, 8 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index abd6e25c..d4098526 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -32,6 +32,7 @@ struct nni_req_sock {
int raw;
int closing;
nni_msg * reqmsg;
+ nni_msg * retrymsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -69,6 +70,7 @@ nni_req_init(void **reqp, nni_sock *sock)
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
+ req->retrymsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
@@ -106,6 +108,9 @@ nni_req_fini(void *arg)
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
+ if (req->retrymsg != NULL) {
+ nni_msg_free(req->retrymsg);
+ }
NNI_FREE_STRUCT(req);
}
@@ -167,9 +172,16 @@ nni_req_pipe_send(void *arg)
int rv;
for (;;) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
+ nni_mtx_lock(&req->mx);
+ if ((msg = req->retrymsg) != NULL) {
+ req->retrymsg = NULL;
+ }
+ nni_mtx_unlock(&req->mx);
+ if (msg == NULL) {
+ rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
+ if (rv != 0) {
+ break;
+ }
}
rv = nni_pipe_send(pipe, msg);
if (rv != 0) {
@@ -294,12 +306,9 @@ nni_req_resender(void *arg)
}
rv = nni_cv_until(&req->cv, req->resend);
if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- nni_msg *dup;
// XXX: check for final timeout on this?
- if (nni_msg_dup(&dup, req->reqmsg) != 0) {
- if (nni_msgq_putback(req->uwq, dup) != 0) {
- nni_msg_free(dup);
- }
+ if (req->retrymsg == NULL) {
+ nni_msg_dup(&req->retrymsg, req->reqmsg);
}
req->resend = nni_clock() + req->retry;
}