aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-02-18 19:24:15 -0800
committerGarrett D'Amore <garrett@damore.org>2017-02-18 19:25:14 -0800
commit2944f3a2634876e543003fabb05cba1b178df6aa (patch)
treea9a5aba5aec0ff1028e1a4d5815939887d406bb5
parentf42f22abf488d30d4d68a8ab57063c48d618948d (diff)
downloadnng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.gz
nng-2944f3a2634876e543003fabb05cba1b178df6aa.tar.bz2
nng-2944f3a2634876e543003fabb05cba1b178df6aa.zip
Taskq implementation.
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/init.c18
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/random.c4
-rw-r--r--src/core/random.h8
-rw-r--r--src/core/taskq.c177
-rw-r--r--src/core/taskq.h32
-rw-r--r--src/core/transport.c8
-rw-r--r--src/core/transport.h4
10 files changed, 242 insertions, 16 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c81286c1..6efecd0d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -64,6 +64,8 @@ set (NNG_SOURCES
core/random.h
core/socket.c
core/socket.h
+ core/taskq.c
+ core/taskq.h
core/thread.c
core/thread.h
core/transport.c
diff --git a/src/core/defs.h b/src/core/defs.h
index abb340bc..0a04b732 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -50,6 +50,10 @@ typedef uint64_t nni_time; // Abs. time (usec).
typedef int64_t nni_duration; // Rel. time (usec).
typedef void (*nni_worker)(void *);
+typedef struct nni_taskq nni_taskq;
+typedef struct nni_taskq_ent nni_taskq_ent;
+typedef void (*nni_cb)(void *);
+
// Used by transports for scatter gather I/O.
typedef struct {
void * iov_buf;
diff --git a/src/core/init.c b/src/core/init.c
index 66ad9725..0e27e289 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -26,17 +26,24 @@ nni_init_helper(void)
{
int rv;
- if ((rv = nni_random_init()) != 0) {
+ if ((rv = nni_taskq_sys_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_random_sys_init()) != 0) {
+ nni_taskq_sys_fini();
return (rv);
}
if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) {
+ nni_random_sys_fini();
+ nni_taskq_sys_fini();
return (rv);
}
if (((rv = nni_idhash_init(&nni_endpoints_x)) != 0) ||
((rv = nni_idhash_init(&nni_pipes_x)) != 0) ||
((rv = nni_idhash_init(&nni_sockets_x)) != 0)) {
nni_mtx_fini(&nni_idlock_x);
- nni_random_fini();
+ nni_random_sys_fini();
+ nni_taskq_sys_fini();
return (rv);
}
nni_idhash_set_limits(&nni_pipes_x, 1, 0x7fffffff,
@@ -49,7 +56,7 @@ nni_init_helper(void)
nni_endpoints = &nni_endpoints_x;
nni_sockets = &nni_sockets_x;
- nni_tran_init();
+ nni_tran_sys_init();
return (0);
}
@@ -68,7 +75,8 @@ nni_fini(void)
nni_idhash_fini(&nni_pipes_x);
nni_idhash_fini(&nni_sockets_x);
nni_mtx_fini(&nni_idlock_x);
- nni_tran_fini();
- nni_random_fini();
+ nni_tran_sys_fini();
+ nni_random_sys_fini();
+ nni_taskq_sys_fini();
nni_plat_fini();
}
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 871fcf67..91dce28a 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -35,6 +35,7 @@
#include "core/platform.h"
#include "core/protocol.h"
#include "core/random.h"
+#include "core/taskq.h"
#include "core/thread.h"
#include "core/transport.h"
diff --git a/src/core/random.c b/src/core/random.c
index b2ce0ef7..00c0bcf7 100644
--- a/src/core/random.c
+++ b/src/core/random.c
@@ -153,7 +153,7 @@ nni_isaac_randinit(nni_isaac_ctx *ctx, int flag)
static nni_isaac_ctx nni_random_ctx;
int
-nni_random_init(void)
+nni_random_sys_init(void)
{
// minimally, grab the system clock
nni_isaac_ctx *ctx = &nni_random_ctx;
@@ -189,7 +189,7 @@ nni_random(void)
void
-nni_random_fini(void)
+nni_random_sys_fini(void)
{
nni_mtx_fini(&nni_random_ctx.mx);
}
diff --git a/src/core/random.h b/src/core/random.h
index cf365380..33229b09 100644
--- a/src/core/random.h
+++ b/src/core/random.h
@@ -10,12 +10,12 @@
#ifndef CORE_RANDOM_H
#define CORE_RANDOM_H
-// nni_random_init initializes the pRNG subsystem. This includes obtaining
+// nni_random_sys_init initializes the pRNG subsystem. This includes obtaining
// suitable seeding material from the platform.
-extern int nni_random_init(void);
+extern int nni_random_sys_init(void);
-// nni_random_fini destroys the pRNG subsystem.
-extern void nni_random_fini(void);
+// nni_random_sys_fini destroys the pRNG subsystem.
+extern void nni_random_sys_fini(void);
// nni_random returns a random 32-bit integer. Note that this routine is
// thread-safe/reentrant. The pRNG is very robust, should be of crypto
diff --git a/src/core/taskq.c b/src/core/taskq.c
new file mode 100644
index 00000000..2b2b0d1a
--- /dev/null
+++ b/src/core/taskq.c
@@ -0,0 +1,177 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+struct nni_taskq {
+ nni_list tq_ents;
+ nni_mtx tq_mtx;
+ nni_cv tq_cv;
+ nni_thr * tq_threads;
+ int tq_nthreads;
+ int tq_close;
+};
+
+static nni_taskq *nni_taskq_systq = NULL;
+
+static void
+nni_taskq_thread(void *self)
+{
+ nni_taskq *tq = self;
+ nni_taskq_ent *ent;
+
+ nni_mtx_lock(&tq->tq_mtx);
+ for (;;) {
+ if ((ent = nni_list_first(&tq->tq_ents)) != NULL) {
+ nni_list_remove(&tq->tq_ents, ent);
+ ent->tqe_tq = NULL;
+ nni_mtx_unlock(&tq->tq_mtx);
+ ent->tqe_cb(ent->tqe_arg);
+ continue;
+ }
+
+ if (tq->tq_close) {
+ break;
+ }
+
+ nni_cv_wait(&tq->tq_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
+
+int
+nni_taskq_init(nni_taskq **tqp, int nthr)
+{
+ int rv;
+ nni_taskq *tq;
+ int i;
+
+ if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&tq->tq_mtx);
+ nni_cv_init(&tq->tq_cv, &tq->tq_mtx);
+ tq->tq_close = 0;
+ NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
+
+ if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ for (tq->tq_nthreads = 0; tq->tq_nthreads < nthr; tq->tq_nthreads++) {
+ rv = nni_thr_init(&tq->tq_threads[tq->tq_nthreads],
+ nni_taskq_thread, tq);
+ if (rv != 0) {
+ goto fail;
+ }
+ }
+ tq->tq_nthreads = nthr;
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ nni_thr_run(&tq->tq_threads[i]);
+ }
+ *tqp = tq;
+ return (0);
+
+fail:
+
+ nni_taskq_fini(tq);
+ return (rv);
+}
+
+
+void
+nni_taskq_fini(nni_taskq *tq)
+{
+ int i;
+
+ nni_mtx_lock(&tq->tq_mtx);
+ tq->tq_close = 1;
+ nni_cv_wake(&tq->tq_cv);
+ nni_mtx_unlock(&tq->tq_mtx);
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ nni_thr_fini(&tq->tq_threads[i]);
+ }
+ nni_cv_fini(&tq->tq_cv);
+ nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCT(tq);
+}
+
+
+int
+nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
+{
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+
+ nni_mtx_lock(&tq->tq_mtx);
+ if (tq->tq_close) {
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (NNG_ECLOSED);
+ }
+ // It might already be scheduled... if so don't redo it.
+ if (ent->tqe_tq == NULL) {
+ ent->tqe_tq = tq;
+ nni_list_append(&tq->tq_ents, ent);
+ }
+ nni_cv_wake(&tq->tq_cv);
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (0);
+}
+
+
+int
+nni_taskq_cancel(nni_taskq_ent *ent)
+{
+ nni_taskq *tq;
+
+ if ((tq = ent->tqe_tq) == NULL) {
+ return (NNG_ENOENT);
+ }
+ nni_mtx_lock(&tq->tq_mtx);
+ if (ent->tqe_tq == NULL) {
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (NNG_ENOENT);
+ }
+ if ((ent->tqe_tq) != tq) {
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (NNG_EBUSY);
+ }
+ nni_list_remove(&tq->tq_ents, ent);
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (0);
+}
+
+
+void
+nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg)
+{
+ NNI_LIST_NODE_INIT(&ent->tqe_node);
+ ent->tqe_cb = cb;
+ ent->tqe_arg = arg;
+ ent->tqe_tq = NULL;
+}
+
+
+int
+nni_taskq_sys_init(void)
+{
+ int rv;
+
+ // XXX: Make the "16" = NCPUs * 2
+ rv = nni_taskq_init(&nni_taskq_systq, 16);
+ return (rv);
+}
+
+
+void
+nni_taskq_sys_fini(void)
+{
+ nni_taskq_fini(nni_taskq_systq);
+}
diff --git a/src/core/taskq.h b/src/core/taskq.h
new file mode 100644
index 00000000..dc1f4f2d
--- /dev/null
+++ b/src/core/taskq.h
@@ -0,0 +1,32 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_TASKQ_H
+#define CORE_TASKQ_H
+
+#include "core/nng_impl.h"
+
+struct nni_taskq_ent {
+ nni_list_node tqe_node;
+ void * tqe_arg;
+ nni_cb tqe_cb;
+ nni_taskq * tqe_tq;
+};
+
+extern int nni_taskq_init(nni_taskq **, int);
+extern void nni_taskq_fini(nni_taskq *);
+
+extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *);
+extern int nni_taskq_cancel(nni_taskq_ent *);
+extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *);
+
+extern int nni_taskq_sys_init(void);
+extern void nni_taskq_sys_fini(void);
+
+#endif // CORE_TASKQ_H
diff --git a/src/core/transport.c b/src/core/transport.c
index 25a881d6..de94596c 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -47,10 +47,10 @@ nni_tran_find(const char *addr)
}
-// nni_transport_init initializes the entire transport subsystem, including
+// nni_tran_sys_init initializes the entire transport subsystem, including
// each individual transport.
void
-nni_tran_init(void)
+nni_tran_sys_init(void)
{
int i;
nni_tran *tran;
@@ -61,8 +61,10 @@ nni_tran_init(void)
}
+// nni_tran_sys_fini finalizes the entire transport system, including all
+// transports.
void
-nni_tran_fini(void)
+nni_tran_sys_fini(void)
{
int i;
nni_tran *tran;
diff --git a/src/core/transport.h b/src/core/transport.h
index c74ec497..1765b361 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -117,7 +117,7 @@ struct nni_tran_pipe {
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(const char *);
-extern void nni_tran_init(void);
-extern void nni_tran_fini(void);
+extern void nni_tran_sys_init(void);
+extern void nni_tran_sys_fini(void);
#endif // CORE_TRANSPORT_H