diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/dialer.c | 87 | ||||
| -rw-r--r-- | src/core/dialer.h | 1 | ||||
| -rw-r--r-- | src/core/init.c | 4 | ||||
| -rw-r--r-- | src/core/listener.c | 82 | ||||
| -rw-r--r-- | src/core/listener.h | 1 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 100 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 9 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/pipe.c | 50 | ||||
| -rw-r--r-- | src/core/pipe.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 214 | ||||
| -rw-r--r-- | src/core/socket.h | 9 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 51 | ||||
| -rw-r--r-- | src/core/stats.c | 525 | ||||
| -rw-r--r-- | src/core/stats.h | 108 | ||||
| -rw-r--r-- | src/nng.c | 68 | ||||
| -rw-r--r-- | src/nng.h | 102 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 68 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 141 |
20 files changed, 1451 insertions, 175 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 14894db8..305922db 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -70,6 +70,8 @@ set (NNG_SOURCES core/reap.h core/socket.c core/socket.h + core/stats.c + core/stats.h core/strs.c core/strs.h core/taskq.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 00ebb513..11faed7c 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -23,6 +23,8 @@ static void dialer_timer_cb(void *); static nni_idhash *dialers; static nni_mtx dialers_lk; +#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1) + int nni_dialer_sys_init(void) { @@ -32,8 +34,7 @@ nni_dialer_sys_init(void) return (rv); } nni_mtx_init(&dialers_lk); - nni_idhash_set_limits( - dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_idhash_set_limits(dialers, 1, 0x7fffffff, 1); return (0); } @@ -70,6 +71,55 @@ nni_dialer_destroy(nni_dialer *d) NNI_FREE_STRUCT(d); } +static void +dialer_stats_init(nni_dialer *d) +{ + nni_dialer_stats *st = &d->d_stats; + nni_stat_item * root = &st->s_root; + + nni_stat_init_scope(root, st->s_scope, "dialer statistics"); + + nni_stat_init_id(&st->s_id, "id", "dialer id", d->d_id); + nni_stat_append(root, &st->s_id); + + nni_stat_init_id(&st->s_sock, "socket", "socket for dialer", + nni_sock_id(d->d_sock)); + nni_stat_append(root, &st->s_sock); + + nni_stat_init_string( + &st->s_url, "url", "dialer url", d->d_url->u_rawurl); + nni_stat_append(root, &st->s_url); + + nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes"); + nni_stat_append(root, &st->s_npipes); + + nni_stat_init_atomic(&st->s_connok, "connok", "connections made"); + nni_stat_append(root, &st->s_connok); + + nni_stat_init_atomic( + &st->s_canceled, "canceled", "connections canceled"); + nni_stat_append(root, &st->s_canceled); + + nni_stat_init_atomic(&st->s_refused, "refused", "connections refused"); + nni_stat_append(root, &st->s_refused); + + nni_stat_init_atomic( + &st->s_timedout, "timedout", "connections timed out"); + nni_stat_append(root, &st->s_timedout); + + nni_stat_init_atomic( + &st->s_othererr, "othererr", "other connection errors"); + nni_stat_append(root, &st->s_othererr); + + nni_stat_init_atomic( + &st->s_protorej, "protoreject", "pipes rejected by protocol"); + nni_stat_append(root, &st->s_protorej); + + nni_stat_init_atomic( + &st->s_apprej, "appreject", "pipes rejected by application"); + nni_stat_append(root, &st->s_apprej); +} + int nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) { @@ -110,6 +160,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_mtx_init(&d->d_mtx); + dialer_stats_init(d); if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || @@ -119,6 +170,10 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) return (rv); } + snprintf(d->d_stats.s_scope, sizeof(d->d_stats.s_scope), "dialer%u", + d->d_id); + nni_stat_set_value(&d->d_stats.s_id, d->d_id); + nni_stat_append(NULL, &d->d_stats.s_root); *dp = d; return (0); } @@ -167,6 +222,7 @@ nni_dialer_rele(nni_dialer *d) nni_mtx_lock(&dialers_lk); d->d_refcnt--; if ((d->d_refcnt == 0) && (d->d_closed)) { + nni_stat_remove(&d->d_stats.s_root); nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); } nni_mtx_unlock(&dialers_lk); @@ -242,12 +298,33 @@ dialer_connect_cb(void *arg) switch ((rv = nni_aio_result(aio))) { case 0: + BUMPSTAT(&d->d_stats.s_connok); nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. + BUMPSTAT(&d->d_stats.s_canceled); break; + case NNG_ECONNREFUSED: + BUMPSTAT(&d->d_stats.s_refused); + if (uaio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + + case NNG_ETIMEDOUT: + BUMPSTAT(&d->d_stats.s_timedout); + if (uaio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + default: + BUMPSTAT(&d->d_stats.s_othererr); if (uaio == NULL) { nni_dialer_timer_start(d); } else { @@ -389,3 +466,9 @@ nni_dialer_getopt( return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); } + +void +nni_dialer_add_stat(nni_dialer *d, nni_stat_item *stat) +{ + nni_stat_append(&d->d_stats.s_root, stat); +} diff --git a/src/core/dialer.h b/src/core/dialer.h index 361c6d5e..da2cc992 100644 --- a/src/core/dialer.h +++ b/src/core/dialer.h @@ -26,5 +26,6 @@ extern int nni_dialer_setopt( nni_dialer *, const char *, const void *, size_t, nni_opt_type); extern int nni_dialer_getopt( nni_dialer *, const char *, void *, size_t *, nni_opt_type); +extern void nni_dialer_add_stat(nni_dialer *, nni_stat_item *); #endif // CORE_DIALER_H diff --git a/src/core/init.c b/src/core/init.c index c66e54d2..60736fb7 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -27,7 +27,8 @@ nni_init_helper(void) NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node); nni_inited = true; - if (((rv = nni_taskq_sys_init()) != 0) || + if (((rv = nni_stat_sys_init()) != 0) || + ((rv = nni_taskq_sys_init()) != 0) || ((rv = nni_reap_sys_init()) != 0) || ((rv = nni_timer_sys_init()) != 0) || ((rv = nni_aio_sys_init()) != 0) || @@ -81,6 +82,7 @@ nni_fini(void) nni_timer_sys_fini(); nni_taskq_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_stat_sys_fini(); nni_mtx_fini(&nni_init_mtx); nni_plat_fini(); diff --git a/src/core/listener.c b/src/core/listener.c index 1fe6faab..135478de 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -24,6 +24,8 @@ static void listener_timer_cb(void *); static nni_idhash *listeners; static nni_mtx listeners_lk; +#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1) + int nni_listener_sys_init(void) { @@ -33,8 +35,7 @@ nni_listener_sys_init(void) return (rv); } nni_mtx_init(&listeners_lk); - nni_idhash_set_limits( - listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_idhash_set_limits(listeners, 1, 0x7fffffff, 1); return (0); } @@ -70,6 +71,55 @@ nni_listener_destroy(nni_listener *l) NNI_FREE_STRUCT(l); } +static void +listener_stats_init(nni_listener *l) +{ + nni_listener_stats *st = &l->l_stats; + nni_stat_item * root = &st->s_root; + + nni_stat_init_scope(root, st->s_scope, "listener statistics"); + + // NB: This will be updated later. + nni_stat_init_id(&st->s_id, "id", "listener id", l->l_id); + nni_stat_append(root, &st->s_id); + + nni_stat_init_id(&st->s_sock, "socket", "socket for listener", + nni_sock_id(l->l_sock)); + nni_stat_append(root, &st->s_sock); + + nni_stat_init_string( + &st->s_url, "url", "listener url", l->l_url->u_rawurl); + nni_stat_append(root, &st->s_url); + + nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes"); + nni_stat_append(root, &st->s_npipes); + + nni_stat_init_atomic(&st->s_accept, "accept", "connections accepted"); + nni_stat_append(root, &st->s_accept); + + nni_stat_init_atomic( + &st->s_aborted, "aborted", "accepts aborted remotely"); + nni_stat_append(root, &st->s_aborted); + + nni_stat_init_atomic(&st->s_timedout, "timedout", "accepts timed out"); + nni_stat_append(root, &st->s_timedout); + + nni_stat_init_atomic(&st->s_canceled, "canceled", "accepts canceled"); + nni_stat_append(root, &st->s_canceled); + + nni_stat_init_atomic( + &st->s_othererr, "othererr", "other accept errors"); + nni_stat_append(root, &st->s_othererr); + + nni_stat_init_atomic( + &st->s_protorej, "protoreject", "pipes rejected by protocol"); + nni_stat_append(root, &st->s_protorej); + + nni_stat_init_atomic( + &st->s_apprej, "appreject", "pipes rejected by application"); + nni_stat_append(root, &st->s_apprej); +} + int nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) { @@ -107,6 +157,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) NNI_LIST_NODE_INIT(&l->l_node); NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); + listener_stats_init(l); if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || @@ -117,6 +168,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) return (rv); } + // Update a few stat bits, and register them. + snprintf(l->l_stats.s_scope, sizeof(l->l_stats.s_scope), "listener%u", + l->l_id); + nni_stat_set_value(&l->l_stats.s_id, l->l_id); + nni_stat_append(NULL, &l->l_stats.s_root); + *lp = l; return (0); } @@ -165,6 +222,7 @@ nni_listener_rele(nni_listener *l) nni_mtx_lock(&listeners_lk); l->l_refcnt--; if ((l->l_refcnt == 0) && (l->l_closed)) { + nni_stat_remove(&l->l_stats.s_root); nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); } nni_mtx_unlock(&listeners_lk); @@ -233,18 +291,30 @@ listener_accept_cb(void *arg) switch (nni_aio_result(aio)) { case 0: + BUMPSTAT(&l->l_stats.s_accept); nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown case NNG_ECONNRESET: // remote condition, no cooldown - case NNG_EPEERAUTH: // peer validation failure + BUMPSTAT(&l->l_stats.s_aborted); + listener_accept_start(l); + break; + case NNG_ETIMEDOUT: + // No need to sleep since we timed out already. + BUMPSTAT(&l->l_stats.s_timedout); + listener_accept_start(l); + break; + case NNG_EPEERAUTH: // peer validation failure + BUMPSTAT(&l->l_stats.s_othererr); listener_accept_start(l); break; case NNG_ECLOSED: // no further action case NNG_ECANCELED: // no further action + BUMPSTAT(&l->l_stats.s_canceled); break; default: + BUMPSTAT(&l->l_stats.s_othererr); // We don't really know why we failed, but we backoff // here. This is because errors here are probably due // to system failures (resource exhaustion) and we hope @@ -339,3 +409,9 @@ nni_listener_getopt( return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); } + +void +nni_listener_add_stat(nni_listener *l, nni_stat_item *stat) +{ + nni_stat_append(&l->l_stats.s_root, stat); +} diff --git a/src/core/listener.h b/src/core/listener.h index 67a782bd..828102a2 100644 --- a/src/core/listener.h +++ b/src/core/listener.h @@ -26,5 +26,6 @@ extern int nni_listener_setopt( nni_listener *, const char *, const void *, size_t, nni_opt_type); extern int nni_listener_getopt( nni_listener *, const char *, void *, size_t *, nni_opt_type); +extern void nni_listener_add_stat(nni_listener *, nni_stat_item *); #endif // CORE_LISTENER_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 62a3893b..4b59aead 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -37,6 +37,15 @@ struct nni_msgq { // Filters. nni_msgq_filter mq_filter_fn; void * mq_filter_arg; + + // Statistics. + nni_atomic_u64 mq_get_msgs; + nni_atomic_u64 mq_put_msgs; + nni_atomic_u64 mq_get_bytes; + nni_atomic_u64 mq_put_bytes; + nni_atomic_u64 mq_get_errs; + nni_atomic_u64 mq_put_errs; + nni_atomic_u64 mq_discards; }; static void nni_msgq_run_notify(nni_msgq *); @@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) mq->mq_geterr = 0; *mqp = mq; + nni_atomic_init64(&mq->mq_get_bytes); + nni_atomic_init64(&mq->mq_put_bytes); + nni_atomic_init64(&mq->mq_get_msgs); + nni_atomic_init64(&mq->mq_put_msgs); + nni_atomic_init64(&mq->mq_get_errs); + nni_atomic_init64(&mq->mq_put_errs); + nni_atomic_init64(&mq->mq_discards); + return (0); } @@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } nni_msgq_run_notify(mq); @@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq) nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } nni_aio_finish(waio, 0, len); @@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq) msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + size_t len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } continue; } @@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq) msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + nni_aio_finish(waio, 0, len); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); + } else { + nni_atomic_inc64(&mq->mq_discards, 1); } - nni_aio_finish(waio, 0, len); continue; } @@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) } nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; @@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) rv = nni_aio_schedule(aio, nni_msgq_cancel, mq); if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && (nni_list_empty(&mq->mq_aio_getq))) { + nni_atomic_inc64(&mq->mq_put_errs, 1); nni_mtx_unlock(&mq->mq_lock); nni_aio_finish_error(aio, rv); return; @@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_lock(&mq->mq_lock); if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, mq->mq_geterr); return; } @@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) if ((rv != 0) && (mq->mq_len == 0) && (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_atomic_inc64(&mq->mq_get_errs, 1); nni_aio_finish_error(aio, rv); return; } @@ -392,6 +436,7 @@ int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { nni_aio *raio; + size_t len; nni_mtx_lock(&mq->mq_lock); if (mq->mq_closed) { @@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_ECLOSED); } + len = nni_msg_len(msg); + // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); if (mq->mq_filter_fn != NULL) { msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); } if (msg != NULL) { + len = nni_msg_len(msg); + nni_atomic_inc64(&mq->mq_get_bytes, len); + nni_atomic_inc64(&mq->mq_get_msgs, 1); nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); nni_msgq_run_notify(mq); @@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // Otherwise if we have room in the buffer, just queue it. if (mq->mq_len < mq->mq_cap) { + nni_atomic_inc64(&mq->mq_put_bytes, len); + nni_atomic_inc64(&mq->mq_put_msgs, 1); + mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { mq->mq_put = 0; @@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap) mq->mq_get = 0; } mq->mq_len--; + nni_atomic_inc64(&mq->mq_discards, 1); nni_msg_free(msg); } if (newq == NULL) { @@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) *sp = mq->mq_sendable; return (0); } + +uint64_t +nni_msgq_stat_get_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_bytes)); +} + +uint64_t +nni_msgq_stat_put_bytes(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_bytes)); +} + +uint64_t +nni_msgq_stat_get_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_msgs)); +} + +uint64_t +nni_msgq_stat_put_msgs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_msgs)); +} + +uint64_t +nni_msgq_stat_get_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_get_errs)); +} + +uint64_t +nni_msgq_stat_put_errs(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_put_errs)); +} + +uint64_t +nni_msgq_stat_discards(nni_msgq *mq) +{ + return (nni_atomic_get64(&mq->mq_discards)); +} diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 7dc5800d..bd9b3682 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -99,4 +99,13 @@ extern int nni_msgq_len(nni_msgq *mq); extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **); extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **); +// message queues keep statistics +extern uint64_t nni_msgq_stat_get_bytes(nni_msgq *); +extern uint64_t nni_msgq_stat_put_bytes(nni_msgq *); +extern uint64_t nni_msgq_stat_get_msgs(nni_msgq *); +extern uint64_t nni_msgq_stat_put_msgs(nni_msgq *); +extern uint64_t nni_msgq_stat_get_errs(nni_msgq *); +extern uint64_t nni_msgq_stat_put_errs(nni_msgq *); +extern uint64_t nni_msgq_stat_discards(nni_msgq *); + #endif // CORE_MSQUEUE_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 9af12720..f28ddf8c 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -41,6 +41,7 @@ #include "core/protocol.h" #include "core/random.h" #include "core/reap.h" +#include "core/stats.h" #include "core/strs.h" #include "core/taskq.h" #include "core/thread.h" diff --git a/src/core/pipe.c b/src/core/pipe.c index bdbc76e0..d623e158 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -11,6 +11,7 @@ #include "core/nng_impl.h" #include "sockimpl.h" +#include <stdio.h> #include <string.h> // This file contains functions relating to pipes. @@ -82,6 +83,7 @@ pipe_destroy(nni_pipe *p) p->p_tran_ops.p_stop(p->p_tran_data); } + nni_stat_remove(&p->p_stats.s_root); nni_pipe_remove(p); if (p->p_proto_data != NULL) { @@ -177,6 +179,28 @@ nni_pipe_peer(nni_pipe *p) return (p->p_tran_ops.p_peer(p->p_tran_data)); } +void +nni_pipe_stats_init(nni_pipe *p) +{ +#ifdef NNG_ENABLE_STATS + nni_pipe_stats *st = &p->p_stats; + + if (p->p_listener != NULL) { + st->s_ep_id.si_name = "listener"; + st->s_ep_id.si_desc = "listener for pipe"; + st->s_ep_id.si_value = nni_listener_id(p->p_listener); + } else { + st->s_ep_id.si_name = "dialer"; + st->s_ep_id.si_desc = "dialer for pipe"; + st->s_ep_id.si_value = nni_dialer_id(p->p_dialer); + } + + nni_stat_append(NULL, &st->s_root); +#else + NNI_ARG_UNUSED(p); +#endif +} + int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -184,7 +208,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -210,12 +233,27 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_cv_init(&p->p_cv, &nni_pipe_lk); nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; + if ((rv = nni_idhash_alloc32(nni_pipes, &p->p_id, p)) == 0) { p->p_refcnt = 1; } nni_mtx_unlock(&nni_pipe_lk); + nni_pipe_stats *st = &p->p_stats; + snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id); + + nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics"); + + nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id); + nni_stat_append(&st->s_root, &st->s_id); + + // name and description fleshed out later. + nni_stat_init_id(&st->s_ep_id, "", "", 0); + nni_stat_append(&st->s_root, &st->s_ep_id); + + nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe", + nni_sock_id(p->p_sock)); + nni_stat_append(&st->s_root, &st->s_sock_id); + if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_close(p); @@ -274,3 +312,9 @@ nni_pipe_dialer_id(nni_pipe *p) { return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } + +void +nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item) +{ + nni_stat_append(&p->p_stats.s_root, item); +} diff --git a/src/core/pipe.h b/src/core/pipe.h index 1e2f2b5d..5a83059f 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -62,4 +62,7 @@ extern uint32_t nni_pipe_dialer_id(nni_pipe *); // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); +// nni_pipe_add_stat adds a statistic to the pipe +extern void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *); + #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index b2f331fb..63696b36 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb { void * cb_arg; } nni_sock_pipe_cb; +typedef struct sock_stats { + nni_stat_item s_root; // socket scope + nni_stat_item s_id; // socket id + nni_stat_item s_name; // socket name + nni_stat_item s_protocol; // socket protocol + nni_stat_item s_ndialers; // number of dialers + nni_stat_item s_nlisteners; // number of listeners + nni_stat_item s_npipes; // number of pipes + nni_stat_item s_rxbytes; // number of bytes received + nni_stat_item s_txbytes; // number of bytes received + nni_stat_item s_rxmsgs; // number of msgs received + nni_stat_item s_txmsgs; // number of msgs sent + nni_stat_item s_protorej; // pipes rejected by protocol + nni_stat_item s_apprej; // pipes rejected by application +} sock_stats; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -82,6 +98,7 @@ struct nni_socket { size_t s_rcvmaxsz; // max receive size nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) + char s_scope[24]; // socket scope ("socket%u", 32 bits max) nni_list s_listeners; // active listeners nni_list s_dialers; // active dialers @@ -94,6 +111,8 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + + sock_stats s_stats; }; static void nni_ctx_destroy(nni_ctx *); @@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s) } static void +sock_stats_fini(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats *st = &s->s_stats; + nni_stat_remove(&st->s_root); +#else + NNI_ARG_UNUSED(s); +#endif +} + +static void +sock_stats_init(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats * st = &s->s_stats; + nni_stat_item *root = &s->s_stats.s_root; + + // To make collection cheap and atomic for the socket, + // we just use a single lock for the entire chain. + + nni_stat_init_scope(root, s->s_scope, "socket statistics"); + + nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id); + nni_stat_append(root, &st->s_id); + + nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name); + nni_stat_set_lock(&st->s_name, &s->s_mx); + nni_stat_append(root, &st->s_name); + + nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol", + nni_sock_proto_name(s)); + nni_stat_append(root, &st->s_protocol); + + nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers"); + nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_ndialers); + + nni_stat_init_atomic( + &st->s_nlisteners, "nlisteners", "open listeners"); + nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_nlisteners); + + nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes"); + nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_npipes); + + nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received"); + nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_rxbytes); + + nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent"); + nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_txbytes); + + nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received"); + nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_rxmsgs); + + nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent"); + nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_txmsgs); + + nni_stat_init_atomic( + &st->s_protorej, "protoreject", "pipes rejected by protocol"); + nni_stat_append(root, &st->s_protorej); + + nni_stat_init_atomic( + &st->s_apprej, "appreject", "pipes rejected by application"); + nni_stat_append(root, &st->s_apprej); +#else + NNI_ARG_UNUSED(s); +#endif +} + +static void sock_destroy(nni_sock *s) { nni_sockopt *sopt; @@ -418,6 +512,7 @@ sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); + sock_stats_fini(s); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &sock_lk); + sock_stats_init(s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 1)) != 0) || @@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) // Set the sockname. (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); - return (rv); + // Set up basic stat values. + (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id); + nni_stat_set_value(&s->s_stats.s_id, s->s_id); + + // Add our stats chain. + nni_stat_append(NULL, &s->s_stats.s_root); + + return (0); } // nni_sock_shutdown shuts down the socket; after this point no @@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s) // is idempotent. nni_sock_shutdown(s); + nni_stat_remove(&s->s_stats.s_root); + nni_mtx_lock(&sock_lk); if (s->s_closed) { // Some other thread called close. All we need to do @@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l) } nni_list_append(&s->s_listeners, l); + + nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d) } nni_list_append(&s->s_dialers, d); - nni_mtx_unlock(&s->s_mx); - return (0); -} -void -nni_sock_remove_listener(nni_sock *s, nni_listener *l) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_listeners, l)) { - nni_list_remove(&s->s_listeners, l); - if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { - nni_cv_wake(&s->s_cv); - } - } - nni_mtx_unlock(&s->s_mx); -} + nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1); -void -nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_dialers, d)) { - nni_list_remove(&s->s_dialers, d); - if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { - nni_cv_wake(&s->s_cv); - } - } nni_mtx_unlock(&s->s_mx); + return (0); } int @@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + nni_stat_inc_atomic(&d->d_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_mtx_unlock(&s->s_mx); return; } + p->p_listener = l; nni_list_append(&l->l_pipes, p); nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_npipes, 1); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p) nni_dialer *d = p->p_dialer; nni_mtx_lock(&s->s_mx); + if (nni_list_node_active(&p->p_sock_node)) { + nni_stat_dec_atomic(&s->s_stats.s_npipes, 1); + } + if (p->p_listener != NULL) { + nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1); + } + if (p->p_dialer != NULL) { + nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1); + } nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); p->p_listener = NULL; @@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p) } nni_mtx_unlock(&s->s_mx); } + +void +nni_sock_add_stat(nni_sock *s, nni_stat_item *stat) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_append(&s->s_stats.s_root, stat); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(stat); +#endif +} + +void +nni_sock_bump_tx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} + +void +nni_sock_bump_rx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} diff --git a/src/core/socket.h b/src/core/socket.h index 4b9c4642..5486918e 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -25,6 +25,7 @@ extern uint16_t nni_sock_peer_id(nni_sock *); extern const char *nni_sock_proto_name(nni_sock *); extern const char *nni_sock_peer_name(nni_sock *); extern void * nni_sock_proto_data(nni_sock *); +extern void nni_sock_add_stat(nni_sock *, nni_stat_item *); extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *); @@ -107,4 +108,12 @@ extern int nni_ctx_getopt( extern int nni_ctx_setopt( nni_ctx *, const char *, const void *, size_t, nni_opt_type); +// nni_sock_bump_rx is called by a protocol when a message is received by +// a consuming app. It bumps the rxmsgs by one and rxbytes by the size. +extern void nni_sock_bump_rx(nni_sock *s, uint64_t sz); + +// nni_sock_bump_rx is called by a protocol when a message is sent by +// a consuming app. It bumps the txmsgs by one and txbytes by the size. +extern void nni_sock_bump_tx(nni_sock *s, uint64_t sz); + #endif // CORE_SOCKET_H diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index be454d8a..ae1fd92e 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -15,6 +15,22 @@ // and pipes. This must not be exposed to other subsystems -- these internals // are subject to change at any time. +typedef struct nni_dialer_stats { + nni_stat_item s_root; + nni_stat_item s_id; + nni_stat_item s_sock; + nni_stat_item s_url; + nni_stat_item s_npipes; + nni_stat_item s_connok; + nni_stat_item s_refused; + nni_stat_item s_canceled; + nni_stat_item s_timedout; + nni_stat_item s_othererr; + nni_stat_item s_protorej; + nni_stat_item s_apprej; + char s_scope[24]; // scope name for stats +} nni_dialer_stats; + struct nni_dialer { nni_tran_dialer_ops d_ops; // transport ops nni_tran * d_tran; // transport pointer @@ -38,8 +54,25 @@ struct nni_dialer { nni_duration d_inirtime; // initial time for reconnect nni_time d_conntime; // time of last good connect nni_reap_item d_reap; + nni_dialer_stats d_stats; }; +typedef struct nni_listener_stats { + nni_stat_item s_root; + nni_stat_item s_id; + nni_stat_item s_sock; + nni_stat_item s_url; + nni_stat_item s_npipes; + nni_stat_item s_accept; + nni_stat_item s_aborted; // aborted remotely + nni_stat_item s_timedout; + nni_stat_item s_canceled; + nni_stat_item s_othererr; + nni_stat_item s_protorej; + nni_stat_item s_apprej; + char s_scope[24]; // scope name for stats +} nni_listener_stats; + struct nni_listener { nni_tran_listener_ops l_ops; // transport ops nni_tran * l_tran; // transport pointer @@ -56,8 +89,17 @@ struct nni_listener { nni_aio * l_acc_aio; nni_aio * l_tmo_aio; nni_reap_item l_reap; + nni_listener_stats l_stats; }; +typedef struct nni_pipe_stats { + nni_stat_item s_root; + nni_stat_item s_id; + nni_stat_item s_ep_id; + nni_stat_item s_sock_id; + char s_scope[16]; // scope name for stats ("pipe" is short) +} nni_pipe_stats; + struct nni_pipe { uint32_t p_id; nni_tran_pipe_ops p_tran_ops; @@ -76,13 +118,11 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_reap_item p_reap; + nni_pipe_stats p_stats; }; -extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); -extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); - -extern int nni_sock_add_listener(nni_sock *, nni_listener *); -extern void nni_sock_remove_listener(nni_sock *, nni_listener *); +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_dialer_add_pipe(nni_dialer *, void *); extern void nni_dialer_shutdown(nni_dialer *); @@ -101,5 +141,6 @@ extern void nni_pipe_remove(nni_pipe *); extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *); extern void nni_pipe_start(nni_pipe *); +extern void nni_pipe_stats_init(nni_pipe *); #endif // CORE_SOCKIMPL_H diff --git a/src/core/stats.c b/src/core/stats.c new file mode 100644 index 00000000..0363a932 --- /dev/null +++ b/src/core/stats.c @@ -0,0 +1,525 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdio.h> +#include <string.h> + +#include "core/nng_impl.h" + +typedef struct nng_stat nni_stat; + +struct nng_stat { + char * s_name; + const char * s_desc; + const char * s_string; + uint64_t s_value; + nni_time s_time; + nni_stat_type s_type; + nni_stat_unit s_unit; + nni_stat_item *s_item; // Used during snapshot collection + nni_list s_children; + nni_stat * s_parent; + nni_list_node s_node; +}; + +#ifdef NNG_ENABLE_STATS +static nni_stat_item stats_root; +static nni_mtx stats_lock; +static nni_mtx * stats_held = NULL; +#endif + +void +nni_stat_append(nni_stat_item *parent, nni_stat_item *child) +{ +#ifdef NNG_ENABLE_STATS + if (parent == NULL) { + parent = &stats_root; + } + nni_mtx_lock(&stats_lock); + // Make sure that the lists for both children and parents + // are correctly initialized. + if (parent->si_children.ll_head.ln_next == NULL) { + NNI_LIST_INIT(&parent->si_children, nni_stat_item, si_node); + } + if (child->si_children.ll_head.ln_next == NULL) { + NNI_LIST_INIT(&child->si_children, nni_stat_item, si_node); + } + nni_list_append(&parent->si_children, child); + child->si_parent = parent; + nni_mtx_unlock(&stats_lock); +#else + NNI_ARG_UNUSED(parent); + NNI_ARG_UNUSED(child); +#endif +} + +void +nni_stat_remove(nni_stat_item *child) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_item *parent; + nni_mtx_lock(&stats_lock); + if ((parent = child->si_parent) != NULL) { + nni_list_remove(&parent->si_children, child); + child->si_parent = NULL; + } + nni_mtx_unlock(&stats_lock); +#else + NNI_ARG_UNUSED(child); +#endif +} + +#ifdef NNG_ENABLE_STATS +void +nni_stat_init(nni_stat_item *stat, const char *name, const char *desc) +{ + NNI_LIST_INIT(&stat->si_children, nni_stat_item, si_node); + stat->si_parent = NULL; + stat->si_name = name; + stat->si_desc = desc; + stat->si_lock = NULL; + stat->si_update = NULL; + stat->si_private = NULL; + stat->si_string = NULL; + stat->si_value = 0; + stat->si_type = NNG_STAT_COUNTER; + stat->si_unit = NNG_UNIT_NONE; +} + +void +nni_stat_init_scope(nni_stat_item *stat, const char *name, const char *desc) +{ + nni_stat_init(stat, name, desc); + stat->si_type = NNG_STAT_SCOPE; + stat->si_unit = NNG_UNIT_NONE; +} + +void +nni_stat_init_string( + nni_stat_item *stat, const char *name, const char *desc, const char *str) +{ + nni_stat_init(stat, name, desc); + stat->si_string = str; + stat->si_type = NNG_STAT_STRING; + stat->si_unit = NNG_UNIT_NONE; +} + +void +nni_stat_init_id( + nni_stat_item *stat, const char *name, const char *desc, uint64_t id) +{ + nni_stat_init(stat, name, desc); + stat->si_value = id; + stat->si_type = NNG_STAT_ID; + stat->si_unit = NNG_UNIT_NONE; +} + +void +nni_stat_init_bool( + nni_stat_item *stat, const char *name, const char *desc, bool v) +{ + nni_stat_init(stat, name, desc); + stat->si_value = v ? 1 : 0; + stat->si_type = NNG_STAT_BOOLEAN; + stat->si_unit = NNG_UNIT_NONE; +} + +static void +stat_atomic_update(nni_stat_item *stat, void *notused) +{ + NNI_ARG_UNUSED(notused); + stat->si_value = nni_atomic_get64(&stat->si_atomic); +} + +void +nni_stat_init_atomic(nni_stat_item *stat, const char *name, const char *desc) +{ + + nni_stat_init(stat, name, desc); + stat->si_value = 0; + stat->si_private = NULL; + stat->si_update = stat_atomic_update; + nni_atomic_init64(&stat->si_atomic); +} + +void +nni_stat_inc_atomic(nni_stat_item *stat, uint64_t inc) +{ + nni_atomic_inc64(&stat->si_atomic, inc); +} + +void +nni_stat_dec_atomic(nni_stat_item *stat, uint64_t inc) +{ + nni_atomic_dec64(&stat->si_atomic, inc); +} +#endif + +void +nni_stat_set_value(nni_stat_item *stat, uint64_t v) +{ +#ifdef NNG_ENABLE_STATS + stat->si_value = v; +#else + NNI_ARG_UNUSED(stat); + NNI_ARG_UNUSED(v); +#endif +} + +void +nni_stat_set_string(nni_stat_item *stat, const char *str) +{ +#ifdef NNG_ENABLE_STATS + stat->si_string = str; +#else + NNI_ARG_UNUSED(stat); + NNI_ARG_UNUSED(str); +#endif +} + +void +nni_stat_set_lock(nni_stat_item *stat, nni_mtx *mtx) +{ +#ifdef NNG_ENABLE_STATS + stat->si_lock = mtx; +#else + NNI_ARG_UNUSED(stat); + NNI_ARG_UNUSED(mtx); +#endif +} + +void +nni_stat_set_update(nni_stat_item *stat, nni_stat_update f, void *a) +{ +#ifdef NNG_ENABLE_STATS + stat->si_update = f; + stat->si_private = a; +#else + NNI_ARG_UNUSED(stat); + NNI_ARG_UNUSED(f); + NNI_ARG_UNUSED(a); +#endif +} + +#ifdef NNG_ENABLE_STATS +void +nni_stat_set_type(nni_stat_item *stat, int type) +{ + stat->si_type = type; +} + +void +nni_stat_set_unit(nni_stat_item *stat, int unit) +{ + stat->si_unit = unit; +} +#endif + +void +nng_stats_free(nni_stat *st) +{ +#ifdef NNG_ENABLE_STATS + nni_stat *child; + + while ((child = nni_list_first(&st->s_children)) != NULL) { + nni_list_remove(&st->s_children, child); + nng_stats_free(child); + } + nni_strfree(st->s_name); + NNI_FREE_STRUCT(st); +#else + NNI_ARG_UNUSED(st); +#endif +} + +#ifdef NNG_ENABLE_STATS +static int +stat_make_tree(nni_stat_item *item, nni_stat **sp) +{ + nni_stat * stat; + nni_stat_item *child; + + if ((stat = NNI_ALLOC_STRUCT(stat)) == NULL) { + return (NNG_ENOMEM); + } + if ((stat->s_name = nni_strdup(item->si_name)) == NULL) { + NNI_FREE_STRUCT(stat); + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&stat->s_children, nni_stat, s_node); + stat->s_item = item; + stat->s_type = item->si_type; + stat->s_unit = item->si_unit; + stat->s_desc = item->si_desc; + stat->s_parent = NULL; + + NNI_LIST_FOREACH (&item->si_children, child) { + nni_stat *cs; + int rv; + if ((rv = stat_make_tree(child, &cs)) != 0) { + nng_stats_free(stat); + return (rv); + } + nni_list_append(&stat->s_children, cs); + cs->s_parent = stat; + } + *sp = stat; + return (0); +} + +static void +stat_update(nni_stat *stat) +{ + nni_stat_item *item = stat->s_item; + + if (item->si_lock != stats_held) { + if (stats_held != NULL) { + nni_mtx_unlock(stats_held); + stats_held = NULL; + } + if (item->si_lock != NULL) { + nni_mtx_lock(item->si_lock); + stats_held = item->si_lock; + } + } + if (item->si_update != NULL) { + item->si_update(item, item->si_private); + } + stat->s_value = item->si_value; + stat->s_string = item->si_string; + stat->s_time = nni_clock(); +} + +static void +stat_update_tree(nni_stat *stat) +{ + nni_stat *child; + stat_update(stat); + NNI_LIST_FOREACH (&stat->s_children, child) { + stat_update_tree(child); + } +} + +int +nni_stat_snapshot(nni_stat **statp, nni_stat_item *item) +{ + int rv; + nni_stat *stat; + + if (item == NULL) { + item = &stats_root; + } + nni_mtx_lock(&stats_lock); + if ((rv = stat_make_tree(item, &stat)) != 0) { + nni_mtx_unlock(&stats_lock); + return (rv); + } + stat_update_tree(stat); + if (stats_held != NULL) { + nni_mtx_unlock(stats_held); + stats_held = NULL; + } + nni_mtx_unlock(&stats_lock); + *statp = stat; + return (0); +} +#endif + +int +nng_stats_get(nng_stat **statp) +{ +#ifdef NNG_ENABLE_STATS + return (nni_stat_snapshot(statp, &stats_root)); +#else + NNI_ARG_UNUSED(statp); + return (NNG_ENOTSUP); +#endif +} + +nng_stat * +nng_stat_parent(nng_stat *stat) +{ + return (stat->s_parent); +} + +nng_stat * +nng_stat_next(nng_stat *stat) +{ + if (stat->s_parent == NULL) { + return (NULL); // Root node, no siblings. + } + return (nni_list_next(&stat->s_parent->s_children, stat)); +} + +nng_stat * +nng_stat_child(nng_stat *stat) +{ + return (nni_list_first(&stat->s_children)); +} + +const char * +nng_stat_name(nni_stat *stat) +{ + return (stat->s_name); +} + +uint64_t +nng_stat_value(nni_stat *stat) +{ + return (stat->s_value); +} + +const char * +nng_stat_string(nng_stat *stat) +{ + return (stat->s_string); +} + +uint64_t +nng_stat_timestamp(nng_stat *stat) +{ + return ((uint64_t) stat->s_time); +} + +int +nng_stat_type(nng_stat *stat) +{ + return (stat->s_type); +} + +int +nng_stat_unit(nng_stat *stat) +{ + return (stat->s_unit); +} + +const char * +nng_stat_desc(nng_stat *stat) +{ + return (stat->s_desc); +} + +int +nni_stat_sys_init(void) +{ +#ifdef NNG_ENABLE_STATS + nni_mtx_init(&stats_lock); + NNI_LIST_INIT(&stats_root.si_children, nni_stat_item, si_node); + stats_root.si_name = ""; + stats_root.si_desc = "all statistsics"; +#endif + + return (0); +} + +void +nni_stat_sys_fini(void) +{ +#ifdef NNG_ENABLE_STATS + nni_mtx_fini(&stats_lock); +#endif +} + +#ifdef NNG_ENABLE_STATS +void +stat_sprint_scope(nni_stat *stat, char **scope, int *lenp) +{ + if (stat->s_parent != NULL) { + stat_sprint_scope(stat->s_parent, scope, lenp); + } + if (strlen(stat->s_name) > 0) { + snprintf(*scope, *lenp, "%s.", stat->s_name); + } else { + (*scope)[0] = '\0'; + } + *lenp -= strlen(*scope); + *scope += strlen(*scope); +} +#endif + +void +nng_stats_dump(nng_stat *stat) +{ +#ifdef NNG_ENABLE_STATS + static char buf[128]; // to minimize recursion, not thread safe + static char line[128]; + int len; + char * scope; + char * indent = " "; + unsigned long long val; + nni_stat * child; + + switch (nng_stat_type(stat)) { + case NNG_STAT_SCOPE: + scope = buf; + len = sizeof(buf); + stat_sprint_scope(stat, &scope, &len); + len = strlen(buf); + if (len > 0) { + if (buf[len - 1] == '.') { + buf[--len] = '\0'; + } + } + if (len > 0) { + snprintf(line, sizeof(line), "\n%s:", buf); + } + break; + case NNG_STAT_STRING: + snprintf(line, sizeof(line), "%s%-32s\"%s\"", indent, + nng_stat_name(stat), nng_stat_string(stat)); + break; + case NNG_STAT_BOOLEAN: + val = nng_stat_value(stat); + snprintf(line, sizeof(line), "%s%-32s%s", indent, + nng_stat_name(stat), val != 0 ? "true" : "false"); + break; + case NNG_STAT_LEVEL: + case NNG_STAT_COUNTER: + val = nng_stat_value(stat); + switch (nng_stat_unit(stat)) { + case NNG_UNIT_BYTES: + snprintf(line, sizeof(line), "%s%-32s%llu bytes", + indent, nng_stat_name(stat), val); + break; + case NNG_UNIT_MESSAGES: + snprintf(line, sizeof(line), "%s%-32s%llu msgs", + indent, nng_stat_name(stat), val); + break; + case NNG_UNIT_MILLIS: + snprintf(line, sizeof(line), "%s%-32s%llu msec", + indent, nng_stat_name(stat), val); + break; + case NNG_UNIT_NONE: + case NNG_UNIT_EVENTS: + default: + snprintf(line, sizeof(line), "%s%-32s%llu", indent, + nng_stat_name(stat), val); + break; + } + break; + case NNG_STAT_ID: + val = nng_stat_value(stat); + snprintf(line, (sizeof line), "%s%-32s%llu", indent, + nng_stat_name(stat), val); + break; + default: + snprintf(line, (sizeof line), "%s%-32s<?>", indent, + nng_stat_name(stat)); + break; + } + nni_plat_println(line); + + NNI_LIST_FOREACH (&stat->s_children, child) { + nng_stats_dump(child); + } +#else + NNI_ARG_UNUSED(stat); +#endif +} diff --git a/src/core/stats.h b/src/core/stats.h new file mode 100644 index 00000000..8cfe14de --- /dev/null +++ b/src/core/stats.h @@ -0,0 +1,108 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_STATS_H +#define CORE_STATS_H + +#include "core/defs.h" + +// Statistics support. This is inspired in part by the Solaris +// kstats framework, but we've simplified and tuned it for our use. +// +// Collection of the stats will be done in two steps. First we +// will walk the list of stats, with the chain held, allocating +// a user local copy of the stat and pointers. +// +// In phase 2, we run the update, and copy the values. We conditionally +// acquire the lock on the stat first though. + +typedef struct nni_stat_item nni_stat_item; + +typedef void (*nni_stat_update)(nni_stat_item *, void *); +typedef enum nng_stat_type_enum nni_stat_type; +typedef enum nng_unit_enum nni_stat_unit; + +// nni_stat_item is used by providers +struct nni_stat_item { +#ifdef NNG_ENABLE_STATS + nni_list_node si_node; // list node, framework use only + nni_stat_item * si_parent; // link back to parent, framework use only + nni_list si_children; // children, framework use only + const char * si_name; // name of statistic + const char * si_desc; // description of statistic (English) + nni_mtx * si_lock; // lock for accessing, can be NULL + void * si_private; // provider private pointer + nni_stat_type si_type; // type of stat, e.g. NNG_STAT_LEVEL + nni_stat_unit si_unit; // units, e.g. NNG_UNIT_MILLIS + nni_stat_update si_update; // update function (can be NULL) + const char * si_string; // string value (NULL for numerics) + uint64_t si_value; // numeric value + nni_atomic_u64 si_atomic; // atomic value +#endif +}; + +void nni_stat_append(nni_stat_item *, nni_stat_item *); +void nni_stat_remove(nni_stat_item *); + +void nni_stat_set_value(nni_stat_item *, uint64_t); +void nni_stat_set_string(nni_stat_item *, const char *); +void nni_stat_set_lock(nni_stat_item *, nni_mtx *); +void nni_stat_set_update(nni_stat_item *, nni_stat_update, void *); + +#ifdef NNG_ENABLE_STATS +void nni_stat_init(nni_stat_item *, const char *, const char *); +void nni_stat_init_scope(nni_stat_item *, const char *, const char *); +void nni_stat_init_string( + nni_stat_item *, const char *, const char *, const char *); +void nni_stat_init_id(nni_stat_item *, const char *, const char *, uint64_t); +void nni_stat_init_bool(nni_stat_item *, const char *, const char *, bool); +void nni_stat_init_atomic(nni_stat_item *, const char *, const char *); +void nni_stat_inc_atomic(nni_stat_item *, uint64_t); +void nni_stat_dec_atomic(nni_stat_item *, uint64_t); +void nni_stat_set_type(nni_stat_item *, int); +void nni_stat_set_unit(nni_stat_item *, int); +#else +// We override initialization so that we can avoid compiling static strings +// into the binary. Presumably if stats are disabled, we are trying to save +// space for constrained environments. We do evaluate an unused arg to +// prevent the compiler from bitching about unused values. +#define nni_stat_init(a, b, c) ((void) (a)) +#define nni_stat_init_scope(a, b, c) ((void) (a)) +#define nni_stat_init_atomic(a, b, c) ((void) (a)) +#define nni_stat_init_id(a, b, c, d) ((void) (a)) +#define nni_stat_init_bool(a, b, c, d) ((void) (a)) +#define nni_stat_init_string(a, b, c, d) ((void) (a)) +#define nni_stat_set_unit(a, b) ((void) (a)) +#define nni_stat_set_type(a, b) ((void) (a)) +#define nni_stat_inc_atomic(stat, inc) +#define nni_stat_dec_atomic(stat, inc) +#endif + +#if 0 +#define nni_stat_append(a, b) +#define nni_stat_remove(a) +#define nni_stat_init(a, b, c) +#define nni_stat_init_scope(a, b, c) +#define nni_stat_init_string(a, b, c, d) +#define nni_stat_init_id(a, b, c, d) +#define nni_stat_init_bool(a, b, c, d) +#define nni_stat_dec_atomic(a, b) +#define nni_stat_set_value(a, b) +#define nni_stat_set_string(a, b) +#define nni_stat_set_lock(a, b) +#define nni_stat_set_update(a, b, c) +#define nni_stat_set_type(a, b) +#define nni_stat_set_unit(a, b) +#endif + +int nni_stat_sys_init(void); +void nni_stat_sys_fini(void); + +#endif // CORE_STATS_H @@ -1594,74 +1594,6 @@ nng_aio_begin(nng_aio *aio) return (true); } -#if 0 -int -nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) -{ - // Stats TBD. - NNI_ARG_UNUSED(sock) - NNI_ARG_UNUSED(snapp) - return (NNG_ENOTSUP); -} - -void -nng_snapshot_free(nng_snapshot *snap) -{ - NNI_ARG_UNUSED(snap) - // Stats TBD. -} - -int -nng_snapshot_update(nng_snapshot *snap) -{ - NNI_ARG_UNUSED(snap) - // Stats TBD. - return (NNG_ENOTSUP); -} - -int -nng_snapshot_next(nng_snapshot *snap, nng_stat **statp) -{ - NNI_ARG_UNUSED(snap) - NNI_ARG_UNUSED(statp) - // Stats TBD. - *statp = NULL; - return (NNG_ENOTSUP); -} - -const char * -nng_stat_name(nng_stat *stat) -{ - NNI_ARG_UNUSED(stat) - // Stats TBD. - return (NULL); -} - -int -nng_stat_type(nng_stat *stat) -{ - NNI_ARG_UNUSED(stat) - // Stats TBD. - return (0); -} - -int -nng_stat_unit(nng_stat *stat) -{ - NNI_ARG_UNUSED(stat) - // Stats TBD. - return (0); -} - -int64_t -nng_stat_value(nng_stat *stat) -{ - NNI_ARG_UNUSED(stat) - // Stats TBD. - return (0); -} -#endif - int nng_url_parse(nng_url **result, const char *ustr) { @@ -90,11 +90,10 @@ typedef struct nng_socket_s { uint32_t id; } nng_socket; -typedef int32_t nng_duration; // in milliseconds -typedef struct nng_msg nng_msg; -typedef struct nng_snapshot nng_snapshot; -typedef struct nng_stat nng_stat; -typedef struct nng_aio nng_aio; +typedef int32_t nng_duration; // in milliseconds +typedef struct nng_msg nng_msg; +typedef struct nng_stat nng_stat; +typedef struct nng_aio nng_aio; // Initializers. // clang-format off @@ -762,36 +761,37 @@ enum nng_flag_enum { // but the individual statistic names, values, and meanings are all // subject to change. -// nng_snapshot_create creates a statistics snapshot. The snapshot -// object must be deallocated expressly by the user, and may persist beyond -// the lifetime of any socket object used to update it. Note that the -// values of the statistics are initially unset. -// NNG_DECL int nng_snapshot_create(nng_socket, nng_snapshot **); - -// nng_snapshot_free frees a snapshot object. All statistic objects -// contained therein are destroyed as well. -// NNG_DECL void nng_snapshot_free(nng_snapshot *); - -// nng_snapshot_update updates a snapshot of all the statistics -// relevant to a particular socket. All prior values are overwritten. -// NNG_DECL int nng_snapshot_update(nng_snapshot *); - -// nng_snapshot_next is used to iterate over the individual statistic -// objects inside the snapshot. Note that the statistic object, and the -// meta-data for the object (name, type, units) is fixed, and does not -// change for the entire life of the snapshot. Only the value -// is subject to change, and then only when a snapshot is updated. -// -// Iteration begins by providing NULL in the value referenced. Successive -// calls will update this value, returning NULL when no more statistics -// are available in the snapshot. -// NNG_DECL int nng_snapshot_next(nng_snapshot *, nng_stat **); +// nng_stats_get takes a snapshot of the entire set of statistics. +// While the operation can be somewhat expensive (allocations), it +// is done in a way that minimizes impact to running operations. +// Note that the statistics are provided as tree, with parents +// used for grouping, and with child statistics underneath. The +// top stat returned will be of type NNG_STAT_SCOPE with name "". +// Applications may choose to consider this root scope as "root", if +// the empty string is not suitable. +NNG_DECL int nng_stats_get(nng_stat **); + +// nng_stats_free frees a previous list of snapshots. This should only +// be called on the parent statistic that obtained via nng_stats_get. +NNG_DECL void nng_stats_free(nng_stat *); + +// nng_stats_dump is a debugging function that dumps the entire set of +// statistics to stdout. +NNG_DECL void nng_stats_dump(nng_stat *); + +// nng_stat_next finds the next sibling for the current stat. If there +// are no more siblings, it returns NULL. +NNG_DECL nng_stat *nng_stat_next(nng_stat *); + +// nng_stat_child finds the first child of the current stat. If no children +// exist, then NULL is returned. +NNG_DECL nng_stat *nng_stat_child(nng_stat *); // nng_stat_name is used to determine the name of the statistic. // This is a human readable name. Statistic names, as well as the presence // or absence or semantic of any particular statistic are not part of any // stable API, and may be changed without notice in future updates. -// NNG_DECL const char *nng_stat_name(nng_stat *); +NNG_DECL const char *nng_stat_name(nng_stat *); // nng_stat_type is used to determine the type of the statistic. // At present, only NNG_STAT_TYPE_LEVEL and and NNG_STAT_TYPE_COUNTER @@ -799,32 +799,48 @@ enum nng_flag_enum { // value over time are likely more interesting than the actual level. Level // values reflect some absolute state however, and should be presented to the // user as is. -// NNG_DECL int nng_stat_type(nng_stat *); +NNG_DECL int nng_stat_type(nng_stat *); enum nng_stat_type_enum { - NNG_STAT_LEVEL = 0, // Numeric "absolute" value, diffs meaningless - NNG_STAT_COUNTER = 1 // Incrementing value (diffs are meaningful) + NNG_STAT_SCOPE = 0, // Stat is for scoping, and carries no value + NNG_STAT_LEVEL = 1, // Numeric "absolute" value, diffs meaningless + NNG_STAT_COUNTER = 2, // Incrementing value (diffs are meaningful) + NNG_STAT_STRING = 3, // Value is a string + NNG_STAT_BOOLEAN = 4, // Value is a boolean + NNG_STAT_ID = 5, // Value is a numeric ID }; // nng_stat_unit provides information about the unit for the statistic, // such as NNG_UNIT_BYTES or NNG_UNIT_BYTES. If no specific unit is -// applicable, such as a relative priority, then NN_UNIT_NONE is -// returned. -// NNG_DECL int nng_stat_unit(nng_stat *); +// applicable, such as a relative priority, then NN_UNIT_NONE is returned. +NNG_DECL int nng_stat_unit(nng_stat *); enum nng_unit_enum { - NNG_UNIT_NONE = 0, - NNG_UNIT_BYTES = 1, - NNG_UNIT_MESSAGES = 2, - NNG_UNIT_BOOLEAN = 3, - NNG_UNIT_MILLIS = 4, - NNG_UNIT_EVENTS = 5 + NNG_UNIT_NONE = 0, // No special units + NNG_UNIT_BYTES = 1, // Bytes, e.g. bytes sent, etc. + NNG_UNIT_MESSAGES = 2, // Messages, one per message + NNG_UNIT_MILLIS = 3, // Milliseconds + NNG_UNIT_EVENTS = 4 // Some other type of event }; // nng_stat_value returns returns the actual value of the statistic. // Statistic values reflect their value at the time that the corresponding // snapshot was updated, and are undefined until an update is performed. -// NNG_DECL int64_t nng_stat_value(nng_stat *); +NNG_DECL uint64_t nng_stat_value(nng_stat *); + +// nng_stat_string returns the string associated with a string statistic, +// or NULL if the statistic is not part of the string. The value returned +// is valid until the associated statistic is freed. +NNG_DECL const char *nng_stat_string(nng_stat *); + +// nng_stat_desc returns a human readable description of the statistic. +// This may be useful for display in diagnostic interfaces, etc. +NNG_DECL const char *nng_stat_desc(nng_stat *); + +// nng_stat_timestamp returns a timestamp (milliseconds) when the statistic +// was captured. The base offset is the same as used by nng_clock(). +// We don't use nng_time though, because that's in the supplemental header. +NNG_DECL uint64_t nng_stat_timestamp(nng_stat *); // Device functionality. This connects two sockets together in a device, // which means that messages from one side are forwarded to the other. diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index dc250943..3033b196 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -23,6 +23,8 @@ #define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) #endif +#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1) + typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; @@ -35,16 +37,21 @@ static void pair1_pipe_fini(void *); // pair1_sock is our per-socket protocol private structure. struct pair1_sock { - nni_msgq * uwq; - nni_msgq * urq; - bool raw; - int ttl; - nni_mtx mtx; - nni_idhash *pipes; - nni_list plist; - bool started; - bool poly; - nni_aio * aio_getq; + nni_msgq * uwq; + nni_msgq * urq; + nni_sock * nsock; + bool raw; + int ttl; + nni_mtx mtx; + nni_idhash * pipes; + nni_list plist; + bool started; + bool poly; + nni_aio * aio_getq; + nni_stat_item stat_poly; + nni_stat_item stat_raw; + nni_stat_item stat_rejmismatch; + nni_stat_item stat_rejinuse; }; // pair1_pipe is our per-pipe protocol private structure. @@ -94,12 +101,29 @@ pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw) return (rv); } - s->raw = raw; - s->poly = false; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; + nni_stat_init_bool( + &s->stat_poly, "polyamorous", "polyamorous mode?", false); + nni_stat_set_lock(&s->stat_poly, &s->mtx); + nni_sock_add_stat(nsock, &s->stat_poly); + + nni_stat_init_bool(&s->stat_raw, "raw", "raw mode?", raw); + nni_sock_add_stat(nsock, &s->stat_raw); + + nni_stat_init_atomic(&s->stat_rejmismatch, "mismatch", + "pipes rejected (protocol mismatch)"); + nni_sock_add_stat(nsock, &s->stat_rejmismatch); + + nni_stat_init_atomic(&s->stat_rejinuse, "already", + "pipes rejected (already connected)"); + nni_sock_add_stat(nsock, &s->stat_rejinuse); + + s->nsock = nsock; + s->raw = raw; + s->poly = false; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; return (0); } @@ -173,13 +197,15 @@ pair1_pipe_start(void *arg) uint32_t id; int rv; + nni_mtx_lock(&s->mtx); if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) { + nni_mtx_unlock(&s->mtx); + BUMPSTAT(&s->stat_rejmismatch); // Peer protocol mismatch. return (NNG_EPROTO); } id = nni_pipe_id(p->npipe); - nni_mtx_lock(&s->mtx); if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { nni_mtx_unlock(&s->mtx); return (rv); @@ -188,6 +214,7 @@ pair1_pipe_start(void *arg) if (!nni_list_empty(&s->plist)) { nni_idhash_remove(s->pipes, id); nni_mtx_unlock(&s->mtx); + BUMPSTAT(&s->stat_rejinuse); return (NNG_EBUSY); } } else { @@ -242,6 +269,7 @@ pair1_pipe_recv_cb(void *arg) uint32_t hdr; nni_pipe * npipe = p->npipe; int rv; + size_t len; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_close(p->npipe); @@ -266,10 +294,12 @@ pair1_pipe_recv_cb(void *arg) nni_pipe_close(npipe); return; } + len = nni_msg_len(msg); // If we bounced too many times, discard the message, but // keep getting more. if (hdr > (unsigned) s->ttl) { + // STAT: bump TTLdrop nni_msg_free(msg); nni_pipe_recv(npipe, p->aio_recv); return; @@ -277,6 +307,7 @@ pair1_pipe_recv_cb(void *arg) // Store the hop count in the header. if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) { + // STAT: bump allocfail nni_msg_free(msg); nni_pipe_recv(npipe, p->aio_recv); return; @@ -284,6 +315,7 @@ pair1_pipe_recv_cb(void *arg) // Send the message up. nni_aio_set_msg(p->aio_putq, msg); + nni_sock_bump_rx(s->nsock, len); nni_msgq_aio_put(s->urq, p->aio_putq); } @@ -452,6 +484,7 @@ pair1_sock_set_poly(void *arg, const void *buf, size_t sz, nni_opt_type t) int rv; nni_mtx_lock(&s->mtx); rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->poly, buf, sz, t); + nni_stat_set_value(&s->stat_poly, s->poly); nni_mtx_unlock(&s->mtx); return (rv); } @@ -468,6 +501,7 @@ pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; + nni_sock_bump_tx(s->nsock, nni_msg_len(nni_aio_get_msg(aio))); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index ab6486bd..6199e949 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -37,6 +37,16 @@ struct nni_inproc_pipe { uint16_t peer; uint16_t proto; size_t rcvmax; + nni_stat_item st_rxbytes; + nni_stat_item st_txbytes; + nni_stat_item st_rxmsgs; + nni_stat_item st_txmsgs; + nni_stat_item st_rxdiscards; + nni_stat_item st_txdiscards; + nni_stat_item st_rxerrs; + nni_stat_item st_txerrs; + nni_stat_item st_rxoversize; + nni_stat_item st_rcvmaxsz; }; // nni_inproc_pair represents a pair of pipes. Because we control both @@ -60,6 +70,7 @@ struct nni_inproc_ep { nni_mtx mtx; nni_dialer * ndialer; nni_listener *nlistener; + nni_stat_item st_rcvmaxsz; }; // nni_inproc is our global state - this contains the list of active endpoints @@ -123,11 +134,120 @@ nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep) return (0); } +#ifdef NNG_ENABLE_STATS +static void +inproc_get_rxbytes(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq)); +} + +static void +inproc_get_rxmsgs(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq)); +} + +static void +inproc_get_txbytes(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq)); +} + +static void +inproc_get_txmsgs(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq)); +} + +static void +inproc_get_discards(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_discards(mq)); +} + +static void +inproc_get_txerrs(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_put_errs(mq)); +} + +static void +inproc_get_rxerrs(nni_stat_item *st, void *arg) +{ + nni_msgq *mq = arg; + nni_stat_set_value(st, nni_msgq_stat_get_errs(mq)); +} +#else +#undef nni_stat_set_update +#define nni_stat_set_update(p, x, f) +#endif + static int nni_inproc_pipe_init(void *arg, nni_pipe *p) { nni_inproc_pipe *pipe = arg; pipe->npipe = p; + + nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)"); + nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq); + nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES); + nni_pipe_add_stat(p, &pipe->st_rxbytes); + + nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)"); + nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq); + nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES); + nni_pipe_add_stat(p, &pipe->st_txbytes); + + nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received"); + nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq); + nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_rxmsgs); + + nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent"); + nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq); + nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_txmsgs); + + nni_stat_init( + &pipe->st_rxdiscards, "rxdiscards", "receives discarded"); + nni_stat_set_update( + &pipe->st_rxdiscards, inproc_get_discards, pipe->rq); + nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_rxdiscards); + + nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded"); + nni_stat_set_update( + &pipe->st_txdiscards, inproc_get_discards, pipe->wq); + nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_txdiscards); + + nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors"); + nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq); + nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_rxerrs); + + nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors"); + nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq); + nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_txerrs); + + nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize", + "oversize msgs received (dropped)"); + nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES); + nni_pipe_add_stat(p, &pipe->st_rxoversize); + + nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); + nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); + nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); + nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax); + nni_pipe_add_stat(p, &pipe->st_rcvmaxsz); + return (0); } @@ -225,7 +345,15 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) nni_aio_list_init(&ep->aios); ep->addr = url->u_rawurl; // we match on the full URL. - *epp = ep; + + nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); + nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); + nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); + nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx); + + nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz); + + *epp = ep; return (0); } @@ -248,7 +376,14 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) nni_aio_list_init(&ep->aios); ep->addr = url->u_rawurl; // we match on the full URL. - *epp = ep; + + nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); + nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); + nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); + nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx); + nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz); + + *epp = ep; return (0); } @@ -284,6 +419,7 @@ inproc_filter(void *arg, nni_msg *msg) { nni_inproc_pipe *p = arg; if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { + nni_stat_inc_atomic(&p->st_rxoversize, 1); nni_msg_free(msg); return (NULL); } @@ -509,6 +645,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->rcvmax = val; + nni_stat_set_value(&ep->st_rcvmaxsz, val); nni_mtx_unlock(&ep->mtx); } return (rv); |
