diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-04 17:17:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-04 21:20:00 -0700 |
| commit | dc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch) | |
| tree | 1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/core/msgqueue.c | |
| parent | 6887900ae033add30ee0151b72abe927c5239588 (diff) | |
| download | nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2 nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip | |
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak
checks. This represents a complete rethink of how the AIOs work,
and much simpler synchronization; the provider API is a bit simpler
to boot, as a number of failure modes have been simply eliminated.
While here a few other minor bugs were squashed.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 103 |
1 files changed, 41 insertions, 62 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index d98c68be..2ebc4927 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -121,26 +121,17 @@ nni_msgq_fini(nni_msgq *mq) NNI_FREE_STRUCT(mq); } -static void -nni_msgq_finish(nni_aio *aio, int rv) -{ - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, 0); -} - void nni_msgq_set_get_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_msgq_finish(aio, error); + while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_geterr = error; @@ -150,16 +141,14 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) void nni_msgq_set_put_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_msgq_finish(aio, error); + while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_puterr = error; @@ -169,21 +158,15 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) void nni_msgq_set_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_msgq_finish(aio, error); - } - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_msgq_finish(aio, error); + while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || + ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_puterr = error; @@ -207,11 +190,12 @@ nni_msgq_run_putq(nni_msgq *mq) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - raio->a_msg = msg; waio->a_msg = NULL; - nni_msgq_finish(raio, 0); - nni_msgq_finish(waio, 0); + nni_aio_list_remove(raio); + nni_aio_list_remove(waio); + nni_aio_finish(waio, 0, len); + nni_aio_finish_msg(raio, msg); continue; } @@ -224,7 +208,7 @@ nni_msgq_run_putq(nni_msgq *mq) } mq->mq_len++; waio->a_msg = NULL; - nni_msgq_finish(waio, 0); + nni_aio_finish(waio, 0, len); continue; } @@ -243,14 +227,14 @@ nni_msgq_run_getq(nni_msgq *mq) while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { - nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; raio->a_msg = msg; - nni_msgq_finish(raio, 0); + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -258,9 +242,11 @@ nni_msgq_run_getq(nni_msgq *mq) if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { msg = waio->a_msg; waio->a_msg = NULL; - raio->a_msg = msg; - nni_msgq_finish(raio, 0); - nni_msgq_finish(waio, 0); + nni_aio_list_remove(waio); + nni_aio_list_remove(raio); + + nni_aio_finish(waio, 0, nni_msg_len(msg)); + nni_aio_finish_msg(raio, msg); continue; } @@ -300,16 +286,15 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio) +nni_msgq_cancel(nni_aio *aio, int rv) { nni_msgq *mq = aio->a_prov_data; - if (mq == NULL) { - return; - } - nni_mtx_lock(&mq->mq_lock); - nni_aio_list_remove(aio); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&mq->mq_lock); } @@ -346,12 +331,12 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } if (mq->mq_closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&mq->mq_lock); return; } if (mq->mq_puterr) { - nni_aio_finish(aio, mq->mq_puterr, 0); + nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; } @@ -372,12 +357,12 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } if (mq->mq_closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&mq->mq_lock); return; } if (mq->mq_geterr) { - nni_aio_finish(aio, mq->mq_geterr, 0); + nni_aio_finish_error(aio, mq->mq_geterr); nni_mtx_unlock(&mq->mq_lock); return; } @@ -439,9 +424,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { nni_list_remove(&mq->mq_aio_getq, raio); - raio->a_msg = msg; - - nni_aio_finish(raio, 0, len); + nni_aio_finish_msg(raio, msg); nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -512,13 +495,16 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) break; } } - // If we timedout, free any remaining messages in the queue. - // Also complete the putq as NNG_ECLOSED. + // Timed out or writers drained. + + // Complete the putq as NNG_ECLOSED. while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } + + // Free any remaining messages in the queue. while (mq->mq_len > 0) { nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_alloc) { @@ -551,17 +537,10 @@ nni_msgq_close(nni_msgq *mq) // Let all pending blockers know we are closing the queue. naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); + while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || + ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&mq->mq_lock); |
