diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-05 23:28:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-05 23:29:18 -0800 |
| commit | 884da789532be511245248206adb00696ae62d31 (patch) | |
| tree | bbef6489291ed464f6ba7429434da9026fbef72c /src/core | |
| parent | fb6550a242bb1742ec62202a99d0604ee9069795 (diff) | |
| download | nng-884da789532be511245248206adb00696ae62d31.tar.gz nng-884da789532be511245248206adb00696ae62d31.tar.bz2 nng-884da789532be511245248206adb00696ae62d31.zip | |
Bus protocol now callback-driven.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 115 |
1 files changed, 50 insertions, 65 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 2673063d..dc3f84f9 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -344,7 +344,7 @@ nni_msgq_run_getq(nni_msgq *mq) if (mq->mq_len != 0) { nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; - if (mq->mq_get > mq->mq_cap) { + if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; @@ -384,12 +384,15 @@ nni_msgq_run_notify(nni_msgq *mq) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { + nni_time expire = aio->a_expire; + nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); - if (aio->a_expire < mq->mq_expire) { - mq->mq_expire = aio->a_expire; + + if (expire < mq->mq_expire) { + mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); } nni_mtx_unlock(&mq->mq_lock); @@ -399,12 +402,15 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { + nni_time expire = aio->a_expire; + nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); - if (aio->a_expire < mq->mq_expire) { - mq->mq_expire = aio->a_expire; + + if (expire < mq->mq_expire) { + mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); } nni_mtx_unlock(&mq->mq_lock); @@ -456,6 +462,44 @@ nni_msgq_canget(nni_msgq *mq) } +int +nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) +{ + nni_aio *raio; + size_t len = nni_msg_len(msg); + + nni_mtx_lock(&mq->mq_lock); + + // The presence of any blocked reader indicates that + // the queue is empty, otherwise it would have just taken + // data from the queue. + if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_list_remove(&mq->mq_aio_getq, raio); + + raio->a_msg = msg; + + nni_aio_finish(raio, 0, len); + nni_mtx_unlock(&mq->mq_lock); + return (0); + } + + // Otherwise if we have room in the buffer, just queue it. + if ((mq->mq_len < mq->mq_cap) || + ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) { + mq->mq_msgs[mq->mq_put++] = msg; + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + nni_mtx_unlock(&mq->mq_lock); + return (0); + } + + nni_mtx_unlock(&mq->mq_lock); + return (NNG_EAGAIN); +} + + void nni_msgq_run_timeout(void *arg) { @@ -683,19 +727,7 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } -#if 0 -int -nni_msgq_get(nni_msgq *mq, nni_msg **msgp) -{ - nni_signal nosig = 0; - - return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig)); -} - - -#endif - - +// XXX Remove this later. int nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { @@ -703,18 +735,6 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) } -#if 0 -int -nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) -{ - nni_signal nosig = 0; - - return (nni_msgq_get_(mq, msgp, expire, &nosig)); -} - - -#endif - int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -740,28 +760,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } -#if 0 -int -nni_msgq_put(nni_msgq *mq, nni_msg *msg) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig)); -} - - -#endif - - -int -nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, NNI_TIME_ZERO, &nosig)); -} - - int nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) { @@ -769,19 +767,6 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) } -#if 0 -int -nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) -{ - nni_signal nosig = 0; - - return (nni_msgq_put_(mq, msg, expire, &nosig)); -} - - -#endif - - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { |
