diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index e49ad05a..b3a107ff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -77,7 +77,7 @@ nni_pipe_destroy(nni_pipe *p) p->p_tran_ops.pipe_destroy(p->p_tran_data); } if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_fini(p->p_proto_data); + p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data); } NNI_FREE_STRUCT(p); } @@ -88,8 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) { nni_pipe *p; nni_sock *sock = ep->ep_sock; - nni_proto *proto = &sock->s_proto; - const nni_proto_pipe *pops = proto->proto_pipe; + const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; + void *pdata; int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { @@ -105,23 +105,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *ep->ep_tran->tran_pipe; - // Make a copy of the protocol ops. Same rationale. - p->p_proto_ops = *pops; - - if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) { + if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) { NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data); - if (rv != 0) { - pops->pipe_fini(&p->p_proto_data); + p->p_proto_data = pdata; + if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) { + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data); - if (rv != 0) { + if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) { nni_thr_fini(&p->p_recv_thr); - pops->pipe_fini(&p->p_proto_data); + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } @@ -155,9 +151,16 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); return (NNG_ECLOSED); } + if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + return (NNG_EPROTO); + } + do { // We generate a new pipe ID, but we make sure it does not // collide with any we already have. This can only normally @@ -174,8 +177,7 @@ nni_pipe_start(nni_pipe *pipe) } } while (collide); - rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data); - if (rv != 0) { + if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { nni_mtx_unlock(&sock->s_mx); nni_pipe_close(pipe); return (rv); |
