diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 66 |
1 files changed, 10 insertions, 56 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8246279f..529e7f4d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -17,7 +17,6 @@ struct nni_msgq { nni_mtx mq_lock; - nni_cv mq_drained; int mq_cap; int mq_alloc; // alloc is cap + 2... int mq_len; @@ -26,7 +25,6 @@ struct nni_msgq { int mq_closed; int mq_puterr; int mq_geterr; - int mq_draining; int mq_besteffort; nni_msg **mq_msgs; @@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - nni_cv_init(&mq->mq_drained, &mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - mq->mq_draining = 0; - *mqp = mq; + + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ @@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq) } mq->mq_cb_fn(mq->mq_cb_arg, flags); } - - if (mq->mq_draining) { - if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) { - nni_cv_wake(&mq->mq_drained); - } - } } void @@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) } void -nni_msgq_drain(nni_msgq *mq, nni_time expire) -{ - nni_aio *aio; - - nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; - mq->mq_draining = 1; - while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { - if (nni_cv_until(&mq->mq_drained, expire) != 0) { - break; - } - } - - // Timed out or writers drained. - - // Complete the putq as NNG_ECLOSED. - while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - - // Free any remaining messages in the queue. - while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get++]; - if (mq->mq_get >= mq->mq_alloc) { - mq->mq_get = 0; - } - mq->mq_len--; - nni_msg_free(msg); - } - nni_mtx_unlock(&mq->mq_lock); -} - -void nni_msgq_close(nni_msgq *mq) { nni_aio *aio; @@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cv_wake(&mq->mq_drained); nni_mtx_unlock(&mq->mq_lock); return (0); } |
