diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 53 |
1 files changed, 24 insertions, 29 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 1e2842dc..c54cdfe0 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -25,19 +25,31 @@ static nni_id_map pipes = static nni_mtx pipes_lk = NNI_MTX_INITIALIZER; static void pipe_destroy(void *); +static void pipe_reap(void *); static nni_reap_list pipe_reap_list = { .rl_offset = offsetof(nni_pipe, p_reap), - .rl_func = pipe_destroy, + .rl_func = pipe_reap, }; static void pipe_destroy(void *arg) { nni_pipe *p = arg; - if (p == NULL) { - return; + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); } + if (p->p_tran_data != NULL) { + p->p_tran_ops.p_fini(p->p_tran_data); + } + nni_free(p, p->p_size); +} + +void +pipe_reap(void *arg) +{ + nni_pipe *p = arg; nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); @@ -47,32 +59,21 @@ pipe_destroy(void *arg) if (p->p_id != 0) { nni_id_remove(&pipes, p->p_id); } - // This wait guarantees that all callers are done with us. - while (p->p_ref != 0) { - nni_cv_wait(&p->p_cv); - } nni_mtx_unlock(&pipes_lk); - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif nni_pipe_remove(p); if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_fini(p->p_proto_data); + p->p_proto_ops.pipe_stop(p->p_proto_data); } - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); } - nni_cv_fini(&p->p_cv); - nni_free(p, p->p_size); + + nni_pipe_rele(p); } int @@ -86,7 +87,7 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) // close the pipe. nni_mtx_lock(&pipes_lk); if ((p = nni_id_get(&pipes, id)) != NULL) { - p->p_ref++; + nni_refcnt_hold(&p->p_refcnt); *pp = p; } nni_mtx_unlock(&pipes_lk); @@ -96,12 +97,7 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) void nni_pipe_rele(nni_pipe *p) { - nni_mtx_lock(&pipes_lk); - p->p_ref--; - if (p->p_ref == 0) { - nni_cv_wake(&p->p_cv); - } - nni_mtx_unlock(&pipes_lk); + nni_refcnt_rele(&p->p_refcnt); } // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. @@ -250,15 +246,14 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) p->p_proto_ops = *pops; p->p_sock = sock; p->p_cbs = false; - p->p_ref = 1; + + nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); nni_atomic_flag_reset(&p->p_stop); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - nni_cv_init(&p->p_cv, &pipes_lk); - nni_mtx_lock(&pipes_lk); rv = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); |
