diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 150fcd23..249e887c 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -88,8 +88,11 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { - nni_thr_fini(&p->p_send_thr); - nni_thr_fini(&p->p_recv_thr); + int i; + + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&p->p_worker_thr[i]); + } if (p->p_tran_data != NULL) { p->p_tran_ops.pipe_destroy(p->p_tran_data); @@ -109,6 +112,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; void *pdata; int rv; + int i; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -128,16 +132,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) return (rv); } p->p_proto_data = pdata; - if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) { - ops->pipe_fini(&p->p_proto_data); - NNI_FREE_STRUCT(p); - return (rv); - } - if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) { - nni_thr_fini(&p->p_recv_thr); - ops->pipe_fini(&p->p_proto_data); - NNI_FREE_STRUCT(p); - return (rv); + + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_worker fn = ops->pipe_worker[i]; + rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata); + if (rv != 0) { + while (i > 0) { + i--; + nni_thr_fini(&p->p_worker_thr[i]); + } + ops->pipe_fini(pdata); + NNI_FREE_STRUCT(p); + return (rv); + } } *pp = p; @@ -160,6 +167,7 @@ int nni_pipe_start(nni_pipe *pipe) { int rv; + int i; int collide; nni_sock *sock = pipe->p_sock; @@ -200,8 +208,9 @@ nni_pipe_start(nni_pipe *pipe) nni_list_append(&sock->s_pipes, pipe); - nni_thr_run(&pipe->p_send_thr); - nni_thr_run(&pipe->p_recv_thr); + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_run(&pipe->p_worker_thr[i]); + } pipe->p_active = 1; // XXX: Publish event |
