diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-12 12:24:54 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-14 13:43:02 -0700 |
| commit | 343417234aa3fd86e8ae0b56ae500a1ed3411cfc (patch) | |
| tree | 728992cfe8c2987d5939026a1f734dcc58b3df18 /src/core/endpt.c | |
| parent | 4fb81f024e5f32a186cd5538574f8e5796980e36 (diff) | |
| download | nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.gz nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.bz2 nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.zip | |
fixes #62 Endpoint close should be synchronous
fixes #66 Make pipe and endpoint structures private
This changes a number of things, refactoring endpoints and supporting
code to keep their internals private, and making endpoint close
synchronous. This will allow us to add a consumer facing API for
nng_ep_close(), as well as property APIs, etc.
While here, a bunch of convoluted and dead code was cleaned up.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 389 |
1 files changed, 203 insertions, 186 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index a5865acf..6e5f7e8a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,58 +14,63 @@ #include <stdlib.h> #include <string.h> +struct nni_ep { + nni_tran_ep ep_ops; // transport ops + nni_tran * ep_tran; // transport pointer + void * ep_data; // transport private + uint32_t ep_id; // endpoint id + nni_list_node ep_node; // per socket list + nni_sock * ep_sock; + char ep_addr[NNG_MAXADDRLEN]; + int ep_mode; + int ep_closed; // full shutdown + int ep_closing; // close pending (waiting on refcnt) + int ep_bound; // true if we bound locally + int ep_refcnt; + nni_mtx ep_mtx; + nni_cv ep_cv; + nni_list ep_pipes; + nni_aio ep_acc_aio; + nni_aio ep_con_aio; + nni_aio ep_con_syn; // used for sync connect + nni_aio ep_tmo_aio; // backoff timer + nni_duration ep_maxrtime; // maximum time for reconnect + nni_duration ep_currtime; // current time for reconnect + nni_duration ep_inirtime; // initial time for reconnect + nni_time ep_conntime; // time of last good connect +}; + // Functionality related to end points. 
-static void nni_ep_accept_start(nni_ep *); -static void nni_ep_accept_done(void *); -static void nni_ep_connect_start(nni_ep *); -static void nni_ep_connect_done(void *); -static void nni_ep_backoff_start(nni_ep *); -static void nni_ep_backoff_done(void *); -static void nni_ep_reaper(void *); +static void nni_ep_acc_start(nni_ep *); +static void nni_ep_acc_cb(void *); +static void nni_ep_con_start(nni_ep *); +static void nni_ep_con_cb(void *); +static void nni_ep_tmo_start(nni_ep *); +static void nni_ep_tmo_cb(void *); static nni_idhash *nni_eps; - -static nni_list nni_ep_reap_list; -static nni_mtx nni_ep_reap_lk; -static nni_cv nni_ep_reap_cv; -static nni_thr nni_ep_reap_thr; -static int nni_ep_reap_run; +static nni_mtx nni_ep_lk; int nni_ep_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_ep_reap_list, nni_ep, ep_reap_node); - - if (((rv = nni_mtx_init(&nni_ep_reap_lk)) != 0) || - ((rv = nni_cv_init(&nni_ep_reap_cv, &nni_ep_reap_lk)) != 0) || - ((rv = nni_thr_init(&nni_ep_reap_thr, nni_ep_reaper, 0)) != 0) || + if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) || ((rv = nni_idhash_init(&nni_eps)) != 0)) { return (rv); } nni_idhash_set_limits( nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_ep_reap_run = 1; - nni_thr_run(&nni_ep_reap_thr); - return (0); } void nni_ep_sys_fini(void) { - if (nni_ep_reap_run) { - nni_mtx_lock(&nni_ep_reap_lk); - nni_ep_reap_run = 0; - nni_cv_wake(&nni_ep_reap_cv); - nni_mtx_unlock(&nni_ep_reap_lk); - } - nni_thr_fini(&nni_ep_reap_thr); - nni_cv_fini(&nni_ep_reap_cv); - nni_mtx_fini(&nni_ep_reap_lk); + nni_mtx_fini(&nni_ep_lk); nni_idhash_fini(nni_eps); nni_eps = NULL; } @@ -83,19 +88,22 @@ nni_ep_destroy(nni_ep *ep) return; } - // Remove us form the table so we cannot be found. + // Remove us from the table so we cannot be found. 
if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } + + nni_sock_ep_remove(ep->ep_sock, ep); + nni_aio_stop(&ep->ep_acc_aio); nni_aio_stop(&ep->ep_con_aio); nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_backoff); + nni_aio_stop(&ep->ep_tmo_aio); nni_aio_fini(&ep->ep_acc_aio); nni_aio_fini(&ep->ep_con_aio); nni_aio_fini(&ep->ep_con_syn); - nni_aio_fini(&ep->ep_backoff); + nni_aio_fini(&ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { @@ -107,8 +115,8 @@ nni_ep_destroy(nni_ep *ep) NNI_FREE_STRUCT(ep); } -int -nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) +static int +nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) { nni_tran *tran; nni_ep * ep; @@ -127,152 +135,174 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_closed = 0; ep->ep_bound = 0; ep->ep_data = NULL; - ep->ep_refcnt = 0; + ep->ep_refcnt = 1; + ep->ep_sock = s; + ep->ep_tran = tran; + ep->ep_mode = mode; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + ep->ep_ops = *tran->tran_ep; + + // Could safely use strcpy here, but this avoids discussion. 
+ (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr); NNI_LIST_NODE_INIT(&ep->ep_node); - NNI_LIST_NODE_INIT(&ep->ep_reap_node); nni_pipe_ep_list_init(&ep->ep_pipes); if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || - ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_backoff, nni_ep_backoff_done, ep); - if (rv != 0) { + ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || + ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); return (rv); } - ep->ep_sock = sock; - ep->ep_tran = tran; - ep->ep_mode = mode; + *epp = ep; + return (0); +} - // Could safely use strcpy here, but this avoids discussion. - (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr); +int +nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr) +{ + return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL)); +} - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. 
- ep->ep_ops = *tran->tran_ep; +int +nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr) +{ + return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN)); +} - if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { - nni_ep_destroy(ep); +int +nni_ep_find(nni_ep **epp, uint32_t id) +{ + int rv; + nni_ep *ep; + + if ((rv = nni_init()) != 0) { return (rv); } - *epp = ep; - return (0); + nni_mtx_lock(&nni_ep_lk); + if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) { + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + ep->ep_refcnt++; + *epp = ep; + } + } + nni_mtx_unlock(&nni_ep_lk); + return (rv); +} + +int +nni_ep_hold(nni_ep *ep) +{ + int rv; + nni_mtx_lock(&nni_ep_lk); + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + ep->ep_refcnt++; + rv = 0; + } + nni_mtx_unlock(&nni_ep_lk); + return (rv); } void -nni_ep_close(nni_ep *ep) +nni_ep_rele(nni_ep *ep) +{ + nni_mtx_lock(&nni_ep_lk); + ep->ep_refcnt--; + if (ep->ep_closing) { + nni_cv_wake(&ep->ep_cv); + } + nni_mtx_unlock(&nni_ep_lk); +} + +int +nni_ep_shutdown(nni_ep *ep) { + // This is done under the endpoints lock, although the remove + // is done under that as well, we also make sure that we hold + // the socket lock in the remove step. nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); - return; + return (NNG_ECLOSED); } - ep->ep_closed = 1; - nni_mtx_unlock(&ep->ep_mtx); + ep->ep_closing = 1; // Abort any remaining in-flight operations. nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_backoff, NNG_ECLOSED); + nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); -} - -static void -nni_ep_reap(nni_ep *ep) -{ - nni_ep_close(ep); // Extra sanity. 
- - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_backoff); - // Take us off the sock list. - nni_sock_ep_remove(ep->ep_sock, ep); - - // Make sure any other unlocked users (references) are gone - // before we actually remove the memory. We should not have - // to wait long as we have closed the underlying pipe and - // done everything we can to wake any waiter (synchronous connect) - // gracefully. - nni_mtx_lock(&ep->ep_mtx); - ep->ep_closed = 1; - for (;;) { - if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) { - nni_cv_wait(&ep->ep_cv); - continue; - } - break; - } nni_mtx_unlock(&ep->ep_mtx); - nni_ep_destroy(ep); + return (0); } void -nni_ep_stop(nni_ep *ep) +nni_ep_close(nni_ep *ep) { - nni_pipe *pipe; + nni_pipe *p; nni_mtx_lock(&ep->ep_mtx); - - // Protection against recursion. - if (ep->ep_stop) { + if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); + nni_ep_rele(ep); return; } - ep->ep_stop = 1; - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_stop(pipe); + ep->ep_closed = 1; + nni_mtx_unlock(&ep->ep_mtx); + + nni_ep_shutdown(ep); + + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + nni_aio_stop(&ep->ep_tmo_aio); + + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) { + nni_cv_wait(&ep->ep_cv); } nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&nni_ep_reap_lk); - NNI_ASSERT(!nni_list_node_active(&ep->ep_reap_node)); - nni_list_append(&nni_ep_reap_list, ep); - nni_cv_wake(&nni_ep_reap_cv); - nni_mtx_unlock(&nni_ep_reap_lk); + nni_ep_destroy(ep); } static void -nni_ep_backoff_cancel(nni_aio *aio, int rv) +nni_ep_tmo_cancel(nni_aio *aio, int rv) { // The only way this ever gets "finished", is via cancellation. 
nni_aio_finish_error(aio, rv); } static void -nni_ep_backoff_start(nni_ep *ep) +nni_ep_tmo_start(nni_ep *ep) { nni_duration backoff; - if (ep->ep_closed) { + if (ep->ep_closing) { return; } backoff = ep->ep_currtime; @@ -282,36 +312,36 @@ nni_ep_backoff_start(nni_ep *ep) } // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is pretty - // similar to 802 style backoff, except that we have a nearly - // uniform time period instead of discrete slot times. This - // algorithm may lead to slight biases because we don't have - // a statistically perfect distribution with the modulo of the - // random number, but this really doesn't matter. - - ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff); - nni_aio_start(&ep->ep_backoff, nni_ep_backoff_cancel, ep); + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. 
+ + ep->ep_tmo_aio.a_expire = nni_clock() + (nni_random() % backoff); + nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); } static void -nni_ep_backoff_done(void *arg) +nni_ep_tmo_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_backoff; + nni_aio *aio = &ep->ep_tmo_aio; nni_mtx_lock(&ep->ep_mtx); if (nni_aio_result(aio) == NNG_ETIMEDOUT) { if (ep->ep_mode == NNI_EP_MODE_DIAL) { - nni_ep_connect_start(ep); + nni_ep_con_start(ep); } else { - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); } } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_connect_done(void *arg) +nni_ep_con_cb(void *arg) { nni_ep * ep = arg; nni_aio *aio = &ep->ep_con_aio; @@ -324,11 +354,12 @@ nni_ep_connect_done(void *arg) switch (rv) { case 0: // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops us - // immediately, is going to get hit pretty hard (depending - // on the initial backoff) with no exponential backoff. - // This can happen if we wind up trying to connect to some - // port that does not speak SP for example. + // Note that a host that accepts the connect, but drops + // us immediately, is going to get hit pretty hard + // (depending on the initial backoff) with no + // exponential backoff. This can happen if we wind up + // trying to connect to some port that does not speak + // SP for example. ep->ep_currtime = ep->ep_inirtime; // No further outgoing connects -- we will restart a @@ -340,19 +371,19 @@ nni_ep_connect_done(void *arg) break; default: // Other errors involve the use of the backoff timer. - nni_ep_backoff_start(ep); + nni_ep_tmo_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_connect_start(nni_ep *ep) +nni_ep_con_start(nni_ep *ep) { nni_aio *aio = &ep->ep_con_aio; // Call with the Endpoint lock held. 
- if (ep->ep_closed) { + if (ep->ep_closing) { return; } @@ -375,13 +406,13 @@ nni_ep_dial(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((flags & NNG_FLAG_SYNCH) == 0) { - nni_ep_connect_start(ep); + nni_ep_con_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); } @@ -404,7 +435,7 @@ nni_ep_dial(nni_ep *ep, int flags) } static void -nni_ep_accept_done(void *arg) +nni_ep_acc_cb(void *arg) { nni_ep * ep = arg; nni_aio *aio = &ep->ep_acc_aio; @@ -418,7 +449,7 @@ nni_ep_accept_done(void *arg) nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); break; case NNG_ECLOSED: case NNG_ECANCELED: @@ -427,26 +458,27 @@ nni_ep_accept_done(void *arg) case NNG_ECONNABORTED: case NNG_ECONNRESET: // These are remote conditions, no cool down. - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); break; default: - // We don't really know why we failed, but we backoff here. - // This is because errors here are probably due to system - // failures (resource exhaustion) and we hope by not - // thrashing we give the system a chance to recover. - nni_ep_backoff_start(ep); + // We don't really know why we failed, but we backoff + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. + nni_ep_tmo_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_accept_start(nni_ep *ep) +nni_ep_acc_start(nni_ep *ep) { nni_aio *aio = &ep->ep_acc_aio; // Call with the Endpoint lock held. 
- if (ep->ep_closed) { + if (ep->ep_closing) { return; } aio->a_pipe = NULL; @@ -467,7 +499,7 @@ nni_ep_listen(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } @@ -479,7 +511,7 @@ nni_ep_listen(nni_ep *ep, int flags) } ep->ep_bound = 1; - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); @@ -489,12 +521,11 @@ int nni_ep_pipe_add(nni_ep *ep, nni_pipe *p) { nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } nni_list_append(&ep->ep_pipes, p); - p->p_ep = ep; nni_mtx_unlock(&ep->ep_mtx); return (0); } @@ -508,7 +539,6 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) if (nni_list_active(&ep->ep_pipes, pipe)) { nni_list_remove(&ep->ep_pipes, pipe); } - pipe->p_ep = NULL; // Wake up the close thread if it is waiting. if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) { nni_cv_wake(&ep->ep_cv); @@ -517,9 +547,10 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) // If this pipe closed, then lets restart the dial operation. // Since the remote side seems to have closed, lets start with // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects immediately. + // thing if a remote server accepts but then disconnects + // immediately. 
if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) { - nni_ep_backoff_start(ep); + nni_ep_tmo_start(ep); } nni_mtx_unlock(&ep->ep_mtx); } @@ -530,28 +561,14 @@ nni_ep_list_init(nni_list *list) NNI_LIST_INIT(list, nni_ep, ep_node); } -static void -nni_ep_reaper(void *notused) +nni_tran * +nni_ep_tran(nni_ep *ep) { - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_ep_reap_lk); - for (;;) { - nni_ep *ep; - - if ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) { - nni_list_remove(&nni_ep_reap_list, ep); - nni_mtx_unlock(&nni_ep_reap_lk); - nni_ep_reap(ep); - nni_mtx_lock(&nni_ep_reap_lk); - continue; - } - - if (!nni_ep_reap_run) { - break; - } + return (ep->ep_tran); +} - nni_cv_wait(&nni_ep_reap_cv); - } - nni_mtx_unlock(&nni_ep_reap_lk); -}
\ No newline at end of file +nni_sock * +nni_ep_sock(nni_ep *ep) +{ + return (ep->ep_sock); +} |
