aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-26 16:32:15 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-26 18:21:22 -0800
commit458b414401d897d905f5313ec80594d8b8b95a48 (patch)
treed81d6be6dc6ba55a57047ed9a57d3afb54ed88a5 /src/sp
parentf915163ead94fa7db84d5e1de60b29c72b5c2466 (diff)
downloadnng-458b414401d897d905f5313ec80594d8b8b95a48.tar.gz
nng-458b414401d897d905f5313ec80594d8b8b95a48.tar.bz2
nng-458b414401d897d905f5313ec80594d8b8b95a48.zip
req rep: use nni_aio_start
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/protocol/reqrep0/rep.c14
-rw-r--r--src/sp/protocol/reqrep0/req.c21
-rw-r--r--src/sp/protocol/reqrep0/req_test.c2
3 files changed, 9 insertions, 28 deletions
diff --git a/src/sp/protocol/reqrep0/rep.c b/src/sp/protocol/reqrep0/rep.c
index 6c06489d..7a29ee68 100644
--- a/src/sp/protocol/reqrep0/rep.c
+++ b/src/sp/protocol/reqrep0/rep.c
@@ -139,10 +139,6 @@ rep0_ctx_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
nni_msg_header_clear(msg);
- if (nni_aio_begin(aio) != 0) {
- return;
- }
-
nni_mtx_lock(&s->lk);
len = ctx->btrace_len;
p_id = ctx->pipe_id;
@@ -191,9 +187,8 @@ rep0_ctx_send(void *arg, nni_aio *aio)
return;
}
- if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) {
+ if (!nni_aio_start(aio, rep0_ctx_cancel_send, ctx)) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
return;
}
@@ -424,15 +419,10 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
size_t len;
nni_msg *msg;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
nni_mtx_lock(&s->lk);
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
- int rv;
- if ((rv = nni_aio_schedule(aio, rep0_cancel_recv, ctx)) != 0) {
+ if (!nni_aio_start(aio, rep0_cancel_recv, ctx)) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
return;
}
if (ctx->raio != NULL) {
diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c
index b8b42c26..67407ff0 100644
--- a/src/sp/protocol/reqrep0/req.c
+++ b/src/sp/protocol/reqrep0/req.c
@@ -608,9 +608,6 @@ req0_ctx_recv(void *arg, nni_aio *aio)
req0_sock *s = ctx->sock;
nni_msg *msg;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
nni_mtx_lock(&s->mtx);
if ((ctx->recv_aio != NULL) ||
((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) {
@@ -630,11 +627,8 @@ req0_ctx_recv(void *arg, nni_aio *aio)
}
if ((msg = ctx->rep_msg) == NULL) {
- int rv;
- rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx);
- if (rv != 0) {
+ if (!nni_aio_start(aio, req0_ctx_cancel_recv, ctx)) {
nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
ctx->recv_aio = aio;
@@ -692,9 +686,6 @@ req0_ctx_send(void *arg, nni_aio *aio)
nng_msg *msg = nni_aio_get_msg(aio);
int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
nni_mtx_lock(&s->mtx);
if (s->closed) {
nni_mtx_unlock(&s->mtx);
@@ -728,13 +719,13 @@ req0_ctx_send(void *arg, nni_aio *aio)
nni_msg_header_clear(msg);
nni_msg_header_append_u32(msg, ctx->request_id);
- // If no pipes are ready, and the request was a poll (no background
- // schedule), then fail it. Should be NNG_ETIMEDOUT.
- rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx);
- if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) {
+ // only do asynch if we're going to defer -- this is somewhat subtle
+ // because we can have been submitted for a non-blocking operation and
+ // in that case we would not like to timeout the operation instantly.
+ if (nni_list_empty(&s->ready_pipes) &&
+ !nni_aio_start(aio, req0_ctx_cancel_send, ctx)) {
nni_id_remove(&s->requests, ctx->request_id);
nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
ctx->req_len = nni_msg_len(msg);
diff --git a/src/sp/protocol/reqrep0/req_test.c b/src/sp/protocol/reqrep0/req_test.c
index d1c615d1..1dc04a5c 100644
--- a/src/sp/protocol/reqrep0/req_test.c
+++ b/src/sp/protocol/reqrep0/req_test.c
@@ -513,7 +513,7 @@ test_req_poll_writeable(void)
// Submit a bunch of jobs. Note that we have to stall a bit
// between each message to let it queue up.
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 100; i++) {
int rv = nng_send(req, "", 0, NNG_FLAG_NONBLOCK);
if (rv == NNG_EAGAIN) {
break;