aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-07 22:28:15 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-08 14:02:42 -0800
commitd6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch)
treed4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311 /src/core/dialer.c
parent5223a142e38a320ce53097cea450d8ba7f175193 (diff)
downloadnng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it.
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c159
1 files changed, 95 insertions, 64 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 2945fded..388d9981 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -19,6 +19,7 @@
// Functionality related to dialing.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
+static void dialer_connect_cb_old(void *);
static void dialer_timer_cb(void *);
static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d)
void
nni_dialer_destroy(nni_dialer *d)
{
- nni_aio_stop(&d->d_con_aio);
- nni_aio_stop(&d->d_tmo_aio);
-
nni_aio_fini(&d->d_con_aio);
nni_aio_fini(&d->d_tmo_aio);
@@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d)
}
nni_mtx_fini(&d->d_mtx);
nni_url_fini(&d->d_url);
- NNI_FREE_STRUCT(d);
+ nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size);
}
#ifdef NNG_ENABLE_STATS
@@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&d->st_root, item);
}
+#endif // NNG_ENABLE_STATS
static void
dialer_stats_init(nni_dialer *d)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "dialer",
.si_desc = "dialer statistics",
@@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d)
dialer_stat_init(d, &d->st_auth, &auth_info);
dialer_stat_init(d, &d->st_oom, &oom_info);
dialer_stat_init(d, &d->st_reject, &reject_info);
+#endif // NNG_ENABLE_STATS
+}
+static void
+dialer_register_stats(nni_dialer *d)
+{
+#ifdef NNG_ENABLE_STATS
nni_stat_set_id(&d->st_root, (int) d->d_id);
nni_stat_set_id(&d->st_id, (int) d->d_id);
nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock));
nni_stat_register(&d->st_root);
+#else
+ NNI_ARG_UNUSED(d);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_dialer_bump_error(nni_dialer *d, int err)
@@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err)
static int
nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *dp;
d->d_closed = false;
d->d_data = NULL;
@@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
nni_mtx_init(&d->d_mtx);
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
+ if (d->d_ops.d_size != 0) {
+ d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
+ dp = d->d_data;
+ } else {
+ // legacy: remove me when transports converted
+ dp = &d->d_data;
+ }
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
+ } else {
+ // legacy: remove me
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d);
+ }
+ nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
-#endif
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ rv = d->d_ops.d_init(dp, &d->d_url, d);
+
+ if (rv == 0) {
+ rv = nni_sock_add_dialer(s, d);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
+ rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ dialer_register_stats(d);
+ }
+
+ return (rv);
}
int
@@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
@@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
- d->d_closed = false;
- d->d_data = NULL;
- d->d_ref = 1;
- d->d_sock = s;
- d->d_tran = tran;
- nni_atomic_flag_reset(&d->d_started);
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- d->d_ops = *tran->tran_dialer;
-
- NNI_LIST_NODE_INIT(&d->d_node);
- NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
-
- nni_mtx_init(&d->d_mtx);
-
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
-
-#ifdef NNG_ENABLE_STATS
- dialer_stats_init(d);
-#endif
-
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
- nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
- nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
+ if ((rv = nni_dialer_init(d, s, tran)) != 0) {
nni_dialer_destroy(d);
return (rv);
}
-
*dp = d;
return (0);
}
@@ -438,6 +425,50 @@ dialer_connect_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_connect, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ break;
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
+ nni_dialer_bump_error(d, rv);
+ break;
+ case NNG_ECONNREFUSED:
+ case NNG_ETIMEDOUT:
+ default:
+ nng_log_warn("NNG-CONN-FAIL",
+ "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock),
+ nng_strerror(rv));
+
+ nni_dialer_bump_error(d, rv);
+ if (user_aio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+ }
+ if (user_aio != NULL) {
+ nni_aio_finish(user_aio, rv, 0);
+ }
+}
+
+static void
+dialer_connect_cb_old(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio *aio = &d->d_con_aio;
+ nni_aio *user_aio;
+ int rv;
+
+ nni_mtx_lock(&d->d_mtx);
+ user_aio = d->d_user_aio;
+ d->d_user_aio = NULL;
+ nni_mtx_unlock(&d->d_mtx);
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_connect, 1);
+#endif
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.