aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c82
1 files changed, 55 insertions, 27 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 266a8e26..823f6e45 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -20,10 +20,12 @@ struct nni_msgqueue {
nni_cond mq_writeable;
nni_cond mq_drained;
int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
int mq_len;
int mq_get;
int mq_put;
int mq_closed;
+ int mq_rwait; // readers waiting (unbuffered)
nni_msg ** mq_msgs;
};
@@ -32,10 +34,19 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
{
struct nni_msgqueue *mq;
int rv;
+ int alloc;
- if (cap < 1) {
+ if (cap < 0) {
return (NNG_EINVAL);
}
+
+ // We allocate 2 extra cells in the fifo. One to accommodate a
+ // waiting writer when cap == 0. (We can "briefly" move the message
+ // through. This lets us behave the same as unbuffered Go channels.
+ // The second cell is to permit pushback later, e.g. for REQ to stash
+ // a message back at the end to do a retry.
+ alloc = cap + 2;
+
if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
return (NNG_ENOMEM);
}
@@ -59,7 +70,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
- if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
+ if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
@@ -68,6 +79,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
}
mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
mq->mq_len = 0;
mq->mq_get = 0;
mq->mq_put = 0;
@@ -92,14 +104,14 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
while (mq->mq_len > 0) {
msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
+ if (mq->mq_get > mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
nni_msg_free(msg);
}
- nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *));
+ nni_free(mq->mq_msgs, mq->mq_alloc * sizeof (nng_msg *));
nni_free(mq, sizeof (*mq));
}
@@ -126,38 +138,48 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
{
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) &&
- (mq->mq_len == mq->mq_cap) &&
- (!*signal)) {
+ for (;;) {
+ // if closed, we don't put more... this check is first!
+ if (mq->mq_closed) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ // room in the queue?
+ if (mq->mq_len < mq->mq_cap)
+ break;
+
+ // unbuffered, room for one, and a reader waiting?
+ if (mq->mq_rwait && (mq->mq_len == mq->mq_cap))
+ break;
+
+ // interrupted?
+ if (*signal) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EINTR);
+ }
+
+ // single poll?
+ if (expire == NNI_TIME_ZERO) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ // timedout?
if (expire <= nni_clock()) {
nni_mutex_exit(&mq->mq_lock);
- if (expire == NNI_TIME_ZERO) {
- return (NNG_EAGAIN);
- }
return (NNG_ETIMEDOUT);
}
- (void) nni_cond_waituntil(&mq->mq_writeable, expire);
- }
- // Once a queue is closed, you can't write to it. It can still be
- // read from, at least until its empty.
- if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_ECLOSED);
+ // not writeable, so wait until something changes
+ (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
- if ((mq->mq_len == mq->mq_cap) && (*signal)) {
- // We are being interrupted. We only allow an interrupt
- // if there is no room though, because we'd really prefer
- // to queue the data. Otherwise our failure to queue
- // the data could lead to starvation.
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_EINTR);
- }
+ // Writeable! Yay!!
mq->mq_msgs[mq->mq_put] = msg;
mq->mq_put++;
- if (mq->mq_put == mq->mq_cap) {
+ if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
}
mq->mq_len++;
@@ -183,7 +205,13 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
}
return (NNG_ETIMEDOUT);
}
+ mq->mq_rwait++;
+ if (mq->mq_cap == 0) {
+ // If a writer is blocked, unblock him.
+ nni_cond_signal(&mq->mq_writeable);
+ }
(void) nni_cond_waituntil(&mq->mq_readable, expire);
+ mq->mq_rwait--;
}
// If there is any data left in the message queue, we will still
@@ -205,7 +233,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
- if (mq->mq_get == mq->mq_cap) {
+ if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
if (mq->mq_len == (mq->mq_cap - 1)) {