aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-07 22:28:15 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-08 14:02:42 -0800
commitd6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch)
treed4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311 /src/core/pipe.c
parent5223a142e38a320ce53097cea450d8ba7f175193 (diff)
downloadnng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2
nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it.
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c169
1 files changed, 110 insertions, 59 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 05089fee..eaf49af9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -38,9 +38,8 @@ pipe_destroy(void *arg)
nni_pipe *p = arg;
p->p_proto_ops.pipe_fini(p->p_proto_data);
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- }
+ p->p_tran_ops.p_fini(p->p_tran_data);
+
nni_free(p, p->p_size);
}
@@ -49,6 +48,11 @@ pipe_reap(void *arg)
{
nni_pipe *p = arg;
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+
+ // Close the underlying transport.
+ p->p_tran_ops.p_close(p->p_tran_data);
+
nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
@@ -62,12 +66,11 @@ pipe_reap(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&p->st_root);
#endif
- nni_pipe_remove(p);
p->p_proto_ops.pipe_stop(p->p_proto_data);
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
+ p->p_tran_ops.p_stop(p->p_tran_data);
+
+ nni_pipe_remove(p);
nni_pipe_rele(p);
}
@@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p)
return; // We already did a close.
}
- p->p_proto_ops.pipe_close(p->p_proto_data);
-
- // Close the underlying transport.
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_close(p->p_tran_data);
- }
-
nni_reap(&pipe_reap_list, p);
}
@@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&p->st_root, item);
}
+#endif
static void
pipe_stats_init(nni_pipe *p)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "pipe",
.si_desc = "pipe statistics",
@@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
+ static const nni_stat_info dialer_info = {
+ .si_name = "dialer",
+ .si_desc = "dialer for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info listener_info = {
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
+ };
nni_stat_init(&p->st_root, &root_info);
pipe_stat_init(p, &p->st_id, &id_info);
@@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p)
nni_stat_set_id(&p->st_root, (int) p->p_id);
nni_stat_set_id(&p->st_id, (int) p->p_id);
nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
-}
+
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ pipe_stat_init(p, &p->st_ep_id, &dialer_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_dialer_id(p->p_dialer));
+ }
+ if (p->p_listener) {
+ pipe_stat_init(p, &p->st_ep_id, &listener_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_listener_id(p->p_listener));
+ }
+#else
+ NNI_ARG_UNUSED(p);
#endif // NNG_ENABLE_STATS
+}
static int
-pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
+pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
+ nni_listener *l, void *tran_data)
{
- nni_pipe *p;
- int rv;
- void *sock_data = nni_sock_proto_data(sock);
- nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- size_t sz;
+ nni_pipe *p;
+ int rv1, rv2, rv3;
+ void *sock_data = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ const nni_sp_pipe_ops *tops = tran->tran_pipe;
+ size_t sz;
- sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
+ sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) +
+ NNI_ALIGN_UP(tops->p_size);
if ((p = nni_zalloc(sz)) == NULL) {
- // In this case we just toss the pipe...
- tran->tran_pipe->p_fini(tran_data);
+ // TODO: remove when all transports converted
+ // to use p_size.
+ if (tran_data != NULL) {
+ tops->p_fini(tran_data);
+ }
return (NNG_ENOMEM);
}
- p->p_size = sz;
- p->p_proto_data = p + 1;
- p->p_tran_ops = *tran->tran_pipe;
- p->p_tran_data = tran_data;
- p->p_proto_ops = *pops;
- p->p_sock = sock;
- p->p_cbs = false;
+ p->p_size = sz;
+ p->p_proto_ops = *pops;
+ p->p_tran_ops = *tops;
+ p->p_sock = sock;
+ p->p_cbs = false;
+ p->p_dialer = d;
+ p->p_listener = l;
+ // Two references - one for our caller, and
+ // one to be dropped when the pipe is closed.
nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy);
nni_atomic_init_bool(&p->p_closed);
@@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
+ uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));
+
+ if (tran_data == NULL) {
+ tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
+ }
+ nni_pipe_add(p);
+
+ p->p_tran_data = tran_data;
+ p->p_proto_data = proto_data;
+
nni_mtx_lock(&pipes_lk);
- rv = nni_id_alloc32(&pipes, &p->p_id, p);
+ rv1 = nni_id_alloc32(&pipes, &p->p_id, p);
nni_mtx_unlock(&pipes_lk);
-#ifdef NNG_ENABLE_STATS
+ // must be done before protocol or transports, because
+ // they may add further stats
pipe_stats_init(p);
-#endif
- if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) ||
- ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) {
+ rv2 = tops->p_init(tran_data, p);
+ rv3 = pops->pipe_init(proto_data, p, sock_data);
+ if (rv1 != 0 || rv2 != 0 || rv3 != 0) {
nni_pipe_close(p);
nni_pipe_rele(p);
- return (rv);
+ return (rv1 ? rv1 : rv2 ? rv2 : rv3);
}
*pp = p;
@@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) {
return (rv);
}
- p->p_dialer = d;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info dialer_info = {
- .si_name = "dialer",
- .si_desc = "dialer for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &dialer_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
-#endif
*pp = p;
return (0);
}
@@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
nni_sp_tran *tran = l->l_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) {
return (rv);
}
- p->p_listener = l;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info listener_info = {
- .si_name = "listener",
- .si_desc = "listener for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &listener_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
-#endif
*pp = p;
return (0);
}
int
+nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
+{
+ int rv;
+ nni_sp_tran *tran = d->d_tran;
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
+nni_pipe_alloc_listener(void **datap, nni_listener *l)
+{
+ int rv;
+ nni_sp_tran *tran = l->l_tran;
+ nni_sock *s = l->l_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{