diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 214 |
1 files changed, 184 insertions, 30 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index b2f331fb..63696b36 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb { void * cb_arg; } nni_sock_pipe_cb; +typedef struct sock_stats { + nni_stat_item s_root; // socket scope + nni_stat_item s_id; // socket id + nni_stat_item s_name; // socket name + nni_stat_item s_protocol; // socket protocol + nni_stat_item s_ndialers; // number of dialers + nni_stat_item s_nlisteners; // number of listeners + nni_stat_item s_npipes; // number of pipes + nni_stat_item s_rxbytes; // number of bytes received + nni_stat_item s_txbytes; // number of bytes received + nni_stat_item s_rxmsgs; // number of msgs received + nni_stat_item s_txmsgs; // number of msgs sent + nni_stat_item s_protorej; // pipes rejected by protocol + nni_stat_item s_apprej; // pipes rejected by application +} sock_stats; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -82,6 +98,7 @@ struct nni_socket { size_t s_rcvmaxsz; // max receive size nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) + char s_scope[24]; // socket scope ("socket%u", 32 bits max) nni_list s_listeners; // active listeners nni_list s_dialers; // active dialers @@ -94,6 +111,8 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + + sock_stats s_stats; }; static void nni_ctx_destroy(nni_ctx *); @@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s) } static void +sock_stats_fini(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats *st = &s->s_stats; + nni_stat_remove(&st->s_root); +#else + NNI_ARG_UNUSED(s); +#endif +} + +static void +sock_stats_init(nni_sock *s) +{ +#ifdef NNG_ENABLE_STATS + sock_stats * st = &s->s_stats; + nni_stat_item *root = &s->s_stats.s_root; + + // To make collection cheap and atomic for the socket, + // we just use a single lock for the entire chain. + + nni_stat_init_scope(root, s->s_scope, "socket statistics"); + + nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id); + nni_stat_append(root, &st->s_id); + + nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name); + nni_stat_set_lock(&st->s_name, &s->s_mx); + nni_stat_append(root, &st->s_name); + + nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol", + nni_sock_proto_name(s)); + nni_stat_append(root, &st->s_protocol); + + nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers"); + nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_ndialers); + + nni_stat_init_atomic( + &st->s_nlisteners, "nlisteners", "open listeners"); + nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_nlisteners); + + nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes"); + nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL); + nni_stat_append(root, &st->s_npipes); + + nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received"); + nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_rxbytes); + + nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent"); + nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES); + nni_stat_append(root, &st->s_txbytes); + + nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received"); + nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_rxmsgs); + + nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent"); + nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES); + nni_stat_append(root, &st->s_txmsgs); + + nni_stat_init_atomic( + &st->s_protorej, "protoreject", "pipes rejected by protocol"); + nni_stat_append(root, &st->s_protorej); + + nni_stat_init_atomic( + &st->s_apprej, "appreject", "pipes rejected by application"); + nni_stat_append(root, &st->s_apprej); +#else + NNI_ARG_UNUSED(s); +#endif +} + +static void sock_destroy(nni_sock *s) { nni_sockopt *sopt; @@ -418,6 +512,7 @@ sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); + sock_stats_fini(s); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &sock_lk); + sock_stats_init(s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 1)) != 0) || @@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) // Set the sockname. (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); - return (rv); + // Set up basic stat values. + (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id); + nni_stat_set_value(&s->s_stats.s_id, s->s_id); + + // Add our stats chain. + nni_stat_append(NULL, &s->s_stats.s_root); + + return (0); } // nni_sock_shutdown shuts down the socket; after this point no @@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s) // is idempotent. nni_sock_shutdown(s); + nni_stat_remove(&s->s_stats.s_root); + nni_mtx_lock(&sock_lk); if (s->s_closed) { // Some other thread called close. All we need to do @@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l) } nni_list_append(&s->s_listeners, l); + + nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d) } nni_list_append(&s->s_dialers, d); - nni_mtx_unlock(&s->s_mx); - return (0); -} -void -nni_sock_remove_listener(nni_sock *s, nni_listener *l) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_listeners, l)) { - nni_list_remove(&s->s_listeners, l); - if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { - nni_cv_wake(&s->s_cv); - } - } - nni_mtx_unlock(&s->s_mx); -} + nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1); -void -nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_dialers, d)) { - nni_list_remove(&s->s_dialers, d); - if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { - nni_cv_wake(&s->s_cv); - } - } nni_mtx_unlock(&s->s_mx); + return (0); } int @@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + nni_stat_inc_atomic(&d->d_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&d->d_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_mtx_unlock(&s->s_mx); return; } + p->p_listener = l; nni_list_append(&l->l_pipes, p); nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_npipes, 1); + nni_stat_inc_atomic(&s->s_stats.s_npipes, 1); + + nni_pipe_stats_init(p); nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); nni_mtx_lock(&s->s_mx); - if ((p->p_closed) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (p->p_closed) { nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_apprej, 1); + nni_stat_inc_atomic(&s->s_stats.s_apprej, 1); + nni_pipe_rele(p); + return; + } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_stat_inc_atomic(&l->l_stats.s_protorej, 1); + nni_stat_inc_atomic(&s->s_stats.s_protorej, 1); nni_pipe_close(p); nni_pipe_rele(p); return; @@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p) nni_dialer *d = p->p_dialer; nni_mtx_lock(&s->s_mx); + if (nni_list_node_active(&p->p_sock_node)) { + nni_stat_dec_atomic(&s->s_stats.s_npipes, 1); + } + if (p->p_listener != NULL) { + nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1); + } + if (p->p_dialer != NULL) { + nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1); + } nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); p->p_listener = NULL; @@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p) } nni_mtx_unlock(&s->s_mx); } + +void +nni_sock_add_stat(nni_sock *s, nni_stat_item *stat) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_append(&s->s_stats.s_root, stat); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(stat); +#endif +} + +void +nni_sock_bump_tx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} + +void +nni_sock_bump_rx(nni_sock *s, uint64_t sz) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1); + nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz); +#else + NNI_ARG_UNUSED(s); + NNI_ARG_UNUSED(sz); +#endif +} |
