aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair')
-rw-r--r--src/protocol/pair/pair.c53
1 files changed, 5 insertions, 48 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 306615b7..3f57c12b 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -35,9 +35,6 @@ struct nni_pair_sock {
struct nni_pair_pipe {
nni_pipe * pipe;
nni_pair_sock * pair;
- int good;
- nni_thread * sthr;
- nni_thread * rthr;
int sigclose;
};
@@ -81,21 +78,14 @@ nni_pair_destroy(void *arg)
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
+nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data)
{
nni_pair_sock *pair = arg;
- nni_pair_pipe *pp;
+ nni_pair_pipe *pp = data;
int rv;
- if ((pp = nni_alloc(sizeof (*pp))) == NULL) {
- return (NNG_ENOMEM);
- }
-
pp->pipe = pipe;
- pp->good = 0;
pp->sigclose = 0;
- pp->sthr = NULL;
- pp->rthr = NULL;
pp->pair = pair;
nni_mutex_enter(&pair->mx);
@@ -103,27 +93,12 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
rv = NNG_EBUSY; // Already have a peer, denied.
goto fail;
}
- if ((rv = nni_thread_create(&pp->rthr, nni_pair_receiver, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_thread_create(&pp->sthr, nni_pair_sender, pp)) != 0) {
- goto fail;
- }
- pp->good = 1;
pair->pipe = pp;
- *datap = pp;
nni_mutex_exit(&pair->mx);
return (0);
fail:
nni_mutex_exit(&pair->mx);
- if (pp->rthr != NULL) {
- nni_thread_reap(pp->rthr);
- }
- if (pp->sthr != NULL) {
- nni_thread_reap(pp->sthr);
- }
- nni_free(pp, sizeof (*pp));
return (rv);
}
@@ -141,13 +116,6 @@ nni_pair_rem_pipe(void *arg, void *data)
}
pair->pipe = NULL;
nni_mutex_exit(&pair->mx);
-
- if (pp->sthr != NULL) {
- (void) nni_thread_reap(pp->sthr);
- }
- if (pp->rthr != NULL) {
- (void) nni_thread_reap(pp->rthr);
- }
}
@@ -162,14 +130,6 @@ nni_pair_sender(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&pair->mx);
- if (!pp->good) {
- nni_mutex_exit(&pair->mx);
- return;
- }
- nni_mutex_exit(&pair->mx);
-
-
for (;;) {
rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose);
if (rv != 0) {
@@ -197,12 +157,6 @@ nni_pair_receiver(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&pair->mx);
- if (!pp->good) {
- nni_mutex_exit(&pair->mx);
- return;
- }
- nni_mutex_exit(&pair->mx);
for (;;) {
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
@@ -245,6 +199,9 @@ struct nni_protocol nni_pair_protocol = {
.proto_destroy = nni_pair_destroy,
.proto_add_pipe = nni_pair_add_pipe,
.proto_rem_pipe = nni_pair_rem_pipe,
+ .proto_pipe_size = sizeof (nni_pair_pipe),
+ .proto_pipe_send = nni_pair_sender,
+ .proto_pipe_recv = nni_pair_receiver,
.proto_setopt = nni_pair_setopt,
.proto_getopt = nni_pair_getopt,
.proto_recv_filter = NULL,