aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c159
1 files changed, 95 insertions, 64 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 2945fded..388d9981 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -19,6 +19,7 @@
// Functionality related to dialing.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
+static void dialer_connect_cb_old(void *);
static void dialer_timer_cb(void *);
static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d)
void
nni_dialer_destroy(nni_dialer *d)
{
- nni_aio_stop(&d->d_con_aio);
- nni_aio_stop(&d->d_tmo_aio);
-
nni_aio_fini(&d->d_con_aio);
nni_aio_fini(&d->d_tmo_aio);
@@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d)
}
nni_mtx_fini(&d->d_mtx);
nni_url_fini(&d->d_url);
- NNI_FREE_STRUCT(d);
+ nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size);
}
#ifdef NNG_ENABLE_STATS
@@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&d->st_root, item);
}
+#endif // NNG_ENABLE_STATS
static void
dialer_stats_init(nni_dialer *d)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "dialer",
.si_desc = "dialer statistics",
@@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d)
dialer_stat_init(d, &d->st_auth, &auth_info);
dialer_stat_init(d, &d->st_oom, &oom_info);
dialer_stat_init(d, &d->st_reject, &reject_info);
+#endif // NNG_ENABLE_STATS
+}
+static void
+dialer_register_stats(nni_dialer *d)
+{
+#ifdef NNG_ENABLE_STATS
nni_stat_set_id(&d->st_root, (int) d->d_id);
nni_stat_set_id(&d->st_id, (int) d->d_id);
nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock));
nni_stat_register(&d->st_root);
+#else
+ NNI_ARG_UNUSED(d);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_dialer_bump_error(nni_dialer *d, int err)
@@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err)
static int
nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *dp;
d->d_closed = false;
d->d_data = NULL;
@@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
nni_mtx_init(&d->d_mtx);
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
+ if (d->d_ops.d_size != 0) {
+ d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
+ dp = d->d_data;
+ } else {
+ // legacy: remove me when transports converted
+ dp = &d->d_data;
+ }
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
+ } else {
+ // legacy: remove me
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d);
+ }
+ nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
-#endif
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ rv = d->d_ops.d_init(dp, &d->d_url, d);
+
+ if (rv == 0) {
+ rv = nni_sock_add_dialer(s, d);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
+ rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ dialer_register_stats(d);
+ }
+
+ return (rv);
}
int
@@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
@@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
- d->d_closed = false;
- d->d_data = NULL;
- d->d_ref = 1;
- d->d_sock = s;
- d->d_tran = tran;
- nni_atomic_flag_reset(&d->d_started);
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- d->d_ops = *tran->tran_dialer;
-
- NNI_LIST_NODE_INIT(&d->d_node);
- NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
-
- nni_mtx_init(&d->d_mtx);
-
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
-
-#ifdef NNG_ENABLE_STATS
- dialer_stats_init(d);
-#endif
-
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
- nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
- nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
+ if ((rv = nni_dialer_init(d, s, tran)) != 0) {
nni_dialer_destroy(d);
return (rv);
}
-
*dp = d;
return (0);
}
@@ -438,6 +425,50 @@ dialer_connect_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_connect, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ break;
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
+ nni_dialer_bump_error(d, rv);
+ break;
+ case NNG_ECONNREFUSED:
+ case NNG_ETIMEDOUT:
+ default:
+ nng_log_warn("NNG-CONN-FAIL",
+ "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock),
+ nng_strerror(rv));
+
+ nni_dialer_bump_error(d, rv);
+ if (user_aio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+ }
+ if (user_aio != NULL) {
+ nni_aio_finish(user_aio, rv, 0);
+ }
+}
+
+static void
+dialer_connect_cb_old(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio *aio = &d->d_con_aio;
+ nni_aio *user_aio;
+ int rv;
+
+ nni_mtx_lock(&d->d_mtx);
+ user_aio = d->d_user_aio;
+ d->d_user_aio = NULL;
+ nni_mtx_unlock(&d->d_mtx);
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_connect, 1);
+#endif
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.