aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c69
1 files changed, 7 insertions, 62 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 4f50ac7c..66cd87cc 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // Wait for neg callbacks to finish. (Already closed).
- nni_aio_stop(p->p_start_aio);
-
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
@@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
- nni_aio_fini(p->p_start_aio);
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
- // abort any pending negotiation/start process.
- nni_aio_close(p->p_start_aio);
-
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
-bool
-nni_pipe_closed(nni_pipe *p)
-{
- bool rv;
- nni_mtx_lock(&p->p_mtx);
- rv = p->p_closed;
- nni_mtx_unlock(&p->p_mtx);
- return (rv);
-}
-
uint16_t
nni_pipe_peer(nni_pipe *p)
{
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
-static void
-nni_pipe_start_cb(void *arg)
-{
- nni_pipe *p = arg;
- nni_sock *s = p->p_sock;
- nni_aio * aio = p->p_start_aio;
-
- if (nni_aio_result(aio) != 0) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
- if (nni_pipe_closed(p)) {
- nni_pipe_close(p);
- return;
- }
-
- if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
- nni_sock_closing(s)) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
-}
-
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
// Make a private copy of the transport ops.
- p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -253,18 +209,17 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
- uint64_t id;
- nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
- }
- nni_mtx_unlock(&nni_pipe_lk);
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = id;
+ p->p_refcnt = 1;
}
+ nni_mtx_unlock(&nni_pipe_lk);
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
+ nni_pipe_rele(p);
return (rv);
}
@@ -296,16 +251,6 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
-void
-nni_pipe_start(nni_pipe *p)
-{
- if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(p->p_start_aio, 0, 0);
- } else {
- p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
- }
-}
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{