From d83b96faeb02d7a3574e63880141d6b23f31ced1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 22 Aug 2018 08:56:53 -0700 Subject: fixes #4 Statistics support This introduces new public APIs for obtaining statistics, and adds some generic stats for dialers, listeners, pipes, and sockets. Also added are stats for inproc and pairv1 protocol. The other protocols and transports will have stats added incrementally as time goes on. A simple test program, and man pages are provided for this. Start by looking at nng_stat(5). Statistics does have some impact, and they can be disabled by using the advanced NNG_ENABLE_STATS (setting it to OFF, it's ON by default) if you need to build a minimized configuration. --- src/core/msgqueue.c | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 62a3893b..4b59aead 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -37,6 +37,15 @@ struct nni_msgq { // Filters. nni_msgq_filter mq_filter_fn; void * mq_filter_arg; + + // Statistics. + nni_atomic_u64 mq_get_msgs; + nni_atomic_u64 mq_put_msgs; + nni_atomic_u64 mq_get_bytes; + nni_atomic_u64 mq_put_bytes; + nni_atomic_u64 mq_get_errs; + nni_atomic_u64 mq_put_errs; + nni_atomic_u64 mq_discards; }; static void nni_msgq_run_notify(nni_msgq *); @@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) mq->mq_geterr = 0; *mqp = mq; + nni_atomic_init64(&mq->mq_get_bytes); + nni_atomic_init64(&mq->mq_put_bytes); + nni_atomic_init64(&mq->mq_get_msgs); + nni_atomic_init64(&mq->mq_put_msgs); + nni_atomic_init64(&mq->mq_get_errs); + nni_atomic_init64(&mq->mq_put_errs); + nni_atomic_init64(&mq->mq_discards); + return (0); } @@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } nni_msgq_run_notify(mq); @@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq) nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } nni_aio_finish(waio, 0, len); @@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq) msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + size_t len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } continue; } @@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq) msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_aio_finish(waio, 0, len); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } - nni_aio_finish(waio, 0, len); continue; } @@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) } nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; @@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) rv = nni_aio_schedule(aio, nni_msgq_cancel, mq); if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && (nni_list_empty(&mq->mq_aio_getq))) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_mtx_unlock(&mq->mq_lock); nni_aio_finish_error(aio, rv); return; @@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_lock(&mq->mq_lock); if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, mq->mq_geterr); return; } @@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) if ((rv != 0) && (mq->mq_len == 0) && (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, rv); return; } @@ -392,6 +436,7 @@ int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { nni_aio *raio; + size_t len; nni_mtx_lock(&mq->mq_lock); if (mq->mq_closed) { @@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_ECLOSED); } + len = nni_msg_len(msg); + // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); nni_msgq_run_notify(mq); @@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { mq->mq_put = 0; @@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } if (newq == NULL) { @@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) *sp = mq->mq_sendable; return (0); } + +uint64_t +nni_msgq_stat_get_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_bytes)); +} + +uint64_t +nni_msgq_stat_put_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_bytes)); +} + +uint64_t +nni_msgq_stat_get_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_msgs)); +} + +uint64_t +nni_msgq_stat_put_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_msgs)); +} + +uint64_t +nni_msgq_stat_get_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_errs)); +} + +uint64_t +nni_msgq_stat_put_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_errs)); +} + +uint64_t +nni_msgq_stat_discards(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_discards)); +} -- cgit v1.2.3-70-g09d2