diff options
| -rw-r--r-- | src/core/msgqueue.c | 104 |
1 files changed, 77 insertions, 27 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index a58aff41..99b53274 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -132,6 +132,20 @@ nni_msgq_fini(nni_msgq *mq) } +static void +nni_msgq_finish(nni_aio *aio, int rv) +{ + nni_msgq *mq = aio->a_prov_data; + + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; + if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) { + nni_list_remove(&mq->mq_aio_putq, aio); + } + nni_aio_finish(aio, rv, 0); +} + + void nni_msgq_set_get_error(nni_msgq *mq, int error) { @@ -144,8 +158,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_list_remove(&mq->mq_aio_getq, aio); - nni_aio_finish(aio, error, 0); + nni_msgq_finish(aio, error); } } mq->mq_geterr = error; @@ -165,8 +178,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_list_remove(&mq->mq_aio_putq, aio); - nni_aio_finish(aio, error, 0); + nni_msgq_finish(aio, error); } } mq->mq_puterr = error; @@ -186,14 +198,12 @@ nni_msgq_set_error(nni_msgq *mq, int error) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_list_remove(&mq->mq_aio_getq, aio); - nni_aio_finish(aio, error, 0); + nni_msgq_finish(aio, error); } naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_list_remove(&mq->mq_aio_putq, aio); - nni_aio_finish(aio, error, 0); + nni_msgq_finish(aio, error); } } mq->mq_puterr = error; @@ -218,14 +228,11 @@ nni_msgq_run_putq(nni_msgq *mq) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - nni_list_remove(&mq->mq_aio_getq, raio); - nni_list_remove(&mq->mq_aio_putq, waio); - raio->a_msg = msg; waio->a_msg = NULL; - nni_aio_finish(raio, 0, len); - nni_aio_finish(waio, 0, len); + nni_msgq_finish(raio, 0); + nni_msgq_finish(waio, 0); continue; } @@ -238,7 +245,7 @@ nni_msgq_run_putq(nni_msgq *mq) } mq->mq_len++; waio->a_msg = NULL; - nni_aio_finish(waio, 0, len); + nni_msgq_finish(waio, 0); continue; } @@ -254,7 +261,6 @@ nni_msgq_run_getq(nni_msgq *mq) nni_aio *raio; nni_aio *waio; nni_msg *msg; - size_t len; while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. @@ -265,23 +271,18 @@ nni_msgq_run_getq(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; - len = nni_msg_len(msg); raio->a_msg = msg; - nni_aio_finish(raio, 0, len); + nni_msgq_finish(raio, 0); continue; } // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_list_remove(&mq->mq_aio_putq, waio); - nni_list_remove(&mq->mq_aio_getq, raio); - msg = waio->a_msg; - len = nni_msg_len(msg); waio->a_msg = NULL; raio->a_msg = msg; - nni_aio_finish(raio, 0, len); - nni_aio_finish(waio, 0, len); + nni_msgq_finish(raio, 0); + nni_msgq_finish(waio, 0); continue; } @@ -310,6 +311,7 @@ nni_msgq_run_notify(nni_msgq *mq) if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { + // This stays on the list. nni_aio_finish(aio, 0, 0); } } @@ -323,10 +325,32 @@ nni_msgq_run_notify(nni_msgq *mq) } +static void +nni_msgq_cancel(nni_aio *aio) +{ + nni_msgq *mq = aio->a_prov_data; + + if (mq == NULL) { + return; + } + + nni_mtx_lock(&mq->mq_lock); + // this checks if the AIO is active, it doesn't matter what list + if (nni_list_active(&mq->mq_aio_getq, aio)) { + nni_list_remove(&mq->mq_aio_getq, aio); + } + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; + nni_mtx_unlock(&mq->mq_lock); +} + + void nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); + aio->a_prov_data = mq; + aio->a_prov_cancel = nni_msgq_cancel; if (nni_list_active(&mq->mq_aio_notify_put, aio)) { nni_list_remove(&mq->mq_aio_notify_put, aio); } @@ -339,6 +363,8 @@ void nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); + aio->a_prov_data = mq; + aio->a_prov_cancel = nni_msgq_cancel; if (nni_list_active(&mq->mq_aio_notify_get, aio)) { nni_list_remove(&mq->mq_aio_notify_get, aio); } @@ -353,6 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { nni_aio_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&mq->mq_lock); @@ -363,10 +390,15 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + + aio->a_prov_data = mq; + aio->a_prov_cancel = nni_msgq_cancel; + nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); + // XXX: handle this using a generic aio timeout. if (expire < mq->mq_expire) { mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); @@ -391,10 +423,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + + aio->a_prov_data = mq; + aio->a_prov_cancel = nni_msgq_cancel; + nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); + // XXX: handle this using a generic aio timeout. if (expire < mq->mq_expire) { mq->mq_expire = expire; nni_timer_schedule(&mq->mq_timer, mq->mq_expire); @@ -407,10 +444,6 @@ void nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); - // NB: nni_list_active and nni_list_remove only use the list structure - // to determine list node offsets. Otherwise, they only look at the - // node's linkage structure. Therefore the following check will remove - // the node from either the getq or the putq list. if (nni_list_active(&mq->mq_aio_getq, aio)) { nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ECANCELED, 0); @@ -474,6 +507,8 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); raio->a_msg = msg; + raio->a_prov_cancel = NULL; + raio->a_prov_data = NULL; nni_aio_finish(raio, 0, len); nni_mtx_unlock(&mq->mq_lock); @@ -496,6 +531,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) } +// XXX: Move this to generic AIO timeout... void nni_msgq_run_timeout(void *arg) { @@ -514,9 +550,13 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); if (aio->a_expire == NNI_TIME_ZERO) { + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -528,9 +568,13 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); if (aio->a_expire == NNI_TIME_ZERO) { + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -618,6 +662,8 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_list_remove(&mq->mq_aio_putq, aio); + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } while (mq->mq_len > 0) { @@ -655,6 +701,8 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } @@ -662,6 +710,8 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } |
