author    Garrett D'Amore <garrett@damore.org>    2019-02-24 22:04:16 -0800
committer Garrett D'Amore <garrett@damore.org>    2019-02-26 21:09:54 -0800
commit    5803db08e55ed9287dc59b3adc281b89c52c530f (patch)
tree      9d2d65ed86be5c7b976fc3bdfc5ed5b375143641 /src/core/lmq.c
parent    9cf967e9d7fdab6ccf38f80d83e4bf3d1a5e1a67 (diff)
fixes #461 Context support for SUB
fixes #762 Pub/Sub very slow compared with nanomsg

This introduces contexts for SUB, and converts both the cooked SUB and PUB protocols to use a new lightweight message queue that has significant performance benefits over the heavy-weight message queue. We've also added a test program, pubdrop, in the perf directory, which can be used for measuring pub/sub message rates and drop rates.

Note that it is still quite easy to overwhelm a subscriber. The SUB socket performance is not yet completely where it needs to be. There are two remaining things to improve. First, we need to replace the naive linked list of topics with a proper PATRICIA trie. Second, we need to work on the low-level POSIX poller code. (The Windows code is already quite good, and we outperform nanomsg on Windows.)
Diffstat (limited to 'src/core/lmq.c')
-rw-r--r-- src/core/lmq.c | 158
1 file changed, 158 insertions(+), 0 deletions(-)
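Since the new queue leaves all locking to the caller (see the header comment in lmq.c below), a typical consumer wraps it in its own mutex. The sketch below is illustrative only and is not part of this commit: the demo_queue type and its functions are hypothetical, while nni_mtx_*, nni_msg_free, and the nni_lmq_* calls are existing nng core primitives. Dropping the oldest message when the queue is full mirrors the lossy delivery model pub/sub already has; a consumer that must not drop would instead check nni_lmq_full() and defer the producer.

#include "nng_impl.h"

// Hypothetical consumer of the new lmq: a small queue guarded by its
// own mutex, dropping the oldest message when full instead of blocking.
typedef struct demo_queue {
	nni_mtx dq_mtx;
	nni_lmq dq_lmq;
} demo_queue;

static int
demo_queue_init(demo_queue *dq, size_t cap)
{
	nni_mtx_init(&dq->dq_mtx);
	return (nni_lmq_init(&dq->dq_lmq, cap));
}

static void
demo_queue_put(demo_queue *dq, nng_msg *msg)
{
	nni_mtx_lock(&dq->dq_mtx);
	if (nni_lmq_full(&dq->dq_lmq)) {
		nng_msg *old;
		// Full: discard the oldest message to make room.
		(void) nni_lmq_getq(&dq->dq_lmq, &old);
		nni_msg_free(old);
	}
	(void) nni_lmq_putq(&dq->dq_lmq, msg);
	nni_mtx_unlock(&dq->dq_mtx);
}

static void
demo_queue_fini(demo_queue *dq)
{
	nni_lmq_fini(&dq->dq_lmq); // frees any queued messages
	nni_mtx_fini(&dq->dq_mtx);
}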
diff --git a/src/core/lmq.c b/src/core/lmq.c
new file mode 100644
index 00000000..9772e78e
--- /dev/null
+++ b/src/core/lmq.c
@@ -0,0 +1,158 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "nng_impl.h"
+
+// Light-weight message queue.  These are derived from our heavy-weight
+// message queues, but are less "featureful" and better suited to
+// performance sensitive contexts.  Locking must be done by the caller.
+
+int
+nni_lmq_init(nni_lmq *lmq, size_t cap)
+{
+ size_t alloc;
+
+ // We round the allocation up to a power of 2, which lets us use
+ // a simple mask instead of a modulo operation, for efficiency.
+ // This can waste some space, but never more than 2x.  Consumers
+ // should ask for powers of two if they are concerned about space.
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+ if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
+ // The lmq structure itself is owned by the caller; do not free it.
+ return (NNG_ENOMEM);
+ }
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = (alloc - 1);
+ lmq->lmq_len = 0;
+ lmq->lmq_get = 0;
+ lmq->lmq_put = 0;
+
+ return (0);
+}
+
+void
+nni_lmq_fini(nni_lmq *lmq)
+{
+ if (lmq == NULL) {
+ return;
+ }
+
+ // Free any orphaned messages.
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+}
+
+void
+nni_lmq_flush(nni_lmq *lmq)
+{
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+}
+
+size_t
+nni_lmq_len(nni_lmq *lmq)
+{
+ return (lmq->lmq_len);
+}
+
+size_t
+nni_lmq_cap(nni_lmq *lmq)
+{
+ return (lmq->lmq_cap);
+}
+
+bool
+nni_lmq_full(nni_lmq *lmq)
+{
+ return (lmq->lmq_len >= lmq->lmq_cap);
+}
+
+bool
+nni_lmq_empty(nni_lmq *lmq)
+{
+ return (lmq->lmq_len == 0);
+}
+
+int
+nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
+{
+ if (lmq->lmq_len >= lmq->lmq_cap) {
+ return (NNG_EAGAIN);
+ }
+ lmq->lmq_msgs[lmq->lmq_put++] = msg;
+ lmq->lmq_len++;
+ lmq->lmq_put &= lmq->lmq_mask;
+ return (0);
+}
+
+int
+nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
+{
+ nng_msg *msg;
+ if (lmq->lmq_len == 0) {
+ return (NNG_EAGAIN);
+ }
+ msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ *msgp = msg;
+ return (0);
+}
+
+int
+nni_lmq_resize(nni_lmq *lmq, size_t cap)
+{
+ nng_msg * msg;
+ nng_msg **newq;
+ size_t alloc;
+ size_t len;
+
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+
+ newq = nni_alloc(sizeof(nng_msg *) * alloc);
+ if (newq == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ len = 0;
+ while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
+ newq[len++] = msg;
+ }
+
+ // Flush anything left over.
+ nni_lmq_flush(lmq);
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ lmq->lmq_msgs = newq;
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = alloc - 1;
+ lmq->lmq_len = len;
+ lmq->lmq_put = len & lmq->lmq_mask;
+ lmq->lmq_get = 0;
+
+ return (0);
+}
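
For reference, here is a small standalone illustration (not part of this commit) of the power-of-two index masking that nni_lmq_init() and nni_lmq_resize() rely on: a requested capacity of 6 rounds up to an allocation of 8, so the get/put indices advance under a mask of 7 and wrap from 7 back to 0 without any division.

#include <stdio.h>

int
main(void)
{
	size_t cap   = 6; // requested capacity (not a power of two)
	size_t alloc = 1;

	// Same rounding loop as nni_lmq_init(): alloc becomes 8.
	while (alloc < cap) {
		alloc *= 2;
	}

	size_t mask = alloc - 1; // 7
	size_t idx  = 6;
	for (int i = 0; i < 4; i++) {
		printf("%zu ", idx); // prints: 6 7 0 1
		idx = (idx + 1) & mask;
	}
	printf("\n");
	return (0);
}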