aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/protocol/reqrep/req.c25
-rw-r--r--tests/reqrep.c53
2 files changed, 70 insertions, 8 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index abd6e25c..d4098526 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -32,6 +32,7 @@ struct nni_req_sock {
int raw;
int closing;
nni_msg * reqmsg;
+ nni_msg * retrymsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -69,6 +70,7 @@ nni_req_init(void **reqp, nni_sock *sock)
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
+ req->retrymsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
@@ -106,6 +108,9 @@ nni_req_fini(void *arg)
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
+ if (req->retrymsg != NULL) {
+ nni_msg_free(req->retrymsg);
+ }
NNI_FREE_STRUCT(req);
}
@@ -167,9 +172,16 @@ nni_req_pipe_send(void *arg)
int rv;
for (;;) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
+ nni_mtx_lock(&req->mx);
+ if ((msg = req->retrymsg) != NULL) {
+ req->retrymsg = NULL;
+ }
+ nni_mtx_unlock(&req->mx);
+ if (msg == NULL) {
+ rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
+ if (rv != 0) {
+ break;
+ }
}
rv = nni_pipe_send(pipe, msg);
if (rv != 0) {
@@ -294,12 +306,9 @@ nni_req_resender(void *arg)
}
rv = nni_cv_until(&req->cv, req->resend);
if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- nni_msg *dup;
// XXX: check for final timeout on this?
- if (nni_msg_dup(&dup, req->reqmsg) != 0) {
- if (nni_msgq_putback(req->uwq, dup) != 0) {
- nni_msg_free(dup);
- }
+ if (req->retrymsg == NULL) {
+ nni_msg_dup(&req->retrymsg, req->reqmsg);
}
req->resend = nni_clock() + req->retry;
}
diff --git a/tests/reqrep.c b/tests/reqrep.c
index fb9a7fb4..d182559d 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -118,5 +118,58 @@ Main({
nng_msg_free(ping);
})
})
+
+ Convey("Request cancellation works", {
+ nng_msg *abc;
+ nng_msg *def;
+ nng_msg *cmd;
+ nng_msg *nvm;
+ char *body;
+ size_t len;
+ uint64_t retry = 100000; // 100 ms
+
+ nng_socket *req;
+ nng_socket *rep;
+
+ So(nng_open(&rep, NNG_PROTO_REP) == 0);
+ So(rep != NULL);
+
+ So(nng_open(&req, NNG_PROTO_REQ) == 0);
+ So(req != NULL);
+
+ Reset({
+ nng_close(rep);
+ nng_close(req);
+ })
+
+ So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0);
+ len = 16;
+ So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
+
+ So(nng_msg_alloc(&abc, 0) == 0);
+ So(nng_msg_append(abc, "abc", 4) == 0);
+ So(nng_msg_alloc(&def, 0) == 0);
+ So(nng_msg_append(def, "def", 4) == 0);
+
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ So(nng_sendmsg(req, abc, 0) == 0);
+ So(nng_sendmsg(req, def, 0) == 0);
+
+ So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(cmd != NULL);
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ So(nng_recvmsg(req, &cmd, 0) == 0);
+
+ body = nng_msg_body(cmd, &len);
+ So(len == 4);
+ So(memcmp(body, "def", 4) == 0);
+ nng_msg_free(cmd);
+ })
})
})