aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c216
-rw-r--r--src/core/aio.h15
-rw-r--r--src/core/device.c7
-rw-r--r--src/core/endpt.c13
-rw-r--r--src/core/msgqueue.c4
-rw-r--r--src/core/platform.h7
-rw-r--r--src/core/taskq.c263
-rw-r--r--src/core/taskq.h20
-rw-r--r--src/core/thread.c9
-rw-r--r--src/core/thread.h3
10 files changed, 297 insertions, 260 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 3606aa14..a5b6c088 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_run;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_aios;
+static nni_aio *nni_aio_expire_aio;
// Design notes.
//
@@ -62,16 +63,11 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- nni_cv a_cv;
- bool a_fini : 1; // shutting down (no new operations)
- bool a_done : 1; // operation has completed
- bool a_pend : 1; // completion routine pending
- bool a_active : 1; // aio was started
- bool a_expiring : 1; // expiration callback in progress
- bool a_waiting : 1; // a thread is waiting for this to finish
- bool a_synch : 1; // run completion synchronously
- bool a_sleep : 1; // sleeping with no action
- nni_task a_task;
+ bool a_stop; // shutting down (no new operations)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
+ int a_cancelrv; // if canceled between begin and schedule
+ nni_task *a_task;
// Read/write operations.
nni_iov *a_iov;
@@ -109,12 +105,16 @@ int
nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
nni_aio *aio;
+ int rv;
if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
return (NNG_ENOMEM);
}
memset(aio, 0, sizeof(*aio));
- nni_cv_init(&aio->a_cv, &nni_aio_lk);
+ if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) {
+ NNI_FREE_STRUCT(aio);
+ return (rv);
+ }
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_iov = aio->a_iovinl;
@@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
if (arg == NULL) {
arg = aio;
}
- nni_task_init(NULL, &aio->a_task, cb, arg);
*aiop = aio;
return (0);
}
@@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio)
if (aio != NULL) {
nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
+
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(aio->a_task);
+
+ // At this point the AIO is done.
if (aio->a_niovalloc > 0) {
NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
@@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_lk);
- aio->a_fini = true;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
nni_aio_abort(aio, NNG_ECANCELED);
@@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&nni_aio_lk);
- // Wait until we're done, and the synchronous completion flag
- // is cleared (meaning any synch completion is finished).
- while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
- aio->a_waiting = true;
- nni_cv_wait(&aio->a_cv);
- }
- nni_mtx_unlock(&nni_aio_lk);
- nni_task_wait(&aio->a_task);
+ nni_task_wait(aio->a_task);
}
int
@@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
- if (aio->a_fini) {
- aio->a_active = false;
+ if (aio->a_stop) {
+ nni_task_unprep(aio->a_task);
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- aio->a_done = false;
- aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_active = true;
+ aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_task_prep(aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
-void
+int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
+ int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = nni_clock();
- break;
+ return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
@@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_stop) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (NNG_ECANCELED);
+ }
+ if ((rv = aio->a_cancelrv) != 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (rv);
+ }
+
+ // If cancellation occurred in between "begin" and "schedule",
+ // then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if (aio->a_expire != NNI_TIME_NEVER) {
+ if ((rv = aio->a_cancelrv) != 0) {
+ aio->a_expire = 0;
+ nni_aio_expire_add(aio);
+ } else if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
-}
-
-int
-nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
-{
-
- if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
- return (NNG_ETIMEDOUT);
- }
- nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -379,11 +383,8 @@ nni_aio_finish_impl(
{
nni_mtx_lock(&nni_aio_lk);
- NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
-
nni_list_node_remove(&aio->a_expire_node);
- aio->a_pend = true;
aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -393,38 +394,13 @@ nni_aio_finish_impl(
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
-
- // If we are expiring, then we rely on the expiration thread to
- // complete this; we must not because the expiration thread is
- // still holding the reference.
-
- if (aio->a_expiring) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
- aio->a_done = true;
- aio->a_synch = synch;
+ nni_mtx_unlock(&nni_aio_lk);
if (synch) {
- if (aio->a_task.task_cb != NULL) {
- nni_mtx_unlock(&nni_aio_lk);
- aio->a_task.task_cb(aio->a_task.task_arg);
- nni_mtx_lock(&nni_aio_lk);
- }
+ nni_task_exec(aio->a_task);
} else {
- nni_task_dispatch(&aio->a_task);
+ nni_task_dispatch(aio->a_task);
}
- aio->a_synch = false;
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
- }
-
- // This has to be done with the lock still held, in order
- // to prevent taskq wait from returning prematurely.
- nni_mtx_unlock(&nni_aio_lk);
}
void
@@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio)
static void
nni_aio_expire_loop(void *arg)
{
- nni_list * aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
- nni_aio_cancelfn cancelfn;
- int rv;
+ nni_list *aios = &nni_aio_expire_aios;
NNI_ARG_UNUSED(arg);
for (;;) {
+ nni_aio_cancelfn cancelfn;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
+
now = nni_clock();
nni_mtx_lock(&nni_aio_lk);
- if (nni_aio_expire_run == 0) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
if ((aio = nni_list_first(aios)) == NULL) {
+
+ if (nni_aio_expire_run == 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
nni_cv_wait(&nni_aio_expire_cv);
nni_mtx_unlock(&nni_aio_lk);
continue;
@@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg)
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
nni_list_remove(aios, aio);
+ rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
+
+ if ((cancelfn = aio->a_prov_cancel) != NULL) {
+
+ // Place a temporary hold on the aio. This prevents it
+ // from being destroyed.
+ nni_aio_expire_aio = aio;
- // Mark it as expiring. This acts as a hold on
- // the aio, similar to the consumers. The actual taskq
- // dispatch on completion won't occur until this is cleared,
- // and the done flag won't be set either.
- aio->a_expiring = true;
- cancelfn = aio->a_prov_cancel;
- rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
- aio->a_sleep = false;
-
- // Cancel any outstanding activity. This is always non-NULL
- // for a valid aio, and becomes NULL only when an AIO is
- // already being canceled or finished.
- if (cancelfn != NULL) {
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
cancelfn(aio, rv);
nni_mtx_lock(&nni_aio_lk);
- } else {
- aio->a_pend = true;
- aio->a_result = rv;
- }
-
- NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_expiring = false;
- aio->a_done = true;
- nni_task_dispatch(&aio->a_task);
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
+ nni_aio_expire_aio = NULL;
+ nni_cv_wake(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
}
@@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
return (resid); // we might not have used all of n for this iov
}
+static void
+nni_sleep_cancel(nng_aio *aio, int rv)
+{
+ nni_mtx_lock(&nni_aio_lk);
+ if (!aio->a_sleep) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
+ aio->a_sleep = false;
+ nni_list_node_remove(&aio->a_expire_node);
+ nni_mtx_unlock(&nni_aio_lk);
+
+ nni_aio_finish_error(aio, rv);
+}
+
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+ aio->a_sleeprv = 0;
+ aio->a_sleep = true;
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// If the timeout on the aio is shorter than our sleep time,
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
- aio->a_sleep = false;
- (void) nni_aio_schedule(aio, NULL, NULL);
- return;
+ aio->a_sleeprv = NNG_ETIMEDOUT;
+ ms = aio->a_timeout;
}
}
- aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- // There is no cancellation, apart from just unexpiring.
- nni_aio_schedule(aio, NULL, NULL);
+ if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ }
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index 9b7ac46f..2ed0fb5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -146,14 +146,13 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asychronous completion. This also starts the expiration timer. Note that
-// prior to this, the aio is uncancellable.
-extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
-
-// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
-// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
-// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling
-// if the operation could not be immediately completed.
-extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+// prior to this, the aio is uncancellable. If the operation has a zero
+// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the
+// operation has already been canceled, or should not be run, then an error
+// is returned. (In that case the caller should probably either return an
+// error to its caller, or possibly cause an asynchronous error by calling
+// nni_aio_finish_error on this aio.)
+extern int nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
extern void nni_sleep_aio(nni_duration, nni_aio *);
diff --git a/src/core/device.c b/src/core/device.c
index e3b1d220..1f3bf233 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -187,12 +187,17 @@ void
nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
+ int rv;
if (nni_aio_begin(user) != 0) {
return;
}
nni_mtx_lock(&dd->mtx);
- nni_aio_schedule(user, nni_device_cancel, dd);
+ if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
+ nni_mtx_unlock(&dd->mtx);
+ nni_aio_finish_error(user, rv);
+ return;
+ }
dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 2741a8e6..7593fb42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -27,7 +27,7 @@ struct nni_ep {
int ep_closed; // full shutdown
int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
- int ep_tmo_run;
+ bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
@@ -303,7 +303,7 @@ nni_ep_tmo_cancel(nni_aio *aio, int rv)
if (ep->ep_tmo_run) {
nni_aio_finish_error(aio, rv);
}
- ep->ep_tmo_run = 0;
+ ep->ep_tmo_run = false;
nni_mtx_unlock(&ep->ep_mtx);
}
}
@@ -312,6 +312,7 @@ static void
nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
+ int rv;
if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
@@ -333,8 +334,12 @@ nni_ep_tmo_start(nni_ep *ep)
nni_aio_set_timeout(
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
- ep->ep_tmo_run = 1;
- nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
+ if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
+ 0) {
+ nni_aio_finish_error(ep->ep_tmo_aio, rv);
+ }
+
+ ep->ep_tmo_run = true;
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2367b57f..fa94e32f 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -345,7 +345,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
// If this is an instantaneous poll operation, and the queue has
// no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
nni_mtx_unlock(&mq->mq_lock);
@@ -373,7 +373,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
diff --git a/src/core/platform.h b/src/core/platform.h
index 6e7acdbf..bdc74349 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -136,11 +136,16 @@ extern int nni_plat_cv_until(nni_plat_cv *, nni_time);
// immediately.
extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
-// nni_thread_reap waits for the thread to exit, and then releases any
+// nni_plat_thr_fini waits for the thread to exit, and then releases any
// resources associated with the thread. After this returns, it
// is an error to reference the thread in any further way.
extern void nni_plat_thr_fini(nni_plat_thr *);
+// nni_plat_thr_is_self returns true if the caller is the thread
+// identified, and false otherwise. (This allows some deadlock
+// prevention in callbacks, for example.)
+extern bool nni_plat_thr_is_self(nni_plat_thr *);
+
//
// Clock Support
//
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
#include "core/nng_impl.h"
typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+ nni_list_node task_node;
+ void * task_arg;
+ nni_cb task_cb;
+ nni_taskq * task_tq;
+ bool task_sched;
+ bool task_run;
+ bool task_done;
+ bool task_exec;
+ bool task_fini;
+ nni_mtx task_mtx;
+ nni_cv task_cv;
+};
struct nni_taskq_thr {
nni_taskq *tqt_tq;
nni_thr tqt_thread;
- nni_task * tqt_running;
- int tqt_wait;
};
struct nni_taskq {
nni_list tq_tasks;
@@ -24,8 +35,7 @@ struct nni_taskq {
nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
- int tq_run;
- int tq_waiting;
+ bool tq_run;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_run = true;
+ task->task_sched = false;
+ nni_mtx_unlock(&task->task_mtx);
nni_list_remove(&tq->tq_tasks, task);
- thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
+
task->task_cb(task->task_arg);
- nni_mtx_lock(&tq->tq_mtx);
- thr->tqt_running = NULL;
- if (thr->tqt_wait || tq->tq_waiting) {
- thr->tqt_wait = 0;
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_sched || task->task_exec) {
+ // task resubmitted itself most likely.
+ // We cannot touch the rest of the flags,
+ // since the called function has taken control.
+ nni_mtx_unlock(&task->task_mtx);
+ } else {
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
+ }
+ }
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
- if (tq->tq_waiting) {
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
if (!tq->tq_run) {
break;
}
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
for (int i = 0; i < nthr; i++) {
int rv;
- tq->tq_threads[i].tqt_tq = tq;
- tq->tq_threads[i].tqt_running = NULL;
+ tq->tq_threads[i].tqt_tq = tq;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
}
- tq->tq_run = 1;
+ tq->tq_run = true;
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (0);
}
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
- // We need to first let the taskq completely drain.
- for (;;) {
- int busy = 0;
- if (!nni_list_empty(&tq->tq_tasks)) {
- busy = 1;
- } else {
- int i;
- for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running != 0) {
- busy = 1;
- break;
- }
- }
- }
- if (!busy) {
- break;
- }
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
- }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
- nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
- nni_mtx_unlock(&tq->tq_mtx);
-}
-
void
nni_taskq_fini(nni_taskq *tq)
{
- // First drain the taskq completely. This is necessary since some
- // tasks that are presently running may need to schedule additional
- // tasks, and we don't want those to block.
if (tq == NULL) {
return;
}
if (tq->tq_run) {
nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
-
- tq->tq_run = 0;
+ tq->tq_run = false;
nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
// If there is no callback to perform, then do nothing!
// The user will be none the wiser.
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
nni_mtx_lock(&tq->tq_mtx);
- // It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_tasks, task)) {
- nni_list_append(&tq->tq_tasks, task);
- }
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_run = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
+
+ nni_list_append(&tq->tq_tasks, task);
nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
-
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
- nni_mtx_lock(&tq->tq_mtx);
- for (;;) {
- bool running = false;
- if (nni_list_active(&tq->tq_tasks, task)) {
- running = true;
- } else {
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
- }
- if (!running) {
- break;
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_exec) {
+ // recursive taskq_exec, run it asynchronously
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_dispatch(task);
+ return;
+ }
+ task->task_exec = true;
+ task->task_sched = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
- tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_wait_cv);
+ task->task_cb(task->task_arg);
+
+ nni_mtx_lock(&task->task_mtx);
+ task->task_exec = false;
+ if (task->task_sched || task->task_run) {
+ // cb scheduled a task
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
}
- nni_mtx_unlock(&tq->tq_mtx);
}
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
- bool running;
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_done = false;
+ task->task_run = false;
+ nni_mtx_unlock(&task->task_mtx);
+}
- nni_mtx_lock(&tq->tq_mtx);
- running = true;
- for (;;) {
- running = false;
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
+void
+nni_task_unprep(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (!running) {
- break;
- }
- // tq->tq_threads[i].tqt_wait = 1;
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ while ((task->task_sched || task->task_run || task->task_exec) &&
+ (!task->task_done)) {
+ nni_cv_wait(&task->task_cv);
}
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (nni_list_active(&tq->tq_tasks, task)) {
- nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+ nni_task *task;
+
+ if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_unlock(&tq->tq_mtx);
+ NNI_LIST_NODE_INIT(&task->task_node);
+ nni_mtx_init(&task->task_mtx);
+ nni_cv_init(&task->task_cv, &task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ task->task_sched = false;
+ task->task_exec = false;
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+ *taskp = task;
return (0);
}
void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
+ NNI_ASSERT(!nni_list_node_active(&task->task_node));
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_run || task->task_exec) {
+ // destroy later.
+ task->task_fini = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
}
- NNI_LIST_NODE_INIT(&task->task_node);
- task->task_cb = cb;
- task->task_arg = arg;
- task->task_tq = tq;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_cv_fini(&task->task_cv);
+ nni_mtx_fini(&task->task_mtx);
+ NNI_FREE_STRUCT(task);
}
int
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 40b5dc00..513b15bb 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,28 +17,22 @@
typedef struct nni_taskq nni_taskq;
typedef struct nni_task nni_task;
-// nni_task is a structure representing a task. Its intended to inlined
-// into structures so that taskq_dispatch can be a guaranteed operation.
-struct nni_task {
- nni_list_node task_node;
- void * task_arg;
- nni_cb task_cb;
- nni_taskq * task_tq;
-};
-
extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
-extern void nni_taskq_drain(nni_taskq *);
// nni_task_dispatch sends the task to the queue. It is guaranteed to
// succeed. (If the queue is shutdown, then the behavior is undefined.)
extern void nni_task_dispatch(nni_task *);
+extern void nni_task_exec(nni_task *);
+extern void nni_task_prep(nni_task *);
+extern void nni_task_unprep(nni_task *);
// nni_task_cancel cancels the task. It will wait for the task to complete
// if it is already running.
extern int nni_task_cancel(nni_task *);
extern void nni_task_wait(nni_task *);
-extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *);
+extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *);
+extern void nni_task_fini(nni_task *);
extern int nni_taskq_sys_init(void);
extern void nni_taskq_sys_fini(void);
diff --git a/src/core/thread.c b/src/core/thread.c
index 54c9c7d2..adc35542 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -173,3 +173,12 @@ nni_thr_fini(nni_thr *thr)
nni_plat_mtx_fini(&thr->mtx);
thr->init = 0;
}
+
+bool
+nni_thr_is_self(nni_thr *thr)
+{
+ if (!thr->init) {
+ return (false);
+ }
+ return (nni_plat_thr_is_self(&thr->thr));
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index ee83b196..c3d5531e 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -82,4 +82,7 @@ extern void nni_thr_run(nni_thr *thr);
// at all.
extern void nni_thr_wait(nni_thr *thr);
+// nni_thr_is_self returns true if the caller is the named thread.
+extern bool nni_thr_is_self(nni_thr *thr);
+
#endif // CORE_THREAD_H