diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-07 22:28:15 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-08 14:02:42 -0800 |
| commit | d6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch) | |
| tree | d4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311 /src/core/pipe.c | |
| parent | 5223a142e38a320ce53097cea450d8ba7f175193 (diff) | |
| download | nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2 nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip | |
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports
to rely upon lifetime guarantees made by the common SP framework, thus
eliminating the need for transport specific reference counters, reap
lists, and similar.
The transport declares the size of the object in the ops vector (for
pipe, dialer, or listener), and the framework supplies one allocated
using the associated allocator.
For now these add the pipe object to the socket and endpoint using
linked linked lists. The plan is to transition those to reference
counts which should be lighter weight and free form locking issues.
The pipe teardown has been moved more fully to the reaper, to avoid
some of the deadlocks that can occur as nni_pipe_close can be called
from almost any context.
For now the old API is retained as well, but the intention is to convert
all the transports and then remove it.
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 169 |
1 files changed, 110 insertions, 59 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 05089fee..eaf49af9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -38,9 +38,8 @@ pipe_destroy(void *arg) nni_pipe *p = arg; p->p_proto_ops.pipe_fini(p->p_proto_data); - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } + p->p_tran_ops.p_fini(p->p_tran_data); + nni_free(p, p->p_size); } @@ -49,6 +48,11 @@ pipe_reap(void *arg) { nni_pipe *p = arg; + p->p_proto_ops.pipe_close(p->p_proto_data); + + // Close the underlying transport. + p->p_tran_ops.p_close(p->p_tran_data); + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. @@ -62,12 +66,11 @@ pipe_reap(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif - nni_pipe_remove(p); p->p_proto_ops.pipe_stop(p->p_proto_data); - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } + p->p_tran_ops.p_stop(p->p_tran_data); + + nni_pipe_remove(p); nni_pipe_rele(p); } @@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p) return; // We already did a close. } - p->p_proto_ops.pipe_close(p->p_proto_data); - - // Close the underlying transport. - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_close(p->p_tran_data); - } - nni_reap(&pipe_reap_list, p); } @@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&p->st_root, item); } +#endif static void pipe_stats_init(nni_pipe *p) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "pipe", .si_desc = "pipe statistics", @@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info dialer_info = { + .si_name = "dialer", + .si_desc = "dialer for pipe", + .si_type = NNG_STAT_ID, + }; + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; nni_stat_init(&p->st_root, &root_info); pipe_stat_init(p, &p->st_id, &id_info); @@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p) nni_stat_set_id(&p->st_root, (int) p->p_id); nni_stat_set_id(&p->st_id, (int) p->p_id); nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock)); -} + + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + pipe_stat_init(p, &p->st_ep_id, &dialer_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_dialer_id(p->p_dialer)); + } + if (p->p_listener) { + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_listener_id(p->p_listener)); + } +#else + NNI_ARG_UNUSED(p); #endif // NNG_ENABLE_STATS +} static int -pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) +pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, + nni_listener *l, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv1, rv2, rv3; + void *sock_data = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { - // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + p->p_dialer = d; + p->p_listener = l; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + nni_pipe_add(p); + + p->p_tran_data = tran_data; + p->p_proto_data = proto_data; + nni_mtx_lock(&pipes_lk); - rv = nni_id_alloc32(&pipes, &p->p_id, p); + rv1 = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); -#ifdef NNG_ENABLE_STATS + // must be done before protocol or transports, because + // they may add further stats pipe_stats_init(p); -#endif - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + rv2 = tops->p_init(tran_data, p); + rv3 = pops->pipe_init(proto_data, p, sock_data); + if (rv1 != 0 || rv2 != 0 || rv3 != 0) { nni_pipe_close(p); nni_pipe_rele(p); - return (rv); + return (rv1 ? rv1 : rv2 ? rv2 : rv3); } *pp = p; @@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) nni_sp_tran *tran = d->d_tran; nni_pipe *p; - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) { return (rv); } - p->p_dialer = d; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info dialer_info = { - .si_name = "dialer", - .si_desc = "dialer for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &dialer_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); -#endif *pp = p; return (0); } @@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) nni_sp_tran *tran = l->l_tran; nni_pipe *p; - if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif *pp = p; return (0); } int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { |
