diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 100 |
1 files changed, 99 insertions, 1 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 62a3893b..4b59aead 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -37,6 +37,15 @@ struct nni_msgq { // Filters. nni_msgq_filter mq_filter_fn; void * mq_filter_arg; + + // Statistics. + nni_atomic_u64 mq_get_msgs; + nni_atomic_u64 mq_put_msgs; + nni_atomic_u64 mq_get_bytes; + nni_atomic_u64 mq_put_bytes; + nni_atomic_u64 mq_get_errs; + nni_atomic_u64 mq_put_errs; + nni_atomic_u64 mq_discards; }; static void nni_msgq_run_notify(nni_msgq *); @@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) mq->mq_geterr = 0; *mqp = mq; + nni_atomic_init64(&mq->mq_get_bytes); + nni_atomic_init64(&mq->mq_put_bytes); + nni_atomic_init64(&mq->mq_get_msgs); + nni_atomic_init64(&mq->mq_put_msgs); + nni_atomic_init64(&mq->mq_get_errs); + nni_atomic_init64(&mq->mq_put_errs); + nni_atomic_init64(&mq->mq_discards); + return (0); } @@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } nni_msgq_run_notify(mq); @@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq) nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } nni_aio_finish(waio, 0, len); @@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq) msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + size_t len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } continue; } @@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq) msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_aio_finish(waio, 0, len); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } - nni_aio_finish(waio, 0, len); continue; } @@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) } nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; @@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) rv = nni_aio_schedule(aio, nni_msgq_cancel, mq); if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && (nni_list_empty(&mq->mq_aio_getq))) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_mtx_unlock(&mq->mq_lock); nni_aio_finish_error(aio, rv); return; @@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_lock(&mq->mq_lock); if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, mq->mq_geterr); return; } @@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) if ((rv != 0) && (mq->mq_len == 0) && (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, rv); return; } @@ -392,6 +436,7 @@ int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { nni_aio *raio; + size_t len; nni_mtx_lock(&mq->mq_lock); if (mq->mq_closed) { @@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_ECLOSED); } + len = nni_msg_len(msg); + // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); nni_msgq_run_notify(mq); @@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { mq->mq_put = 0; @@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } if (newq == NULL) { @@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) *sp = mq->mq_sendable; return (0); } + +uint64_t +nni_msgq_stat_get_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_bytes)); +} + +uint64_t +nni_msgq_stat_put_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_bytes)); +} + +uint64_t +nni_msgq_stat_get_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_msgs)); +} + +uint64_t +nni_msgq_stat_put_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_msgs)); +} + +uint64_t +nni_msgq_stat_get_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_errs)); +} + +uint64_t +nni_msgq_stat_put_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_errs)); +} + +uint64_t +nni_msgq_stat_discards(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_discards)); +} |
