summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/protocol/pipeline/push.c21
1 files changed, 13 insertions, 8 deletions
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 0f973f8a..1d1e6d88 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -38,6 +38,7 @@ struct nni_push_pipe {
nni_push_sock * push;
nni_msgq * mq;
int sigclose;
+ int wantr;
nni_list_node node;
};
@@ -108,6 +109,7 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
pp->pipe = pipe;
pp->sigclose = 0;
pp->push = psock;
+ pp->wantr = 0;
*ppp = pp;
return (0);
}
@@ -168,14 +170,15 @@ nni_push_pipe_send(void *arg)
nni_msg *msg;
for (;;) {
- if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
- break;
- }
nni_mtx_lock(mx);
+ pp->wantr = 1;
if (push->wantw) {
nni_cv_wake(&push->cv);
}
nni_mtx_unlock(mx);
+ if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
+ break;
+ }
if (nni_pipe_send(pp->pipe, msg) != 0) {
nni_msg_free(msg);
break;
@@ -260,15 +263,19 @@ nni_push_sock_send(void *arg)
return;
}
}
+ push->wantw = 0;
for (i = 0; i < push->npipes; i++) {
pp = push->nextpipe;
if (pp == NULL) {
pp = nni_list_first(&push->pipes);
}
push->nextpipe = nni_list_next(&push->pipes, pp);
- if (nni_msgq_tryput(pp->mq, msg) == 0) {
- msg = NULL;
- break;
+ if (pp->wantr) {
+ pp->wantr = 0;
+ if (nni_msgq_put(pp->mq, msg) == 0) {
+ msg = NULL;
+ break;
+ }
}
}
if (msg != NULL) {
@@ -276,8 +283,6 @@ nni_push_sock_send(void *arg)
// wait for a sender to let us know its ready.
push->wantw = 1;
nni_cv_wait(&push->cv);
- } else {
- push->wantw = 0;
}
nni_mtx_unlock(mx);
}