aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c33
-rw-r--r--src/core/aio.h19
-rw-r--r--src/core/taskq.c5
3 files changed, 21 insertions, 36 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 45bb4427..e6157786 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,7 +12,7 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
+ NNI_AIO_INIT = 0x1,
NNI_AIO_DONE = 0x2,
NNI_AIO_FINI = 0x4,
};
@@ -33,10 +33,6 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
int rv;
- if (cb == NULL) {
- cb = (nni_cb) nni_aio_wake;
- arg = aio;
- }
memset(aio, 0, sizeof(*aio));
if ((rv = nni_mtx_init(&aio->a_lk)) != 0) {
return (rv);
@@ -45,10 +41,8 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
nni_mtx_fini(&aio->a_lk);
return (rv);
}
- aio->a_cb = cb;
- aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = 0;
+ aio->a_flags = NNI_AIO_INIT;
nni_task_init(NULL, &aio->a_task, cb, arg);
return (0);
@@ -77,7 +71,7 @@ nni_aio_fini(nni_aio *aio)
void
nni_aio_stop(nni_aio *aio)
{
- if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ if ((aio->a_flags & NNI_AIO_INIT) == 0) {
// Never initialized, so nothing should have happened.
return;
}
@@ -110,29 +104,21 @@ nni_aio_count(nni_aio *aio)
}
void
-nni_aio_wake(nni_aio *aio)
-{
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_WAKE;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
-}
-
-void
nni_aio_wait(nni_aio *aio)
{
nni_mtx_lock(&aio->a_lk);
- while ((aio->a_flags & (NNI_AIO_WAKE | NNI_AIO_FINI)) == 0) {
+ while ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) == 0) {
nni_cv_wait(&aio->a_cv);
}
nni_mtx_unlock(&aio->a_lk);
+ nni_task_wait(&aio->a_task);
}
int
nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
{
nni_mtx_lock(&aio->a_lk);
- aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE);
+ aio->a_flags &= ~NNI_AIO_DONE;
if (aio->a_flags & NNI_AIO_FINI) {
// We should not reschedule anything at this point.
nni_mtx_unlock(&aio->a_lk);
@@ -180,9 +166,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&aio->a_lk);
aio->a_refcnt--;
- if (aio->a_refcnt == 0) {
- nni_cv_wake(&aio->a_cv);
- }
+ nni_cv_wake(&aio->a_cv);
// These should have already been cleared by the cancel function.
aio->a_prov_data = NULL;
@@ -214,6 +198,7 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
// because done wasn't set.
nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
+ nni_cv_wake(&aio->a_cv);
nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&aio->a_lk);
@@ -241,6 +226,7 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
// because done wasn't set.
nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
+ nni_cv_wake(&aio->a_cv);
nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&aio->a_lk);
@@ -379,6 +365,7 @@ nni_aio_expire_loop(void *arg)
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
+ nni_cv_wake(&aio->a_cv);
nni_mtx_unlock(&aio->a_lk);
// Cancel any outstanding activity.
diff --git a/src/core/aio.h b/src/core/aio.h
index 955f84ae..31a54f12 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -22,8 +22,6 @@ typedef struct nni_aio_ops nni_aio_ops;
struct nni_aio {
int a_result; // Result code (nng_errno)
size_t a_count; // Bytes transferred (I/O only)
- nni_cb a_cb; // User specified callback.
- void * a_cbarg; // Callback argument.
nni_time a_expire;
// These fields are private to the aio framework.
@@ -90,17 +88,12 @@ extern int nni_aio_result(nni_aio *);
// completed.
extern size_t nni_aio_count(nni_aio *);
-// nni_aio_wake wakes any threads blocked in nni_aio_wait. This is the
-// default callback if no other is supplied. If a user callback is supplied
-// then that code must call this routine to wake any waiters (unless the
-// user code is certain that there are no such waiters).
-extern void nni_aio_wake(nni_aio *);
-
-// nni_aio_wait blocks the caller until the operation is complete, as indicated
-// by nni_aio_wake being called. (Recall nni_aio_wake is the default
-// callback if none is supplied.) If a user supplied callback is provided,
-// and that callback does not call nni_aio_wake, then this routine may
-// block the caller indefinitely.
+// nni_aio_wait blocks the caller until the operation is complete.
+// The operation must have already been started. This routine will
+// block until the AIO, as well as any callback, has completed execution.
+// If the callback routine reschedules the AIO, the wait may wind up
+// waiting for the rescheduled operation; this is most often used in
+// lieu of a callback to build synchronous constructs on top of AIOs.
extern void nni_aio_wait(nni_aio *);
// nni_aio_list_init creates a list suitable for use by providers using
diff --git a/src/core/taskq.c b/src/core/taskq.c
index cf722596..33116ec9 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -145,6 +145,11 @@ nni_task_dispatch(nni_task *task)
{
nni_taskq *tq = task->task_tq;
+ // If there is no callback to perform, then do nothing!
+ // The user will be none the wiser.
+ if (task->task_cb == NULL) {
+ return;
+ }
nni_mtx_lock(&tq->tq_mtx);
// It might already be scheduled... if so don't redo it.
if (!nni_list_active(&tq->tq_tasks, task)) {