aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/msgqueue.c133
1 files changed, 66 insertions, 67 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3af4e081..ba6aa6bb 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,10 +15,10 @@
// side can close, and they may be closed more than once.
struct nni_msgqueue {
- nni_mutex mq_lock;
- nni_cond mq_readable;
- nni_cond mq_writeable;
- nni_cond mq_drained;
+ nni_mtx mq_lock;
+ nni_cv mq_readable;
+ nni_cv mq_writeable;
+ nni_cv mq_drained;
int mq_cap;
int mq_alloc; // alloc is cap + 2...
int mq_len;
@@ -51,31 +51,31 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) {
+ if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
nni_free(mq, sizeof (*mq));
return (rv);
}
- if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- nni_mutex_fini(&mq->mq_lock);
+ if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
+ nni_mtx_fini(&mq->mq_lock);
nni_free(mq, sizeof (*mq));
return (NNG_ENOMEM);
}
- if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- nni_cond_fini(&mq->mq_readable);
- nni_mutex_fini(&mq->mq_lock);
+ if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
- if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- nni_cond_fini(&mq->mq_writeable);
- nni_cond_fini(&mq->mq_readable);
- nni_mutex_fini(&mq->mq_lock);
+ if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
- nni_cond_fini(&mq->mq_drained);
- nni_cond_fini(&mq->mq_writeable);
- nni_cond_fini(&mq->mq_readable);
- nni_mutex_fini(&mq->mq_lock);
+ nni_cv_fini(&mq->mq_drained);
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
@@ -96,10 +96,10 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
{
nni_msg *msg;
- nni_cond_fini(&mq->mq_drained);
- nni_cond_fini(&mq->mq_writeable);
- nni_cond_fini(&mq->mq_readable);
- nni_mutex_fini(&mq->mq_lock);
+ nni_cv_fini(&mq->mq_drained);
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
while (mq->mq_len > 0) {
@@ -123,13 +123,13 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
void
nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
{
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
*signal = 1;
// We have to wake everyone.
- nni_cond_broadcast(&mq->mq_readable);
- nni_cond_broadcast(&mq->mq_writeable);
- nni_mutex_exit(&mq->mq_lock);
+ nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ nni_mtx_unlock(&mq->mq_lock);
}
@@ -139,12 +139,12 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
{
int rv;
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
for (;;) {
// if closed, we don't put more... this check is first!
if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_ECLOSED);
}
@@ -162,22 +162,22 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
// interrupted?
if (*signal) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_EINTR);
}
// single poll?
if (expire == NNI_TIME_ZERO) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);
}
// not writeable, so wait until something changes
mq->mq_wwait++;
- rv = nni_cond_waituntil(&mq->mq_writeable, expire);
+ rv = nni_cv_until(&mq->mq_writeable, expire);
mq->mq_wwait--;
if (rv == NNG_ETIMEDOUT) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_ETIMEDOUT);
}
}
@@ -191,9 +191,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
mq->mq_len++;
if (mq->mq_rwait) {
- nni_cond_broadcast(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_readable);
}
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -204,17 +204,17 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
int
nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg)
{
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_ECLOSED);
}
// room in the queue?
if (mq->mq_len >= mq->mq_cap) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);
}
@@ -226,9 +226,9 @@ nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg)
mq->mq_msgs[mq->mq_get] = msg;
mq->mq_len++;
if (mq->mq_rwait) {
- nni_cond_broadcast(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_readable);
}
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -239,7 +239,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
{
int rv;
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
for (;;) {
// always prefer to deliver data if its there
@@ -247,26 +247,26 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
break;
}
if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_ECLOSED);
}
if (expire == NNI_TIME_ZERO) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);
}
if (*signal) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_EINTR);
}
if ((mq->mq_cap == 0) & (mq->mq_wwait)) {
// let a write waiter know we are ready
- nni_cond_broadcast(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_writeable);
}
mq->mq_rwait++;
- rv = nni_cond_waituntil(&mq->mq_readable, expire);
+ rv = nni_cv_until(&mq->mq_readable, expire);
mq->mq_rwait--;
if (rv == NNG_ETIMEDOUT) {
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (NNG_ETIMEDOUT);
}
}
@@ -280,9 +280,9 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
mq->mq_get = 0;
}
if (mq->mq_wwait) {
- nni_cond_broadcast(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_writeable);
}
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -340,13 +340,12 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
{
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cond_broadcast(&mq->mq_writeable);
- nni_cond_broadcast(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_readable);
while (mq->mq_len > 0) {
- if (nni_cond_waituntil(&mq->mq_drained, expire) ==
- NNG_ETIMEDOUT) {
+ if (nni_cv_until(&mq->mq_drained, expire) == NNG_ETIMEDOUT) {
break;
}
}
@@ -359,17 +358,17 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
mq->mq_len--;
nni_msg_free(msg);
}
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
}
void
nni_msgqueue_close(nni_msgqueue *mq)
{
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cond_broadcast(&mq->mq_writeable);
- nni_cond_broadcast(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_readable);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
@@ -380,7 +379,7 @@ nni_msgqueue_close(nni_msgqueue *mq)
mq->mq_len--;
nni_msg_free(msg);
}
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
}
@@ -389,9 +388,9 @@ nni_msgqueue_len(nni_msgqueue *mq)
{
int rv;
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
rv = mq->mq_len;
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (rv);
}
@@ -401,9 +400,9 @@ nni_msgqueue_cap(nni_msgqueue *mq)
{
int rv;
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
rv = mq->mq_cap;
- nni_mutex_exit(&mq->mq_lock);
+ nni_mtx_unlock(&mq->mq_lock);
return (rv);
}
@@ -431,7 +430,7 @@ nni_msgqueue_resize(nni_msgqueue *mq, int cap)
newq = NULL;
}
- nni_mutex_enter(&mq->mq_lock);
+ nni_mtx_lock(&mq->mq_lock);
while (mq->mq_len > (cap + 1)) {
// too many messages -- we allow that one for
// the case of pushback or cap == 0.
@@ -476,9 +475,9 @@ nni_msgqueue_resize(nni_msgqueue *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cond_broadcast(&mq->mq_readable);
- nni_cond_broadcast(&mq->mq_writeable);
- nni_cond_broadcast(&mq->mq_drained);
- nni_mutex_exit(&mq->mq_lock);
+ nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_drained);
+ nni_mtx_unlock(&mq->mq_lock);
return (0);
}