about | summary | refs | log | tree | commit | diff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r-- src/core/endpt.c 143
-rw-r--r-- src/core/endpt.h 7
2 files changed, 64 insertions, 86 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 50542d17..b1ee6b82 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
static void nni_ep_connect_start(nni_ep *);
static void nni_ep_connect_done(void *);
+static void nni_ep_backoff_done(void *);
static nni_idhash *nni_eps;
@@ -60,6 +61,7 @@ nni_ep_destroy(nni_ep *ep)
nni_aio_fini(&ep->ep_acc_aio);
nni_aio_fini(&ep->ep_con_aio);
nni_aio_fini(&ep->ep_con_syn);
+ nni_aio_fini(&ep->ep_backoff);
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
@@ -121,6 +123,11 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
nni_ep_destroy(ep);
return (rv);
}
+ rv = nni_aio_init(&ep->ep_backoff, nni_ep_backoff_done, ep);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
ep->ep_sock = sock;
ep->ep_tran = tran;
@@ -204,6 +211,23 @@ nni_ep_stop(nni_ep *ep)
}
static void
+nni_ep_backoff_done(void *arg)
+{
+ nni_ep * ep = arg;
+ nni_aio *aio = &ep->ep_backoff;
+
+ nni_mtx_lock(&ep->ep_mtx);
+ if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
+ if (ep->ep_mode == NNI_EP_MODE_DIAL) {
+ nni_ep_connect_start(ep);
+ } else {
+ nni_ep_accept_start(ep);
+ }
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+}
+
+static void
nni_ep_connect_done(void *arg)
{
nni_ep * ep = arg;
@@ -212,6 +236,7 @@ nni_ep_connect_done(void *arg)
nni_pipe * pipe;
const nni_tran_pipe *ops;
int rv;
+ nni_time cooldown;
ops = ep->ep_tran->tran_pipe;
@@ -232,7 +257,18 @@ done:
case 0:
pipe->p_tran_ops = *ops;
pipe->p_tran_data = tpipe;
+
+ // Good connect, so reset the backoff timer.
+ // XXX: This is kind of bad if a remote host just drops
+ // the connection without completing our negotiation.
+ // We should reset on close instead, when the pipe is
+ // removed *after* a good connect is made, and only
+ // if we manage to keep the pipe open for at least
+ // some meaningful amount of time. Alternatively we
+ // can dial into the pipe start logic...
+ ep->ep_currtime = ep->ep_inirtime;
nni_pipe_start(pipe);
+
// No further outgoing connects -- we will restart a
// connection from the pipe when the pipe is removed.
break;
@@ -241,15 +277,15 @@ done:
// Canceled/closed -- stop everything.
break;
default:
- // cooldown = nni_clock() + rtime;
- // rtime *= 2;
- // if ((maxrtime >= defrtime) && (rtime > maxrtime)) {
- // rtime = maxrtime;
- // }
-
- // XXX: We need to inject a cooldown, and then try again. For
- // now we just try again immediately. This is very suboptimal.
- nni_ep_connect_start(ep);
+ // Other errors involve the use of the backoff timer.
+ // XXX: randomize this slightly to prevent reconnect
+ // storms.
+ ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
+ ep->ep_currtime *= 2;
+ if (ep->ep_currtime > ep->ep_maxrtime) {
+ ep->ep_currtime = ep->ep_maxrtime;
+ }
+ nni_aio_start(&ep->ep_backoff, NULL, ep);
break;
}
nni_mtx_unlock(&ep->ep_mtx);
@@ -269,71 +305,6 @@ nni_ep_connect_start(nni_ep *ep)
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
-// nni_dialer is the thread worker that dials in the background.
-#if 0 // this has the logic for timing out... remove when done.
-static void
-nni_dialer(void *arg)
-{
- nni_ep * ep = arg;
- int rv;
- nni_time cooldown;
- nni_duration maxrtime = 0, nmaxrtime;
- nni_duration defrtime = 0, ndefrtime;
- nni_duration rtime;
-
- nni_sock_reconntimes(ep->ep_sock, &defrtime, &maxrtime);
- rtime = defrtime;
-
- for (;;) {
- nni_sock_reconntimes(ep->ep_sock, &ndefrtime, &nmaxrtime);
- if ((defrtime != ndefrtime) || (maxrtime != nmaxrtime)) {
- // Times changed, so reset them.
- defrtime = ndefrtime;
- maxrtime = nmaxrtime;
- rtime = defrtime;
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- while ((!ep->ep_closed) && (ep->ep_pipe != NULL)) {
- rtime = defrtime;
- nni_cv_wait(&ep->ep_cv);
- }
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- return;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- rv = nni_ep_connect_sync(ep);
- switch (rv) {
- case 0:
- // good connection
- continue;
- case NNG_ECLOSED:
- return;
-
- default:
- cooldown = nni_clock() + rtime;
- rtime *= 2;
- if ((maxrtime >= defrtime) && (rtime > maxrtime)) {
- rtime = maxrtime;
- }
- break;
- }
- // we inject a delay so we don't just spin hard on
- // errors like connection refused.
- nni_mtx_lock(&ep->ep_mtx);
- while (!ep->ep_closed) {
- rv = nni_cv_until(&ep->ep_cv, cooldown);
- if (rv == NNG_ETIMEDOUT) {
- break;
- }
- }
- nni_mtx_unlock(&ep->ep_mtx);
- }
-}
-#endif
-
int
nni_ep_dial(nni_ep *ep, int flags)
{
@@ -343,6 +314,9 @@ nni_ep_dial(nni_ep *ep, int flags)
nni_pipe * pipe;
const nni_tran_pipe *ops;
+ nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
+ ep->ep_currtime = ep->ep_inirtime;
+
nni_mtx_lock(&ep->ep_mtx);
ops = ep->ep_tran->tran_pipe;
@@ -455,18 +429,17 @@ done:
// cooldown = 0;
nni_ep_accept_start(ep);
break;
- case NNG_ENOMEM:
- // We're running low on memory, so its best to wait
- // a whole second to give the system a chance to
- // recover memory.
- // cooldown = 1000000;
- nni_ep_accept_start(ep);
- break;
default:
- // other cases... sleep a tiny bit then try again.
- // cooldown = 1000; 10msec
- // Add a timeout here instead to avoid spinning.
- nni_ep_accept_start(ep);
+ // We don't really know why we failed, but we backoff here.
+ // This is because errors here are probably due to system
+ // failures (resource exhaustion) and we hope by not
+ // thrashing we give the system a chance to recover.
+ ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
+ ep->ep_currtime *= 2;
+ if (ep->ep_currtime > ep->ep_maxrtime) {
+ ep->ep_currtime = ep->ep_maxrtime;
+ }
+ nni_aio_start(&ep->ep_backoff, NULL, ep);
break;
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 3350bc59..2c14605c 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -38,7 +38,12 @@ struct nni_ep {
nni_list ep_pipes;
nni_aio ep_acc_aio;
nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
+ nni_aio ep_con_syn; // used for sync connect
+ nni_aio ep_backoff; // backoff timer
+ nni_duration ep_maxrtime; // maximum time for reconnect
+ nni_duration ep_currtime; // current time for reconnect
+ nni_duration ep_inirtime; // initial time for reconnect
+ nni_time ep_conntime; // time of last good connect
nni_taskq_ent ep_reap_tqe;
};