aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/req.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
commita37093079b492e966344416445aae354b147d30e (patch)
tree2f21fc2bc716f2423ba02f4713b25038c429ec4e /src/protocol/reqrep/req.c
parent88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff)
downloadnng-a37093079b492e966344416445aae354b147d30e.tar.gz
nng-a37093079b492e966344416445aae354b147d30e.tar.bz2
nng-a37093079b492e966344416445aae354b147d30e.zip
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and therefore we need to wait for the aio to complete. Further, we need to break apart shutting down aio activity from deallocation, as we need to shut down *all* async activity before deallocating *anything*. Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src/protocol/reqrep/req.c')
-rw-r--r--src/protocol/reqrep/req.c59
1 files changed, 42 insertions, 17 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f13094ff..8e7056f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -34,6 +34,7 @@ struct nni_req_sock {
nni_time resend;
int raw;
int wantw;
+ int closed;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -46,6 +47,7 @@ struct nni_req_sock {
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
nni_mtx mtx;
+ nni_cv cv;
};
// An nni_req_pipe is our per-pipe protocol private structure.
@@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
NNI_FREE_STRUCT(req);
return (rv);
}
+ if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
+ nni_mtx_fini(&req->mtx);
+ NNI_FREE_STRUCT(req);
+ }
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -108,6 +114,10 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx_lock(&req->mtx);
+ req->closed = 1;
+ nni_mtx_unlock(&req->mtx);
+
nni_timer_cancel(&req->timer);
}
@@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg)
nni_req_sock *req = arg;
nni_mtx_lock(&req->mtx);
+ while ((!nni_list_empty(&req->readypipes)) ||
+ (!nni_list_empty(&req->busypipes))) {
+ nni_cv_wait(&req->cv);
+ }
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
nni_mtx_unlock(&req->mtx);
+ nni_cv_fini(&req->cv);
nni_mtx_fini(&req->mtx);
NNI_FREE_STRUCT(req);
}
@@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg)
{
nni_req_pipe *rp = arg;
- if (rp != NULL) {
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
- }
+ nni_aio_fini(&rp->aio_getq);
+ nni_aio_fini(&rp->aio_putq);
+ nni_aio_fini(&rp->aio_recv);
+ nni_aio_fini(&rp->aio_sendcooked);
+ nni_aio_fini(&rp->aio_sendraw);
+ nni_mtx_fini(&rp->mtx);
+ NNI_FREE_STRUCT(rp);
}
static int
@@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_lock(&req->mtx);
+ if (req->closed) {
+ nni_mtx_unlock(&req->mtx);
+ return (NNG_ECLOSED);
+ }
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
nni_req_resend(req);
@@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
@@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg)
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_active(&req->readypipes, rp)) {
- nni_list_remove(&req->readypipes, rp);
+ if (nni_list_node_active(&rp->node)) {
+ nni_list_node_remove(&rp->node);
+ if (req->closed) {
+ nni_cv_wake(&req->cv);
+ }
}
if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
@@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req)
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if (req->reqmsg == NULL) {
+ if ((msg = req->reqmsg) == NULL) {
return;
}
+ if (req->closed) {
+ req->reqmsg = NULL;
+ nni_msg_free(msg);
+ }
+
if (req->wantw) {
req->wantw = 0;