diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9607f562..47b98629 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -388,6 +388,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -406,6 +411,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -428,6 +438,7 @@ nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) // the node from either the getq or the putq list. if (nni_list_active(&mq->mq_aio_getq, aio)) { nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, NNG_ECANCELED, 0); } nni_mtx_unlock(&mq->mq_lock); } @@ -437,6 +448,10 @@ int nni_msgq_canput(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (0); + } if ((mq->mq_len < mq->mq_cap) || (mq->mq_rwait != 0) || // XXX: REMOVE ME (nni_list_first(&mq->mq_aio_getq) != NULL)) { @@ -452,6 +467,10 @@ int nni_msgq_canget(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (0); + } if ((mq->mq_len != 0) || (mq->mq_wwait != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { @@ -470,6 +489,10 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) size_t len = nni_msg_len(msg); nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (NNG_ECLOSED); + } // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken @@ -804,6 +827,9 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) void nni_msgq_close(nni_msgq *mq) { + nni_aio *aio; + nni_aio *naio; + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; mq->mq_wwait = 0; @@ -821,6 +847,22 @@ nni_msgq_close(nni_msgq *mq) mq->mq_len--; nni_msg_free(msg); } + + // Let all pending blockers know we are closing the queue. + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_putq, aio); + nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + nni_mtx_unlock(&mq->mq_lock); } |
