diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-01 21:18:58 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-01 21:18:58 -0800 |
| commit | 97614393e450b6c6813021f0e733b864a6265872 (patch) | |
| tree | 31c8d4d0b2206e944f797b291660d8cf52eebea0 /src/core/msgqueue.c | |
| parent | 5436cc9e1619bf1dd80fabb568aac344ae65d406 (diff) | |
| download | nng-97614393e450b6c6813021f0e733b864a6265872.tar.gz nng-97614393e450b6c6813021f0e733b864a6265872.tar.bz2 nng-97614393e450b6c6813021f0e733b864a6265872.zip | |
Start of msgq aio.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index b09bb817..66f246f8 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -36,6 +36,9 @@ struct nni_msgq { nni_thr mq_notify_thr; nni_msgq_notify_fn mq_notify_fn; void * mq_notify_arg; + + nni_list mq_aio_putq; + nni_list mq_aio_getq; }; @@ -107,6 +110,9 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } + NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node); + NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node); + if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; } @@ -262,6 +268,127 @@ nni_msgq_signal(nni_msgq *mq, int *signal) } +static void +nni_msgq_run_putq(nni_msgq *mq) +{ + nni_aio *waio; + nni_aio *raio; + nni_msg *msg; + size_t len; + + while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + + msg = waio->a_msg; + len = nni_msg_len(msg); + + // The presence of any blocked reader indicates that + // the queue is empty, otherwise it would have just taken + // data from the queue. + if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_list_remove(&mq->mq_aio_getq, raio); + nni_list_remove(&mq->mq_aio_putq, waio); + + *raio->a_msgp = msg; + waio->a_msg = NULL; + + nni_aio_finish(raio, 0, len); + nni_aio_finish(waio, 0, len); + continue; + } + + // Otherwise if we have room in the buffer, just queue it. + if (mq->mq_len < mq->mq_cap) { + nni_list_remove(&mq->mq_aio_putq, waio); + mq->mq_msgs[mq->mq_put++] = msg; + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + waio->a_msg = NULL; + nni_aio_finish(waio, 0, len); + continue; + } + + // Unable to make progress, leave the aio where it is. + break; + } +} + + +static void +nni_msgq_run_getq(nni_msgq *mq) +{ + nni_aio *raio; + nni_aio *waio; + nni_msg *msg; + size_t len; + + while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + + // If anything is waiting in the queue, get it first. + if (mq->mq_len != 0) { + msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + len = nni_msg_len(msg); + *(raio->a_msgp) = msg; + nni_aio_finish(raio, 0, len); + continue; + } + + // Nothing queued (unbuffered?), maybe a writer is waiting. + if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_list_remove(&mq->mq_aio_putq, waio); + nni_list_remove(&mq->mq_aio_getq, raio); + + msg = waio->a_msg; + len = nni_msg_len(msg); + waio->a_msg = NULL; + *raio->a_msgp = msg; + nni_aio_finish(raio, 0, len); + nni_aio_finish(waio, 0, len); + continue; + } + + // No data to get, and no unbuffered writers waiting. Just + // wait until something arrives. + break; + } +} + + +static void +nni_msgq_run_notify(nni_msgq *mq) +{ +} + + +int +nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + nni_list_append(&mq->mq_aio_putq, aio); + nni_msgq_run_putq(mq); + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + +int +nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + nni_list_append(&mq->mq_aio_getq, aio); + nni_msgq_run_getq(mq); + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { |
