aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-27 22:24:25 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-27 22:24:25 -0700
commit604c0e6274fc9e508e30eead924d81972fe021c0 (patch)
tree34250fb9c9ae4503a954f39eff5a900ddbd43990 /src/core/msgqueue.c
parent9a39b3cd0ef1d0439851ecfae757c6a93757b1b5 (diff)
downloadnng-604c0e6274fc9e508e30eead924d81972fe021c0.tar.gz
nng-604c0e6274fc9e508e30eead924d81972fe021c0.tar.bz2
nng-604c0e6274fc9e508e30eead924d81972fe021c0.zip
Cancellation plumbing for message queues.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c104
1 files changed, 77 insertions, 27 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index a58aff41..99b53274 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -132,6 +132,20 @@ nni_msgq_fini(nni_msgq *mq)
}
+static void
+nni_msgq_finish(nni_aio *aio, int rv)
+{
+ nni_msgq *mq = aio->a_prov_data;
+
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
+ if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ }
+ nni_aio_finish(aio, rv, 0);
+}
+
+
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
@@ -144,8 +158,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_list_remove(&mq->mq_aio_getq, aio);
- nni_aio_finish(aio, error, 0);
+ nni_msgq_finish(aio, error);
}
}
mq->mq_geterr = error;
@@ -165,8 +178,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_list_remove(&mq->mq_aio_putq, aio);
- nni_aio_finish(aio, error, 0);
+ nni_msgq_finish(aio, error);
}
}
mq->mq_puterr = error;
@@ -186,14 +198,12 @@ nni_msgq_set_error(nni_msgq *mq, int error)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_list_remove(&mq->mq_aio_getq, aio);
- nni_aio_finish(aio, error, 0);
+ nni_msgq_finish(aio, error);
}
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_list_remove(&mq->mq_aio_putq, aio);
- nni_aio_finish(aio, error, 0);
+ nni_msgq_finish(aio, error);
}
}
mq->mq_puterr = error;
@@ -218,14 +228,11 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- nni_list_remove(&mq->mq_aio_getq, raio);
- nni_list_remove(&mq->mq_aio_putq, waio);
-
raio->a_msg = msg;
waio->a_msg = NULL;
- nni_aio_finish(raio, 0, len);
- nni_aio_finish(waio, 0, len);
+ nni_msgq_finish(raio, 0);
+ nni_msgq_finish(waio, 0);
continue;
}
@@ -238,7 +245,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
mq->mq_len++;
waio->a_msg = NULL;
- nni_aio_finish(waio, 0, len);
+ nni_msgq_finish(waio, 0);
continue;
}
@@ -254,7 +261,6 @@ nni_msgq_run_getq(nni_msgq *mq)
nni_aio *raio;
nni_aio *waio;
nni_msg *msg;
- size_t len;
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
@@ -265,23 +271,18 @@ nni_msgq_run_getq(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- len = nni_msg_len(msg);
raio->a_msg = msg;
- nni_aio_finish(raio, 0, len);
+ nni_msgq_finish(raio, 0);
continue;
}
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_list_remove(&mq->mq_aio_putq, waio);
- nni_list_remove(&mq->mq_aio_getq, raio);
-
msg = waio->a_msg;
- len = nni_msg_len(msg);
waio->a_msg = NULL;
raio->a_msg = msg;
- nni_aio_finish(raio, 0, len);
- nni_aio_finish(waio, 0, len);
+ nni_msgq_finish(raio, 0);
+ nni_msgq_finish(waio, 0);
continue;
}
@@ -310,6 +311,7 @@ nni_msgq_run_notify(nni_msgq *mq)
if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
+ // This stays on the list.
nni_aio_finish(aio, 0, 0);
}
}
@@ -323,10 +325,32 @@ nni_msgq_run_notify(nni_msgq *mq)
}
+static void
+nni_msgq_cancel(nni_aio *aio)
+{
+ nni_msgq *mq = aio->a_prov_data;
+
+ if (mq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&mq->mq_lock);
+ // this checks if the AIO is active, it doesn't matter what list
+ if (nni_list_active(&mq->mq_aio_getq, aio)) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ }
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
+ aio->a_prov_data = mq;
+ aio->a_prov_cancel = nni_msgq_cancel;
if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
nni_list_remove(&mq->mq_aio_notify_put, aio);
}
@@ -339,6 +363,8 @@ void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
+ aio->a_prov_data = mq;
+ aio->a_prov_cancel = nni_msgq_cancel;
if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
nni_list_remove(&mq->mq_aio_notify_get, aio);
}
@@ -353,6 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -363,10 +390,15 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+
+ aio->a_prov_data = mq;
+ aio->a_prov_cancel = nni_msgq_cancel;
+
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
+ // XXX: handle this using a generic aio timeout.
if (expire < mq->mq_expire) {
mq->mq_expire = expire;
nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
@@ -391,10 +423,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+
+ aio->a_prov_data = mq;
+ aio->a_prov_cancel = nni_msgq_cancel;
+
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
+ // XXX: handle this using a generic aio timeout.
if (expire < mq->mq_expire) {
mq->mq_expire = expire;
nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
@@ -407,10 +444,6 @@ void
nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- // NB: nni_list_active and nni_list_remove only use the list structure
- // to determine list node offsets. Otherwise, they only look at the
- // node's linkage structure. Therefore the following check will remove
- // the node from either the getq or the putq list.
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ECANCELED, 0);
@@ -474,6 +507,8 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
raio->a_msg = msg;
+ raio->a_prov_cancel = NULL;
+ raio->a_prov_data = NULL;
nni_aio_finish(raio, 0, len);
nni_mtx_unlock(&mq->mq_lock);
@@ -496,6 +531,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
+// XXX: Move this to generic AIO timeout...
void
nni_msgq_run_timeout(void *arg)
{
@@ -514,9 +550,13 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -528,9 +568,13 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -618,6 +662,8 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_list_remove(&mq->mq_aio_putq, aio);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -655,6 +701,8 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
@@ -662,6 +710,8 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}