From d83b96faeb02d7a3574e63880141d6b23f31ced1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 22 Aug 2018 08:56:53 -0700 Subject: fixes #4 Statistics support This introduces new public APIs for obtaining statistics, and adds some generic stats for dialers, listeners, pipes, and sockets. Also added are stats for inproc and pairv1 protocol. The other protocols and transports will have stats added incrementally as time goes on. A simple test program, and man pages are provided for this. Start by looking at nng_stat(5). Statistics does have some impact, and they can be disabled by using the advanced NNG_ENABLE_STATS (setting it to OFF, it's ON by default) if you need to build a minimized configuration. --- src/core/socket.c | 214 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 184 insertions(+), 30 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index b2f331fb..63696b36 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb { void * cb_arg; } nni_sock_pipe_cb; +typedef struct sock_stats { + nni_stat_item s_root; // socket scope + nni_stat_item s_id; // socket id + nni_stat_item s_name; // socket name + nni_stat_item s_protocol; // socket protocol + nni_stat_item s_ndialers; // number of dialers + nni_stat_item s_nlisteners; // number of listeners + nni_stat_item s_npipes; // number of pipes + nni_stat_item s_rxbytes; // number of bytes received + nni_stat_item s_txbytes; // number of bytes received + nni_stat_item s_rxmsgs; // number of msgs received + nni_stat_item s_txmsgs; // number of msgs sent + nni_stat_item s_protorej; // pipes rejected by protocol + nni_stat_item s_apprej; // pipes rejected by application +} sock_stats; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -82,6 +98,7 @@ struct nni_socket { size_t s_rcvmaxsz; // max receive size nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) + char s_scope[24]; // socket scope ("socket%u", 32 bits max) nni_list s_listeners; // active listeners nni_list s_dialers; // active dialers @@ -94,6 +111,8 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + + sock_stats s_stats; }; static void nni_ctx_destroy(nni_ctx *); @@ -399,6 +418,81 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } +static void +sock_stats_fini(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats *st = &s->s_stats; + nni_stat_remove(&st->s_root); +#else + NNI_ARG_UNUSED(s); +#endif +} + +static void +sock_stats_init(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats * st = &s->s_stats; + nni_stat_item *root = &s->s_stats.s_root; + + // To make collection cheap and atomic for the socket, + // we just use a single lock for the entire chain. + + nni_stat_init_scope(root, s->s_scope, "socket statistics"); + + nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id); + nni_stat_append(root, &st->s_id); + + nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name); + nni_stat_set_lock(&st->s_name, &s->s_mx); + nni_stat_append(root, &st->s_name); + + nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol", + nni_sock_proto_name(s)); + nni_stat_append(root, &st->s_protocol); + + nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers"); + nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_ndialers); + + nni_stat_init_atomic( + &st->s_nlisteners, "nlisteners", "open listeners"); + nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_nlisteners); + + nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes"); + nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_npipes); + + nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received"); + nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_rxbytes); + + nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent"); + nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_txbytes); + + nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received"); + nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_rxmsgs); + + nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent"); + nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_txmsgs); + + nni_stat_init_atomic( + &st->s_protorej, "protoreject", "pipes rejected by protocol"); + nni_stat_append(root, &st->s_protorej); + + nni_stat_init_atomic( + &st->s_apprej, "appreject", "pipes rejected by application"); + nni_stat_append(root, &st->s_apprej); +#else + NNI_ARG_UNUSED(s); +#endif +} + static void sock_destroy(nni_sock *s) { @@ -418,6 +512,7 @@ sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); + sock_stats_fini(s); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &sock_lk); + sock_stats_init(s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 1)) != 0) || @@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) // Set the sockname. (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); - return (rv); + // Set up basic stat values. + (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id); + nni_stat_set_value(&s->s_stats.s_id, s->s_id); + + // Add our stats chain. + nni_stat_append(NULL, &s->s_stats.s_root); + + return (0); } // nni_sock_shutdown shuts down the socket; after this point no @@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s) // is idempotent. nni_sock_shutdown(s); + nni_stat_remove(&s->s_stats.s_root); + nni_mtx_lock(&sock_lk); if (s->s_closed) { // Some other thread called close. All we need to do @@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l) } nni_list_append(&s->s_listeners, l); + + nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d) } nni_list_append(&s->s_dialers, d); - nni_mtx_unlock(&s->s_mx); - return (0); -} -void -nni_sock_remove_listener(nni_sock *s, nni_listener *l) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_listeners, l)) { - nni_list_remove(&s->s_listeners, l); - if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { - nni_cv_wake(&s->s_cv); - } - } - nni_mtx_unlock(&s->s_mx); -} + nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1); -void -nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_dialers, d)) { - nni_list_remove(&s->s_dialers, d); - if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { - nni_cv_wake(&s->s_cv); - } - } nni_mtx_unlock(&s->s_mx); + return (0); } int @@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + nni_stat_inc_atomic(&d->d_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_mtx_unlock(&s->s_mx); return; } + p->p_listener = l; nni_list_append(&l->l_pipes, p); nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_npipes, 1); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p) nni_dialer *d = p->p_dialer; nni_mtx_lock(&s->s_mx); + if (nni_list_node_active(&p->p_sock_node)) { + nni_stat_dec_atomic(&s->s_stats.s_npipes, 1); + } + if (p->p_listener != NULL) { + nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1); + } + if (p->p_dialer != NULL) { + nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1); + } nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); p->p_listener = NULL; @@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p) } nni_mtx_unlock(&s->s_mx); } + +void +nni_sock_add_stat(nni_sock *s, nni_stat_item *stat) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_append(&s->s_stats.s_root, stat); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(stat); +#endif +} + +void +nni_sock_bump_tx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} + +void +nni_sock_bump_rx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} -- cgit v1.2.3-70-g09d2