diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 63 |
1 files changed, 23 insertions, 40 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 93a487ad..985563ad 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -61,10 +61,10 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node); + nni_aio_list_init(&mq->mq_aio_putq); + nni_aio_list_init(&mq->mq_aio_getq); + nni_aio_list_init(&mq->mq_aio_notify_get); + nni_aio_list_init(&mq->mq_aio_notify_put); if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; @@ -135,11 +135,7 @@ nni_msgq_fini(nni_msgq *mq) static void nni_msgq_finish(nni_aio *aio, int rv) { - nni_msgq *mq = aio->a_prov_data; - - if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) { - nni_list_remove(&mq->mq_aio_putq, aio); - } + nni_aio_list_remove(aio); nni_aio_finish(aio, rv, 0); } @@ -299,15 +295,14 @@ nni_msgq_run_notify(nni_msgq *mq) if (mq->mq_closed) { return; } - if ((mq->mq_len < mq->mq_cap) || - (nni_list_first(&mq->mq_aio_getq) != NULL)) { + if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); } } - if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); @@ -315,8 +310,7 @@ nni_msgq_run_notify(nni_msgq *mq) } if (mq->mq_draining) { - if ((mq->mq_len == 0) && - (nni_list_first(&mq->mq_aio_putq) == NULL)) { + if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) { nni_cv_wake(&mq->mq_drained); } } @@ -333,10 +327,7 @@ nni_msgq_cancel(nni_aio *aio) } nni_mtx_lock(&mq->mq_lock); - // this checks if the AIO is active, it doesn't matter what list - if (nni_list_active(&mq->mq_aio_getq, aio)) { - nni_list_remove(&mq->mq_aio_getq, aio); - } + nni_aio_list_remove(aio); nni_mtx_unlock(&mq->mq_lock); } @@ -349,10 +340,7 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } - if (nni_list_active(&mq->mq_aio_notify_put, aio)) { - nni_list_remove(&mq->mq_aio_notify_put, aio); - } - nni_list_append(&mq->mq_aio_notify_put, aio); + nni_aio_list_append(&mq->mq_aio_notify_put, aio); nni_mtx_unlock(&mq->mq_lock); } @@ -365,10 +353,7 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } - if (nni_list_active(&mq->mq_aio_notify_get, aio)) { - nni_list_remove(&mq->mq_aio_notify_get, aio); - } - nni_list_append(&mq->mq_aio_notify_get, aio); + nni_aio_list_append(&mq->mq_aio_notify_get, aio); nni_mtx_unlock(&mq->mq_lock); } @@ -394,7 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } - nni_list_append(&mq->mq_aio_putq, aio); + nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -428,7 +413,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } - nni_list_append(&mq->mq_aio_getq, aio); + nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -449,8 +434,7 @@ nni_msgq_canput(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); return (0); } - if ((mq->mq_len < mq->mq_cap) || - (nni_list_first(&mq->mq_aio_getq) != NULL)) { + if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { nni_mtx_unlock(&mq->mq_lock); return (1); } @@ -467,8 +451,7 @@ nni_msgq_canget(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); return (0); } - if ((mq->mq_len != 0) || - (nni_list_first(&mq->mq_aio_putq) != NULL)) { + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { nni_mtx_unlock(&mq->mq_lock); return (1); } @@ -537,10 +520,10 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { exp = aio->a_expire; @@ -551,10 +534,10 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { exp = aio->a_expire; @@ -617,7 +600,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; mq->mq_draining = 1; - while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { + while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { break; } @@ -626,7 +609,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) // Also complete the putq as NNG_ECLOSED. while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } while (mq->mq_len > 0) { @@ -664,14 +647,14 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } |
