From 458b414401d897d905f5313ec80594d8b8b95a48 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 26 Dec 2024 16:32:15 -0800 Subject: req rep: use nni_aio_start --- src/sp/protocol/reqrep0/rep.c | 14 ++------------ src/sp/protocol/reqrep0/req.c | 21 ++++++--------------- src/sp/protocol/reqrep0/req_test.c | 2 +- 3 files changed, 9 insertions(+), 28 deletions(-) diff --git a/src/sp/protocol/reqrep0/rep.c b/src/sp/protocol/reqrep0/rep.c index 6c06489d..7a29ee68 100644 --- a/src/sp/protocol/reqrep0/rep.c +++ b/src/sp/protocol/reqrep0/rep.c @@ -139,10 +139,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) msg = nni_aio_get_msg(aio); nni_msg_header_clear(msg); - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&s->lk); len = ctx->btrace_len; p_id = ctx->pipe_id; @@ -191,9 +187,8 @@ rep0_ctx_send(void *arg, nni_aio *aio) return; } - if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) { + if (!nni_aio_start(aio, rep0_ctx_cancel_send, ctx)) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); return; } @@ -424,15 +419,10 @@ rep0_ctx_recv(void *arg, nni_aio *aio) size_t len; nni_msg *msg; - if (nni_aio_begin(aio) != 0) { - return; - } nni_mtx_lock(&s->lk); if ((p = nni_list_first(&s->recvpipes)) == NULL) { - int rv; - if ((rv = nni_aio_schedule(aio, rep0_cancel_recv, ctx)) != 0) { + if (!nni_aio_start(aio, rep0_cancel_recv, ctx)) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); return; } if (ctx->raio != NULL) { diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index b8b42c26..67407ff0 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -608,9 +608,6 @@ req0_ctx_recv(void *arg, nni_aio *aio) req0_sock *s = ctx->sock; nni_msg *msg; - if (nni_aio_begin(aio) != 0) { - return; - } nni_mtx_lock(&s->mtx); if ((ctx->recv_aio != NULL) || ((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) { @@ -630,11 +627,8 @@ req0_ctx_recv(void *arg, nni_aio *aio) } if ((msg = ctx->rep_msg) == NULL) { - int rv; - rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx); - if (rv != 0) { + if (!nni_aio_start(aio, req0_ctx_cancel_recv, ctx)) { nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); return; } ctx->recv_aio = aio; @@ -692,9 +686,6 @@ req0_ctx_send(void *arg, nni_aio *aio) nng_msg *msg = nni_aio_get_msg(aio); int rv; - if (nni_aio_begin(aio) != 0) { - return; - } nni_mtx_lock(&s->mtx); if (s->closed) { nni_mtx_unlock(&s->mtx); @@ -728,13 +719,13 @@ req0_ctx_send(void *arg, nni_aio *aio) nni_msg_header_clear(msg); nni_msg_header_append_u32(msg, ctx->request_id); - // If no pipes are ready, and the request was a poll (no background - // schedule), then fail it. Should be NNG_ETIMEDOUT. - rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); - if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) { + // only do asynch if we're going to defer -- this is somewhat subtle + // because we can have been submitted for a non-blocking operation and + // in that case we would not like to timeout the operation instantly. + if (nni_list_empty(&s->ready_pipes) && + !nni_aio_start(aio, req0_ctx_cancel_send, ctx)) { nni_id_remove(&s->requests, ctx->request_id); nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); return; } ctx->req_len = nni_msg_len(msg); diff --git a/src/sp/protocol/reqrep0/req_test.c b/src/sp/protocol/reqrep0/req_test.c index d1c615d1..1dc04a5c 100644 --- a/src/sp/protocol/reqrep0/req_test.c +++ b/src/sp/protocol/reqrep0/req_test.c @@ -513,7 +513,7 @@ test_req_poll_writeable(void) // Submit a bunch of jobs. Note that we have to stall a bit // between each message to let it queue up. - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { int rv = nng_send(req, "", 0, NNG_FLAG_NONBLOCK); if (rv == NNG_EAGAIN) { break; -- cgit v1.2.3-70-g09d2