From 0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 29 Dec 2016 23:49:05 -0800 Subject: Factor out repeated protocol code into common. --- src/protocol/pair/pair.c | 53 +++++----------------------------------------- src/protocol/reqrep/req.c | 54 +++++------------------------------------------ 2 files changed, 10 insertions(+), 97 deletions(-) (limited to 'src/protocol') diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 306615b7..3f57c12b 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -35,9 +35,6 @@ struct nni_pair_sock { struct nni_pair_pipe { nni_pipe * pipe; nni_pair_sock * pair; - int good; - nni_thread * sthr; - nni_thread * rthr; int sigclose; }; @@ -81,21 +78,14 @@ nni_pair_destroy(void *arg) static int -nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap) +nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data) { nni_pair_sock *pair = arg; - nni_pair_pipe *pp; + nni_pair_pipe *pp = data; int rv; - if ((pp = nni_alloc(sizeof (*pp))) == NULL) { - return (NNG_ENOMEM); - } - pp->pipe = pipe; - pp->good = 0; pp->sigclose = 0; - pp->sthr = NULL; - pp->rthr = NULL; pp->pair = pair; nni_mutex_enter(&pair->mx); @@ -103,27 +93,12 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap) rv = NNG_EBUSY; // Already have a peer, denied. goto fail; } - if ((rv = nni_thread_create(&pp->rthr, nni_pair_receiver, pp)) != 0) { - goto fail; - } - if ((rv = nni_thread_create(&pp->sthr, nni_pair_sender, pp)) != 0) { - goto fail; - } - pp->good = 1; pair->pipe = pp; - *datap = pp; nni_mutex_exit(&pair->mx); return (0); fail: nni_mutex_exit(&pair->mx); - if (pp->rthr != NULL) { - nni_thread_reap(pp->rthr); - } - if (pp->sthr != NULL) { - nni_thread_reap(pp->sthr); - } - nni_free(pp, sizeof (*pp)); return (rv); } @@ -141,13 +116,6 @@ nni_pair_rem_pipe(void *arg, void *data) } pair->pipe = NULL; nni_mutex_exit(&pair->mx); - - if (pp->sthr != NULL) { - (void) nni_thread_reap(pp->sthr); - } - if (pp->rthr != NULL) { - (void) nni_thread_reap(pp->rthr); - } } @@ -162,14 +130,6 @@ nni_pair_sender(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(&pair->mx); - if (!pp->good) { - nni_mutex_exit(&pair->mx); - return; - } - nni_mutex_exit(&pair->mx); - - for (;;) { rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose); if (rv != 0) { @@ -197,12 +157,6 @@ nni_pair_receiver(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(&pair->mx); - if (!pp->good) { - nni_mutex_exit(&pair->mx); - return; - } - nni_mutex_exit(&pair->mx); for (;;) { rv = nni_pipe_recv(pipe, &msg); if (rv != 0) { @@ -245,6 +199,9 @@ struct nni_protocol nni_pair_protocol = { .proto_destroy = nni_pair_destroy, .proto_add_pipe = nni_pair_add_pipe, .proto_rem_pipe = nni_pair_rem_pipe, + .proto_pipe_size = sizeof (nni_pair_pipe), + .proto_pipe_send = nni_pair_sender, + .proto_pipe_recv = nni_pair_receiver, .proto_setopt = nni_pair_setopt, .proto_getopt = nni_pair_getopt, .proto_recv_filter = NULL, diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 54ac7b11..366e0c57 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -40,9 +40,6 @@ struct nni_req_sock { struct nni_req_pipe { nni_pipe * pipe; nni_req_sock * req; - int good; - nni_thread * sthr; - nni_thread * rthr; int sigclose; nni_list_node node; }; @@ -115,44 +112,20 @@ nni_req_destroy(void *arg) static int -nni_req_add_pipe(void *arg, nni_pipe *pipe, void **datap) +nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data) { nni_req_sock *req = arg; - nni_req_pipe *rp; + nni_req_pipe *rp = data; int rv; - if ((rp = nni_alloc(sizeof (*rp))) == NULL) { - return (NNG_ENOMEM); - } rp->pipe = pipe; - rp->good = 0; rp->sigclose = 0; - rp->sthr = NULL; - rp->rthr = NULL; rp->req = req; nni_mutex_enter(&req->mx); - if ((rv = nni_thread_create(&rp->rthr, nni_req_receiver, rp)) != 0) { - goto fail; - } - if ((rv = nni_thread_create(&rp->sthr, nni_req_sender, rp)) != 0) { - goto fail; - } - rp->good = 1; nni_list_append(&req->pipes, rp); - *datap = rp; nni_mutex_exit(&req->mx); return (0); -fail: - nni_mutex_exit(&req->mx); - if (rp->rthr) { - nni_thread_reap(rp->rthr); - } - if (rp->sthr) { - nni_thread_reap(rp->sthr); - } - nni_free(rp, sizeof (*rp)); - return (rv); } @@ -165,13 +138,6 @@ nni_req_rem_pipe(void *arg, void *data) nni_mutex_enter(&req->mx); nni_list_remove(&req->pipes, rp); nni_mutex_exit(&req->mx); - - if (rp->sthr != NULL) { - (void) nni_thread_reap(rp->sthr); - } - if (rp->rthr != NULL) { - (void) nni_thread_reap(rp->rthr); - } } @@ -186,13 +152,6 @@ nni_req_sender(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(&req->mx); - if (!rp->good) { - nni_mutex_exit(&req->mx); - return; - } - nni_mutex_exit(&req->mx); - for (;;) { rv = nni_msgqueue_get_sig(uwq, &msg, &rp->sigclose); if (rv != 0) { @@ -220,12 +179,6 @@ nni_req_receiver(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(&req->mx); - if (!rp->good) { - nni_mutex_exit(&req->mx); - return; - } - nni_mutex_exit(&req->mx); for (;;) { size_t len; char *body; @@ -454,6 +407,9 @@ struct nni_protocol nni_req_protocol = { .proto_rem_pipe = nni_req_rem_pipe, .proto_setopt = nni_req_setopt, .proto_getopt = nni_req_getopt, + .proto_pipe_size = sizeof (nni_req_pipe), + .proto_pipe_send = nni_req_sender, + .proto_pipe_recv = nni_req_receiver, .proto_recv_filter = nni_req_recvfilter, .proto_send_filter = nni_req_sendfilter, }; -- cgit v1.2.3-70-g09d2