summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c84
1 files changed, 29 insertions, 55 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 60744ba1..153d7c15 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -27,8 +27,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_rwait; // readers waiting (unbuffered)
- int mq_wwait;
nni_msg ** mq_msgs;
int mq_notify_sig;
@@ -39,6 +37,8 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
+ nni_list mq_aio_notify_get;
+ nni_list mq_aio_notify_put;
nni_timer_node mq_timer;
nni_time mq_expire;
@@ -117,6 +117,8 @@ nni_msgq_init(nni_msgq **mqp, int cap)
}
NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node);
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
@@ -142,8 +144,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
mq->mq_notify_fn = NULL;
mq->mq_notify_arg = NULL;
mq->mq_expire = NNI_TIME_NEVER;
@@ -314,8 +314,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
// Otherwise if we have room in the buffer, just queue it.
- if ((mq->mq_len < mq->mq_cap) ||
- ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
+ if (mq->mq_len < mq->mq_cap) {
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -388,6 +387,25 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
+ nni_aio *aio;
+
+ if (mq->mq_closed) {
+ return;
+ }
+ if ((mq->mq_len < mq->mq_cap) ||
+ (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+
+ NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
+ // This stays on the list.
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
}
@@ -472,7 +490,6 @@ nni_msgq_canput(nni_msgq *mq)
return (0);
}
if ((mq->mq_len < mq->mq_cap) ||
- (mq->mq_rwait != 0) || // XXX: REMOVE ME
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
@@ -491,7 +508,6 @@ nni_msgq_canget(nni_msgq *mq)
return (0);
}
if ((mq->mq_len != 0) ||
- (mq->mq_wwait != 0) ||
(nni_list_first(&mq->mq_aio_putq) != NULL)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
@@ -527,8 +543,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
// Otherwise if we have room in the buffer, just queue it.
- if ((mq->mq_len < mq->mq_cap) ||
- ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
+ if (mq->mq_len < mq->mq_cap) {
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -593,6 +608,7 @@ nni_msgq_run_timeout(void *arg)
}
+#if 0
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
@@ -682,47 +698,6 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_mtx_lock(&mq->mq_lock);
- for (;;) {
- // always prefer to deliver data if its there
- if (mq->mq_len != 0) {
- break;
- }
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
- if ((rv = mq->mq_geterr) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- if (expire == NNI_TIME_ZERO) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EAGAIN);
- }
- if (*sig) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EINTR);
- }
- if ((mq->mq_cap == 0) && (mq->mq_wwait)) {
- // let a write waiter know we are ready
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
- }
- mq->mq_rwait = 1;
-
- if (mq->mq_cap == 0) {
- // If unbuffered, kick it since a writer would not
- // block.
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- }
-
- rv = nni_cv_until(&mq->mq_readable, expire);
- if (rv == NNG_ETIMEDOUT) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- }
-
// Readable! Yay!!
*msgp = mq->mq_msgs[mq->mq_get];
@@ -746,6 +721,9 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
}
+#endif
+
+
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -805,8 +783,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_notify_cv);
@@ -836,8 +812,6 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_notify_cv);