aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/req.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/req.c')
-rw-r--r--src/protocol/reqrep/req.c54
1 files changed, 5 insertions, 49 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 54ac7b11..366e0c57 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -40,9 +40,6 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
- int good;
- nni_thread * sthr;
- nni_thread * rthr;
int sigclose;
nni_list_node node;
};
@@ -115,44 +112,20 @@ nni_req_destroy(void *arg)
static int
-nni_req_add_pipe(void *arg, nni_pipe *pipe, void **datap)
+nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data)
{
nni_req_sock *req = arg;
- nni_req_pipe *rp;
+ nni_req_pipe *rp = data;
int rv;
- if ((rp = nni_alloc(sizeof (*rp))) == NULL) {
- return (NNG_ENOMEM);
- }
rp->pipe = pipe;
- rp->good = 0;
rp->sigclose = 0;
- rp->sthr = NULL;
- rp->rthr = NULL;
rp->req = req;
nni_mutex_enter(&req->mx);
- if ((rv = nni_thread_create(&rp->rthr, nni_req_receiver, rp)) != 0) {
- goto fail;
- }
- if ((rv = nni_thread_create(&rp->sthr, nni_req_sender, rp)) != 0) {
- goto fail;
- }
- rp->good = 1;
nni_list_append(&req->pipes, rp);
- *datap = rp;
nni_mutex_exit(&req->mx);
return (0);
-fail:
- nni_mutex_exit(&req->mx);
- if (rp->rthr) {
- nni_thread_reap(rp->rthr);
- }
- if (rp->sthr) {
- nni_thread_reap(rp->sthr);
- }
- nni_free(rp, sizeof (*rp));
- return (rv);
}
@@ -165,13 +138,6 @@ nni_req_rem_pipe(void *arg, void *data)
nni_mutex_enter(&req->mx);
nni_list_remove(&req->pipes, rp);
nni_mutex_exit(&req->mx);
-
- if (rp->sthr != NULL) {
- (void) nni_thread_reap(rp->sthr);
- }
- if (rp->rthr != NULL) {
- (void) nni_thread_reap(rp->rthr);
- }
}
@@ -186,13 +152,6 @@ nni_req_sender(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&req->mx);
- if (!rp->good) {
- nni_mutex_exit(&req->mx);
- return;
- }
- nni_mutex_exit(&req->mx);
-
for (;;) {
rv = nni_msgqueue_get_sig(uwq, &msg, &rp->sigclose);
if (rv != 0) {
@@ -220,12 +179,6 @@ nni_req_receiver(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&req->mx);
- if (!rp->good) {
- nni_mutex_exit(&req->mx);
- return;
- }
- nni_mutex_exit(&req->mx);
for (;;) {
size_t len;
char *body;
@@ -454,6 +407,9 @@ struct nni_protocol nni_req_protocol = {
.proto_rem_pipe = nni_req_rem_pipe,
.proto_setopt = nni_req_setopt,
.proto_getopt = nni_req_getopt,
+ .proto_pipe_size = sizeof (nni_req_pipe),
+ .proto_pipe_send = nni_req_sender,
+ .proto_pipe_recv = nni_req_receiver,
.proto_recv_filter = nni_req_recvfilter,
.proto_send_filter = nni_req_sendfilter,
};