aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-09 17:21:27 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-14 17:09:20 -0700
commit16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch)
tree9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/core
parente0beb13b066d27ce32347a1c18c9d441828dc553 (diff)
downloadnng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.gz
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.bz2
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.zip
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch() fixes #410 kqueue implementation could be smarter fixes #411 epoll_implementation could be smarter fixes #426 synchronous completion can lead to panic fixes #421 pipe close race condition/duplicate destroy This is a major refactoring of two significant parts of the code base, which are closely interrelated. First the aio and taskq framework have undergone a number of simplifications, and improvements. We have ditched a few parts of the internal API (for example tasks no longer support cancellation) that weren't terribly useful but added a lot of complexity, and we've made aio_schedule something that now checks for cancellation or other "premature" completions. The aio framework now uses the tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions. Second, the entire POSIX poller framework has been simplified, and made more robust, and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread, but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events. In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability. One area where we currently can do better is that there is still only a single poller thread run. Scaling this out is a task that has to be done differently for each poller, and carefuly to ensure that close conditions are safe on all pollers, and that no chance for deadlock/livelock waiting for pfd finalizers can occur.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c216
-rw-r--r--src/core/aio.h15
-rw-r--r--src/core/device.c7
-rw-r--r--src/core/endpt.c13
-rw-r--r--src/core/msgqueue.c4
-rw-r--r--src/core/platform.h7
-rw-r--r--src/core/taskq.c263
-rw-r--r--src/core/taskq.h20
-rw-r--r--src/core/thread.c9
-rw-r--r--src/core/thread.h3
10 files changed, 297 insertions, 260 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 3606aa14..a5b6c088 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_run;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_aios;
+static nni_aio *nni_aio_expire_aio;
// Design notes.
//
@@ -62,16 +63,11 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- nni_cv a_cv;
- bool a_fini : 1; // shutting down (no new operations)
- bool a_done : 1; // operation has completed
- bool a_pend : 1; // completion routine pending
- bool a_active : 1; // aio was started
- bool a_expiring : 1; // expiration callback in progress
- bool a_waiting : 1; // a thread is waiting for this to finish
- bool a_synch : 1; // run completion synchronously
- bool a_sleep : 1; // sleeping with no action
- nni_task a_task;
+ bool a_stop; // shutting down (no new operations)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
+ int a_cancelrv; // if canceled between begin and schedule
+ nni_task *a_task;
// Read/write operations.
nni_iov *a_iov;
@@ -109,12 +105,16 @@ int
nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
nni_aio *aio;
+ int rv;
if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
return (NNG_ENOMEM);
}
memset(aio, 0, sizeof(*aio));
- nni_cv_init(&aio->a_cv, &nni_aio_lk);
+ if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) {
+ NNI_FREE_STRUCT(aio);
+ return (rv);
+ }
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_iov = aio->a_iovinl;
@@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
if (arg == NULL) {
arg = aio;
}
- nni_task_init(NULL, &aio->a_task, cb, arg);
*aiop = aio;
return (0);
}
@@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio)
if (aio != NULL) {
nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
+
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(aio->a_task);
+
+ // At this point the AIO is done.
if (aio->a_niovalloc > 0) {
NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
@@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_lk);
- aio->a_fini = true;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
nni_aio_abort(aio, NNG_ECANCELED);
@@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&nni_aio_lk);
- // Wait until we're done, and the synchronous completion flag
- // is cleared (meaning any synch completion is finished).
- while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
- aio->a_waiting = true;
- nni_cv_wait(&aio->a_cv);
- }
- nni_mtx_unlock(&nni_aio_lk);
- nni_task_wait(&aio->a_task);
+ nni_task_wait(aio->a_task);
}
int
@@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
- if (aio->a_fini) {
- aio->a_active = false;
+ if (aio->a_stop) {
+ nni_task_unprep(aio->a_task);
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- aio->a_done = false;
- aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_active = true;
+ aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_task_prep(aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
-void
+int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
+ int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = nni_clock();
- break;
+ return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
@@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_stop) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (NNG_ECANCELED);
+ }
+ if ((rv = aio->a_cancelrv) != 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (rv);
+ }
+
+ // If cancellation occurred in between "begin" and "schedule",
+ // then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if (aio->a_expire != NNI_TIME_NEVER) {
+ if ((rv = aio->a_cancelrv) != 0) {
+ aio->a_expire = 0;
+ nni_aio_expire_add(aio);
+ } else if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
-}
-
-int
-nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
-{
-
- if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
- return (NNG_ETIMEDOUT);
- }
- nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -379,11 +383,8 @@ nni_aio_finish_impl(
{
nni_mtx_lock(&nni_aio_lk);
- NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
-
nni_list_node_remove(&aio->a_expire_node);
- aio->a_pend = true;
aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -393,38 +394,13 @@ nni_aio_finish_impl(
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
-
- // If we are expiring, then we rely on the expiration thread to
- // complete this; we must not because the expiration thread is
- // still holding the reference.
-
- if (aio->a_expiring) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
- aio->a_done = true;
- aio->a_synch = synch;
+ nni_mtx_unlock(&nni_aio_lk);
if (synch) {
- if (aio->a_task.task_cb != NULL) {
- nni_mtx_unlock(&nni_aio_lk);
- aio->a_task.task_cb(aio->a_task.task_arg);
- nni_mtx_lock(&nni_aio_lk);
- }
+ nni_task_exec(aio->a_task);
} else {
- nni_task_dispatch(&aio->a_task);
+ nni_task_dispatch(aio->a_task);
}
- aio->a_synch = false;
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
- }
-
- // This has to be done with the lock still held, in order
- // to prevent taskq wait from returning prematurely.
- nni_mtx_unlock(&nni_aio_lk);
}
void
@@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio)
static void
nni_aio_expire_loop(void *arg)
{
- nni_list * aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
- nni_aio_cancelfn cancelfn;
- int rv;
+ nni_list *aios = &nni_aio_expire_aios;
NNI_ARG_UNUSED(arg);
for (;;) {
+ nni_aio_cancelfn cancelfn;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
+
now = nni_clock();
nni_mtx_lock(&nni_aio_lk);
- if (nni_aio_expire_run == 0) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
if ((aio = nni_list_first(aios)) == NULL) {
+
+ if (nni_aio_expire_run == 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
nni_cv_wait(&nni_aio_expire_cv);
nni_mtx_unlock(&nni_aio_lk);
continue;
@@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg)
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
nni_list_remove(aios, aio);
+ rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
+
+ if ((cancelfn = aio->a_prov_cancel) != NULL) {
+
+ // Place a temporary hold on the aio. This prevents it
+ // from being destroyed.
+ nni_aio_expire_aio = aio;
- // Mark it as expiring. This acts as a hold on
- // the aio, similar to the consumers. The actual taskq
- // dispatch on completion won't occur until this is cleared,
- // and the done flag won't be set either.
- aio->a_expiring = true;
- cancelfn = aio->a_prov_cancel;
- rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
- aio->a_sleep = false;
-
- // Cancel any outstanding activity. This is always non-NULL
- // for a valid aio, and becomes NULL only when an AIO is
- // already being canceled or finished.
- if (cancelfn != NULL) {
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
cancelfn(aio, rv);
nni_mtx_lock(&nni_aio_lk);
- } else {
- aio->a_pend = true;
- aio->a_result = rv;
- }
-
- NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_expiring = false;
- aio->a_done = true;
- nni_task_dispatch(&aio->a_task);
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
+ nni_aio_expire_aio = NULL;
+ nni_cv_wake(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
}
@@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
return (resid); // we might not have used all of n for this iov
}
+static void
+nni_sleep_cancel(nng_aio *aio, int rv)
+{
+ nni_mtx_lock(&nni_aio_lk);
+ if (!aio->a_sleep) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
+ aio->a_sleep = false;
+ nni_list_node_remove(&aio->a_expire_node);
+ nni_mtx_unlock(&nni_aio_lk);
+
+ nni_aio_finish_error(aio, rv);
+}
+
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+ aio->a_sleeprv = 0;
+ aio->a_sleep = true;
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// If the timeout on the aio is shorter than our sleep time,
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
- aio->a_sleep = false;
- (void) nni_aio_schedule(aio, NULL, NULL);
- return;
+ aio->a_sleeprv = NNG_ETIMEDOUT;
+ ms = aio->a_timeout;
}
}
- aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- // There is no cancellation, apart from just unexpiring.
- nni_aio_schedule(aio, NULL, NULL);
+ if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ }
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index 9b7ac46f..2ed0fb5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -146,14 +146,13 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asychronous completion. This also starts the expiration timer. Note that
-// prior to this, the aio is uncancellable.
-extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
-
-// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
-// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
-// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling
-// if the operation could not be immediately completed.
-extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+// prior to this, the aio is uncancellable. If the operation has a zero
+// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the
+// operation has already been canceled, or should not be run, then an error
+// is returned. (In that case the caller should probably either return an
+// error to its caller, or possibly cause an asynchronous error by calling
+// nni_aio_finish_error on this aio.)
+extern int nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
extern void nni_sleep_aio(nni_duration, nni_aio *);
diff --git a/src/core/device.c b/src/core/device.c
index e3b1d220..1f3bf233 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -187,12 +187,17 @@ void
nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
+ int rv;
if (nni_aio_begin(user) != 0) {
return;
}
nni_mtx_lock(&dd->mtx);
- nni_aio_schedule(user, nni_device_cancel, dd);
+ if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
+ nni_mtx_unlock(&dd->mtx);
+ nni_aio_finish_error(user, rv);
+ return;
+ }
dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 2741a8e6..7593fb42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -27,7 +27,7 @@ struct nni_ep {
int ep_closed; // full shutdown
int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
- int ep_tmo_run;
+ bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
@@ -303,7 +303,7 @@ nni_ep_tmo_cancel(nni_aio *aio, int rv)
if (ep->ep_tmo_run) {
nni_aio_finish_error(aio, rv);
}
- ep->ep_tmo_run = 0;
+ ep->ep_tmo_run = false;
nni_mtx_unlock(&ep->ep_mtx);
}
}
@@ -312,6 +312,7 @@ static void
nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
+ int rv;
if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
@@ -333,8 +334,12 @@ nni_ep_tmo_start(nni_ep *ep)
nni_aio_set_timeout(
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
- ep->ep_tmo_run = 1;
- nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
+ if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
+ 0) {
+ nni_aio_finish_error(ep->ep_tmo_aio, rv);
+ }
+
+ ep->ep_tmo_run = true;
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2367b57f..fa94e32f 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -345,7 +345,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
// If this is an instantaneous poll operation, and the queue has
// no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
nni_mtx_unlock(&mq->mq_lock);
@@ -373,7 +373,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
diff --git a/src/core/platform.h b/src/core/platform.h
index 6e7acdbf..bdc74349 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -136,11 +136,16 @@ extern int nni_plat_cv_until(nni_plat_cv *, nni_time);
// immediately.
extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
-// nni_thread_reap waits for the thread to exit, and then releases any
+// nni_plat_thr_fini waits for the thread to exit, and then releases any
// resources associated with the thread. After this returns, it
// is an error to reference the thread in any further way.
extern void nni_plat_thr_fini(nni_plat_thr *);
+// nni_plat_thr_is_self returns true if the caller is the thread
+// identified, and false otherwise. (This allows some deadlock
+// prevention in callbacks, for example.)
+extern bool nni_plat_thr_is_self(nni_plat_thr *);
+
//
// Clock Support
//
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
#include "core/nng_impl.h"
typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+ nni_list_node task_node;
+ void * task_arg;
+ nni_cb task_cb;
+ nni_taskq * task_tq;
+ bool task_sched;
+ bool task_run;
+ bool task_done;
+ bool task_exec;
+ bool task_fini;
+ nni_mtx task_mtx;
+ nni_cv task_cv;
+};
struct nni_taskq_thr {
nni_taskq *tqt_tq;
nni_thr tqt_thread;
- nni_task * tqt_running;
- int tqt_wait;
};
struct nni_taskq {
nni_list tq_tasks;
@@ -24,8 +35,7 @@ struct nni_taskq {
nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
- int tq_run;
- int tq_waiting;
+ bool tq_run;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_run = true;
+ task->task_sched = false;
+ nni_mtx_unlock(&task->task_mtx);
nni_list_remove(&tq->tq_tasks, task);
- thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
+
task->task_cb(task->task_arg);
- nni_mtx_lock(&tq->tq_mtx);
- thr->tqt_running = NULL;
- if (thr->tqt_wait || tq->tq_waiting) {
- thr->tqt_wait = 0;
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_sched || task->task_exec) {
+ // task resubmitted itself most likely.
+ // We cannot touch the rest of the flags,
+ // since the called function has taken control.
+ nni_mtx_unlock(&task->task_mtx);
+ } else {
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
+ }
+ }
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
- if (tq->tq_waiting) {
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
if (!tq->tq_run) {
break;
}
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
for (int i = 0; i < nthr; i++) {
int rv;
- tq->tq_threads[i].tqt_tq = tq;
- tq->tq_threads[i].tqt_running = NULL;
+ tq->tq_threads[i].tqt_tq = tq;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
}
- tq->tq_run = 1;
+ tq->tq_run = true;
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (0);
}
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
- // We need to first let the taskq completely drain.
- for (;;) {
- int busy = 0;
- if (!nni_list_empty(&tq->tq_tasks)) {
- busy = 1;
- } else {
- int i;
- for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running != 0) {
- busy = 1;
- break;
- }
- }
- }
- if (!busy) {
- break;
- }
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
- }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
- nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
- nni_mtx_unlock(&tq->tq_mtx);
-}
-
void
nni_taskq_fini(nni_taskq *tq)
{
- // First drain the taskq completely. This is necessary since some
- // tasks that are presently running may need to schedule additional
- // tasks, and we don't want those to block.
if (tq == NULL) {
return;
}
if (tq->tq_run) {
nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
-
- tq->tq_run = 0;
+ tq->tq_run = false;
nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
// If there is no callback to perform, then do nothing!
// The user will be none the wiser.
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
nni_mtx_lock(&tq->tq_mtx);
- // It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_tasks, task)) {
- nni_list_append(&tq->tq_tasks, task);
- }
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_run = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
+
+ nni_list_append(&tq->tq_tasks, task);
nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
-
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
- nni_mtx_lock(&tq->tq_mtx);
- for (;;) {
- bool running = false;
- if (nni_list_active(&tq->tq_tasks, task)) {
- running = true;
- } else {
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
- }
- if (!running) {
- break;
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_exec) {
+ // recursive taskq_exec, run it asynchronously
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_dispatch(task);
+ return;
+ }
+ task->task_exec = true;
+ task->task_sched = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
- tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_wait_cv);
+ task->task_cb(task->task_arg);
+
+ nni_mtx_lock(&task->task_mtx);
+ task->task_exec = false;
+ if (task->task_sched || task->task_run) {
+ // cb scheduled a task
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
}
- nni_mtx_unlock(&tq->tq_mtx);
}
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
- bool running;
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_done = false;
+ task->task_run = false;
+ nni_mtx_unlock(&task->task_mtx);
+}
- nni_mtx_lock(&tq->tq_mtx);
- running = true;
- for (;;) {
- running = false;
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
+void
+nni_task_unprep(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (!running) {
- break;
- }
- // tq->tq_threads[i].tqt_wait = 1;
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ while ((task->task_sched || task->task_run || task->task_exec) &&
+ (!task->task_done)) {
+ nni_cv_wait(&task->task_cv);
}
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (nni_list_active(&tq->tq_tasks, task)) {
- nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+ nni_task *task;
+
+ if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_unlock(&tq->tq_mtx);
+ NNI_LIST_NODE_INIT(&task->task_node);
+ nni_mtx_init(&task->task_mtx);
+ nni_cv_init(&task->task_cv, &task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ task->task_sched = false;
+ task->task_exec = false;
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+ *taskp = task;
return (0);
}
void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
+ NNI_ASSERT(!nni_list_node_active(&task->task_node));
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_run || task->task_exec) {
+ // destroy later.
+ task->task_fini = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
}
- NNI_LIST_NODE_INIT(&task->task_node);
- task->task_cb = cb;
- task->task_arg = arg;
- task->task_tq = tq;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_cv_fini(&task->task_cv);
+ nni_mtx_fini(&task->task_mtx);
+ NNI_FREE_STRUCT(task);
}
int
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 40b5dc00..513b15bb 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,28 +17,22 @@
typedef struct nni_taskq nni_taskq;
typedef struct nni_task nni_task;
-// nni_task is a structure representing a task. Its intended to inlined
-// into structures so that taskq_dispatch can be a guaranteed operation.
-struct nni_task {
- nni_list_node task_node;
- void * task_arg;
- nni_cb task_cb;
- nni_taskq * task_tq;
-};
-
extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
-extern void nni_taskq_drain(nni_taskq *);
// nni_task_dispatch sends the task to the queue. It is guaranteed to
// succeed. (If the queue is shutdown, then the behavior is undefined.)
extern void nni_task_dispatch(nni_task *);
+extern void nni_task_exec(nni_task *);
+extern void nni_task_prep(nni_task *);
+extern void nni_task_unprep(nni_task *);
// nni_task_cancel cancels the task. It will wait for the task to complete
// if it is already running.
extern int nni_task_cancel(nni_task *);
extern void nni_task_wait(nni_task *);
-extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *);
+extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *);
+extern void nni_task_fini(nni_task *);
extern int nni_taskq_sys_init(void);
extern void nni_taskq_sys_fini(void);
diff --git a/src/core/thread.c b/src/core/thread.c
index 54c9c7d2..adc35542 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -173,3 +173,12 @@ nni_thr_fini(nni_thr *thr)
nni_plat_mtx_fini(&thr->mtx);
thr->init = 0;
}
+
+bool
+nni_thr_is_self(nni_thr *thr)
+{
+ if (!thr->init) {
+ return (false);
+ }
+ return (nni_plat_thr_is_self(&thr->thr));
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index ee83b196..c3d5531e 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -82,4 +82,7 @@ extern void nni_thr_run(nni_thr *thr);
// at all.
extern void nni_thr_wait(nni_thr *thr);
+// nni_thr_is_self returns true if the caller is the named thread.
+extern bool nni_thr_is_self(nni_thr *thr);
+
#endif // CORE_THREAD_H