aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/rep.c')
-rw-r--r--src/protocol/reqrep/rep.c36
1 files changed, 18 insertions, 18 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 53007e12..9323428c 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -23,8 +23,8 @@ typedef struct nni_rep_sock nni_rep_sock;
struct nni_rep_sock {
nni_socket * sock;
nni_mtx mx;
- nni_msgqueue * uwq;
- nni_msgqueue * urq;
+ nni_msgq * uwq;
+ nni_msgq * urq;
int raw;
int ttl;
nni_thr sender;
@@ -37,7 +37,7 @@ struct nni_rep_sock {
struct nni_rep_pipe {
nni_pipe * pipe;
nni_rep_sock * rep;
- nni_msgqueue * sendq;
+ nni_msgq * sendq;
int sigclose;
};
@@ -111,14 +111,14 @@ nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap)
rp->pipe = pipe;
rp->sigclose = 0;
- rv = nni_msgqueue_create(&rp->sendq, 2);
+ rv = nni_msgq_init(&rp->sendq, 2);
if (rv != 0) {
return (rv);
}
nni_mtx_lock(&rep->mx);
if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) {
- nni_msgqueue_destroy(rp->sendq);
+ nni_msgq_fini(rp->sendq);
nni_mtx_unlock(&rep->mx);
return (rv);
}
@@ -136,7 +136,7 @@ nni_rep_rem_pipe(void *arg, void *data)
nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
nni_mtx_unlock(&rep->mx);
- nni_msgqueue_destroy(rp->sendq);
+ nni_msgq_fini(rp->sendq);
}
@@ -148,8 +148,8 @@ static void
nni_rep_topsender(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgqueue *uwq = rep->uwq;
- nni_msgqueue *urq = rep->urq;
+ nni_msgq *uwq = rep->uwq;
+ nni_msgq *urq = rep->urq;
nni_msg *msg;
for (;;) {
@@ -159,7 +159,7 @@ nni_rep_topsender(void *arg)
nni_rep_pipe *rp;
int rv;
- if ((rv = nni_msgqueue_get(uwq, &msg)) != 0) {
+ if ((rv = nni_msgq_get(uwq, &msg)) != 0) {
break;
}
// We yank the outgoing pipe id from the header
@@ -184,7 +184,7 @@ nni_rep_topsender(void *arg)
continue;
}
// Try a non-blocking put to the lower writer.
- rv = nni_msgqueue_put_until(rp->sendq, msg, NNI_TIME_ZERO);
+ rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO);
if (rv != 0) {
// message queue is full, we have no choice but
// to drop it. This should not happen under normal
@@ -201,8 +201,8 @@ nni_rep_sender(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msgqueue *urq = rep->urq;
- nni_msgqueue *wq = rp->sendq;
+ nni_msgq *urq = rep->urq;
+ nni_msgq *wq = rp->sendq;
nni_pipe *pipe = rp->pipe;
nni_msg *msg;
uint8_t *body;
@@ -210,7 +210,7 @@ nni_rep_sender(void *arg)
int rv;
for (;;) {
- rv = nni_msgqueue_get_sig(wq, &msg, &rp->sigclose);
+ rv = nni_msgq_get_sig(wq, &msg, &rp->sigclose);
if (rv != 0) {
break;
}
@@ -221,7 +221,7 @@ nni_rep_sender(void *arg)
break;
}
}
- nni_msgqueue_signal(urq, &rp->sigclose);
+ nni_msgq_signal(urq, &rp->sigclose);
nni_pipe_close(pipe);
}
@@ -231,8 +231,8 @@ nni_rep_receiver(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msgqueue *urq = rep->urq;
- nni_msgqueue *uwq = rep->uwq;
+ nni_msgq *urq = rep->urq;
+ nni_msgq *uwq = rep->uwq;
nni_pipe *pipe = rp->pipe;
nni_msg *msg;
int rv;
@@ -288,13 +288,13 @@ again:
}
// Now send it up.
- rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose);
+ rv = nni_msgq_put_sig(urq, msg, &rp->sigclose);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgqueue_signal(uwq, &rp->sigclose);
+ nni_msgq_signal(uwq, &rp->sigclose);
nni_pipe_close(pipe);
}