diff options
Diffstat (limited to 'src/core/dialer.c')
| -rw-r--r-- | src/core/dialer.c | 237 |
1 files changed, 47 insertions, 190 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index e93c893e..3144d673 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -9,36 +9,12 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -struct nni_dialer { - nni_tran_dialer_ops d_ops; // transport ops - nni_tran * d_tran; // transport pointer - void * d_data; // transport private - uint64_t d_id; // endpoint id - nni_list_node d_node; // per socket list - nni_sock * d_sock; - nni_url * d_url; - int d_refcnt; - int d_lastrv; // last result from synchronous - bool d_synch; // synchronous connect in progress? - bool d_started; - bool d_closed; // full shutdown - nni_atomic_flag d_closing; // close pending (waiting on refcnt) - nni_mtx d_mtx; - nni_cv d_cv; - nni_list d_pipes; - nni_aio * d_con_aio; - nni_aio * d_tmo_aio; // backoff timer - nni_duration d_maxrtime; // maximum time for reconnect - nni_duration d_currtime; // current time for reconnect - nni_duration d_inirtime; // initial time for reconnect - nni_time d_conntime; // time of last good connect -}; - // Functionality related to dialers. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); @@ -65,6 +41,7 @@ nni_dialer_sys_init(void) void nni_dialer_sys_fini(void) { + nni_reap_drain(); nni_mtx_fini(&dialers_lk); nni_idhash_fini(dialers); dialers = NULL; @@ -73,26 +50,15 @@ nni_dialer_sys_fini(void) uint32_t nni_dialer_id(nni_dialer *d) { - return ((uint32_t) d->d_id); + return (d->d_id); } -static void -dialer_destroy(nni_dialer *d) +void +nni_dialer_destroy(nni_dialer *d) { - if (d == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (d->d_id != 0) { - nni_idhash_remove(dialers, d->d_id); - } - nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_sock_remove_dialer(d->d_sock, d); - nni_aio_fini(d->d_con_aio); nni_aio_fini(d->d_tmo_aio); @@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - d->d_url = url; - d->d_closed = false; - d->d_started = false; - d->d_data = NULL; - d->d_refcnt = 1; - d->d_sock = s; - d->d_tran = tran; + d->d_url = url; + d->d_closed = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + nni_atomic_flag_reset(&d->d_started); nni_atomic_flag_reset(&d->d_closing); // Make a copy of the endpoint operations. This allows us to @@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) d->d_ops = *tran->tran_dialer; NNI_LIST_NODE_INIT(&d->d_node); - - nni_pipe_ep_list_init(&d->d_pipes); + NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); nni_mtx_init(&d->d_mtx); nni_cv_init(&d->d_cv, &d->d_mtx); @@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || - ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { - dialer_destroy(d); + nni_dialer_destroy(d); return (rv); } @@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d) { nni_mtx_lock(&dialers_lk); d->d_refcnt--; - if (d->d_refcnt == 0) { - nni_cv_wake(&d->d_cv); + if ((d->d_refcnt == 0) && (d->d_closed)) { + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); } nni_mtx_unlock(&dialers_lk); } -int -nni_dialer_shutdown(nni_dialer *d) -{ - if (nni_atomic_flag_test_and_set(&d->d_closing)) { - return (NNG_ECLOSED); - } - - // Abort any remaining in-flight operations. - nni_aio_close(d->d_con_aio); - nni_aio_close(d->d_tmo_aio); - - // Stop the underlying transport. - d->d_ops.d_close(d->d_data); - - return (0); -} - void nni_dialer_close(nni_dialer *d) { - nni_pipe *p; - - nni_mtx_lock(&d->d_mtx); + nni_mtx_lock(&dialers_lk); if (d->d_closed) { - nni_mtx_unlock(&d->d_mtx); + nni_mtx_unlock(&dialers_lk); nni_dialer_rele(d); return; } d->d_closed = true; - nni_mtx_unlock(&d->d_mtx); - - nni_dialer_shutdown(d); - - nni_aio_stop(d->d_con_aio); - nni_aio_stop(d->d_tmo_aio); - - nni_mtx_lock(&d->d_mtx); - NNI_LIST_FOREACH (&d->d_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { - nni_cv_wait(&d->d_cv); - } - nni_mtx_unlock(&d->d_mtx); - - dialer_destroy(d); -} + nni_mtx_unlock(&dialers_lk); -// This function starts an exponential backoff timer for reconnecting. -static void -dialer_timer_start(nni_dialer *d) -{ - nni_duration backoff; + // Remove us from the table so we cannot be found. + // This is done fairly early in the teardown process. + // If we're here, either the socket or the listener has been + // closed at the user request, so there would be a race anyway. + nni_idhash_remove(dialers, d->d_id); - backoff = d->d_currtime; - d->d_currtime *= 2; - if (d->d_currtime > d->d_maxrtime) { - d->d_currtime = d->d_maxrtime; - } + nni_dialer_shutdown(d); - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); + nni_dialer_rele(d); } static void @@ -285,11 +203,9 @@ dialer_timer_cb(void *arg) nni_dialer *d = arg; nni_aio * aio = d->d_tmo_aio; - nni_mtx_lock(&d->d_mtx); if (nni_aio_result(aio) == 0) { dialer_connect_start(d); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -304,52 +220,32 @@ dialer_connect_cb(void *arg) if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); - rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); } nni_mtx_lock(&d->d_mtx); - synch = d->d_synch; - d->d_synch = false; - if (rv == 0) { - nni_pipe_set_dialer(p, d); - nni_list_append(&d->d_pipes, p); - - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - d->d_currtime = d->d_inirtime; - } + synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { - nni_pipe_stop(p); - } - - nni_mtx_lock(&d->d_mtx); switch (rv) { case 0: - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. + nni_dialer_add_pipe(d, p); break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. break; default: - // redial, but only if we are not synchronous if (!synch) { - dialer_timer_start(d); + nni_dialer_timer_start(d); } break; } if (synch) { + nni_mtx_lock(&d->d_mtx); + d->d_synch = false; d->d_lastrv = rv; nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags) { int rv = 0; - // nni_sock_reconntimes(d->d_sock, &d->d_inirtime, - //&d->d_maxrtime); - d->d_currtime = d->d_inirtime; - - nni_mtx_lock(&d->d_mtx); - - if (d->d_started) { - nni_mtx_unlock(&d->d_mtx); + if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); } if ((flags & NNG_FLAG_NONBLOCK) != 0) { - d->d_started = true; - dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); + d->d_currtime = d->d_inirtime; nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); return (0); } - d->d_synch = true; - d->d_started = true; + nni_mtx_lock(&d->d_mtx); + d->d_synch = true; + nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); while (d->d_synch) { nni_cv_wait(&d->d_cv); } rv = d->d_lastrv; - nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); if (rv != 0) { - d->d_started = false; + nni_atomic_flag_reset(&d->d_started); } - nni_mtx_unlock(&d->d_mtx); return (rv); } -void -nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) -{ - if (d == NULL) { - return; - } - - // Break up the relationship between the dialer and the pipe. - nni_mtx_lock(&d->d_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&d->d_pipes, p)) { - nni_list_remove(&d->d_pipes, p); - } - // Wake up the close thread if it is waiting. - if (d->d_closed) { - if (nni_list_empty(&d->d_pipes)) { - nni_cv_wake(&d->d_cv); - } - } else { - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - dialer_timer_start(d); - } - nni_mtx_unlock(&d->d_mtx); -} - int nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, nni_opt_type t) @@ -508,9 +371,3 @@ nni_dialer_getopt( return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); } - -void -nni_dialer_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_dialer, d_node); -} |
