aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/req.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/req.c')
-rw-r--r--src/protocol/reqrep0/req.c29
1 files changed, 14 insertions, 15 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 59dd73b4..6220697b 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -66,6 +66,7 @@ struct req0_sock {
nni_list readypipes;
nni_list busypipes;
+ nni_list stoppipes;
nni_list ctxs;
nni_list sendq; // contexts waiting to send.
@@ -74,7 +75,6 @@ struct req0_sock {
nni_pollable *sendable;
nni_mtx mtx;
- nni_cv cv;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -113,10 +113,10 @@ req0_sock_init(void **sp, nni_sock *sock)
s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u);
nni_mtx_init(&s->mtx);
- nni_cv_init(&s->cv, &s->mtx);
NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->stoppipes, req0_pipe, node);
NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode);
NNI_LIST_INIT(&s->ctxs, req0_ctx, snode);
@@ -170,10 +170,9 @@ req0_sock_fini(void *arg)
req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
- while ((!nni_list_empty(&s->readypipes)) ||
- (!nni_list_empty(&s->busypipes))) {
- nni_cv_wait(&s->cv);
- }
+ NNI_ASSERT(nni_list_empty(&s->busypipes));
+ NNI_ASSERT(nni_list_empty(&s->stoppipes));
+ NNI_ASSERT(nni_list_empty(&s->readypipes));
nni_mtx_unlock(&s->mtx);
if (s->ctx) {
req0_ctx_fini(s->ctx);
@@ -181,7 +180,6 @@ req0_sock_fini(void *arg)
nni_pollable_free(s->recvable);
nni_pollable_free(s->sendable);
nni_idhash_fini(s->reqids);
- nni_cv_fini(&s->cv);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
@@ -190,9 +188,13 @@ static void
req0_pipe_stop(void *arg)
{
req0_pipe *p = arg;
+ req0_sock *s = p->req;
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_send);
+ nni_mtx_lock(&s->mtx);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&s->mtx);
}
static void
@@ -239,7 +241,7 @@ req0_pipe_start(void *arg)
}
nni_mtx_lock(&s->mtx);
- if (s->closed) {
+ if (s->closed || p->closed) {
nni_mtx_unlock(&s->mtx);
return (NNG_ECLOSED);
}
@@ -264,14 +266,11 @@ req0_pipe_close(void *arg)
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
- // It doesn't much matter which.
+ // It doesn't much matter which. We stick the pipe on the stop
+ // list, so that we can wait for that to close down safely.
p->closed = true;
- if (nni_list_node_active(&p->node)) {
- nni_list_node_remove(&p->node);
- if (s->closed) {
- nni_cv_wake(&s->cv);
- }
- }
+ nni_list_node_remove(&p->node);
+ nni_list_append(&s->stoppipes, p);
if (nni_list_empty(&s->readypipes)) {
nni_pollable_clear(s->sendable);
}