aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/reqrep0/rep.c5
-rw-r--r--src/protocol/reqrep0/req.c30
-rw-r--r--src/protocol/survey0/respond.c5
3 files changed, 20 insertions, 20 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 1f4f0b33..f725cadb 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -413,15 +413,14 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- nni_mtx_lock(&s->lk);
- p->busy = false;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->pipe);
- nni_mtx_unlock(&s->lk);
return;
}
+ nni_mtx_lock(&s->lk);
+ p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
if (p->id == s->ctx->pipe_id) {
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 8a0dd4d8..43751d14 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -82,7 +82,8 @@ struct req0_pipe {
nni_pipe * pipe;
req0_sock * req;
nni_list_node node;
- nni_list ctxs; // ctxs with pending traffic
+ nni_list ctxs; // ctxs with pending traffic
+ bool sending; // if busy sending
nni_aio * aio_send;
nni_aio * aio_recv;
};
@@ -264,6 +265,7 @@ req0_pipe_close(void *arg)
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
+ p->sending = false;
if (nni_list_node_active(&p->node)) {
nni_list_node_remove(&p->node);
if (s->closed) {
@@ -311,20 +313,19 @@ req0_send_cb(void *arg)
// in the ready list, and re-run the sendq.
nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->busypipes, p)) {
- nni_list_remove(&s->busypipes, p);
- nni_list_append(&s->readypipes, p);
- if (nni_list_empty(&s->sendq)) {
- nni_pollable_raise(s->sendable);
- }
- req0_run_sendq(s, &aios);
- } else {
- // We wind up here if stop was called from the reader
- // side while we were waiting to be scheduled to run for the
- // writer side. In this case we can't complete the operation,
- // and we have to abort.
- nni_pipe_stop(p->pipe);
+ if (!p->sending) {
+ // This occurs if the req0_pipe_close has been called.
+ // In that case we don't want any more processing.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_remove(&s->busypipes, p);
+ nni_list_append(&s->readypipes, p);
+ p->sending = false;
+ if (nni_list_empty(&s->sendq)) {
+ nni_pollable_raise(s->sendable);
}
+ req0_run_sendq(s, &aios);
nni_mtx_unlock(&s->mtx);
while ((aio = nni_list_first(&aios)) != NULL) {
@@ -533,6 +534,7 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist)
nni_list_remove(&s->readypipes, p);
nni_list_append(&s->busypipes, p);
+ p->sending = true;
if ((aio = ctx->saio) != NULL) {
ctx->saio = NULL;
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index db18a4e8..fbdeb65a 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -402,15 +402,14 @@ resp0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- nni_mtx_lock(&s->mtx);
- p->busy = false;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
- nni_mtx_unlock(&s->mtx);
return;
}
+ nni_mtx_lock(&s->mtx);
+ p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
if (p->id == s->ctx->pipe_id) {