aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/req.c62
1 files changed, 35 insertions, 27 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 519b224e..1ce1156c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -57,7 +57,6 @@ struct nni_req_pipe {
nni_aio aio_sendcooked; // cooked mode only
nni_aio aio_recv;
nni_aio aio_putq;
- int running;
int refcnt;
nni_mtx mtx;
};
@@ -119,9 +118,11 @@ nni_req_sock_fini(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx_lock(&req->mtx);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
+ nni_mtx_unlock(&req->mtx);
nni_mtx_fini(&req->mtx);
NNI_FREE_STRUCT(req);
}
@@ -195,6 +196,11 @@ nni_req_pipe_start(void *arg)
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
+
+ nni_mtx_lock(&rp->mtx);
+ rp->refcnt = 2;
+ nni_mtx_unlock(&rp->mtx);
+
nni_mtx_lock(&req->mtx);
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
@@ -202,10 +208,6 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_unlock(&req->mtx);
- nni_mtx_lock(&rp->mtx);
- rp->refcnt = 2;
- rp->running = 1;
- nni_mtx_unlock(&rp->mtx);
nni_msgq_aio_get(req->uwq, &rp->aio_getq);
nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
@@ -218,34 +220,33 @@ nni_req_pipe_stop(nni_req_pipe *rp)
{
nni_req_sock *req = rp->req;
int refcnt;
- int running;
nni_mtx_lock(&rp->mtx);
- running = rp->running;
- rp->running = 0;
NNI_ASSERT(rp->refcnt > 0);
rp->refcnt--;
refcnt = rp->refcnt;
nni_mtx_unlock(&rp->mtx);
- if (running) {
- nni_mtx_lock(&req->mtx);
- // This removes the node from either busypipes or readypipes.
- // It doesn't much matter which.
+ nni_msgq_aio_cancel(req->uwq, &rp->aio_getq);
+ nni_msgq_aio_cancel(req->urq, &rp->aio_putq);
+
+ nni_mtx_lock(&req->mtx);
+ // This removes the node from either busypipes or readypipes.
+ // It doesn't much matter which.
+ if (nni_list_active(&req->readypipes, rp)) {
nni_list_remove(&req->readypipes, rp);
+ }
- if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
- // removing the pipe we sent the last request on...
- // schedule immediate resend.
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
- nni_req_resend(req);
- }
- nni_mtx_unlock(&req->mtx);
+ if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ // removing the pipe we sent the last request on...
+ // schedule immediate resend.
+ req->pendpipe = NULL;
+ req->resend = NNI_TIME_ZERO;
+ req->wantw = 1;
+ nni_req_resend(req);
}
- nni_msgq_aio_cancel(req->uwq, &rp->aio_getq);
- nni_msgq_aio_cancel(req->urq, &rp->aio_putq);
+ nni_mtx_unlock(&req->mtx);
if (refcnt == 0) {
nni_pipe_remove(rp->pipe);
@@ -357,8 +358,8 @@ nni_req_sendcooked_cb(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_mtx *mx = nni_sock_mtx(req->sock);
+ NNI_ASSERT(rp->refcnt > 0);
if (nni_aio_result(&rp->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
// We leave ourselves on the busy list for now, which
@@ -374,10 +375,17 @@ nni_req_sendcooked_cb(void *arg)
// a resend.
nni_mtx_lock(&req->mtx);
- nni_list_remove(&req->busypipes, rp);
- nni_list_append(&req->readypipes, rp);
-
- nni_req_resend(req);
+ if (nni_list_active(&req->busypipes, rp)) {
+ nni_list_remove(&req->busypipes, rp);
+ nni_list_append(&req->readypipes, rp);
+ nni_req_resend(req);
+ } else {
+ // We wind up here if stop was called from the reader
+ // side while we were waiting to be scheduled to run for the
+ // writer side. In this case we can't complete the operation,
+ // and we have to abort.
+ nni_req_pipe_stop(rp);
+ }
nni_mtx_unlock(&req->mtx);
}