diff options
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/req.c | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 59dd73b4..6220697b 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -66,6 +66,7 @@ struct req0_sock { nni_list readypipes; nni_list busypipes; + nni_list stoppipes; nni_list ctxs; nni_list sendq; // contexts waiting to send. @@ -74,7 +75,6 @@ struct req0_sock { nni_pollable *sendable; nni_mtx mtx; - nni_cv cv; }; // A req0_pipe is our per-pipe protocol private structure. @@ -113,10 +113,10 @@ req0_sock_init(void **sp, nni_sock *sock) s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); nni_mtx_init(&s->mtx); - nni_cv_init(&s->cv, &s->mtx); NNI_LIST_INIT(&s->readypipes, req0_pipe, node); NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + NNI_LIST_INIT(&s->stoppipes, req0_pipe, node); NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode); NNI_LIST_INIT(&s->ctxs, req0_ctx, snode); @@ -170,10 +170,9 @@ req0_sock_fini(void *arg) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - while ((!nni_list_empty(&s->readypipes)) || - (!nni_list_empty(&s->busypipes))) { - nni_cv_wait(&s->cv); - } + NNI_ASSERT(nni_list_empty(&s->busypipes)); + NNI_ASSERT(nni_list_empty(&s->stoppipes)); + NNI_ASSERT(nni_list_empty(&s->readypipes)); nni_mtx_unlock(&s->mtx); if (s->ctx) { req0_ctx_fini(s->ctx); @@ -181,7 +180,6 @@ req0_sock_fini(void *arg) nni_pollable_free(s->recvable); nni_pollable_free(s->sendable); nni_idhash_fini(s->reqids); - nni_cv_fini(&s->cv); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); } @@ -190,9 +188,13 @@ static void req0_pipe_stop(void *arg) { req0_pipe *p = arg; + req0_sock *s = p->req; nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_send); + nni_mtx_lock(&s->mtx); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&s->mtx); } static void @@ -239,7 +241,7 @@ req0_pipe_start(void *arg) } nni_mtx_lock(&s->mtx); - if (s->closed) { + if (s->closed || p->closed) { nni_mtx_unlock(&s->mtx); return (NNG_ECLOSED); } @@ -264,14 +266,11 @@ req0_pipe_close(void *arg) nni_mtx_lock(&s->mtx); // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. + // It doesn't much matter which. We stick the pipe on the stop + // list, so that we can wait for that to close down safely. p->closed = true; - if (nni_list_node_active(&p->node)) { - nni_list_node_remove(&p->node); - if (s->closed) { - nni_cv_wake(&s->cv); - } - } + nni_list_node_remove(&p->node); + nni_list_append(&s->stoppipes, p); if (nni_list_empty(&s->readypipes)) { nni_pollable_clear(s->sendable); } |
