diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 70 | ||||
| -rw-r--r-- | src/core/aio.h | 9 | ||||
| -rw-r--r-- | src/core/device.c | 3 | ||||
| -rw-r--r-- | src/core/sockfd.c | 5 | ||||
| -rw-r--r-- | src/core/tcp.c | 4 |
5 files changed, 76 insertions, 15 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index eaff5c80..0ac70a0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "core/taskq.h" #include <string.h> @@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_cancel_fn = NULL; aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; - aio->a_expire_ok = false; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECANCELED); @@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) return (0); } +bool +nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) +{ + nni_aio_expire_q *eq = aio->a_expire_q; + bool timeout = false; + + if (!aio->a_sleep && !aio->a_use_expire) { + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + timeout = true; + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; + break; + } + } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { + timeout = true; + } + + nni_mtx_lock(&eq->eq_mtx); + if (timeout) { + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_abort) { + aio->a_sleep = false; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_stop || eq->eq_stop) { + aio->a_sleep = false; + aio->a_result = NNG_ECLOSED; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancel; + aio->a_cancel_arg = data; + + // We only schedule expiration if we have a way for the expiration + // handler to actively cancel it. + if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { + nni_aio_expire_add(aio); + } + nni_mtx_unlock(&eq->eq_mtx); + return (true); +} + // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void @@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) aio->a_expire = ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms; - if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { - nni_aio_finish_error(aio, rv); - } + // we don't do anything else here, so we can ignore the return + (void) nni_aio_defer(aio, nni_sleep_cancel, NULL); } static bool diff --git a/src/core/aio.h b/src/core/aio.h index 9491a2fa..8628d8ef 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t); // is returned. (In that case the caller should probably either return an // error to its caller, or possibly cause an asynchronous error by calling // nni_aio_finish_error on this aio.) +// +// NB: This function should be called while holding the lock that will be used +// to cancel the operation. Otherwise a race can occur where the operation +// cannot be canceled, which can lead to apparent hangs. extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); +// nni_aio_defer is just like nni_io_schedule, but it also calls the callback +// automatically if the operation cannot be started because the AIO is stopped +// or was canceled before this call (but after nni_aio_begin). +extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *); + extern void nni_sleep_aio(nni_duration, nni_aio *); // nni_aio_completion_list is used after removing the aio from an diff --git a/src/core/device.c b/src/core/device.c index 815fafcc..7084d3e4 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) { + if (!nni_aio_defer(aio, device_cancel, d)) { nni_mtx_unlock(&device_mtx); - nni_aio_finish_error(aio, rv); nni_reap(&device_reap, d); } device_start(d, aio); diff --git a/src/core/sockfd.c b/src/core/sockfd.c index 787a0783..1a0e792f 100644 --- a/src/core/sockfd.c +++ b/src/core/sockfd.c @@ -115,7 +115,6 @@ static void sfd_listener_accept(void *arg, nng_aio *aio) { sfd_listener *l = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio) if (l->listen_cnt) { sfd_start_conn(l, aio); - } else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) { - nni_aio_finish_error(aio, rv); - } else { + } else if (nni_aio_defer(aio, sfd_cancel_accept, l)) { nni_aio_list_append(&l->accept_q, aio); } nni_mtx_unlock(&l->mtx); diff --git a/src/core/tcp.c b/src/core/tcp.c index d2e08493..75b938b0 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -177,7 +177,6 @@ static void tcp_dialer_dial(void *arg, nng_aio *aio) { tcp_dialer *d = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + if (!nni_aio_defer(aio, tcp_dial_cancel, d)) { nni_mtx_unlock(&d->mtx); - nni_aio_finish_error(aio, rv); return; } nni_list_append(&d->conaios, aio); |
