diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-04 02:46:40 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-04 02:46:40 -0800 |
| commit | fb6550a242bb1742ec62202a99d0604ee9069795 (patch) | |
| tree | bd235a8ddf6766dc54c3e47b31dbc7a59802d5da /src/core/msgqueue.c | |
| parent | c17a1dd3f5333da59355ecc3f8788a0396a8f72d (diff) | |
| download | nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.gz nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.bz2 nng-fb6550a242bb1742ec62202a99d0604ee9069795.zip | |
Pipeline protocol now entirely callback driven.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 168 |
1 files changed, 160 insertions, 8 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8b9a7e1a..2673063d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -39,6 +39,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; + + nni_timer_node mq_timer; + nni_time mq_expire; }; @@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig) } +static void nni_msgq_run_timeout(void *); + int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } + nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); + mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_rwait = 0; mq->mq_notify_fn = NULL; mq->mq_notify_arg = NULL; + mq->mq_expire = NNI_TIME_NEVER; *mqp = mq; return (0); @@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, raio); nni_list_remove(&mq->mq_aio_putq, waio); - *raio->a_msgp = msg; + raio->a_msg = msg; waio->a_msg = NULL; nni_aio_finish(raio, 0, len); @@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq) } // Otherwise if we have room in the buffer, just queue it. - if (mq->mq_len < mq->mq_cap) { + if ((mq->mq_len < mq->mq_cap) || + ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) { nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq) // Unable to make progress, leave the aio where it is. break; } + + // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC. + if (mq->mq_len != 0) { + nni_cv_wake(&mq->mq_readable); + } + if (mq->mq_len < mq->mq_cap) { + nni_cv_wake(&mq->mq_writeable); + } } @@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq) while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { + nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_cap) { mq->mq_get = 0; } mq->mq_len--; len = nni_msg_len(msg); - *(raio->a_msgp) = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); continue; } @@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq) msg = waio->a_msg; len = nni_msg_len(msg); waio->a_msg = NULL; - *raio->a_msgp = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); nni_aio_finish(waio, 0, len); continue; @@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq) } -int +void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); - return (0); } -int +void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + // NB: nni_list_active and nni_list_remove only use the list structure + // to determine list node offsets. Otherwise, they only look at the + // node's linkage structure. Therefore the following check will remove + // the node from either the getq or the putq list. + if (nni_list_active(&mq->mq_aio_getq, aio)) { + nni_list_remove(&mq->mq_aio_getq, aio); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +int +nni_msgq_canput(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len < mq->mq_cap) || + (mq->mq_rwait != 0) || // XXX: REMOVE ME + (nni_list_first(&mq->mq_aio_getq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + +int +nni_msgq_canget(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len != 0) || + (mq->mq_wwait != 0) || + (nni_list_first(&mq->mq_aio_putq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } nni_mtx_unlock(&mq->mq_lock); return (0); } void -nni_msgq_run_timeout(nni_msgq *mq) +nni_msgq_run_timeout(void *arg) { + nni_msgq *mq = arg; nni_time now; + nni_time exp; nni_aio *aio; nni_aio *naio; int rv; now = nni_clock(); + exp = NNI_TIME_NEVER; nni_mtx_lock(&mq->mq_lock); naio = nni_list_first(&mq->mq_aio_getq); @@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } naio = nni_list_first(&mq->mq_aio_putq); @@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } + mq->mq_expire = exp; + if (mq->mq_expire != NNI_TIME_NEVER) { + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); } @@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); + nni_msgq_run_getq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); + nni_msgq_run_putq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) int +nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + nni_msgq_aio_get(mq, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) == 0) { + *msgp = aio.a_msg; + aio.a_msg = NULL; + } + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_get(nni_msgq *mq, nni_msg **msgp) +{ + return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER)); +} + + +#if 0 +int nni_msgq_get(nni_msgq *mq, nni_msg **msgp) { nni_signal nosig = 0; @@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } +#endif + + int nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { @@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) } +#if 0 int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { @@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) } +#endif + +int +nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + aio.a_msg = msg; + nni_msgq_aio_put(mq, &aio); + nni_aio_wait(&aio); + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_put(nni_msgq *mq, nni_msg *msg) +{ + return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER)); +} + + +#if 0 int nni_msgq_put(nni_msgq *mq, nni_msg *msg) { @@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } +#endif + + int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { @@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) } +#if 0 int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) } +#endif + + void nni_msgq_drain(nni_msgq *mq, nni_time expire) { |
