diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 143 | ||||
| -rw-r--r-- | src/core/endpt.h | 7 |
2 files changed, 64 insertions, 86 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 50542d17..b1ee6b82 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); static void nni_ep_connect_start(nni_ep *); static void nni_ep_connect_done(void *); +static void nni_ep_backoff_done(void *); static nni_idhash *nni_eps; @@ -60,6 +61,7 @@ nni_ep_destroy(nni_ep *ep) nni_aio_fini(&ep->ep_acc_aio); nni_aio_fini(&ep->ep_con_aio); nni_aio_fini(&ep->ep_con_syn); + nni_aio_fini(&ep->ep_backoff); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } @@ -121,6 +123,11 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) nni_ep_destroy(ep); return (rv); } + rv = nni_aio_init(&ep->ep_backoff, nni_ep_backoff_done, ep); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } ep->ep_sock = sock; ep->ep_tran = tran; @@ -204,6 +211,23 @@ nni_ep_stop(nni_ep *ep) } static void +nni_ep_backoff_done(void *arg) +{ + nni_ep * ep = arg; + nni_aio *aio = &ep->ep_backoff; + + nni_mtx_lock(&ep->ep_mtx); + if (nni_aio_result(aio) == NNG_ETIMEDOUT) { + if (ep->ep_mode == NNI_EP_MODE_DIAL) { + nni_ep_connect_start(ep); + } else { + nni_ep_accept_start(ep); + } + } + nni_mtx_unlock(&ep->ep_mtx); +} + +static void nni_ep_connect_done(void *arg) { nni_ep * ep = arg; @@ -212,6 +236,7 @@ nni_ep_connect_done(void *arg) nni_pipe * pipe; const nni_tran_pipe *ops; int rv; + nni_time cooldown; ops = ep->ep_tran->tran_pipe; @@ -232,7 +257,18 @@ done: case 0: pipe->p_tran_ops = *ops; pipe->p_tran_data = tpipe; + + // Good connect, so reset the backoff timer. + // XXX: This is kind of bad if a remote host just drops + // the connection without completing our negotiation. + // We should reset on close instead, when the pipe is + // removed *after* a good connect is made, and only + // if we manage to keep the pipe open for at least + // some meaningful amount of time. Alternatively we + // can dial into the pipe start logic... + ep->ep_currtime = ep->ep_inirtime; nni_pipe_start(pipe); + // No further outgoing connects -- we will restart a // connection from the pipe when the pipe is removed. break; @@ -241,15 +277,15 @@ done: // Canceled/closed -- stop everything. break; default: - // cooldown = nni_clock() + rtime; - // rtime *= 2; - // if ((maxrtime >= defrtime) && (rtime > maxrtime)) { - // rtime = maxrtime; - // } - - // XXX: We need to inject a cooldown, and then try again. For - // now we just try again immediately. This is very suboptimal. - nni_ep_connect_start(ep); + // Other errors involve the use of the backoff timer. + // XXX: randomize this slightly to prevent reconnect + // storms. + ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; + ep->ep_currtime *= 2; + if (ep->ep_currtime > ep->ep_maxrtime) { + ep->ep_currtime = ep->ep_maxrtime; + } + nni_aio_start(&ep->ep_backoff, NULL, ep); break; } nni_mtx_unlock(&ep->ep_mtx); @@ -269,71 +305,6 @@ nni_ep_connect_start(nni_ep *ep) ep->ep_ops.ep_connect(ep->ep_data, aio); } -// nni_dialer is the thread worker that dials in the background. -#if 0 // this has the logic for timing out... remove when done. -static void -nni_dialer(void *arg) -{ - nni_ep * ep = arg; - int rv; - nni_time cooldown; - nni_duration maxrtime = 0, nmaxrtime; - nni_duration defrtime = 0, ndefrtime; - nni_duration rtime; - - nni_sock_reconntimes(ep->ep_sock, &defrtime, &maxrtime); - rtime = defrtime; - - for (;;) { - nni_sock_reconntimes(ep->ep_sock, &ndefrtime, &nmaxrtime); - if ((defrtime != ndefrtime) || (maxrtime != nmaxrtime)) { - // Times changed, so reset them. - defrtime = ndefrtime; - maxrtime = nmaxrtime; - rtime = defrtime; - } - - nni_mtx_lock(&ep->ep_mtx); - while ((!ep->ep_closed) && (ep->ep_pipe != NULL)) { - rtime = defrtime; - nni_cv_wait(&ep->ep_cv); - } - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return; - } - nni_mtx_unlock(&ep->ep_mtx); - - rv = nni_ep_connect_sync(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ECLOSED: - return; - - default: - cooldown = nni_clock() + rtime; - rtime *= 2; - if ((maxrtime >= defrtime) && (rtime > maxrtime)) { - rtime = maxrtime; - } - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. - nni_mtx_lock(&ep->ep_mtx); - while (!ep->ep_closed) { - rv = nni_cv_until(&ep->ep_cv, cooldown); - if (rv == NNG_ETIMEDOUT) { - break; - } - } - nni_mtx_unlock(&ep->ep_mtx); - } -} -#endif - int nni_ep_dial(nni_ep *ep, int flags) { @@ -343,6 +314,9 @@ nni_ep_dial(nni_ep *ep, int flags) nni_pipe * pipe; const nni_tran_pipe *ops; + nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); + ep->ep_currtime = ep->ep_inirtime; + nni_mtx_lock(&ep->ep_mtx); ops = ep->ep_tran->tran_pipe; @@ -455,18 +429,17 @@ done: // cooldown = 0; nni_ep_accept_start(ep); break; - case NNG_ENOMEM: - // We're running low on memory, so its best to wait - // a whole second to give the system a chance to - // recover memory. - // cooldown = 1000000; - nni_ep_accept_start(ep); - break; default: - // other cases... sleep a tiny bit then try again. - // cooldown = 1000; 10msec - // Add a timeout here instead to avoid spinning. - nni_ep_accept_start(ep); + // We don't really know why we failed, but we backoff here. + // This is because errors here are probably due to system + // failures (resource exhaustion) and we hope by not + // thrashing we give the system a chance to recover. + ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; + ep->ep_currtime *= 2; + if (ep->ep_currtime > ep->ep_maxrtime) { + ep->ep_currtime = ep->ep_maxrtime; + } + nni_aio_start(&ep->ep_backoff, NULL, ep); break; } diff --git a/src/core/endpt.h b/src/core/endpt.h index 3350bc59..2c14605c 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -38,7 +38,12 @@ struct nni_ep { nni_list ep_pipes; nni_aio ep_acc_aio; nni_aio ep_con_aio; - nni_aio ep_con_syn; // used for sync connect + nni_aio ep_con_syn; // used for sync connect + nni_aio ep_backoff; // backoff timer + nni_duration ep_maxrtime; // maximum time for reconnect + nni_duration ep_currtime; // current time for reconnect + nni_duration ep_inirtime; // initial time for reconnect + nni_time ep_conntime; // time of last good connect nni_taskq_ent ep_reap_tqe; }; |
