summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-05 20:21:52 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-05 20:21:52 -0800
commitd28f9d935819a88fa799b99362b78a277b74fce8 (patch)
tree612413bfcf85295e1926ffab3bc4011804725266
parent44f1e3674f56dccb1feb5f0f5a37cb2196f20591 (diff)
downloadnng-d28f9d935819a88fa799b99362b78a277b74fce8.tar.gz
nng-d28f9d935819a88fa799b99362b78a277b74fce8.tar.bz2
nng-d28f9d935819a88fa799b99362b78a277b74fce8.zip
Added test for cancellation, fixed retry bug.
On retry we were pushing back to the queue. The problem with this is that we could wind up pushing back many copies of the message if no reader was present. The new code ensures at most one retry is outstanding.
-rw-r--r--src/protocol/reqrep/req.c25
-rw-r--r--tests/reqrep.c53
2 files changed, 70 insertions, 8 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index abd6e25c..d4098526 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -32,6 +32,7 @@ struct nni_req_sock {
int raw;
int closing;
nni_msg * reqmsg;
+ nni_msg * retrymsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -69,6 +70,7 @@ nni_req_init(void **reqp, nni_sock *sock)
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
+ req->retrymsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
@@ -106,6 +108,9 @@ nni_req_fini(void *arg)
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
+ if (req->retrymsg != NULL) {
+ nni_msg_free(req->retrymsg);
+ }
NNI_FREE_STRUCT(req);
}
@@ -167,9 +172,16 @@ nni_req_pipe_send(void *arg)
int rv;
for (;;) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
+ nni_mtx_lock(&req->mx);
+ if ((msg = req->retrymsg) != NULL) {
+ req->retrymsg = NULL;
+ }
+ nni_mtx_unlock(&req->mx);
+ if (msg == NULL) {
+ rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
+ if (rv != 0) {
+ break;
+ }
}
rv = nni_pipe_send(pipe, msg);
if (rv != 0) {
@@ -294,12 +306,9 @@ nni_req_resender(void *arg)
}
rv = nni_cv_until(&req->cv, req->resend);
if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- nni_msg *dup;
// XXX: check for final timeout on this?
- if (nni_msg_dup(&dup, req->reqmsg) != 0) {
- if (nni_msgq_putback(req->uwq, dup) != 0) {
- nni_msg_free(dup);
- }
+ if (req->retrymsg == NULL) {
+ nni_msg_dup(&req->retrymsg, req->reqmsg);
}
req->resend = nni_clock() + req->retry;
}
diff --git a/tests/reqrep.c b/tests/reqrep.c
index fb9a7fb4..d182559d 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -118,5 +118,58 @@ Main({
nng_msg_free(ping);
})
})
+
+ Convey("Request cancellation works", {
+ nng_msg *abc;
+ nng_msg *def;
+ nng_msg *cmd;
+ nng_msg *nvm;
+ char *body;
+ size_t len;
+ uint64_t retry = 100000; // 100 ms
+
+ nng_socket *req;
+ nng_socket *rep;
+
+ So(nng_open(&rep, NNG_PROTO_REP) == 0);
+ So(rep != NULL);
+
+ So(nng_open(&req, NNG_PROTO_REQ) == 0);
+ So(req != NULL);
+
+ Reset({
+ nng_close(rep);
+ nng_close(req);
+ })
+
+ So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0);
+ len = 16;
+ So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0);
+
+ So(nng_msg_alloc(&abc, 0) == 0);
+ So(nng_msg_append(abc, "abc", 4) == 0);
+ So(nng_msg_alloc(&def, 0) == 0);
+ So(nng_msg_append(def, "def", 4) == 0);
+
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ So(nng_sendmsg(req, abc, 0) == 0);
+ So(nng_sendmsg(req, def, 0) == 0);
+
+ So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(cmd != NULL);
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ So(nng_recvmsg(req, &cmd, 0) == 0);
+
+ body = nng_msg_body(cmd, &len);
+ So(len == 4);
+ So(memcmp(body, "def", 4) == 0);
+ nng_msg_free(cmd);
+ })
})
})