aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-10 14:39:21 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-10 14:39:21 -0800
commitc436e174d0ed8c5dc14af060e994b97a83df7750 (patch)
tree9eeb7ef18ad6eb1a975ab6aaa7a68bcd3ee81c9a /src/core/msgqueue.c
parentf5c259eec0cd3fa5cd623e159cbfec83b4a500d5 (diff)
downloadnng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.gz
nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.bz2
nng-c436e174d0ed8c5dc14af060e994b97a83df7750.zip
Start of close related race fixes. Scalability test.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c42
1 files changed, 42 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9607f562..47b98629 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -388,6 +388,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -406,6 +411,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -428,6 +438,7 @@ nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
// the node from either the getq or the putq list.
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ECANCELED, 0);
}
nni_mtx_unlock(&mq->mq_lock);
}
@@ -437,6 +448,10 @@ int
nni_msgq_canput(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
if ((mq->mq_len < mq->mq_cap) ||
(mq->mq_rwait != 0) || // XXX: REMOVE ME
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
@@ -452,6 +467,10 @@ int
nni_msgq_canget(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
if ((mq->mq_len != 0) ||
(mq->mq_wwait != 0) ||
(nni_list_first(&mq->mq_aio_putq) != NULL)) {
@@ -470,6 +489,10 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
size_t len = nni_msg_len(msg);
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
@@ -804,6 +827,9 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
void
nni_msgq_close(nni_msgq *mq)
{
+ nni_aio *aio;
+ nni_aio *naio;
+
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
mq->mq_wwait = 0;
@@ -821,6 +847,22 @@ nni_msgq_close(nni_msgq *mq)
mq->mq_len--;
nni_msg_free(msg);
}
+
+ // Let all pending blockers know we are closing the queue.
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_putq, aio);
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+
nni_mtx_unlock(&mq->mq_lock);
}