diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/pipe.c | 78 | ||||
| -rw-r--r-- | src/core/pipe.h | 2 |
2 files changed, 76 insertions, 4 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 9247ec0f..71434fcb 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -17,12 +17,27 @@ static nni_idhash *nni_pipes; +static nni_list nni_pipe_reap_list; +static nni_mtx nni_pipe_reap_lk; +static nni_cv nni_pipe_reap_cv; +static nni_thr nni_pipe_reap_thr; +static int nni_pipe_reap_exit; +static int nni_pipe_reap_start; + +static void nni_pipe_reaper(void *); +static void nni_pipe_destroy(nni_pipe *); + int nni_pipe_sys_init(void) { int rv; - if ((rv = nni_idhash_init(&nni_pipes)) != 0) { + NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + if (((rv = nni_idhash_init(&nni_pipes)) != 0) || + ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) || + ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) || + ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != + 0)) { return (rv); } @@ -34,12 +49,25 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_pipe_reap_start = 1; + nni_thr_run(&nni_pipe_reap_thr); + return (0); } void nni_pipe_sys_fini(void) { + if (nni_pipe_reap_start) { + nni_mtx_lock(&nni_pipe_reap_lk); + nni_pipe_reap_exit = 1; + nni_cv_wake(&nni_pipe_reap_cv); + nni_mtx_unlock(&nni_pipe_reap_lk); + nni_thr_fini(&nni_pipe_reap_thr); + } + + nni_cv_fini(&nni_pipe_reap_cv); + nni_mtx_fini(&nni_pipe_reap_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); nni_pipes = NULL; @@ -158,8 +186,13 @@ nni_pipe_stop(nni_pipe *p) return; } p->p_stop = 1; - nni_task_dispatch(&p->p_reap_task); nni_mtx_unlock(&p->p_mtx); + + // Put it on the reaplist for async cleanup + nni_mtx_lock(&nni_pipe_reap_lk); + nni_list_append(&nni_pipe_reap_list, p); + nni_cv_wake(&nni_pipe_reap_cv); + nni_mtx_unlock(&nni_pipe_reap_lk); } uint16_t @@ -208,7 +241,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_data = NULL; - nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p); + NNI_LIST_NODE_INIT(&p->p_reap_node); if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) { @@ -271,3 +304,42 @@ nni_pipe_ep_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_pipe, p_ep_node); } + +static void +nni_pipe_reaper(void *notused) +{ + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&nni_pipe_reap_lk); + for (;;) { + nni_pipe *p; + if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { + nni_list_remove(&nni_pipe_reap_list, p); + + nni_mtx_unlock(&nni_pipe_reap_lk); + // Transport close... + nni_pipe_close(p); + + nni_aio_stop(&p->p_start_aio); + + // Remove the pipe from the socket and the endpoint. + // Note that it is in theory possible for either of + // these to be null if the pipe is being torn down + // before it is fully initialized. + if (p->p_ep != NULL) { + nni_ep_pipe_remove(p->p_ep, p); + } + if (p->p_sock != NULL) { + nni_sock_pipe_remove(p->p_sock, p); + } + nni_pipe_destroy(p); + nni_mtx_lock(&nni_pipe_reap_lk); + continue; + } + if (nni_pipe_reap_exit) { + break; + } + nni_cv_wait(&nni_pipe_reap_cv); + } + nni_mtx_unlock(&nni_pipe_reap_lk); +} diff --git a/src/core/pipe.h b/src/core/pipe.h index b8f6d90a..40871062 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -34,7 +34,7 @@ struct nni_pipe { int p_refcnt; nni_mtx p_mtx; nni_cv p_cv; - nni_task p_reap_task; + nni_list_node p_reap_node; nni_aio p_start_aio; }; |
