diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-02-18 19:24:15 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-02-18 19:25:14 -0800 |
| commit | 2944f3a2634876e543003fabb05cba1b178df6aa (patch) | |
| tree | a9a5aba5aec0ff1028e1a4d5815939887d406bb5 /src | |
| parent | f42f22abf488d30d4d68a8ab57063c48d618948d (diff) | |
| download | nng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.gz nng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.bz2 nng-2944f3a2634876e543003fabb05cba1b178df6aa.zip | |
Taskq implementation.
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/defs.h | 4 | ||||
| -rw-r--r-- | src/core/init.c | 18 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/random.c | 4 | ||||
| -rw-r--r-- | src/core/random.h | 8 | ||||
| -rw-r--r-- | src/core/taskq.c | 177 | ||||
| -rw-r--r-- | src/core/taskq.h | 32 | ||||
| -rw-r--r-- | src/core/transport.c | 8 | ||||
| -rw-r--r-- | src/core/transport.h | 4 |
10 files changed, 242 insertions, 16 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c81286c1..6efecd0d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -64,6 +64,8 @@ set (NNG_SOURCES core/random.h core/socket.c core/socket.h + core/taskq.c + core/taskq.h core/thread.c core/thread.h core/transport.c diff --git a/src/core/defs.h b/src/core/defs.h index abb340bc..0a04b732 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -50,6 +50,10 @@ typedef uint64_t nni_time; // Abs. time (usec). typedef int64_t nni_duration; // Rel. time (usec). typedef void (*nni_worker)(void *); +typedef struct nni_taskq nni_taskq; +typedef struct nni_taskq_ent nni_taskq_ent; +typedef void (*nni_cb)(void *); + // Used by transports for scatter gather I/O. typedef struct { void * iov_buf; diff --git a/src/core/init.c b/src/core/init.c index 66ad9725..0e27e289 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -26,17 +26,24 @@ nni_init_helper(void) { int rv; - if ((rv = nni_random_init()) != 0) { + if ((rv = nni_taskq_sys_init()) != 0) { + return (rv); + } + if ((rv = nni_random_sys_init()) != 0) { + nni_taskq_sys_fini(); return (rv); } if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) { + nni_random_sys_fini(); + nni_taskq_sys_fini(); return (rv); } if (((rv = nni_idhash_init(&nni_endpoints_x)) != 0) || ((rv = nni_idhash_init(&nni_pipes_x)) != 0) || ((rv = nni_idhash_init(&nni_sockets_x)) != 0)) { nni_mtx_fini(&nni_idlock_x); - nni_random_fini(); + nni_random_sys_fini(); + nni_taskq_sys_fini(); return (rv); } nni_idhash_set_limits(&nni_pipes_x, 1, 0x7fffffff, @@ -49,7 +56,7 @@ nni_init_helper(void) nni_endpoints = &nni_endpoints_x; nni_sockets = &nni_sockets_x; - nni_tran_init(); + nni_tran_sys_init(); return (0); } @@ -68,7 +75,8 @@ nni_fini(void) nni_idhash_fini(&nni_pipes_x); nni_idhash_fini(&nni_sockets_x); nni_mtx_fini(&nni_idlock_x); - nni_tran_fini(); - nni_random_fini(); + nni_tran_sys_fini(); + nni_random_sys_fini(); + nni_taskq_sys_fini(); nni_plat_fini(); } diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 871fcf67..91dce28a 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -35,6 +35,7 @@ #include "core/platform.h" #include "core/protocol.h" #include "core/random.h" +#include "core/taskq.h" #include "core/thread.h" #include "core/transport.h" diff --git a/src/core/random.c b/src/core/random.c index b2ce0ef7..00c0bcf7 100644 --- a/src/core/random.c +++ b/src/core/random.c @@ -153,7 +153,7 @@ nni_isaac_randinit(nni_isaac_ctx *ctx, int flag) static nni_isaac_ctx nni_random_ctx; int -nni_random_init(void) +nni_random_sys_init(void) { // minimally, grab the system clock nni_isaac_ctx *ctx = &nni_random_ctx; @@ -189,7 +189,7 @@ nni_random(void) void -nni_random_fini(void) +nni_random_sys_fini(void) { nni_mtx_fini(&nni_random_ctx.mx); } diff --git a/src/core/random.h b/src/core/random.h index cf365380..33229b09 100644 --- a/src/core/random.h +++ b/src/core/random.h @@ -10,12 +10,12 @@ #ifndef CORE_RANDOM_H #define CORE_RANDOM_H -// nni_random_init initializes the pRNG subsystem. This includes obtaining +// nni_random_sys_init initializes the pRNG subsystem. This includes obtaining // suitable seeding material from the platform. -extern int nni_random_init(void); +extern int nni_random_sys_init(void); -// nni_random_fini destroys the pRNG subsystem. -extern void nni_random_fini(void); +// nni_random_sys_fini destroys the pRNG subsystem. +extern void nni_random_sys_fini(void); // nni_random returns a random 32-bit integer. Note that this routine is // thread-safe/reentrant. The pRNG is very robust, should be of crypto diff --git a/src/core/taskq.c b/src/core/taskq.c new file mode 100644 index 00000000..2b2b0d1a --- /dev/null +++ b/src/core/taskq.c @@ -0,0 +1,177 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +struct nni_taskq { + nni_list tq_ents; + nni_mtx tq_mtx; + nni_cv tq_cv; + nni_thr * tq_threads; + int tq_nthreads; + int tq_close; +}; + +static nni_taskq *nni_taskq_systq = NULL; + +static void +nni_taskq_thread(void *self) +{ + nni_taskq *tq = self; + nni_taskq_ent *ent; + + nni_mtx_lock(&tq->tq_mtx); + for (;;) { + if ((ent = nni_list_first(&tq->tq_ents)) != NULL) { + nni_list_remove(&tq->tq_ents, ent); + ent->tqe_tq = NULL; + nni_mtx_unlock(&tq->tq_mtx); + ent->tqe_cb(ent->tqe_arg); + continue; + } + + if (tq->tq_close) { + break; + } + + nni_cv_wait(&tq->tq_cv); + } + nni_mtx_unlock(&tq->tq_mtx); +} + + +int +nni_taskq_init(nni_taskq **tqp, int nthr) +{ + int rv; + nni_taskq *tq; + int i; + + if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&tq->tq_mtx); + nni_cv_init(&tq->tq_cv, &tq->tq_mtx); + tq->tq_close = 0; + NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); + + if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) { + return (NNG_ENOMEM); + } + for (tq->tq_nthreads = 0; tq->tq_nthreads < nthr; tq->tq_nthreads++) { + rv = nni_thr_init(&tq->tq_threads[tq->tq_nthreads], + nni_taskq_thread, tq); + if (rv != 0) { + goto fail; + } + } + tq->tq_nthreads = nthr; + for (i = 0; i < tq->tq_nthreads; i++) { + nni_thr_run(&tq->tq_threads[i]); + } + *tqp = tq; + return (0); + +fail: + + nni_taskq_fini(tq); + return (rv); +} + + +void +nni_taskq_fini(nni_taskq *tq) +{ + int i; + + nni_mtx_lock(&tq->tq_mtx); + tq->tq_close = 1; + nni_cv_wake(&tq->tq_cv); + nni_mtx_unlock(&tq->tq_mtx); + for (i = 0; i < tq->tq_nthreads; i++) { + nni_thr_fini(&tq->tq_threads[i]); + } + nni_cv_fini(&tq->tq_cv); + nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCT(tq); +} + + +int +nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) +{ + if (tq == NULL) { + tq = nni_taskq_systq; + } + + nni_mtx_lock(&tq->tq_mtx); + if (tq->tq_close) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_ECLOSED); + } + // It might already be scheduled... if so don't redo it. + if (ent->tqe_tq == NULL) { + ent->tqe_tq = tq; + nni_list_append(&tq->tq_ents, ent); + } + nni_cv_wake(&tq->tq_cv); + nni_mtx_unlock(&tq->tq_mtx); + return (0); +} + + +int +nni_taskq_cancel(nni_taskq_ent *ent) +{ + nni_taskq *tq; + + if ((tq = ent->tqe_tq) == NULL) { + return (NNG_ENOENT); + } + nni_mtx_lock(&tq->tq_mtx); + if (ent->tqe_tq == NULL) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_ENOENT); + } + if ((ent->tqe_tq) != tq) { + nni_mtx_unlock(&tq->tq_mtx); + return (NNG_EBUSY); + } + nni_list_remove(&tq->tq_ents, ent); + nni_mtx_unlock(&tq->tq_mtx); + return (0); +} + + +void +nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg) +{ + NNI_LIST_NODE_INIT(&ent->tqe_node); + ent->tqe_cb = cb; + ent->tqe_arg = arg; + ent->tqe_tq = NULL; +} + + +int +nni_taskq_sys_init(void) +{ + int rv; + + // XXX: Make the "16" = NCPUs * 2 + rv = nni_taskq_init(&nni_taskq_systq, 16); + return (rv); +} + + +void +nni_taskq_sys_fini(void) +{ + nni_taskq_fini(nni_taskq_systq); +} diff --git a/src/core/taskq.h b/src/core/taskq.h new file mode 100644 index 00000000..dc1f4f2d --- /dev/null +++ b/src/core/taskq.h @@ -0,0 +1,32 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_TASKQ_H +#define CORE_TASKQ_H + +#include "core/nng_impl.h" + +struct nni_taskq_ent { + nni_list_node tqe_node; + void * tqe_arg; + nni_cb tqe_cb; + nni_taskq * tqe_tq; +}; + +extern int nni_taskq_init(nni_taskq **, int); +extern void nni_taskq_fini(nni_taskq *); + +extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *); +extern int nni_taskq_cancel(nni_taskq_ent *); +extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *); + +extern int nni_taskq_sys_init(void); +extern void nni_taskq_sys_fini(void); + +#endif // CORE_TASKQ_H diff --git a/src/core/transport.c b/src/core/transport.c index 25a881d6..de94596c 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -47,10 +47,10 @@ nni_tran_find(const char *addr) } -// nni_transport_init initializes the entire transport subsystem, including +// nni_tran_sys_init initializes the entire transport subsystem, including // each individual transport. void -nni_tran_init(void) +nni_tran_sys_init(void) { int i; nni_tran *tran; @@ -61,8 +61,10 @@ nni_tran_init(void) } +// nni_tran_sys_fini finalizes the entire transport system, including all +// transports. void -nni_tran_fini(void) +nni_tran_sys_fini(void) { int i; nni_tran *tran; diff --git a/src/core/transport.h b/src/core/transport.h index c74ec497..1765b361 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -117,7 +117,7 @@ struct nni_tran_pipe { // These APIs are used by the framework internally, and not for use by // transport implementations. extern nni_tran *nni_tran_find(const char *); -extern void nni_tran_init(void); -extern void nni_tran_fini(void); +extern void nni_tran_sys_init(void); +extern void nni_tran_sys_fini(void); #endif // CORE_TRANSPORT_H |
