aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-05 00:26:37 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-05 00:26:37 -0700
commit719bc93e11f6607d302d908475b240e1d50f5a89 (patch)
tree885da39473db379d0b914df55f2555316b6fe5b6 /src/core/pipe.c
parenta40ab9c52768044fa8f8b74e43bcc23637417f7a (diff)
downloadnng-719bc93e11f6607d302d908475b240e1d50f5a89.tar.gz
nng-719bc93e11f6607d302d908475b240e1d50f5a89.tar.bz2
nng-719bc93e11f6607d302d908475b240e1d50f5a89.zip
Use a dedicated reap thread instead of taskq.
The problem is that reaping these things performs some blocking operations which can tie up slots in the taskq, preventing other tasks from running. Ultimately this can lead to a deadlock as tasks that are blocked wind up waiting for tasks that can't get scheduled. Blocking tasks really should not run on the system taskq.
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c78
1 files changed, 75 insertions, 3 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 9247ec0f..71434fcb 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -17,12 +17,27 @@
static nni_idhash *nni_pipes;
+static nni_list nni_pipe_reap_list;
+static nni_mtx nni_pipe_reap_lk;
+static nni_cv nni_pipe_reap_cv;
+static nni_thr nni_pipe_reap_thr;
+static int nni_pipe_reap_exit;
+static int nni_pipe_reap_start;
+
+static void nni_pipe_reaper(void *);
+static void nni_pipe_destroy(nni_pipe *);
+
int
nni_pipe_sys_init(void)
{
int rv;
- if ((rv = nni_idhash_init(&nni_pipes)) != 0) {
+ NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
+ ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) ||
+ ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) ||
+ ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
+ 0)) {
return (rv);
}
@@ -34,12 +49,25 @@ nni_pipe_sys_init(void)
nni_idhash_set_limits(
nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_pipe_reap_start = 1;
+ nni_thr_run(&nni_pipe_reap_thr);
+
return (0);
}
void
nni_pipe_sys_fini(void)
{
+ if (nni_pipe_reap_start) {
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ nni_pipe_reap_exit = 1;
+ nni_cv_wake(&nni_pipe_reap_cv);
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+ nni_thr_fini(&nni_pipe_reap_thr);
+ }
+
+ nni_cv_fini(&nni_pipe_reap_cv);
+ nni_mtx_fini(&nni_pipe_reap_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
nni_pipes = NULL;
@@ -158,8 +186,13 @@ nni_pipe_stop(nni_pipe *p)
return;
}
p->p_stop = 1;
- nni_task_dispatch(&p->p_reap_task);
nni_mtx_unlock(&p->p_mtx);
+
+ // Put it on the reaplist for async cleanup
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ nni_list_append(&nni_pipe_reap_list, p);
+ nni_cv_wake(&nni_pipe_reap_cv);
+ nni_mtx_unlock(&nni_pipe_reap_lk);
}
uint16_t
@@ -208,7 +241,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_data = NULL;
- nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p);
+ NNI_LIST_NODE_INIT(&p->p_reap_node);
if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
@@ -271,3 +304,42 @@ nni_pipe_ep_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_ep_node);
}
+
+static void
+nni_pipe_reaper(void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ for (;;) {
+ nni_pipe *p;
+ if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) {
+ nni_list_remove(&nni_pipe_reap_list, p);
+
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+ // Transport close...
+ nni_pipe_close(p);
+
+ nni_aio_stop(&p->p_start_aio);
+
+ // Remove the pipe from the socket and the endpoint.
+ // Note that it is in theory possible for either of
+ // these to be null if the pipe is being torn down
+ // before it is fully initialized.
+ if (p->p_ep != NULL) {
+ nni_ep_pipe_remove(p->p_ep, p);
+ }
+ if (p->p_sock != NULL) {
+ nni_sock_pipe_remove(p->p_sock, p);
+ }
+ nni_pipe_destroy(p);
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ continue;
+ }
+ if (nni_pipe_reap_exit) {
+ break;
+ }
+ nni_cv_wait(&nni_pipe_reap_cv);
+ }
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+}