diff options
| -rw-r--r-- | src/core/msgqueue.c | 22 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 5 |
2 files changed, 17 insertions, 10 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 5f4bd333..3ec194fc 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -147,6 +147,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) nni_mtx_lock(&mq->mq_lock); mq->mq_puterr = error; if (error) { + mq->mq_wwait = 0; nni_cv_wake(&mq->mq_writeable); } nni_mtx_unlock(&mq->mq_lock); @@ -159,6 +160,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) nni_mtx_lock(&mq->mq_lock); mq->mq_geterr = error; if (error) { + mq->mq_rwait = 0; nni_cv_wake(&mq->mq_readable); } nni_mtx_unlock(&mq->mq_lock); @@ -172,6 +174,8 @@ nni_msgq_set_error(nni_msgq *mq, int error) mq->mq_geterr = error; mq->mq_puterr = error; if (error) { + mq->mq_rwait = 0; + mq->mq_wwait = 0; nni_cv_wake(&mq->mq_readable); nni_cv_wake(&mq->mq_writeable); } @@ -189,6 +193,8 @@ nni_msgq_signal(nni_msgq *mq, int *signal) *signal = 1; // We have to wake everyone. + mq->mq_rwait = 0; + mq->mq_wwait = 0; nni_cv_wake(&mq->mq_readable); nni_cv_wake(&mq->mq_writeable); nni_mtx_unlock(&mq->mq_lock); @@ -242,9 +248,8 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) } // not writeable, so wait until something changes - mq->mq_wwait++; + mq->mq_wwait = 1; rv = nni_cv_until(&mq->mq_writeable, expire); - mq->mq_wwait--; if (rv == NNG_ETIMEDOUT) { nni_mtx_unlock(&mq->mq_lock); return (NNG_ETIMEDOUT); @@ -259,6 +264,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) } mq->mq_len++; if (mq->mq_rwait) { + mq->mq_rwait = 0; nni_cv_wake(&mq->mq_readable); } notify = NNI_MSGQ_NOTIFY_CANGET; @@ -309,6 +315,7 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg) mq->mq_msgs[mq->mq_get] = msg; mq->mq_len++; if (mq->mq_rwait) { + mq->mq_rwait = 0; nni_cv_wake(&mq->mq_readable); } @@ -359,13 +366,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } - if ((mq->mq_cap == 0) & (mq->mq_wwait)) { + if ((mq->mq_cap == 0) && (mq->mq_wwait)) { // let a write waiter know we are ready + mq->mq_wwait = 0; nni_cv_wake(&mq->mq_writeable); } - mq->mq_rwait++; + mq->mq_rwait = 1; rv = nni_cv_until(&mq->mq_readable, expire); - mq->mq_rwait--; if (rv == NNG_ETIMEDOUT) { nni_mtx_unlock(&mq->mq_lock); return (NNG_ETIMEDOUT); @@ -381,6 +388,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) mq->mq_get = 0; } if (mq->mq_wwait) { + mq->mq_wwait = 0; nni_cv_wake(&mq->mq_writeable); } notify = NNI_MSGQ_NOTIFY_CANPUT; @@ -463,6 +471,8 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; + mq->mq_wwait = 0; + mq->mq_rwait = 0; nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_readable); while (mq->mq_len > 0) { @@ -488,6 +498,8 @@ nni_msgq_close(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; + mq->mq_wwait = 0; + mq->mq_rwait = 0; nni_cv_wake(&mq->mq_writeable); nni_cv_wake(&mq->mq_readable); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 113dd9ea..fddc25da 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -146,11 +146,6 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) rv = pthread_cond_timedwait(&cv->cv, cv->mtx, &ts); if (rv == ETIMEDOUT) { - if (nni_clock() < until) { - // Buggy pthreads implementation!! Seen with - // CLOCK_MONOTONIC on macOS Sierra. - nni_panic("nni_plat_cv_until: Premature wake up!"); - } return (NNG_ETIMEDOUT); } else if (rv != 0) { nni_panic("pthread_cond_timedwait: %d", rv); |
