diff options
Diffstat (limited to 'src/protocol/reqrep')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 42 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 44 |
2 files changed, 32 insertions, 54 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 507edf66..38bf082a 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -51,8 +51,6 @@ struct nni_rep_pipe { nni_aio aio_send; nni_aio aio_recv; nni_aio aio_putq; - int running; - int refcnt; nni_mtx mtx; }; @@ -123,7 +121,7 @@ nni_rep_sock_close(void *arg) { nni_rep_sock *rep = arg; - nni_msgq_aio_cancel(rep->uwq, &rep->aio_getq); + nni_aio_stop(&rep->aio_getq); } @@ -194,11 +192,6 @@ nni_rep_pipe_start(void *arg) return (rv); } - nni_mtx_lock(&rp->mtx); - rp->refcnt = 2; - rp->running = 1; - nni_mtx_unlock(&rp->mtx); - nni_msgq_aio_get(rp->sendq, &rp->aio_getq); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); @@ -206,23 +199,21 @@ nni_rep_pipe_start(void *arg) static void -nni_rep_pipe_stop(nni_rep_pipe *rp) +nni_rep_pipe_stop(void *arg) { + nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int refcnt; uint32_t id; + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_send); + nni_aio_stop(&rp->aio_recv); + nni_mtx_lock(&rp->mtx); - NNI_ASSERT(rp->refcnt > 0); - rp->refcnt--; - refcnt = rp->refcnt; id = rp->id; - rp->id = 0; - if (rp->running) { - rp->running = 0; - nni_msgq_close(rp->sendq); - nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - } + rp->id = 0; // makes this idempotent + nni_msgq_close(rp->sendq); nni_mtx_unlock(&rp->mtx); if (id != 0) { @@ -230,10 +221,6 @@ nni_rep_pipe_stop(nni_rep_pipe *rp) nni_idhash_remove(&rep->pipes, id); nni_mtx_unlock(&rep->mtx); } - - if (refcnt == 0) { - nni_pipe_remove(rp->pipe); - } } @@ -298,7 +285,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -317,7 +304,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -337,7 +324,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -399,7 +386,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -521,6 +508,7 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, + .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1ce1156c..c2542e18 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -57,7 +57,6 @@ struct nni_req_pipe { nni_aio aio_sendcooked; // cooked mode only nni_aio aio_recv; nni_aio aio_putq; - int refcnt; nni_mtx mtx; }; @@ -197,10 +196,6 @@ nni_req_pipe_start(void *arg) return (NNG_EPROTO); } - nni_mtx_lock(&rp->mtx); - rp->refcnt = 2; - nni_mtx_unlock(&rp->mtx); - nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { @@ -216,19 +211,19 @@ nni_req_pipe_start(void *arg) static void -nni_req_pipe_stop(nni_req_pipe *rp) +nni_req_pipe_stop(void *arg) { + nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - int refcnt; - nni_mtx_lock(&rp->mtx); - NNI_ASSERT(rp->refcnt > 0); - rp->refcnt--; - refcnt = rp->refcnt; - nni_mtx_unlock(&rp->mtx); + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_recv); + nni_aio_stop(&rp->aio_sendcooked); + nni_aio_stop(&rp->aio_sendraw); - nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); - nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + // At this point there should not be any further AIOs running. + // Further, any completion tasks have completed. nni_mtx_lock(&req->mtx); // This removes the node from either busypipes or readypipes. @@ -245,12 +240,7 @@ nni_req_pipe_stop(nni_req_pipe *rp) req->wantw = 1; nni_req_resend(req); } - nni_mtx_unlock(&req->mtx); - - if (refcnt == 0) { - nni_pipe_remove(rp->pipe); - } } @@ -323,7 +313,7 @@ nni_req_getq_cb(void *arg) // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -344,7 +334,7 @@ nni_req_sendraw_cb(void *arg) if (nni_aio_result(&rp->aio_sendraw) != 0) { nni_msg_free(rp->aio_sendraw.a_msg); rp->aio_sendraw.a_msg = NULL; - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -359,14 +349,13 @@ nni_req_sendcooked_cb(void *arg) nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - NNI_ASSERT(rp->refcnt > 0); if (nni_aio_result(&rp->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. // We leave ourselves on the busy list for now, which // means no new asynchronous traffic can occur here. nni_msg_free(rp->aio_sendcooked.a_msg); rp->aio_sendcooked.a_msg = NULL; - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -384,7 +373,7 @@ nni_req_sendcooked_cb(void *arg) // side while we were waiting to be scheduled to run for the // writer side. In this case we can't complete the operation, // and we have to abort. - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); } nni_mtx_unlock(&req->mtx); } @@ -397,7 +386,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } rp->aio_putq.a_msg = NULL; @@ -413,7 +402,7 @@ nni_req_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -443,7 +432,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_req_pipe_stop(rp); + nni_pipe_stop(rp->pipe); } @@ -615,6 +604,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_start = nni_req_pipe_start, + .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { |
