diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-28 23:07:28 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-28 23:07:28 -0700 |
| commit | fe3c9705072ac8cafecdf2ea6bca4c26f9464824 (patch) | |
| tree | 07aaea70cbf8bb6af369d5efede475ed03ffdd63 /src/core/msgqueue.c | |
| parent | 10d748fa6444324878a77cc5749c93b75819ced2 (diff) | |
| download | nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.gz nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.bz2 nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.zip | |
Refactor stop again, closing numerous races (thanks valgrind!)
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 47 |
1 files changed, 16 insertions, 31 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 99b53274..3d373e2b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -137,8 +137,6 @@ nni_msgq_finish(nni_aio *aio, int rv) { nni_msgq *mq = aio->a_prov_data; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) { nni_list_remove(&mq->mq_aio_putq, aio); } @@ -339,8 +337,6 @@ nni_msgq_cancel(nni_aio *aio) if (nni_list_active(&mq->mq_aio_getq, aio)) { nni_list_remove(&mq->mq_aio_getq, aio); } - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_mtx_unlock(&mq->mq_lock); } @@ -349,8 +345,10 @@ void nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (nni_list_active(&mq->mq_aio_notify_put, aio)) { nni_list_remove(&mq->mq_aio_notify_put, aio); } @@ -363,8 +361,10 @@ void nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (nni_list_active(&mq->mq_aio_notify_get, aio)) { nni_list_remove(&mq->mq_aio_notify_get, aio); } @@ -379,7 +379,10 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); - + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (mq->mq_closed) { nni_aio_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&mq->mq_lock); @@ -391,9 +394,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; - nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -413,6 +413,10 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (mq->mq_closed) { nni_aio_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&mq->mq_lock); @@ -424,9 +428,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; - nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -507,8 +508,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); raio->a_msg = msg; - raio->a_prov_cancel = NULL; - raio->a_prov_data = NULL; nni_aio_finish(raio, 0, len); nni_mtx_unlock(&mq->mq_lock); @@ -550,13 +549,9 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -568,13 +563,9 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -662,8 +653,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_list_remove(&mq->mq_aio_putq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } while (mq->mq_len > 0) { @@ -701,8 +690,6 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } @@ -710,8 +697,6 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } |
