diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/lmq.c | 158 | ||||
| -rw-r--r-- | src/core/lmq.h | 39 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 |
3 files changed, 198 insertions, 0 deletions
diff --git a/src/core/lmq.c b/src/core/lmq.c new file mode 100644 index 00000000..9772e78e --- /dev/null +++ b/src/core/lmq.c @@ -0,0 +1,158 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "nng_impl.h" + +// Light-weight message queue. These are derived from our heavy-weight +// message queues, but are less "featureful", but more useful for +// performance sensitive contexts. Locking must be done by the caller. + +int +nni_lmq_init(nni_lmq *lmq, size_t cap) +{ + size_t alloc; + + // We prefer alloc to a power of 2, this allows us to do modulo + // operations as a power of two, for efficiency. It does possibly + // waste some space, but never more than 2x. Consumers should try + // for powers of two if they are concerned about efficiency. + alloc = 1; + while (alloc < cap) { + alloc *= 2; + } + if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) { + NNI_FREE_STRUCT(lmq); + return (NNG_ENOMEM); + } + lmq->lmq_cap = cap; + lmq->lmq_alloc = alloc; + lmq->lmq_mask = (cap - 1); + lmq->lmq_len = 0; + lmq->lmq_get = 0; + lmq->lmq_put = 0; + + return (0); +} + +void +nni_lmq_fini(nni_lmq *lmq) +{ + if (lmq == NULL) { + return; + } + + /* Free any orphaned messages. */ + while (lmq->lmq_len > 0) { + nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + nni_msg_free(msg); + } + + nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *)); +} + +void +nni_lmq_flush(nni_lmq *lmq) +{ + while (lmq->lmq_len > 0) { + nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + nni_msg_free(msg); + } +} + +size_t +nni_lmq_len(nni_lmq *lmq) +{ + return (lmq->lmq_len); +} + +size_t +nni_lmq_cap(nni_lmq *lmq) +{ + return (lmq->lmq_cap); +} + +bool +nni_lmq_full(nni_lmq *lmq) +{ + return (lmq->lmq_len >= lmq->lmq_cap); +} + +bool +nni_lmq_empty(nni_lmq *lmq) +{ + return (lmq->lmq_len == 0); +} + +int +nni_lmq_putq(nni_lmq *lmq, nng_msg *msg) +{ + if (lmq->lmq_len >= lmq->lmq_cap) { + return (NNG_EAGAIN); + } + lmq->lmq_msgs[lmq->lmq_put++] = msg; + lmq->lmq_len++; + lmq->lmq_put &= lmq->lmq_mask; + return (0); +} + +int +nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp) +{ + nng_msg *msg; + if (lmq->lmq_len == 0) { + return (NNG_EAGAIN); + } + msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + *msgp = msg; + return (0); +} + +int +nni_lmq_resize(nni_lmq *lmq, size_t cap) +{ + nng_msg * msg; + nng_msg **newq; + size_t alloc; + size_t len; + + alloc = 1; + while (alloc < cap) { + alloc *= 2; + } + + newq = nni_alloc(sizeof(nng_msg *) * alloc); + if (newq == NULL) { + return (NNG_ENOMEM); + } + + len = 0; + while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) { + newq[len++] = msg; + } + + // Flush anything left over. + nni_lmq_flush(lmq); + + nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *)); + lmq->lmq_msgs = newq; + lmq->lmq_cap = cap; + lmq->lmq_alloc = alloc; + lmq->lmq_mask = alloc - 1; + lmq->lmq_len = len; + lmq->lmq_put = len; + lmq->lmq_get = 0; + + return (0); +} diff --git a/src/core/lmq.h b/src/core/lmq.h new file mode 100644 index 00000000..0a64c984 --- /dev/null +++ b/src/core/lmq.h @@ -0,0 +1,39 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_LMQ_H +#define CORE_LMQ_H + +#include "nng_impl.h" + +// nni_lmq is a very lightweight message queue. Defining it this way allows +// us to share some common code. Locking must be supplied by the caller. +// For performance reasons, this is allocated inline. +typedef struct nni_lmq { + size_t lmq_cap; + size_t lmq_alloc; // alloc is cap, rounded up to power of 2 + size_t lmq_mask; + size_t lmq_len; + size_t lmq_get; + size_t lmq_put; + nng_msg **lmq_msgs; +} nni_lmq; + +extern int nni_lmq_init(nni_lmq *, size_t); +extern void nni_lmq_fini(nni_lmq *); +extern void nni_lmq_flush(nni_lmq *); +extern size_t nni_lmq_len(nni_lmq *); +extern size_t nni_lmq_cap(nni_lmq *); +extern int nni_lmq_putq(nni_lmq *, nng_msg *); +extern int nni_lmq_getq(nni_lmq *, nng_msg **); +extern int nni_lmq_resize(nni_lmq *, size_t); +extern bool nni_lmq_full(nni_lmq *); +extern bool nni_lmq_empty(nni_lmq *); + +#endif // CORE_LMQ_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 07fe44f5..0dcc976f 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -33,6 +33,7 @@ #include "core/idhash.h" #include "core/init.h" #include "core/list.h" +#include "core/lmq.h" #include "core/message.h" #include "core/msgqueue.h" #include "core/options.h" |
