diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-18 13:16:58 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-18 13:20:01 -0800 |
| commit | 1e4cf4e7c872a705a41bca66dd377ca2dbbe0d69 (patch) | |
| tree | 04a99c591fbfd28400e4af51414dfbfce1c68e96 | |
| parent | e88e28fcf5e5dd5b478d0a7a462836026da975fe (diff) | |
| download | nng-1e4cf4e7c872a705a41bca66dd377ca2dbbe0d69.tar.gz nng-1e4cf4e7c872a705a41bca66dd377ca2dbbe0d69.tar.bz2 nng-1e4cf4e7c872a705a41bca66dd377ca2dbbe0d69.zip | |
Fixes pipeline stall due to backpressure signaling race.
| -rw-r--r-- | src/protocol/pipeline/push.c | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 0f973f8a..1d1e6d88 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -38,6 +38,7 @@ struct nni_push_pipe { nni_push_sock * push; nni_msgq * mq; int sigclose; + int wantr; nni_list_node node; }; @@ -108,6 +109,7 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) pp->pipe = pipe; pp->sigclose = 0; pp->push = psock; + pp->wantr = 0; *ppp = pp; return (0); } @@ -168,14 +170,15 @@ nni_push_pipe_send(void *arg) nni_msg *msg; for (;;) { - if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { - break; - } nni_mtx_lock(mx); + pp->wantr = 1; if (push->wantw) { nni_cv_wake(&push->cv); } nni_mtx_unlock(mx); + if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { + break; + } if (nni_pipe_send(pp->pipe, msg) != 0) { nni_msg_free(msg); break; @@ -260,15 +263,19 @@ nni_push_sock_send(void *arg) return; } } + push->wantw = 0; for (i = 0; i < push->npipes; i++) { pp = push->nextpipe; if (pp == NULL) { pp = nni_list_first(&push->pipes); } push->nextpipe = nni_list_next(&push->pipes, pp); - if (nni_msgq_tryput(pp->mq, msg) == 0) { - msg = NULL; - break; + if (pp->wantr) { + pp->wantr = 0; + if (nni_msgq_put(pp->mq, msg) == 0) { + msg = NULL; + break; + } } } if (msg != NULL) { @@ -276,8 +283,6 @@ nni_push_sock_send(void *arg) // wait for a sender to let us know its ready. push->wantw = 1; nni_cv_wait(&push->cv); - } else { - push->wantw = 0; } nni_mtx_unlock(mx); } |
