From 719bc93e11f6607d302d908475b240e1d50f5a89 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 5 Aug 2017 00:26:37 -0700 Subject: Use a dedicated reap thread instead of taskq. The problem is that reaping these things performs some blocking operations which can tie up slots in the taskq, preventing other tasks from running. Ultimately this can lead to a deadlock as tasks that are blocked wind up waiting for tasks that can't get scheduled. Blocking tasks really should not run on the system taskq. --- src/core/pipe.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 75 insertions(+), 3 deletions(-) (limited to 'src/core/pipe.c') diff --git a/src/core/pipe.c b/src/core/pipe.c index 9247ec0f..71434fcb 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -17,12 +17,27 @@ static nni_idhash *nni_pipes; +static nni_list nni_pipe_reap_list; +static nni_mtx nni_pipe_reap_lk; +static nni_cv nni_pipe_reap_cv; +static nni_thr nni_pipe_reap_thr; +static int nni_pipe_reap_exit; +static int nni_pipe_reap_start; + +static void nni_pipe_reaper(void *); +static void nni_pipe_destroy(nni_pipe *); + int nni_pipe_sys_init(void) { int rv; - if ((rv = nni_idhash_init(&nni_pipes)) != 0) { + NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + if (((rv = nni_idhash_init(&nni_pipes)) != 0) || + ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) || + ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) || + ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != + 0)) { return (rv); } @@ -34,12 +49,25 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_pipe_reap_start = 1; + nni_thr_run(&nni_pipe_reap_thr); + return (0); } void nni_pipe_sys_fini(void) { + if (nni_pipe_reap_start) { + nni_mtx_lock(&nni_pipe_reap_lk); + nni_pipe_reap_exit = 1; + nni_cv_wake(&nni_pipe_reap_cv); + nni_mtx_unlock(&nni_pipe_reap_lk); + nni_thr_fini(&nni_pipe_reap_thr); + } + + nni_cv_fini(&nni_pipe_reap_cv); + nni_mtx_fini(&nni_pipe_reap_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); nni_pipes = NULL; @@ -158,8 +186,13 @@ nni_pipe_stop(nni_pipe *p) return; } p->p_stop = 1; - nni_task_dispatch(&p->p_reap_task); nni_mtx_unlock(&p->p_mtx); + + // Put it on the reaplist for async cleanup + nni_mtx_lock(&nni_pipe_reap_lk); + nni_list_append(&nni_pipe_reap_list, p); + nni_cv_wake(&nni_pipe_reap_cv); + nni_mtx_unlock(&nni_pipe_reap_lk); } uint16_t @@ -208,7 +241,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_data = NULL; - nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p); + NNI_LIST_NODE_INIT(&p->p_reap_node); if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) { @@ -271,3 +304,42 @@ nni_pipe_ep_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_pipe, p_ep_node); } + +static void +nni_pipe_reaper(void *notused) +{ + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&nni_pipe_reap_lk); + for (;;) { + nni_pipe *p; + if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { + nni_list_remove(&nni_pipe_reap_list, p); + + nni_mtx_unlock(&nni_pipe_reap_lk); + // Transport close... + nni_pipe_close(p); + + nni_aio_stop(&p->p_start_aio); + + // Remove the pipe from the socket and the endpoint. + // Note that it is in theory possible for either of + // these to be null if the pipe is being torn down + // before it is fully initialized. + if (p->p_ep != NULL) { + nni_ep_pipe_remove(p->p_ep, p); + } + if (p->p_sock != NULL) { + nni_sock_pipe_remove(p->p_sock, p); + } + nni_pipe_destroy(p); + nni_mtx_lock(&nni_pipe_reap_lk); + continue; + } + if (nni_pipe_reap_exit) { + break; + } + nni_cv_wait(&nni_pipe_reap_cv); + } + nni_mtx_unlock(&nni_pipe_reap_lk); +} -- cgit v1.2.3-70-g09d2