diff options
Diffstat (limited to 'src/protocol/reqrep/req.c')
| -rw-r--r-- | src/protocol/reqrep/req.c | 59 |
1 files changed, 42 insertions, 17 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f13094ff..8e7056f5 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -34,6 +34,7 @@ struct nni_req_sock { nni_time resend; int raw; int wantw; + int closed; nni_msg * reqmsg; nni_req_pipe *pendpipe; @@ -46,6 +47,7 @@ struct nni_req_sock { uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) nni_mtx mtx; + nni_cv cv; }; // An nni_req_pipe is our per-pipe protocol private structure. @@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock) NNI_FREE_STRUCT(req); return (rv); } + if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) { + nni_mtx_fini(&req->mtx); + NNI_FREE_STRUCT(req); + } NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); @@ -108,6 +114,10 @@ nni_req_sock_close(void *arg) { nni_req_sock *req = arg; + nni_mtx_lock(&req->mtx); + req->closed = 1; + nni_mtx_unlock(&req->mtx); + nni_timer_cancel(&req->timer); } @@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg) nni_req_sock *req = arg; nni_mtx_lock(&req->mtx); + while ((!nni_list_empty(&req->readypipes)) || + (!nni_list_empty(&req->busypipes))) { + nni_cv_wait(&req->cv); + } if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } nni_mtx_unlock(&req->mtx); + nni_cv_fini(&req->cv); nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg) { nni_req_pipe *rp = arg; - if (rp != NULL) { - nni_aio_fini(&rp->aio_getq); - nni_aio_fini(&rp->aio_putq); - nni_aio_fini(&rp->aio_recv); - nni_aio_fini(&rp->aio_sendcooked); - nni_aio_fini(&rp->aio_sendraw); - nni_mtx_fini(&rp->mtx); - NNI_FREE_STRUCT(rp); - } + nni_aio_fini(&rp->aio_getq); + nni_aio_fini(&rp->aio_putq); + nni_aio_fini(&rp->aio_recv); + nni_aio_fini(&rp->aio_sendcooked); + nni_aio_fini(&rp->aio_sendraw); + nni_mtx_fini(&rp->mtx); + NNI_FREE_STRUCT(rp); } static int @@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg) } nni_mtx_lock(&req->mtx); + if (req->closed) { + nni_mtx_unlock(&req->mtx); + return (NNG_ECLOSED); + } nni_list_append(&req->readypipes, rp); if (req->wantw) { nni_req_resend(req); @@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg) nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED); + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_recv); + nni_aio_stop(&rp->aio_sendcooked); + nni_aio_stop(&rp->aio_sendraw); // At this point there should not be any further AIOs running. // Further, any completion tasks have completed. @@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg) nni_mtx_lock(&req->mtx); // This removes the node from either busypipes or readypipes. // It doesn't much matter which. - if (nni_list_active(&req->readypipes, rp)) { - nni_list_remove(&req->readypipes, rp); + if (nni_list_node_active(&rp->node)) { + nni_list_node_remove(&rp->node); + if (req->closed) { + nni_cv_wake(&req->cv); + } } if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { @@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req) // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode // requests. - if (req->reqmsg == NULL) { + if ((msg = req->reqmsg) == NULL) { return; } + if (req->closed) { + req->reqmsg = NULL; + nni_msg_free(msg); + } + if (req->wantw) { req->wantw = 0; |
