diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 12 | ||||
| -rw-r--r-- | src/core/aio.h | 10 | ||||
| -rw-r--r-- | src/core/endpt.c | 5 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/pipe.c | 5 | ||||
| -rw-r--r-- | src/core/pipe.h | 2 | ||||
| -rw-r--r-- | src/core/taskq.c | 90 | ||||
| -rw-r--r-- | src/core/taskq.h | 30 |
8 files changed, 75 insertions, 81 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 620a865d..45bb4427 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -49,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; aio->a_flags = 0; - nni_taskq_ent_init(&aio->a_tqe, cb, arg); + nni_task_init(NULL, &aio->a_task, cb, arg); return (0); } @@ -89,7 +89,7 @@ nni_aio_stop(nni_aio *aio) // Wait for any outstanding task to complete. We won't schedule // new stuff because nni_aio_start will fail (due to AIO_FINI). - nni_taskq_wait(NULL, &aio->a_tqe); + nni_task_wait(&aio->a_task); } int @@ -188,7 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv) aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - nni_taskq_dispatch(NULL, &aio->a_tqe); + nni_task_dispatch(&aio->a_task); nni_mtx_unlock(&aio->a_lk); } @@ -215,7 +215,7 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) nni_aio_expire_remove(aio); aio->a_expire = NNI_TIME_NEVER; - nni_taskq_dispatch(NULL, &aio->a_tqe); + nni_task_dispatch(&aio->a_task); nni_mtx_unlock(&aio->a_lk); return (0); } @@ -242,7 +242,7 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) nni_aio_expire_remove(aio); aio->a_expire = NNI_TIME_NEVER; - nni_taskq_dispatch(NULL, &aio->a_tqe); + nni_task_dispatch(&aio->a_task); nni_mtx_unlock(&aio->a_lk); return (0); } @@ -391,7 +391,7 @@ nni_aio_expire_loop(void *arg) // thread. But keeping it separate is clearer, and more // consistent with other uses. And this should not be a // hot code path. - nni_taskq_dispatch(NULL, &aio->a_tqe); + nni_task_dispatch(&aio->a_task); } } diff --git a/src/core/aio.h b/src/core/aio.h index c698ce20..955f84ae 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -27,11 +27,11 @@ struct nni_aio { nni_time a_expire; // These fields are private to the aio framework. - nni_mtx a_lk; - nni_cv a_cv; - unsigned a_flags; - int a_refcnt; // prevent use-after-free - nni_taskq_ent a_tqe; + nni_mtx a_lk; + nni_cv a_cv; + unsigned a_flags; + int a_refcnt; // prevent use-after-free + nni_task a_task; // Read/write operations. nni_iov a_iov[4]; diff --git a/src/core/endpt.c b/src/core/endpt.c index 6b474698..c0f9d399 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -22,6 +22,7 @@ static void nni_ep_connect_start(nni_ep *); static void nni_ep_connect_done(void *); static void nni_ep_backoff_start(nni_ep *); static void nni_ep_backoff_done(void *); +static void nni_ep_reap(nni_ep *); static nni_idhash *nni_eps; @@ -107,6 +108,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_refcnt = 0; NNI_LIST_NODE_INIT(&ep->ep_node); + nni_task_init(NULL, &ep->ep_reap_task, (nni_cb) nni_ep_reap, ep); nni_pipe_ep_list_init(&ep->ep_pipes); @@ -226,8 +228,7 @@ nni_ep_stop(nni_ep *ep) nni_pipe_stop(pipe); } - nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep); - nni_taskq_dispatch(NULL, &ep->ep_reap_tqe); + nni_task_dispatch(&ep->ep_reap_task); nni_mtx_unlock(&ep->ep_mtx); } diff --git a/src/core/endpt.h b/src/core/endpt.h index 0d2a4570..942ba978 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -44,7 +44,7 @@ struct nni_ep { nni_duration ep_currtime; // current time for reconnect nni_duration ep_inirtime; // initial time for reconnect nni_time ep_conntime; // time of last good connect - nni_taskq_ent ep_reap_tqe; + nni_task ep_reap_task; }; enum nni_ep_mode { diff --git a/src/core/pipe.c b/src/core/pipe.c index 5b51a38b..9247ec0f 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -158,8 +158,7 @@ nni_pipe_stop(nni_pipe *p) return; } p->p_stop = 1; - nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_reap, p); - nni_taskq_dispatch(NULL, &p->p_reap_tqe); + nni_task_dispatch(&p->p_reap_task); nni_mtx_unlock(&p->p_mtx); } @@ -209,6 +208,8 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_data = NULL; + nni_task_init(NULL, &p->p_reap_task, (nni_cb) nni_pipe_reap, p); + if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) { tran->tran_pipe->p_fini(p); diff --git a/src/core/pipe.h b/src/core/pipe.h index e2c76d4d..b8f6d90a 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -34,7 +34,7 @@ struct nni_pipe { int p_refcnt; nni_mtx p_mtx; nni_cv p_cv; - nni_taskq_ent p_reap_tqe; + nni_task p_reap_task; nni_aio p_start_aio; }; diff --git a/src/core/taskq.c b/src/core/taskq.c index 64179790..cf722596 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -12,13 +12,13 @@ typedef struct nni_taskq_thr nni_taskq_thr; struct nni_taskq_thr { - nni_taskq * tqt_tq; - nni_thr tqt_thread; - nni_taskq_ent *tqt_running; - int tqt_wait; + nni_taskq *tqt_tq; + nni_thr tqt_thread; + nni_task * tqt_running; + int tqt_wait; }; struct nni_taskq { - nni_list tq_ents; + nni_list tq_tasks; nni_mtx tq_mtx; nni_cv tq_cv; nni_taskq_thr *tq_threads; @@ -34,16 +34,15 @@ nni_taskq_thread(void *self) { nni_taskq_thr *thr = self; nni_taskq * tq = thr->tqt_tq; - nni_taskq_ent *ent; + nni_task * task; nni_mtx_lock(&tq->tq_mtx); for (;;) { - if ((ent = nni_list_first(&tq->tq_ents)) != NULL) { - nni_list_remove(&tq->tq_ents, ent); - ent->tqe_tq = NULL; - thr->tqt_running = ent; + if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + nni_list_remove(&tq->tq_tasks, task); + thr->tqt_running = task; nni_mtx_unlock(&tq->tq_mtx); - ent->tqe_cb(ent->tqe_arg); + task->task_cb(task->task_arg); nni_mtx_lock(&tq->tq_mtx); thr->tqt_running = NULL; if (thr->tqt_wait) { @@ -91,7 +90,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (rv); } tq->tq_close = 0; - NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); + NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr); if (tq->tq_threads == NULL) { @@ -141,46 +140,35 @@ nni_taskq_fini(nni_taskq *tq) NNI_FREE_STRUCT(tq); } -int -nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) +void +nni_task_dispatch(nni_task *task) { - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; nni_mtx_lock(&tq->tq_mtx); - if (tq->tq_close) { - nni_mtx_unlock(&tq->tq_mtx); - return (NNG_ECLOSED); - } // It might already be scheduled... if so don't redo it. - if (!nni_list_active(&tq->tq_ents, ent)) { - ent->tqe_tq = tq; - nni_list_append(&tq->tq_ents, ent); + if (!nni_list_active(&tq->tq_tasks, task)) { + nni_list_append(&tq->tq_tasks, task); } nni_cv_wake(&tq->tq_cv); nni_mtx_unlock(&tq->tq_mtx); - return (0); } void -nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) +nni_task_wait(nni_task *task) { - int i; - int running; - - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; + int i; + int running; nni_mtx_lock(&tq->tq_mtx); for (;;) { running = 0; - if (nni_list_active(&tq->tq_ents, ent)) { + if (nni_list_active(&tq->tq_tasks, task)) { running = 1; } else { for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == ent) { + if (tq->tq_threads[i].tqt_running == task) { running = 1; break; } @@ -197,21 +185,18 @@ nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) } int -nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) +nni_task_cancel(nni_task *task) { - int i; - int running; - - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; + int i; + int running; nni_mtx_lock(&tq->tq_mtx); running = 1; for (;;) { running = 0; for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == ent) { + if (tq->tq_threads[i].tqt_running == task) { running = 1; break; } @@ -225,24 +210,23 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_cv_wait(&tq->tq_cv); } - if (ent->tqe_tq != tq) { - nni_mtx_unlock(&tq->tq_mtx); - return (NNG_ENOENT); - } - if (nni_list_active(&tq->tq_ents, ent)) { - nni_list_remove(&tq->tq_ents, ent); + if (nni_list_active(&tq->tq_tasks, task)) { + nni_list_remove(&tq->tq_tasks, task); } nni_mtx_unlock(&tq->tq_mtx); return (0); } void -nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg) +nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg) { - NNI_LIST_NODE_INIT(&ent->tqe_node); - ent->tqe_cb = cb; - ent->tqe_arg = arg; - ent->tqe_tq = NULL; + if (tq == NULL) { + tq = nni_taskq_systq; + } + NNI_LIST_NODE_INIT(&task->task_node); + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq; } int diff --git a/src/core/taskq.h b/src/core/taskq.h index e6399706..89b7be4a 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -13,23 +14,30 @@ #include "core/defs.h" #include "core/list.h" -typedef struct nni_taskq nni_taskq; -typedef struct nni_taskq_ent nni_taskq_ent; +typedef struct nni_taskq nni_taskq; +typedef struct nni_task nni_task; -struct nni_taskq_ent { - nni_list_node tqe_node; - void * tqe_arg; - nni_cb tqe_cb; - nni_taskq * tqe_tq; +// nni_task is a structure representing a task. Its intended to inlined +// into structures so that taskq_dispatch can be a guaranteed operation. +struct nni_task { + nni_list_node task_node; + void * task_arg; + nni_cb task_cb; + nni_taskq * task_tq; }; extern int nni_taskq_init(nni_taskq **, int); extern void nni_taskq_fini(nni_taskq *); -extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *); -extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *); -extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *); -extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *); +// nni_task_dispatch sends the task to the queue. It is guaranteed to +// succeed. (If the queue is shutdown, then the behavior is undefined.) +extern void nni_task_dispatch(nni_task *); + +// nni_task_cancel cancels the task. It will wait for the task to complete +// if it is already running. +extern int nni_task_cancel(nni_task *); +extern void nni_task_wait(nni_task *); +extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *); extern int nni_taskq_sys_init(void); extern void nni_taskq_sys_fini(void); |
