aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index e49ad05a..b3a107ff 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -77,7 +77,7 @@ nni_pipe_destroy(nni_pipe *p)
p->p_tran_ops.pipe_destroy(p->p_tran_data);
}
if (p->p_proto_data != NULL) {
- p->p_proto_ops.pipe_fini(p->p_proto_data);
+ p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
}
NNI_FREE_STRUCT(p);
}
@@ -88,8 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
nni_pipe *p;
nni_sock *sock = ep->ep_sock;
- nni_proto *proto = &sock->s_proto;
- const nni_proto_pipe *pops = proto->proto_pipe;
+ const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
+ void *pdata;
int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
@@ -105,23 +105,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *ep->ep_tran->tran_pipe;
- // Make a copy of the protocol ops. Same rationale.
- p->p_proto_ops = *pops;
-
- if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) {
+ if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) {
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data);
- if (rv != 0) {
- pops->pipe_fini(&p->p_proto_data);
+ p->p_proto_data = pdata;
+ if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) {
+ ops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data);
- if (rv != 0) {
+ if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) {
nni_thr_fini(&p->p_recv_thr);
- pops->pipe_fini(&p->p_proto_data);
+ ops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
@@ -155,9 +151,16 @@ nni_pipe_start(nni_pipe *pipe)
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
return (NNG_ECLOSED);
}
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ return (NNG_EPROTO);
+ }
+
do {
// We generate a new pipe ID, but we make sure it does not
// collide with any we already have. This can only normally
@@ -174,8 +177,7 @@ nni_pipe_start(nni_pipe *pipe)
}
} while (collide);
- rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data);
- if (rv != 0) {
+ if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
nni_mtx_unlock(&sock->s_mx);
nni_pipe_close(pipe);
return (rv);