| author | Garrett D'Amore <garrett@damore.org> | 2018-04-18 20:38:00 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-20 07:34:16 -0700 |
| commit | 5902d02ad0a056a146231568f1293ffbcd59f61c (patch) | |
| tree | be38584c02d703ec2322ab941d4d723c752fe187 /src/core/msgqueue.c | |
| parent | 40542e7af0f5003d7ad67876ea580a59174031ca (diff) | |
fixes #346 nng_recv() sometimes acts on null `msg` pointer
This closes a fundamental flaw in the way aio structures were
handled. In particular, aio expiration could race ahead and
fire before the aio was properly registered by the provider.
This ultimately led to the possibility of duplicate completions
on the same aio.
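Schematically, the bad interleaving looked something like this (a reconstruction from the description above, not the literal code path):

```
caller thread                        expire thread
-------------                        -------------
nni_aio_start(aio, ...)
  arms the expiration timer
                                     timeout elapses; aio expires and
                                     its completion fires
provider registers/queues the aio
provider later completes the aio   <- second completion of the same aio
```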
The solution involved breaking up nni_aio_start into two functions.
nni_aio_begin (which can be run outside of external locks) simply
validates that nni_aio_fini() has not been called, and clears certain
fields in the aio to make it ready for use by the provider.
nni_aio_schedule does the work to register the aio with the expiration
thread, and should only be called when the aio is actually scheduled
for asynchronous completion. A variant, nni_aio_schedule_verify, does the
same thing, but returns NNG_ETIMEDOUT if the aio has a zero-length timeout.
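For illustration, the shape of the new two-phase pattern is roughly as follows (a sketch only: the provider type, its lock and wait list, and the helper names are hypothetical; the nni_aio_* entry points are the ones described above). nni_msgq_aio_put in the diff below is a real instance of this pattern.

```c
// Sketch: `provider`, p_lock, p_waitq, p_ready, and provider_cancel
// are hypothetical stand-ins; nni_aio_begin/nni_aio_schedule are the
// new entry points.
static void
provider_op_submit(provider *p, nni_aio *aio)
{
	// Phase 1: safe to run outside the provider lock; fails only
	// if nni_aio_fini() has already been called on this aio.
	if (nni_aio_begin(aio) != 0) {
		return;
	}
	nni_mtx_lock(&p->p_lock);
	if (p->p_ready) {
		// Completed synchronously: the aio was never registered
		// with the expiration thread, so no timeout can race us.
		nni_mtx_unlock(&p->p_lock);
		nni_aio_finish(aio, 0, 0);
		return;
	}
	// Phase 2: the operation is definitely going asynchronous, so
	// only now register with the expiration thread, while holding
	// the same lock the cancel callback will take.
	nni_aio_schedule(aio, provider_cancel, p);
	nni_aio_list_append(&p->p_waitq, aio);
	nni_mtx_unlock(&p->p_lock);
}
```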
This change has a small negative performance impact. We have plans to
rectify that by converting nni_aio_begin to use a lockless flag for
the aio->a_fini bit.
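A lockless check could look roughly like this (purely speculative: the commit only states the plan, and the names below are illustrative rather than actual nng code):

```c
#include <stdatomic.h>
#include <stdbool.h>

// Illustrative only: a one-way "fini" flag tested without a lock.
typedef struct {
	atomic_bool a_fini; // set once, by the fini path
} aio_sketch;

static int
aio_sketch_begin(aio_sketch *aio)
{
	// An acquire load suffices to observe the one-way transition
	// false -> true; no lock is taken on this hot path.
	if (atomic_load_explicit(&aio->a_fini, memory_order_acquire)) {
		return -1; // aio is being torn down; caller must bail out
	}
	return 0;
}
```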
While we were here, we fixed some error paths in the POSIX subsystem,
which would have returned incorrect error codes, and we made some
optimizations in the message queues to reduce conditionals while holding
locks in the hot code path.
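The message queue optimization is visible in the diff below: nni_msgq_close() now stores NNG_ECLOSED into mq_puterr and mq_geterr, so the put/get hot paths test a single error field under the lock rather than checking mq_closed separately. In miniature (illustrative, not the nng code):

```c
#include <stdbool.h>

enum { Q_ECLOSED = 1 }; // stand-in for NNG_ECLOSED

struct queue {
	bool closed;
	int  puterr; // nonzero: puts must fail with this code
};

static void
queue_close(struct queue *q)
{
	q->closed = true;
	q->puterr = Q_ECLOSED; // "closed" now implies a put error
}

static int
queue_put_check(struct queue *q)
{
	// One branch on the hot path, where there used to be two
	// (closed, then puterr), both taken while holding the lock.
	return q->puterr;
}
```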
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 60 |
1 file changed, 41 insertions, 19 deletions
```diff
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
     int       mq_len;
     int       mq_get;
     int       mq_put;
-    int       mq_closed;
     int       mq_puterr;
     int       mq_geterr;
-    int       mq_besteffort;
+    bool      mq_besteffort;
+    bool      mq_closed;
     nni_msg **mq_msgs;
 
     nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
 
     // Let all pending blockers know we are closing the queue.
     nni_mtx_lock(&mq->mq_lock);
+    if (mq->mq_closed) {
+        // If we were closed, then this error trumps all others.
+        error = NNG_ECLOSED;
+    }
     if (error != 0) {
         while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
             nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
 
     // Let all pending blockers know we are closing the queue.
     nni_mtx_lock(&mq->mq_lock);
+    if (mq->mq_closed) {
+        // If we were closed, then this error trumps all others.
+        error = NNG_ECLOSED;
+    }
     if (error != 0) {
         while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
             nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
 
     // Let all pending blockers know we are closing the queue.
     nni_mtx_lock(&mq->mq_lock);
+    if (mq->mq_closed) {
+        // If we were closed, then this error trumps all others.
+        error = NNG_ECLOSED;
+    }
     if (error != 0) {
         while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
             ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
 }
 
 void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
 {
     nni_mtx_lock(&mq->mq_lock);
     mq->mq_besteffort = on;
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
 
 void
 nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
 {
-    nni_mtx_lock(&mq->mq_lock);
-    if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
-        nni_mtx_unlock(&mq->mq_lock);
-        return;
-    }
-    if (mq->mq_closed) {
-        nni_aio_finish_error(aio, NNG_ECLOSED);
-        nni_mtx_unlock(&mq->mq_lock);
+    int rv;
+
+    if (nni_aio_begin(aio) != 0) {
         return;
     }
+    nni_mtx_lock(&mq->mq_lock);
     if (mq->mq_puterr) {
         nni_aio_finish_error(aio, mq->mq_puterr);
         nni_mtx_unlock(&mq->mq_lock);
         return;
     }
+    // If this is an instantaneous poll operation, and the queue has
+    // no room, nobody is waiting to receive, and we're not best effort
+    // (best effort discards), then report the error (NNG_ETIMEDOUT).
+    rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+    if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+        (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+        nni_mtx_unlock(&mq->mq_lock);
+        nni_aio_finish_error(aio, rv);
+        return;
+    }
     nni_aio_list_append(&mq->mq_aio_putq, aio);
     nni_msgq_run_putq(mq);
     nni_msgq_run_notify(mq);
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
 
 void
 nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
 {
-    nni_mtx_lock(&mq->mq_lock);
-    if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
-        nni_mtx_unlock(&mq->mq_lock);
+    int rv;
+
+    if (nni_aio_begin(aio) != 0) {
         return;
     }
-    if (mq->mq_closed) {
-        nni_aio_finish_error(aio, NNG_ECLOSED);
+    nni_mtx_lock(&mq->mq_lock);
+    if (mq->mq_geterr) {
         nni_mtx_unlock(&mq->mq_lock);
+        nni_aio_finish_error(aio, mq->mq_geterr);
         return;
     }
-    if (mq->mq_geterr) {
-        nni_aio_finish_error(aio, mq->mq_geterr);
+    rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+    if ((rv != 0) && (mq->mq_len == 0) &&
+        (nni_list_empty(&mq->mq_aio_putq))) {
         nni_mtx_unlock(&mq->mq_lock);
+        nni_aio_finish_error(aio, rv);
         return;
     }
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
     nni_aio *aio;
 
     nni_mtx_lock(&mq->mq_lock);
-    mq->mq_closed = 1;
+    mq->mq_closed = true;
+    mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
 
     // Free the messages orphaned in the queue.
     while (mq->mq_len > 0) {
```
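One consequence of nni_aio_schedule_verify worth spelling out: a get on an empty queue with a zero-length timeout now fails deterministically with NNG_ETIMEDOUT instead of racing the expire thread. Roughly (a sketch against the internal aio API of this era; exact signatures may differ):

```c
nni_aio *aio;

nni_aio_init(&aio, NULL, NULL); // no callback; we wait synchronously
nni_aio_set_timeout(aio, 0);    // zero-length timeout, i.e. a poll
nni_msgq_aio_get(mq, aio);      // assume the queue is empty, no puts
nni_aio_wait(aio);
// nni_aio_result(aio) == NNG_ETIMEDOUT: schedule_verify declined to
// park the aio rather than let expiration race its registration.
nni_aio_fini(aio);
```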
