diff options
| -rw-r--r-- | src/core/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/pipe.c | 53 | ||||
| -rw-r--r-- | src/core/refcnt.c | 33 | ||||
| -rw-r--r-- | src/core/refcnt.h | 28 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 3 |
6 files changed, 89 insertions, 31 deletions
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index b8aa1e63..81f8d1ee 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -57,6 +57,8 @@ nng_sources( protocol.h reap.c reap.h + refcnt.c + refcnt.h sockaddr.c socket.c socket.h diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 0eceaf0b..b45a3712 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -40,6 +40,7 @@ #include "core/pollable.h" #include "core/protocol.h" #include "core/reap.h" +#include "core/refcnt.h" #include "core/stats.h" #include "core/stream.h" #include "core/strs.h" diff --git a/src/core/pipe.c b/src/core/pipe.c index 1e2842dc..c54cdfe0 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -25,19 +25,31 @@ static nni_id_map pipes = static nni_mtx pipes_lk = NNI_MTX_INITIALIZER; static void pipe_destroy(void *); +static void pipe_reap(void *); static nni_reap_list pipe_reap_list = { .rl_offset = offsetof(nni_pipe, p_reap), - .rl_func = pipe_destroy, + .rl_func = pipe_reap, }; static void pipe_destroy(void *arg) { nni_pipe *p = arg; - if (p == NULL) { - return; + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); } + if (p->p_tran_data != NULL) { + p->p_tran_ops.p_fini(p->p_tran_data); + } + nni_free(p, p->p_size); +} + +void +pipe_reap(void *arg) +{ + nni_pipe *p = arg; nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); @@ -47,32 +59,21 @@ pipe_destroy(void *arg) if (p->p_id != 0) { nni_id_remove(&pipes, p->p_id); } - // This wait guarantees that all callers are done with us. - while (p->p_ref != 0) { - nni_cv_wait(&p->p_cv); - } nni_mtx_unlock(&pipes_lk); - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif nni_pipe_remove(p); if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_fini(p->p_proto_data); + p->p_proto_ops.pipe_stop(p->p_proto_data); } - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); } - nni_cv_fini(&p->p_cv); - nni_free(p, p->p_size); + + nni_pipe_rele(p); } int @@ -86,7 +87,7 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) // close the pipe. nni_mtx_lock(&pipes_lk); if ((p = nni_id_get(&pipes, id)) != NULL) { - p->p_ref++; + nni_refcnt_hold(&p->p_refcnt); *pp = p; } nni_mtx_unlock(&pipes_lk); @@ -96,12 +97,7 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) void nni_pipe_rele(nni_pipe *p) { - nni_mtx_lock(&pipes_lk); - p->p_ref--; - if (p->p_ref == 0) { - nni_cv_wake(&p->p_cv); - } - nni_mtx_unlock(&pipes_lk); + nni_refcnt_rele(&p->p_refcnt); } // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. @@ -250,15 +246,14 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) p->p_proto_ops = *pops; p->p_sock = sock; p->p_cbs = false; - p->p_ref = 1; + + nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); nni_atomic_flag_reset(&p->p_stop); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - nni_cv_init(&p->p_cv, &pipes_lk); - nni_mtx_lock(&pipes_lk); rv = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); diff --git a/src/core/refcnt.c b/src/core/refcnt.c new file mode 100644 index 00000000..1b119507 --- /dev/null +++ b/src/core/refcnt.c @@ -0,0 +1,33 @@ +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <core/refcnt.h> + +void +nni_refcnt_init( + nni_refcnt *rc, unsigned value, void *data, void (*fini)(void *)) +{ + nni_atomic_init(&rc->rc_cnt); + nni_atomic_set(&rc->rc_cnt, value); + rc->rc_data = data; + rc->rc_fini = fini; +} + +void +nni_refcnt_hold(nni_refcnt *rc) +{ + nni_atomic_inc(&rc->rc_cnt); +} + +void +nni_refcnt_rele(nni_refcnt *rc) +{ + if (nni_atomic_dec_nv(&rc->rc_cnt) == 0) { + rc->rc_fini(rc->rc_data); + } +} diff --git a/src/core/refcnt.h b/src/core/refcnt.h new file mode 100644 index 00000000..9878ba50 --- /dev/null +++ b/src/core/refcnt.h @@ -0,0 +1,28 @@ +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_REFCNT_H +#define CORE_REFCNT_H + +#include <nng/nng.h> + +#include <core/nng_impl.h> +#include <core/platform.h> + +typedef struct { + nni_atomic_int rc_cnt; + void (*rc_fini)(void *); + void *rc_data; +} nni_refcnt; + +extern void nni_refcnt_init( + nni_refcnt *rc, unsigned value, void *v, void (*fini)(void *)); +extern void nni_refcnt_hold(nni_refcnt *rc); +extern void nni_refcnt_rele(nni_refcnt *rc); + +#endif // CORE_REFCNT_H diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 45135976..801ef7b1 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -105,9 +105,8 @@ struct nni_pipe { nni_atomic_bool p_closed; nni_atomic_flag p_stop; bool p_cbs; - int p_ref; - nni_cv p_cv; nni_reap_node p_reap; + nni_refcnt p_refcnt; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; |
