aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/msgqueue.c63
-rw-r--r--src/core/msgqueue.h14
-rw-r--r--src/nng.h1
3 files changed, 74 insertions, 4 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9f168878..7a4eaf52 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -108,8 +108,26 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
nni_free(mq, sizeof (*mq));
}
+/*
+ * nni_msgqueue_signal raises a signal on the signal object. This allows a
+ * waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
+ * Note that the signal object must be *zero* if no signal is raised.
+ */
+void
+nni_msgqueue_signal(nni_msgqueue_t mq, int *signal)
+{
+ nni_mutex_enter(mq->mq_lock);
+ *signal = 1;
+ /*
+ * We have to wake everyone.
+ */
+ nni_cond_broadcast(mq->mq_readable);
+ nni_cond_broadcast(mq->mq_writeable);
+ nni_mutex_exit(mq->mq_lock);
+}
+
int
-nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+nni_msgqueue_put_sig(nni_msgqueue_t mq, nng_msg_t msg, int tmout, int *signal)
{
uint64_t expire, now;
@@ -119,7 +137,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
nni_mutex_enter(mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) {
+ while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
if (tmout == 0) {
nni_mutex_exit(mq->mq_lock);
return (NNG_EAGAIN);
@@ -143,6 +161,17 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
return (NNG_ECLOSED);
}
+ if ((mq->mq_len == mq->mq_cap) && (*signal)) {
+ /*
+ * We are being interrupted. We only allow an interrupt
+ * if there is no room though, because we'd really prefer
+ * to queue the data. Otherwise our failure to queue
+ * the data could lead to starvation.
+ */
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EINTR);
+ }
+
mq->mq_msgs[mq->mq_put] = msg;
mq->mq_put++;
if (mq->mq_put == mq->mq_cap) {
@@ -157,7 +186,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
}
int
-nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+nni_msgqueue_get_sig(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout, int *signal)
{
uint64_t expire, now;
@@ -167,7 +196,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
nni_mutex_enter(mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == 0)) {
+ while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
if (tmout == 0) {
nni_mutex_exit(mq->mq_lock);
return (NNG_EAGAIN);
@@ -191,6 +220,17 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
return (NNG_ECLOSED);
}
+ if ((mq->mq_len == 0) && (*signal)) {
+ /*
+ * We are being interrupted. We only allow an interrupt
+ * if there is no data though, because we'd really prefer
+ * to give back the data. Otherwise our failure to deal
+ * with the data could lead to starvation.
+ */
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EINTR);
+ }
+
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
@@ -203,6 +243,21 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
}
nni_mutex_exit(mq->mq_lock);
return (0);
+
+}
+
+int
+nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+{
+ int nosig = 0;
+ return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig));
+}
+
+int
+nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+{
+ int nosig = 0;
+ return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig));
}
void
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 9d856edd..85b22ece 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -64,6 +64,20 @@ extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int);
extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int);
/*
+ * The following two functions are interruptible versions of msgqueue_get
+ * and msgqueue_put. The signal argument (pointer) must be initialized
+ * to zero. Then, we can raise a signal, by calling nni_msgqueue_signal
+ * on the same object. The signal flag will remain raised until it is
+ * cleared to zero. If a routine is interrupted, it will return NNG_EINTR.
+ * Note that only threads using the signal object will be interrupted;
+ * this has no effect on other threads that may be waiting on the msgqueue
+ * as well.
+ */
+extern int nni_msgqueue_put_sig(nni_msgqueue_t, nng_msg_t, int, int *);
+extern int nni_msgqueue_get_sig(nni_msgqueue_t, nng_msg_t *, int, int *);
+extern void nni_msgqueue_signal(nni_msgqueue_t, int *);
+
+/*
* nni_msgqueue_close closes the queue. After this all operates on the
* message queue will return NNG_ECLOSED. Messages inside the queue
* are freed. Unlike closing a go channel, this operation is idempotent.
diff --git a/src/nng.h b/src/nng.h
index e20d3543..0adadb39 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -422,6 +422,7 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t);
* Error codes. These may happen to align to errnos used on your platform,
* but do not count on this.
*/
+#define NNG_EINTR (-1)
#define NNG_ENOMEM (-2)
#define NNG_EINVAL (-3)
#define NNG_EBUSY (-4)