diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msqueue.c | 120 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 10 |
2 files changed, 127 insertions, 3 deletions
diff --git a/src/core/msqueue.c b/src/core/msqueue.c index 3e4ade79..7becabfb 100644 --- a/src/core/msqueue.c +++ b/src/core/msqueue.c @@ -109,3 +109,123 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); nni_free(mq, sizeof (*mq)); } + +int +nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +{ + uint64_t expire, now; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } + + nni_mutex_enter(mq->mq_lock); + + while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) { + if (tmout == 0) { + nni_mutex_exit(mq->mq_lock); + return (NNG_EAGAIN); + } + + if (tmout < 0) { + (void) nni_cond_wait(mq->mq_writeable); + continue; + } + + now = nni_clock(); + if (now >= expire) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ETIMEDOUT); + } + (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); + } + + if (mq->mq_closed) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ECLOSED); + } + + mq->mq_msgs[mq->mq_put] = msg; + mq->mq_put++; + if (mq->mq_put == mq->mq_cap) { + mq->mq_put = 0; + } + mq->mq_len++; + if (mq->mq_len == 1) { + (void) nni_cond_signal(mq->mq_readable); + } + nni_mutex_exit(mq->mq_lock); + return (0); +} + +int +nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +{ + uint64_t expire, now; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } + + nni_mutex_enter(mq->mq_lock); + + while ((!mq->mq_closed) && (mq->mq_len == 0)) { + if (tmout == 0) { + nni_mutex_exit(mq->mq_lock); + return (NNG_EAGAIN); + } + + if (tmout < 0) { + (void) nni_cond_wait(mq->mq_readable); + continue; + } + + now = nni_clock(); + if (now >= expire) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ETIMEDOUT); + } + (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); + } + + if (mq->mq_closed) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ECLOSED); + } + + *msgp = mq->mq_msgs[mq->mq_get]; + mq->mq_len--; + mq->mq_get++; + if (mq->mq_get == mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len++; + if (mq->mq_len == (mq->mq_cap - 1)) { + (void) nni_cond_signal(mq->mq_writeable); + } + nni_mutex_exit(mq->mq_lock); + return (0); +} + +void +nni_msgqueue_close(nni_msgqueue_t mq) +{ + nng_msg_t msg; + + nni_mutex_enter(mq->mq_lock); + mq->mq_closed = 1; + nni_cond_broadcast(mq->mq_writeable); + nni_cond_broadcast(mq->mq_readable); + + /* Free the messages orphaned in the queue. */ + while (mq->mq_len > 0) { + msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + nng_msg_free(msg); + } + nni_mutex_exit(mq->mq_lock); +} diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index a62ac101..683e0df1 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -80,9 +80,6 @@ extern int nni_msgqueue_create(nni_msgqueue_t *, int); */ extern void nni_msgqueue_destroy(nni_msgqueue_t); -extern int nni_msgqueue_len(nni_msgqueue_t); -extern int nni_msgqueue_cap(nni_msgqueue_t); - /* * nni_msgqueue_put attempts to put a message to the queue. It will wait * for the timeout (us), if the value is positive. If the value is negative @@ -102,4 +99,11 @@ extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); */ extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); +/* + * nni_msgqueue_close closes the queue. After this all operates on the + * message queue will return NNG_ECLOSED. Messages inside the queue + * are freed. Unlike closing a go channel, this operation is idempotent. + */ +extern void nni_msgqueue_close(nni_msgqueue_t); + #endif /* NNG_IMPL_H */ |
