diff options
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 42 |
1 files changed, 15 insertions, 27 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 507edf66..38bf082a 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -51,8 +51,6 @@ struct nni_rep_pipe { nni_aio aio_send; nni_aio aio_recv; nni_aio aio_putq; - int running; - int refcnt; nni_mtx mtx; }; @@ -123,7 +121,7 @@ nni_rep_sock_close(void *arg) { nni_rep_sock *rep = arg; - nni_msgq_aio_cancel(rep->uwq, &rep->aio_getq); + nni_aio_stop(&rep->aio_getq); } @@ -194,11 +192,6 @@ nni_rep_pipe_start(void *arg) return (rv); } - nni_mtx_lock(&rp->mtx); - rp->refcnt = 2; - rp->running = 1; - nni_mtx_unlock(&rp->mtx); - nni_msgq_aio_get(rp->sendq, &rp->aio_getq); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); @@ -206,23 +199,21 @@ nni_rep_pipe_start(void *arg) static void -nni_rep_pipe_stop(nni_rep_pipe *rp) +nni_rep_pipe_stop(void *arg) { + nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int refcnt; uint32_t id; + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_send); + nni_aio_stop(&rp->aio_recv); + nni_mtx_lock(&rp->mtx); - NNI_ASSERT(rp->refcnt > 0); - rp->refcnt--; - refcnt = rp->refcnt; id = rp->id; - rp->id = 0; - if (rp->running) { - rp->running = 0; - nni_msgq_close(rp->sendq); - nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - } + rp->id = 0; // makes this idempotent + nni_msgq_close(rp->sendq); nni_mtx_unlock(&rp->mtx); if (id != 0) { @@ -230,10 +221,6 @@ nni_rep_pipe_stop(nni_rep_pipe *rp) nni_idhash_remove(&rep->pipes, id); nni_mtx_unlock(&rep->mtx); } - - if (refcnt == 0) { - nni_pipe_remove(rp->pipe); - } } @@ -298,7 +285,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -317,7 +304,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -337,7 +324,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -399,7 +386,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_rep_pipe_stop(rp); + nni_pipe_stop(rp->pipe); return; } @@ -521,6 +508,7 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, + .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { |
