From d6ab6bca7a538c1a320ce00ab845e98c16649c94 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Dec 2024 22:28:15 -0800 Subject: pipes and endpoints: support for inline allocations of transport data This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it. --- src/core/dialer.c | 159 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 95 insertions(+), 64 deletions(-) (limited to 'src/core/dialer.c') diff --git a/src/core/dialer.c b/src/core/dialer.c index 2945fded..388d9981 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -19,6 +19,7 @@ // Functionality related to dialing. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); +static void dialer_connect_cb_old(void *); static void dialer_timer_cb(void *); static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d) void nni_dialer_destroy(nni_dialer *d) { - nni_aio_stop(&d->d_con_aio); - nni_aio_stop(&d->d_tmo_aio); - nni_aio_fini(&d->d_con_aio); nni_aio_fini(&d->d_tmo_aio); @@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d) } nni_mtx_fini(&d->d_mtx); nni_url_fini(&d->d_url); - NNI_FREE_STRUCT(d); + nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size); } #ifdef NNG_ENABLE_STATS @@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&d->st_root, item); } +#endif // NNG_ENABLE_STATS static void dialer_stats_init(nni_dialer *d) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "dialer", .si_desc = "dialer statistics", @@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d) dialer_stat_init(d, &d->st_auth, &auth_info); dialer_stat_init(d, &d->st_oom, &oom_info); dialer_stat_init(d, &d->st_reject, &reject_info); +#endif // NNG_ENABLE_STATS +} +static void +dialer_register_stats(nni_dialer *d) +{ +#ifdef NNG_ENABLE_STATS nni_stat_set_id(&d->st_root, (int) d->d_id); nni_stat_set_id(&d->st_id, (int) d->d_id); nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock)); nni_stat_register(&d->st_root); +#else + NNI_ARG_UNUSED(d); +#endif } -#endif // NNG_ENABLE_STATS void nni_dialer_bump_error(nni_dialer *d, int err) @@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err) static int nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *dp; d->d_closed = false; d->d_data = NULL; @@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) nni_mtx_init(&d->d_mtx); - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); + if (d->d_ops.d_size != 0) { + d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d)); + dp = d->d_data; + } else { + // legacy: remove me when transports converted + dp = &d->d_data; + } - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); + if (tran->tran_pipe->p_size) { + nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); + } else { + // legacy: remove me + nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d); + } + nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); -#ifdef NNG_ENABLE_STATS dialer_stats_init(d); -#endif - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { + rv = d->d_ops.d_init(dp, &d->d_url, d); + + if (rv == 0) { + rv = nni_sock_add_dialer(s, d); + } + + if (rv == 0) { nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); + rv = nni_id_alloc32(&dialers, &d->d_id, d); nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + dialer_register_stats(d); + } + + return (rv); } int @@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } if ((rv = nni_dialer_init(d, s, tran)) != 0) { @@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } - d->d_closed = false; - d->d_data = NULL; - d->d_ref = 1; - d->d_sock = s; - d->d_tran = tran; - nni_atomic_flag_reset(&d->d_started); - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - d->d_ops = *tran->tran_dialer; - - NNI_LIST_NODE_INIT(&d->d_node); - NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); - - nni_mtx_init(&d->d_mtx); - - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); - - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); - -#ifdef NNG_ENABLE_STATS - dialer_stats_init(d); -#endif - - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { - nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); - nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif + if ((rv = nni_dialer_init(d, s, tran)) != 0) { nni_dialer_destroy(d); return (rv); } - *dp = d; return (0); } @@ -433,6 +420,50 @@ dialer_connect_cb(void *arg) d->d_user_aio = NULL; nni_mtx_unlock(&d->d_mtx); + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&d->st_connect, 1); +#endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + break; + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. + nni_dialer_bump_error(d, rv); + break; + case NNG_ECONNREFUSED: + case NNG_ETIMEDOUT: + default: + nng_log_warn("NNG-CONN-FAIL", + "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock), + nng_strerror(rv)); + + nni_dialer_bump_error(d, rv); + if (user_aio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + } + if (user_aio != NULL) { + nni_aio_finish(user_aio, rv, 0); + } +} + +static void +dialer_connect_cb_old(void *arg) +{ + nni_dialer *d = arg; + nni_aio *aio = &d->d_con_aio; + nni_aio *user_aio; + int rv; + + nni_mtx_lock(&d->d_mtx); + user_aio = d->d_user_aio; + d->d_user_aio = NULL; + nni_mtx_unlock(&d->d_mtx); + switch ((rv = nni_aio_result(aio))) { case 0: #ifdef NNG_ENABLE_STATS -- cgit v1.2.3-70-g09d2