aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/endpt.c10
-rw-r--r--src/core/pipe.c1
2 files changed, 11 insertions, 0 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 96e8e42c..9a677a23 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -153,6 +153,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
void
nni_ep_close(nni_ep *ep)
{
+ nni_pipe *pipe;
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
@@ -186,6 +187,9 @@ nni_ep_reap(nni_ep *ep)
// done everything we can to wake any waiter (synchronous connect)
// gracefully.
nni_mtx_lock(&ep->ep_mtx);
+ while (!nni_list_empty(&ep->ep_pipes)) {
+ nni_cv_wait(&ep->ep_cv);
+ }
while (ep->ep_refcnt != 0) {
nni_cv_wait(&ep->ep_cv);
}
@@ -197,6 +201,8 @@ nni_ep_reap(nni_ep *ep)
void
nni_ep_stop(nni_ep *ep)
{
+ nni_pipe *pipe;
+
nni_mtx_lock(&ep->ep_mtx);
// Protection against recursion.
@@ -205,6 +211,10 @@ nni_ep_stop(nni_ep *ep)
return;
}
ep->ep_stop = 1;
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+
nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep);
nni_taskq_dispatch(NULL, &ep->ep_reap_tqe);
nni_mtx_unlock(&ep->ep_mtx);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 7b285575..f24ea9c3 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -63,6 +63,7 @@ nni_pipe_destroy(nni_pipe *p)
nni_idhash_remove(nni_pipes, p->p_id);
}
nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
}
// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.