diff options
Diffstat (limited to 'src/protocol/reqrep0/req.c')
| -rw-r--r-- | src/protocol/reqrep0/req.c | 37 |
1 files changed, 31 insertions, 6 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index c63359d5..cd7132c9 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -42,6 +42,7 @@ struct req0_ctx { nng_msg * rep_msg; // reply message nni_timer_node timer; nni_duration retry; + bool conn_reset; // sent message w/o retry, peer disconnect }; // A req0_sock is our per-socket protocol private structure. @@ -222,11 +223,27 @@ req0_pipe_close(void *arg) while ((ctx = nni_list_first(&p->contexts)) != NULL) { nni_list_remove(&p->contexts, ctx); - // Reset the timer on this so it expires immediately. - // This is actually easier than canceling the timer and - // running the send_queue separately. (In particular, it - // avoids a potential deadlock on cancelling the timer.) - nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); + nng_aio *aio; + if (ctx->retry <= 0) { + // If we can't retry, then just cancel the operation + // altogether. We should only be waiting for recv, + // because we will already have sent if we are here. + if ((aio = ctx->recv_aio) != NULL) { + ctx->recv_aio = NULL; + nni_aio_finish_error(aio, NNG_ECONNRESET); + req0_ctx_reset(ctx); + } else { + req0_ctx_reset(ctx); + ctx->conn_reset = true; + } + } else { + // Reset the timer on this so it expires immediately. + // This is actually easier than canceling the timer and + // running the send_queue separately. (In particular, + // it avoids a potential deadlock on cancelling the + // timer.) + nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); + } } nni_mtx_unlock(&s->mtx); } @@ -517,6 +534,7 @@ req0_ctx_reset(req0_ctx *ctx) nni_msg_free(ctx->rep_msg); ctx->rep_msg = NULL; } + ctx->conn_reset = false; } static void @@ -559,8 +577,15 @@ req0_ctx_recv(void *arg, nni_aio *aio) // We have already got a pending receive or have not // tried to send a request yet. // Either of these violate our basic state assumptions. + int rv; + if (ctx->conn_reset) { + ctx->conn_reset = false; + rv = NNG_ECONNRESET; + } else { + rv = NNG_ESTATE; + } nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); + nni_aio_finish_error(aio, rv); return; } |
