aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lmq.c158
-rw-r--r--src/core/lmq.h39
-rw-r--r--src/core/nng_impl.h1
3 files changed, 198 insertions, 0 deletions
diff --git a/src/core/lmq.c b/src/core/lmq.c
new file mode 100644
index 00000000..9772e78e
--- /dev/null
+++ b/src/core/lmq.c
@@ -0,0 +1,158 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "nng_impl.h"
+
+// Light-weight message queue. These are derived from our heavy-weight
+// message queues, but are less "featureful", but more useful for
+// performance sensitive contexts. Locking must be done by the caller.
+
+int
+nni_lmq_init(nni_lmq *lmq, size_t cap)
+{
+ size_t alloc;
+
+ // We prefer alloc to a power of 2, this allows us to do modulo
+ // operations as a power of two, for efficiency. It does possibly
+ // waste some space, but never more than 2x. Consumers should try
+ // for powers of two if they are concerned about efficiency.
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+ if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
+ NNI_FREE_STRUCT(lmq);
+ return (NNG_ENOMEM);
+ }
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = (cap - 1);
+ lmq->lmq_len = 0;
+ lmq->lmq_get = 0;
+ lmq->lmq_put = 0;
+
+ return (0);
+}
+
+void
+nni_lmq_fini(nni_lmq *lmq)
+{
+ if (lmq == NULL) {
+ return;
+ }
+
+ /* Free any orphaned messages. */
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+}
+
+void
+nni_lmq_flush(nni_lmq *lmq)
+{
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+}
+
+size_t
+nni_lmq_len(nni_lmq *lmq)
+{
+ return (lmq->lmq_len);
+}
+
+size_t
+nni_lmq_cap(nni_lmq *lmq)
+{
+ return (lmq->lmq_cap);
+}
+
+bool
+nni_lmq_full(nni_lmq *lmq)
+{
+ return (lmq->lmq_len >= lmq->lmq_cap);
+}
+
+bool
+nni_lmq_empty(nni_lmq *lmq)
+{
+ return (lmq->lmq_len == 0);
+}
+
+int
+nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
+{
+ if (lmq->lmq_len >= lmq->lmq_cap) {
+ return (NNG_EAGAIN);
+ }
+ lmq->lmq_msgs[lmq->lmq_put++] = msg;
+ lmq->lmq_len++;
+ lmq->lmq_put &= lmq->lmq_mask;
+ return (0);
+}
+
+int
+nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
+{
+ nng_msg *msg;
+ if (lmq->lmq_len == 0) {
+ return (NNG_EAGAIN);
+ }
+ msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ *msgp = msg;
+ return (0);
+}
+
+int
+nni_lmq_resize(nni_lmq *lmq, size_t cap)
+{
+ nng_msg * msg;
+ nng_msg **newq;
+ size_t alloc;
+ size_t len;
+
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+
+ newq = nni_alloc(sizeof(nng_msg *) * alloc);
+ if (newq == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ len = 0;
+ while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
+ newq[len++] = msg;
+ }
+
+ // Flush anything left over.
+ nni_lmq_flush(lmq);
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ lmq->lmq_msgs = newq;
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = alloc - 1;
+ lmq->lmq_len = len;
+ lmq->lmq_put = len;
+ lmq->lmq_get = 0;
+
+ return (0);
+}
diff --git a/src/core/lmq.h b/src/core/lmq.h
new file mode 100644
index 00000000..0a64c984
--- /dev/null
+++ b/src/core/lmq.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_LMQ_H
+#define CORE_LMQ_H
+
+#include "nng_impl.h"
+
+// nni_lmq is a very lightweight message queue. Defining it this way allows
+// us to share some common code. Locking must be supplied by the caller.
+// For performance reasons, this is allocated inline.
+typedef struct nni_lmq {
+ size_t lmq_cap;
+ size_t lmq_alloc; // alloc is cap, rounded up to power of 2
+ size_t lmq_mask;
+ size_t lmq_len;
+ size_t lmq_get;
+ size_t lmq_put;
+ nng_msg **lmq_msgs;
+} nni_lmq;
+
+extern int nni_lmq_init(nni_lmq *, size_t);
+extern void nni_lmq_fini(nni_lmq *);
+extern void nni_lmq_flush(nni_lmq *);
+extern size_t nni_lmq_len(nni_lmq *);
+extern size_t nni_lmq_cap(nni_lmq *);
+extern int nni_lmq_putq(nni_lmq *, nng_msg *);
+extern int nni_lmq_getq(nni_lmq *, nng_msg **);
+extern int nni_lmq_resize(nni_lmq *, size_t);
+extern bool nni_lmq_full(nni_lmq *);
+extern bool nni_lmq_empty(nni_lmq *);
+
+#endif // CORE_LMQ_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 07fe44f5..0dcc976f 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -33,6 +33,7 @@
#include "core/idhash.h"
#include "core/init.h"
#include "core/list.h"
+#include "core/lmq.h"
#include "core/message.h"
#include "core/msgqueue.h"
#include "core/options.h"