From d6ab6bca7a538c1a320ce00ab845e98c16649c94 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Dec 2024 22:28:15 -0800 Subject: pipes and endpoints: support for inline allocations of transport data This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it. --- src/core/pipe.c | 169 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 59 deletions(-) (limited to 'src/core/pipe.c') diff --git a/src/core/pipe.c b/src/core/pipe.c index 05089fee..eaf49af9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -38,9 +38,8 @@ pipe_destroy(void *arg) nni_pipe *p = arg; p->p_proto_ops.pipe_fini(p->p_proto_data); - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } + p->p_tran_ops.p_fini(p->p_tran_data); + nni_free(p, p->p_size); } @@ -49,6 +48,11 @@ pipe_reap(void *arg) { nni_pipe *p = arg; + p->p_proto_ops.pipe_close(p->p_proto_data); + + // Close the underlying transport. + p->p_tran_ops.p_close(p->p_tran_data); + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. @@ -62,12 +66,11 @@ pipe_reap(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif - nni_pipe_remove(p); p->p_proto_ops.pipe_stop(p->p_proto_data); - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } + p->p_tran_ops.p_stop(p->p_tran_data); + + nni_pipe_remove(p); nni_pipe_rele(p); } @@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p) return; // We already did a close. } - p->p_proto_ops.pipe_close(p->p_proto_data); - - // Close the underlying transport. - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_close(p->p_tran_data); - } - nni_reap(&pipe_reap_list, p); } @@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&p->st_root, item); } +#endif static void pipe_stats_init(nni_pipe *p) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "pipe", .si_desc = "pipe statistics", @@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info dialer_info = { + .si_name = "dialer", + .si_desc = "dialer for pipe", + .si_type = NNG_STAT_ID, + }; + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; nni_stat_init(&p->st_root, &root_info); pipe_stat_init(p, &p->st_id, &id_info); @@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p) nni_stat_set_id(&p->st_root, (int) p->p_id); nni_stat_set_id(&p->st_id, (int) p->p_id); nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock)); -} + + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + pipe_stat_init(p, &p->st_ep_id, &dialer_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_dialer_id(p->p_dialer)); + } + if (p->p_listener) { + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_listener_id(p->p_listener)); + } +#else + NNI_ARG_UNUSED(p); #endif // NNG_ENABLE_STATS +} static int -pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) +pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, + nni_listener *l, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv1, rv2, rv3; + void *sock_data = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { - // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + p->p_dialer = d; + p->p_listener = l; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + nni_pipe_add(p); + + p->p_tran_data = tran_data; + p->p_proto_data = proto_data; + nni_mtx_lock(&pipes_lk); - rv = nni_id_alloc32(&pipes, &p->p_id, p); + rv1 = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); -#ifdef NNG_ENABLE_STATS + // must be done before protocol or transports, because + // they may add further stats pipe_stats_init(p); -#endif - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + rv2 = tops->p_init(tran_data, p); + rv3 = pops->pipe_init(proto_data, p, sock_data); + if (rv1 != 0 || rv2 != 0 || rv3 != 0) { nni_pipe_close(p); nni_pipe_rele(p); - return (rv); + return (rv1 ? rv1 : rv2 ? rv2 : rv3); } *pp = p; @@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) nni_sp_tran *tran = d->d_tran; nni_pipe *p; - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) { return (rv); } - p->p_dialer = d; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info dialer_info = { - .si_name = "dialer", - .si_desc = "dialer for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &dialer_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); -#endif *pp = p; return (0); } @@ -298,23 +329,43 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) nni_sp_tran *tran = l->l_tran; nni_pipe *p; - if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif *pp = p; return (0); } +int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) -- cgit v1.2.3-70-g09d2