aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-07 22:28:15 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-08 14:02:42 -0800
commitd6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch)
treed4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311
parent5223a142e38a320ce53097cea450d8ba7f175193 (diff)
downloadnng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it.
-rw-r--r--src/core/dialer.c159
-rw-r--r--src/core/listener.c120
-rw-r--r--src/core/pipe.c169
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/socket.c144
-rw-r--r--src/core/sockimpl.h2
-rw-r--r--src/sp/transport.h22
7 files changed, 417 insertions, 202 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 2945fded..388d9981 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -19,6 +19,7 @@
// Functionality related to dialing.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
+static void dialer_connect_cb_old(void *);
static void dialer_timer_cb(void *);
static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d)
void
nni_dialer_destroy(nni_dialer *d)
{
- nni_aio_stop(&d->d_con_aio);
- nni_aio_stop(&d->d_tmo_aio);
-
nni_aio_fini(&d->d_con_aio);
nni_aio_fini(&d->d_tmo_aio);
@@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d)
}
nni_mtx_fini(&d->d_mtx);
nni_url_fini(&d->d_url);
- NNI_FREE_STRUCT(d);
+ nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size);
}
#ifdef NNG_ENABLE_STATS
@@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&d->st_root, item);
}
+#endif // NNG_ENABLE_STATS
static void
dialer_stats_init(nni_dialer *d)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "dialer",
.si_desc = "dialer statistics",
@@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d)
dialer_stat_init(d, &d->st_auth, &auth_info);
dialer_stat_init(d, &d->st_oom, &oom_info);
dialer_stat_init(d, &d->st_reject, &reject_info);
+#endif // NNG_ENABLE_STATS
+}
+static void
+dialer_register_stats(nni_dialer *d)
+{
+#ifdef NNG_ENABLE_STATS
nni_stat_set_id(&d->st_root, (int) d->d_id);
nni_stat_set_id(&d->st_id, (int) d->d_id);
nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock));
nni_stat_register(&d->st_root);
+#else
+ NNI_ARG_UNUSED(d);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_dialer_bump_error(nni_dialer *d, int err)
@@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err)
static int
nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *dp;
d->d_closed = false;
d->d_data = NULL;
@@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
nni_mtx_init(&d->d_mtx);
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
+ if (d->d_ops.d_size != 0) {
+ d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
+ dp = d->d_data;
+ } else {
+ // legacy: remove me when transports converted
+ dp = &d->d_data;
+ }
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
+ } else {
+ // legacy: remove me
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d);
+ }
+ nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
-#endif
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ rv = d->d_ops.d_init(dp, &d->d_url, d);
+
+ if (rv == 0) {
+ rv = nni_sock_add_dialer(s, d);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
+ rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ dialer_register_stats(d);
+ }
+
+ return (rv);
}
int
@@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
@@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
- d->d_closed = false;
- d->d_data = NULL;
- d->d_ref = 1;
- d->d_sock = s;
- d->d_tran = tran;
- nni_atomic_flag_reset(&d->d_started);
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- d->d_ops = *tran->tran_dialer;
-
- NNI_LIST_NODE_INIT(&d->d_node);
- NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
-
- nni_mtx_init(&d->d_mtx);
-
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
-
-#ifdef NNG_ENABLE_STATS
- dialer_stats_init(d);
-#endif
-
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
- nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
- nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
+ if ((rv = nni_dialer_init(d, s, tran)) != 0) {
nni_dialer_destroy(d);
return (rv);
}
-
*dp = d;
return (0);
}
@@ -438,6 +425,50 @@ dialer_connect_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_connect, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ break;
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
+ nni_dialer_bump_error(d, rv);
+ break;
+ case NNG_ECONNREFUSED:
+ case NNG_ETIMEDOUT:
+ default:
+ nng_log_warn("NNG-CONN-FAIL",
+ "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock),
+ nng_strerror(rv));
+
+ nni_dialer_bump_error(d, rv);
+ if (user_aio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+ }
+ if (user_aio != NULL) {
+ nni_aio_finish(user_aio, rv, 0);
+ }
+}
+
+static void
+dialer_connect_cb_old(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio *aio = &d->d_con_aio;
+ nni_aio *user_aio;
+ int rv;
+
+ nni_mtx_lock(&d->d_mtx);
+ user_aio = d->d_user_aio;
+ d->d_user_aio = NULL;
+ nni_mtx_unlock(&d->d_mtx);
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_connect, 1);
+#endif
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
diff --git a/src/core/listener.c b/src/core/listener.c
index 9107e3a4..3dc2a0fa 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -22,6 +22,7 @@
static void listener_accept_start(nni_listener *);
static void listener_accept_cb(void *);
+static void listener_accept_cb_old(void *);
static void listener_timer_cb(void *);
static nni_id_map listeners = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -44,7 +45,7 @@ nni_listener_destroy(nni_listener *l)
l->l_ops.l_fini(l->l_data);
}
nni_url_fini(&l->l_url);
- NNI_FREE_STRUCT(l);
+ nni_free(l, NNI_ALIGN_UP(sizeof(*l)) + l->l_ops.l_size);
}
#ifdef NNG_ENABLE_STATS
@@ -55,10 +56,12 @@ listener_stat_init(
nni_stat_init(item, info);
nni_stat_add(&l->st_root, item);
}
+#endif
static void
listener_stats_init(nni_listener *l)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "listener",
.si_desc = "listener statistics",
@@ -153,9 +156,23 @@ listener_stats_init(nni_listener *l)
nni_stat_set_id(&l->st_root, (int) l->l_id);
nni_stat_set_id(&l->st_id, (int) l->l_id);
nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock));
+#else
+ NNI_ARG_UNUSED(l);
+#endif // NNG_ENABLE_STATS
+}
+
+static void
+listener_register_stats(nni_listener *l)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_set_id(&l->st_root, (int) l->l_id);
+ nni_stat_set_id(&l->st_id, (int) l->l_id);
+ nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock));
nni_stat_register(&l->st_root);
+#else
+ NNI_ARG_UNUSED(l);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_listener_bump_error(nni_listener *l, int err)
@@ -195,7 +212,8 @@ nni_listener_bump_error(nni_listener *l, int err)
static int
nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *lp;
l->l_closed = false;
l->l_data = NULL;
@@ -212,30 +230,40 @@ nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
- nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
+ } else {
+ nni_aio_init(&l->l_acc_aio, listener_accept_cb_old, l);
+ }
nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l);
- nni_mtx_lock(&listeners_lk);
- rv = nni_id_alloc32(&listeners, &l->l_id, l);
- nni_mtx_unlock(&listeners_lk);
-
-#ifdef NNG_ENABLE_STATS
listener_stats_init(l);
-#endif
- if ((rv != 0) ||
- ((rv = l->l_ops.l_init(&l->l_data, &l->l_url, l)) != 0) ||
- ((rv = nni_sock_add_listener(s, l)) != 0)) {
+ if (l->l_ops.l_size != 0) {
+ l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l));
+ lp = l->l_data;
+ } else {
+ // legacy: remove me when transports converted
+ lp = &l->l_data;
+ }
+
+ rv = l->l_ops.l_init(lp, &l->l_url, l);
+
+ if (rv == 0) {
+ rv = nni_sock_add_listener(s, l);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&listeners_lk);
- nni_id_remove(&listeners, l->l_id);
+ rv = nni_id_alloc32(&listeners, &l->l_id, l);
nni_mtx_unlock(&listeners_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&l->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ listener_register_stats(l);
+ }
+
+ return (rv);
}
int
@@ -244,16 +272,18 @@ nni_listener_create_url(nni_listener **lp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_listener *l;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_listener == NULL)) {
return (NNG_ENOTSUP);
}
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size;
+ if ((l = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&l->l_url, url)) != 0) {
- NNI_FREE_STRUCT(l);
+ nni_free(l, sz);
return (rv);
}
if ((rv = nni_listener_init(l, s, tran)) != 0) {
@@ -276,16 +306,19 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_listener *l;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_listener == NULL)) {
return (NNG_ENOTSUP);
}
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size;
+ if ((l = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
+
if ((rv = nni_url_parse_inline(&l->l_url, url_str)) != 0) {
- NNI_FREE_STRUCT(l);
+ nni_free(l, sz);
return (rv);
}
if ((rv = nni_listener_init(l, s, tran)) != 0) {
@@ -381,6 +414,47 @@ listener_accept_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_accept, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ listener_accept_start(l);
+ break;
+ case NNG_ECONNABORTED: // remote condition, no cool down
+ case NNG_ECONNRESET: // remote condition, no cool down
+ case NNG_ETIMEDOUT: // No need to sleep, we timed out already.
+ case NNG_EPEERAUTH: // peer validation failure
+ nng_log_warn("NNG-ACCEPT-FAIL",
+ "Failed accepting for socket<%u>: %s",
+ nni_sock_id(l->l_sock), nng_strerror(rv));
+ nni_listener_bump_error(l, rv);
+ listener_accept_start(l);
+ break;
+ case NNG_ECLOSED: // no further action
+ case NNG_ECANCELED: // no further action
+ nni_listener_bump_error(l, rv);
+ break;
+ default:
+ // We don't really know why we failed, but we back off
+ // here. This is because errors here are probably due
+ // to system failures (resource exhaustion) and we hope
+ // by not thrashing we give the system a chance to
+ // recover. 100 ms is enough to cool down.
+ nni_listener_bump_error(l, rv);
+ nni_sleep_aio(100, &l->l_tmo_aio);
+ break;
+ }
+}
+
+static void
+listener_accept_cb_old(void *arg)
+{
+ nni_listener *l = arg;
+ nni_aio *aio = &l->l_acc_aio;
+ int rv;
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&l->st_accept, 1);
+#endif
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 05089fee..eaf49af9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -38,9 +38,8 @@ pipe_destroy(void *arg)
nni_pipe *p = arg;
p->p_proto_ops.pipe_fini(p->p_proto_data);
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- }
+ p->p_tran_ops.p_fini(p->p_tran_data);
+
nni_free(p, p->p_size);
}
@@ -49,6 +48,11 @@ pipe_reap(void *arg)
{
nni_pipe *p = arg;
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+
+ // Close the underlying transport.
+ p->p_tran_ops.p_close(p->p_tran_data);
+
nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
@@ -62,12 +66,11 @@ pipe_reap(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&p->st_root);
#endif
- nni_pipe_remove(p);
p->p_proto_ops.pipe_stop(p->p_proto_data);
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
+ p->p_tran_ops.p_stop(p->p_tran_data);
+
+ nni_pipe_remove(p);
nni_pipe_rele(p);
}
@@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p)
return; // We already did a close.
}
- p->p_proto_ops.pipe_close(p->p_proto_data);
-
- // Close the underlying transport.
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_close(p->p_tran_data);
- }
-
nni_reap(&pipe_reap_list, p);
}
@@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&p->st_root, item);
}
+#endif
static void
pipe_stats_init(nni_pipe *p)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "pipe",
.si_desc = "pipe statistics",
@@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
+ static const nni_stat_info dialer_info = {
+ .si_name = "dialer",
+ .si_desc = "dialer for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info listener_info = {
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
+ };
nni_stat_init(&p->st_root, &root_info);
pipe_stat_init(p, &p->st_id, &id_info);
@@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p)
nni_stat_set_id(&p->st_root, (int) p->p_id);
nni_stat_set_id(&p->st_id, (int) p->p_id);
nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
-}
+
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ pipe_stat_init(p, &p->st_ep_id, &dialer_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_dialer_id(p->p_dialer));
+ }
+ if (p->p_listener) {
+ pipe_stat_init(p, &p->st_ep_id, &listener_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_listener_id(p->p_listener));
+ }
+#else
+ NNI_ARG_UNUSED(p);
#endif // NNG_ENABLE_STATS
+}
static int
-pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
+pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
+ nni_listener *l, void *tran_data)
{
- nni_pipe *p;
- int rv;
- void *sock_data = nni_sock_proto_data(sock);
- nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- size_t sz;
+ nni_pipe *p;
+ int rv1, rv2, rv3;
+ void *sock_data = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ const nni_sp_pipe_ops *tops = tran->tran_pipe;
+ size_t sz;
- sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
+ sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) +
+ NNI_ALIGN_UP(tops->p_size);
if ((p = nni_zalloc(sz)) == NULL) {
- // In this case we just toss the pipe...
- tran->tran_pipe->p_fini(tran_data);
+ // TODO: remove when all transports converted
+ // to use p_size.
+ if (tran_data != NULL) {
+ tops->p_fini(tran_data);
+ }
return (NNG_ENOMEM);
}
- p->p_size = sz;
- p->p_proto_data = p + 1;
- p->p_tran_ops = *tran->tran_pipe;
- p->p_tran_data = tran_data;
- p->p_proto_ops = *pops;
- p->p_sock = sock;
- p->p_cbs = false;
+ p->p_size = sz;
+ p->p_proto_ops = *pops;
+ p->p_tran_ops = *tops;
+ p->p_sock = sock;
+ p->p_cbs = false;
+ p->p_dialer = d;
+ p->p_listener = l;
+ // Two references - one for our caller, and
+ // one to be dropped when the pipe is closed.
nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy);
nni_atomic_init_bool(&p->p_closed);
@@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
+ uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));
+
+ if (tran_data == NULL) {
+ tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
+ }
+ nni_pipe_add(p);
+
+ p->p_tran_data = tran_data;
+ p->p_proto_data = proto_data;
+
nni_mtx_lock(&pipes_lk);
- rv = nni_id_alloc32(&pipes, &p->p_id, p);
+ rv1 = nni_id_alloc32(&pipes, &p->p_id, p);
nni_mtx_unlock(&pipes_lk);
-#ifdef NNG_ENABLE_STATS
+ // must be done before protocol or transports, because
+ // they may add further stats
pipe_stats_init(p);
-#endif
- if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) ||
- ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) {
+ rv2 = tops->p_init(tran_data, p);
+ rv3 = pops->pipe_init(proto_data, p, sock_data);
+ if (rv1 != 0 || rv2 != 0 || rv3 != 0) {
nni_pipe_close(p);
nni_pipe_rele(p);
- return (rv);
+ return (rv1 ? rv1 : rv2 ? rv2 : rv3);
}
*pp = p;
@@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) {
return (rv);
}
- p->p_dialer = d;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info dialer_info = {
- .si_name = "dialer",
- .si_desc = "dialer for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &dialer_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
-#endif
*pp = p;
return (0);
}
@@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
nni_sp_tran *tran = l->l_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) {
return (rv);
}
- p->p_listener = l;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info listener_info = {
- .si_name = "listener",
- .si_desc = "listener for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &listener_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
-#endif
*pp = p;
return (0);
}
int
+nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
+{
+ int rv;
+ nni_sp_tran *tran = d->d_tran;
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
+nni_pipe_alloc_listener(void **datap, nni_listener *l)
+{
+ int rv;
+ nni_sp_tran *tran = l->l_tran;
+ nni_sock *s = l->l_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 9e7109de..71e07a9f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -63,4 +63,7 @@ extern void nni_pipe_bump_error(nni_pipe *, int);
extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]);
+extern int nni_pipe_alloc_dialer(void **, nni_dialer *);
+extern int nni_pipe_alloc_listener(void **, nni_listener *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index fe1b62d2..ec2691a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock)
nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
+
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// Close the upper queues immediately.
@@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&sock_lk);
nni_mtx_lock(&sock->s_mx);
-
- // At this point, we've done everything we politely can to
- // give the protocol a chance to flush its write side. Now
- // it is time to be a little more insistent.
-
- // For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// We have to wait for pipes to be removed.
while (!nni_list_empty(&sock->s_pipes)) {
nni_cv_wait(&sock->s_cv);
}
-
- sock->s_sock_ops.sock_close(sock->s_data);
-
- nni_cv_wake(&sock->s_cv);
-
NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);
-
nni_mtx_unlock(&sock->s_mx);
+ sock->s_sock_ops.sock_close(sock->s_data);
+
// At this point, there are no threads blocked inside of us
// that are referencing socket state. User code should call
// nng_close to release the last resources.
@@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-void
-nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+static void
+dialer_start_pipe(nni_dialer *d, nni_pipe *p)
{
nni_sock *s = d->d_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
-
- if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
-
- nni_list_append(&d->d_pipes, p);
- nni_list_append(&s->s_pipes, p);
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&s->st_pipes, 1);
nni_stat_inc(&d->st_pipes, 1);
@@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
}
void
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+{
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ dialer_start_pipe(d, p);
+}
+
+void
nni_dialer_shutdown(nni_dialer *d)
{
nni_sock *s = d->d_sock;
@@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d)
nni_reap(&dialer_reap_list, d);
}
-void
-nni_listener_add_pipe(nni_listener *l, void *tpipe)
+static void
+listener_start_pipe(nni_listener *l, nni_pipe *p)
{
nni_sock *s = l->l_sock;
- nni_pipe *p;
-
- nni_mtx_lock(&s->s_mx);
- if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
- nni_list_append(&l->l_pipes, p);
- nni_list_append(&s->s_pipes, p);
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_pipes, 1);
nni_stat_inc(&s->st_pipes, 1);
@@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
"Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p),
nni_sock_id(s), nni_pipe_peer_addr(p, addr));
}
+
+ // the socket now "owns" the pipe, and a pipe close should immediately
+ // start the process of teardown.
nni_pipe_rele(p);
}
void
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
+{
+ nni_pipe *p;
+
+ if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
+ return;
+ }
+
+ listener_start_pipe(l, p);
+}
+
+void
nni_listener_shutdown(nni_listener *l)
{
nni_sock *s = l->l_sock;
@@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l)
nni_reap(&listener_reap_list, l);
}
+// nni_pipe_add just registers the pipe with the socket and endpoint
+// so they won't be deallocated while the pipe still exists.
+void
+nni_pipe_add(nni_pipe *p)
+{
+ nni_sock *s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+ nni_listener *l = p->p_listener;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_append(&s->s_pipes, p);
+ if (d != NULL) {
+ NNI_ASSERT(l == NULL);
+ nni_list_append(&d->d_pipes, p);
+ }
+ if (l != NULL) {
+ NNI_ASSERT(d == NULL);
+ nni_list_append(&l->l_pipes, p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}
+
+// nni_pipe_start attempts to start the pipe, adding it to the socket and
+// endpoints and calling callbacks, etc. The pipe should already have finished
+// any negotiation needed at the transport layer.
+void
+nni_pipe_start(nni_pipe *p)
+{
+ if (p->p_listener) {
+ NNI_ASSERT(p->p_dialer == NULL);
+ listener_start_pipe(p->p_listener, p);
+ }
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ dialer_start_pipe(p->p_dialer, p);
+ }
+}
+
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
@@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
void *arg;
nni_mtx_lock(&s->s_pipe_cbs_mtx);
- if (!p->p_cbs) {
- if (ev == NNG_PIPE_EV_ADD_PRE) {
- // First event, after this we want all other events.
- p->p_cbs = true;
- } else {
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- return;
- }
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ p->p_cbs = true;
+ } else if (!p->p_cbs) {
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+ return;
}
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
@@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p)
nni_mtx_lock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_stat_dec(&s->st_pipes, 1);
- }
+ nni_stat_dec(&s->st_pipes, 1);
if (p->p_listener != NULL) {
nni_stat_dec(&p->p_listener->st_pipes, 1);
}
@@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p)
#endif
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
- p->p_listener = NULL;
- p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 801ef7b1..aeaa369b 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -132,12 +132,14 @@ extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_stop(nni_dialer *);
+extern void nni_listener_start_pipe(nni_listener *, nni_pipe *);
extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
extern void nni_listener_stop(nni_listener *);
+extern void nni_pipe_add(nni_pipe *);
extern void nni_pipe_remove(nni_pipe *);
extern bool nni_pipe_is_closed(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
diff --git a/src/sp/transport.h b/src/sp/transport.h
index b7134595..10207f28 100644
--- a/src/sp/transport.h
+++ b/src/sp/transport.h
@@ -12,6 +12,7 @@
#ifndef PROTOCOL_SP_TRANSPORT_H
#define PROTOCOL_SP_TRANSPORT_H
+#include "core/defs.h"
#include "core/options.h"
// Endpoint operations are called by the socket in a
@@ -25,6 +26,10 @@
// against any asynchronous operations they manage themselves, though.)
struct nni_sp_dialer_ops {
+ // d_size is the size of transport specific data allocated
+ // to the listener.
+ size_t d_size;
+
// d_init creates a vanilla dialer. The value created is
// used for the first argument for all other dialer functions.
int (*d_init)(void **, nng_url *, nni_dialer *);
@@ -64,6 +69,10 @@ struct nni_sp_dialer_ops {
};
struct nni_sp_listener_ops {
+ // l_size is the size of transport specific data allocated
+ // to the listener.
+ size_t l_size;
+
// l_init creates a vanilla listener. The value created is
// used for the first argument for all other listener functions.
int (*l_init)(void **, nng_url *, nni_listener *);
@@ -121,20 +130,27 @@ struct nni_sp_pipe_ops {
// p_init initializes the pipe data structures. The main
// purpose of this is so that the pipe will see the upper
// layer nni_pipe and get a chance to register stats and such.
+ size_t p_size;
+
+ // p_init initializes the transport's pipe data structure.
+ // The pipe MUST be left in a state that p_fini can be safely
+ // called on it, even if it does not succeed. (The upper layers
+ // will call p_fini as part of the cleanup of a failure.)
+ // This function should not acquire any locks.
int (*p_init)(void *, nni_pipe *);
// p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used
// by the pipe. After this call returns, the system will not
- // make further calls on the same pipe.
+ // make further calls on the same pipe. This call should not block.
void (*p_fini)(void *);
// p_stop stops the pipe, waiting for any callbacks that are
// outstanding to complete. This is done before tearing down
- // resources with p_fini.
+ // resources with p_fini. Unlike p_fini, p_stop may block.
void (*p_stop)(void *);
- // p_aio_send queues the message for transmit. If this fails,
+ // p_send queues the message for transmit. If this fails,
// then the caller may try again with the same message (or free
// it). If the call succeeds, then the transport has taken
// ownership of the message, and the caller may not use it