aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair')
-rw-r--r--src/protocol/pair/pair.c52
1 files changed, 28 insertions, 24 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 692f4b0e..98995186 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -16,27 +16,30 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
+typedef struct nni_pair_pipe nni_pair_pipe;
+typedef struct nni_pair_sock nni_pair_sock;
+
// An nni_pair_sock is our per-socket protocol private structure.
-typedef struct nni_pair_sock {
+struct nni_pair_sock {
nni_socket * sock;
- nni_pipe * pipe;
+ nni_pair_pipe * pipe;
nni_mutex mx;
nni_msgqueue * uwq;
nni_msgqueue * urq;
-} nni_pair_sock;
+};
// An nni_pair_pipe is our per-pipe protocol private structure. We keep
// one of these even though in theory we'd only have a single underlying
// pipe. The separate data structure is more like other protocols that do
// manage multiple pipes.
-typedef struct nni_pair_pipe {
+struct nni_pair_pipe {
nni_pipe * pipe;
nni_pair_sock * pair;
int good;
nni_thread * sthr;
nni_thread * rthr;
int sigclose;
-} nni_pair_pipe;
+};
static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
@@ -55,6 +58,7 @@ nni_pair_create(void **pairp, nni_socket *sock)
return (rv);
}
pair->sock = sock;
+ pair->pipe = NULL;
pair->uwq = nni_socket_sendq(sock);
pair->urq = nni_socket_recvq(sock);
*pairp = pair;
@@ -77,7 +81,7 @@ nni_pair_destroy(void *arg)
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe)
+nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
{
nni_pair_sock *pair = arg;
nni_pair_pipe *pp;
@@ -89,6 +93,7 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe)
pp->sigclose = 0;
pp->sthr = NULL;
pp->rthr = NULL;
+ pp->pair = pair;
nni_mutex_enter(&pair->mx);
if (pair->pipe != NULL) {
@@ -106,31 +111,33 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe)
return (rv);
}
pp->good = 1;
- pair->pipe = pipe;
+ pair->pipe = pp;
+ *datap = pp;
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+ return (0);
}
-static int
-nni_pair_rem_pipe(void *arg, nni_pipe *pipe)
+static void
+nni_pair_rem_pipe(void *arg, void *data)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
+ nni_pair_sock *pair = arg;
+ nni_pair_pipe *pp = data;
- if (pp->sthr) {
- (void) nni_thread_reap(pp->sthr);
- }
- if (pp->rthr) {
- (void) nni_thread_reap(pp->rthr);
- }
nni_mutex_enter(&pair->mx);
- if (pair->pipe != pipe) {
+ if (pair->pipe != pp) {
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+ return;
}
+ pair->pipe = NULL;
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+
+ if (pp->sthr != NULL) {
+ (void) nni_thread_reap(pp->sthr);
+ }
+ if (pp->rthr != NULL) {
+ (void) nni_thread_reap(pp->rthr);
+ }
}
@@ -166,7 +173,6 @@ nni_pair_sender(void *arg)
}
nni_msgqueue_signal(urq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_rem_pipe(pair->sock, pipe);
}
@@ -187,7 +193,6 @@ nni_pair_receiver(void *arg)
return;
}
nni_mutex_exit(&pair->mx);
-
for (;;) {
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
@@ -201,7 +206,6 @@ nni_pair_receiver(void *arg)
}
nni_msgqueue_signal(uwq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_rem_pipe(pair->sock, pipe);
}