summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-11 22:38:21 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-11 22:38:21 -0800
commit3d4be5126f91978b7d7349de79334ecfc8fc2afe (patch)
treec8cfadbb1096e99cad21bddbb9fe9ff7b5dd175a /src/core/msgqueue.c
parent3d90bae8eda62fecdf367932fca591b965838e20 (diff)
downloadnng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.gz
nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.bz2
nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.zip
Notification working - separate thread now.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c303
1 files changed, 63 insertions, 240 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 153d7c15..37d6d5ff 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,83 +15,29 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_readable;
- nni_cv mq_writeable;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- nni_msg ** mq_msgs;
-
- int mq_notify_sig;
- nni_cv mq_notify_cv;
- nni_thr mq_notify_thr;
- nni_msgq_notify_fn mq_notify_fn;
- void * mq_notify_arg;
-
- nni_list mq_aio_putq;
- nni_list mq_aio_getq;
- nni_list mq_aio_notify_get;
- nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
+ nni_mtx mq_lock;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_draining;
+ nni_msg ** mq_msgs;
+
+ nni_list mq_aio_putq;
+ nni_list mq_aio_getq;
+ nni_list mq_aio_notify_get;
+ nni_list mq_aio_notify_put;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
-// nni_msgq_notifier thread runs if events callbacks are registered on the
-// message queue, and calls the notification callbacks outside of the
-// lock. It looks at the actual msgq state to trigger the right events.
-static void
-nni_msgq_notifier(void *arg)
-{
- nni_msgq *mq = arg;
- int sig;
- nni_msgq_notify_fn fn;
- void *fnarg;
-
- for (;;) {
- nni_mtx_lock(&mq->mq_lock);
- while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) {
- nni_cv_wait(&mq->mq_notify_cv);
- }
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- break;
- }
-
- sig = mq->mq_notify_sig;
- mq->mq_notify_sig = 0;
-
- fn = mq->mq_notify_fn;
- fnarg = mq->mq_notify_arg;
- nni_mtx_unlock(&mq->mq_lock);
-
- if (fn != NULL) {
- fn(mq, sig, fnarg);
- }
- }
-}
-
-
-// nni_msgq_kick kicks the msgq notification thread. It should be called
-// with the lock held.
-static void
-nni_msgq_kick(nni_msgq *mq, int sig)
-{
- if (mq->mq_notify_fn != NULL) {
- mq->mq_notify_sig |= sig;
- nni_cv_wake(&mq->mq_notify_cv);
- }
-}
-
-
static void nni_msgq_run_timeout(void *);
int
@@ -123,10 +69,7 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
- if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) {
+ if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
@@ -144,17 +87,14 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_notify_fn = NULL;
- mq->mq_notify_arg = NULL;
mq->mq_expire = NNI_TIME_NEVER;
+ mq->mq_draining = 0;
*mqp = mq;
return (0);
fail:
nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
nni_mtx_fini(&mq->mq_lock);
if (mq->mq_msgs != NULL) {
nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
@@ -173,10 +113,7 @@ nni_msgq_fini(nni_msgq *mq)
return;
}
nni_timer_cancel(&mq->mq_timer);
- nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -195,27 +132,6 @@ nni_msgq_fini(nni_msgq *mq)
}
-int
-nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
-{
- int rv;
-
- nni_thr_fini(&mq->mq_notify_thr);
-
- nni_mtx_lock(&mq->mq_lock);
- rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq);
- if (rv != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- mq->mq_notify_fn = fn;
- mq->mq_notify_arg = arg;
- nni_thr_run(&mq->mq_notify_thr);
- nni_mtx_unlock(&mq->mq_lock);
- return (0);
-}
-
-
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
@@ -329,14 +245,6 @@ nni_msgq_run_putq(nni_msgq *mq)
// Unable to make progress, leave the aio where it is.
break;
}
-
- // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC.
- if (mq->mq_len != 0) {
- nni_cv_wake(&mq->mq_readable);
- }
- if (mq->mq_len < mq->mq_cap) {
- nni_cv_wake(&mq->mq_writeable);
- }
}
@@ -394,7 +302,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
if ((mq->mq_len < mq->mq_cap) ||
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
-
NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
@@ -406,6 +313,37 @@ nni_msgq_run_notify(nni_msgq *mq)
nni_aio_finish(aio, 0, 0);
}
}
+
+ if (mq->mq_draining) {
+ if ((mq->mq_len == 0) &&
+ (nni_list_first(&mq->mq_aio_putq) == NULL)) {
+ nni_cv_wake(&mq->mq_drained);
+ }
+ }
+}
+
+
+void
+nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
+ nni_list_remove(&mq->mq_aio_notify_put, aio);
+ }
+ nni_list_append(&mq->mq_aio_notify_put, aio);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
+ nni_list_remove(&mq->mq_aio_notify_get, aio);
+ }
+ nni_list_append(&mq->mq_aio_notify_get, aio);
+ nni_mtx_unlock(&mq->mq_lock);
}
@@ -608,122 +546,6 @@ nni_msgq_run_timeout(void *arg)
}
-#if 0
-int
-nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
-{
- int rv;
-
- nni_mtx_lock(&mq->mq_lock);
-
- for (;;) {
- // if closed, we don't put more... this check is first!
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
-
- if ((rv = mq->mq_puterr) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
-
- // room in the queue?
- if (mq->mq_len < mq->mq_cap) {
- break;
- }
-
- // unbuffered, room for one, and a reader waiting?
- if (mq->mq_rwait &&
- (mq->mq_cap == 0) &&
- (mq->mq_len == mq->mq_cap)) {
- break;
- }
-
- // interrupted?
- if (*sig) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EINTR);
- }
-
- // single poll?
- if (expire == NNI_TIME_ZERO) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- // waiting....
- mq->mq_wwait = 1;
-
- // if we are unbuffered, kick the notifier, because we're
- // writable.
- if (mq->mq_cap == 0) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- }
-
- // not writeable, so wait until something changes
- rv = nni_cv_until(&mq->mq_writeable, expire);
- if (rv == NNG_ETIMEDOUT) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- }
-
- // Writeable! Yay!!
- mq->mq_msgs[mq->mq_put] = msg;
- mq->mq_put++;
- if (mq->mq_put == mq->mq_alloc) {
- mq->mq_put = 0;
- }
- mq->mq_len++;
- if (mq->mq_rwait) {
- mq->mq_rwait = 0;
- nni_cv_wake(&mq->mq_readable);
- }
- if (mq->mq_len < mq->mq_cap) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- }
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- nni_msgq_run_getq(mq);
- nni_mtx_unlock(&mq->mq_lock);
-
- return (0);
-}
-
-
-static int
-nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
-{
- int rv;
-
- nni_mtx_lock(&mq->mq_lock);
-
- // Readable! Yay!!
-
- *msgp = mq->mq_msgs[mq->mq_get];
- mq->mq_len--;
- mq->mq_get++;
- if (mq->mq_get == mq->mq_alloc) {
- mq->mq_get = 0;
- }
- if (mq->mq_wwait) {
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
- }
- if (mq->mq_len) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- }
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- nni_msgq_run_putq(mq);
- nni_mtx_unlock(&mq->mq_lock);
-
- return (0);
-}
-
-
-#endif
-
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -781,17 +603,23 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
+ nni_aio *aio;
+
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_notify_cv);
- while (mq->mq_len > 0) {
+ mq->mq_draining = 1;
+ while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
}
}
// If we timedout, free any remaining messages in the queue.
+ // Also complete the putq as NNG_ECLOSED.
+
+ while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
while (mq->mq_len > 0) {
nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_alloc) {
@@ -812,9 +640,6 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_notify_cv);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
@@ -937,8 +762,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);