diff options
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 100 |
1 files changed, 63 insertions, 37 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 822758ef..507edf66 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -38,6 +38,7 @@ struct nni_rep_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -51,8 +52,25 @@ struct nni_rep_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_rep_sock_fini(void *arg) +{ + nni_rep_sock *rep = arg; + + nni_aio_fini(&rep->aio_getq); + nni_idhash_fini(&rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + nni_mtx_fini(&rep->mtx); + NNI_FREE_STRUCT(rep); +} + + static int nni_rep_sock_init(void **repp, nni_sock *sock) { @@ -67,15 +85,14 @@ nni_rep_sock_init(void **repp, nni_sock *sock) rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); + if (((rv = nni_mtx_init(&rep->mtx)) != 0) || + ((rv = nni_idhash_init(&rep->pipes)) != 0)) { + goto fail; } rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); if (rv != 0) { - nni_idhash_fini(&rep->pipes); - return (rv); + goto fail; } rep->uwq = nni_sock_sendq(sock); @@ -85,6 +102,10 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); + +fail: + nni_rep_sock_fini(rep); + return (rv); } @@ -106,20 +127,6 @@ nni_rep_sock_close(void *arg) } -static void -nni_rep_sock_fini(void *arg) -{ - nni_rep_sock *rep = arg; - - nni_aio_fini(&rep->aio_getq); - nni_idhash_fini(&rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); - } - NNI_FREE_STRUCT(rep); -} - - static int nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { @@ -129,7 +136,8 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + if (((rv = nni_msgq_init(&rp->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&rp->mtx)) != 0)) { goto fail; } if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) { @@ -165,6 +173,7 @@ nni_rep_pipe_fini(void *arg) nni_aio_fini(&rp->aio_send); nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_putq); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } @@ -177,31 +186,53 @@ nni_rep_pipe_start(void *arg) int rv; rp->id = nni_pipe_id(rp->pipe); + + nni_mtx_lock(&rep->mtx); rv = nni_idhash_insert(&rep->pipes, rp->id, rp); + nni_mtx_unlock(&rep->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(rp->pipe); + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); + nni_msgq_aio_get(rp->sendq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_rep_pipe_stop(void *arg) +nni_rep_pipe_stop(nni_rep_pipe *rp) { - nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; + int refcnt; + uint32_t id; + nni_mtx_lock(&rp->mtx); + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + id = rp->id; + rp->id = 0; if (rp->running) { rp->running = 0; nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - nni_idhash_remove(&rep->pipes, rp->id); + } + nni_mtx_unlock(&rp->mtx); + + if (id != 0) { + nni_mtx_lock(&rep->mtx); + nni_idhash_remove(&rep->pipes, id); + nni_mtx_unlock(&rep->mtx); + } + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); } } @@ -246,12 +277,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_sock_lock(rep->sock); + nni_mtx_lock(&rep->mtx); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); + nni_mtx_unlock(&rep->mtx); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -267,8 +298,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -287,8 +317,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -308,8 +337,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -371,8 +399,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -494,7 +521,6 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, - .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { |
