aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/protocol/survey0/survey.c45
1 files changed, 17 insertions, 28 deletions
diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c
index 5c52d8f8..18074016 100644
--- a/src/sp/protocol/survey0/survey.c
+++ b/src/sp/protocol/survey0/survey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -25,17 +25,15 @@ typedef struct surv0_ctx surv0_ctx;
static void surv0_pipe_send_cb(void *);
static void surv0_pipe_recv_cb(void *);
-static void surv0_ctx_timeout(void *);
struct surv0_ctx {
surv0_sock * sock;
uint32_t survey_id; // survey id
- nni_timer_node timer;
- nni_time expire;
nni_lmq recv_lmq;
nni_list recv_queue;
nni_atomic_int recv_buf;
nni_atomic_int survey_time;
+ nni_time expire;
int err;
};
@@ -99,7 +97,6 @@ surv0_ctx_fini(void *arg)
surv0_ctx *ctx = arg;
surv0_ctx_close(ctx);
- nni_timer_cancel(&ctx->timer);
nni_lmq_fini(&ctx->recv_lmq);
}
@@ -129,7 +126,6 @@ surv0_ctx_init(void *c, void *s)
ctx->sock = sock;
nni_lmq_init(&ctx->recv_lmq, len);
- nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
}
static void
@@ -155,17 +151,28 @@ surv0_ctx_recv(void *arg, nni_aio *aio)
surv0_ctx * ctx = arg;
surv0_sock *sock = ctx->sock;
nni_msg * msg;
+ nni_time now;
+ nni_duration timeout;
if (nni_aio_begin(aio) != 0) {
return;
}
+ now = nni_clock();
+
nni_mtx_lock(&sock->mtx);
- if (ctx->survey_id == 0) {
+ if ((ctx->survey_id == 0) || (now >= ctx->expire)) {
nni_mtx_unlock(&sock->mtx);
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
+
+ timeout = nni_aio_get_timeout(aio);
+ if ((timeout < 1) || ((now + timeout) > ctx->expire)) {
+ // limit the timeout to the survey time
+ nni_aio_set_expire(aio, ctx->expire);
+ }
+
again:
if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) {
int rv;
@@ -190,23 +197,6 @@ again:
nni_aio_finish_msg(aio, msg);
}
-void
-surv0_ctx_timeout(void *arg)
-{
- surv0_ctx * ctx = arg;
- surv0_sock *sock = ctx->sock;
-
- nni_mtx_lock(&sock->mtx);
- if (nni_clock() < ctx->expire) {
- nni_mtx_unlock(&sock->mtx);
- return;
- }
-
- // Abort any pending receives.
- surv0_ctx_abort(ctx, NNG_ETIMEDOUT);
- nni_mtx_unlock(&sock->mtx);
-}
-
static void
surv0_ctx_send(void *arg, nni_aio *aio)
{
@@ -215,7 +205,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)
surv0_pipe * pipe;
nni_msg * msg = nni_aio_get_msg(aio);
size_t len = nni_msg_len(msg);
- nni_time now = nni_clock();
nng_duration survey_time;
int rv;
@@ -229,7 +218,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)
// Abort everything outstanding.
surv0_ctx_abort(ctx, NNG_ECANCELED);
- nni_timer_cancel(&ctx->timer);
// Allocate the new ID.
if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) {
@@ -258,8 +246,9 @@ surv0_ctx_send(void *arg, nni_aio *aio)
}
}
- ctx->expire = now + survey_time;
- nni_timer_schedule(&ctx->timer, ctx->expire);
+ // save the survey time, so we know the maximum timeout to use when
+ // waiting for receive
+ ctx->expire = nni_clock() + survey_time;
nni_mtx_unlock(&sock->mtx);
nni_msg_free(msg);