aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c66
1 files changed, 10 insertions, 56 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8246279f..529e7f4d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -17,7 +17,6 @@
struct nni_msgq {
nni_mtx mq_lock;
- nni_cv mq_drained;
int mq_cap;
int mq_alloc; // alloc is cap + 2...
int mq_len;
@@ -26,7 +25,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_draining;
int mq_besteffort;
nni_msg **mq_msgs;
@@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- nni_cv_init(&mq->mq_drained, &mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- mq->mq_draining = 0;
- *mqp = mq;
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
-
- if (mq->mq_draining) {
- if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
- nni_cv_wake(&mq->mq_drained);
- }
- }
}
void
@@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
void
-nni_msgq_drain(nni_msgq *mq, nni_time expire)
-{
- nni_aio *aio;
-
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
- mq->mq_draining = 1;
- while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- if (nni_cv_until(&mq->mq_drained, expire) != 0) {
- break;
- }
- }
-
- // Timed out or writers drained.
-
- // Complete the putq as NNG_ECLOSED.
- while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
-
- // Free any remaining messages in the queue.
- while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get++];
- if (mq->mq_get >= mq->mq_alloc) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nni_msg_free(msg);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_close(nni_msgq *mq)
{
nni_aio *aio;
@@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}