aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c214
1 files changed, 184 insertions, 30 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index b2f331fb..63696b36 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb {
void * cb_arg;
} nni_sock_pipe_cb;
+typedef struct sock_stats {
+ nni_stat_item s_root; // socket scope
+ nni_stat_item s_id; // socket id
+ nni_stat_item s_name; // socket name
+ nni_stat_item s_protocol; // socket protocol
+ nni_stat_item s_ndialers; // number of dialers
+ nni_stat_item s_nlisteners; // number of listeners
+ nni_stat_item s_npipes; // number of pipes
+ nni_stat_item s_rxbytes; // number of bytes received
+ nni_stat_item s_txbytes; // number of bytes received
+ nni_stat_item s_rxmsgs; // number of msgs received
+ nni_stat_item s_txmsgs; // number of msgs sent
+ nni_stat_item s_protorej; // pipes rejected by protocol
+ nni_stat_item s_apprej; // pipes rejected by application
+} sock_stats;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -82,6 +98,7 @@ struct nni_socket {
size_t s_rcvmaxsz; // max receive size
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
+ char s_scope[24]; // socket scope ("socket%u", 32 bits max)
nni_list s_listeners; // active listeners
nni_list s_dialers; // active dialers
@@ -94,6 +111,8 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+
+ sock_stats s_stats;
};
static void nni_ctx_destroy(nni_ctx *);
@@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s)
}
static void
+sock_stats_fini(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats *st = &s->s_stats;
+ nni_stat_remove(&st->s_root);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
+sock_stats_init(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats * st = &s->s_stats;
+ nni_stat_item *root = &s->s_stats.s_root;
+
+ // To make collection cheap and atomic for the socket,
+ // we just use a single lock for the entire chain.
+
+ nni_stat_init_scope(root, s->s_scope, "socket statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name);
+ nni_stat_set_lock(&st->s_name, &s->s_mx);
+ nni_stat_append(root, &st->s_name);
+
+ nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol",
+ nni_sock_proto_name(s));
+ nni_stat_append(root, &st->s_protocol);
+
+ nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers");
+ nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_ndialers);
+
+ nni_stat_init_atomic(
+ &st->s_nlisteners, "nlisteners", "open listeners");
+ nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_nlisteners);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
+ nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_rxbytes);
+
+ nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
+ nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_txbytes);
+
+ nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
+ nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_rxmsgs);
+
+ nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
+ nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_txmsgs);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -418,6 +512,7 @@ sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
+ sock_stats_fini(s);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &sock_lk);
+ sock_stats_init(s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
@@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
// Set the sockname.
(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
- return (rv);
+ // Set up basic stat values.
+ (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id);
+ nni_stat_set_value(&s->s_stats.s_id, s->s_id);
+
+ // Add our stats chain.
+ nni_stat_append(NULL, &s->s_stats.s_root);
+
+ return (0);
}
// nni_sock_shutdown shuts down the socket; after this point no
@@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s)
// is idempotent.
nni_sock_shutdown(s);
+ nni_stat_remove(&s->s_stats.s_root);
+
nni_mtx_lock(&sock_lk);
if (s->s_closed) {
// Some other thread called close. All we need to do
@@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l)
}
nni_list_append(&s->s_listeners, l);
+
+ nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1);
+
nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
}
nni_list_append(&s->s_dialers, d);
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-void
-nni_sock_remove_listener(nni_sock *s, nni_listener *l)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_listeners, l)) {
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
- }
- nni_mtx_unlock(&s->s_mx);
-}
+ nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1);
-void
-nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_dialers, d)) {
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
- }
nni_mtx_unlock(&s->s_mx);
+ return (0);
}
int
@@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
return;
}
+
p->p_listener = l;
nni_list_append(&l->l_pipes, p);
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p)
nni_dialer *d = p->p_dialer;
nni_mtx_lock(&s->s_mx);
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_stat_dec_atomic(&s->s_stats.s_npipes, 1);
+ }
+ if (p->p_listener != NULL) {
+ nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1);
+ }
+ if (p->p_dialer != NULL) {
+ nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1);
+ }
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
p->p_listener = NULL;
@@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p)
}
nni_mtx_unlock(&s->s_mx);
}
+
+void
+nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_append(&s->s_stats.s_root, stat);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(stat);
+#endif
+}
+
+void
+nni_sock_bump_tx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
+
+void
+nni_sock_bump_rx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}