summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
commitee969ad99dc1e07e1c38876223e7aed13463b121 (patch)
tree40bde325d041532661e7dcc3441185aca7701e53 /src/core/msgqueue.c
parent6c1325a2b17548a4249d26a846bc32b95b7d747d (diff)
downloadnng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.gz
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.bz2
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.zip
Synchronization enhancements - inproc & msgqueue. Absolute waits...
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c197
1 files changed, 85 insertions, 112 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6b208e70..cbdb94c6 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,35 +1,33 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "nng_impl.h"
-/*
- * Message queue. These operate in some respects like Go channels,
- * but as we have access to the internals, we have made some fundamental
- * differences and improvements. For example, these can grow, and either
- * side can close, and they may be closed more than once.
- */
+// Message queue. These operate in some respects like Go channels,
+// but as we have access to the internals, we have made some fundamental
+// differences and improvements. For example, these can grow, and either
+// side can close, and they may be closed more than once.
struct nni_msgqueue {
- nni_mutex_t mq_lock;
- nni_cond_t mq_readable;
- nni_cond_t mq_writeable;
+ nni_mutex mq_lock;
+ nni_cond mq_readable;
+ nni_cond mq_writeable;
int mq_cap;
int mq_len;
int mq_get;
int mq_put;
int mq_closed;
- nng_msg_t * mq_msgs;
+ nni_msg ** mq_msgs;
};
int
-nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
+nni_msgqueue_create(nni_msgqueue **mqp, int cap)
{
struct nni_msgqueue *mq;
int rv;
@@ -40,24 +38,24 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) {
+ if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) {
nni_free(mq, sizeof (*mq));
return (rv);
}
- if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) {
- nni_mutex_destroy(mq->mq_lock);
+ if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
+ nni_mutex_fini(&mq->mq_lock);
nni_free(mq, sizeof (*mq));
return (NNG_ENOMEM);
}
- if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) {
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
@@ -73,13 +71,13 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
void
-nni_msgqueue_destroy(nni_msgqueue_t mq)
+nni_msgqueue_destroy(nni_msgqueue *mq)
{
- nni_msg_t msg;
+ nni_msg *msg;
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
/* Free any orphaned messages. */
while (mq->mq_len > 0) {
@@ -97,69 +95,54 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
}
-/*
- * nni_msgqueue_signal raises a signal on the signal object. This allows a
- * waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
- * Note that the signal object must be *zero* if no signal is raised.
- */
+// nni_msgqueue_signal raises a signal on the signal object. This allows a
+// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
+// Note that the signal object must be *zero* if no signal is raised.
void
-nni_msgqueue_signal(nni_msgqueue_t mq, int *signal)
+nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
{
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
*signal = 1;
- /*
- * We have to wake everyone.
- */
- nni_cond_broadcast(mq->mq_readable);
- nni_cond_broadcast(mq->mq_writeable);
- nni_mutex_exit(mq->mq_lock);
+ // We have to wake everyone.
+ nni_cond_broadcast(&mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_mutex_exit(&mq->mq_lock);
}
int
-nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal)
+nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
{
- uint64_t expire, now;
+ uint64_t expire;
- if (tmout > 0) {
+ if (tmout >= 0) {
expire = nni_clock() + tmout;
+ } else {
+ expire = 0xffffffffffffffffull;
}
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_writeable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
+ if (expire <= nni_clock()) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
}
- (void) nni_cond_timedwait(mq->mq_writeable, (expire - now));
+ (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
if ((mq->mq_len == mq->mq_cap) && (*signal)) {
- /*
- * We are being interrupted. We only allow an interrupt
- * if there is no room though, because we'd really prefer
- * to queue the data. Otherwise our failure to queue
- * the data could lead to starvation.
- */
- nni_mutex_exit(mq->mq_lock);
+ // We are being interrupted. We only allow an interrupt
+ // if there is no room though, because we'd really prefer
+ // to queue the data. Otherwise our failure to queue
+ // the data could lead to starvation.
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -170,56 +153,46 @@ nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal)
}
mq->mq_len++;
if (mq->mq_len == 1) {
- (void) nni_cond_signal(mq->mq_readable);
+ (void) nni_cond_signal(&mq->mq_readable);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (0);
}
int
-nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal)
+nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
{
- uint64_t expire, now;
+ uint64_t expire;
- if (tmout > 0) {
+ if (tmout >= 0) {
expire = nni_clock() + tmout;
+ } else {
+ expire = 0xffffffffffffffffull;
}
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_readable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
+ if (expire <= nni_clock()) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
}
- (void) nni_cond_timedwait(mq->mq_readable, (expire - now));
+ (void) nni_cond_waituntil(&mq->mq_readable, expire);
}
if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
if ((mq->mq_len == 0) && (*signal)) {
- /*
- * We are being interrupted. We only allow an interrupt
- * if there is no data though, because we'd really prefer
- * to give back the data. Otherwise our failure to deal
- * with the data could lead to starvation.
- */
- nni_mutex_exit(mq->mq_lock);
+ // We are being interrupted. We only allow an interrupt
+ // if there is no data though, because we'd really prefer
+ // to give back the data. Otherwise our failure to deal
+ // with the data could lead to starvation; also lingering
+ // relies on this not interrupting if data is pending.
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -231,15 +204,15 @@ nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal)
}
mq->mq_len++;
if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(mq->mq_writeable);
+ (void) nni_cond_signal(&mq->mq_writeable);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (0);
}
int
-nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout)
+nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout)
{
int nosig = 0;
@@ -248,7 +221,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout)
int
-nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout)
+nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout)
{
int nosig = 0;
@@ -257,16 +230,16 @@ nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout)
void
-nni_msgqueue_close(nni_msgqueue_t mq)
+nni_msgqueue_close(nni_msgqueue *mq)
{
nni_msg_t msg;
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cond_broadcast(mq->mq_writeable);
- nni_cond_broadcast(mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_readable);
- /* Free the messages orphaned in the queue. */
+ // Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
@@ -276,5 +249,5 @@ nni_msgqueue_close(nni_msgqueue_t mq)
mq->mq_len--;
nni_msg_free(msg);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
}