aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/rep.c')
-rw-r--r--src/protocol/reqrep/rep.c34
1 files changed, 6 insertions, 28 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index d37ca691..f35726d8 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -38,7 +38,6 @@ struct nni_rep_sock {
char * btrace;
size_t btrace_len;
nni_aio aio_getq;
- nni_mtx mtx;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
@@ -46,12 +45,10 @@ struct nni_rep_pipe {
nni_pipe * pipe;
nni_rep_sock *rep;
nni_msgq * sendq;
- uint32_t id; // we have to save it
nni_aio aio_getq;
nni_aio aio_send;
nni_aio aio_recv;
nni_aio aio_putq;
- nni_mtx mtx;
};
static void
@@ -64,7 +61,6 @@ nni_rep_sock_fini(void *arg)
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
}
- nni_mtx_fini(&rep->mtx);
NNI_FREE_STRUCT(rep);
}
@@ -82,8 +78,8 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
- if (((rv = nni_mtx_init(&rep->mtx)) != 0) ||
- ((rv = nni_idhash_init(&rep->pipes)) != 0)) {
+
+ if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
goto fail;
}
@@ -130,8 +126,7 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&rp->sendq, 2)) != 0) ||
- ((rv = nni_mtx_init(&rp->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
goto fail;
}
if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
@@ -170,7 +165,6 @@ nni_rep_pipe_fini(void *arg)
nni_aio_fini(&rp->aio_send);
nni_aio_fini(&rp->aio_recv);
nni_aio_fini(&rp->aio_putq);
- nni_mtx_fini(&rp->mtx);
NNI_FREE_STRUCT(rp);
}
@@ -181,11 +175,7 @@ nni_rep_pipe_start(void *arg)
nni_rep_sock *rep = rp->rep;
int rv;
- rp->id = nni_pipe_id(rp->pipe);
-
- nni_mtx_lock(&rep->mtx);
- rv = nni_idhash_insert(rep->pipes, rp->id, rp);
- nni_mtx_unlock(&rep->mtx);
+ rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
if (rv != 0) {
return (rv);
}
@@ -200,24 +190,14 @@ nni_rep_pipe_stop(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- uint32_t id;
nni_aio_stop(&rp->aio_getq);
nni_aio_stop(&rp->aio_putq);
nni_aio_stop(&rp->aio_send);
nni_aio_stop(&rp->aio_recv);
-
- nni_mtx_lock(&rp->mtx);
- id = rp->id;
- rp->id = 0; // makes this idempotent
nni_msgq_close(rp->sendq);
- nni_mtx_unlock(&rp->mtx);
- if (id != 0) {
- nni_mtx_lock(&rep->mtx);
- nni_idhash_remove(rep->pipes, id);
- nni_mtx_unlock(&rep->mtx);
- }
+ nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
}
static void
@@ -260,9 +240,7 @@ nni_rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- nni_mtx_lock(&rep->mtx);
rv = nni_idhash_find(rep->pipes, id, (void **) &rp);
- nni_mtx_unlock(&rep->mtx);
if (rv == 0) {
rv = nni_msgq_tryput(rp->sendq, msg);
}
@@ -321,7 +299,7 @@ nni_rep_pipe_recv_cb(void *arg)
return;
}
- NNI_PUT32(idbuf, rp->id);
+ NNI_PUT32(idbuf, nni_pipe_id(rp->pipe));
msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;