From 4919519754a0b5aee826add75273c291c33c4b5f Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 12 Dec 2016 21:29:52 -0800 Subject: Make an interruptible version of message queues. --- src/core/msgqueue.c | 63 +++++++++++++++++++++++++++++++++++++++++++++++++---- src/core/msgqueue.h | 14 ++++++++++++ src/nng.h | 1 + 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f168878..7a4eaf52 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -108,8 +108,26 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) nni_free(mq, sizeof (*mq)); } +/* + * nni_msgqueue_signal raises a signal on the signal object. This allows a + * waiter to be signaled, so that it can be woken e.g. due to a pipe closing. + * Note that the signal object must be *zero* if no signal is raised. + */ +void +nni_msgqueue_signal(nni_msgqueue_t mq, int *signal) +{ + nni_mutex_enter(mq->mq_lock); + *signal = 1; + /* + * We have to wake everyone. + */ + nni_cond_broadcast(mq->mq_readable); + nni_cond_broadcast(mq->mq_writeable); + nni_mutex_exit(mq->mq_lock); +} + int -nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +nni_msgqueue_put_sig(nni_msgqueue_t mq, nng_msg_t msg, int tmout, int *signal) { uint64_t expire, now; @@ -119,7 +137,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) nni_mutex_enter(mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) { + while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { if (tmout == 0) { nni_mutex_exit(mq->mq_lock); return (NNG_EAGAIN); @@ -143,6 +161,17 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) return (NNG_ECLOSED); } + if ((mq->mq_len == mq->mq_cap) && (*signal)) { + /* + * We are being interrupted. We only allow an interrupt + * if there is no room though, because we'd really prefer + * to queue the data. Otherwise our failure to queue + * the data could lead to starvation. + */ + nni_mutex_exit(mq->mq_lock); + return (NNG_EINTR); + } + mq->mq_msgs[mq->mq_put] = msg; mq->mq_put++; if (mq->mq_put == mq->mq_cap) { @@ -157,7 +186,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) } int -nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +nni_msgqueue_get_sig(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout, int *signal) { uint64_t expire, now; @@ -167,7 +196,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) nni_mutex_enter(mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == 0)) { + while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { if (tmout == 0) { nni_mutex_exit(mq->mq_lock); return (NNG_EAGAIN); @@ -191,6 +220,17 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) return (NNG_ECLOSED); } + if ((mq->mq_len == 0) && (*signal)) { + /* + * We are being interrupted. We only allow an interrupt + * if there is no data though, because we'd really prefer + * to give back the data. Otherwise our failure to deal + * with the data could lead to starvation. + */ + nni_mutex_exit(mq->mq_lock); + return (NNG_EINTR); + } + *msgp = mq->mq_msgs[mq->mq_get]; mq->mq_len--; mq->mq_get++; @@ -203,6 +243,21 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) } nni_mutex_exit(mq->mq_lock); return (0); + +} + +int +nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +{ + int nosig = 0; + return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig)); +} + +int +nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +{ + int nosig = 0; + return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig)); } void diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 9d856edd..85b22ece 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -63,6 +63,20 @@ extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); */ extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); +/* + * The following two functions are interruptible versions of msgqueue_get + * and msgqueue_put. The signal argument (pointer) must be initialized + * to zero. Then, we can raise a signal, by calling nni_msgqueue_signal + * on the same object. The signal flag will remain raised until it is + * cleared to zero. If a routine is interrupted, it will return NNG_EINTR. + * Note that only threads using the signal object will be interrupted; + * this has no effect on other threads that may be waiting on the msgqueue + * as well. + */ +extern int nni_msgqueue_put_sig(nni_msgqueue_t, nng_msg_t, int, int *); +extern int nni_msgqueue_get_sig(nni_msgqueue_t, nng_msg_t *, int, int *); +extern void nni_msgqueue_signal(nni_msgqueue_t, int *); + /* * nni_msgqueue_close closes the queue. After this all operates on the * message queue will return NNG_ECLOSED. Messages inside the queue diff --git a/src/nng.h b/src/nng.h index e20d3543..0adadb39 100644 --- a/src/nng.h +++ b/src/nng.h @@ -422,6 +422,7 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t); * Error codes. These may happen to align to errnos used on your platform, * but do not count on this. */ +#define NNG_EINTR (-1) #define NNG_ENOMEM (-2) #define NNG_EINVAL (-3) #define NNG_EBUSY (-4) -- cgit v1.2.3-70-g09d2