diff options
Diffstat (limited to 'src/protocol/pair')
| -rw-r--r-- | src/protocol/pair/pair.c | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index e5e2e17b..65eabd87 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,6 +23,7 @@ static void nni_pair_send_cb(void *); static void nni_pair_recv_cb(void *); static void nni_pair_getq_cb(void *); static void nni_pair_putq_cb(void *); +static void nni_pair_pipe_fini(void *); // An nni_pair_sock is our per-socket protocol private structure. struct nni_pair_sock { @@ -44,11 +45,10 @@ struct nni_pair_pipe { nni_aio aio_recv; nni_aio aio_getq; nni_aio aio_putq; + int busy; + int closed; }; -static void nni_pair_receiver(void *); -static void nni_pair_sender(void *); - static int nni_pair_sock_init(void **sp, nni_sock *nsock) { @@ -90,22 +90,22 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } ppipe->npipe = npipe; @@ -120,13 +120,12 @@ nni_pair_pipe_fini(void *arg) { nni_pair_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_aio_fini(&ppipe->aio_getq); - NNI_FREE_STRUCT(ppipe); - } + NNI_ASSERT(ppipe->busy >= 0); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + nni_aio_fini(&ppipe->aio_getq); + NNI_FREE_STRUCT(ppipe); } @@ -142,7 +141,10 @@ nni_pair_pipe_add(void *arg) psock->ppipe = ppipe; // Schedule a getq on the upper, and a read from the pipe. + // Each of these also sets up another hold on the pipe itself. + nni_pipe_incref(ppipe->npipe); nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); + nni_pipe_incref(ppipe->npipe); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); return (0); @@ -155,9 +157,10 @@ nni_pair_pipe_rem(void *arg) nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + if (psock->ppipe == ppipe) { - nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); - nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); psock->ppipe = NULL; } } @@ -171,6 +174,7 @@ nni_pair_recv_cb(void *arg) if (nni_aio_result(&ppipe->aio_recv) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -189,6 +193,7 @@ nni_pair_putq_cb(void *arg) nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -204,6 +209,7 @@ nni_pair_getq_cb(void *arg) if (nni_aio_result(&ppipe->aio_getq) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -223,6 +229,7 @@ nni_pair_send_cb(void *arg) nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } |
