summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-04 02:46:40 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-04 02:46:40 -0800
commitfb6550a242bb1742ec62202a99d0604ee9069795 (patch)
treebd235a8ddf6766dc54c3e47b31dbc7a59802d5da /src/core/msgqueue.c
parentc17a1dd3f5333da59355ecc3f8788a0396a8f72d (diff)
downloadnng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.gz
nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.bz2
nng-fb6550a242bb1742ec62202a99d0604ee9069795.zip
Pipeline protocol now entirely callback driven.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c168
1 files changed, 160 insertions, 8 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8b9a7e1a..2673063d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -39,6 +39,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
@@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig)
}
+static void nni_msgq_run_timeout(void *);
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
+ nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
+
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_rwait = 0;
mq->mq_notify_fn = NULL;
mq->mq_notify_arg = NULL;
+ mq->mq_expire = NNI_TIME_NEVER;
*mqp = mq;
return (0);
@@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, raio);
nni_list_remove(&mq->mq_aio_putq, waio);
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
waio->a_msg = NULL;
nni_aio_finish(raio, 0, len);
@@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq)
}
// Otherwise if we have room in the buffer, just queue it.
- if (mq->mq_len < mq->mq_cap) {
+ if ((mq->mq_len < mq->mq_cap) ||
+ ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq)
// Unable to make progress, leave the aio where it is.
break;
}
+
+ // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC.
+ if (mq->mq_len != 0) {
+ nni_cv_wake(&mq->mq_readable);
+ }
+ if (mq->mq_len < mq->mq_cap) {
+ nni_cv_wake(&mq->mq_writeable);
+ }
}
@@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq)
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;
}
mq->mq_len--;
len = nni_msg_len(msg);
- *(raio->a_msgp) = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
continue;
}
@@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = waio->a_msg;
len = nni_msg_len(msg);
waio->a_msg = NULL;
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
nni_aio_finish(waio, 0, len);
continue;
@@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq)
}
-int
+void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
- return (0);
}
-int
+void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ // NB: nni_list_active and nni_list_remove only use the list structure
+ // to determine list node offsets. Otherwise, they only look at the
+ // node's linkage structure. Therefore the following check will remove
+ // the node from either the getq or the putq list.
+ if (nni_list_active(&mq->mq_aio_getq, aio)) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+int
+nni_msgq_canput(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len < mq->mq_cap) ||
+ (mq->mq_rwait != 0) || // XXX: REMOVE ME
+ (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
+int
+nni_msgq_canget(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len != 0) ||
+ (mq->mq_wwait != 0) ||
+ (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
void
-nni_msgq_run_timeout(nni_msgq *mq)
+nni_msgq_run_timeout(void *arg)
{
+ nni_msgq *mq = arg;
nni_time now;
+ nni_time exp;
nni_aio *aio;
nni_aio *naio;
int rv;
now = nni_clock();
+ exp = NNI_TIME_NEVER;
nni_mtx_lock(&mq->mq_lock);
naio = nni_list_first(&mq->mq_aio_getq);
@@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
naio = nni_list_first(&mq->mq_aio_putq);
@@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
+ mq->mq_expire = exp;
+ if (mq->mq_expire != NNI_TIME_NEVER) {
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
}
@@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ nni_msgq_run_getq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ nni_msgq_run_putq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
int
+nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ nni_msgq_aio_get(mq, &aio);
+ nni_aio_wait(&aio);
+ if ((rv = nni_aio_result(&aio)) == 0) {
+ *msgp = aio.a_msg;
+ aio.a_msg = NULL;
+ }
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
+{
+ return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER));
+}
+
+
+#if 0
+int
nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
{
nni_signal nosig = 0;
@@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
+#endif
+
+
int
nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
{
@@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
}
+#if 0
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
}
+#endif
+
+int
+nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ aio.a_msg = msg;
+ nni_msgq_aio_put(mq, &aio);
+ nni_aio_wait(&aio);
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_put(nni_msgq *mq, nni_msg *msg)
+{
+ return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER));
+}
+
+
+#if 0
int
nni_msgq_put(nni_msgq *mq, nni_msg *msg)
{
@@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
+#endif
+
+
int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
@@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
}
+#if 0
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
}
+#endif
+
+
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{