aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c258
-rw-r--r--src/core/aio.h5
-rw-r--r--src/core/init.c13
-rw-r--r--src/core/msgqueue.c87
4 files changed, 240 insertions, 123 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index ba09b608..73c1dd9b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -11,10 +11,22 @@
#include "core/nng_impl.h"
#include <string.h>
-#define NNI_AIO_WAKE (1 << 0)
-#define NNI_AIO_DONE (1 << 1)
-#define NNI_AIO_FINI (1 << 2)
-#define NNI_AIO_STOP (1 << 3)
+enum nni_aio_flags {
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
+};
+
+// These are used for expiration.
+static nni_mtx nni_aio_expire_mtx;
+static nni_cv nni_aio_expire_cv;
+static int nni_aio_expire_exit;
+static nni_list nni_aio_expire_aios;
+static nni_thr nni_aio_expire_thr;
+static nni_aio *nni_aio_expire_current;
+
+static void nni_aio_expire_add(nni_aio *);
+static void nni_aio_expire_remove(nni_aio *);
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -53,12 +65,20 @@ nni_aio_fini(nni_aio *aio)
aio->a_flags |= NNI_AIO_DONE;
aio->a_result = NNG_ECANCELED;
cancelfn = aio->a_prov_cancel;
+
} else {
cancelfn = NULL;
}
- nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any
+ nni_cv_wake(&aio->a_cv);
+
+ while (aio->a_refcnt != 0) {
+ nni_cv_wait(&aio->a_cv);
+ }
nni_mtx_unlock(&aio->a_lk);
+ // just a list operation at this point.
+ nni_aio_expire_remove(aio);
+
// Cancel the AIO if it was scheduled.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -122,7 +142,7 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
nni_mtx_lock(&aio->a_lk);
aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE);
- if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) {
+ if (aio->a_flags & NNI_AIO_FINI) {
// We should not reschedule anything at this point.
nni_mtx_unlock(&aio->a_lk);
return (NNG_ECANCELED);
@@ -131,40 +151,11 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
aio->a_count = 0;
aio->a_prov_cancel = cancel;
aio->a_prov_data = data;
- nni_mtx_unlock(&aio->a_lk);
- return (0);
-}
-
-// XXX: REMOVE ME...
-void
-nni_aio_stop(nni_aio *aio)
-{
- void (*cancelfn)(nni_aio *);
-
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) {
- nni_mtx_unlock(&aio->a_lk);
- return;
- }
- aio->a_result = NNG_ECANCELED;
- aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP;
- cancelfn = aio->a_prov_cancel;
- nni_mtx_unlock(&aio->a_lk);
-
- // This unregisters the AIO from the provider.
- if (cancelfn != NULL) {
- cancelfn(aio);
+ if (aio->a_expire != NNI_TIME_NEVER) {
+ nni_aio_expire_add(aio);
}
-
- nni_mtx_lock(&aio->a_lk);
- aio->a_prov_data = NULL;
- aio->a_prov_cancel = NULL;
- nni_cv_wake(&aio->a_cv);
nni_mtx_unlock(&aio->a_lk);
-
- // This either aborts the task, or waits for it to complete if already
- // dispatched.
- nni_taskq_cancel(NULL, &aio->a_tqe);
+ return (0);
}
void
@@ -180,10 +171,14 @@ nni_aio_cancel(nni_aio *aio, int rv)
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = rv;
- cancelfn = aio->a_prov_cancel;
+ aio->a_result = rv;
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
- // XXX: Think about the synchronization with nni_aio_fini...
+ aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
// Stop any I/O at the provider level.
@@ -192,16 +187,20 @@ nni_aio_cancel(nni_aio *aio, int rv)
}
nni_mtx_lock(&aio->a_lk);
+
+ aio->a_refcnt--;
+ if (aio->a_refcnt == 0) {
+ nni_cv_wake(&aio->a_cv);
+ }
+
// These should have already been cleared by the cancel function.
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- // XXX: mark unbusy
if (!(aio->a_flags & NNI_AIO_FINI)) {
// If we are finalizing, then we are done.
nni_taskq_dispatch(NULL, &aio->a_tqe);
}
- // XXX: else wake aio_cv.. because there is someone watching.
nni_mtx_unlock(&aio->a_lk);
}
@@ -222,10 +221,12 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think.
- if (!(aio->a_flags & NNI_AIO_STOP)) {
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // This is guaranteed to just be a list operation at this point,
+ // because done wasn't set.
+ nni_aio_expire_remove(aio);
+ aio->a_expire = NNI_TIME_NEVER;
+
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -253,3 +254,166 @@ nni_aio_list_active(nni_aio *aio)
{
return (nni_list_node_active(&aio->a_prov_node));
}
+
+static void
+nni_aio_expire_add(nni_aio *aio)
+{
+ nni_mtx * mtx = &nni_aio_expire_mtx;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_list *list = &nni_aio_expire_aios;
+ nni_aio * naio;
+
+ nni_mtx_lock(mtx);
+ // This is a reverse walk of the list. We're more likely to find
+ // a match at the end of the list.
+ for (naio = nni_list_last(list); naio != NULL;
+ naio = nni_list_prev(list, naio)) {
+ if (aio->a_expire >= naio->a_expire) {
+ nni_list_insert_after(list, aio, naio);
+ break;
+ }
+ }
+ if (naio == NULL) {
+ // This has the shortest time, so insert at the start.
+ nni_list_prepend(list, aio);
+ // And, as we are the latest, kick the thing.
+ nni_cv_wake(cv);
+ }
+ nni_mtx_unlock(mtx);
+}
+
+static void
+nni_aio_expire_remove(nni_aio *aio)
+{
+ nni_mtx * mtx = &nni_aio_expire_mtx;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_list *list = &nni_aio_expire_aios;
+
+ nni_mtx_lock(mtx);
+ if (nni_list_active(list, aio)) {
+ nni_list_remove(list, aio);
+ }
+ while (aio == nni_aio_expire_current) {
+ nni_cv_wait(cv);
+ }
+ nni_mtx_unlock(mtx);
+}
+
+static void
+nni_aio_expire_loop(void *arg)
+{
+ nni_mtx * mtx = &nni_aio_expire_mtx;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_list *aios = &nni_aio_expire_aios;
+ nni_aio * aio;
+ nni_time now;
+ int rv;
+ void (*cancelfn)(nni_aio *);
+
+ NNI_ARG_UNUSED(arg);
+
+ for (;;) {
+ nni_mtx_lock(mtx);
+
+ // If we are resuming this loop after processing an AIO,
+ // note that we are done with it, and wake anyone waiting
+ // for that to clear up.
+ if ((aio = nni_aio_expire_current) != NULL) {
+ nni_aio_expire_current = NULL;
+ nni_cv_wake(cv);
+ }
+
+ if (nni_aio_expire_exit) {
+ nni_mtx_unlock(mtx);
+ return;
+ }
+
+ if ((aio = nni_list_first(aios)) == NULL) {
+ nni_cv_wait(cv);
+ nni_mtx_unlock(mtx);
+ continue;
+ }
+
+ now = nni_clock();
+ if (now < aio->a_expire) {
+ // Unexpired; the list is ordered, so we just wait.
+ nni_cv_until(cv, aio->a_expire);
+ nni_mtx_unlock(mtx);
+ continue;
+ }
+
+ // This aio's time has come. Expire it, canceling any
+ // outstanding I/O.
+
+ nni_list_remove(aios, aio);
+ nni_aio_expire_current = aio;
+ nni_mtx_unlock(mtx);
+
+ cancelfn = NULL;
+
+ nni_mtx_lock(&aio->a_lk);
+ if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) {
+ nni_mtx_unlock(&aio->a_lk);
+ continue;
+ }
+
+ aio->a_flags |= NNI_AIO_DONE;
+ aio->a_result = NNG_ETIMEDOUT;
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // Cancel any outstanding activity.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ // Arguably we could avoid dispatching, and execute the
+ // callback inline here as we are already on a separate
+ // thread. But keeping it separate is clearer, and more
+ // consistent with other uses. And this should not be a
+ // hot code path.
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+}
+
+int
+nni_aio_sys_init(void)
+{
+ int rv;
+ nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_thr *thr = &nni_aio_expire_thr;
+
+ if (((rv = nni_mtx_init(mtx)) != 0) ||
+ ((rv = nni_cv_init(cv, mtx)) != 0) ||
+ ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) {
+ goto fail;
+ }
+ NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node);
+ nni_thr_run(thr);
+ return (0);
+
+fail:
+ nni_thr_fini(thr);
+ nni_cv_fini(cv);
+ nni_mtx_fini(mtx);
+ return (rv);
+}
+
+void
+nni_aio_sys_fini(void)
+{
+ nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_thr *thr = &nni_aio_expire_thr;
+
+ nni_mtx_lock(mtx);
+ nni_aio_expire_exit = 1;
+ nni_cv_wake(cv);
+ nni_mtx_unlock(mtx);
+
+ nni_thr_fini(thr);
+ nni_cv_fini(cv);
+ nni_mtx_fini(mtx);
+} \ No newline at end of file
diff --git a/src/core/aio.h b/src/core/aio.h
index 296b0682..09923d7f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -52,6 +52,9 @@ struct nni_aio {
void (*a_prov_cancel)(nni_aio *);
void * a_prov_data;
nni_list_node a_prov_node;
+
+ // Expire node.
+ nni_list_node a_expire_node;
};
// nni_aio_init initializes an aio object. The callback is called with
@@ -118,4 +121,6 @@ extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
// nni_fini?)
// extern void nni_aio_stop(nni_aio *);
+extern int nni_aio_sys_init(void);
+extern void nni_aio_sys_fini(void);
#endif // CORE_AIO_H
diff --git a/src/core/init.c b/src/core/init.c
index 4292146d..88c80f3f 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -23,21 +23,25 @@ nni_init_helper(void)
nni_taskq_sys_fini();
return (rv);
}
+ if ((rv = nni_aio_sys_init()) != 0) {
+ nni_taskq_sys_fini();
+ return (rv);
+ }
if ((rv = nni_random_sys_init()) != 0) {
- nni_timer_sys_fini();
+ nni_aio_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
if ((rv = nni_sock_sys_init()) != 0) {
nni_random_sys_fini();
- nni_timer_sys_fini();
+ nni_aio_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
if ((rv = nni_ep_sys_init()) != 0) {
nni_sock_sys_fini();
nni_random_sys_fini();
- nni_timer_sys_fini();
+ nni_aio_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
@@ -45,7 +49,7 @@ nni_init_helper(void)
nni_ep_sys_fini();
nni_sock_sys_fini();
nni_random_sys_fini();
- nni_timer_sys_fini();
+ nni_aio_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
@@ -71,5 +75,6 @@ nni_fini(void)
nni_sock_sys_fini();
nni_random_sys_fini();
nni_timer_sys_fini();
+ nni_aio_sys_fini();
nni_plat_fini();
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9f7ff7fd..71c6dff4 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -32,13 +33,8 @@ struct nni_msgq {
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
};
-static void nni_msgq_run_timeout(void *);
-
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
- nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
-
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_expire = NNI_TIME_NEVER;
mq->mq_draining = 0;
*mqp = mq;
@@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_timer_cancel(&mq->mq_timer);
nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
@@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_putq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_getq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
@@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-// XXX: Move this to generic AIO timeout...
-void
-nni_msgq_run_timeout(void *arg)
-{
- nni_msgq *mq = arg;
- nni_time now;
- nni_time exp;
- nni_aio * aio;
- nni_aio * naio;
-
- now = nni_clock();
- exp = NNI_TIME_NEVER;
-
- nni_mtx_lock(&mq->mq_lock);
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- mq->mq_expire = exp;
- if (mq->mq_expire != NNI_TIME_NEVER) {
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{