From 1e4cf4e7c872a705a41bca66dd377ca2dbbe0d69 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 18 Jan 2017 13:16:58 -0800 Subject: Fixes pipeline stall due to backpressure signaling race. --- src/protocol/pipeline/push.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'src/protocol/pipeline/push.c') diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 0f973f8a..1d1e6d88 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -38,6 +38,7 @@ struct nni_push_pipe { nni_push_sock * push; nni_msgq * mq; int sigclose; + int wantr; nni_list_node node; }; @@ -108,6 +109,7 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) pp->pipe = pipe; pp->sigclose = 0; pp->push = psock; + pp->wantr = 0; *ppp = pp; return (0); } @@ -168,14 +170,15 @@ nni_push_pipe_send(void *arg) nni_msg *msg; for (;;) { - if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { - break; - } nni_mtx_lock(mx); + pp->wantr = 1; if (push->wantw) { nni_cv_wake(&push->cv); } nni_mtx_unlock(mx); + if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { + break; + } if (nni_pipe_send(pp->pipe, msg) != 0) { nni_msg_free(msg); break; @@ -260,15 +263,19 @@ nni_push_sock_send(void *arg) return; } } + push->wantw = 0; for (i = 0; i < push->npipes; i++) { pp = push->nextpipe; if (pp == NULL) { pp = nni_list_first(&push->pipes); } push->nextpipe = nni_list_next(&push->pipes, pp); - if (nni_msgq_tryput(pp->mq, msg) == 0) { - msg = NULL; - break; + if (pp->wantr) { + pp->wantr = 0; + if (nni_msgq_put(pp->mq, msg) == 0) { + msg = NULL; + break; + } } } if (msg != NULL) { @@ -276,8 +283,6 @@ nni_push_sock_send(void *arg) // wait for a sender to let us know its ready. push->wantw = 1; nni_cv_wait(&push->cv); - } else { - push->wantw = 0; } nni_mtx_unlock(mx); } -- cgit v1.2.3-70-g09d2