diff options
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 166 |
1 files changed, 51 insertions, 115 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index b1ee6b82..da0e3318 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); static void nni_ep_connect_start(nni_ep *); static void nni_ep_connect_done(void *); +static void nni_ep_backoff_start(nni_ep *); static void nni_ep_backoff_done(void *); static nni_idhash *nni_eps; @@ -211,6 +212,32 @@ nni_ep_stop(nni_ep *ep) } static void +nni_ep_backoff_start(nni_ep *ep) +{ + nni_duration backoff; + + if (ep->ep_closed) { + return; + } + backoff = ep->ep_currtime; + ep->ep_currtime *= 2; + if (ep->ep_currtime > ep->ep_maxrtime) { + ep->ep_currtime = ep->ep_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is pretty + // similar to 802 style backoff, except that we have a nearly + // uniform time period instead of discrete slot times. This + // algorithm may lead to slight biases because we don't have + // a statistically perfect distribution with the modulo of the + // random number, but this really doesn't matter. + + ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff); + nni_aio_start(&ep->ep_backoff, NULL, ep); +} + +static void nni_ep_backoff_done(void *arg) { nni_ep * ep = arg; @@ -230,34 +257,17 @@ nni_ep_backoff_done(void *arg) static void nni_ep_connect_done(void *arg) { - nni_ep * ep = arg; - nni_aio * aio = &ep->ep_con_aio; - void * tpipe; - nni_pipe * pipe; - const nni_tran_pipe *ops; - int rv; - nni_time cooldown; - - ops = ep->ep_tran->tran_pipe; + nni_ep * ep = arg; + nni_aio *aio = &ep->ep_con_aio; + int rv; + nni_time cooldown; - nni_mtx_lock(&ep->ep_mtx); if ((rv = nni_aio_result(aio)) == 0) { - - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); - - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - } + rv = nni_pipe_create(ep, aio->a_pipe); } - -done: + nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - // Good connect, so reset the backoff timer. // XXX: This is kind of bad if a remote host just drops // the connection without completing our negotiation. @@ -267,7 +277,6 @@ done: // some meaningful amount of time. Alternatively we // can dial into the pipe start logic... ep->ep_currtime = ep->ep_inirtime; - nni_pipe_start(pipe); // No further outgoing connects -- we will restart a // connection from the pipe when the pipe is removed. @@ -278,14 +287,7 @@ done: break; default: // Other errors involve the use of the backoff timer. - // XXX: randomize this slightly to prevent reconnect - // storms. - ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - nni_aio_start(&ep->ep_backoff, NULL, ep); + nni_ep_backoff_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); @@ -308,115 +310,61 @@ nni_ep_connect_start(nni_ep *ep) int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; - nni_aio * aio; - void * tpipe; - nni_pipe * pipe; - const nni_tran_pipe *ops; + int rv = 0; + nni_aio *aio; nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); ep->ep_currtime = ep->ep_inirtime; nni_mtx_lock(&ep->ep_mtx); - ops = ep->ep_tran->tran_pipe; if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_EBUSY); - } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_started = 1; - if ((flags & NNG_FLAG_SYNCH) == 0) { nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); } - // This one is kind of special, since we need - // to block for the connection to complete. Ick. + // Synchronous mode: so we have to wait for it to complete. aio = &ep->ep_con_syn; aio->a_endpt = ep->ep_data; ep->ep_ops.ep_connect(ep->ep_data, aio); - - // We're about to drop the lock, but we cannot allow the - // endpoint to be removed. Put a hold on it. - ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); nni_aio_wait(aio); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_refcnt--; - - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - rv = nni_aio_result(aio); - } - - if (rv != 0) { - ep->ep_started = 0; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - } else { - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - nni_pipe_start(pipe); + // As we're synchronous, we also have to handle the completion. + if ((rv = nni_aio_result(aio)) == 0) { + NNI_ASSERT(aio->a_pipe != NULL); + rv = nni_pipe_create(ep, aio->a_pipe); } - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); return (rv); } static void nni_ep_accept_done(void *arg) { - nni_ep * ep = arg; - nni_aio * aio = &ep->ep_acc_aio; - void * tpipe; - nni_pipe * pipe; - int rv; - const nni_tran_pipe *ops; - - ops = ep->ep_tran->tran_pipe; - - nni_mtx_lock(&ep->ep_mtx); - if ((rv = nni_aio_result(aio)) != 0) { - goto done; - } - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); + nni_ep * ep = arg; + nni_aio *aio = &ep->ep_acc_aio; + int rv; - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - goto done; + if ((rv = nni_aio_result(aio)) == 0) { + NNI_ASSERT(aio->a_pipe != NULL); + rv = nni_pipe_create(ep, aio->a_pipe); } -done: + nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - nni_pipe_start(pipe); nni_ep_accept_start(ep); break; case NNG_ECLOSED: @@ -426,7 +374,6 @@ done: case NNG_ECONNABORTED: case NNG_ECONNRESET: // These are remote conditions, no cool down. - // cooldown = 0; nni_ep_accept_start(ep); break; default: @@ -434,15 +381,9 @@ done: // This is because errors here are probably due to system // failures (resource exhaustion) and we hope by not // thrashing we give the system a chance to recover. - ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - nni_aio_start(&ep->ep_backoff, NULL, ep); + nni_ep_backoff_start(ep); break; } - nni_mtx_unlock(&ep->ep_mtx); } @@ -465,25 +406,20 @@ nni_ep_listen(nni_ep *ep, int flags) { int rv = 0; + nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_LISTEN) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_EBUSY); - } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_started = 1; - rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { - ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); return (rv); } |
