diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 13:22:18 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 13:22:18 -0800 |
| commit | ee969ad99dc1e07e1c38876223e7aed13463b121 (patch) | |
| tree | 40bde325d041532661e7dcc3441185aca7701e53 /src/core/msgqueue.c | |
| parent | 6c1325a2b17548a4249d26a846bc32b95b7d747d (diff) | |
| download | nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.gz nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.bz2 nng-ee969ad99dc1e07e1c38876223e7aed13463b121.zip | |
Synchronization enhancements - inproc & msgqueue. Absolute waits...
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 197 |
1 files changed, 85 insertions, 112 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6b208e70..cbdb94c6 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,35 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "nng_impl.h" -/* - * Message queue. These operate in some respects like Go channels, - * but as we have access to the internals, we have made some fundamental - * differences and improvements. For example, these can grow, and either - * side can close, and they may be closed more than once. - */ +// Message queue. These operate in some respects like Go channels, +// but as we have access to the internals, we have made some fundamental +// differences and improvements. For example, these can grow, and either +// side can close, and they may be closed more than once. struct nni_msgqueue { - nni_mutex_t mq_lock; - nni_cond_t mq_readable; - nni_cond_t mq_writeable; + nni_mutex mq_lock; + nni_cond mq_readable; + nni_cond mq_writeable; int mq_cap; int mq_len; int mq_get; int mq_put; int mq_closed; - nng_msg_t * mq_msgs; + nni_msg ** mq_msgs; }; int -nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) +nni_msgqueue_create(nni_msgqueue **mqp, int cap) { struct nni_msgqueue *mq; int rv; @@ -40,24 +38,24 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) if ((mq = nni_alloc(sizeof (*mq))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) { + if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) { nni_free(mq, sizeof (*mq)); return (rv); } - if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) { - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) { + nni_mutex_fini(&mq->mq_lock); nni_free(mq, sizeof (*mq)); return (NNG_ENOMEM); } - if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) { - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } @@ -73,13 +71,13 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) void -nni_msgqueue_destroy(nni_msgqueue_t mq) +nni_msgqueue_destroy(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); /* Free any orphaned messages. */ while (mq->mq_len > 0) { @@ -97,69 +95,54 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) } -/* - * nni_msgqueue_signal raises a signal on the signal object. This allows a - * waiter to be signaled, so that it can be woken e.g. due to a pipe closing. - * Note that the signal object must be *zero* if no signal is raised. - */ +// nni_msgqueue_signal raises a signal on the signal object. This allows a +// waiter to be signaled, so that it can be woken e.g. due to a pipe closing. +// Note that the signal object must be *zero* if no signal is raised. void -nni_msgqueue_signal(nni_msgqueue_t mq, int *signal) +nni_msgqueue_signal(nni_msgqueue *mq, int *signal) { - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); *signal = 1; - /* - * We have to wake everyone. - */ - nni_cond_broadcast(mq->mq_readable); - nni_cond_broadcast(mq->mq_writeable); - nni_mutex_exit(mq->mq_lock); + // We have to wake everyone. + nni_cond_broadcast(&mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_mutex_exit(&mq->mq_lock); } int -nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) +nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_writeable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_writeable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == mq->mq_cap) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no room though, because we'd really prefer - * to queue the data. Otherwise our failure to queue - * the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no room though, because we'd really prefer + // to queue the data. Otherwise our failure to queue + // the data could lead to starvation. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -170,56 +153,46 @@ nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == 1) { - (void) nni_cond_signal(mq->mq_readable); + (void) nni_cond_signal(&mq->mq_readable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) +nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_readable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_readable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == 0) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no data though, because we'd really prefer - * to give back the data. Otherwise our failure to deal - * with the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no data though, because we'd really prefer + // to give back the data. Otherwise our failure to deal + // with the data could lead to starvation; also lingering + // relies on this not interrupting if data is pending. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -231,15 +204,15 @@ nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(mq->mq_writeable); + (void) nni_cond_signal(&mq->mq_writeable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) +nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout) { int nosig = 0; @@ -248,7 +221,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) int -nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) +nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout) { int nosig = 0; @@ -257,16 +230,16 @@ nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) void -nni_msgqueue_close(nni_msgqueue_t mq) +nni_msgqueue_close(nni_msgqueue *mq) { nni_msg_t msg; - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; - nni_cond_broadcast(mq->mq_writeable); - nni_cond_broadcast(mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_readable); - /* Free the messages orphaned in the queue. */ + // Free the messages orphaned in the queue. while (mq->mq_len > 0) { msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; @@ -276,5 +249,5 @@ nni_msgqueue_close(nni_msgqueue_t mq) mq->mq_len--; nni_msg_free(msg); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); } |
