diff options
| -rw-r--r-- | src/core/msgqueue.c | 133 |
1 files changed, 66 insertions, 67 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 3af4e081..ba6aa6bb 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,10 +15,10 @@ // side can close, and they may be closed more than once. struct nni_msgqueue { - nni_mutex mq_lock; - nni_cond mq_readable; - nni_cond mq_writeable; - nni_cond mq_drained; + nni_mtx mq_lock; + nni_cv mq_readable; + nni_cv mq_writeable; + nni_cv mq_drained; int mq_cap; int mq_alloc; // alloc is cap + 2... int mq_len; @@ -51,31 +51,31 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) if ((mq = nni_alloc(sizeof (*mq))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) { + if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { nni_free(mq, sizeof (*mq)); return (rv); } - if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) { - nni_mutex_fini(&mq->mq_lock); + if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) { + nni_mtx_fini(&mq->mq_lock); nni_free(mq, sizeof (*mq)); return (NNG_ENOMEM); } - if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { - nni_cond_fini(&mq->mq_readable); - nni_mutex_fini(&mq->mq_lock); + if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); return (NNG_ENOMEM); } - if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - nni_cond_fini(&mq->mq_writeable); - nni_cond_fini(&mq->mq_readable); - nni_mutex_fini(&mq->mq_lock); + if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); return (NNG_ENOMEM); } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { - nni_cond_fini(&mq->mq_drained); - nni_cond_fini(&mq->mq_writeable); - nni_cond_fini(&mq->mq_readable); - nni_mutex_fini(&mq->mq_lock); + nni_cv_fini(&mq->mq_drained); + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); return (NNG_ENOMEM); } @@ -96,10 +96,10 @@ nni_msgqueue_destroy(nni_msgqueue *mq) { nni_msg *msg; - nni_cond_fini(&mq->mq_drained); - nni_cond_fini(&mq->mq_writeable); - nni_cond_fini(&mq->mq_readable); - nni_mutex_fini(&mq->mq_lock); + nni_cv_fini(&mq->mq_drained); + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ while (mq->mq_len > 0) { @@ -123,13 +123,13 @@ nni_msgqueue_destroy(nni_msgqueue *mq) void nni_msgqueue_signal(nni_msgqueue *mq, int *signal) { - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); *signal = 1; // We have to wake everyone. - nni_cond_broadcast(&mq->mq_readable); - nni_cond_broadcast(&mq->mq_writeable); - nni_mutex_exit(&mq->mq_lock); + nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + nni_mtx_unlock(&mq->mq_lock); } @@ -139,12 +139,12 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, { int rv; - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); for (;;) { // if closed, we don't put more... this check is first! if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_ECLOSED); } @@ -162,22 +162,22 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, // interrupted? if (*signal) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } // single poll? if (expire == NNI_TIME_ZERO) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); } // not writeable, so wait until something changes mq->mq_wwait++; - rv = nni_cond_waituntil(&mq->mq_writeable, expire); + rv = nni_cv_until(&mq->mq_writeable, expire); mq->mq_wwait--; if (rv == NNG_ETIMEDOUT) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_ETIMEDOUT); } } @@ -191,9 +191,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } mq->mq_len++; if (mq->mq_rwait) { - nni_cond_broadcast(&mq->mq_readable); + nni_cv_wake(&mq->mq_readable); } - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -204,17 +204,17 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, int nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) { - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); // if closed, we don't put more... this check is first! if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_ECLOSED); } // room in the queue? if (mq->mq_len >= mq->mq_cap) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); } @@ -226,9 +226,9 @@ nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) mq->mq_msgs[mq->mq_get] = msg; mq->mq_len++; if (mq->mq_rwait) { - nni_cond_broadcast(&mq->mq_readable); + nni_cv_wake(&mq->mq_readable); } - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -239,7 +239,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, { int rv; - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); for (;;) { // always prefer to deliver data if its there @@ -247,26 +247,26 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, break; } if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_ECLOSED); } if (expire == NNI_TIME_ZERO) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); } if (*signal) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } if ((mq->mq_cap == 0) & (mq->mq_wwait)) { // let a write waiter know we are ready - nni_cond_broadcast(&mq->mq_writeable); + nni_cv_wake(&mq->mq_writeable); } mq->mq_rwait++; - rv = nni_cond_waituntil(&mq->mq_readable, expire); + rv = nni_cv_until(&mq->mq_readable, expire); mq->mq_rwait--; if (rv == NNG_ETIMEDOUT) { - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (NNG_ETIMEDOUT); } } @@ -280,9 +280,9 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, mq->mq_get = 0; } if (mq->mq_wwait) { - nni_cond_broadcast(&mq->mq_writeable); + nni_cv_wake(&mq->mq_writeable); } - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -340,13 +340,12 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) { - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cond_broadcast(&mq->mq_writeable); - nni_cond_broadcast(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + nni_cv_wake(&mq->mq_readable); while (mq->mq_len > 0) { - if (nni_cond_waituntil(&mq->mq_drained, expire) == - NNG_ETIMEDOUT) { + if (nni_cv_until(&mq->mq_drained, expire) == NNG_ETIMEDOUT) { break; } } @@ -359,17 +358,17 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) mq->mq_len--; nni_msg_free(msg); } - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); } void nni_msgqueue_close(nni_msgqueue *mq) { - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; - nni_cond_broadcast(&mq->mq_writeable); - nni_cond_broadcast(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + nni_cv_wake(&mq->mq_readable); // Free the messages orphaned in the queue. while (mq->mq_len > 0) { @@ -380,7 +379,7 @@ nni_msgqueue_close(nni_msgqueue *mq) mq->mq_len--; nni_msg_free(msg); } - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); } @@ -389,9 +388,9 @@ nni_msgqueue_len(nni_msgqueue *mq) { int rv; - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); rv = mq->mq_len; - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (rv); } @@ -401,9 +400,9 @@ nni_msgqueue_cap(nni_msgqueue *mq) { int rv; - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); rv = mq->mq_cap; - nni_mutex_exit(&mq->mq_lock); + nni_mtx_unlock(&mq->mq_lock); return (rv); } @@ -431,7 +430,7 @@ nni_msgqueue_resize(nni_msgqueue *mq, int cap) newq = NULL; } - nni_mutex_enter(&mq->mq_lock); + nni_mtx_lock(&mq->mq_lock); while (mq->mq_len > (cap + 1)) { // too many messages -- we allow that one for // the case of pushback or cap == 0. @@ -476,9 +475,9 @@ nni_msgqueue_resize(nni_msgqueue *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cond_broadcast(&mq->mq_readable); - nni_cond_broadcast(&mq->mq_writeable); - nni_cond_broadcast(&mq->mq_drained); - nni_mutex_exit(&mq->mq_lock); + nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + nni_cv_wake(&mq->mq_drained); + nni_mtx_unlock(&mq->mq_lock); return (0); } |
