From 884da789532be511245248206adb00696ae62d31 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 5 Mar 2017 23:28:48 -0800 Subject: Bus protocol now callback-driven. --- src/core/msgqueue.c | 115 +++++++++++++++++++++++----------------------------- 1 file changed, 50 insertions(+), 65 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 2673063d..dc3f84f9 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -344,7 +344,7 @@ nni_msgq_run_getq(nni_msgq *mq) if (mq->mq_len != 0) { nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; - if (mq->mq_get > mq->mq_cap) { + if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; @@ -384,12 +384,15 @@ nni_msgq_run_notify(nni_msgq *mq) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { + nni_time expire = aio->a_expire; + nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); - if (aio->a_expire < mq->mq_expire) { - mq->mq_expire = aio->a_expire; + + if (expire < mq->mq_expire) { + mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); } nni_mtx_unlock(&mq->mq_lock); @@ -399,12 +402,15 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { + nni_time expire = aio->a_expire; + nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); - if (aio->a_expire < mq->mq_expire) { - mq->mq_expire = aio->a_expire; + + if (expire < mq->mq_expire) { + mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); } nni_mtx_unlock(&mq->mq_lock); @@ -456,6 +462,44 @@ nni_msgq_canget(nni_msgq *mq) } +int +nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) +{ + nni_aio *raio; + size_t len = nni_msg_len(msg); + + nni_mtx_lock(&mq->mq_lock); + + // The presence of any blocked reader indicates that + // the queue is empty, otherwise it would have just taken + // data from the queue. + if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_list_remove(&mq->mq_aio_getq, raio); + + raio->a_msg = msg; + + nni_aio_finish(raio, 0, len); + nni_mtx_unlock(&mq->mq_lock); + return (0); + } + + // Otherwise if we have room in the buffer, just queue it. + if ((mq->mq_len < mq->mq_cap) || + ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) { + mq->mq_msgs[mq->mq_put++] = msg; + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + nni_mtx_unlock(&mq->mq_lock); + return (0); + } + + nni_mtx_unlock(&mq->mq_lock); + return (NNG_EAGAIN); +} + + void nni_msgq_run_timeout(void *arg) { @@ -683,19 +727,7 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } -#if 0 -int -nni_msgq_get(nni_msgq *mq, nni_msg **msgp) -{ - nni_signal nosig = 0; - - return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig)); -} - - -#endif - - +// XXX Remove this later. int nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { @@ -703,18 +735,6 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) } -#if 0 -int -nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) -{ - nni_signal nosig = 0; - - return (nni_msgq_get_(mq, msgp, expire, &nosig)); -} - - -#endif - int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -740,28 +760,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } -#if 0 -int -nni_msgq_put(nni_msgq *mq, nni_msg *msg) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig)); -} - - -#endif - - -int -nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, NNI_TIME_ZERO, &nosig)); -} - - int nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) { @@ -769,19 +767,6 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) } -#if 0 -int -nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, expire, &nosig)); -} - - -#endif - - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { -- cgit v1.2.3-70-g09d2