author     Garrett D'Amore <garrett@damore.org>  2018-05-09 17:21:27 -0700
committer  Garrett D'Amore <garrett@damore.org>  2018-05-14 17:09:20 -0700
commit     16b4c4019c7b7904de171c588ed8c72ca732d2cf
tree       9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/core/aio.c
parent     e0beb13b066d27ce32347a1c18c9d441828dc553
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch()
fixes #410 kqueue implementation could be smarter
fixes #411 epoll implementation could be smarter
fixes #426 synchronous completion can lead to panic
fixes #421 pipe close race condition/duplicate destroy

This is a major refactoring of two significant, closely interrelated parts of the code base.

First, the aio and taskq frameworks have undergone a number of simplifications and improvements. We have ditched a few parts of the internal API that weren't terribly useful but added a lot of complexity (for example, tasks no longer support cancellation), and aio_schedule now checks for cancellation or other "premature" completions. The aio framework now uses tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions.

Second, the entire POSIX poller framework has been simplified and made more robust and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events.

In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability.

One area where we can still do better: only a single poller thread is run. Scaling this out has to be done differently for each poller, and carefully, to ensure that close conditions are safe on all pollers and that there is no chance of deadlock/livelock waiting for pfd finalizers.
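To make the new provider-side contract concrete: a provider first calls nni_aio_begin(), stages its operation, and then arms cancellation and expiration with nni_aio_schedule(), which now returns an error instead of being void. The sketch below shows the intended call pattern, mirroring what the reworked nni_sleep_aio in the diff does; my_op_start, my_op_cancel, and struct my_provider are hypothetical names for illustration, not part of nng.

// A sketch of a hypothetical provider under the reworked aio contract:
// nni_aio_begin() refuses work on a stopping aio, and nni_aio_schedule()
// now reports cancellation or a zero timeout as an error.
static void
my_op_cancel(nni_aio *aio, int rv)
{
	// Tear down any provider state for this aio, then complete it.
	nni_aio_finish_error(aio, rv);
}

static void
my_op_start(struct my_provider *p, nni_aio *aio) // my_provider is illustrative
{
	int rv;

	if (nni_aio_begin(aio) != 0) {
		return; // aio is stopping; the operation never started
	}
	// ... stage the I/O on p so it can complete asynchronously ...
	if ((rv = nni_aio_schedule(aio, my_op_cancel, p)) != 0) {
		// Canceled (or zero timeout) between begin and schedule;
		// complete the aio ourselves, as nni_sleep_aio does below.
		nni_aio_finish_error(aio, rv);
		return;
	}
	// On success, completion arrives later via nni_aio_finish(), or
	// through my_op_cancel() on abort/expiration.
}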
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--  src/core/aio.c  216
1 file changed, 99 insertions(+), 117 deletions(-)
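A key piece of the close-race fix visible in the diff below: the expire thread publishes a temporary hold (nni_aio_expire_aio) on the aio whose cancel function it is running, and nni_aio_fini() waits for that hold to clear before destroying the aio. Here is a condensed, self-contained sketch of that handshake, using plain pthreads in place of nng's internal nni_mtx/nni_cv wrappers; all names are illustrative, not nng code.

#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static void *expiring; // aio currently held by the expire thread

// Expire thread: hold the aio while its cancel function runs with the
// lock dropped, then clear the hold and wake any waiting finalizer.
static void
expire_one(void *aio, void (*cancelfn)(void *, int), int rv)
{
	pthread_mutex_lock(&lock);
	expiring = aio;
	pthread_mutex_unlock(&lock);

	cancelfn(aio, rv); // may run the aio's completion callback

	pthread_mutex_lock(&lock);
	expiring = NULL;
	pthread_cond_broadcast(&cond);
	pthread_mutex_unlock(&lock);
}

// Finalizer: a completion callback may try to destroy the aio while the
// expire thread is still using it; wait for the hold to clear first.
static void
fini_wait(void *aio)
{
	pthread_mutex_lock(&lock);
	while (expiring == aio) {
		pthread_cond_wait(&cond, &lock);
	}
	pthread_mutex_unlock(&lock);
}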
diff --git a/src/core/aio.c b/src/core/aio.c
index 3606aa14..a5b6c088 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_run;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_aios;
+static nni_aio *nni_aio_expire_aio;
// Design notes.
//
@@ -62,16 +63,11 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- nni_cv a_cv;
- bool a_fini : 1; // shutting down (no new operations)
- bool a_done : 1; // operation has completed
- bool a_pend : 1; // completion routine pending
- bool a_active : 1; // aio was started
- bool a_expiring : 1; // expiration callback in progress
- bool a_waiting : 1; // a thread is waiting for this to finish
- bool a_synch : 1; // run completion synchronously
- bool a_sleep : 1; // sleeping with no action
- nni_task a_task;
+ bool a_stop; // shutting down (no new operations)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
+ int a_cancelrv; // if canceled between begin and schedule
+ nni_task *a_task;
// Read/write operations.
nni_iov *a_iov;
@@ -109,12 +105,16 @@ int
nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
nni_aio *aio;
+ int rv;
if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
return (NNG_ENOMEM);
}
memset(aio, 0, sizeof(*aio));
- nni_cv_init(&aio->a_cv, &nni_aio_lk);
+ if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) {
+ NNI_FREE_STRUCT(aio);
+ return (rv);
+ }
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_iov = aio->a_iovinl;
@@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
if (arg == NULL) {
arg = aio;
}
- nni_task_init(NULL, &aio->a_task, cb, arg);
*aiop = aio;
return (0);
}
@@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio)
if (aio != NULL) {
nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
+
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(aio->a_task);
+
+ // At this point the AIO is done.
if (aio->a_niovalloc > 0) {
NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
@@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_lk);
- aio->a_fini = true;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
nni_aio_abort(aio, NNG_ECANCELED);
@@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&nni_aio_lk);
- // Wait until we're done, and the synchronous completion flag
- // is cleared (meaning any synch completion is finished).
- while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
- aio->a_waiting = true;
- nni_cv_wait(&aio->a_cv);
- }
- nni_mtx_unlock(&nni_aio_lk);
- nni_task_wait(&aio->a_task);
+ nni_task_wait(aio->a_task);
}
int
@@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
- if (aio->a_fini) {
- aio->a_active = false;
+ if (aio->a_stop) {
+ nni_task_unprep(aio->a_task);
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- aio->a_done = false;
- aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_active = true;
+ aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_task_prep(aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
-void
+int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
+ int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = nni_clock();
- break;
+ return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
@@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_stop) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (NNG_ECANCELED);
+ }
+ if ((rv = aio->a_cancelrv) != 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (rv);
+ }
+
+ // If cancellation occurred in between "begin" and "schedule",
+ // then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if (aio->a_expire != NNI_TIME_NEVER) {
+ if ((rv = aio->a_cancelrv) != 0) {
+ aio->a_expire = 0;
+ nni_aio_expire_add(aio);
+ } else if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
-}
-
-int
-nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
-{
-
- if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
- return (NNG_ETIMEDOUT);
- }
- nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -379,11 +383,8 @@ nni_aio_finish_impl(
{
nni_mtx_lock(&nni_aio_lk);
- NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
-
nni_list_node_remove(&aio->a_expire_node);
- aio->a_pend = true;
aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -393,38 +394,13 @@ nni_aio_finish_impl(
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
-
- // If we are expiring, then we rely on the expiration thread to
- // complete this; we must not because the expiration thread is
- // still holding the reference.
-
- if (aio->a_expiring) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
- aio->a_done = true;
- aio->a_synch = synch;
+ nni_mtx_unlock(&nni_aio_lk);
if (synch) {
- if (aio->a_task.task_cb != NULL) {
- nni_mtx_unlock(&nni_aio_lk);
- aio->a_task.task_cb(aio->a_task.task_arg);
- nni_mtx_lock(&nni_aio_lk);
- }
+ nni_task_exec(aio->a_task);
} else {
- nni_task_dispatch(&aio->a_task);
+ nni_task_dispatch(aio->a_task);
}
- aio->a_synch = false;
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
- }
-
- // This has to be done with the lock still held, in order
- // to prevent taskq wait from returning prematurely.
- nni_mtx_unlock(&nni_aio_lk);
}
void
@@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio)
static void
nni_aio_expire_loop(void *arg)
{
- nni_list * aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
- nni_aio_cancelfn cancelfn;
- int rv;
+ nni_list *aios = &nni_aio_expire_aios;
NNI_ARG_UNUSED(arg);
for (;;) {
+ nni_aio_cancelfn cancelfn;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
+
now = nni_clock();
nni_mtx_lock(&nni_aio_lk);
- if (nni_aio_expire_run == 0) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
if ((aio = nni_list_first(aios)) == NULL) {
+
+ if (nni_aio_expire_run == 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
nni_cv_wait(&nni_aio_expire_cv);
nni_mtx_unlock(&nni_aio_lk);
continue;
@@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg)
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
nni_list_remove(aios, aio);
+ rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
+
+ if ((cancelfn = aio->a_prov_cancel) != NULL) {
+
+ // Place a temporary hold on the aio. This prevents it
+ // from being destroyed.
+ nni_aio_expire_aio = aio;
- // Mark it as expiring. This acts as a hold on
- // the aio, similar to the consumers. The actual taskq
- // dispatch on completion won't occur until this is cleared,
- // and the done flag won't be set either.
- aio->a_expiring = true;
- cancelfn = aio->a_prov_cancel;
- rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
- aio->a_sleep = false;
-
- // Cancel any outstanding activity. This is always non-NULL
- // for a valid aio, and becomes NULL only when an AIO is
- // already being canceled or finished.
- if (cancelfn != NULL) {
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to its natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
cancelfn(aio, rv);
nni_mtx_lock(&nni_aio_lk);
- } else {
- aio->a_pend = true;
- aio->a_result = rv;
- }
-
- NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_expiring = false;
- aio->a_done = true;
- nni_task_dispatch(&aio->a_task);
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
+ nni_aio_expire_aio = NULL;
+ nni_cv_wake(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
}
@@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
return (resid); // we might not have used all of n for this iov
}
+static void
+nni_sleep_cancel(nng_aio *aio, int rv)
+{
+ nni_mtx_lock(&nni_aio_lk);
+ if (!aio->a_sleep) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
+ aio->a_sleep = false;
+ nni_list_node_remove(&aio->a_expire_node);
+ nni_mtx_unlock(&nni_aio_lk);
+
+ nni_aio_finish_error(aio, rv);
+}
+
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+ aio->a_sleeprv = 0;
+ aio->a_sleep = true;
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// If the timeout on the aio is shorter than our sleep time,
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
- aio->a_sleep = false;
- (void) nni_aio_schedule(aio, NULL, NULL);
- return;
+ aio->a_sleeprv = NNG_ETIMEDOUT;
+ ms = aio->a_timeout;
}
}
- aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- // There is no cancellation, apart from just unexpiring.
- nni_aio_schedule(aio, NULL, NULL);
+ if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ }
}
void