aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c47
1 files changed, 16 insertions, 31 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 99b53274..3d373e2b 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -137,8 +137,6 @@ nni_msgq_finish(nni_aio *aio, int rv)
{
nni_msgq *mq = aio->a_prov_data;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
nni_list_remove(&mq->mq_aio_putq, aio);
}
@@ -339,8 +337,6 @@ nni_msgq_cancel(nni_aio *aio)
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
}
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -349,8 +345,10 @@ void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
nni_list_remove(&mq->mq_aio_notify_put, aio);
}
@@ -363,8 +361,10 @@ void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
nni_list_remove(&mq->mq_aio_notify_get, aio);
}
@@ -379,7 +379,10 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
-
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -391,9 +394,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -413,6 +413,10 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -424,9 +428,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -507,8 +508,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
raio->a_msg = msg;
- raio->a_prov_cancel = NULL;
- raio->a_prov_data = NULL;
nni_aio_finish(raio, 0, len);
nni_mtx_unlock(&mq->mq_lock);
@@ -550,13 +549,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -568,13 +563,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -662,8 +653,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_list_remove(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -701,8 +690,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
@@ -710,8 +697,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}