aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/socket.c26
-rw-r--r--src/protocol/reqrep0/req.c29
2 files changed, 28 insertions, 27 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 1fcf5e0b..becd9b55 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -643,12 +643,6 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
@@ -663,6 +657,15 @@ nni_sock_shutdown(nni_sock *sock)
}
}
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here. However, it is possible for
+ // a pipe to have been added by an endpoint due to racing conditions
+ // in the shutdown. Therefore it is important that we shutdown pipes
+ // *last*.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
@@ -719,13 +722,12 @@ nni_sock_close(nni_sock *s)
}
nni_mtx_unlock(&sock_lk);
- // Wait for pipes, eps, and contexts to finish closing.
+ // Because we already shut everything down before, we should not
+ // have any child objects.
nni_mtx_lock(&s->s_mx);
- while ((!nni_list_empty(&s->s_pipes)) ||
- (!nni_list_empty(&s->s_dialers)) ||
- (!nni_list_empty(&s->s_listeners))) {
- nni_cv_wait(&s->s_cv);
- }
+ NNI_ASSERT(nni_list_empty(&s->s_dialers));
+ NNI_ASSERT(nni_list_empty(&s->s_listeners));
+ NNI_ASSERT(nni_list_empty(&s->s_pipes));
nni_mtx_unlock(&s->s_mx);
sock_destroy(s);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 59dd73b4..6220697b 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -66,6 +66,7 @@ struct req0_sock {
nni_list readypipes;
nni_list busypipes;
+ nni_list stoppipes;
nni_list ctxs;
nni_list sendq; // contexts waiting to send.
@@ -74,7 +75,6 @@ struct req0_sock {
nni_pollable *sendable;
nni_mtx mtx;
- nni_cv cv;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -113,10 +113,10 @@ req0_sock_init(void **sp, nni_sock *sock)
s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u);
nni_mtx_init(&s->mtx);
- nni_cv_init(&s->cv, &s->mtx);
NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->stoppipes, req0_pipe, node);
NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode);
NNI_LIST_INIT(&s->ctxs, req0_ctx, snode);
@@ -170,10 +170,9 @@ req0_sock_fini(void *arg)
req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
- while ((!nni_list_empty(&s->readypipes)) ||
- (!nni_list_empty(&s->busypipes))) {
- nni_cv_wait(&s->cv);
- }
+ NNI_ASSERT(nni_list_empty(&s->busypipes));
+ NNI_ASSERT(nni_list_empty(&s->stoppipes));
+ NNI_ASSERT(nni_list_empty(&s->readypipes));
nni_mtx_unlock(&s->mtx);
if (s->ctx) {
req0_ctx_fini(s->ctx);
@@ -181,7 +180,6 @@ req0_sock_fini(void *arg)
nni_pollable_free(s->recvable);
nni_pollable_free(s->sendable);
nni_idhash_fini(s->reqids);
- nni_cv_fini(&s->cv);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
@@ -190,9 +188,13 @@ static void
req0_pipe_stop(void *arg)
{
req0_pipe *p = arg;
+ req0_sock *s = p->req;
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_send);
+ nni_mtx_lock(&s->mtx);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&s->mtx);
}
static void
@@ -239,7 +241,7 @@ req0_pipe_start(void *arg)
}
nni_mtx_lock(&s->mtx);
- if (s->closed) {
+ if (s->closed || p->closed) {
nni_mtx_unlock(&s->mtx);
return (NNG_ECLOSED);
}
@@ -264,14 +266,11 @@ req0_pipe_close(void *arg)
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
- // It doesn't much matter which.
+ // It doesn't much matter which. We stick the pipe on the stop
+ // list, so that we can wait for that to close down safely.
p->closed = true;
- if (nni_list_node_active(&p->node)) {
- nni_list_node_remove(&p->node);
- if (s->closed) {
- nni_cv_wake(&s->cv);
- }
- }
+ nni_list_node_remove(&p->node);
+ nni_list_append(&s->stoppipes, p);
if (nni_list_empty(&s->readypipes)) {
nni_pollable_clear(s->sendable);
}