aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c135
1 files changed, 87 insertions, 48 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index f7110386..add51897 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -61,6 +61,28 @@ static nni_aio *nni_aio_expire_aio;
// operations from starting, without waiting for any existing one to
// complete, call nni_aio_close.
+// In some places we want to check that an aio is not in use.
+// Technically if these checks pass, then they should not need
+// to be done with a lock, because the caller should have the only
+// references to them. However, race detectors can't necessarily
+// know about this semantic, and may complain about potential data
+// races. To suppress false positives, define NNG_RACE_DETECTOR.
+// Note that this will cause extra locks to be acquired, affecting
+// performance, so don't use it in production.
+#ifdef __has_feature
+#if __has_feature(thread_sanitizer)
+#define NNG_RACE_DETECTOR
+#endif
+#endif
+
+#ifdef NNG_RACE_DETECTOR
+#define aio_safe_lock(l) nni_mtx_lock(l)
+#define aio_safe_unlock(l) nni_mtx_unlock(l)
+#else
+#define aio_safe_lock(l)
+#define aio_safe_unlock(l)
+#endif
+
static nni_reap_list aio_reap_list = {
.rl_offset = offsetof(nni_aio, a_reap_node),
.rl_func = (nni_cb) nni_aio_free,
@@ -89,9 +111,7 @@ nni_aio_fini(nni_aio *aio)
// We also wait if the aio is being expired.
nni_mtx_lock(&nni_aio_lk);
aio->a_stop = true;
- if (nni_list_active(&nni_aio_expire_list, aio)) {
- nni_list_remove(&nni_aio_expire_list, aio);
- }
+ nni_list_node_remove(&aio->a_expire_node);
while (nni_aio_expire_aio == aio) {
nni_cv_wait(&nni_aio_expire_cv);
}
@@ -171,6 +191,7 @@ nni_aio_stop(nni_aio *aio)
void * arg;
nni_mtx_lock(&nni_aio_lk);
+ nni_list_node_remove(&aio->a_expire_node);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -194,6 +215,7 @@ nni_aio_close(nni_aio *aio)
void * arg;
nni_mtx_lock(&nni_aio_lk);
+ nni_list_node_remove(&aio->a_expire_node);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -280,29 +302,42 @@ nni_aio_wait(nni_aio *aio)
int
nni_aio_begin(nni_aio *aio)
{
+ // If any of these triggers then the caller has a defect because
+ // it means that the aio is already in use. This is always
+ // a bug in the caller. These checks are not technically thread
+ // safe in the event that they are false. Users of race detectors
+ // checks may wish ignore or suppress these checks.
+
+ aio_safe_lock(&nni_aio_lk);
+
+ NNI_ASSERT(!nni_aio_list_active(aio));
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));
+
+ // Some initialization can be done outside of the lock, because
+ // we must have exclusive access to the aio.
+ for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
+ aio->a_outputs[i] = NULL;
+ }
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_cancel_fn = NULL;
+
+ aio_safe_unlock(&nni_aio_lk);
+
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
if (aio->a_stop) {
- aio->a_result = NNG_ECANCELED;
- aio->a_count = 0;
- nni_list_node_remove(&aio->a_expire_node);
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
- aio->a_expire_ok = false;
+ aio->a_result = NNG_ECANCELED;
+ aio->a_cancel_fn = NULL;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
+ aio->a_expire_ok = false;
nni_mtx_unlock(&nni_aio_lk);
nni_task_dispatch(&aio->a_task);
return (NNG_ECANCELED);
}
- aio->a_result = 0;
- aio->a_count = 0;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
- aio->a_outputs[i] = NULL;
- }
nni_task_prep(&aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
return (0);
@@ -338,7 +373,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
aio->a_cancel_fn = cancel;
aio->a_cancel_arg = data;
- if (aio->a_expire != NNI_TIME_NEVER) {
+ // We only schedule expiration if we have a way for the expiration
+ // handler to actively cancel it.
+ if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -472,20 +509,18 @@ static void
nni_aio_expire_loop(void *unused)
{
nni_list *list = &nni_aio_expire_list;
+ nni_time now;
NNI_ARG_UNUSED(unused);
nni_thr_set_name(NULL, "nng:aio:expire");
- for (;;) {
- nni_aio_cancel_fn fn;
- nni_time now;
- nni_aio * aio;
- int rv;
-
- now = nni_clock();
+ now = nni_clock();
+ nni_mtx_lock(&nni_aio_lk);
- nni_mtx_lock(&nni_aio_lk);
+ for (;;) {
+ nni_aio *aio;
+ int rv;
if ((aio = nni_list_first(list)) == NULL) {
@@ -495,14 +530,15 @@ nni_aio_expire_loop(void *unused)
}
nni_cv_wait(&nni_aio_expire_cv);
- nni_mtx_unlock(&nni_aio_lk);
+
+ now = nni_clock();
continue;
}
if (now < aio->a_expire) {
// Unexpired; the list is ordered, so we just wait.
nni_cv_until(&nni_aio_expire_cv, aio->a_expire);
- nni_mtx_unlock(&nni_aio_lk);
+ now = nni_clock();
continue;
}
@@ -511,26 +547,29 @@ nni_aio_expire_loop(void *unused)
nni_list_remove(list, aio);
rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- if ((fn = aio->a_cancel_fn) != NULL) {
- void *arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- // Place a temporary hold on the aio. This prevents it
- // from being destroyed.
- nni_aio_expire_aio = aio;
-
- // We let the cancel function handle the completion.
- // If there is no cancellation function, then we cannot
- // terminate the aio - we've tried, but it has to run
- // to it's natural conclusion.
- nni_mtx_unlock(&nni_aio_lk);
- fn(aio, arg, rv);
- nni_mtx_lock(&nni_aio_lk);
-
- nni_aio_expire_aio = NULL;
- nni_cv_wake(&nni_aio_expire_cv);
- }
+ nni_aio_cancel_fn fn = aio->a_cancel_fn;
+ void * arg = aio->a_cancel_arg;
+
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ // Place a temporary hold on the aio. This prevents it
+ // from being destroyed.
+ nni_aio_expire_aio = aio;
+
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
+ fn(aio, arg, rv);
+
+ // Get updated time before reacquiring lock.
+ now = nni_clock();
+
+ nni_mtx_lock(&nni_aio_lk);
+
+ nni_aio_expire_aio = NULL;
+ nni_cv_wake(&nni_aio_expire_cv);
}
}