aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c37
1 files changed, 23 insertions, 14 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 150fcd23..249e887c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -88,8 +88,11 @@ nni_pipe_peer(nni_pipe *p)
void
nni_pipe_destroy(nni_pipe *p)
{
- nni_thr_fini(&p->p_send_thr);
- nni_thr_fini(&p->p_recv_thr);
+ int i;
+
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_fini(&p->p_worker_thr[i]);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_destroy(p->p_tran_data);
@@ -109,6 +112,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
void *pdata;
int rv;
+ int i;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -128,16 +132,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
return (rv);
}
p->p_proto_data = pdata;
- if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) {
- ops->pipe_fini(&p->p_proto_data);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
- if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) {
- nni_thr_fini(&p->p_recv_thr);
- ops->pipe_fini(&p->p_proto_data);
- NNI_FREE_STRUCT(p);
- return (rv);
+
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_worker fn = ops->pipe_worker[i];
+ rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata);
+ if (rv != 0) {
+ while (i > 0) {
+ i--;
+ nni_thr_fini(&p->p_worker_thr[i]);
+ }
+ ops->pipe_fini(pdata);
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
}
*pp = p;
@@ -160,6 +167,7 @@ int
nni_pipe_start(nni_pipe *pipe)
{
int rv;
+ int i;
int collide;
nni_sock *sock = pipe->p_sock;
@@ -200,8 +208,9 @@ nni_pipe_start(nni_pipe *pipe)
nni_list_append(&sock->s_pipes, pipe);
- nni_thr_run(&pipe->p_send_thr);
- nni_thr_run(&pipe->p_recv_thr);
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_run(&pipe->p_worker_thr[i]);
+ }
pipe->p_active = 1;
// XXX: Publish event