aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c100
1 files changed, 99 insertions, 1 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 62a3893b..4b59aead 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -37,6 +37,15 @@ struct nni_msgq {
// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
+
+ // Statistics.
+ nni_atomic_u64 mq_get_msgs;
+ nni_atomic_u64 mq_put_msgs;
+ nni_atomic_u64 mq_get_bytes;
+ nni_atomic_u64 mq_put_bytes;
+ nni_atomic_u64 mq_get_errs;
+ nni_atomic_u64 mq_put_errs;
+ nni_atomic_u64 mq_discards;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_geterr = 0;
*mqp = mq;
+ nni_atomic_init64(&mq->mq_get_bytes);
+ nni_atomic_init64(&mq->mq_put_bytes);
+ nni_atomic_init64(&mq->mq_get_msgs);
+ nni_atomic_init64(&mq->mq_put_msgs);
+ nni_atomic_init64(&mq->mq_get_errs);
+ nni_atomic_init64(&mq->mq_put_errs);
+ nni_atomic_init64(&mq->mq_discards);
+
return (0);
}
@@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
nni_msgq_run_notify(mq);
@@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
nni_aio_finish(waio, 0, len);
@@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ size_t len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
}
@@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
- nni_aio_finish(waio, 0, len);
continue;
}
@@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
@@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_mtx_unlock(&mq->mq_lock);
nni_aio_finish_error(aio, rv);
return;
@@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
@@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -392,6 +436,7 @@ int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
+ size_t len;
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_ECLOSED);
}
+ len = nni_msg_len(msg);
+
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
nni_msgq_run_notify(mq);
@@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
if (newq == NULL) {
@@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
*sp = mq->mq_sendable;
return (0);
}
+
+uint64_t
+nni_msgq_stat_get_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_bytes));
+}
+
+uint64_t
+nni_msgq_stat_put_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_bytes));
+}
+
+uint64_t
+nni_msgq_stat_get_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_msgs));
+}
+
+uint64_t
+nni_msgq_stat_put_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_msgs));
+}
+
+uint64_t
+nni_msgq_stat_get_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_errs));
+}
+
+uint64_t
+nni_msgq_stat_put_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_errs));
+}
+
+uint64_t
+nni_msgq_stat_discards(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_discards));
+}