aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0/respond.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/survey0/respond.c')
-rw-r--r--src/protocol/survey0/respond.c10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 580aa743..b414c189 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -63,6 +63,7 @@ struct resp0_pipe {
nni_pipe * npipe;
resp0_sock * psock;
bool busy;
+ bool closed;
uint32_t id;
nni_list sendq; // contexts waiting to send
nni_aio aio_send;
@@ -336,6 +337,7 @@ resp0_pipe_close(void *arg)
nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->mtx);
+ p->closed = true;
while ((ctx = nni_list_first(&p->sendq)) != NULL) {
nni_aio *aio;
nni_msg *msg;
@@ -522,6 +524,14 @@ resp0_pipe_recv_cb(void *arg)
len = nni_msg_header_len(msg);
nni_mtx_lock(&s->mtx);
+
+ if (p->closed) {
+ // If pipe was closed, we just abandon the data from it.
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_mtx_unlock(&s->mtx);
+ nni_msg_free(msg);
+ return;
+ }
if ((ctx = nni_list_first(&s->recvq)) == NULL) {
// No one blocked in recv, stall.
nni_list_append(&s->recvpipes, p);