diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 67 |
1 files changed, 62 insertions, 5 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 374c45c8..4f50ac7c 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -75,6 +75,9 @@ pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } @@ -90,6 +93,7 @@ pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } + nni_aio_fini(p->p_start_aio); nni_cv_fini(&p->p_cv); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); @@ -150,6 +154,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { + // abort any pending negotiation/start process. + nni_aio_close(p->p_start_aio); + nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -171,12 +178,49 @@ nni_pipe_close(nni_pipe *p) nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); } +bool +nni_pipe_closed(nni_pipe *p) +{ + bool rv; + nni_mtx_lock(&p->p_mtx); + rv = p->p_closed; + nni_mtx_unlock(&p->p_mtx); + return (rv); +} + uint16_t nni_pipe_peer(nni_pipe *p) { return (p->p_tran_ops.p_peer(p->p_tran_data)); } +static void +nni_pipe_start_cb(void *arg) +{ + nni_pipe *p = arg; + nni_sock *s = p->p_sock; + nni_aio * aio = p->p_start_aio; + + if (nni_aio_result(aio) != 0) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + if (nni_pipe_closed(p)) { + nni_pipe_close(p); + return; + } + + if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || + nni_sock_closing(s)) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); +} + int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -184,7 +228,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -193,6 +236,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } // Make a private copy of the transport ops. + p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -209,11 +253,14 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { + uint64_t id; + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = (uint32_t) id; + } + nni_mtx_unlock(&nni_pipe_lk); } - nni_mtx_unlock(&nni_pipe_lk); if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { @@ -249,6 +296,16 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_tran_ops.p_start == NULL) { + nni_aio_finish(p->p_start_aio, 0, 0); + } else { + p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); + } +} + void * nni_pipe_get_proto_data(nni_pipe *p) { |
