diff options
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 53007e12..9323428c 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -23,8 +23,8 @@ typedef struct nni_rep_sock nni_rep_sock; struct nni_rep_sock { nni_socket * sock; nni_mtx mx; - nni_msgqueue * uwq; - nni_msgqueue * urq; + nni_msgq * uwq; + nni_msgq * urq; int raw; int ttl; nni_thr sender; @@ -37,7 +37,7 @@ struct nni_rep_sock { struct nni_rep_pipe { nni_pipe * pipe; nni_rep_sock * rep; - nni_msgqueue * sendq; + nni_msgq * sendq; int sigclose; }; @@ -111,14 +111,14 @@ nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) rp->pipe = pipe; rp->sigclose = 0; - rv = nni_msgqueue_create(&rp->sendq, 2); + rv = nni_msgq_init(&rp->sendq, 2); if (rv != 0) { return (rv); } nni_mtx_lock(&rep->mx); if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { - nni_msgqueue_destroy(rp->sendq); + nni_msgq_fini(rp->sendq); nni_mtx_unlock(&rep->mx); return (rv); } @@ -136,7 +136,7 @@ nni_rep_rem_pipe(void *arg, void *data) nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); nni_mtx_unlock(&rep->mx); - nni_msgqueue_destroy(rp->sendq); + nni_msgq_fini(rp->sendq); } @@ -148,8 +148,8 @@ static void nni_rep_topsender(void *arg) { nni_rep_sock *rep = arg; - nni_msgqueue *uwq = rep->uwq; - nni_msgqueue *urq = rep->urq; + nni_msgq *uwq = rep->uwq; + nni_msgq *urq = rep->urq; nni_msg *msg; for (;;) { @@ -159,7 +159,7 @@ nni_rep_topsender(void *arg) nni_rep_pipe *rp; int rv; - if ((rv = nni_msgqueue_get(uwq, &msg)) != 0) { + if ((rv = nni_msgq_get(uwq, &msg)) != 0) { break; } // We yank the outgoing pipe id from the header @@ -184,7 +184,7 @@ nni_rep_topsender(void *arg) continue; } // Try a non-blocking put to the lower writer. - rv = nni_msgqueue_put_until(rp->sendq, msg, NNI_TIME_ZERO); + rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO); if (rv != 0) { // message queue is full, we have no choice but // to drop it. This should not happen under normal @@ -201,8 +201,8 @@ nni_rep_sender(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgqueue *urq = rep->urq; - nni_msgqueue *wq = rp->sendq; + nni_msgq *urq = rep->urq; + nni_msgq *wq = rp->sendq; nni_pipe *pipe = rp->pipe; nni_msg *msg; uint8_t *body; @@ -210,7 +210,7 @@ nni_rep_sender(void *arg) int rv; for (;;) { - rv = nni_msgqueue_get_sig(wq, &msg, &rp->sigclose); + rv = nni_msgq_get_sig(wq, &msg, &rp->sigclose); if (rv != 0) { break; } @@ -221,7 +221,7 @@ nni_rep_sender(void *arg) break; } } - nni_msgqueue_signal(urq, &rp->sigclose); + nni_msgq_signal(urq, &rp->sigclose); nni_pipe_close(pipe); } @@ -231,8 +231,8 @@ nni_rep_receiver(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgqueue *urq = rep->urq; - nni_msgqueue *uwq = rep->uwq; + nni_msgq *urq = rep->urq; + nni_msgq *uwq = rep->uwq; nni_pipe *pipe = rp->pipe; nni_msg *msg; int rv; @@ -288,13 +288,13 @@ again: } // Now send it up. - rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &rp->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgqueue_signal(uwq, &rp->sigclose); + nni_msgq_signal(uwq, &rp->sigclose); nni_pipe_close(pipe); } |
