Diffstat (limited to 'src/core')
-rw-r--r--   src/core/aio.c      | 216
-rw-r--r--   src/core/aio.h      |  15
-rw-r--r--   src/core/device.c   |   7
-rw-r--r--   src/core/endpt.c    |  13
-rw-r--r--   src/core/msgqueue.c |   4
-rw-r--r--   src/core/platform.h |   7
-rw-r--r--   src/core/taskq.c    | 263
-rw-r--r--   src/core/taskq.h    |  20
-rw-r--r--   src/core/thread.c   |   9
-rw-r--r--   src/core/thread.h   |   3
10 files changed, 297 insertions, 260 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 3606aa14..a5b6c088 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -17,6 +17,7 @@
 static nni_cv   nni_aio_expire_cv;
 static int      nni_aio_expire_run;
 static nni_thr  nni_aio_expire_thr;
 static nni_list nni_aio_expire_aios;
+static nni_aio *nni_aio_expire_aio;
 
 // Design notes.
 //
@@ -62,16 +63,11 @@ struct nng_aio {
     nni_duration a_timeout; // Relative timeout
 
     // These fields are private to the aio framework.
-    nni_cv   a_cv;
-    bool     a_fini : 1;     // shutting down (no new operations)
-    bool     a_done : 1;     // operation has completed
-    bool     a_pend : 1;     // completion routine pending
-    bool     a_active : 1;   // aio was started
-    bool     a_expiring : 1; // expiration callback in progress
-    bool     a_waiting : 1;  // a thread is waiting for this to finish
-    bool     a_synch : 1;    // run completion synchronously
-    bool     a_sleep : 1;    // sleeping with no action
-    nni_task a_task;
+    bool      a_stop;     // shutting down (no new operations)
+    bool      a_sleep;    // sleeping with no action
+    int       a_sleeprv;  // result when sleep wakes
+    int       a_cancelrv; // if canceled between begin and schedule
+    nni_task *a_task;
 
     // Read/write operations.
     nni_iov *a_iov;
@@ -109,12 +105,16 @@ int
 nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
 {
     nni_aio *aio;
+    int      rv;
 
     if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
         return (NNG_ENOMEM);
     }
     memset(aio, 0, sizeof(*aio));
-    nni_cv_init(&aio->a_cv, &nni_aio_lk);
+    if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) {
+        NNI_FREE_STRUCT(aio);
+        return (rv);
+    }
     aio->a_expire  = NNI_TIME_NEVER;
     aio->a_timeout = NNG_DURATION_INFINITE;
     aio->a_iov     = aio->a_iovinl;
@@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
     if (arg == NULL) {
         arg = aio;
     }
-    nni_task_init(NULL, &aio->a_task, cb, arg);
     *aiop = aio;
     return (0);
 }
@@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio)
     if (aio != NULL) {
         nni_aio_stop(aio);
 
-        // At this point the AIO is done.
-        nni_cv_fini(&aio->a_cv);
+        // Wait for the aio to be "done"; this ensures that we don't
+        // destroy an aio from a "normal" completion callback while
+        // the expiration thread is working.
+
+        nni_mtx_lock(&nni_aio_lk);
+        while (nni_aio_expire_aio == aio) {
+            nni_cv_wait(&nni_aio_expire_cv);
+        }
+        nni_mtx_unlock(&nni_aio_lk);
+        nni_task_fini(aio->a_task);
 
+        // At this point the AIO is done.
         if (aio->a_niovalloc > 0) {
             NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
         }
@@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio)
 {
     if (aio != NULL) {
         nni_mtx_lock(&nni_aio_lk);
-        aio->a_fini = true;
+        aio->a_stop = true;
         nni_mtx_unlock(&nni_aio_lk);
 
         nni_aio_abort(aio, NNG_ECANCELED);
@@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio)
 void
 nni_aio_wait(nni_aio *aio)
 {
-    nni_mtx_lock(&nni_aio_lk);
-    // Wait until we're done, and the synchronous completion flag
-    // is cleared (meaning any synch completion is finished).
-    while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
-        aio->a_waiting = true;
-        nni_cv_wait(&aio->a_cv);
-    }
-    nni_mtx_unlock(&nni_aio_lk);
-    nni_task_wait(&aio->a_task);
+    nni_task_wait(aio->a_task);
 }
 
 int
@@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio)
 {
     nni_mtx_lock(&nni_aio_lk);
     // We should not reschedule anything at this point.
-    if (aio->a_fini) {
-        aio->a_active = false;
+    if (aio->a_stop) {
+        nni_task_unprep(aio->a_task);
         aio->a_result = NNG_ECANCELED;
         nni_mtx_unlock(&nni_aio_lk);
         return (NNG_ECANCELED);
     }
-    aio->a_done        = false;
-    aio->a_pend        = false;
     aio->a_result      = 0;
     aio->a_count       = 0;
     aio->a_prov_cancel = NULL;
     aio->a_prov_data   = NULL;
-    aio->a_active      = true;
+    aio->a_cancelrv    = 0;
     for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
         aio->a_outputs[i] = NULL;
     }
+    nni_task_prep(aio->a_task);
     nni_mtx_unlock(&nni_aio_lk);
     return (0);
 }
 
-void
+int
 nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
 {
+    int rv;
+
     if (!aio->a_sleep) {
         // Convert the relative timeout to an absolute timeout.
         switch (aio->a_timeout) {
         case NNG_DURATION_ZERO:
-            aio->a_expire = nni_clock();
-            break;
+            return (NNG_ETIMEDOUT);
         case NNG_DURATION_INFINITE:
         case NNG_DURATION_DEFAULT:
             aio->a_expire = NNI_TIME_NEVER;
@@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
     }
 
     nni_mtx_lock(&nni_aio_lk);
+    if (aio->a_stop) {
+        nni_mtx_unlock(&nni_aio_lk);
+        return (NNG_ECANCELED);
+    }
+    if ((rv = aio->a_cancelrv) != 0) {
+        nni_mtx_unlock(&nni_aio_lk);
+        return (rv);
+    }
+
+    // If cancellation occurred in between "begin" and "schedule",
+    // then cancel it right now.
     aio->a_prov_cancel = cancelfn;
     aio->a_prov_data   = data;
-    if (aio->a_expire != NNI_TIME_NEVER) {
+    if ((rv = aio->a_cancelrv) != 0) {
+        aio->a_expire = 0;
+        nni_aio_expire_add(aio);
+    } else if (aio->a_expire != NNI_TIME_NEVER) {
         nni_aio_expire_add(aio);
     }
     nni_mtx_unlock(&nni_aio_lk);
-}
-
-int
-nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
-{
-
-    if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
-        return (NNG_ETIMEDOUT);
-    }
-    nni_aio_schedule(aio, cancelfn, data);
     return (0);
 }
 
@@ -379,11 +383,8 @@ nni_aio_finish_impl(
 {
     nni_mtx_lock(&nni_aio_lk);
 
-    NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
-
     nni_list_node_remove(&aio->a_expire_node);
 
-    aio->a_pend        = true;
     aio->a_result      = rv;
     aio->a_count       = count;
     aio->a_prov_cancel = NULL;
@@ -393,38 +394,13 @@ nni_aio_finish_impl(
     aio->a_expire = NNI_TIME_NEVER;
     aio->a_sleep  = false;
-
-    // If we are expiring, then we rely on the expiration thread to
-    // complete this; we must not because the expiration thread is
-    // still holding the reference.
-
-    if (aio->a_expiring) {
-        nni_mtx_unlock(&nni_aio_lk);
-        return;
-    }
-
-    aio->a_done  = true;
-    aio->a_synch = synch;
+    nni_mtx_unlock(&nni_aio_lk);
 
     if (synch) {
-        if (aio->a_task.task_cb != NULL) {
-            nni_mtx_unlock(&nni_aio_lk);
-            aio->a_task.task_cb(aio->a_task.task_arg);
-            nni_mtx_lock(&nni_aio_lk);
-        }
+        nni_task_exec(aio->a_task);
     } else {
-        nni_task_dispatch(&aio->a_task);
+        nni_task_dispatch(aio->a_task);
     }
-    aio->a_synch = false;
-
-    if (aio->a_waiting) {
-        aio->a_waiting = false;
-        nni_cv_wake(&aio->a_cv);
-    }
-
-    // This has to be done with the lock still held, in order
-    // to prevent taskq wait from returning prematurely.
-    nni_mtx_unlock(&nni_aio_lk);
 }
 
 void
@@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio)
 static void
 nni_aio_expire_loop(void *arg)
 {
-    nni_list *       aios = &nni_aio_expire_aios;
-    nni_aio *        aio;
-    nni_time         now;
-    nni_aio_cancelfn cancelfn;
-    int              rv;
+    nni_list *aios = &nni_aio_expire_aios;
 
     NNI_ARG_UNUSED(arg);
 
     for (;;) {
+        nni_aio_cancelfn cancelfn;
+        nni_time         now;
+        nni_aio *        aio;
+        int              rv;
+
         now = nni_clock();
 
         nni_mtx_lock(&nni_aio_lk);
-        if (nni_aio_expire_run == 0) {
-            nni_mtx_unlock(&nni_aio_lk);
-            return;
-        }
-
         if ((aio = nni_list_first(aios)) == NULL) {
+
+            if (nni_aio_expire_run == 0) {
+                nni_mtx_unlock(&nni_aio_lk);
+                return;
+            }
+
             nni_cv_wait(&nni_aio_expire_cv);
             nni_mtx_unlock(&nni_aio_lk);
             continue;
@@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg)
         // This aio's time has come.  Expire it, canceling any
         // outstanding I/O.
         nni_list_remove(aios, aio);
+        rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
+
+        if ((cancelfn = aio->a_prov_cancel) != NULL) {
+
+            // Place a temporary hold on the aio.  This prevents it
+            // from being destroyed.
+            nni_aio_expire_aio = aio;
 
-        // Mark it as expiring.  This acts as a hold on
-        // the aio, similar to the consumers.  The actual taskq
-        // dispatch on completion won't occur until this is cleared,
-        // and the done flag won't be set either.
-        aio->a_expiring = true;
-        cancelfn        = aio->a_prov_cancel;
-        rv              = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
-        aio->a_sleep    = false;
-
-        // Cancel any outstanding activity.  This is always non-NULL
-        // for a valid aio, and becomes NULL only when an AIO is
-        // already being canceled or finished.
-        if (cancelfn != NULL) {
+            // We let the cancel function handle the completion.
+            // If there is no cancellation function, then we cannot
+            // terminate the aio - we've tried, but it has to run
+            // to its natural conclusion.
             nni_mtx_unlock(&nni_aio_lk);
             cancelfn(aio, rv);
             nni_mtx_lock(&nni_aio_lk);
-        } else {
-            aio->a_pend   = true;
-            aio->a_result = rv;
-        }
-
-        NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
-        NNI_ASSERT(aio->a_prov_cancel == NULL);
-        aio->a_expiring = false;
-        aio->a_done     = true;
-        nni_task_dispatch(&aio->a_task);
-
-        if (aio->a_waiting) {
-            aio->a_waiting = false;
-            nni_cv_wake(&aio->a_cv);
+            nni_aio_expire_aio = NULL;
+            nni_cv_wake(&nni_aio_expire_cv);
         }
         nni_mtx_unlock(&nni_aio_lk);
     }
@@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
     return (resid); // we might not have used all of n for this iov
 }
 
+static void
+nni_sleep_cancel(nng_aio *aio, int rv)
+{
+    nni_mtx_lock(&nni_aio_lk);
+    if (!aio->a_sleep) {
+        nni_mtx_unlock(&nni_aio_lk);
+        return;
+    }
+
+    aio->a_sleep = false;
+    nni_list_node_remove(&aio->a_expire_node);
+    nni_mtx_unlock(&nni_aio_lk);
+
+    nni_aio_finish_error(aio, rv);
+}
+
 void
 nni_sleep_aio(nng_duration ms, nng_aio *aio)
 {
+    int rv;
+
     if (nni_aio_begin(aio) != 0) {
         return;
     }
+    aio->a_sleeprv = 0;
+    aio->a_sleep   = true;
     switch (aio->a_timeout) {
     case NNG_DURATION_DEFAULT:
     case NNG_DURATION_INFINITE:
@@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
         // If the timeout on the aio is shorter than our sleep time,
         // then let it still wake up early, but with NNG_ETIMEDOUT.
         if (ms > aio->a_timeout) {
-            aio->a_sleep = false;
-            (void) nni_aio_schedule(aio, NULL, NULL);
-            return;
+            aio->a_sleeprv = NNG_ETIMEDOUT;
+            ms             = aio->a_timeout;
         }
     }
-    aio->a_sleep  = true;
     aio->a_expire = nni_clock() + ms;
 
-    // There is no cancellation, apart from just unexpiring.
-    nni_aio_schedule(aio, NULL, NULL);
+    if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
+        nni_aio_finish_error(aio, rv);
+    }
 }
 
 void
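Taken together, the aio.c changes replace the old flag dance (a_done, a_pend, a_expiring, and friends) with a begin/schedule pair in which scheduling can fail. The callers updated below (device.c, endpt.c, msgqueue.c) all follow the same shape; here is a sketch of that contract, in which my_provider, my_cancel, and the waiters list are hypothetical names and only the nni_aio_* calls come from this change:

static void
my_start(my_provider *p, nni_aio *aio)
{
    int rv;

    // If the aio is being stopped, back off without touching it again.
    if (nni_aio_begin(aio) != 0) {
        return;
    }
    nni_mtx_lock(&p->mtx);
    // Schedule may now fail: a zero timeout (poll), a stop, or a cancel
    // that raced in between begin and schedule all surface here.
    if ((rv = nni_aio_schedule(aio, my_cancel, p)) != 0) {
        nni_mtx_unlock(&p->mtx);
        nni_aio_finish_error(aio, rv);
        return;
    }
    nni_list_append(&p->waiters, aio);
    nni_mtx_unlock(&p->mtx);
}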
diff --git a/src/core/aio.h b/src/core/aio.h
index 9b7ac46f..2ed0fb5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -146,14 +146,13 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
 
 // nni_aio_schedule indicates that the AIO has begun, and is scheduled for
 // asynchronous completion. This also starts the expiration timer. Note that
-// prior to this, the aio is uncancellable.
-extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
-
-// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
-// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
-// returns NNG_ETIMEDOUT.  This is done to permit bypassing scheduling
-// if the operation could not be immediately completed.
-extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+// prior to this, the aio is uncancellable. If the operation has a zero
+// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the
+// operation has already been canceled, or should not be run, then an error
+// is returned. (In that case the caller should probably either return an
+// error to its caller, or possibly cause an asynchronous error by calling
+// nni_aio_finish_error on this aio.)
+extern int nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
 
 extern void nni_sleep_aio(nni_duration, nni_aio *);
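The cancellation function registered with nni_aio_schedule is the other half of the contract: the expiration thread may invoke it, and it must either complete the aio or determine that a normal completion is already underway. A sketch in the same shape as nni_sleep_cancel above; the my_lock, my_aio_is_queued, and my_aio_unlink names are hypothetical provider state:

static void
my_cancel(nni_aio *aio, int rv)
{
    nni_mtx_lock(&my_lock);
    if (!my_aio_is_queued(aio)) {
        // A normal completion has already claimed this aio.
        nni_mtx_unlock(&my_lock);
        return;
    }
    my_aio_unlink(aio);
    nni_mtx_unlock(&my_lock);
    nni_aio_finish_error(aio, rv);
}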
diff --git a/src/core/device.c b/src/core/device.c
index e3b1d220..1f3bf233 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -187,12 +187,17 @@ void
 nni_device_start(nni_device_data *dd, nni_aio *user)
 {
     int i;
+    int rv;
 
     if (nni_aio_begin(user) != 0) {
         return;
     }
     nni_mtx_lock(&dd->mtx);
-    nni_aio_schedule(user, nni_device_cancel, dd);
+    if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
+        nni_mtx_unlock(&dd->mtx);
+        nni_aio_finish_error(user, rv);
+        return;
+    }
     dd->user = user;
     for (i = 0; i < dd->npath; i++) {
         nni_device_path *p = &dd->paths[i];
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 2741a8e6..7593fb42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -27,7 +27,7 @@ struct nni_ep {
     int           ep_closed;  // full shutdown
     int           ep_closing; // close pending (waiting on refcnt)
     int           ep_refcnt;
-    int           ep_tmo_run;
+    bool          ep_tmo_run;
     nni_mtx       ep_mtx;
     nni_cv        ep_cv;
     nni_list      ep_pipes;
@@ -303,7 +303,7 @@ nni_ep_tmo_cancel(nni_aio *aio, int rv)
         if (ep->ep_tmo_run) {
             nni_aio_finish_error(aio, rv);
         }
-        ep->ep_tmo_run = 0;
+        ep->ep_tmo_run = false;
         nni_mtx_unlock(&ep->ep_mtx);
     }
 }
@@ -312,6 +312,7 @@ static void
 nni_ep_tmo_start(nni_ep *ep)
 {
     nni_duration backoff;
+    int          rv;
 
     if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
         return;
@@ -333,8 +334,12 @@ nni_ep_tmo_start(nni_ep *ep)
     nni_aio_set_timeout(
         ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
 
-    ep->ep_tmo_run = 1;
-    nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
+    if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
+        0) {
+        nni_aio_finish_error(ep->ep_tmo_aio, rv);
+    }
+
+    ep->ep_tmo_run = true;
 }
 
 static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2367b57f..fa94e32f 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -345,7 +345,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
 
     // If this is an instantaneous poll operation, and the queue has
     // no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
-    rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+    rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
     if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
         (nni_list_empty(&mq->mq_aio_getq))) {
         nni_mtx_unlock(&mq->mq_lock);
@@ -373,7 +373,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
         nni_aio_finish_error(aio, mq->mq_geterr);
         return;
     }
-    rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+    rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
     if ((rv != 0) && (mq->mq_len == 0) &&
         (nni_list_empty(&mq->mq_aio_putq))) {
         nni_mtx_unlock(&mq->mq_lock);
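With nni_aio_schedule_verify folded into nni_aio_schedule, the NNG_ETIMEDOUT return for NNG_DURATION_ZERO is how nonblocking (poll-style) queue operations are detected: schedule refuses to park the aio, and the message queue either completes it immediately or reports the timeout. At the public API the behavior looks roughly like this sketch, assuming an open socket sock and a message msg already exist:

nng_aio *aio;

if (nng_aio_alloc(&aio, NULL, NULL) != 0) {
    return;
}
nng_aio_set_timeout(aio, 0); // NNG_DURATION_ZERO: poll semantics
nng_aio_set_msg(aio, msg);
nng_send_aio(sock, aio);     // never parks; completes at once
nng_aio_wait(aio);
if (nng_aio_result(aio) == NNG_ETIMEDOUT) {
    // The queue was full and nobody was waiting to receive.
}
nng_aio_free(aio);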
diff --git a/src/core/platform.h b/src/core/platform.h
index 6e7acdbf..bdc74349 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -136,11 +136,16 @@ extern int nni_plat_cv_until(nni_plat_cv *, nni_time);
 // immediately.
 extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
 
-// nni_thread_reap waits for the thread to exit, and then releases any
+// nni_plat_thr_fini waits for the thread to exit, and then releases any
 // resources associated with the thread.  After this returns, it
 // is an error to reference the thread in any further way.
 extern void nni_plat_thr_fini(nni_plat_thr *);
 
+// nni_plat_thr_is_self returns true if the caller is the thread
+// identified, and false otherwise.  (This allows some deadlock
+// prevention in callbacks, for example.)
+extern bool nni_plat_thr_is_self(nni_plat_thr *);
+
 //
 // Clock Support
 //
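A plausible POSIX implementation of the new nni_plat_thr_is_self, assuming nni_plat_thr keeps its pthread_t in a member named tid (the actual platform code may differ):

#include <pthread.h>

bool
nni_plat_thr_is_self(nni_plat_thr *thr)
{
    // pthread_equal returns nonzero when both handles name the
    // same thread.
    return (pthread_equal(pthread_self(), thr->tid) != 0);
}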
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
 #include "core/nng_impl.h"
 
 typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+    nni_list_node task_node;
+    void *        task_arg;
+    nni_cb        task_cb;
+    nni_taskq *   task_tq;
+    bool          task_sched;
+    bool          task_run;
+    bool          task_done;
+    bool          task_exec;
+    bool          task_fini;
+    nni_mtx       task_mtx;
+    nni_cv        task_cv;
+};
 struct nni_taskq_thr {
     nni_taskq *tqt_tq;
     nni_thr    tqt_thread;
-    nni_task * tqt_running;
-    int        tqt_wait;
 };
 struct nni_taskq {
     nni_list       tq_tasks;
     nni_mtx        tq_mtx;
     nni_cv         tq_sched_cv;
     nni_cv         tq_wait_cv;
     nni_taskq_thr *tq_threads;
     int            tq_nthreads;
-    int            tq_run;
-    int            tq_waiting;
+    bool           tq_run;
 };
 
 static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
     nni_mtx_lock(&tq->tq_mtx);
     for (;;) {
         if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+            nni_mtx_lock(&task->task_mtx);
+            task->task_run   = true;
+            task->task_sched = false;
+            nni_mtx_unlock(&task->task_mtx);
             nni_list_remove(&tq->tq_tasks, task);
-            thr->tqt_running = task;
             nni_mtx_unlock(&tq->tq_mtx);
+
             task->task_cb(task->task_arg);
 
-            nni_mtx_lock(&tq->tq_mtx);
-            thr->tqt_running = NULL;
-            if (thr->tqt_wait || tq->tq_waiting) {
-                thr->tqt_wait  = 0;
-                tq->tq_waiting = 0;
-                nni_cv_wake(&tq->tq_wait_cv);
-            }
+            nni_mtx_lock(&task->task_mtx);
+            if (task->task_sched || task->task_exec) {
+                // task resubmitted itself most likely.
+                // We cannot touch the rest of the flags,
+                // since the called function has taken control.
+                nni_mtx_unlock(&task->task_mtx);
+            } else {
+                task->task_done = true;
+                nni_cv_wake(&task->task_cv);
+
+                if (task->task_fini) {
+                    task->task_fini = false;
+                    nni_mtx_unlock(&task->task_mtx);
+                    nni_task_fini(task);
+                } else {
+                    nni_mtx_unlock(&task->task_mtx);
+                }
+            }
+
+            nni_mtx_lock(&tq->tq_mtx);
             continue;
         }
 
-        if (tq->tq_waiting) {
-            tq->tq_waiting = 0;
-            nni_cv_wake(&tq->tq_wait_cv);
-        }
         if (!tq->tq_run) {
             break;
         }
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
     for (int i = 0; i < nthr; i++) {
         int rv;
 
-        tq->tq_threads[i].tqt_tq      = tq;
-        tq->tq_threads[i].tqt_running = NULL;
+        tq->tq_threads[i].tqt_tq = tq;
         rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
             nni_taskq_thread, &tq->tq_threads[i]);
         if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
             return (rv);
         }
     }
-    tq->tq_run = 1;
+    tq->tq_run = true;
     for (int i = 0; i < tq->tq_nthreads; i++) {
         nni_thr_run(&tq->tq_threads[i].tqt_thread);
     }
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
     return (0);
 }
 
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
-    // We need to first let the taskq completely drain.
-    for (;;) {
-        int busy = 0;
-        if (!nni_list_empty(&tq->tq_tasks)) {
-            busy = 1;
-        } else {
-            int i;
-            for (i = 0; i < tq->tq_nthreads; i++) {
-                if (tq->tq_threads[i].tqt_running != 0) {
-                    busy = 1;
-                    break;
-                }
-            }
-        }
-        if (!busy) {
-            break;
-        }
-        tq->tq_waiting++;
-        nni_cv_wait(&tq->tq_wait_cv);
-    }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
-    nni_mtx_lock(&tq->tq_mtx);
-    nni_taskq_drain_locked(tq);
-    nni_mtx_unlock(&tq->tq_mtx);
-}
-
 void
 nni_taskq_fini(nni_taskq *tq)
 {
-    // First drain the taskq completely.  This is necessary since some
-    // tasks that are presently running may need to schedule additional
-    // tasks, and we don't want those to block.
     if (tq == NULL) {
         return;
     }
     if (tq->tq_run) {
         nni_mtx_lock(&tq->tq_mtx);
-        nni_taskq_drain_locked(tq);
-
-        tq->tq_run = 0;
+        tq->tq_run = false;
         nni_cv_wake(&tq->tq_sched_cv);
         nni_mtx_unlock(&tq->tq_mtx);
     }
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
     // If there is no callback to perform, then do nothing!
     // The user will be none the wiser.
     if (task->task_cb == NULL) {
+        nni_mtx_lock(&task->task_mtx);
+        task->task_sched = false;
+        task->task_run   = false;
+        task->task_exec  = false;
+        task->task_done  = true;
+        nni_cv_wake(&task->task_cv);
+        nni_mtx_unlock(&task->task_mtx);
         return;
     }
     nni_mtx_lock(&tq->tq_mtx);
-    // It might already be scheduled... if so don't redo it.
-    if (!nni_list_active(&tq->tq_tasks, task)) {
-        nni_list_append(&tq->tq_tasks, task);
-    }
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = true;
+    task->task_run   = false;
+    task->task_done  = false;
+    nni_mtx_unlock(&task->task_mtx);
+
+    nni_list_append(&tq->tq_tasks, task);
     nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
     nni_mtx_unlock(&tq->tq_mtx);
 }
 
 void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
 {
-    nni_taskq *tq = task->task_tq;
-
     if (task->task_cb == NULL) {
+        nni_mtx_lock(&task->task_mtx);
+        task->task_sched = false;
+        task->task_run   = false;
+        task->task_exec  = false;
+        task->task_done  = true;
+        nni_cv_wake(&task->task_cv);
+        nni_mtx_unlock(&task->task_mtx);
         return;
     }
-    nni_mtx_lock(&tq->tq_mtx);
-    for (;;) {
-        bool running = false;
-        if (nni_list_active(&tq->tq_tasks, task)) {
-            running = true;
-        } else {
-            for (int i = 0; i < tq->tq_nthreads; i++) {
-                if (tq->tq_threads[i].tqt_running == task) {
-                    running = true;
-                    break;
-                }
-            }
-        }
-        if (!running) {
-            break;
-        }
+    nni_mtx_lock(&task->task_mtx);
+    if (task->task_exec) {
+        // recursive taskq_exec, run it asynchronously
+        nni_mtx_unlock(&task->task_mtx);
+        nni_task_dispatch(task);
+        return;
+    }
+    task->task_exec  = true;
+    task->task_sched = false;
+    task->task_done  = false;
+    nni_mtx_unlock(&task->task_mtx);
 
-        tq->tq_waiting = 1;
-        nni_cv_wait(&tq->tq_wait_cv);
+    task->task_cb(task->task_arg);
+
+    nni_mtx_lock(&task->task_mtx);
+    task->task_exec = false;
+    if (task->task_sched || task->task_run) {
+        // cb scheduled a task
+        nni_mtx_unlock(&task->task_mtx);
+        return;
+    }
+    task->task_done = true;
+    nni_cv_wake(&task->task_cv);
+    if (task->task_fini) {
+        task->task_fini = false;
+        nni_mtx_unlock(&task->task_mtx);
+        nni_task_fini(task);
+    } else {
+        nni_mtx_unlock(&task->task_mtx);
     }
-    nni_mtx_unlock(&tq->tq_mtx);
 }
 
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
 {
-    nni_taskq *tq = task->task_tq;
-    bool       running;
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = true;
+    task->task_done  = false;
+    task->task_run   = false;
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-    nni_mtx_lock(&tq->tq_mtx);
-    running = true;
-    for (;;) {
-        running = false;
-        for (int i = 0; i < tq->tq_nthreads; i++) {
-            if (tq->tq_threads[i].tqt_running == task) {
-                running = true;
-                break;
-            }
-        }
+void
+nni_task_unprep(nni_task *task)
+{
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = false;
+    task->task_done  = false;
+    task->task_run   = false;
+    nni_cv_wake(&task->task_cv);
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-        if (!running) {
-            break;
-        }
-        // tq->tq_threads[i].tqt_wait = 1;
-        tq->tq_waiting++;
-        nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+    nni_mtx_lock(&task->task_mtx);
+    while ((task->task_sched || task->task_run || task->task_exec) &&
+        (!task->task_done)) {
+        nni_cv_wait(&task->task_cv);
     }
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-    if (nni_list_active(&tq->tq_tasks, task)) {
-        nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+    nni_task *task;
+
+    if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+        return (NNG_ENOMEM);
     }
-    nni_mtx_unlock(&tq->tq_mtx);
+    NNI_LIST_NODE_INIT(&task->task_node);
+    nni_mtx_init(&task->task_mtx);
+    nni_cv_init(&task->task_cv, &task->task_mtx);
+    task->task_sched = false;
+    task->task_done  = false;
+    task->task_run   = false;
+    task->task_sched = false;
+    task->task_exec  = false;
+    task->task_cb    = cb;
+    task->task_arg   = arg;
+    task->task_tq    = tq != NULL ? tq : nni_taskq_systq;
+    *taskp = task;
     return (0);
 }
 
 void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
 {
-    if (tq == NULL) {
-        tq = nni_taskq_systq;
+    NNI_ASSERT(!nni_list_node_active(&task->task_node));
+    nni_mtx_lock(&task->task_mtx);
+    if (task->task_run || task->task_exec) {
+        // destroy later.
+        task->task_fini = true;
+        nni_mtx_unlock(&task->task_mtx);
+        return;
     }
-    NNI_LIST_NODE_INIT(&task->task_node);
-    task->task_cb  = cb;
-    task->task_arg = arg;
-    task->task_tq  = tq;
+    nni_mtx_unlock(&task->task_mtx);
+    nni_cv_fini(&task->task_cv);
+    nni_mtx_fini(&task->task_mtx);
+    NNI_FREE_STRUCT(task);
 }
 
 int
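Since tasks are now heap-allocated and carry their own mutex and condition variable, initialization can fail, and destruction is deferred via the task_fini flag when the callback is still running. A lifecycle sketch with a hypothetical callback and demo function:

static void
my_task_cb(void *arg)
{
    // Runs on a taskq thread (nni_task_dispatch) or inline
    // (nni_task_exec).
}

static int
my_task_demo(void)
{
    nni_task *task;
    int       rv;

    // A NULL taskq selects the system queue (nni_taskq_systq).
    if ((rv = nni_task_init(&task, NULL, my_task_cb, NULL)) != 0) {
        return (rv); // NNG_ENOMEM
    }
    nni_task_dispatch(task); // queue it; guaranteed to succeed
    nni_task_wait(task);     // blocks until task_done is set
    nni_task_fini(task);     // frees now, or defers if still running
    return (0);
}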
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 40b5dc00..513b15bb 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,6 +1,6 @@
 //
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
 //
 // This software is supplied under the terms of the MIT License, a
 // copy of which should be located in the distribution where this
@@ -17,28 +17,22 @@
 typedef struct nni_taskq nni_taskq;
 typedef struct nni_task  nni_task;
 
-// nni_task is a structure representing a task.  Its intended to inlined
-// into structures so that taskq_dispatch can be a guaranteed operation.
-struct nni_task {
-    nni_list_node task_node;
-    void *        task_arg;
-    nni_cb        task_cb;
-    nni_taskq *   task_tq;
-};
-
 extern int  nni_taskq_init(nni_taskq **, int);
 extern void nni_taskq_fini(nni_taskq *);
-extern void nni_taskq_drain(nni_taskq *);
 
 // nni_task_dispatch sends the task to the queue.  It is guaranteed to
 // succeed.  (If the queue is shutdown, then the behavior is undefined.)
 extern void nni_task_dispatch(nni_task *);
+extern void nni_task_exec(nni_task *);
+extern void nni_task_prep(nni_task *);
+extern void nni_task_unprep(nni_task *);
 
 // nni_task_cancel cancels the task.  It will wait for the task to complete
 // if it is already running.
 extern int nni_task_cancel(nni_task *);
 
 extern void nni_task_wait(nni_task *);
-extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *);
+extern int  nni_task_init(nni_task **, nni_taskq *, nni_cb, void *);
+extern void nni_task_fini(nni_task *);
 
 extern int  nni_taskq_sys_init(void);
 extern void nni_taskq_sys_fini(void);
diff --git a/src/core/thread.c b/src/core/thread.c
index 54c9c7d2..adc35542 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -173,3 +173,12 @@ nni_thr_fini(nni_thr *thr)
     nni_plat_mtx_fini(&thr->mtx);
     thr->init = 0;
 }
+
+bool
+nni_thr_is_self(nni_thr *thr)
+{
+    if (!thr->init) {
+        return (false);
+    }
+    return (nni_plat_thr_is_self(&thr->thr));
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index ee83b196..c3d5531e 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -82,4 +82,7 @@ extern void nni_thr_run(nni_thr *thr);
 // at all.
 extern void nni_thr_wait(nni_thr *thr);
 
+// nni_thr_is_self returns true if the caller is the named thread.
+extern bool nni_thr_is_self(nni_thr *thr);
+
 #endif // CORE_THREAD_H
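The deadlock prevention that the platform.h comment alludes to amounts to refusing to block on a thread from within that same thread. A hypothetical caller-side fragment:

// Sketch: avoid self-deadlock before blocking on work owned by thr.
if (nni_thr_is_self(thr)) {
    return; // waiting here would deadlock on ourselves
}
nni_thr_wait(thr);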
