aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2023-12-17 09:51:39 -0800
committerGarrett D'Amore <garrett@damore.org>2023-12-17 09:51:39 -0800
commitdc499882e82827f39a77669fb7dc5cd7a70aaf40 (patch)
treeb5244944a3a13763cc9403ced9443ad601f456ad /src
parent9f34ec0a450dd2a9ef11aa29d07948144ec97de4 (diff)
downloadnng-dc499882e82827f39a77669fb7dc5cd7a70aaf40.tar.gz
nng-dc499882e82827f39a77669fb7dc5cd7a70aaf40.tar.bz2
nng-dc499882e82827f39a77669fb7dc5cd7a70aaf40.zip
fixes #1728 surveyor could be simplified to not use timer
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c40
-rw-r--r--src/core/aio.h35
-rw-r--r--src/sp/protocol/survey0/survey.c45
3 files changed, 64 insertions, 56 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index e849b33d..3d4a56c1 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -115,7 +115,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void *arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
// This is like aio_close, but we don't want to dispatch
@@ -247,7 +247,21 @@ nni_aio_close(nni_aio *aio)
void
nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
- aio->a_timeout = when;
+ aio->a_timeout = when;
+ aio->a_use_expire = false;
+}
+
+void
+nni_aio_set_expire(nni_aio *aio, nni_time expire)
+{
+ aio->a_expire = expire;
+ aio->a_use_expire = true;
+}
+
+nng_duration
+nni_aio_get_timeout(nni_aio *aio)
+{
+ return (aio->a_timeout);
}
void
@@ -369,7 +383,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
nni_aio_expire_q *eq = aio->a_expire_q;
- if (!aio->a_sleep) {
+ if ((!aio->a_sleep) && (!aio->a_use_expire)) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
@@ -411,7 +425,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void *arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -447,8 +461,9 @@ nni_aio_finish_impl(
aio->a_msg = msg;
}
- aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
+ aio->a_use_expire = false;
nni_mtx_unlock(&eq->eq_mtx);
if (sync) {
@@ -518,13 +533,14 @@ nni_aio_completions_init(nni_aio_completions *clp)
}
void
-nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
+nni_aio_completions_add(
+ nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
{
NNI_ASSERT(!nni_aio_list_active(aio));
aio->a_reap_node.rn_next = *clp;
- aio->a_result = result;
- aio->a_count = count;
- *clp = aio;
+ aio->a_result = result;
+ aio->a_count = count;
+ *clp = aio;
}
void
@@ -532,10 +548,10 @@ nni_aio_completions_run(nni_aio_completions *clp)
{
nni_aio *aio;
nni_aio *cl = *clp;
- *clp = NULL;
+ *clp = NULL;
while ((aio = cl) != NULL) {
- cl = (void *)aio->a_reap_node.rn_next;
+ cl = (void *) aio->a_reap_node.rn_next;
aio->a_reap_node.rn_next = NULL;
nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index a2ebf70a..cae8610f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -149,10 +149,12 @@ extern size_t nni_aio_iov_count(nni_aio *);
extern int nni_aio_set_iov(nni_aio *, unsigned, const nni_iov *);
-extern void nni_aio_set_timeout(nni_aio *, nng_duration);
-extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
-extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
-extern void nni_aio_bump_count(nni_aio *, size_t);
+extern void nni_aio_set_timeout(nni_aio *, nng_duration);
+extern void nni_aio_set_expire(nni_aio *, nni_time);
+extern nng_duration nni_aio_get_timeout(nni_aio *);
+extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
+extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
+extern void nni_aio_bump_count(nni_aio *, size_t);
// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asynchronous completion. This also starts the expiration timer. Note that
@@ -187,8 +189,8 @@ extern void nni_aio_completions_run(nni_aio_completions *);
// nni_aio_completions_add adds an aio (with the result code and length as
// appropriate) to the completion list. This should be done while the
// appropriate lock is held. The aio must not be scheduled.
-extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
- int, size_t);
+extern void nni_aio_completions_add(
+ nni_aio_completions *, nni_aio *, int, size_t);
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
@@ -202,14 +204,15 @@ typedef struct nni_aio_expire_q nni_aio_expire_q;
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
- size_t a_count; // Bytes transferred (I/O only)
- nni_time a_expire; // Absolute timeout
- nni_duration a_timeout; // Relative timeout
- int a_result; // Result code (nng_errno)
- bool a_stop; // Shutting down (no new operations)
- bool a_sleep; // Sleeping with no action
- bool a_expire_ok; // Expire from sleep is ok
- bool a_expiring; // Expiration in progress
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
+ int a_result; // Result code (nng_errno)
+ bool a_stop; // Shutting down (no new operations)
+ bool a_sleep; // Sleeping with no action
+ bool a_expire_ok; // Expire from sleep is ok
+ bool a_expiring; // Expiration in progress
+ bool a_use_expire; // Use expire instead of timeout
nni_task a_task;
// Read/write operations.
@@ -227,8 +230,8 @@ struct nng_aio {
// Provider-use fields.
nni_aio_cancel_fn a_cancel_fn;
- void *a_cancel_arg;
- void *a_prov_data;
+ void *a_cancel_arg;
+ void *a_prov_data;
nni_list_node a_prov_node; // Linkage on provider list.
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c
index 5c52d8f8..18074016 100644
--- a/src/sp/protocol/survey0/survey.c
+++ b/src/sp/protocol/survey0/survey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -25,17 +25,15 @@ typedef struct surv0_ctx surv0_ctx;
static void surv0_pipe_send_cb(void *);
static void surv0_pipe_recv_cb(void *);
-static void surv0_ctx_timeout(void *);
struct surv0_ctx {
surv0_sock * sock;
uint32_t survey_id; // survey id
- nni_timer_node timer;
- nni_time expire;
nni_lmq recv_lmq;
nni_list recv_queue;
nni_atomic_int recv_buf;
nni_atomic_int survey_time;
+ nni_time expire;
int err;
};
@@ -99,7 +97,6 @@ surv0_ctx_fini(void *arg)
surv0_ctx *ctx = arg;
surv0_ctx_close(ctx);
- nni_timer_cancel(&ctx->timer);
nni_lmq_fini(&ctx->recv_lmq);
}
@@ -129,7 +126,6 @@ surv0_ctx_init(void *c, void *s)
ctx->sock = sock;
nni_lmq_init(&ctx->recv_lmq, len);
- nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
}
static void
@@ -155,17 +151,28 @@ surv0_ctx_recv(void *arg, nni_aio *aio)
surv0_ctx * ctx = arg;
surv0_sock *sock = ctx->sock;
nni_msg * msg;
+ nni_time now;
+ nni_duration timeout;
if (nni_aio_begin(aio) != 0) {
return;
}
+ now = nni_clock();
+
nni_mtx_lock(&sock->mtx);
- if (ctx->survey_id == 0) {
+ if ((ctx->survey_id == 0) || (now >= ctx->expire)) {
nni_mtx_unlock(&sock->mtx);
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
+
+ timeout = nni_aio_get_timeout(aio);
+ if ((timeout < 1) || ((now + timeout) > ctx->expire)) {
+ // limit the timeout to the survey time
+ nni_aio_set_expire(aio, ctx->expire);
+ }
+
again:
if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) {
int rv;
@@ -190,23 +197,6 @@ again:
nni_aio_finish_msg(aio, msg);
}
-void
-surv0_ctx_timeout(void *arg)
-{
- surv0_ctx * ctx = arg;
- surv0_sock *sock = ctx->sock;
-
- nni_mtx_lock(&sock->mtx);
- if (nni_clock() < ctx->expire) {
- nni_mtx_unlock(&sock->mtx);
- return;
- }
-
- // Abort any pending receives.
- surv0_ctx_abort(ctx, NNG_ETIMEDOUT);
- nni_mtx_unlock(&sock->mtx);
-}
-
static void
surv0_ctx_send(void *arg, nni_aio *aio)
{
@@ -215,7 +205,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)
surv0_pipe * pipe;
nni_msg * msg = nni_aio_get_msg(aio);
size_t len = nni_msg_len(msg);
- nni_time now = nni_clock();
nng_duration survey_time;
int rv;
@@ -229,7 +218,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)
// Abort everything outstanding.
surv0_ctx_abort(ctx, NNG_ECANCELED);
- nni_timer_cancel(&ctx->timer);
// Allocate the new ID.
if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) {
@@ -258,8 +246,9 @@ surv0_ctx_send(void *arg, nni_aio *aio)
}
}
- ctx->expire = now + survey_time;
- nni_timer_schedule(&ctx->timer, ctx->expire);
+ // save the survey time, so we know the maximum timeout to use when
+ // waiting for receive
+ ctx->expire = nni_clock() + survey_time;
nni_mtx_unlock(&sock->mtx);
nni_msg_free(msg);