diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 25 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 4 |
2 files changed, 29 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 1bb5a762..7c33b256 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -40,6 +40,8 @@ struct nni_msgq { void * mq_filter_arg; }; +static void nni_msgq_run_notify(nni_msgq *); + int nni_msgq_init(nni_msgq **mqp, unsigned cap) { @@ -128,6 +130,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) } } mq->mq_geterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -149,6 +152,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) } } mq->mq_puterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -172,6 +176,24 @@ nni_msgq_set_error(nni_msgq *mq, int error) } mq->mq_puterr = error; mq->mq_geterr = error; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + +void +nni_msgq_flush(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + while (mq->mq_len > 0) { + nni_msg *msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get >= mq->mq_alloc) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -331,6 +353,7 @@ nni_msgq_cancel(nni_aio *aio, int rv) nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -413,6 +436,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -424,6 +448,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) mq->mq_put = 0; } mq->mq_len++; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2f1a46eb..65215bd0 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -33,6 +33,10 @@ extern int nni_msgq_init(nni_msgq **, unsigned); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); +// nni_msgq_flush discards any messages that are sitting in the queue. +// It does not wake any writers that might be waiting. +extern void nni_msgq_flush(nni_msgq *); + extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); |
