aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-05 23:28:48 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-05 23:29:18 -0800
commit884da789532be511245248206adb00696ae62d31 (patch)
treebbef6489291ed464f6ba7429434da9026fbef72c /src/core/msgqueue.c
parentfb6550a242bb1742ec62202a99d0604ee9069795 (diff)
downloadnng-884da789532be511245248206adb00696ae62d31.tar.gz
nng-884da789532be511245248206adb00696ae62d31.tar.bz2
nng-884da789532be511245248206adb00696ae62d31.zip
Bus protocol now callback-driven.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c115
1 files changed, 50 insertions, 65 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2673063d..dc3f84f9 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -344,7 +344,7 @@ nni_msgq_run_getq(nni_msgq *mq)
if (mq->mq_len != 0) {
nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
- if (mq->mq_get > mq->mq_cap) {
+ if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
@@ -384,12 +384,15 @@ nni_msgq_run_notify(nni_msgq *mq)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
+ nni_time expire = aio->a_expire;
+
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
- if (aio->a_expire < mq->mq_expire) {
- mq->mq_expire = aio->a_expire;
+
+ if (expire < mq->mq_expire) {
+ mq->mq_expire = expire;
nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
}
nni_mtx_unlock(&mq->mq_lock);
@@ -399,12 +402,15 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
+ nni_time expire = aio->a_expire;
+
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
- if (aio->a_expire < mq->mq_expire) {
- mq->mq_expire = aio->a_expire;
+
+ if (expire < mq->mq_expire) {
+ mq->mq_expire = expire;
nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
}
nni_mtx_unlock(&mq->mq_lock);
@@ -456,6 +462,44 @@ nni_msgq_canget(nni_msgq *mq)
}
+int
+nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
+{
+ nni_aio *raio;
+ size_t len = nni_msg_len(msg);
+
+ nni_mtx_lock(&mq->mq_lock);
+
+ // The presence of any blocked reader indicates that
+ // the queue is empty, otherwise it would have just taken
+ // data from the queue.
+ if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
+
+ raio->a_msg = msg;
+
+ nni_aio_finish(raio, 0, len);
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
+
+ // Otherwise if we have room in the buffer, just queue it.
+ if ((mq->mq_len < mq->mq_cap) ||
+ ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
+ mq->mq_msgs[mq->mq_put++] = msg;
+ if (mq->mq_put == mq->mq_alloc) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
+
+ nni_mtx_unlock(&mq->mq_lock);
+ return (NNG_EAGAIN);
+}
+
+
void
nni_msgq_run_timeout(void *arg)
{
@@ -683,19 +727,7 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
-#if 0
-int
-nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
-{
- nni_signal nosig = 0;
-
- return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig));
-}
-
-
-#endif
-
-
+// XXX Remove this later.
int
nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
{
@@ -703,18 +735,6 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
}
-#if 0
-int
-nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
-{
- nni_signal nosig = 0;
-
- return (nni_msgq_get_(mq, msgp, expire, &nosig));
-}
-
-
-#endif
-
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -740,28 +760,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
-#if 0
-int
-nni_msgq_put(nni_msgq *mq, nni_msg *msg)
-{
- nni_signal nosig = 0;
-
- return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig));
-}
-
-
-#endif
-
-
-int
-nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
-{
- nni_signal nosig = 0;
-
- return (nni_msgq_put_(mq, msg, NNI_TIME_ZERO, &nosig));
-}
-
-
int
nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
{
@@ -769,19 +767,6 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
}
-#if 0
-int
-nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
-{
- nni_signal nosig = 0;
-
- return (nni_msgq_put_(mq, msg, expire, &nosig));
-}
-
-
-#endif
-
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{