diff options
Diffstat (limited to 'src/protocol/reqrep/req.c')
| -rw-r--r-- | src/protocol/reqrep/req.c | 44 |
1 files changed, 17 insertions, 27 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1ce1156c..c2542e18 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -57,7 +57,6 @@ struct nni_req_pipe { nni_aio aio_sendcooked; // cooked mode only nni_aio aio_recv; nni_aio aio_putq; - int refcnt; nni_mtx mtx; }; @@ -197,10 +196,6 @@ nni_req_pipe_start(void *arg) return (NNG_EPROTO); } - nni_mtx_lock(&rp->mtx); - rp->refcnt = 2; - nni_mtx_unlock(&rp->mtx); - nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { @@ -216,19 +211,19 @@ nni_req_pipe_start(void *arg) static void -nni_req_pipe_stop(nni_req_pipe *rp) +nni_req_pipe_stop(void *arg) { + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - int refcnt; - nni_mtx_lock(&rp->mtx); - NNI_ASSERT(rp->refcnt > 0); - rp->refcnt--; - refcnt = rp->refcnt; - nni_mtx_unlock(&rp->mtx); + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_recv); + nni_aio_stop(&rp->aio_sendcooked); + nni_aio_stop(&rp->aio_sendraw); - nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); - nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + // At this point there should not be any further AIOs running. + // Further, any completion tasks have completed. nni_mtx_lock(&req->mtx); // This removes the node from either busypipes or readypipes. @@ -245,12 +240,7 @@ nni_req_pipe_stop(nni_req_pipe *rp) req->wantw = 1; nni_req_resend(req); } - nni_mtx_unlock(&req->mtx); - - if (refcnt == 0) { - nni_pipe_remove(rp->pipe); - } } @@ -323,7 +313,7 @@ nni_req_getq_cb(void *arg) // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -344,7 +334,7 @@ nni_req_sendraw_cb(void *arg) if (nni_aio_result(&rp->aio_sendraw) != 0) { nni_msg_free(rp->aio_sendraw.a_msg); rp->aio_sendraw.a_msg = NULL; - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -359,14 +349,13 @@ nni_req_sendcooked_cb(void *arg) nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - NNI_ASSERT(rp->refcnt > 0); if (nni_aio_result(&rp->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. // We leave ourselves on the busy list for now, which // means no new asynchronous traffic can occur here. nni_msg_free(rp->aio_sendcooked.a_msg); rp->aio_sendcooked.a_msg = NULL; - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -384,7 +373,7 @@ nni_req_sendcooked_cb(void *arg) // side while we were waiting to be scheduled to run for the // writer side. In this case we can't complete the operation, // and we have to abort. - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); } nni_mtx_unlock(&req->mtx); } @@ -397,7 +386,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } rp->aio_putq.a_msg = NULL; @@ -413,7 +402,7 @@ nni_req_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -443,7 +432,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); } @@ -615,6 +604,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_start = nni_req_pipe_start, + .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { |
