aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-04 17:17:42 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-04 21:20:00 -0700
commitdc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch)
tree1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/core/msgqueue.c
parent6887900ae033add30ee0151b72abe927c5239588 (diff)
downloadnng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak checks. This represents a complete rethink of how the AIOs work, and much simpler synchronization; the provider API is a bit simpler to boot, as a number of failure modes have been simply eliminated. While here a few other minor bugs were squashed.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c103
1 files changed, 41 insertions, 62 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index d98c68be..2ebc4927 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -121,26 +121,17 @@ nni_msgq_fini(nni_msgq *mq)
NNI_FREE_STRUCT(mq);
}
-static void
-nni_msgq_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, 0);
-}
-
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_msgq_finish(aio, error);
+ while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_geterr = error;
@@ -150,16 +141,14 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_msgq_finish(aio, error);
+ while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_puterr = error;
@@ -169,21 +158,15 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
void
nni_msgq_set_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_msgq_finish(aio, error);
- }
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_msgq_finish(aio, error);
+ while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
+ ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_puterr = error;
@@ -207,11 +190,12 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- raio->a_msg = msg;
waio->a_msg = NULL;
- nni_msgq_finish(raio, 0);
- nni_msgq_finish(waio, 0);
+ nni_aio_list_remove(raio);
+ nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -224,7 +208,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
mq->mq_len++;
waio->a_msg = NULL;
- nni_msgq_finish(waio, 0);
+ nni_aio_finish(waio, 0, len);
continue;
}
@@ -243,14 +227,14 @@ nni_msgq_run_getq(nni_msgq *mq)
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
- nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
raio->a_msg = msg;
- nni_msgq_finish(raio, 0);
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -258,9 +242,11 @@ nni_msgq_run_getq(nni_msgq *mq)
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
msg = waio->a_msg;
waio->a_msg = NULL;
- raio->a_msg = msg;
- nni_msgq_finish(raio, 0);
- nni_msgq_finish(waio, 0);
+ nni_aio_list_remove(waio);
+ nni_aio_list_remove(raio);
+
+ nni_aio_finish(waio, 0, nni_msg_len(msg));
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -300,16 +286,15 @@ nni_msgq_run_notify(nni_msgq *mq)
}
static void
-nni_msgq_cancel(nni_aio *aio)
+nni_msgq_cancel(nni_aio *aio, int rv)
{
nni_msgq *mq = aio->a_prov_data;
- if (mq == NULL) {
- return;
- }
-
nni_mtx_lock(&mq->mq_lock);
- nni_aio_list_remove(aio);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&mq->mq_lock);
}
@@ -346,12 +331,12 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
if (mq->mq_closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&mq->mq_lock);
return;
}
if (mq->mq_puterr) {
- nni_aio_finish(aio, mq->mq_puterr, 0);
+ nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
@@ -372,12 +357,12 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
if (mq->mq_closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&mq->mq_lock);
return;
}
if (mq->mq_geterr) {
- nni_aio_finish(aio, mq->mq_geterr, 0);
+ nni_aio_finish_error(aio, mq->mq_geterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
@@ -439,9 +424,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_list_remove(&mq->mq_aio_getq, raio);
- raio->a_msg = msg;
-
- nni_aio_finish(raio, 0, len);
+ nni_aio_finish_msg(raio, msg);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -512,13 +495,16 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
break;
}
}
- // If we timedout, free any remaining messages in the queue.
- // Also complete the putq as NNG_ECLOSED.
+ // Timed out or writers drained.
+
+ // Complete the putq as NNG_ECLOSED.
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
+
+ // Free any remaining messages in the queue.
while (mq->mq_len > 0) {
nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_alloc) {
@@ -551,17 +537,10 @@ nni_msgq_close(nni_msgq *mq)
// Let all pending blockers know we are closing the queue.
naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- }
-
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
+ while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
+ ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&mq->mq_lock);