aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c166
1 files changed, 51 insertions, 115 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index b1ee6b82..da0e3318 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
static void nni_ep_connect_start(nni_ep *);
static void nni_ep_connect_done(void *);
+static void nni_ep_backoff_start(nni_ep *);
static void nni_ep_backoff_done(void *);
static nni_idhash *nni_eps;
@@ -211,6 +212,32 @@ nni_ep_stop(nni_ep *ep)
}
static void
+nni_ep_backoff_start(nni_ep *ep)
+{
+ nni_duration backoff;
+
+ if (ep->ep_closed) {
+ return;
+ }
+ backoff = ep->ep_currtime;
+ ep->ep_currtime *= 2;
+ if (ep->ep_currtime > ep->ep_maxrtime) {
+ ep->ep_currtime = ep->ep_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is pretty
+ // similar to 802 style backoff, except that we have a nearly
+ // uniform time period instead of discrete slot times. This
+ // algorithm may lead to slight biases because we don't have
+ // a statistically perfect distribution with the modulo of the
+ // random number, but this really doesn't matter.
+
+ ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff);
+ nni_aio_start(&ep->ep_backoff, NULL, ep);
+}
+
+static void
nni_ep_backoff_done(void *arg)
{
nni_ep * ep = arg;
@@ -230,34 +257,17 @@ nni_ep_backoff_done(void *arg)
static void
nni_ep_connect_done(void *arg)
{
- nni_ep * ep = arg;
- nni_aio * aio = &ep->ep_con_aio;
- void * tpipe;
- nni_pipe * pipe;
- const nni_tran_pipe *ops;
- int rv;
- nni_time cooldown;
-
- ops = ep->ep_tran->tran_pipe;
+ nni_ep * ep = arg;
+ nni_aio *aio = &ep->ep_con_aio;
+ int rv;
+ nni_time cooldown;
- nni_mtx_lock(&ep->ep_mtx);
if ((rv = nni_aio_result(aio)) == 0) {
-
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
-
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- }
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
-
-done:
+ nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
case 0:
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
-
// Good connect, so reset the backoff timer.
// XXX: This is kind of bad if a remote host just drops
// the connection without completing our negotiation.
@@ -267,7 +277,6 @@ done:
// some meaningful amount of time. Alternatively we
// can dial into the pipe start logic...
ep->ep_currtime = ep->ep_inirtime;
- nni_pipe_start(pipe);
// No further outgoing connects -- we will restart a
// connection from the pipe when the pipe is removed.
@@ -278,14 +287,7 @@ done:
break;
default:
// Other errors involve the use of the backoff timer.
- // XXX: randomize this slightly to prevent reconnect
- // storms.
- ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
- nni_aio_start(&ep->ep_backoff, NULL, ep);
+ nni_ep_backoff_start(ep);
break;
}
nni_mtx_unlock(&ep->ep_mtx);
@@ -308,115 +310,61 @@ nni_ep_connect_start(nni_ep *ep)
int
nni_ep_dial(nni_ep *ep, int flags)
{
- int rv = 0;
- nni_aio * aio;
- void * tpipe;
- nni_pipe * pipe;
- const nni_tran_pipe *ops;
+ int rv = 0;
+ nni_aio *aio;
nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
ep->ep_currtime = ep->ep_inirtime;
nni_mtx_lock(&ep->ep_mtx);
- ops = ep->ep_tran->tran_pipe;
if (ep->ep_mode != NNI_EP_MODE_DIAL) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_EBUSY);
- }
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- ep->ep_started = 1;
-
if ((flags & NNG_FLAG_SYNCH) == 0) {
nni_ep_connect_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
}
- // This one is kind of special, since we need
- // to block for the connection to complete. Ick.
+ // Synchronous mode: so we have to wait for it to complete.
aio = &ep->ep_con_syn;
aio->a_endpt = ep->ep_data;
ep->ep_ops.ep_connect(ep->ep_data, aio);
-
- // We're about to drop the lock, but we cannot allow the
- // endpoint to be removed. Put a hold on it.
- ep->ep_refcnt++;
nni_mtx_unlock(&ep->ep_mtx);
nni_aio_wait(aio);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_refcnt--;
-
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- rv = nni_aio_result(aio);
- }
-
- if (rv != 0) {
- ep->ep_started = 0;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- } else {
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
- nni_pipe_start(pipe);
+ // As we're synchronous, we also have to handle the completion.
+ if ((rv = nni_aio_result(aio)) == 0) {
+ NNI_ASSERT(aio->a_pipe != NULL);
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
static void
nni_ep_accept_done(void *arg)
{
- nni_ep * ep = arg;
- nni_aio * aio = &ep->ep_acc_aio;
- void * tpipe;
- nni_pipe * pipe;
- int rv;
- const nni_tran_pipe *ops;
-
- ops = ep->ep_tran->tran_pipe;
-
- nni_mtx_lock(&ep->ep_mtx);
- if ((rv = nni_aio_result(aio)) != 0) {
- goto done;
- }
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
+ nni_ep * ep = arg;
+ nni_aio *aio = &ep->ep_acc_aio;
+ int rv;
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- goto done;
+ if ((rv = nni_aio_result(aio)) == 0) {
+ NNI_ASSERT(aio->a_pipe != NULL);
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
-done:
+ nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
case 0:
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
- nni_pipe_start(pipe);
nni_ep_accept_start(ep);
break;
case NNG_ECLOSED:
@@ -426,7 +374,6 @@ done:
case NNG_ECONNABORTED:
case NNG_ECONNRESET:
// These are remote conditions, no cool down.
- // cooldown = 0;
nni_ep_accept_start(ep);
break;
default:
@@ -434,15 +381,9 @@ done:
// This is because errors here are probably due to system
// failures (resource exhaustion) and we hope by not
// thrashing we give the system a chance to recover.
- ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
- nni_aio_start(&ep->ep_backoff, NULL, ep);
+ nni_ep_backoff_start(ep);
break;
}
-
nni_mtx_unlock(&ep->ep_mtx);
}
@@ -465,25 +406,20 @@ nni_ep_listen(nni_ep *ep, int flags)
{
int rv = 0;
+ nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
+
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_EBUSY);
- }
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- ep->ep_started = 1;
-
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
- ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}