From d6ab6bca7a538c1a320ce00ab845e98c16649c94 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Dec 2024 22:28:15 -0800 Subject: pipes and endpoints: support for inline allocations of transport data This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it. --- src/core/dialer.c | 159 ++++++++++++++++++++++++++++-------------------- src/core/listener.c | 120 ++++++++++++++++++++++++++++++------- src/core/pipe.c | 169 ++++++++++++++++++++++++++++++++++------------------ src/core/pipe.h | 3 + src/core/socket.c | 144 ++++++++++++++++++++++++++++---------------- src/core/sockimpl.h | 2 + 6 files changed, 398 insertions(+), 199 deletions(-) (limited to 'src/core') diff --git a/src/core/dialer.c b/src/core/dialer.c index 2945fded..388d9981 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -19,6 +19,7 @@ // Functionality related to dialing. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); +static void dialer_connect_cb_old(void *); static void dialer_timer_cb(void *); static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d) void nni_dialer_destroy(nni_dialer *d) { - nni_aio_stop(&d->d_con_aio); - nni_aio_stop(&d->d_tmo_aio); - nni_aio_fini(&d->d_con_aio); nni_aio_fini(&d->d_tmo_aio); @@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d) } nni_mtx_fini(&d->d_mtx); nni_url_fini(&d->d_url); - NNI_FREE_STRUCT(d); + nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size); } #ifdef NNG_ENABLE_STATS @@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&d->st_root, item); } +#endif // NNG_ENABLE_STATS static void dialer_stats_init(nni_dialer *d) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "dialer", .si_desc = "dialer statistics", @@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d) dialer_stat_init(d, &d->st_auth, &auth_info); dialer_stat_init(d, &d->st_oom, &oom_info); dialer_stat_init(d, &d->st_reject, &reject_info); +#endif // NNG_ENABLE_STATS +} +static void +dialer_register_stats(nni_dialer *d) +{ +#ifdef NNG_ENABLE_STATS nni_stat_set_id(&d->st_root, (int) d->d_id); nni_stat_set_id(&d->st_id, (int) d->d_id); nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock)); nni_stat_register(&d->st_root); +#else + NNI_ARG_UNUSED(d); +#endif } -#endif // NNG_ENABLE_STATS void nni_dialer_bump_error(nni_dialer *d, int err) @@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err) static int nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *dp; d->d_closed = false; d->d_data = NULL; @@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) nni_mtx_init(&d->d_mtx); - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); + if (d->d_ops.d_size != 0) { + d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d)); + dp = d->d_data; + } else { + // legacy: remove me when transports converted + dp = &d->d_data; + } - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); + if (tran->tran_pipe->p_size) { + nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); + } else { + // legacy: remove me + nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d); + } + nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); -#ifdef NNG_ENABLE_STATS dialer_stats_init(d); -#endif - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { + rv = d->d_ops.d_init(dp, &d->d_url, d); + + if (rv == 0) { + rv = nni_sock_add_dialer(s, d); + } + + if (rv == 0) { nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); + rv = nni_id_alloc32(&dialers, &d->d_id, d); nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + dialer_register_stats(d); + } + + return (rv); } int @@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } if ((rv = nni_dialer_init(d, s, tran)) != 0) { @@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } - d->d_closed = false; - d->d_data = NULL; - d->d_ref = 1; - d->d_sock = s; - d->d_tran = tran; - nni_atomic_flag_reset(&d->d_started); - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - d->d_ops = *tran->tran_dialer; - - NNI_LIST_NODE_INIT(&d->d_node); - NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); - - nni_mtx_init(&d->d_mtx); - - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); - - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); - -#ifdef NNG_ENABLE_STATS - dialer_stats_init(d); -#endif - - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { - nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); - nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif + if ((rv = nni_dialer_init(d, s, tran)) != 0) { nni_dialer_destroy(d); return (rv); } - *dp = d; return (0); } @@ -433,6 +420,50 @@ dialer_connect_cb(void *arg) d->d_user_aio = NULL; nni_mtx_unlock(&d->d_mtx); + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&d->st_connect, 1); +#endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + break; + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. + nni_dialer_bump_error(d, rv); + break; + case NNG_ECONNREFUSED: + case NNG_ETIMEDOUT: + default: + nng_log_warn("NNG-CONN-FAIL", + "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock), + nng_strerror(rv)); + + nni_dialer_bump_error(d, rv); + if (user_aio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + } + if (user_aio != NULL) { + nni_aio_finish(user_aio, rv, 0); + } +} + +static void +dialer_connect_cb_old(void *arg) +{ + nni_dialer *d = arg; + nni_aio *aio = &d->d_con_aio; + nni_aio *user_aio; + int rv; + + nni_mtx_lock(&d->d_mtx); + user_aio = d->d_user_aio; + d->d_user_aio = NULL; + nni_mtx_unlock(&d->d_mtx); + switch ((rv = nni_aio_result(aio))) { case 0: #ifdef NNG_ENABLE_STATS diff --git a/src/core/listener.c b/src/core/listener.c index 9107e3a4..3dc2a0fa 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -22,6 +22,7 @@ static void listener_accept_start(nni_listener *); static void listener_accept_cb(void *); +static void listener_accept_cb_old(void *); static void listener_timer_cb(void *); static nni_id_map listeners = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -44,7 +45,7 @@ nni_listener_destroy(nni_listener *l) l->l_ops.l_fini(l->l_data); } nni_url_fini(&l->l_url); - NNI_FREE_STRUCT(l); + nni_free(l, NNI_ALIGN_UP(sizeof(*l)) + l->l_ops.l_size); } #ifdef NNG_ENABLE_STATS @@ -55,10 +56,12 @@ listener_stat_init( nni_stat_init(item, info); nni_stat_add(&l->st_root, item); } +#endif static void listener_stats_init(nni_listener *l) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "listener", .si_desc = "listener statistics", @@ -153,9 +156,23 @@ listener_stats_init(nni_listener *l) nni_stat_set_id(&l->st_root, (int) l->l_id); nni_stat_set_id(&l->st_id, (int) l->l_id); nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock)); +#else + NNI_ARG_UNUSED(l); +#endif // NNG_ENABLE_STATS +} + +static void +listener_register_stats(nni_listener *l) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_set_id(&l->st_root, (int) l->l_id); + nni_stat_set_id(&l->st_id, (int) l->l_id); + nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock)); nni_stat_register(&l->st_root); +#else + NNI_ARG_UNUSED(l); +#endif } -#endif // NNG_ENABLE_STATS void nni_listener_bump_error(nni_listener *l, int err) @@ -195,7 +212,8 @@ nni_listener_bump_error(nni_listener *l, int err) static int nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *lp; l->l_closed = false; l->l_data = NULL; @@ -212,30 +230,40 @@ nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran) NNI_LIST_NODE_INIT(&l->l_node); NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); - nni_aio_init(&l->l_acc_aio, listener_accept_cb, l); + if (tran->tran_pipe->p_size) { + nni_aio_init(&l->l_acc_aio, listener_accept_cb, l); + } else { + nni_aio_init(&l->l_acc_aio, listener_accept_cb_old, l); + } nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l); - nni_mtx_lock(&listeners_lk); - rv = nni_id_alloc32(&listeners, &l->l_id, l); - nni_mtx_unlock(&listeners_lk); - -#ifdef NNG_ENABLE_STATS listener_stats_init(l); -#endif - if ((rv != 0) || - ((rv = l->l_ops.l_init(&l->l_data, &l->l_url, l)) != 0) || - ((rv = nni_sock_add_listener(s, l)) != 0)) { + if (l->l_ops.l_size != 0) { + l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l)); + lp = l->l_data; + } else { + // legacy: remove me when transports converted + lp = &l->l_data; + } + + rv = l->l_ops.l_init(lp, &l->l_url, l); + + if (rv == 0) { + rv = nni_sock_add_listener(s, l); + } + + if (rv == 0) { nni_mtx_lock(&listeners_lk); - nni_id_remove(&listeners, l->l_id); + rv = nni_id_alloc32(&listeners, &l->l_id, l); nni_mtx_unlock(&listeners_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&l->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + listener_register_stats(l); + } + + return (rv); } int @@ -244,16 +272,18 @@ nni_listener_create_url(nni_listener **lp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_listener *l; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_listener == NULL)) { return (NNG_ENOTSUP); } - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size; + if ((l = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&l->l_url, url)) != 0) { - NNI_FREE_STRUCT(l); + nni_free(l, sz); return (rv); } if ((rv = nni_listener_init(l, s, tran)) != 0) { @@ -276,16 +306,19 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_listener *l; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_listener == NULL)) { return (NNG_ENOTSUP); } - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size; + if ((l = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_url_parse_inline(&l->l_url, url_str)) != 0) { - NNI_FREE_STRUCT(l); + nni_free(l, sz); return (rv); } if ((rv = nni_listener_init(l, s, tran)) != 0) { @@ -376,6 +409,47 @@ listener_accept_cb(void *arg) nni_aio *aio = &l->l_acc_aio; int rv; + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&l->st_accept, 1); +#endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + listener_accept_start(l); + break; + case NNG_ECONNABORTED: // remote condition, no cool down + case NNG_ECONNRESET: // remote condition, no cool down + case NNG_ETIMEDOUT: // No need to sleep, we timed out already. + case NNG_EPEERAUTH: // peer validation failure + nng_log_warn("NNG-ACCEPT-FAIL", + "Failed accepting for socket<%u>: %s", + nni_sock_id(l->l_sock), nng_strerror(rv)); + nni_listener_bump_error(l, rv); + listener_accept_start(l); + break; + case NNG_ECLOSED: // no further action + case NNG_ECANCELED: // no further action + nni_listener_bump_error(l, rv); + break; + default: + // We don't really know why we failed, but we back off + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. 100 ms is enough to cool down. + nni_listener_bump_error(l, rv); + nni_sleep_aio(100, &l->l_tmo_aio); + break; + } +} + +static void +listener_accept_cb_old(void *arg) +{ + nni_listener *l = arg; + nni_aio *aio = &l->l_acc_aio; + int rv; + switch ((rv = nni_aio_result(aio))) { case 0: #ifdef NNG_ENABLE_STATS diff --git a/src/core/pipe.c b/src/core/pipe.c index 05089fee..eaf49af9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -38,9 +38,8 @@ pipe_destroy(void *arg) nni_pipe *p = arg; p->p_proto_ops.pipe_fini(p->p_proto_data); - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } + p->p_tran_ops.p_fini(p->p_tran_data); + nni_free(p, p->p_size); } @@ -49,6 +48,11 @@ pipe_reap(void *arg) { nni_pipe *p = arg; + p->p_proto_ops.pipe_close(p->p_proto_data); + + // Close the underlying transport. + p->p_tran_ops.p_close(p->p_tran_data); + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. @@ -62,12 +66,11 @@ pipe_reap(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif - nni_pipe_remove(p); p->p_proto_ops.pipe_stop(p->p_proto_data); - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } + p->p_tran_ops.p_stop(p->p_tran_data); + + nni_pipe_remove(p); nni_pipe_rele(p); } @@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p) return; // We already did a close. } - p->p_proto_ops.pipe_close(p->p_proto_data); - - // Close the underlying transport. - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_close(p->p_tran_data); - } - nni_reap(&pipe_reap_list, p); } @@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&p->st_root, item); } +#endif static void pipe_stats_init(nni_pipe *p) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "pipe", .si_desc = "pipe statistics", @@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info dialer_info = { + .si_name = "dialer", + .si_desc = "dialer for pipe", + .si_type = NNG_STAT_ID, + }; + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; nni_stat_init(&p->st_root, &root_info); pipe_stat_init(p, &p->st_id, &id_info); @@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p) nni_stat_set_id(&p->st_root, (int) p->p_id); nni_stat_set_id(&p->st_id, (int) p->p_id); nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock)); -} + + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + pipe_stat_init(p, &p->st_ep_id, &dialer_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_dialer_id(p->p_dialer)); + } + if (p->p_listener) { + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_listener_id(p->p_listener)); + } +#else + NNI_ARG_UNUSED(p); #endif // NNG_ENABLE_STATS +} static int -pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) +pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, + nni_listener *l, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv1, rv2, rv3; + void *sock_data = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { - // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + p->p_dialer = d; + p->p_listener = l; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + nni_pipe_add(p); + + p->p_tran_data = tran_data; + p->p_proto_data = proto_data; + nni_mtx_lock(&pipes_lk); - rv = nni_id_alloc32(&pipes, &p->p_id, p); + rv1 = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); -#ifdef NNG_ENABLE_STATS + // must be done before protocol or transports, because + // they may add further stats pipe_stats_init(p); -#endif - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + rv2 = tops->p_init(tran_data, p); + rv3 = pops->pipe_init(proto_data, p, sock_data); + if (rv1 != 0 || rv2 != 0 || rv3 != 0) { nni_pipe_close(p); nni_pipe_rele(p); - return (rv); + return (rv1 ? rv1 : rv2 ? rv2 : rv3); } *pp = p; @@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) nni_sp_tran *tran = d->d_tran; nni_pipe *p; - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) { return (rv); } - p->p_dialer = d; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info dialer_info = { - .si_name = "dialer", - .si_desc = "dialer for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &dialer_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); -#endif *pp = p; return (0); } @@ -298,23 +329,43 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) nni_sp_tran *tran = l->l_tran; nni_pipe *p; - if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif *pp = p; return (0); } +int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) diff --git a/src/core/pipe.h b/src/core/pipe.h index 9e7109de..71e07a9f 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -63,4 +63,7 @@ extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); +extern int nni_pipe_alloc_dialer(void **, nni_dialer *); +extern int nni_pipe_alloc_listener(void **, nni_listener *); + #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index fe1b62d2..ec2691a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock) nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } + + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&sock->s_mx); // Close the upper queues immediately. @@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock_lk); nni_mtx_lock(&sock->s_mx); - - // At this point, we've done everything we politely can to - // give the protocol a chance to flush its write side. Now - // it is time to be a little more insistent. - - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // We have to wait for pipes to be removed. while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } - - sock->s_sock_ops.sock_close(sock->s_data); - - nni_cv_wake(&sock->s_cv); - NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); - nni_mtx_unlock(&sock->s_mx); + sock->s_sock_ops.sock_close(sock->s_data); + // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +static void +dialer_start_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - - if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - - nni_list_append(&d->d_pipes, p); - nni_list_append(&s->s_pipes, p); d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + #ifdef NNG_ENABLE_STATS nni_stat_inc(&s->st_pipes, 1); nni_stat_inc(&d->st_pipes, 1); @@ -1341,6 +1325,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); } +void +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +{ + nni_sock *s = d->d_sock; + nni_pipe *p; + + if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { + return; + } + + nni_mtx_lock(&s->s_mx); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + dialer_start_pipe(d, p); +} + void nni_dialer_shutdown(nni_dialer *d) { @@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d) nni_reap(&dialer_reap_list, d); } -void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +static void +listener_start_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; - - nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - nni_list_append(&l->l_pipes, p); - nni_list_append(&s->s_pipes, p); - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_pipes, 1); nni_stat_inc(&s->st_pipes, 1); @@ -1454,9 +1446,24 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) "Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p), nni_sock_id(s), nni_pipe_peer_addr(p, addr)); } + + // the socket now "owns" the pipe, and a pipe close should immediately + // start the process of teardown. nni_pipe_rele(p); } +void +nni_listener_add_pipe(nni_listener *l, void *tpipe) +{ + nni_pipe *p; + + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + return; + } + + listener_start_pipe(l, p); +} + void nni_listener_shutdown(nni_listener *l) { @@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l) nni_reap(&listener_reap_list, l); } +// nni_pipe_add just registers the pipe with the socket and endpoint +// so they won't be deallocated while the pipe still exists. +void +nni_pipe_add(nni_pipe *p) +{ + nni_sock *s = p->p_sock; + nni_dialer *d = p->p_dialer; + nni_listener *l = p->p_listener; + + nni_mtx_lock(&s->s_mx); + nni_list_append(&s->s_pipes, p); + if (d != NULL) { + NNI_ASSERT(l == NULL); + nni_list_append(&d->d_pipes, p); + } + if (l != NULL) { + NNI_ASSERT(d == NULL); + nni_list_append(&l->l_pipes, p); + } + nni_mtx_unlock(&s->s_mx); +} + +// nni_pipe_start attempts to start the pipe, adding it to the socket and +// endpoints and calling callbacks, etc. The pipe should already have finished +// any negotiation needed at the transport layer. +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_listener) { + NNI_ASSERT(p->p_dialer == NULL); + listener_start_pipe(p->p_listener, p); + } + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + dialer_start_pipe(p->p_dialer, p); + } +} + void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { @@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) void *arg; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (!p->p_cbs) { - if (ev == NNG_PIPE_EV_ADD_PRE) { - // First event, after this we want all other events. - p->p_cbs = true; - } else { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } + if (ev == NNG_PIPE_EV_ADD_PRE) { + p->p_cbs = true; + } else if (!p->p_cbs) { + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + return; } cb = s->s_pipe_cbs[ev].cb_fn; arg = s->s_pipe_cbs[ev].cb_arg; @@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p) nni_mtx_lock(&s->s_mx); #ifdef NNG_ENABLE_STATS - if (nni_list_node_active(&p->p_sock_node)) { - nni_stat_dec(&s->st_pipes, 1); - } + nni_stat_dec(&s->st_pipes, 1); if (p->p_listener != NULL) { nni_stat_dec(&p->p_listener->st_pipes, 1); } @@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p) #endif nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); - p->p_listener = NULL; - p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; dialer_timer_start_locked(d); // Kick the timer to redial. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 801ef7b1..aeaa369b 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -132,12 +132,14 @@ extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_stop(nni_dialer *); +extern void nni_listener_start_pipe(nni_listener *, nni_pipe *); extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); extern void nni_listener_stop(nni_listener *); +extern void nni_pipe_add(nni_pipe *); extern void nni_pipe_remove(nni_pipe *); extern bool nni_pipe_is_closed(nni_pipe *); extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); -- cgit v1.2.3-70-g09d2