aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/req.c37
-rw-r--r--src/protocol/reqrep0/req_test.c77
2 files changed, 108 insertions, 6 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index c63359d5..cd7132c9 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -42,6 +42,7 @@ struct req0_ctx {
nng_msg * rep_msg; // reply message
nni_timer_node timer;
nni_duration retry;
+ bool conn_reset; // sent message w/o retry, peer disconnect
};
// A req0_sock is our per-socket protocol private structure.
@@ -222,11 +223,27 @@ req0_pipe_close(void *arg)
while ((ctx = nni_list_first(&p->contexts)) != NULL) {
nni_list_remove(&p->contexts, ctx);
- // Reset the timer on this so it expires immediately.
- // This is actually easier than canceling the timer and
- // running the send_queue separately. (In particular, it
- // avoids a potential deadlock on cancelling the timer.)
- nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
+ nng_aio *aio;
+ if (ctx->retry <= 0) {
+ // If we can't retry, then just cancel the operation
+ // altogether. We should only be waiting for recv,
+ // because we will already have sent if we are here.
+ if ((aio = ctx->recv_aio) != NULL) {
+ ctx->recv_aio = NULL;
+ nni_aio_finish_error(aio, NNG_ECONNRESET);
+ req0_ctx_reset(ctx);
+ } else {
+ req0_ctx_reset(ctx);
+ ctx->conn_reset = true;
+ }
+ } else {
+ // Reset the timer on this so it expires immediately.
+ // This is actually easier than canceling the timer and
+ // running the send_queue separately. (In particular,
+ // it avoids a potential deadlock on cancelling the
+ // timer.)
+ nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
+ }
}
nni_mtx_unlock(&s->mtx);
}
@@ -517,6 +534,7 @@ req0_ctx_reset(req0_ctx *ctx)
nni_msg_free(ctx->rep_msg);
ctx->rep_msg = NULL;
}
+ ctx->conn_reset = false;
}
static void
@@ -559,8 +577,15 @@ req0_ctx_recv(void *arg, nni_aio *aio)
// We have already got a pending receive or have not
// tried to send a request yet.
// Either of these violate our basic state assumptions.
+ int rv;
+ if (ctx->conn_reset) {
+ ctx->conn_reset = false;
+ rv = NNG_ECONNRESET;
+ } else {
+ rv = NNG_ESTATE;
+ }
nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_aio_finish_error(aio, rv);
return;
}
diff --git a/src/protocol/reqrep0/req_test.c b/src/protocol/reqrep0/req_test.c
index fa685e52..add420de 100644
--- a/src/protocol/reqrep0/req_test.c
+++ b/src/protocol/reqrep0/req_test.c
@@ -286,6 +286,81 @@ test_req_resend_disconnect(void)
}
void
+test_req_disconnect_no_retry(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+
+ TEST_NNG_PASS(nng_req0_open(&req));
+ TEST_NNG_PASS(nng_rep0_open(&rep1));
+ TEST_NNG_PASS(nng_rep0_open(&rep2));
+
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep1, NNG_OPT_SENDTIMEO, SECOND / 10));
+ // Setting the resend time to zero so we will force an error
+ // if the peer disconnects without sending us an answer.
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, 0));
+
+ TEST_NNG_PASS(testutil_marry(rep1, req));
+ TEST_NNG_SEND_STR(req, "ping");
+ TEST_NNG_RECV_STR(rep1, "ping");
+
+ TEST_NNG_PASS(testutil_marry(rep2, req));
+ TEST_NNG_PASS(nng_close(rep1));
+
+ nng_msg *msg = NULL;
+ TEST_NNG_FAIL(nng_recvmsg(req, &msg, 0), NNG_ECONNRESET);
+ TEST_NNG_FAIL(nng_recvmsg(rep2, &msg, 0), NNG_ETIMEDOUT);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep2));
+}
+
+void
+test_req_disconnect_abort(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+ nng_aio * aio;
+
+ TEST_NNG_PASS(nng_req0_open(&req));
+ TEST_NNG_PASS(nng_rep0_open(&rep1));
+ TEST_NNG_PASS(nng_rep0_open(&rep2));
+ TEST_NNG_PASS(nng_aio_alloc(&aio, 0, 0));
+
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ TEST_NNG_PASS(nng_setopt_ms(rep1, NNG_OPT_SENDTIMEO, SECOND / 10));
+ // Setting the resend time to zero so we will force an error
+ // if the peer disconnects without sending us an answer.
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, 0));
+
+ TEST_NNG_PASS(testutil_marry(rep1, req));
+ TEST_NNG_SEND_STR(req, "ping");
+ TEST_NNG_RECV_STR(rep1, "ping");
+ nng_recv_aio(req, aio);
+
+ TEST_NNG_PASS(testutil_marry(rep2, req));
+ TEST_NNG_PASS(nng_close(rep1));
+
+ nng_msg *msg = NULL;
+ nng_aio_wait(aio);
+ TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECONNRESET);
+ TEST_NNG_FAIL(nng_recvmsg(rep2, &msg, 0), NNG_ETIMEDOUT);
+ nng_aio_free(aio);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep2));
+}
+
+void
test_req_cancel(void)
{
nng_msg * abc;
@@ -929,6 +1004,8 @@ TEST_LIST = {
{ "req rep exchange", test_req_rep_exchange },
{ "req resend", test_req_resend },
{ "req resend disconnect", test_req_resend_disconnect },
+ { "req disconnect no retry", test_req_disconnect_no_retry },
+ { "req disconnect abort", test_req_disconnect_abort },
{ "req resend reconnect", test_req_resend_reconnect },
{ "req cancel", test_req_cancel },
{ "req cancel abort recv", test_req_cancel_abort_recv },