aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-12 21:29:52 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-12 21:29:52 -0800
commit4919519754a0b5aee826add75273c291c33c4b5f (patch)
tree10413c4f1768b47ca201a319bdcf13a9c5ac9473 /src/core/msgqueue.c
parent1d1e8703b5735cd65fd3835573a6a66868adafa6 (diff)
downloadnng-4919519754a0b5aee826add75273c291c33c4b5f.tar.gz
nng-4919519754a0b5aee826add75273c291c33c4b5f.tar.bz2
nng-4919519754a0b5aee826add75273c291c33c4b5f.zip
Make an interruptible version of message queues.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c63
1 files changed, 59 insertions, 4 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9f168878..7a4eaf52 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -108,8 +108,26 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
nni_free(mq, sizeof (*mq));
}
+/*
+ * nni_msgqueue_signal raises a signal on the signal object. This allows a
+ * waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
+ * Note that the signal object must be *zero* if no signal is raised.
+ */
+void
+nni_msgqueue_signal(nni_msgqueue_t mq, int *signal)
+{
+ nni_mutex_enter(mq->mq_lock);
+ *signal = 1;
+ /*
+ * We have to wake everyone.
+ */
+ nni_cond_broadcast(mq->mq_readable);
+ nni_cond_broadcast(mq->mq_writeable);
+ nni_mutex_exit(mq->mq_lock);
+}
+
int
-nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+nni_msgqueue_put_sig(nni_msgqueue_t mq, nng_msg_t msg, int tmout, int *signal)
{
uint64_t expire, now;
@@ -119,7 +137,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
nni_mutex_enter(mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) {
+ while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
if (tmout == 0) {
nni_mutex_exit(mq->mq_lock);
return (NNG_EAGAIN);
@@ -143,6 +161,17 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
return (NNG_ECLOSED);
}
+ if ((mq->mq_len == mq->mq_cap) && (*signal)) {
+ /*
+ * We are being interrupted. We only allow an interrupt
+ * if there is no room though, because we'd really prefer
+ * to queue the data. Otherwise our failure to queue
+ * the data could lead to starvation.
+ */
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EINTR);
+ }
+
mq->mq_msgs[mq->mq_put] = msg;
mq->mq_put++;
if (mq->mq_put == mq->mq_cap) {
@@ -157,7 +186,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
}
int
-nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+nni_msgqueue_get_sig(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout, int *signal)
{
uint64_t expire, now;
@@ -167,7 +196,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
nni_mutex_enter(mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == 0)) {
+ while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
if (tmout == 0) {
nni_mutex_exit(mq->mq_lock);
return (NNG_EAGAIN);
@@ -191,6 +220,17 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
return (NNG_ECLOSED);
}
+ if ((mq->mq_len == 0) && (*signal)) {
+ /*
+ * We are being interrupted. We only allow an interrupt
+ * if there is no data though, because we'd really prefer
+ * to give back the data. Otherwise our failure to deal
+ * with the data could lead to starvation.
+ */
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EINTR);
+ }
+
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
@@ -203,6 +243,21 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
}
nni_mutex_exit(mq->mq_lock);
return (0);
+
+}
+
+int
+nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+{
+ int nosig = 0;
+ return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig));
+}
+
+int
+nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+{
+ int nosig = 0;
+ return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig));
}
void