diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-02-24 22:04:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-26 21:09:54 -0800 |
| commit | 5803db08e55ed9287dc59b3adc281b89c52c530f (patch) | |
| tree | 9d2d65ed86be5c7b976fc3bdfc5ed5b375143641 /src/core | |
| parent | 9cf967e9d7fdab6ccf38f80d83e4bf3d1a5e1a67 (diff) | |
| download | nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.gz nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.bz2 nng-5803db08e55ed9287dc59b3adc281b89c52c530f.zip | |
fixes #461 Context support for SUB
fixes #762 Pub/Sub very slow compared with nanomsg
This introduces contexts for SUB, and converts both the cooked SUB
and PUB protocols to use a new lightweight message queue that has
significant performance benefits over the heavy-weight message queue.
We've also added a test program, pubdrop, in the perf directory,
which can be used for measuring pub/sub message rates and drop rates.
Note that its quite easy to overwhelm a subscriber still.
The SUB socket performance is still not completely where it needs to be.
There are two remainging things to improve. Firsst we need to replace
the naive linked list of topics with a proper PATRICIA trie. Second, we
need to work on the low level POSIX poller code. (The Windows code is
already quite good, and we outperform nanomsg on Windows.)
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/lmq.c | 158 | ||||
| -rw-r--r-- | src/core/lmq.h | 39 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 |
3 files changed, 198 insertions, 0 deletions
diff --git a/src/core/lmq.c b/src/core/lmq.c new file mode 100644 index 00000000..9772e78e --- /dev/null +++ b/src/core/lmq.c @@ -0,0 +1,158 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "nng_impl.h" + +// Light-weight message queue. These are derived from our heavy-weight +// message queues, but are less "featureful", but more useful for +// performance sensitive contexts. Locking must be done by the caller. + +int +nni_lmq_init(nni_lmq *lmq, size_t cap) +{ + size_t alloc; + + // We prefer alloc to a power of 2, this allows us to do modulo + // operations as a power of two, for efficiency. It does possibly + // waste some space, but never more than 2x. Consumers should try + // for powers of two if they are concerned about efficiency. + alloc = 1; + while (alloc < cap) { + alloc *= 2; + } + if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) { + NNI_FREE_STRUCT(lmq); + return (NNG_ENOMEM); + } + lmq->lmq_cap = cap; + lmq->lmq_alloc = alloc; + lmq->lmq_mask = (cap - 1); + lmq->lmq_len = 0; + lmq->lmq_get = 0; + lmq->lmq_put = 0; + + return (0); +} + +void +nni_lmq_fini(nni_lmq *lmq) +{ + if (lmq == NULL) { + return; + } + + /* Free any orphaned messages. */ + while (lmq->lmq_len > 0) { + nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + nni_msg_free(msg); + } + + nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *)); +} + +void +nni_lmq_flush(nni_lmq *lmq) +{ + while (lmq->lmq_len > 0) { + nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + nni_msg_free(msg); + } +} + +size_t +nni_lmq_len(nni_lmq *lmq) +{ + return (lmq->lmq_len); +} + +size_t +nni_lmq_cap(nni_lmq *lmq) +{ + return (lmq->lmq_cap); +} + +bool +nni_lmq_full(nni_lmq *lmq) +{ + return (lmq->lmq_len >= lmq->lmq_cap); +} + +bool +nni_lmq_empty(nni_lmq *lmq) +{ + return (lmq->lmq_len == 0); +} + +int +nni_lmq_putq(nni_lmq *lmq, nng_msg *msg) +{ + if (lmq->lmq_len >= lmq->lmq_cap) { + return (NNG_EAGAIN); + } + lmq->lmq_msgs[lmq->lmq_put++] = msg; + lmq->lmq_len++; + lmq->lmq_put &= lmq->lmq_mask; + return (0); +} + +int +nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp) +{ + nng_msg *msg; + if (lmq->lmq_len == 0) { + return (NNG_EAGAIN); + } + msg = lmq->lmq_msgs[lmq->lmq_get++]; + lmq->lmq_get &= lmq->lmq_mask; + lmq->lmq_len--; + *msgp = msg; + return (0); +} + +int +nni_lmq_resize(nni_lmq *lmq, size_t cap) +{ + nng_msg * msg; + nng_msg **newq; + size_t alloc; + size_t len; + + alloc = 1; + while (alloc < cap) { + alloc *= 2; + } + + newq = nni_alloc(sizeof(nng_msg *) * alloc); + if (newq == NULL) { + return (NNG_ENOMEM); + } + + len = 0; + while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) { + newq[len++] = msg; + } + + // Flush anything left over. + nni_lmq_flush(lmq); + + nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *)); + lmq->lmq_msgs = newq; + lmq->lmq_cap = cap; + lmq->lmq_alloc = alloc; + lmq->lmq_mask = alloc - 1; + lmq->lmq_len = len; + lmq->lmq_put = len; + lmq->lmq_get = 0; + + return (0); +} diff --git a/src/core/lmq.h b/src/core/lmq.h new file mode 100644 index 00000000..0a64c984 --- /dev/null +++ b/src/core/lmq.h @@ -0,0 +1,39 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_LMQ_H +#define CORE_LMQ_H + +#include "nng_impl.h" + +// nni_lmq is a very lightweight message queue. Defining it this way allows +// us to share some common code. Locking must be supplied by the caller. +// For performance reasons, this is allocated inline. +typedef struct nni_lmq { + size_t lmq_cap; + size_t lmq_alloc; // alloc is cap, rounded up to power of 2 + size_t lmq_mask; + size_t lmq_len; + size_t lmq_get; + size_t lmq_put; + nng_msg **lmq_msgs; +} nni_lmq; + +extern int nni_lmq_init(nni_lmq *, size_t); +extern void nni_lmq_fini(nni_lmq *); +extern void nni_lmq_flush(nni_lmq *); +extern size_t nni_lmq_len(nni_lmq *); +extern size_t nni_lmq_cap(nni_lmq *); +extern int nni_lmq_putq(nni_lmq *, nng_msg *); +extern int nni_lmq_getq(nni_lmq *, nng_msg **); +extern int nni_lmq_resize(nni_lmq *, size_t); +extern bool nni_lmq_full(nni_lmq *); +extern bool nni_lmq_empty(nni_lmq *); + +#endif // CORE_LMQ_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 07fe44f5..0dcc976f 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -33,6 +33,7 @@ #include "core/idhash.h" #include "core/init.h" #include "core/list.h" +#include "core/lmq.h" #include "core/message.h" #include "core/msgqueue.h" #include "core/options.h" |
