diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 52 | ||||
| -rw-r--r-- | src/core/aio.h | 20 | ||||
| -rw-r--r-- | src/core/device.c | 8 | ||||
| -rw-r--r-- | src/core/endpt.c | 6 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 60 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 2 |
6 files changed, 103 insertions, 45 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index fac62f12..388e6677 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios; // // In order to guard against aio reuse during teardown, we set a fini // flag. Any attempt to initialize for a new operation after that point -// will fail and the caller will get NNG_ESTATE indicating this. The -// provider that calls nni_aio_start() MUST check the return value, and -// if it comes back nonzero (NNG_ESTATE) then it must simply discard the +// will fail and the caller will get NNG_ECANCELED indicating this. The +// provider that calls nni_aio_begin() MUST check the return value, and +// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the // request and return. // An nni_aio is an async I/O handle. @@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio) // nni_aio_stop cancels any oustanding operation, and waits for the // callback to complete, if still running. It also marks the AIO as -// stopped, preventing further calls to nni_aio_start from succeeding. +// stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other // calles are not also stopped, before calling nni_aio_fini to release // actual memory. @@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio) } int -nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +nni_aio_begin(nni_aio *aio) { - nni_time now = nni_clock(); - nni_mtx_lock(&nni_aio_lk); - + // We should not reschedule anything at this point. if (aio->a_fini) { - // We should not reschedule anything at this point. aio->a_active = false; aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); @@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; aio->a_active = true; - for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } + nni_mtx_unlock(&nni_aio_lk); + return (0); +} +void +nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +{ if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: - aio->a_expire = NNI_TIME_ZERO; + aio->a_expire = nni_clock(); break; case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; break; default: - aio->a_expire = now + aio->a_timeout; + aio->a_expire = nni_clock() + aio->a_timeout; break; } } + nni_mtx_lock(&nni_aio_lk); + aio->a_prov_cancel = cancelfn; + aio->a_prov_data = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } nni_mtx_unlock(&nni_aio_lk); +} + +int +nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +{ + + if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) { + return (NNG_ETIMEDOUT); + } + nni_aio_schedule(aio, cancelfn, data); return (0); } @@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + if (nni_aio_begin(aio) != 0) { + return; + } switch (aio->a_timeout) { case NNG_DURATION_DEFAULT: case NNG_DURATION_INFINITE: @@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { aio->a_sleep = false; - (void) nni_aio_start(aio, NULL, NULL); + (void) nni_aio_schedule(aio, NULL, NULL); return; } } aio->a_sleep = true; aio->a_expire = nni_clock() + ms; - (void) nni_aio_start(aio, NULL, NULL); + + // There is no cancellation, apart from just unexpiring. + nni_aio_schedule(aio, NULL, NULL); } void diff --git a/src/core/aio.h b/src/core/aio.h index a0f4934f..d23ddf5b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *); // with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.) extern void nni_aio_abort(nni_aio *, int rv); -extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *); +// nni_aio_begin is called by a provider to indicate it is starting the +// operation, and to check that the aio has not already been marked for +// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being +// torn down. (In that case, no operation should be aborted without any +// call to any other functions on this AIO, most especially not the +// nng_aio_finish family of functions.) +extern int nni_aio_begin(nni_aio *); + extern void *nni_aio_get_prov_data(nni_aio *); extern void nni_aio_set_prov_data(nni_aio *, void *); extern void *nni_aio_get_prov_extra(nni_aio *, unsigned); @@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); extern void nni_aio_bump_count(nni_aio *, size_t); +// nni_aio_schedule indicates that the AIO has begun, and is scheduled for +// asychronous completion. This also starts the expiration timer. Note that +// prior to this, the aio is uncancellable. +extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *); + +// nni_aio_schedule_verify is like nni_aio_schedule, except that if the +// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it +// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling +// if the operation could not be immediately completed. +extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *); + extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); diff --git a/src/core/device.c b/src/core/device.c index 6def9f64..e3b1d220 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user) { int i; - nni_mtx_lock(&dd->mtx); - dd->user = user; - if (nni_aio_start(user, nni_device_cancel, dd) != 0) { - nni_mtx_unlock(&dd->mtx); + if (nni_aio_begin(user) != 0) { return; } + nni_mtx_lock(&dd->mtx); + nni_aio_schedule(user, nni_device_cancel, dd); + dd->user = user; for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; p->user = user; diff --git a/src/core/endpt.c b/src/core/endpt.c index 0cb59a14..2741a8e6 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep) { nni_duration backoff; - if (ep->ep_closing) { + if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { return; } backoff = ep->ep_currtime; @@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep) ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); ep->ep_tmo_run = 1; - if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { - ep->ep_tmo_run = 0; - } + nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); } static void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 212b52c2..1bb5a762 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -22,10 +22,10 @@ struct nni_msgq { int mq_len; int mq_get; int mq_put; - int mq_closed; int mq_puterr; int mq_geterr; - int mq_besteffort; + bool mq_besteffort; + bool mq_closed; nni_msg **mq_msgs; nni_list mq_aio_putq; @@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) { nni_aio_list_remove(aio); @@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_aio_list_remove(aio); @@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { @@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq) } void -nni_msgq_set_best_effort(nni_msgq *mq, int on) +nni_msgq_set_best_effort(nni_msgq *mq, bool on) { nni_mtx_lock(&mq->mq_lock); mq->mq_besteffort = on; @@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; } + // If this is an instantaneous poll operation, and the queue has + // no room, nobody is waiting to receive, and we're not best effort + // (best effort discards), then report the error (NNG_ETIMEDOUT). + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && + (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) { + nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, mq->mq_geterr); return; } - if (mq->mq_geterr) { - nni_aio_finish_error(aio, mq->mq_geterr); + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len == 0) && + (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); return; } @@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq) nni_aio *aio; nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; + mq->mq_closed = true; + mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED; // Free the messages orphaned in the queue. while (mq->mq_len > 0) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2d23f448..2f1a46eb 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int); // What this does is treat the message queue condition as if it were // successful, returning 0, and discarding the message. If zero is // passed then this mode is reset to normal. -extern void nni_msgq_set_best_effort(nni_msgq *, int); +extern void nni_msgq_set_best_effort(nni_msgq *, bool); // nni_msgq_filter is a callback function used to filter messages. // The function is called on entry (put) or exit (get). The void |
