aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/pipe.c78
-rw-r--r--src/core/pipe.h2
2 files changed, 76 insertions, 4 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 9247ec0f..71434fcb 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -17,12 +17,27 @@
static nni_idhash *nni_pipes;
+static nni_list nni_pipe_reap_list;
+static nni_mtx nni_pipe_reap_lk;
+static nni_cv nni_pipe_reap_cv;
+static nni_thr nni_pipe_reap_thr;
+static int nni_pipe_reap_exit;
+static int nni_pipe_reap_start;
+
+static void nni_pipe_reaper(void *);
+static void nni_pipe_destroy(nni_pipe *);
+
int
nni_pipe_sys_init(void)
{
int rv;
- if ((rv = nni_idhash_init(&nni_pipes)) != 0) {
+ NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
+ ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) ||
+ ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) ||
+ ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
+ 0)) {
return (rv);
}
@@ -34,12 +49,25 @@ nni_pipe_sys_init(void)
nni_idhash_set_limits(
nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_pipe_reap_start = 1;
+ nni_thr_run(&nni_pipe_reap_thr);
+
return (0);
}
void
nni_pipe_sys_fini(void)
{
+ if (nni_pipe_reap_start) {
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ nni_pipe_reap_exit = 1;
+ nni_cv_wake(&nni_pipe_reap_cv);
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+ nni_thr_fini(&nni_pipe_reap_thr);
+ }
+
+ nni_cv_fini(&nni_pipe_reap_cv);
+ nni_mtx_fini(&nni_pipe_reap_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
nni_pipes = NULL;
@@ -158,8 +186,13 @@ nni_pipe_stop(nni_pipe *p)
return;
}
p->p_stop = 1;
- nni_task_dispatch(&p->p_reap_task);
nni_mtx_unlock(&p->p_mtx);
+
+ // Put it on the reaplist for async cleanup
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ nni_list_append(&nni_pipe_reap_list, p);
+ nni_cv_wake(&nni_pipe_reap_cv);
+ nni_mtx_unlock(&nni_pipe_reap_lk);
}
uint16_t
@@ -208,7 +241,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_data = NULL;
- nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p);
+ NNI_LIST_NODE_INIT(&p->p_reap_node);
if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
@@ -271,3 +304,42 @@ nni_pipe_ep_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_ep_node);
}
+
+static void
+nni_pipe_reaper(void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ for (;;) {
+ nni_pipe *p;
+ if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) {
+ nni_list_remove(&nni_pipe_reap_list, p);
+
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+ // Transport close...
+ nni_pipe_close(p);
+
+ nni_aio_stop(&p->p_start_aio);
+
+ // Remove the pipe from the socket and the endpoint.
+ // Note that it is in theory possible for either of
+ // these to be null if the pipe is being torn down
+ // before it is fully initialized.
+ if (p->p_ep != NULL) {
+ nni_ep_pipe_remove(p->p_ep, p);
+ }
+ if (p->p_sock != NULL) {
+ nni_sock_pipe_remove(p->p_sock, p);
+ }
+ nni_pipe_destroy(p);
+ nni_mtx_lock(&nni_pipe_reap_lk);
+ continue;
+ }
+ if (nni_pipe_reap_exit) {
+ break;
+ }
+ nni_cv_wait(&nni_pipe_reap_cv);
+ }
+ nni_mtx_unlock(&nni_pipe_reap_lk);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index b8f6d90a..40871062 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -34,7 +34,7 @@ struct nni_pipe {
int p_refcnt;
nni_mtx p_mtx;
nni_cv p_cv;
- nni_task p_reap_task;
+ nni_list_node p_reap_node;
nni_aio p_start_aio;
};