aboutsummaryrefslogtreecommitdiff
path: root/src/core/msqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-12 03:42:26 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-12 03:42:26 -0800
commit1adefe3879b211a47a784f477d56a9416ae72254 (patch)
treecf017599969fb109d62981cbef56eeeef4d2b108 /src/core/msqueue.c
parent091043a0df9568b18e9bb49b07762b95e3508108 (diff)
downloadnng-1adefe3879b211a47a784f477d56a9416ae72254.tar.gz
nng-1adefe3879b211a47a784f477d56a9416ae72254.tar.bz2
nng-1adefe3879b211a47a784f477d56a9416ae72254.zip
New inproc transport.
Lots of supporting changes.
Diffstat (limited to 'src/core/msqueue.c')
-rw-r--r--src/core/msqueue.c231
1 files changed, 0 insertions, 231 deletions
diff --git a/src/core/msqueue.c b/src/core/msqueue.c
deleted file mode 100644
index 7becabfb..00000000
--- a/src/core/msqueue.c
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"),
- * to deal in the Software without restriction, including without limitation
- * the rights to use, copy, modify, merge, publish, distribute, sublicense,
- * and/or sell copies of the Software, and to permit persons to whom
- * the Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included
- * in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
-
-#include "../nng.h"
-
-#include "nng_impl.h"
-
-/*
- * Message queue. These operate in some respects like Go channels,
- * but as we have access to the internals, we have made some fundamental
- * differences and improvements. For example, these can grow, and either
- * side can close, and they may be closed more than once.
- */
-
-struct nni_msgqueue {
- nni_mutex_t mq_lock;
- nni_cond_t mq_readable;
- nni_cond_t mq_writeable;
- int mq_cap;
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- nng_msg_t *mq_msgs;
-};
-
-int
-nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
-{
- struct nni_msgqueue *mq;
- int rv;
-
- if (cap < 1) {
- return (NNG_EINVAL);
- }
- if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) {
- nni_free(mq, sizeof (*mq));
- return (rv);
- }
- if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) {
- nni_mutex_destroy(mq->mq_lock);
- nni_free(mq, sizeof (*mq));
- return (NNG_ENOMEM);
- }
- if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) {
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
- return (NNG_ENOMEM);
- }
- if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
- return (NNG_ENOMEM);
- }
-
- mq->mq_cap = cap;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- *mqp = mq;
-
- return (0);
-}
-
-void
-nni_msgqueue_destroy(nni_msgqueue_t mq)
-{
- nng_msg_t msg;
-
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
-
- /* Free any orphaned messages. */
- while (mq->mq_len > 0) {
- msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nng_msg_free(msg);
- }
-
- nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
- nni_free(mq, sizeof (*mq));
-}
-
-int
-nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
-{
- uint64_t expire, now;
-
- if (tmout > 0) {
- expire = nni_clock() + tmout;
- }
-
- nni_mutex_enter(mq->mq_lock);
-
- while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_writeable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- (void) nni_cond_timedwait(mq->mq_writeable, (expire - now));
- }
-
- if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ECLOSED);
- }
-
- mq->mq_msgs[mq->mq_put] = msg;
- mq->mq_put++;
- if (mq->mq_put == mq->mq_cap) {
- mq->mq_put = 0;
- }
- mq->mq_len++;
- if (mq->mq_len == 1) {
- (void) nni_cond_signal(mq->mq_readable);
- }
- nni_mutex_exit(mq->mq_lock);
- return (0);
-}
-
-int
-nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
-{
- uint64_t expire, now;
-
- if (tmout > 0) {
- expire = nni_clock() + tmout;
- }
-
- nni_mutex_enter(mq->mq_lock);
-
- while ((!mq->mq_closed) && (mq->mq_len == 0)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_readable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- (void) nni_cond_timedwait(mq->mq_readable, (expire - now));
- }
-
- if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ECLOSED);
- }
-
- *msgp = mq->mq_msgs[mq->mq_get];
- mq->mq_len--;
- mq->mq_get++;
- if (mq->mq_get == mq->mq_cap) {
- mq->mq_get = 0;
- }
- mq->mq_len++;
- if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(mq->mq_writeable);
- }
- nni_mutex_exit(mq->mq_lock);
- return (0);
-}
-
-void
-nni_msgqueue_close(nni_msgqueue_t mq)
-{
- nng_msg_t msg;
-
- nni_mutex_enter(mq->mq_lock);
- mq->mq_closed = 1;
- nni_cond_broadcast(mq->mq_writeable);
- nni_cond_broadcast(mq->mq_readable);
-
- /* Free the messages orphaned in the queue. */
- while (mq->mq_len > 0) {
- msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nng_msg_free(msg);
- }
- nni_mutex_exit(mq->mq_lock);
-}