diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-02-18 19:24:15 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-02-18 19:25:14 -0800 |
| commit | 2944f3a2634876e543003fabb05cba1b178df6aa (patch) | |
| tree | a9a5aba5aec0ff1028e1a4d5815939887d406bb5 /src/core/taskq.c | |
| parent | f42f22abf488d30d4d68a8ab57063c48d618948d (diff) | |
| download | nng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.gz nng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.bz2 nng-2944f3a2634876e543003fabb05cba1b178df6aa.zip | |
Taskq implementation.
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c new file mode 100644 index 00000000..2b2b0d1a --- /dev/null +++ b/src/core/taskq.c @@ -0,0 +1,177 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +struct nni_taskq { + nni_list tq_ents; + nni_mtx tq_mtx; + nni_cv tq_cv; + nni_thr * tq_threads; + int tq_nthreads; + int tq_close; +}; + +static nni_taskq *nni_taskq_systq = NULL; + +static void +nni_taskq_thread(void *self) +{ + nni_taskq *tq = self; + nni_taskq_ent *ent; + + nni_mtx_lock(&tq->tq_mtx); + for (;;) { + if ((ent = nni_list_first(&tq->tq_ents)) != NULL) { + nni_list_remove(&tq->tq_ents, ent); + ent->tqe_tq = NULL; + nni_mtx_unlock(&tq->tq_mtx); + ent->tqe_cb(ent->tqe_arg); + continue; + } + + if (tq->tq_close) { + break; + } + + nni_cv_wait(&tq->tq_cv); + } + nni_mtx_unlock(&tq->tq_mtx); +} + + +int +nni_taskq_init(nni_taskq **tqp, int nthr) +{ + int rv; + nni_taskq *tq; + int i; + + if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&tq->tq_mtx); + nni_cv_init(&tq->tq_cv, &tq->tq_mtx); + tq->tq_close = 0; + NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); + + if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) { + return (NNG_ENOMEM); + } + for (tq->tq_nthreads = 0; tq->tq_nthreads < nthr; tq->tq_nthreads++) { + rv = nni_thr_init(&tq->tq_threads[tq->tq_nthreads], + nni_taskq_thread, tq); + if (rv != 0) { + goto fail; + } + } + tq->tq_nthreads = nthr; + for (i = 0; i < tq->tq_nthreads; i++) { + nni_thr_run(&tq->tq_threads[i]); + } + *tqp = tq; + return (0); + +fail: + + nni_taskq_fini(tq); + return (rv); +} + + +void +nni_taskq_fini(nni_taskq *tq) +{ + int i; + + nni_mtx_lock(&tq->tq_mtx); + tq->tq_close = 1; + nni_cv_wake(&tq->tq_cv); + nni_mtx_unlock(&tq->tq_mtx); + for (i = 0; i < tq->tq_nthreads; i++) { + nni_thr_fini(&tq->tq_threads[i]); + } + nni_cv_fini(&tq->tq_cv); + nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCT(tq); +} + + +int +nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) +{ + if (tq == NULL) { + tq = nni_taskq_systq; + } + + nni_mtx_lock(&tq->tq_mtx); + if (tq->tq_close) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_ECLOSED); + } + // It might already be scheduled... if so don't redo it. + if (ent->tqe_tq == NULL) { + ent->tqe_tq = tq; + nni_list_append(&tq->tq_ents, ent); + } + nni_cv_wake(&tq->tq_cv); + nni_mtx_unlock(&tq->tq_mtx); + return (0); +} + + +int +nni_taskq_cancel(nni_taskq_ent *ent) +{ + nni_taskq *tq; + + if ((tq = ent->tqe_tq) == NULL) { + return (NNG_ENOENT); + } + nni_mtx_lock(&tq->tq_mtx); + if (ent->tqe_tq == NULL) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_ENOENT); + } + if ((ent->tqe_tq) != tq) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_EBUSY); + } + nni_list_remove(&tq->tq_ents, ent); + nni_mtx_unlock(&tq->tq_mtx); + return (0); +} + + +void +nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg) +{ + NNI_LIST_NODE_INIT(&ent->tqe_node); + ent->tqe_cb = cb; + ent->tqe_arg = arg; + ent->tqe_tq = NULL; +} + + +int +nni_taskq_sys_init(void) +{ + int rv; + + // XXX: Make the "16" = NCPUs * 2 + rv = nni_taskq_init(&nni_taskq_systq, 16); + return (rv); +} + + +void +nni_taskq_sys_fini(void) +{ + nni_taskq_fini(nni_taskq_systq); +} |
