diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-12 21:29:52 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-12 21:29:52 -0800 |
| commit | 4919519754a0b5aee826add75273c291c33c4b5f (patch) | |
| tree | 10413c4f1768b47ca201a319bdcf13a9c5ac9473 /src/core/msgqueue.c | |
| parent | 1d1e8703b5735cd65fd3835573a6a66868adafa6 (diff) | |
| download | nng-4919519754a0b5aee826add75273c291c33c4b5f.tar.gz nng-4919519754a0b5aee826add75273c291c33c4b5f.tar.bz2 nng-4919519754a0b5aee826add75273c291c33c4b5f.zip | |
Make an interruptible version of message queues.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 63 |
1 files changed, 59 insertions, 4 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f168878..7a4eaf52 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -108,8 +108,26 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) nni_free(mq, sizeof (*mq)); } +/* + * nni_msgqueue_signal raises a signal on the signal object. This allows a + * waiter to be signaled, so that it can be woken e.g. due to a pipe closing. + * Note that the signal object must be *zero* if no signal is raised. + */ +void +nni_msgqueue_signal(nni_msgqueue_t mq, int *signal) +{ + nni_mutex_enter(mq->mq_lock); + *signal = 1; + /* + * We have to wake everyone. + */ + nni_cond_broadcast(mq->mq_readable); + nni_cond_broadcast(mq->mq_writeable); + nni_mutex_exit(mq->mq_lock); +} + int -nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +nni_msgqueue_put_sig(nni_msgqueue_t mq, nng_msg_t msg, int tmout, int *signal) { uint64_t expire, now; @@ -119,7 +137,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) nni_mutex_enter(mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) { + while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { if (tmout == 0) { nni_mutex_exit(mq->mq_lock); return (NNG_EAGAIN); @@ -143,6 +161,17 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) return (NNG_ECLOSED); } + if ((mq->mq_len == mq->mq_cap) && (*signal)) { + /* + * We are being interrupted. We only allow an interrupt + * if there is no room though, because we'd really prefer + * to queue the data. Otherwise our failure to queue + * the data could lead to starvation. + */ + nni_mutex_exit(mq->mq_lock); + return (NNG_EINTR); + } + mq->mq_msgs[mq->mq_put] = msg; mq->mq_put++; if (mq->mq_put == mq->mq_cap) { @@ -157,7 +186,7 @@ nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) } int -nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +nni_msgqueue_get_sig(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout, int *signal) { uint64_t expire, now; @@ -167,7 +196,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) nni_mutex_enter(mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == 0)) { + while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { if (tmout == 0) { nni_mutex_exit(mq->mq_lock); return (NNG_EAGAIN); @@ -191,6 +220,17 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) return (NNG_ECLOSED); } + if ((mq->mq_len == 0) && (*signal)) { + /* + * We are being interrupted. We only allow an interrupt + * if there is no data though, because we'd really prefer + * to give back the data. Otherwise our failure to deal + * with the data could lead to starvation. + */ + nni_mutex_exit(mq->mq_lock); + return (NNG_EINTR); + } + *msgp = mq->mq_msgs[mq->mq_get]; mq->mq_len--; mq->mq_get++; @@ -203,6 +243,21 @@ nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) } nni_mutex_exit(mq->mq_lock); return (0); + +} + +int +nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +{ + int nosig = 0; + return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig)); +} + +int +nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +{ + int nosig = 0; + return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig)); } void |
