diff options
| -rw-r--r-- | src/protocol/reqrep/req.c | 25 | ||||
| -rw-r--r-- | tests/reqrep.c | 53 |
2 files changed, 70 insertions, 8 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index abd6e25c..d4098526 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -32,6 +32,7 @@ struct nni_req_sock { int raw; int closing; nni_msg * reqmsg; + nni_msg * retrymsg; uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) }; @@ -69,6 +70,7 @@ nni_req_init(void **reqp, nni_sock *sock) req->retry = NNI_SECOND * 60; req->sock = sock; req->reqmsg = NULL; + req->retrymsg = NULL; req->raw = 0; req->resend = NNI_TIME_ZERO; @@ -106,6 +108,9 @@ nni_req_fini(void *arg) if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } + if (req->retrymsg != NULL) { + nni_msg_free(req->retrymsg); + } NNI_FREE_STRUCT(req); } @@ -167,9 +172,16 @@ nni_req_pipe_send(void *arg) int rv; for (;;) { - rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); - if (rv != 0) { - break; + nni_mtx_lock(&req->mx); + if ((msg = req->retrymsg) != NULL) { + req->retrymsg = NULL; + } + nni_mtx_unlock(&req->mx); + if (msg == NULL) { + rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); + if (rv != 0) { + break; + } } rv = nni_pipe_send(pipe, msg); if (rv != 0) { @@ -294,12 +306,9 @@ nni_req_resender(void *arg) } rv = nni_cv_until(&req->cv, req->resend); if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) { - nni_msg *dup; // XXX: check for final timeout on this? - if (nni_msg_dup(&dup, req->reqmsg) != 0) { - if (nni_msgq_putback(req->uwq, dup) != 0) { - nni_msg_free(dup); - } + if (req->retrymsg == NULL) { + nni_msg_dup(&req->retrymsg, req->reqmsg); } req->resend = nni_clock() + req->retry; } diff --git a/tests/reqrep.c b/tests/reqrep.c index fb9a7fb4..d182559d 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -118,5 +118,58 @@ Main({ nng_msg_free(ping); }) }) + + Convey("Request cancellation works", { + nng_msg *abc; + nng_msg *def; + nng_msg *cmd; + nng_msg *nvm; + char *body; + size_t len; + uint64_t retry = 100000; // 100 ms + + nng_socket *req; + nng_socket *rep; + + So(nng_open(&rep, NNG_PROTO_REP) == 0); + So(rep != NULL); + + So(nng_open(&req, NNG_PROTO_REQ) == 0); + So(req != NULL); + + Reset({ + nng_close(rep); + nng_close(req); + }) + + So(nng_setopt(req, NNG_OPT_RESENDTIME, &retry, sizeof (retry)) == 0); + len = 16; + So(nng_setopt(req, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); + + So(nng_msg_alloc(&abc, 0) == 0); + So(nng_msg_append(abc, "abc", 4) == 0); + So(nng_msg_alloc(&def, 0) == 0); + So(nng_msg_append(def, "def", 4) == 0); + + So(nng_dial(req, addr, NULL, 0) == 0); + + So(nng_sendmsg(req, abc, 0) == 0); + So(nng_sendmsg(req, def, 0) == 0); + + So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); + + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(cmd != NULL); + So(nng_sendmsg(rep, cmd, 0) == 0); + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_sendmsg(rep, cmd, 0) == 0); + + So(nng_recvmsg(req, &cmd, 0) == 0); + + body = nng_msg_body(cmd, &len); + So(len == 4); + So(memcmp(body, "def", 4) == 0); + nng_msg_free(cmd); + }) }) }) |
