aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c82
1 files changed, 54 insertions, 28 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index cbdb94c6..7d892b88 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -112,22 +112,20 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
int
-nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
+nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
+ nni_time expire, nni_signal *signal)
{
- uint64_t expire;
-
- if (tmout >= 0) {
- expire = nni_clock() + tmout;
- } else {
- expire = 0xffffffffffffffffull;
- }
-
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
+ while ((!mq->mq_closed) &&
+ (mq->mq_len == mq->mq_cap) &&
+ (!*signal)) {
if (expire <= nni_clock()) {
nni_mutex_exit(&mq->mq_lock);
- return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
+ if (expire == NNI_TIME_ZERO) {
+ return (NNG_EAGAIN);
+ }
+ return (NNG_ETIMEDOUT);
}
(void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
@@ -160,23 +158,19 @@ nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
}
-int
-nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
+static int
+nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
+ nni_time expire, nni_signal *signal)
{
- uint64_t expire;
-
- if (tmout >= 0) {
- expire = nni_clock() + tmout;
- } else {
- expire = 0xffffffffffffffffull;
- }
-
nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
if (expire <= nni_clock()) {
nni_mutex_exit(&mq->mq_lock);
- return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
+ if (expire == NNI_TIME_ZERO) {
+ return (NNG_EAGAIN);
+ }
+ return (NNG_ETIMEDOUT);
}
(void) nni_cond_waituntil(&mq->mq_readable, expire);
}
@@ -212,20 +206,52 @@ nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
int
-nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout)
+nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp)
{
- int nosig = 0;
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig));
+}
+
+
+int
+nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal)
+{
+ return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal));
+}
+
- return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig));
+int
+nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire)
+{
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig));
+}
+
+
+int
+nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg)
+{
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig));
+}
+
+
+int
+nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal)
+{
+ return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal));
}
int
-nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout)
+nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
{
- int nosig = 0;
+ nni_signal nosig = 0;
- return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig));
+ return (nni_msgqueue_put_impl(mq, msg, expire, &nosig));
}