diff options
Diffstat (limited to 'src/protocol/pair/pair.c')
| -rw-r--r-- | src/protocol/pair/pair.c | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 692f4b0e..98995186 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -16,27 +16,30 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +typedef struct nni_pair_pipe nni_pair_pipe; +typedef struct nni_pair_sock nni_pair_sock; + // An nni_pair_sock is our per-socket protocol private structure. -typedef struct nni_pair_sock { +struct nni_pair_sock { nni_socket * sock; - nni_pipe * pipe; + nni_pair_pipe * pipe; nni_mutex mx; nni_msgqueue * uwq; nni_msgqueue * urq; -} nni_pair_sock; +}; // An nni_pair_pipe is our per-pipe protocol private structure. We keep // one of these even though in theory we'd only have a single underlying // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. -typedef struct nni_pair_pipe { +struct nni_pair_pipe { nni_pipe * pipe; nni_pair_sock * pair; int good; nni_thread * sthr; nni_thread * rthr; int sigclose; -} nni_pair_pipe; +}; static void nni_pair_receiver(void *); static void nni_pair_sender(void *); @@ -55,6 +58,7 @@ nni_pair_create(void **pairp, nni_socket *sock) return (rv); } pair->sock = sock; + pair->pipe = NULL; pair->uwq = nni_socket_sendq(sock); pair->urq = nni_socket_recvq(sock); *pairp = pair; @@ -77,7 +81,7 @@ nni_pair_destroy(void *arg) static int -nni_pair_add_pipe(void *arg, nni_pipe *pipe) +nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap) { nni_pair_sock *pair = arg; nni_pair_pipe *pp; @@ -89,6 +93,7 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe) pp->sigclose = 0; pp->sthr = NULL; pp->rthr = NULL; + pp->pair = pair; nni_mutex_enter(&pair->mx); if (pair->pipe != NULL) { @@ -106,31 +111,33 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe) return (rv); } pp->good = 1; - pair->pipe = pipe; + pair->pipe = pp; + *datap = pp; nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + return (0); } -static int -nni_pair_rem_pipe(void *arg, nni_pipe *pipe) +static void +nni_pair_rem_pipe(void *arg, void *data) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; + nni_pair_sock *pair = arg; + nni_pair_pipe *pp = data; - if (pp->sthr) { - (void) nni_thread_reap(pp->sthr); - } - if (pp->rthr) { - (void) nni_thread_reap(pp->rthr); - } nni_mutex_enter(&pair->mx); - if (pair->pipe != pipe) { + if (pair->pipe != pp) { nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + return; } + pair->pipe = NULL; nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + + if (pp->sthr != NULL) { + (void) nni_thread_reap(pp->sthr); + } + if (pp->rthr != NULL) { + (void) nni_thread_reap(pp->rthr); + } } @@ -166,7 +173,6 @@ nni_pair_sender(void *arg) } nni_msgqueue_signal(urq, &pp->sigclose); nni_pipe_close(pipe); - nni_socket_rem_pipe(pair->sock, pipe); } @@ -187,7 +193,6 @@ nni_pair_receiver(void *arg) return; } nni_mutex_exit(&pair->mx); - for (;;) { rv = nni_pipe_recv(pipe, &msg); if (rv != 0) { @@ -201,7 +206,6 @@ nni_pair_receiver(void *arg) } nni_msgqueue_signal(uwq, &pp->sigclose); nni_pipe_close(pipe); - nni_socket_rem_pipe(pair->sock, pipe); } |
