aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-04 13:04:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-04 13:04:38 -0700
commit58c5fbb731f50a952864bc500a8efd3b7077ee65 (patch)
treebff20e9758e04c024ef26288dff0f4edb89cf033 /src/core/msgqueue.c
parent5b45db0aeb1026fcf7bbdec0e6451d1cfaac58f1 (diff)
downloadnng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.gz
nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.bz2
nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.zip
Improved routines for list management.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c63
1 files changed, 23 insertions, 40 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 93a487ad..985563ad 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -61,10 +61,10 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node);
+ nni_aio_list_init(&mq->mq_aio_putq);
+ nni_aio_list_init(&mq->mq_aio_getq);
+ nni_aio_list_init(&mq->mq_aio_notify_get);
+ nni_aio_list_init(&mq->mq_aio_notify_put);
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
@@ -135,11 +135,7 @@ nni_msgq_fini(nni_msgq *mq)
static void
nni_msgq_finish(nni_aio *aio, int rv)
{
- nni_msgq *mq = aio->a_prov_data;
-
- if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
- nni_list_remove(&mq->mq_aio_putq, aio);
- }
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, rv, 0);
}
@@ -299,15 +295,14 @@ nni_msgq_run_notify(nni_msgq *mq)
if (mq->mq_closed) {
return;
}
- if ((mq->mq_len < mq->mq_cap) ||
- (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
}
}
- if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
@@ -315,8 +310,7 @@ nni_msgq_run_notify(nni_msgq *mq)
}
if (mq->mq_draining) {
- if ((mq->mq_len == 0) &&
- (nni_list_first(&mq->mq_aio_putq) == NULL)) {
+ if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
nni_cv_wake(&mq->mq_drained);
}
}
@@ -333,10 +327,7 @@ nni_msgq_cancel(nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
- // this checks if the AIO is active, it doesn't matter what list
- if (nni_list_active(&mq->mq_aio_getq, aio)) {
- nni_list_remove(&mq->mq_aio_getq, aio);
- }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -349,10 +340,7 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
- if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
- nni_list_remove(&mq->mq_aio_notify_put, aio);
- }
- nni_list_append(&mq->mq_aio_notify_put, aio);
+ nni_aio_list_append(&mq->mq_aio_notify_put, aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -365,10 +353,7 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
- if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
- nni_list_remove(&mq->mq_aio_notify_get, aio);
- }
- nni_list_append(&mq->mq_aio_notify_get, aio);
+ nni_aio_list_append(&mq->mq_aio_notify_get, aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -394,7 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
- nni_list_append(&mq->mq_aio_putq, aio);
+ nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -428,7 +413,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
- nni_list_append(&mq->mq_aio_getq, aio);
+ nni_aio_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -449,8 +434,7 @@ nni_msgq_canput(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
- if ((mq->mq_len < mq->mq_cap) ||
- (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
}
@@ -467,8 +451,7 @@ nni_msgq_canget(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
- if ((mq->mq_len != 0) ||
- (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
}
@@ -537,10 +520,10 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
exp = aio->a_expire;
@@ -551,10 +534,10 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
exp = aio->a_expire;
@@ -617,7 +600,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
mq->mq_draining = 1;
- while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
}
@@ -626,7 +609,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
// Also complete the putq as NNG_ECLOSED.
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -664,14 +647,14 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}