aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/req.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/req.c')
-rw-r--r--src/protocol/reqrep/req.c59
1 files changed, 42 insertions, 17 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f13094ff..8e7056f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -34,6 +34,7 @@ struct nni_req_sock {
nni_time resend;
int raw;
int wantw;
+ int closed;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -46,6 +47,7 @@ struct nni_req_sock {
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
nni_mtx mtx;
+ nni_cv cv;
};
// An nni_req_pipe is our per-pipe protocol private structure.
@@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
NNI_FREE_STRUCT(req);
return (rv);
}
+ if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
+ nni_mtx_fini(&req->mtx);
+ NNI_FREE_STRUCT(req);
+ }
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -108,6 +114,10 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx_lock(&req->mtx);
+ req->closed = 1;
+ nni_mtx_unlock(&req->mtx);
+
nni_timer_cancel(&req->timer);
}
@@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg)
nni_req_sock *req = arg;
nni_mtx_lock(&req->mtx);
+ while ((!nni_list_empty(&req->readypipes)) ||
+ (!nni_list_empty(&req->busypipes))) {
+ nni_cv_wait(&req->cv);
+ }
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
nni_mtx_unlock(&req->mtx);
+ nni_cv_fini(&req->cv);
nni_mtx_fini(&req->mtx);
NNI_FREE_STRUCT(req);
}
@@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg)
{
nni_req_pipe *rp = arg;
- if (rp != NULL) {
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
- }
+ nni_aio_fini(&rp->aio_getq);
+ nni_aio_fini(&rp->aio_putq);
+ nni_aio_fini(&rp->aio_recv);
+ nni_aio_fini(&rp->aio_sendcooked);
+ nni_aio_fini(&rp->aio_sendraw);
+ nni_mtx_fini(&rp->mtx);
+ NNI_FREE_STRUCT(rp);
}
static int
@@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_lock(&req->mtx);
+ if (req->closed) {
+ nni_mtx_unlock(&req->mtx);
+ return (NNG_ECLOSED);
+ }
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
nni_req_resend(req);
@@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
@@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg)
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_active(&req->readypipes, rp)) {
- nni_list_remove(&req->readypipes, rp);
+ if (nni_list_node_active(&rp->node)) {
+ nni_list_node_remove(&rp->node);
+ if (req->closed) {
+ nni_cv_wake(&req->cv);
+ }
}
if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
@@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req)
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if (req->reqmsg == NULL) {
+ if ((msg = req->reqmsg) == NULL) {
return;
}
+ if (req->closed) {
+ req->reqmsg = NULL;
+ nni_msg_free(msg);
+ }
+
if (req->wantw) {
req->wantw = 0;