Diffstat (limited to 'src/sp/protocol/reqrep0/req.c')
-rw-r--r--  src/sp/protocol/reqrep0/req.c | 167
1 file changed, 118 insertions(+), 49 deletions(-)
diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c
index fdac7d5c..7e3e3db8 100644
--- a/src/sp/protocol/reqrep0/req.c
+++ b/src/sp/protocol/reqrep0/req.c
@@ -21,34 +21,36 @@ typedef struct req0_ctx req0_ctx;
static void req0_run_send_queue(req0_sock *, nni_aio_completions *);
static void req0_ctx_reset(req0_ctx *);
-static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
static void req0_ctx_fini(void *);
static void req0_ctx_init(void *, void *);
+static void req0_retry_cb(void *);
// A req0_ctx is a "context" for the request. It uses most of the
// socket, but keeps track of its own outstanding replies, the request ID,
// and so forth.
struct req0_ctx {
- req0_sock *sock;
- nni_list_node sock_node; // node on the socket context list
- nni_list_node send_node; // node on the send_queue
- nni_list_node pipe_node; // node on the pipe list
- uint32_t request_id; // request ID, without high bit set
- nni_aio *recv_aio; // user aio waiting to recv - only one!
- nni_aio *send_aio; // user aio waiting to send
- nng_msg *req_msg; // request message (owned by protocol)
- size_t req_len; // length of request message (for stats)
- nng_msg *rep_msg; // reply message
- nni_timer_node timer;
- nni_duration retry;
- bool conn_reset; // sent message w/o retry, peer disconnect
+ req0_sock *sock;
+ nni_list_node sock_node; // node on the socket context list
+ nni_list_node send_node; // node on the send_queue
+ nni_list_node pipe_node; // node on the pipe list
+ nni_list_node retry_node; // node on the socket retry list
+ uint32_t request_id; // request ID, without high bit set
+ nni_aio *recv_aio; // user aio waiting to recv - only one!
+ nni_aio *send_aio; // user aio waiting to send
+ nng_msg *req_msg; // request message (owned by protocol)
+ size_t req_len; // length of request message (for stats)
+ nng_msg *rep_msg; // reply message
+ nni_duration retry;
+ nni_time retry_time; // retry after this expires
+ bool conn_reset; // sent message w/o retry, peer disconnect
};
// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
nni_duration retry;
bool closed;
+ bool retry_active; // true if retry aio running
nni_atomic_int ttl;
req0_ctx master; // base socket master
nni_list ready_pipes;
@@ -56,9 +58,12 @@ struct req0_sock {
nni_list stop_pipes;
nni_list contexts;
nni_list send_queue; // contexts waiting to send.
- nni_id_map requests; // contexts by request ID
+ nni_list retry_queue;
+ nni_aio retry_aio; // retry timer
+ nni_id_map requests; // contexts by request ID
nni_pollable readable;
nni_pollable writable;
+ nni_duration retry_tick; // clock interval for retry timer
nni_mtx mtx;
};
@@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock)
NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node);
+ NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node);
NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node);
// this is "semi random" start for request IDs.
- s->retry = NNI_SECOND * 60;
+ s->retry = NNI_SECOND * 60;
+ s->retry_tick = NNI_SECOND; // how often we check for retries
req0_ctx_init(&s->master, s);
nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);
+ nni_aio_init(&s->retry_aio, req0_retry_cb, s);
+
nni_atomic_init(&s->ttl);
nni_atomic_set(&s->ttl, 8);
}
@@ -130,6 +139,7 @@ req0_sock_fini(void *arg)
{
req0_sock *s = arg;
+ nni_aio_stop(&s->retry_aio);
nni_mtx_lock(&s->mtx);
NNI_ASSERT(nni_list_empty(&s->busy_pipes));
NNI_ASSERT(nni_list_empty(&s->stop_pipes));
@@ -140,6 +150,7 @@ req0_sock_fini(void *arg)
nni_pollable_fini(&s->readable);
nni_pollable_fini(&s->writable);
nni_id_map_fini(&s->requests);
+ nni_aio_fini(&s->retry_aio);
nni_mtx_fini(&s->mtx);
}
@@ -235,13 +246,17 @@ req0_pipe_close(void *arg)
req0_ctx_reset(ctx);
ctx->conn_reset = true;
}
- } else {
- // Reset the timer on this so it expires immediately.
- // This is actually easier than canceling the timer and
- // running the send_queue separately. (In particular,
- // it avoids a potential deadlock on cancelling the
- // timer.)
- nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
+ } else if (ctx->req_msg != NULL) {
+ // Push the next retry a full interval out, and move this
+ // context straight onto the send queue so the request is
+ // resent as soon as a pipe is available. The retry timer
+ // should still be firing, so we don't need to restart or
+ // reschedule it.
+ ctx->retry_time = nni_clock() + ctx->retry;
+
+ if (!nni_list_node_active(&ctx->send_node)) {
+ nni_list_append(&s->send_queue, ctx);
+ req0_run_send_queue(s, NULL);
+ }
}
}
nni_mtx_unlock(&s->mtx);
@@ -363,16 +378,41 @@ malformed:
}
static void
-req0_ctx_timeout(void *arg)
+req0_retry_cb(void *arg)
{
- req0_ctx *ctx = arg;
- req0_sock *s = ctx->sock;
-
+ req0_sock *s = arg;
+ req0_ctx *ctx;
+ nni_time now;
+ bool reschedule = false;
+
+ // The design assumes that retries are infrequent, since
+ // normally requests succeed. We also want to avoid running
+ // this linear scan of all requests too often; once per
+ // clock tick is all we want.
+ now = nni_clock();
nni_mtx_lock(&s->mtx);
- if ((ctx->req_msg != NULL) && (!s->closed)) {
+ if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) {
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ NNI_LIST_FOREACH (&s->retry_queue, ctx) {
+ if (ctx->retry_time > now || (ctx->req_msg == NULL)) {
+ continue;
+ }
if (!nni_list_node_active(&ctx->send_node)) {
nni_list_append(&s->send_queue, ctx);
}
+ reschedule = true;
+ }
+ if (!nni_list_empty(&s->retry_queue)) {
+ // If there are still jobs in the queue waiting to be
+ // retried, schedule another tick.
+ nni_sleep_aio(s->retry_tick, &s->retry_aio);
+ } else {
+ s->retry_active = false;
+ }
+ if (reschedule) {
req0_run_send_queue(s, NULL);
}
nni_mtx_unlock(&s->mtx);
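
The hunk above replaces one timer per context with a single shared timer whose callback linearly scans the retry list. A minimal standalone sketch of the same idea, using hypothetical names (retry_entry, scan_retries) and plain C time primitives instead of the NNG internals:

    #include <stdbool.h>
    #include <stddef.h>
    #include <time.h>

    struct retry_entry {
        struct retry_entry *next;
        time_t              retry_time; /* resend once this deadline passes */
    };

    /* One periodic tick walks every pending request; entries whose
     * deadline has passed are flagged for the caller to resend.  This
     * trades per-request timer precision for a single cheap timer plus
     * an occasional O(n) scan. */
    bool
    scan_retries(struct retry_entry *head, time_t now)
    {
        bool expired = false;
        for (struct retry_entry *e = head; e != NULL; e = e->next) {
            if (e->retry_time <= now) {
                expired = true; /* caller requeues e for sending */
            }
        }
        return (expired);
    }

The timer is armed only while the retry queue is non-empty (see retry_active), so idle sockets pay nothing for this scheme.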
@@ -384,8 +424,6 @@ req0_ctx_init(void *arg, void *sock)
req0_sock *s = sock;
req0_ctx *ctx = arg;
- nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);
-
nni_mtx_lock(&s->mtx);
ctx->sock = s;
ctx->recv_aio = NULL;
@@ -415,9 +453,6 @@ req0_ctx_fini(void *arg)
req0_ctx_reset(ctx);
nni_list_remove(&s->contexts, ctx);
nni_mtx_unlock(&s->mtx);
-
- nni_timer_cancel(&ctx->timer);
- nni_timer_fini(&ctx->timer);
}
static int
@@ -448,20 +483,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
return;
}
- // We have a place to send it, so do the send.
+ // We have a place to send it, so send it.
// If a sending error occurs that causes the message to
// be dropped, we rely on the resend timer to pick it up.
// We also notify the completion callback if this is the
// first send attempt.
nni_list_remove(&s->send_queue, ctx);
- // Schedule a resubmit timer. We only do this if we got
+ // Schedule a retry. We only do this if we got
// a pipe to send to. Otherwise, we will be handled the
// next time the send_queue is run. We don't do this
// if the retry is "disabled" with NNG_DURATION_INFINITE.
if (ctx->retry > 0) {
- nni_timer_schedule(
- &ctx->timer, nni_clock() + ctx->retry);
+ ctx->retry_time = nni_clock() + ctx->retry; // restart full interval
+ nni_list_node_remove(&ctx->retry_node);
+ nni_list_append(&s->retry_queue, ctx);
}
// Put us on the pipe list of active contexts.
@@ -489,7 +524,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
}
// At this point, we will never give this message back to
- // to the user, so we don't have to worry about making it
+ // the user, so we don't have to worry about making it
// unique. We can freely clone it.
nni_msg_clone(ctx->req_msg);
nni_aio_set_msg(&p->aio_send, ctx->req_msg);
@@ -503,16 +538,7 @@ req0_ctx_reset(req0_ctx *ctx)
req0_sock *s = ctx->sock;
// Call with sock lock held!
- // We cannot safely "wait" using nni_timer_cancel, but this removes
- // any scheduled timer activation. If the timeout is already running
- // concurrently, it will still run. It should do nothing, because
- // we toss the request. There is still a very narrow race if the
- // timeout fires, but doesn't actually start running before we
- // both finish this function, *and* manage to reschedule another
- // request. The consequence of that occurring is that the request
- // will be emitted on the wire twice. This is not actually tragic.
- nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);
-
+ nni_list_node_remove(&ctx->retry_node);
nni_list_node_remove(&ctx->pipe_node);
nni_list_node_remove(&ctx->send_node);
if (ctx->request_id != 0) {
@@ -561,7 +587,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
// entire state machine. This allows us to preserve the
// semantic of exactly one receive operation per send
// operation, and should be the least surprising for users. The
- // main consequence is that if a receive operation is completed
+ // main consequence is that if the operation is completed
// (in error or otherwise), the user must submit a new send
// operation to restart the state machine.
req0_ctx_reset(ctx);
@@ -713,6 +739,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
ctx->send_aio = aio;
nni_aio_set_msg(aio, NULL);
+ if (ctx->retry > 0) {
+ ctx->retry_time = nni_clock() + ctx->retry;
+ nni_list_append(&s->retry_queue, ctx);
+ if (!s->retry_active) {
+ s->retry_active = true;
+ nni_sleep_aio(s->retry_tick, &s->retry_aio);
+ }
+ }
+
// Stick us on the send_queue list.
nni_list_append(&s->send_queue, ctx);
@@ -772,6 +807,34 @@ req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static int
+req0_sock_set_resend_tick(
+ void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ nng_duration tick;
+ int rv;
+
+ if ((rv = nni_copyin_ms(&tick, buf, sz, t)) == 0) {
+ nni_mtx_lock(&s->mtx);
+ s->retry_tick = tick;
+ nni_mtx_unlock(&s->mtx);
+ }
+ return (rv);
+}
+
+static int
+req0_sock_get_resend_tick(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ nng_duration tick;
+
+ nni_mtx_lock(&s->mtx);
+ tick = s->retry_tick;
+ nni_mtx_unlock(&s->mtx);
+ return (nni_copyout_ms(tick, buf, szp, t));
+}
+
+static int
req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
req0_sock *s = arg;
@@ -846,6 +909,12 @@ static nni_option req0_sock_options[] = {
.o_name = NNG_OPT_SENDFD,
.o_get = req0_sock_get_send_fd,
},
+ {
+ .o_name = NNG_OPT_REQ_RESENDTICK,
+ .o_get = req0_sock_get_resend_tick,
+ .o_set = req0_sock_set_resend_tick,
+ },
+
// terminate list
{
.o_name = NULL,
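
The new NNG_OPT_REQ_RESENDTICK pairs with the existing NNG_OPT_REQ_RESENDTIME: the former controls how often the shared timer scans for expired requests, the latter how long a request waits before it is eligible for resend. A minimal usage sketch, assuming the NNG_OPT_REQ_RESENDTICK macro is exported by the public headers accompanying this change:

    #include <nng/nng.h>
    #include <nng/protocol/reqrep0/req.h>

    int
    open_req_with_fast_retry(nng_socket *s)
    {
        int rv;

        if ((rv = nng_req0_open(s)) != 0) {
            return (rv);
        }
        // Resend an unanswered request after 2 seconds ...
        if ((rv = nng_socket_set_ms(*s, NNG_OPT_REQ_RESENDTIME, 2000)) != 0) {
            return (rv);
        }
        // ... and scan for expired requests every 100 ms, so the resend
        // fires within a tick of the deadline rather than up to a full
        // second late (the default retry_tick).
        return (nng_socket_set_ms(*s, NNG_OPT_REQ_RESENDTICK, 100));
    }

With the default one-second tick a resend can fire up to a tick late; lowering the tick tightens that bound at the cost of more frequent scans.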