diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-07 22:28:15 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-08 14:02:42 -0800 |
| commit | d6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch) | |
| tree | d4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311 /src/core/dialer.c | |
| parent | 5223a142e38a320ce53097cea450d8ba7f175193 (diff) | |
| download | nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2 nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip | |
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports
to rely upon lifetime guarantees made by the common SP framework, thus
eliminating the need for transport specific reference counters, reap
lists, and similar.
The transport declares the size of the object in the ops vector (for
pipe, dialer, or listener), and the framework supplies one allocated
using the associated allocator.
For now these add the pipe object to the socket and endpoint using
linked linked lists. The plan is to transition those to reference
counts which should be lighter weight and free form locking issues.
The pipe teardown has been moved more fully to the reaper, to avoid
some of the deadlocks that can occur as nni_pipe_close can be called
from almost any context.
For now the old API is retained as well, but the intention is to convert
all the transports and then remove it.
Diffstat (limited to 'src/core/dialer.c')
| -rw-r--r-- | src/core/dialer.c | 159 |
1 files changed, 95 insertions, 64 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 2945fded..388d9981 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -19,6 +19,7 @@ // Functionality related to dialing. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); +static void dialer_connect_cb_old(void *); static void dialer_timer_cb(void *); static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d) void nni_dialer_destroy(nni_dialer *d) { - nni_aio_stop(&d->d_con_aio); - nni_aio_stop(&d->d_tmo_aio); - nni_aio_fini(&d->d_con_aio); nni_aio_fini(&d->d_tmo_aio); @@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d) } nni_mtx_fini(&d->d_mtx); nni_url_fini(&d->d_url); - NNI_FREE_STRUCT(d); + nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size); } #ifdef NNG_ENABLE_STATS @@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&d->st_root, item); } +#endif // NNG_ENABLE_STATS static void dialer_stats_init(nni_dialer *d) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "dialer", .si_desc = "dialer statistics", @@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d) dialer_stat_init(d, &d->st_auth, &auth_info); dialer_stat_init(d, &d->st_oom, &oom_info); dialer_stat_init(d, &d->st_reject, &reject_info); +#endif // NNG_ENABLE_STATS +} +static void +dialer_register_stats(nni_dialer *d) +{ +#ifdef NNG_ENABLE_STATS nni_stat_set_id(&d->st_root, (int) d->d_id); nni_stat_set_id(&d->st_id, (int) d->d_id); nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock)); nni_stat_register(&d->st_root); +#else + NNI_ARG_UNUSED(d); +#endif } -#endif // NNG_ENABLE_STATS void nni_dialer_bump_error(nni_dialer *d, int err) @@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err) static int nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *dp; d->d_closed = false; d->d_data = NULL; @@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) nni_mtx_init(&d->d_mtx); - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); + if (d->d_ops.d_size != 0) { + d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d)); + dp = d->d_data; + } else { + // legacy: remove me when transports converted + dp = &d->d_data; + } - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); + if (tran->tran_pipe->p_size) { + nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); + } else { + // legacy: remove me + nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d); + } + nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); -#ifdef NNG_ENABLE_STATS dialer_stats_init(d); -#endif - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { + rv = d->d_ops.d_init(dp, &d->d_url, d); + + if (rv == 0) { + rv = nni_sock_add_dialer(s, d); + } + + if (rv == 0) { nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); + rv = nni_id_alloc32(&dialers, &d->d_id, d); nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + dialer_register_stats(d); + } + + return (rv); } int @@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } if ((rv = nni_dialer_init(d, s, tran)) != 0) { @@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } - d->d_closed = false; - d->d_data = NULL; - d->d_ref = 1; - d->d_sock = s; - d->d_tran = tran; - nni_atomic_flag_reset(&d->d_started); - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - d->d_ops = *tran->tran_dialer; - - NNI_LIST_NODE_INIT(&d->d_node); - NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); - - nni_mtx_init(&d->d_mtx); - - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); - - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); - -#ifdef NNG_ENABLE_STATS - dialer_stats_init(d); -#endif - - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { - nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); - nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif + if ((rv = nni_dialer_init(d, s, tran)) != 0) { nni_dialer_destroy(d); return (rv); } - *dp = d; return (0); } @@ -438,6 +425,50 @@ dialer_connect_cb(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_inc(&d->st_connect, 1); #endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + break; + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. + nni_dialer_bump_error(d, rv); + break; + case NNG_ECONNREFUSED: + case NNG_ETIMEDOUT: + default: + nng_log_warn("NNG-CONN-FAIL", + "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock), + nng_strerror(rv)); + + nni_dialer_bump_error(d, rv); + if (user_aio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + } + if (user_aio != NULL) { + nni_aio_finish(user_aio, rv, 0); + } +} + +static void +dialer_connect_cb_old(void *arg) +{ + nni_dialer *d = arg; + nni_aio *aio = &d->d_con_aio; + nni_aio *user_aio; + int rv; + + nni_mtx_lock(&d->d_mtx); + user_aio = d->d_user_aio; + d->d_user_aio = NULL; + nni_mtx_unlock(&d->d_mtx); + + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&d->st_connect, 1); +#endif nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. |
