From 1add4cc261f4dfa31b3c03a454a443a345358b7c Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 10 Aug 2025 10:45:13 -0700 Subject: fixes #2146 need statistics for PUB protocol --- src/sp/protocol/pubsub0/pub.c | 70 ++++++++++++++++++++++++++++++++++++++ src/sp/protocol/pubsub0/pub_test.c | 22 ++++++++++++ 2 files changed, 92 insertions(+) (limited to 'src/sp') diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c index 5104162b..8d4f8ae2 100644 --- a/src/sp/protocol/pubsub0/pub.c +++ b/src/sp/protocol/pubsub0/pub.c @@ -40,6 +40,13 @@ struct pub0_sock { bool closed; size_t sendbuf; nni_pollable sendable; + +#ifdef NNG_ENABLE_STATS + nni_stat_item stat_tx_direct; + nni_stat_item stat_tx_discard; + nni_stat_item stat_tx_queued; + nni_stat_item stat_tx_bufsz; +#endif }; // pub0_pipe is our per-pipe protocol private structure. @@ -73,6 +80,43 @@ pub0_sock_init(void *arg, nni_sock *ns) nni_mtx_init(&sock->mtx); NNI_LIST_INIT(&sock->pipes, pub0_pipe, node); sock->sendbuf = 16; // fairly arbitrary + +#if NNG_ENABLE_STATS + static const nni_stat_info tx_direct_info = { + .si_name = "tx_direct", + .si_desc = "messages sent without queueing (per pipe)", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + }; + static const nni_stat_info tx_discard_info = { + .si_name = "tx_discard", + .si_desc = "messages dropped (once per pipe)", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + }; + static const nni_stat_info tx_queued_info = { + .si_name = "tx_queued", + .si_desc = "messages queued (once per pipe)", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + }; + static const nni_stat_info tx_bufsz_info = { + .si_name = "tx_buf_size", + .si_desc = "pipe buffer size for queued messages", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_MESSAGES, + }; + + nni_stat_init(&sock->stat_tx_direct, &tx_direct_info); + nni_stat_init(&sock->stat_tx_discard, &tx_discard_info); + nni_stat_init(&sock->stat_tx_queued, &tx_queued_info); + nni_stat_init(&sock->stat_tx_bufsz, &tx_bufsz_info); + nni_sock_add_stat(ns, &sock->stat_tx_direct); + nni_sock_add_stat(ns, &sock->stat_tx_discard); + nni_sock_add_stat(ns, &sock->stat_tx_queued); + nni_sock_add_stat(ns, &sock->stat_tx_bufsz); + nni_stat_set_value(&sock->stat_tx_bufsz, sock->sendbuf); +#endif } static void @@ -227,6 +271,11 @@ pub0_sock_send(void *arg, nni_aio *aio) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); +#ifdef NNG_ENABLE_STATS + int dropped = 0; + int direct = 0; + int queued = 0; +#endif NNI_LIST_FOREACH (&sock->pipes, p) { nni_msg_clone(msg); @@ -236,14 +285,32 @@ pub0_sock_send(void *arg, nni_aio *aio) nni_msg *old; (void) nni_lmq_get(&p->sendq, &old); nni_msg_free(old); +#ifdef NNG_ENABLE_STATS + dropped++; +#endif } +#ifdef NNG_ENABLE_STATS + queued++; +#endif + nni_lmq_put(&p->sendq, msg); } else { p->busy = true; nni_aio_set_msg(&p->aio_send, msg); nni_pipe_send(p->pipe, &p->aio_send); +#ifdef NNG_ENABLE_STATS + direct++; +#endif } } +#ifdef NNG_ENABLE_STATS + if (direct == 0 && queued == 0) { + dropped++; // we didn't find a pipe to send it to! + } + nni_stat_inc(&sock->stat_tx_discard, dropped); + nni_stat_inc(&sock->stat_tx_queued, queued); + nni_stat_inc(&sock->stat_tx_direct, direct); +#endif nni_mtx_unlock(&sock->mtx); nng_msg_free(msg); nni_aio_finish(aio, 0, len); @@ -273,6 +340,9 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t) nni_mtx_lock(&sock->mtx); sock->sendbuf = (size_t) val; +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&sock->stat_tx_bufsz, sock->sendbuf); +#endif NNI_LIST_FOREACH (&sock->pipes, p) { // If we fail part way through (should only be ENOMEM), we // stop short. The others would likely fail for ENOMEM as diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c index 2b571bef..3018ced9 100644 --- a/src/sp/protocol/pubsub0/pub_test.c +++ b/src/sp/protocol/pubsub0/pub_test.c @@ -96,6 +96,17 @@ test_pub_send_no_pipes(void) NUTS_PASS(nng_pub0_open(&pub)); NUTS_SEND(pub, "DROP1"); NUTS_SEND(pub, "DROP2"); + + nng_stat *stats; + const nng_stat *pubs; + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((pubs = nng_stat_find_socket(stats, pub)) != NULL); + + nng_stats_dump(pubs); + nng_stats_free(stats); + NUTS_CLOSE(pub); } @@ -127,6 +138,7 @@ test_pub_validate_peer(void) NUTS_CLOSE(s1); NUTS_CLOSE(s2); + nng_stats_dump(stats); nng_stats_free(stats); } @@ -157,6 +169,16 @@ test_pub_send_queued(void) NUTS_RECV(sub, "three musketeers"); NUTS_RECV(sub, "four"); + nng_stat *stats; + const nng_stat *pubs; + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((pubs = nng_stat_find_socket(stats, pub)) != NULL); + + nng_stats_dump(pubs); + nng_stats_free(stats); + NUTS_CLOSE(pub); NUTS_CLOSE(sub); } -- cgit v1.2.3-70-g09d2