aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c25
-rw-r--r--src/core/msgqueue.h4
2 files changed, 29 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 1bb5a762..7c33b256 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -40,6 +40,8 @@ struct nni_msgq {
void * mq_filter_arg;
};
+static void nni_msgq_run_notify(nni_msgq *);
+
int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
@@ -128,6 +130,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
}
}
mq->mq_geterr = error;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -149,6 +152,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
}
}
mq->mq_puterr = error;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -172,6 +176,24 @@ nni_msgq_set_error(nni_msgq *mq, int error)
}
mq->mq_puterr = error;
mq->mq_geterr = error;
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+void
+nni_msgq_flush(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ while (mq->mq_len > 0) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get >= mq->mq_alloc) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -331,6 +353,7 @@ nni_msgq_cancel(nni_aio *aio, int rv)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -413,6 +436,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -424,6 +448,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
mq->mq_put = 0;
}
mq->mq_len++;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2f1a46eb..65215bd0 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -33,6 +33,10 @@ extern int nni_msgq_init(nni_msgq **, unsigned);
// messages that may be in the queue.
extern void nni_msgq_fini(nni_msgq *);
+// nni_msgq_flush discards any messages that are sitting in the queue.
+// It does not wake any writers that might be waiting.
+extern void nni_msgq_flush(nni_msgq *);
+
extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);