diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-23 01:05:35 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-23 01:05:35 -0800 |
| commit | 07478f02caaebf74c11b366d048ba696a8678fec (patch) | |
| tree | 9af4ff5605d96fc98c2d687c07c1d6bb1754eb0f /src/core | |
| parent | 052e37eebb6b34c37997f46813689f8bbba92c18 (diff) | |
| download | nng-07478f02caaebf74c11b366d048ba696a8678fec.tar.gz nng-07478f02caaebf74c11b366d048ba696a8678fec.tar.bz2 nng-07478f02caaebf74c11b366d048ba696a8678fec.zip | |
Support for unbuffered msgqueues (like Go unbuffered channels.)
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 82 |
1 files changed, 55 insertions, 27 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 266a8e26..823f6e45 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -20,10 +20,12 @@ struct nni_msgqueue { nni_cond mq_writeable; nni_cond mq_drained; int mq_cap; + int mq_alloc; // alloc is cap + 2... int mq_len; int mq_get; int mq_put; int mq_closed; + int mq_rwait; // readers waiting (unbuffered) nni_msg ** mq_msgs; }; @@ -32,10 +34,19 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) { struct nni_msgqueue *mq; int rv; + int alloc; - if (cap < 1) { + if (cap < 0) { return (NNG_EINVAL); } + + // We allocate 2 extra cells in the fifo. One to accommodate a + // waiting writer when cap == 0. (We can "briefly" move the message + // through. This lets us behave the same as unbuffered Go channels. + // The second cell is to permit pushback later, e.g. for REQ to stash + // a message back at the end to do a retry. + alloc = cap + 2; + if ((mq = nni_alloc(sizeof (*mq))) == NULL) { return (NNG_ENOMEM); } @@ -59,7 +70,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { + if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { nni_cond_fini(&mq->mq_drained); nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); @@ -68,6 +79,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) } mq->mq_cap = cap; + mq->mq_alloc = alloc; mq->mq_len = 0; mq->mq_get = 0; mq->mq_put = 0; @@ -92,14 +104,14 @@ nni_msgqueue_destroy(nni_msgqueue *mq) while (mq->mq_len > 0) { msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { + if (mq->mq_get > mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; nni_msg_free(msg); } - nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *)); + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof (nng_msg *)); nni_free(mq, sizeof (*mq)); } @@ -126,38 +138,48 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, { nni_mutex_enter(&mq->mq_lock); - while ((!mq->mq_closed) && - (mq->mq_len == mq->mq_cap) && - (!*signal)) { + for (;;) { + // if closed, we don't put more... this check is first! + if (mq->mq_closed) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ECLOSED); + } + + // room in the queue? + if (mq->mq_len < mq->mq_cap) + break; + + // unbuffered, room for one, and a reader waiting? + if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) + break; + + // interrupted? + if (*signal) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_EINTR); + } + + // single poll? + if (expire == NNI_TIME_ZERO) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_EAGAIN); + } + + // timedout? if (expire <= nni_clock()) { nni_mutex_exit(&mq->mq_lock); - if (expire == NNI_TIME_ZERO) { - return (NNG_EAGAIN); - } return (NNG_ETIMEDOUT); } - (void) nni_cond_waituntil(&mq->mq_writeable, expire); - } - // Once a queue is closed, you can't write to it. It can still be - // read from, at least until its empty. - if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); - return (NNG_ECLOSED); + // not writeable, so wait until something changes + (void) nni_cond_waituntil(&mq->mq_writeable, expire); } - if ((mq->mq_len == mq->mq_cap) && (*signal)) { - // We are being interrupted. We only allow an interrupt - // if there is no room though, because we'd really prefer - // to queue the data. Otherwise our failure to queue - // the data could lead to starvation. - nni_mutex_exit(&mq->mq_lock); - return (NNG_EINTR); - } + // Writeable! Yay!! mq->mq_msgs[mq->mq_put] = msg; mq->mq_put++; - if (mq->mq_put == mq->mq_cap) { + if (mq->mq_put == mq->mq_alloc) { mq->mq_put = 0; } mq->mq_len++; @@ -183,7 +205,13 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, } return (NNG_ETIMEDOUT); } + mq->mq_rwait++; + if (mq->mq_cap == 0) { + // If a writer is blocked, unblock him. + nni_cond_signal(&mq->mq_writeable); + } (void) nni_cond_waituntil(&mq->mq_readable, expire); + mq->mq_rwait--; } // If there is any data left in the message queue, we will still @@ -205,7 +233,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, *msgp = mq->mq_msgs[mq->mq_get]; mq->mq_len--; mq->mq_get++; - if (mq->mq_get == mq->mq_cap) { + if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } if (mq->mq_len == (mq->mq_cap - 1)) { |
