aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-21 02:46:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-21 02:46:01 -0700
commit537e2eda8d9fda2001295c835a4720def6a237f1 (patch)
tree615d05096e0fe4c8bfbccf31f37c7256381ccd8e /src/core
parent07078e1cf761cb6e56e46bde3ade6f792368d7dd (diff)
downloadnng-537e2eda8d9fda2001295c835a4720def6a237f1.tar.gz
nng-537e2eda8d9fda2001295c835a4720def6a237f1.tar.bz2
nng-537e2eda8d9fda2001295c835a4720def6a237f1.zip
Simpler taskq API.
The queue is bound at initialization time of the task, and we call entries just tasks, so we don't have to pass around a taskq pointer across all the calls. Further, nni_task_dispatch is now guaranteed to succeed.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c12
-rw-r--r--src/core/aio.h10
-rw-r--r--src/core/endpt.c5
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/pipe.c5
-rw-r--r--src/core/pipe.h2
-rw-r--r--src/core/taskq.c90
-rw-r--r--src/core/taskq.h30
8 files changed, 75 insertions, 81 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 620a865d..45bb4427 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -49,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
aio->a_flags = 0;
- nni_taskq_ent_init(&aio->a_tqe, cb, arg);
+ nni_task_init(NULL, &aio->a_task, cb, arg);
return (0);
}
@@ -89,7 +89,7 @@ nni_aio_stop(nni_aio *aio)
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
- nni_taskq_wait(NULL, &aio->a_tqe);
+ nni_task_wait(&aio->a_task);
}
int
@@ -188,7 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&aio->a_lk);
}
@@ -215,7 +215,7 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&aio->a_lk);
return (0);
}
@@ -242,7 +242,7 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&aio->a_lk);
return (0);
}
@@ -391,7 +391,7 @@ nni_aio_expire_loop(void *arg)
// thread. But keeping it separate is clearer, and more
// consistent with other uses. And this should not be a
// hot code path.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ nni_task_dispatch(&aio->a_task);
}
}
diff --git a/src/core/aio.h b/src/core/aio.h
index c698ce20..955f84ae 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -27,11 +27,11 @@ struct nni_aio {
nni_time a_expire;
// These fields are private to the aio framework.
- nni_mtx a_lk;
- nni_cv a_cv;
- unsigned a_flags;
- int a_refcnt; // prevent use-after-free
- nni_taskq_ent a_tqe;
+ nni_mtx a_lk;
+ nni_cv a_cv;
+ unsigned a_flags;
+ int a_refcnt; // prevent use-after-free
+ nni_task a_task;
// Read/write operations.
nni_iov a_iov[4];
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 6b474698..c0f9d399 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -22,6 +22,7 @@ static void nni_ep_connect_start(nni_ep *);
static void nni_ep_connect_done(void *);
static void nni_ep_backoff_start(nni_ep *);
static void nni_ep_backoff_done(void *);
+static void nni_ep_reap(nni_ep *);
static nni_idhash *nni_eps;
@@ -107,6 +108,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_refcnt = 0;
NNI_LIST_NODE_INIT(&ep->ep_node);
+ nni_task_init(NULL, &ep->ep_reap_task, (nni_cb) nni_ep_reap, ep);
nni_pipe_ep_list_init(&ep->ep_pipes);
@@ -226,8 +228,7 @@ nni_ep_stop(nni_ep *ep)
nni_pipe_stop(pipe);
}
- nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep);
- nni_taskq_dispatch(NULL, &ep->ep_reap_tqe);
+ nni_task_dispatch(&ep->ep_reap_task);
nni_mtx_unlock(&ep->ep_mtx);
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 0d2a4570..942ba978 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -44,7 +44,7 @@ struct nni_ep {
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
nni_time ep_conntime; // time of last good connect
- nni_taskq_ent ep_reap_tqe;
+ nni_task ep_reap_task;
};
enum nni_ep_mode {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 5b51a38b..9247ec0f 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -158,8 +158,7 @@ nni_pipe_stop(nni_pipe *p)
return;
}
p->p_stop = 1;
- nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_reap, p);
- nni_taskq_dispatch(NULL, &p->p_reap_tqe);
+ nni_task_dispatch(&p->p_reap_task);
nni_mtx_unlock(&p->p_mtx);
}
@@ -209,6 +208,8 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_data = NULL;
+ nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p);
+
if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
tran->tran_pipe->p_fini(p);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index e2c76d4d..b8f6d90a 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -34,7 +34,7 @@ struct nni_pipe {
int p_refcnt;
nni_mtx p_mtx;
nni_cv p_cv;
- nni_taskq_ent p_reap_tqe;
+ nni_task p_reap_task;
nni_aio p_start_aio;
};
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 64179790..cf722596 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -12,13 +12,13 @@
typedef struct nni_taskq_thr nni_taskq_thr;
struct nni_taskq_thr {
- nni_taskq * tqt_tq;
- nni_thr tqt_thread;
- nni_taskq_ent *tqt_running;
- int tqt_wait;
+ nni_taskq *tqt_tq;
+ nni_thr tqt_thread;
+ nni_task * tqt_running;
+ int tqt_wait;
};
struct nni_taskq {
- nni_list tq_ents;
+ nni_list tq_tasks;
nni_mtx tq_mtx;
nni_cv tq_cv;
nni_taskq_thr *tq_threads;
@@ -34,16 +34,15 @@ nni_taskq_thread(void *self)
{
nni_taskq_thr *thr = self;
nni_taskq * tq = thr->tqt_tq;
- nni_taskq_ent *ent;
+ nni_task * task;
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
- if ((ent = nni_list_first(&tq->tq_ents)) != NULL) {
- nni_list_remove(&tq->tq_ents, ent);
- ent->tqe_tq = NULL;
- thr->tqt_running = ent;
+ if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_list_remove(&tq->tq_tasks, task);
+ thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
- ent->tqe_cb(ent->tqe_arg);
+ task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
if (thr->tqt_wait) {
@@ -91,7 +90,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
tq->tq_close = 0;
- NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
+ NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
if (tq->tq_threads == NULL) {
@@ -141,46 +140,35 @@ nni_taskq_fini(nni_taskq *tq)
NNI_FREE_STRUCT(tq);
}
-int
-nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
+void
+nni_task_dispatch(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
nni_mtx_lock(&tq->tq_mtx);
- if (tq->tq_close) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_ECLOSED);
- }
// It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_ents, ent)) {
- ent->tqe_tq = tq;
- nni_list_append(&tq->tq_ents, ent);
+ if (!nni_list_active(&tq->tq_tasks, task)) {
+ nni_list_append(&tq->tq_tasks, task);
}
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);
- return (0);
}
void
-nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+nni_task_wait(nni_task *task)
{
- int i;
- int running;
-
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
+ int i;
+ int running;
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
running = 0;
- if (nni_list_active(&tq->tq_ents, ent)) {
+ if (nni_list_active(&tq->tq_tasks, task)) {
running = 1;
} else {
for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == ent) {
+ if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;
}
@@ -197,21 +185,18 @@ nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
}
int
-nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
+nni_task_cancel(nni_task *task)
{
- int i;
- int running;
-
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
+ int i;
+ int running;
nni_mtx_lock(&tq->tq_mtx);
running = 1;
for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == ent) {
+ if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;
}
@@ -225,24 +210,23 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_cv_wait(&tq->tq_cv);
}
- if (ent->tqe_tq != tq) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_ENOENT);
- }
- if (nni_list_active(&tq->tq_ents, ent)) {
- nni_list_remove(&tq->tq_ents, ent);
+ if (nni_list_active(&tq->tq_tasks, task)) {
+ nni_list_remove(&tq->tq_tasks, task);
}
nni_mtx_unlock(&tq->tq_mtx);
return (0);
}
void
-nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg)
+nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
{
- NNI_LIST_NODE_INIT(&ent->tqe_node);
- ent->tqe_cb = cb;
- ent->tqe_arg = arg;
- ent->tqe_tq = NULL;
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+ NNI_LIST_NODE_INIT(&task->task_node);
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq;
}
int
diff --git a/src/core/taskq.h b/src/core/taskq.h
index e6399706..89b7be4a 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -13,23 +14,30 @@
#include "core/defs.h"
#include "core/list.h"
-typedef struct nni_taskq nni_taskq;
-typedef struct nni_taskq_ent nni_taskq_ent;
+typedef struct nni_taskq nni_taskq;
+typedef struct nni_task nni_task;
-struct nni_taskq_ent {
- nni_list_node tqe_node;
- void * tqe_arg;
- nni_cb tqe_cb;
- nni_taskq * tqe_tq;
+// nni_task is a structure representing a task. Its intended to inlined
+// into structures so that taskq_dispatch can be a guaranteed operation.
+struct nni_task {
+ nni_list_node task_node;
+ void * task_arg;
+ nni_cb task_cb;
+ nni_taskq * task_tq;
};
extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
-extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *);
-extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *);
-extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *);
-extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *);
+// nni_task_dispatch sends the task to the queue. It is guaranteed to
+// succeed. (If the queue is shutdown, then the behavior is undefined.)
+extern void nni_task_dispatch(nni_task *);
+
+// nni_task_cancel cancels the task. It will wait for the task to complete
+// if it is already running.
+extern int nni_task_cancel(nni_task *);
+extern void nni_task_wait(nni_task *);
+extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *);
extern int nni_taskq_sys_init(void);
extern void nni_taskq_sys_fini(void);