summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c139
1 files changed, 93 insertions, 46 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3ec194fc..09f58a33 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,10 +31,61 @@ struct nni_msgq {
int mq_wwait;
nni_msg ** mq_msgs;
+ int mq_notify_sig;
+ nni_cv mq_notify_cv;
+ nni_thr mq_notify_thr;
nni_msgq_notify_fn mq_notify_fn;
void * mq_notify_arg;
};
+
+// nni_msgq_notifier thread runs if events callbacks are registered on the
+// message queue, and calls the notification callbacks outside of the
+// lock. It looks at the actual msgq state to trigger the right events.
+static void
+nni_msgq_notifier(void *arg)
+{
+ nni_msgq *mq = arg;
+ int sig;
+ nni_msgq_notify_fn fn;
+ void *fnarg;
+
+ for (;;) {
+ nni_mtx_lock(&mq->mq_lock);
+ while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) {
+ nni_cv_wait(&mq->mq_notify_cv);
+ }
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ break;
+ }
+
+ sig = mq->mq_notify_sig;
+ mq->mq_notify_sig = 0;
+
+ fn = mq->mq_notify_fn;
+ fnarg = mq->mq_notify_arg;
+ nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, sig, fnarg);
+ }
+ }
+}
+
+
+// nni_msgq_kick kicks the msgq notification thread. It should be called
+// with the lock held.
+static void
+nni_msgq_kick(nni_msgq *mq, int sig)
+{
+ if (mq->mq_notify_fn != NULL) {
+ mq->mq_notify_sig |= sig;
+ nni_cv_wake(&mq->mq_notify_cv);
+ }
+}
+
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -59,13 +110,10 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
- if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) {
goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
@@ -110,6 +158,7 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
+ nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
nni_cv_fini(&mq->mq_writeable);
nni_cv_fini(&mq->mq_readable);
@@ -131,13 +180,24 @@ nni_msgq_fini(nni_msgq *mq)
}
-void
+int
nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
{
+ int rv;
+
+ nni_thr_fini(&mq->mq_notify_thr);
+
nni_mtx_lock(&mq->mq_lock);
+ rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq);
+ if (rv != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
mq->mq_notify_fn = fn;
mq->mq_notify_arg = arg;
+ nni_thr_run(&mq->mq_notify_thr);
nni_mtx_unlock(&mq->mq_lock);
+ return (0);
}
@@ -197,6 +257,7 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_notify_cv);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -205,9 +266,6 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -247,8 +305,16 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
return (NNG_EAGAIN);
}
- // not writeable, so wait until something changes
+ // waiting....
mq->mq_wwait = 1;
+
+ // if we are unbuffered, kick the notifier, because we're
+ // writable.
+ if (mq->mq_cap == 0) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ }
+
+ // not writeable, so wait until something changes
rv = nni_cv_until(&mq->mq_writeable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -267,18 +333,12 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- // The notify callback is executed outside of the lock.
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
return (0);
}
@@ -289,10 +349,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
-
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -319,18 +375,9 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
- if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
- }
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -339,9 +386,6 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -372,6 +416,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_cv_wake(&mq->mq_writeable);
}
mq->mq_rwait = 1;
+
+ if (mq->mq_cap == 0) {
+ // If unbuffered, kick it since a writer would not
+ // block.
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ }
+
rv = nni_cv_until(&mq->mq_readable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -391,18 +442,12 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
- notify = NNI_MSGQ_NOTIFY_CANPUT;
- if (mq->mq_len > 0) {
- notify |= NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -475,6 +520,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
while (mq->mq_len > 0) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
@@ -502,6 +548,7 @@ nni_msgq_close(nni_msgq *mq)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {