diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 369 |
1 files changed, 284 insertions, 85 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 620c5d19..640e0db6 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <string.h> @@ -58,7 +59,7 @@ struct nni_socket { nni_cv s_cv; nni_cv s_close_cv; - uint64_t s_id; + uint32_t s_id; uint32_t s_flags; unsigned s_refcnt; // protected by global lock void * s_data; // Protocol private @@ -97,6 +98,9 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); +static void dialer_shutdown_locked(nni_dialer *); +static void listener_shutdown_locked(nni_listener *); + static int sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt) uint32_t nni_sock_id(nni_sock *s) { - return ((uint32_t) s->s_id); + return (s->s_id); } // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain @@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s) return (rv); } -void -nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id) -{ - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { - nng_pipe_cb cb; - void * arg; - - nni_mtx_lock(&s->s_pipe_cbs_mtx); - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - - if (cb != NULL) { - nng_pipe p; - p.id = id; - cb(p, ev, arg); - } - } -} - -int -nni_sock_pipe_add(nni_sock *s, nni_pipe *p) -{ - // Initialize protocol pipe data. - nni_mtx_lock(&s->s_mx); - if (s->s_closing) { - nni_mtx_unlock(&s->s_mx); - return (NNG_ECLOSED); - } - - nni_list_append(&s->s_pipes, p); - - // Start the initial negotiation I/O... - nni_pipe_start(p); - - nni_mtx_unlock(&s->s_mx); - return (0); -} - -void -nni_sock_pipe_remove(nni_sock *s, nni_pipe *p) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_pipes, p)) { - nni_list_remove(&s->s_pipes, p); - } - if (s->s_closing && nni_list_empty(&s->s_pipes)) { - nni_cv_wake(&s->s_cv); - } - nni_mtx_unlock(&s->s_mx); -} - static void sock_destroy(nni_sock *s) { @@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); - - nni_pipe_sock_list_init(&s->s_pipes); - nni_listener_list_init(&s->s_listeners); - nni_dialer_list_init(&s->s_dialers); + NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node); + NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node); + NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) } nni_mtx_lock(&sock_lk); - if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) { + if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) { sock_destroy(s); } else { nni_list_append(&sock_list, s); @@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) nni_mtx_unlock(&sock_lk); // Set the sockname. - (void) snprintf( - s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); + (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); return (rv); } @@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock) { nni_pipe * pipe; nni_dialer * d; - nni_dialer * nd; nni_listener *l; - nni_listener *nl; nni_ctx * ctx; nni_ctx * nctx; @@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_listeners, l) { - nni_listener_shutdown(l); + listener_shutdown_locked(l); } NNI_LIST_FOREACH (&sock->s_dialers, d) { - nni_dialer_shutdown(d); + dialer_shutdown_locked(d); } nni_mtx_unlock(&sock->s_mx); @@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nl = nni_list_first(&sock->s_listeners); - while ((l = nl) != NULL) { - nl = nni_list_next(&sock->s_listeners, nl); - + NNI_LIST_FOREACH (&sock->s_listeners, l) { if (nni_listener_hold(l) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_listener_close(l); - nni_mtx_lock(&sock->s_mx); } } - nd = nni_list_first(&sock->s_dialers); - while ((d = nd) != NULL) { - nd = nni_list_next(&sock->s_dialers, nd); - + NNI_LIST_FOREACH (&sock->s_dialers, d) { if (nni_dialer_hold(d) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_dialer_close(d); - nni_mtx_lock(&sock->s_mx); } } - // For each pipe, arrange for it to teardown hard. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || @@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) { nni_ctx *ctx; int rv; - uint64_t id; if (sock->s_ctx_ops.ctx_init == NULL) { return (NNG_ENOTSUP); @@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) NNI_FREE_STRUCT(ctx); return (NNG_ECLOSED); } - if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) { + if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) { nni_mtx_unlock(&sock_lk); NNI_FREE_STRUCT(ctx); return (rv); } - ctx->c_id = (uint32_t) id; if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { nni_idhash_remove(ctx_hash, ctx->c_id); @@ -1415,3 +1352,265 @@ nni_ctx_setopt( nni_mtx_unlock(&sock->s_mx); return (rv); } + +static void +dialer_timer_start_locked(nni_dialer *d) +{ + nni_duration backoff; + + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +void +nni_dialer_timer_start(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + nni_mtx_lock(&s->s_mx); + dialer_timer_start_locked(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +{ + nni_sock *s = d->d_sock; + + nni_mtx_lock(&s->s_mx); + + if (s->s_closed || d->d_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + + p->p_dialer = d; + nni_list_append(&d->d_pipes, p); + nni_list_append(&s->s_pipes, p); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +dialer_shutdown_impl(nni_dialer *d) +{ + nni_pipe *p; + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } +} + +static void +dialer_shutdown_locked(nni_dialer *d) +{ + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + dialer_shutdown_impl(d); +} + +void +nni_dialer_shutdown(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + nni_mtx_lock(&s->s_mx); + dialer_shutdown_impl(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_reap(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + + nni_aio_stop(d->d_tmo_aio); + nni_aio_stop(d->d_con_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&d->d_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); + return; + } + + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_dialer_destroy(d); +} + +void +nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +{ + nni_sock *s = l->l_sock; + + nni_mtx_lock(&s->s_mx); + if (s->s_closed || l->l_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + p->p_listener = l; + nni_list_append(&l->l_pipes, p); + nni_list_append(&s->s_pipes, p); + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +listener_shutdown_impl(nni_listener *l) +{ + nni_pipe *p; + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } +} + +static void +listener_shutdown_locked(nni_listener *l) +{ + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + listener_shutdown_impl(l); +} + +void +nni_listener_shutdown(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + + nni_mtx_lock(&s->s_mx); + listener_shutdown_impl(l); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_listener_reap(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + nni_aio_stop(l->l_tmo_aio); + nni_aio_stop(l->l_acc_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&l->l_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); + return; + } + + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_listener_destroy(l); +} + +void +nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) +{ + nni_sock * s = p->p_sock; + nng_pipe_cb cb; + void * arg; + + if (!p->p_cbs) { + if (ev == NNG_PIPE_EV_ADD_PRE) { + // First event, after this we want all other events. + p->p_cbs = true; + } else { + return; + } + } + nni_mtx_lock(&s->s_pipe_cbs_mtx); + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + + if (cb != NULL) { + nng_pipe pid; + pid.id = p->p_id; + cb(pid, ev, arg); + } +} + +void +nni_pipe_remove(nni_pipe *p) +{ + nni_sock * s = p->p_sock; + nni_dialer *d = p->p_dialer; + + nni_mtx_lock(&s->s_mx); + nni_list_node_remove(&p->p_sock_node); + nni_list_node_remove(&p->p_ep_node); + p->p_listener = NULL; + p->p_dialer = NULL; + if ((d != NULL) && (d->d_pipe == p)) { + d->d_pipe = NULL; + dialer_timer_start_locked(d); // Kick the timer to redial. + } + if (s->s_closing) { + nni_cv_wake(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); +} |
