aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2025-08-10 10:45:13 -0700
committerGarrett D'Amore <garrett@damore.org>2025-08-10 10:45:13 -0700
commit77dc72635e64d959822b86562ff57f3fde79b5ac (patch)
tree0a5b53273128dab61b4662f2c8ccbf0af224d3bb /src/sp
parent72dc573e732049353c64b1a58d5df24f3f661b9c (diff)
downloadnng-pub-stats.tar.gz
nng-pub-stats.tar.bz2
nng-pub-stats.zip
fixes #2146 need statistics for PUB protocolpub-stats
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/protocol/pubsub0/pub.c70
-rw-r--r--src/sp/protocol/pubsub0/pub_test.c22
2 files changed, 92 insertions, 0 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c
index 5104162b..8d4f8ae2 100644
--- a/src/sp/protocol/pubsub0/pub.c
+++ b/src/sp/protocol/pubsub0/pub.c
@@ -40,6 +40,13 @@ struct pub0_sock {
bool closed;
size_t sendbuf;
nni_pollable sendable;
+
+#ifdef NNG_ENABLE_STATS
+ nni_stat_item stat_tx_direct;
+ nni_stat_item stat_tx_discard;
+ nni_stat_item stat_tx_queued;
+ nni_stat_item stat_tx_bufsz;
+#endif
};
// pub0_pipe is our per-pipe protocol private structure.
@@ -73,6 +80,43 @@ pub0_sock_init(void *arg, nni_sock *ns)
nni_mtx_init(&sock->mtx);
NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
sock->sendbuf = 16; // fairly arbitrary
+
+#if NNG_ENABLE_STATS
+ static const nni_stat_info tx_direct_info = {
+ .si_name = "tx_direct",
+ .si_desc = "messages sent without queueing (per pipe)",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ };
+ static const nni_stat_info tx_discard_info = {
+ .si_name = "tx_discard",
+ .si_desc = "messages dropped (once per pipe)",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ };
+ static const nni_stat_info tx_queued_info = {
+ .si_name = "tx_queued",
+ .si_desc = "messages queued (once per pipe)",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ };
+ static const nni_stat_info tx_bufsz_info = {
+ .si_name = "tx_buf_size",
+ .si_desc = "pipe buffer size for queued messages",
+ .si_type = NNG_STAT_LEVEL,
+ .si_unit = NNG_UNIT_MESSAGES,
+ };
+
+ nni_stat_init(&sock->stat_tx_direct, &tx_direct_info);
+ nni_stat_init(&sock->stat_tx_discard, &tx_discard_info);
+ nni_stat_init(&sock->stat_tx_queued, &tx_queued_info);
+ nni_stat_init(&sock->stat_tx_bufsz, &tx_bufsz_info);
+ nni_sock_add_stat(ns, &sock->stat_tx_direct);
+ nni_sock_add_stat(ns, &sock->stat_tx_discard);
+ nni_sock_add_stat(ns, &sock->stat_tx_queued);
+ nni_sock_add_stat(ns, &sock->stat_tx_bufsz);
+ nni_stat_set_value(&sock->stat_tx_bufsz, sock->sendbuf);
+#endif
}
static void
@@ -227,6 +271,11 @@ pub0_sock_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
+#ifdef NNG_ENABLE_STATS
+ int dropped = 0;
+ int direct = 0;
+ int queued = 0;
+#endif
NNI_LIST_FOREACH (&sock->pipes, p) {
nni_msg_clone(msg);
@@ -236,14 +285,32 @@ pub0_sock_send(void *arg, nni_aio *aio)
nni_msg *old;
(void) nni_lmq_get(&p->sendq, &old);
nni_msg_free(old);
+#ifdef NNG_ENABLE_STATS
+ dropped++;
+#endif
}
+#ifdef NNG_ENABLE_STATS
+ queued++;
+#endif
+
nni_lmq_put(&p->sendq, msg);
} else {
p->busy = true;
nni_aio_set_msg(&p->aio_send, msg);
nni_pipe_send(p->pipe, &p->aio_send);
+#ifdef NNG_ENABLE_STATS
+ direct++;
+#endif
}
}
+#ifdef NNG_ENABLE_STATS
+ if (direct == 0 && queued == 0) {
+ dropped++; // we didn't find a pipe to send it to!
+ }
+ nni_stat_inc(&sock->stat_tx_discard, dropped);
+ nni_stat_inc(&sock->stat_tx_queued, queued);
+ nni_stat_inc(&sock->stat_tx_direct, direct);
+#endif
nni_mtx_unlock(&sock->mtx);
nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
@@ -273,6 +340,9 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
nni_mtx_lock(&sock->mtx);
sock->sendbuf = (size_t) val;
+#ifdef NNG_ENABLE_STATS
+ nni_stat_set_value(&sock->stat_tx_bufsz, sock->sendbuf);
+#endif
NNI_LIST_FOREACH (&sock->pipes, p) {
// If we fail part way through (should only be ENOMEM), we
// stop short. The others would likely fail for ENOMEM as
diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c
index 2b571bef..3018ced9 100644
--- a/src/sp/protocol/pubsub0/pub_test.c
+++ b/src/sp/protocol/pubsub0/pub_test.c
@@ -96,6 +96,17 @@ test_pub_send_no_pipes(void)
NUTS_PASS(nng_pub0_open(&pub));
NUTS_SEND(pub, "DROP1");
NUTS_SEND(pub, "DROP2");
+
+ nng_stat *stats;
+ const nng_stat *pubs;
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((pubs = nng_stat_find_socket(stats, pub)) != NULL);
+
+ nng_stats_dump(pubs);
+ nng_stats_free(stats);
+
NUTS_CLOSE(pub);
}
@@ -127,6 +138,7 @@ test_pub_validate_peer(void)
NUTS_CLOSE(s1);
NUTS_CLOSE(s2);
+ nng_stats_dump(stats);
nng_stats_free(stats);
}
@@ -157,6 +169,16 @@ test_pub_send_queued(void)
NUTS_RECV(sub, "three musketeers");
NUTS_RECV(sub, "four");
+ nng_stat *stats;
+ const nng_stat *pubs;
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((pubs = nng_stat_find_socket(stats, pub)) != NULL);
+
+ nng_stats_dump(pubs);
+ nng_stats_free(stats);
+
NUTS_CLOSE(pub);
NUTS_CLOSE(sub);
}