aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c98
1 files changed, 1 insertions, 97 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 773fb9aa..92213265 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -36,15 +36,6 @@ struct nni_msgq {
// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
-
- // Statistics.
- nni_atomic_u64 mq_get_msgs;
- nni_atomic_u64 mq_put_msgs;
- nni_atomic_u64 mq_get_bytes;
- nni_atomic_u64 mq_put_bytes;
- nni_atomic_u64 mq_get_errs;
- nni_atomic_u64 mq_put_errs;
- nni_atomic_u64 mq_discards;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -83,14 +74,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_geterr = 0;
*mqp = mq;
- nni_atomic_init64(&mq->mq_get_bytes);
- nni_atomic_init64(&mq->mq_put_bytes);
- nni_atomic_init64(&mq->mq_get_msgs);
- nni_atomic_init64(&mq->mq_put_msgs);
- nni_atomic_init64(&mq->mq_get_errs);
- nni_atomic_init64(&mq->mq_put_errs);
- nni_atomic_init64(&mq->mq_discards);
-
return (0);
}
@@ -154,7 +137,6 @@ nni_msgq_flush(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
nni_msgq_run_notify(mq);
@@ -185,20 +167,13 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
- nni_atomic_inc64(&mq->mq_put_bytes, len);
- nni_atomic_inc64(&mq->mq_put_msgs, 1);
-
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
len = nni_msg_len(msg);
- nni_atomic_inc64(&mq->mq_get_bytes, len);
- nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
- } else {
- nni_atomic_inc64(&mq->mq_discards, 1);
}
nni_aio_finish(waio, 0, len);
@@ -207,8 +182,6 @@ nni_msgq_run_putq(nni_msgq *mq)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
- nni_atomic_inc64(&mq->mq_put_bytes, len);
- nni_atomic_inc64(&mq->mq_put_msgs, 1);
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -244,13 +217,8 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
- size_t len = nni_msg_len(msg);
- nni_atomic_inc64(&mq->mq_get_bytes, len);
- nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
- } else {
- nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
}
@@ -262,9 +230,6 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
- nni_atomic_inc64(&mq->mq_put_bytes, len);
- nni_atomic_inc64(&mq->mq_put_msgs, 1);
-
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_finish(waio, 0, len);
@@ -276,10 +241,6 @@ nni_msgq_run_getq(nni_msgq *mq)
len = nni_msg_len(msg);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
- nni_atomic_inc64(&mq->mq_get_bytes, len);
- nni_atomic_inc64(&mq->mq_get_msgs, 1);
- } else {
- nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
@@ -336,7 +297,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
nni_mtx_unlock(&mq->mq_lock);
- nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -358,7 +318,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
- nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
@@ -366,7 +325,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
- nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -382,7 +340,6 @@ int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
- size_t len;
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -390,22 +347,15 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_ECLOSED);
}
- len = nni_msg_len(msg);
-
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- nni_atomic_inc64(&mq->mq_put_bytes, len);
- nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
- len = nni_msg_len(msg);
- nni_atomic_inc64(&mq->mq_get_bytes, len);
- nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
nni_msgq_run_notify(mq);
@@ -416,9 +366,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
- nni_atomic_inc64(&mq->mq_put_bytes, len);
- nni_atomic_inc64(&mq->mq_put_msgs, 1);
-
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -515,7 +462,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_get = 0;
}
mq->mq_len--;
- nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
if (newq == NULL) {
@@ -588,45 +534,3 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
*sp = mq->mq_sendable;
return (0);
}
-
-uint64_t
-nni_msgq_stat_get_bytes(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_get_bytes));
-}
-
-uint64_t
-nni_msgq_stat_put_bytes(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_put_bytes));
-}
-
-uint64_t
-nni_msgq_stat_get_msgs(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_get_msgs));
-}
-
-uint64_t
-nni_msgq_stat_put_msgs(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_put_msgs));
-}
-
-uint64_t
-nni_msgq_stat_get_errs(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_get_errs));
-}
-
-uint64_t
-nni_msgq_stat_put_errs(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_put_errs));
-}
-
-uint64_t
-nni_msgq_stat_discards(nni_msgq *mq)
-{
- return (nni_atomic_get64(&mq->mq_discards));
-}