aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair/pair.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair/pair.c')
-rw-r--r--src/protocol/pair/pair.c39
1 files changed, 23 insertions, 16 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index e5e2e17b..65eabd87 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -23,6 +23,7 @@ static void nni_pair_send_cb(void *);
static void nni_pair_recv_cb(void *);
static void nni_pair_getq_cb(void *);
static void nni_pair_putq_cb(void *);
+static void nni_pair_pipe_fini(void *);
// An nni_pair_sock is our per-socket protocol private structure.
struct nni_pair_sock {
@@ -44,11 +45,10 @@ struct nni_pair_pipe {
nni_aio aio_recv;
nni_aio aio_getq;
nni_aio aio_putq;
+ int busy;
+ int closed;
};
-static void nni_pair_receiver(void *);
-static void nni_pair_sender(void *);
-
static int
nni_pair_sock_init(void **sp, nni_sock *nsock)
{
@@ -90,22 +90,22 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
ppipe->npipe = npipe;
@@ -120,13 +120,12 @@ nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
- if (ppipe != NULL) {
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_aio_fini(&ppipe->aio_getq);
- NNI_FREE_STRUCT(ppipe);
- }
+ NNI_ASSERT(ppipe->busy >= 0);
+ nni_aio_fini(&ppipe->aio_send);
+ nni_aio_fini(&ppipe->aio_recv);
+ nni_aio_fini(&ppipe->aio_putq);
+ nni_aio_fini(&ppipe->aio_getq);
+ NNI_FREE_STRUCT(ppipe);
}
@@ -142,7 +141,10 @@ nni_pair_pipe_add(void *arg)
psock->ppipe = ppipe;
// Schedule a getq on the upper, and a read from the pipe.
+ // Each of these also sets up another hold on the pipe itself.
+ nni_pipe_incref(ppipe->npipe);
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
+ nni_pipe_incref(ppipe->npipe);
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
return (0);
@@ -155,9 +157,10 @@ nni_pair_pipe_rem(void *arg)
nni_pair_pipe *ppipe = arg;
nni_pair_sock *psock = ppipe->psock;
+ nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
+ nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
+
if (psock->ppipe == ppipe) {
- nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
psock->ppipe = NULL;
}
}
@@ -171,6 +174,7 @@ nni_pair_recv_cb(void *arg)
if (nni_aio_result(&ppipe->aio_recv) != 0) {
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
@@ -189,6 +193,7 @@ nni_pair_putq_cb(void *arg)
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -204,6 +209,7 @@ nni_pair_getq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_getq) != 0) {
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
@@ -223,6 +229,7 @@ nni_pair_send_cb(void *arg)
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}