summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-01 21:18:58 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-01 21:18:58 -0800
commit97614393e450b6c6813021f0e733b864a6265872 (patch)
tree31c8d4d0b2206e944f797b291660d8cf52eebea0 /src/core/msgqueue.c
parent5436cc9e1619bf1dd80fabb568aac344ae65d406 (diff)
downloadnng-97614393e450b6c6813021f0e733b864a6265872.tar.gz
nng-97614393e450b6c6813021f0e733b864a6265872.tar.bz2
nng-97614393e450b6c6813021f0e733b864a6265872.zip
Start of msgq aio.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c127
1 files changed, 127 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index b09bb817..66f246f8 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -36,6 +36,9 @@ struct nni_msgq {
nni_thr mq_notify_thr;
nni_msgq_notify_fn mq_notify_fn;
void * mq_notify_arg;
+
+ nni_list mq_aio_putq;
+ nni_list mq_aio_getq;
};
@@ -107,6 +110,9 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
+ NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
+
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
@@ -262,6 +268,127 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
}
+static void
+nni_msgq_run_putq(nni_msgq *mq)
+{
+ nni_aio *waio;
+ nni_aio *raio;
+ nni_msg *msg;
+ size_t len;
+
+ while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+
+ msg = waio->a_msg;
+ len = nni_msg_len(msg);
+
+ // The presence of any blocked reader indicates that
+ // the queue is empty, otherwise it would have just taken
+ // data from the queue.
+ if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
+ nni_list_remove(&mq->mq_aio_putq, waio);
+
+ *raio->a_msgp = msg;
+ waio->a_msg = NULL;
+
+ nni_aio_finish(raio, 0, len);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // Otherwise if we have room in the buffer, just queue it.
+ if (mq->mq_len < mq->mq_cap) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ mq->mq_msgs[mq->mq_put++] = msg;
+ if (mq->mq_put == mq->mq_alloc) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ waio->a_msg = NULL;
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // Unable to make progress, leave the aio where it is.
+ break;
+ }
+}
+
+
+static void
+nni_msgq_run_getq(nni_msgq *mq)
+{
+ nni_aio *raio;
+ nni_aio *waio;
+ nni_msg *msg;
+ size_t len;
+
+ while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+
+ // If anything is waiting in the queue, get it first.
+ if (mq->mq_len != 0) {
+ msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ len = nni_msg_len(msg);
+ *(raio->a_msgp) = msg;
+ nni_aio_finish(raio, 0, len);
+ continue;
+ }
+
+ // Nothing queued (unbuffered?), maybe a writer is waiting.
+ if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ nni_list_remove(&mq->mq_aio_getq, raio);
+
+ msg = waio->a_msg;
+ len = nni_msg_len(msg);
+ waio->a_msg = NULL;
+ *raio->a_msgp = msg;
+ nni_aio_finish(raio, 0, len);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // No data to get, and no unbuffered writers waiting. Just
+ // wait until something arrives.
+ break;
+ }
+}
+
+
+static void
+nni_msgq_run_notify(nni_msgq *mq)
+{
+}
+
+
+int
+nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ nni_list_append(&mq->mq_aio_putq, aio);
+ nni_msgq_run_putq(mq);
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
+int
+nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ nni_list_append(&mq->mq_aio_getq, aio);
+ nni_msgq_run_getq(mq);
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{