aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/dialer.c87
-rw-r--r--src/core/dialer.h1
-rw-r--r--src/core/init.c4
-rw-r--r--src/core/listener.c82
-rw-r--r--src/core/listener.h1
-rw-r--r--src/core/msgqueue.c100
-rw-r--r--src/core/msgqueue.h9
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/pipe.c50
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/socket.c214
-rw-r--r--src/core/socket.h9
-rw-r--r--src/core/sockimpl.h51
-rw-r--r--src/core/stats.c525
-rw-r--r--src/core/stats.h108
15 files changed, 1200 insertions, 45 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 00ebb513..11faed7c 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -23,6 +23,8 @@ static void dialer_timer_cb(void *);
static nni_idhash *dialers;
static nni_mtx dialers_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_dialer_sys_init(void)
{
@@ -32,8 +34,7 @@ nni_dialer_sys_init(void)
return (rv);
}
nni_mtx_init(&dialers_lk);
- nni_idhash_set_limits(
- dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(dialers, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_dialer_destroy(nni_dialer *d)
NNI_FREE_STRUCT(d);
}
+static void
+dialer_stats_init(nni_dialer *d)
+{
+ nni_dialer_stats *st = &d->d_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "dialer statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "dialer id", d->d_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for dialer",
+ nni_sock_id(d->d_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "dialer url", d->d_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_connok, "connok", "connections made");
+ nni_stat_append(root, &st->s_connok);
+
+ nni_stat_init_atomic(
+ &st->s_canceled, "canceled", "connections canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(&st->s_refused, "refused", "connections refused");
+ nni_stat_append(root, &st->s_refused);
+
+ nni_stat_init_atomic(
+ &st->s_timedout, "timedout", "connections timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other connection errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
{
@@ -110,6 +160,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_mtx_init(&d->d_mtx);
+ dialer_stats_init(d);
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
@@ -119,6 +170,10 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
return (rv);
}
+ snprintf(d->d_stats.s_scope, sizeof(d->d_stats.s_scope), "dialer%u",
+ d->d_id);
+ nni_stat_set_value(&d->d_stats.s_id, d->d_id);
+ nni_stat_append(NULL, &d->d_stats.s_root);
*dp = d;
return (0);
}
@@ -167,6 +222,7 @@ nni_dialer_rele(nni_dialer *d)
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_stat_remove(&d->d_stats.s_root);
nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
@@ -242,12 +298,33 @@ dialer_connect_cb(void *arg)
switch ((rv = nni_aio_result(aio))) {
case 0:
+ BUMPSTAT(&d->d_stats.s_connok);
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
+ BUMPSTAT(&d->d_stats.s_canceled);
break;
+ case NNG_ECONNREFUSED:
+ BUMPSTAT(&d->d_stats.s_refused);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
+ case NNG_ETIMEDOUT:
+ BUMPSTAT(&d->d_stats.s_timedout);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
default:
+ BUMPSTAT(&d->d_stats.s_othererr);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
@@ -389,3 +466,9 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
+
+void
+nni_dialer_add_stat(nni_dialer *d, nni_stat_item *stat)
+{
+ nni_stat_append(&d->d_stats.s_root, stat);
+}
diff --git a/src/core/dialer.h b/src/core/dialer.h
index 361c6d5e..da2cc992 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -26,5 +26,6 @@ extern int nni_dialer_setopt(
nni_dialer *, const char *, const void *, size_t, nni_opt_type);
extern int nni_dialer_getopt(
nni_dialer *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_dialer_add_stat(nni_dialer *, nni_stat_item *);
#endif // CORE_DIALER_H
diff --git a/src/core/init.c b/src/core/init.c
index c66e54d2..60736fb7 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -27,7 +27,8 @@ nni_init_helper(void)
NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node);
nni_inited = true;
- if (((rv = nni_taskq_sys_init()) != 0) ||
+ if (((rv = nni_stat_sys_init()) != 0) ||
+ ((rv = nni_taskq_sys_init()) != 0) ||
((rv = nni_reap_sys_init()) != 0) ||
((rv = nni_timer_sys_init()) != 0) ||
((rv = nni_aio_sys_init()) != 0) ||
@@ -81,6 +82,7 @@ nni_fini(void)
nni_timer_sys_fini();
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
+ nni_stat_sys_fini();
nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
index 1fe6faab..135478de 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -24,6 +24,8 @@ static void listener_timer_cb(void *);
static nni_idhash *listeners;
static nni_mtx listeners_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_listener_sys_init(void)
{
@@ -33,8 +35,7 @@ nni_listener_sys_init(void)
return (rv);
}
nni_mtx_init(&listeners_lk);
- nni_idhash_set_limits(
- listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(listeners, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_listener_destroy(nni_listener *l)
NNI_FREE_STRUCT(l);
}
+static void
+listener_stats_init(nni_listener *l)
+{
+ nni_listener_stats *st = &l->l_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "listener statistics");
+
+ // NB: This will be updated later.
+ nni_stat_init_id(&st->s_id, "id", "listener id", l->l_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for listener",
+ nni_sock_id(l->l_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "listener url", l->l_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_accept, "accept", "connections accepted");
+ nni_stat_append(root, &st->s_accept);
+
+ nni_stat_init_atomic(
+ &st->s_aborted, "aborted", "accepts aborted remotely");
+ nni_stat_append(root, &st->s_aborted);
+
+ nni_stat_init_atomic(&st->s_timedout, "timedout", "accepts timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(&st->s_canceled, "canceled", "accepts canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other accept errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
{
@@ -107,6 +157,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
+ listener_stats_init(l);
if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
@@ -117,6 +168,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
return (rv);
}
+ // Update a few stat bits, and register them.
+ snprintf(l->l_stats.s_scope, sizeof(l->l_stats.s_scope), "listener%u",
+ l->l_id);
+ nni_stat_set_value(&l->l_stats.s_id, l->l_id);
+ nni_stat_append(NULL, &l->l_stats.s_root);
+
*lp = l;
return (0);
}
@@ -165,6 +222,7 @@ nni_listener_rele(nni_listener *l)
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
if ((l->l_refcnt == 0) && (l->l_closed)) {
+ nni_stat_remove(&l->l_stats.s_root);
nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
}
nni_mtx_unlock(&listeners_lk);
@@ -233,18 +291,30 @@ listener_accept_cb(void *arg)
switch (nni_aio_result(aio)) {
case 0:
+ BUMPSTAT(&l->l_stats.s_accept);
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
- case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_aborted);
+ listener_accept_start(l);
+ break;
+ case NNG_ETIMEDOUT:
+ // No need to sleep since we timed out already.
+ BUMPSTAT(&l->l_stats.s_timedout);
+ listener_accept_start(l);
+ break;
+ case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_othererr);
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
+ BUMPSTAT(&l->l_stats.s_canceled);
break;
default:
+ BUMPSTAT(&l->l_stats.s_othererr);
// We don't really know why we failed, but we backoff
// here. This is because errors here are probably due
// to system failures (resource exhaustion) and we hope
@@ -339,3 +409,9 @@ nni_listener_getopt(
return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
}
+
+void
+nni_listener_add_stat(nni_listener *l, nni_stat_item *stat)
+{
+ nni_stat_append(&l->l_stats.s_root, stat);
+}
diff --git a/src/core/listener.h b/src/core/listener.h
index 67a782bd..828102a2 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -26,5 +26,6 @@ extern int nni_listener_setopt(
nni_listener *, const char *, const void *, size_t, nni_opt_type);
extern int nni_listener_getopt(
nni_listener *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_listener_add_stat(nni_listener *, nni_stat_item *);
#endif // CORE_LISTENER_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 62a3893b..4b59aead 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -37,6 +37,15 @@ struct nni_msgq {
// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
+
+ // Statistics.
+ nni_atomic_u64 mq_get_msgs;
+ nni_atomic_u64 mq_put_msgs;
+ nni_atomic_u64 mq_get_bytes;
+ nni_atomic_u64 mq_put_bytes;
+ nni_atomic_u64 mq_get_errs;
+ nni_atomic_u64 mq_put_errs;
+ nni_atomic_u64 mq_discards;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_geterr = 0;
*mqp = mq;
+ nni_atomic_init64(&mq->mq_get_bytes);
+ nni_atomic_init64(&mq->mq_put_bytes);
+ nni_atomic_init64(&mq->mq_get_msgs);
+ nni_atomic_init64(&mq->mq_put_msgs);
+ nni_atomic_init64(&mq->mq_get_errs);
+ nni_atomic_init64(&mq->mq_put_errs);
+ nni_atomic_init64(&mq->mq_discards);
+
return (0);
}
@@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
nni_msgq_run_notify(mq);
@@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
nni_aio_finish(waio, 0, len);
@@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ size_t len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
}
@@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
- nni_aio_finish(waio, 0, len);
continue;
}
@@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
@@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_mtx_unlock(&mq->mq_lock);
nni_aio_finish_error(aio, rv);
return;
@@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
@@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -392,6 +436,7 @@ int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
+ size_t len;
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_ECLOSED);
}
+ len = nni_msg_len(msg);
+
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
nni_msgq_run_notify(mq);
@@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
if (newq == NULL) {
@@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
*sp = mq->mq_sendable;
return (0);
}
+
+uint64_t
+nni_msgq_stat_get_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_bytes));
+}
+
+uint64_t
+nni_msgq_stat_put_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_bytes));
+}
+
+uint64_t
+nni_msgq_stat_get_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_msgs));
+}
+
+uint64_t
+nni_msgq_stat_put_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_msgs));
+}
+
+uint64_t
+nni_msgq_stat_get_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_errs));
+}
+
+uint64_t
+nni_msgq_stat_put_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_errs));
+}
+
+uint64_t
+nni_msgq_stat_discards(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_discards));
+}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 7dc5800d..bd9b3682 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -99,4 +99,13 @@ extern int nni_msgq_len(nni_msgq *mq);
extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **);
extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **);
+// message queues keep statistics
+extern uint64_t nni_msgq_stat_get_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_discards(nni_msgq *);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 9af12720..f28ddf8c 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -41,6 +41,7 @@
#include "core/protocol.h"
#include "core/random.h"
#include "core/reap.h"
+#include "core/stats.h"
#include "core/strs.h"
#include "core/taskq.h"
#include "core/thread.h"
diff --git a/src/core/pipe.c b/src/core/pipe.c
index bdbc76e0..d623e158 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -11,6 +11,7 @@
#include "core/nng_impl.h"
#include "sockimpl.h"
+#include <stdio.h>
#include <string.h>
// This file contains functions relating to pipes.
@@ -82,6 +83,7 @@ pipe_destroy(nni_pipe *p)
p->p_tran_ops.p_stop(p->p_tran_data);
}
+ nni_stat_remove(&p->p_stats.s_root);
nni_pipe_remove(p);
if (p->p_proto_data != NULL) {
@@ -177,6 +179,28 @@ nni_pipe_peer(nni_pipe *p)
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+void
+nni_pipe_stats_init(nni_pipe *p)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_pipe_stats *st = &p->p_stats;
+
+ if (p->p_listener != NULL) {
+ st->s_ep_id.si_name = "listener";
+ st->s_ep_id.si_desc = "listener for pipe";
+ st->s_ep_id.si_value = nni_listener_id(p->p_listener);
+ } else {
+ st->s_ep_id.si_name = "dialer";
+ st->s_ep_id.si_desc = "dialer for pipe";
+ st->s_ep_id.si_value = nni_dialer_id(p->p_dialer);
+ }
+
+ nni_stat_append(NULL, &st->s_root);
+#else
+ NNI_ARG_UNUSED(p);
+#endif
+}
+
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -184,7 +208,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -210,12 +233,27 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
+ if ((rv = nni_idhash_alloc32(nni_pipes, &p->p_id, p)) == 0) {
p->p_refcnt = 1;
}
nni_mtx_unlock(&nni_pipe_lk);
+ nni_pipe_stats *st = &p->p_stats;
+ snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id);
+
+ nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id);
+ nni_stat_append(&st->s_root, &st->s_id);
+
+ // name and description fleshed out later.
+ nni_stat_init_id(&st->s_ep_id, "", "", 0);
+ nni_stat_append(&st->s_root, &st->s_ep_id);
+
+ nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
+ nni_sock_id(p->p_sock));
+ nni_stat_append(&st->s_root, &st->s_sock_id);
+
if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
@@ -274,3 +312,9 @@ nni_pipe_dialer_id(nni_pipe *p)
{
return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}
+
+void
+nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
+{
+ nni_stat_append(&p->p_stats.s_root, item);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1e2f2b5d..5a83059f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -62,4 +62,7 @@ extern uint32_t nni_pipe_dialer_id(nni_pipe *);
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
+// nni_pipe_add_stat adds a statistic to the pipe
+extern void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index b2f331fb..63696b36 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb {
void * cb_arg;
} nni_sock_pipe_cb;
+typedef struct sock_stats {
+ nni_stat_item s_root; // socket scope
+ nni_stat_item s_id; // socket id
+ nni_stat_item s_name; // socket name
+ nni_stat_item s_protocol; // socket protocol
+ nni_stat_item s_ndialers; // number of dialers
+ nni_stat_item s_nlisteners; // number of listeners
+ nni_stat_item s_npipes; // number of pipes
+ nni_stat_item s_rxbytes; // number of bytes received
+ nni_stat_item s_txbytes; // number of bytes received
+ nni_stat_item s_rxmsgs; // number of msgs received
+ nni_stat_item s_txmsgs; // number of msgs sent
+ nni_stat_item s_protorej; // pipes rejected by protocol
+ nni_stat_item s_apprej; // pipes rejected by application
+} sock_stats;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -82,6 +98,7 @@ struct nni_socket {
size_t s_rcvmaxsz; // max receive size
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
+ char s_scope[24]; // socket scope ("socket%u", 32 bits max)
nni_list s_listeners; // active listeners
nni_list s_dialers; // active dialers
@@ -94,6 +111,8 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+
+ sock_stats s_stats;
};
static void nni_ctx_destroy(nni_ctx *);
@@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s)
}
static void
+sock_stats_fini(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats *st = &s->s_stats;
+ nni_stat_remove(&st->s_root);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
+sock_stats_init(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats * st = &s->s_stats;
+ nni_stat_item *root = &s->s_stats.s_root;
+
+ // To make collection cheap and atomic for the socket,
+ // we just use a single lock for the entire chain.
+
+ nni_stat_init_scope(root, s->s_scope, "socket statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name);
+ nni_stat_set_lock(&st->s_name, &s->s_mx);
+ nni_stat_append(root, &st->s_name);
+
+ nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol",
+ nni_sock_proto_name(s));
+ nni_stat_append(root, &st->s_protocol);
+
+ nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers");
+ nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_ndialers);
+
+ nni_stat_init_atomic(
+ &st->s_nlisteners, "nlisteners", "open listeners");
+ nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_nlisteners);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
+ nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_rxbytes);
+
+ nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
+ nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_txbytes);
+
+ nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
+ nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_rxmsgs);
+
+ nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
+ nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_txmsgs);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -418,6 +512,7 @@ sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
+ sock_stats_fini(s);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &sock_lk);
+ sock_stats_init(s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
@@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
// Set the sockname.
(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
- return (rv);
+ // Set up basic stat values.
+ (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id);
+ nni_stat_set_value(&s->s_stats.s_id, s->s_id);
+
+ // Add our stats chain.
+ nni_stat_append(NULL, &s->s_stats.s_root);
+
+ return (0);
}
// nni_sock_shutdown shuts down the socket; after this point no
@@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s)
// is idempotent.
nni_sock_shutdown(s);
+ nni_stat_remove(&s->s_stats.s_root);
+
nni_mtx_lock(&sock_lk);
if (s->s_closed) {
// Some other thread called close. All we need to do
@@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l)
}
nni_list_append(&s->s_listeners, l);
+
+ nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1);
+
nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
}
nni_list_append(&s->s_dialers, d);
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-void
-nni_sock_remove_listener(nni_sock *s, nni_listener *l)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_listeners, l)) {
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
- }
- nni_mtx_unlock(&s->s_mx);
-}
+ nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1);
-void
-nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_dialers, d)) {
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
- }
nni_mtx_unlock(&s->s_mx);
+ return (0);
}
int
@@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
return;
}
+
p->p_listener = l;
nni_list_append(&l->l_pipes, p);
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p)
nni_dialer *d = p->p_dialer;
nni_mtx_lock(&s->s_mx);
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_stat_dec_atomic(&s->s_stats.s_npipes, 1);
+ }
+ if (p->p_listener != NULL) {
+ nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1);
+ }
+ if (p->p_dialer != NULL) {
+ nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1);
+ }
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
p->p_listener = NULL;
@@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p)
}
nni_mtx_unlock(&s->s_mx);
}
+
+void
+nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_append(&s->s_stats.s_root, stat);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(stat);
+#endif
+}
+
+void
+nni_sock_bump_tx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
+
+void
+nni_sock_bump_rx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4b9c4642..5486918e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -25,6 +25,7 @@ extern uint16_t nni_sock_peer_id(nni_sock *);
extern const char *nni_sock_proto_name(nni_sock *);
extern const char *nni_sock_peer_name(nni_sock *);
extern void * nni_sock_proto_data(nni_sock *);
+extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
@@ -107,4 +108,12 @@ extern int nni_ctx_getopt(
extern int nni_ctx_setopt(
nni_ctx *, const char *, const void *, size_t, nni_opt_type);
+// nni_sock_bump_rx is called by a protocol when a message is received by
+// a consuming app. It bumps the rxmsgs by one and rxbytes by the size.
+extern void nni_sock_bump_rx(nni_sock *s, uint64_t sz);
+
+// nni_sock_bump_rx is called by a protocol when a message is sent by
+// a consuming app. It bumps the txmsgs by one and txbytes by the size.
+extern void nni_sock_bump_tx(nni_sock *s, uint64_t sz);
+
#endif // CORE_SOCKET_H
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index be454d8a..ae1fd92e 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -15,6 +15,22 @@
// and pipes. This must not be exposed to other subsystems -- these internals
// are subject to change at any time.
+typedef struct nni_dialer_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_connok;
+ nni_stat_item s_refused;
+ nni_stat_item s_canceled;
+ nni_stat_item s_timedout;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_dialer_stats;
+
struct nni_dialer {
nni_tran_dialer_ops d_ops; // transport ops
nni_tran * d_tran; // transport pointer
@@ -38,8 +54,25 @@ struct nni_dialer {
nni_duration d_inirtime; // initial time for reconnect
nni_time d_conntime; // time of last good connect
nni_reap_item d_reap;
+ nni_dialer_stats d_stats;
};
+typedef struct nni_listener_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_accept;
+ nni_stat_item s_aborted; // aborted remotely
+ nni_stat_item s_timedout;
+ nni_stat_item s_canceled;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_listener_stats;
+
struct nni_listener {
nni_tran_listener_ops l_ops; // transport ops
nni_tran * l_tran; // transport pointer
@@ -56,8 +89,17 @@ struct nni_listener {
nni_aio * l_acc_aio;
nni_aio * l_tmo_aio;
nni_reap_item l_reap;
+ nni_listener_stats l_stats;
};
+typedef struct nni_pipe_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_ep_id;
+ nni_stat_item s_sock_id;
+ char s_scope[16]; // scope name for stats ("pipe" is short)
+} nni_pipe_stats;
+
struct nni_pipe {
uint32_t p_id;
nni_tran_pipe_ops p_tran_ops;
@@ -76,13 +118,11 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
+ nni_pipe_stats p_stats;
};
-extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
-extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
-
-extern int nni_sock_add_listener(nni_sock *, nni_listener *);
-extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
@@ -101,5 +141,6 @@ extern void nni_pipe_remove(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *);
extern void nni_pipe_start(nni_pipe *);
+extern void nni_pipe_stats_init(nni_pipe *);
#endif // CORE_SOCKIMPL_H
diff --git a/src/core/stats.c b/src/core/stats.c
new file mode 100644
index 00000000..0363a932
--- /dev/null
+++ b/src/core/stats.c
@@ -0,0 +1,525 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+typedef struct nng_stat nni_stat;
+
+struct nng_stat {
+ char * s_name;
+ const char * s_desc;
+ const char * s_string;
+ uint64_t s_value;
+ nni_time s_time;
+ nni_stat_type s_type;
+ nni_stat_unit s_unit;
+ nni_stat_item *s_item; // Used during snapshot collection
+ nni_list s_children;
+ nni_stat * s_parent;
+ nni_list_node s_node;
+};
+
+#ifdef NNG_ENABLE_STATS
+static nni_stat_item stats_root;
+static nni_mtx stats_lock;
+static nni_mtx * stats_held = NULL;
+#endif
+
+void
+nni_stat_append(nni_stat_item *parent, nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ if (parent == NULL) {
+ parent = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ // Make sure that the lists for both children and parents
+ // are correctly initialized.
+ if (parent->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&parent->si_children, nni_stat_item, si_node);
+ }
+ if (child->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&child->si_children, nni_stat_item, si_node);
+ }
+ nni_list_append(&parent->si_children, child);
+ child->si_parent = parent;
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(parent);
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+void
+nni_stat_remove(nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_item *parent;
+ nni_mtx_lock(&stats_lock);
+ if ((parent = child->si_parent) != NULL) {
+ nni_list_remove(&parent->si_children, child);
+ child->si_parent = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_init(nni_stat_item *stat, const char *name, const char *desc)
+{
+ NNI_LIST_INIT(&stat->si_children, nni_stat_item, si_node);
+ stat->si_parent = NULL;
+ stat->si_name = name;
+ stat->si_desc = desc;
+ stat->si_lock = NULL;
+ stat->si_update = NULL;
+ stat->si_private = NULL;
+ stat->si_string = NULL;
+ stat->si_value = 0;
+ stat->si_type = NNG_STAT_COUNTER;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_scope(nni_stat_item *stat, const char *name, const char *desc)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_type = NNG_STAT_SCOPE;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_string(
+ nni_stat_item *stat, const char *name, const char *desc, const char *str)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_string = str;
+ stat->si_type = NNG_STAT_STRING;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_id(
+ nni_stat_item *stat, const char *name, const char *desc, uint64_t id)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = id;
+ stat->si_type = NNG_STAT_ID;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_bool(
+ nni_stat_item *stat, const char *name, const char *desc, bool v)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = v ? 1 : 0;
+ stat->si_type = NNG_STAT_BOOLEAN;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+static void
+stat_atomic_update(nni_stat_item *stat, void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+ stat->si_value = nni_atomic_get64(&stat->si_atomic);
+}
+
+void
+nni_stat_init_atomic(nni_stat_item *stat, const char *name, const char *desc)
+{
+
+ nni_stat_init(stat, name, desc);
+ stat->si_value = 0;
+ stat->si_private = NULL;
+ stat->si_update = stat_atomic_update;
+ nni_atomic_init64(&stat->si_atomic);
+}
+
+void
+nni_stat_inc_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_inc64(&stat->si_atomic, inc);
+}
+
+void
+nni_stat_dec_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_dec64(&stat->si_atomic, inc);
+}
+#endif
+
+void
+nni_stat_set_value(nni_stat_item *stat, uint64_t v)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_value = v;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(v);
+#endif
+}
+
+void
+nni_stat_set_string(nni_stat_item *stat, const char *str)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_string = str;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(str);
+#endif
+}
+
+void
+nni_stat_set_lock(nni_stat_item *stat, nni_mtx *mtx)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_lock = mtx;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(mtx);
+#endif
+}
+
+void
+nni_stat_set_update(nni_stat_item *stat, nni_stat_update f, void *a)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_update = f;
+ stat->si_private = a;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(f);
+ NNI_ARG_UNUSED(a);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_set_type(nni_stat_item *stat, int type)
+{
+ stat->si_type = type;
+}
+
+void
+nni_stat_set_unit(nni_stat_item *stat, int unit)
+{
+ stat->si_unit = unit;
+}
+#endif
+
+void
+nng_stats_free(nni_stat *st)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat *child;
+
+ while ((child = nni_list_first(&st->s_children)) != NULL) {
+ nni_list_remove(&st->s_children, child);
+ nng_stats_free(child);
+ }
+ nni_strfree(st->s_name);
+ NNI_FREE_STRUCT(st);
+#else
+ NNI_ARG_UNUSED(st);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+static int
+stat_make_tree(nni_stat_item *item, nni_stat **sp)
+{
+ nni_stat * stat;
+ nni_stat_item *child;
+
+ if ((stat = NNI_ALLOC_STRUCT(stat)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((stat->s_name = nni_strdup(item->si_name)) == NULL) {
+ NNI_FREE_STRUCT(stat);
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&stat->s_children, nni_stat, s_node);
+ stat->s_item = item;
+ stat->s_type = item->si_type;
+ stat->s_unit = item->si_unit;
+ stat->s_desc = item->si_desc;
+ stat->s_parent = NULL;
+
+ NNI_LIST_FOREACH (&item->si_children, child) {
+ nni_stat *cs;
+ int rv;
+ if ((rv = stat_make_tree(child, &cs)) != 0) {
+ nng_stats_free(stat);
+ return (rv);
+ }
+ nni_list_append(&stat->s_children, cs);
+ cs->s_parent = stat;
+ }
+ *sp = stat;
+ return (0);
+}
+
+static void
+stat_update(nni_stat *stat)
+{
+ nni_stat_item *item = stat->s_item;
+
+ if (item->si_lock != stats_held) {
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ if (item->si_lock != NULL) {
+ nni_mtx_lock(item->si_lock);
+ stats_held = item->si_lock;
+ }
+ }
+ if (item->si_update != NULL) {
+ item->si_update(item, item->si_private);
+ }
+ stat->s_value = item->si_value;
+ stat->s_string = item->si_string;
+ stat->s_time = nni_clock();
+}
+
+static void
+stat_update_tree(nni_stat *stat)
+{
+ nni_stat *child;
+ stat_update(stat);
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ stat_update_tree(child);
+ }
+}
+
+int
+nni_stat_snapshot(nni_stat **statp, nni_stat_item *item)
+{
+ int rv;
+ nni_stat *stat;
+
+ if (item == NULL) {
+ item = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ if ((rv = stat_make_tree(item, &stat)) != 0) {
+ nni_mtx_unlock(&stats_lock);
+ return (rv);
+ }
+ stat_update_tree(stat);
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+ *statp = stat;
+ return (0);
+}
+#endif
+
+int
+nng_stats_get(nng_stat **statp)
+{
+#ifdef NNG_ENABLE_STATS
+ return (nni_stat_snapshot(statp, &stats_root));
+#else
+ NNI_ARG_UNUSED(statp);
+ return (NNG_ENOTSUP);
+#endif
+}
+
+nng_stat *
+nng_stat_parent(nng_stat *stat)
+{
+ return (stat->s_parent);
+}
+
+nng_stat *
+nng_stat_next(nng_stat *stat)
+{
+ if (stat->s_parent == NULL) {
+ return (NULL); // Root node, no siblings.
+ }
+ return (nni_list_next(&stat->s_parent->s_children, stat));
+}
+
+nng_stat *
+nng_stat_child(nng_stat *stat)
+{
+ return (nni_list_first(&stat->s_children));
+}
+
+const char *
+nng_stat_name(nni_stat *stat)
+{
+ return (stat->s_name);
+}
+
+uint64_t
+nng_stat_value(nni_stat *stat)
+{
+ return (stat->s_value);
+}
+
+const char *
+nng_stat_string(nng_stat *stat)
+{
+ return (stat->s_string);
+}
+
+uint64_t
+nng_stat_timestamp(nng_stat *stat)
+{
+ return ((uint64_t) stat->s_time);
+}
+
+int
+nng_stat_type(nng_stat *stat)
+{
+ return (stat->s_type);
+}
+
+int
+nng_stat_unit(nng_stat *stat)
+{
+ return (stat->s_unit);
+}
+
+const char *
+nng_stat_desc(nng_stat *stat)
+{
+ return (stat->s_desc);
+}
+
+int
+nni_stat_sys_init(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_init(&stats_lock);
+ NNI_LIST_INIT(&stats_root.si_children, nni_stat_item, si_node);
+ stats_root.si_name = "";
+ stats_root.si_desc = "all statistsics";
+#endif
+
+ return (0);
+}
+
+void
+nni_stat_sys_fini(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_fini(&stats_lock);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+stat_sprint_scope(nni_stat *stat, char **scope, int *lenp)
+{
+ if (stat->s_parent != NULL) {
+ stat_sprint_scope(stat->s_parent, scope, lenp);
+ }
+ if (strlen(stat->s_name) > 0) {
+ snprintf(*scope, *lenp, "%s.", stat->s_name);
+ } else {
+ (*scope)[0] = '\0';
+ }
+ *lenp -= strlen(*scope);
+ *scope += strlen(*scope);
+}
+#endif
+
+void
+nng_stats_dump(nng_stat *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ static char buf[128]; // to minimize recursion, not thread safe
+ static char line[128];
+ int len;
+ char * scope;
+ char * indent = " ";
+ unsigned long long val;
+ nni_stat * child;
+
+ switch (nng_stat_type(stat)) {
+ case NNG_STAT_SCOPE:
+ scope = buf;
+ len = sizeof(buf);
+ stat_sprint_scope(stat, &scope, &len);
+ len = strlen(buf);
+ if (len > 0) {
+ if (buf[len - 1] == '.') {
+ buf[--len] = '\0';
+ }
+ }
+ if (len > 0) {
+ snprintf(line, sizeof(line), "\n%s:", buf);
+ }
+ break;
+ case NNG_STAT_STRING:
+ snprintf(line, sizeof(line), "%s%-32s\"%s\"", indent,
+ nng_stat_name(stat), nng_stat_string(stat));
+ break;
+ case NNG_STAT_BOOLEAN:
+ val = nng_stat_value(stat);
+ snprintf(line, sizeof(line), "%s%-32s%s", indent,
+ nng_stat_name(stat), val != 0 ? "true" : "false");
+ break;
+ case NNG_STAT_LEVEL:
+ case NNG_STAT_COUNTER:
+ val = nng_stat_value(stat);
+ switch (nng_stat_unit(stat)) {
+ case NNG_UNIT_BYTES:
+ snprintf(line, sizeof(line), "%s%-32s%llu bytes",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MESSAGES:
+ snprintf(line, sizeof(line), "%s%-32s%llu msgs",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MILLIS:
+ snprintf(line, sizeof(line), "%s%-32s%llu msec",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_NONE:
+ case NNG_UNIT_EVENTS:
+ default:
+ snprintf(line, sizeof(line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ }
+ break;
+ case NNG_STAT_ID:
+ val = nng_stat_value(stat);
+ snprintf(line, (sizeof line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ default:
+ snprintf(line, (sizeof line), "%s%-32s<?>", indent,
+ nng_stat_name(stat));
+ break;
+ }
+ nni_plat_println(line);
+
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ nng_stats_dump(child);
+ }
+#else
+ NNI_ARG_UNUSED(stat);
+#endif
+}
diff --git a/src/core/stats.h b/src/core/stats.h
new file mode 100644
index 00000000..8cfe14de
--- /dev/null
+++ b/src/core/stats.h
@@ -0,0 +1,108 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_STATS_H
+#define CORE_STATS_H
+
+#include "core/defs.h"
+
+// Statistics support. This is inspired in part by the Solaris
+// kstats framework, but we've simplified and tuned it for our use.
+//
+// Collection of the stats will be done in two steps. First we
+// will walk the list of stats, with the chain held, allocating
+// a user local copy of the stat and pointers.
+//
+// In phase 2, we run the update, and copy the values. We conditionally
+// acquire the lock on the stat first though.
+
+typedef struct nni_stat_item nni_stat_item;
+
+typedef void (*nni_stat_update)(nni_stat_item *, void *);
+typedef enum nng_stat_type_enum nni_stat_type;
+typedef enum nng_unit_enum nni_stat_unit;
+
+// nni_stat_item is used by providers
+struct nni_stat_item {
+#ifdef NNG_ENABLE_STATS
+ nni_list_node si_node; // list node, framework use only
+ nni_stat_item * si_parent; // link back to parent, framework use only
+ nni_list si_children; // children, framework use only
+ const char * si_name; // name of statistic
+ const char * si_desc; // description of statistic (English)
+ nni_mtx * si_lock; // lock for accessing, can be NULL
+ void * si_private; // provider private pointer
+ nni_stat_type si_type; // type of stat, e.g. NNG_STAT_LEVEL
+ nni_stat_unit si_unit; // units, e.g. NNG_UNIT_MILLIS
+ nni_stat_update si_update; // update function (can be NULL)
+ const char * si_string; // string value (NULL for numerics)
+ uint64_t si_value; // numeric value
+ nni_atomic_u64 si_atomic; // atomic value
+#endif
+};
+
+void nni_stat_append(nni_stat_item *, nni_stat_item *);
+void nni_stat_remove(nni_stat_item *);
+
+void nni_stat_set_value(nni_stat_item *, uint64_t);
+void nni_stat_set_string(nni_stat_item *, const char *);
+void nni_stat_set_lock(nni_stat_item *, nni_mtx *);
+void nni_stat_set_update(nni_stat_item *, nni_stat_update, void *);
+
+#ifdef NNG_ENABLE_STATS
+void nni_stat_init(nni_stat_item *, const char *, const char *);
+void nni_stat_init_scope(nni_stat_item *, const char *, const char *);
+void nni_stat_init_string(
+ nni_stat_item *, const char *, const char *, const char *);
+void nni_stat_init_id(nni_stat_item *, const char *, const char *, uint64_t);
+void nni_stat_init_bool(nni_stat_item *, const char *, const char *, bool);
+void nni_stat_init_atomic(nni_stat_item *, const char *, const char *);
+void nni_stat_inc_atomic(nni_stat_item *, uint64_t);
+void nni_stat_dec_atomic(nni_stat_item *, uint64_t);
+void nni_stat_set_type(nni_stat_item *, int);
+void nni_stat_set_unit(nni_stat_item *, int);
+#else
+// We override initialization so that we can avoid compiling static strings
+// into the binary. Presumably if stats are disabled, we are trying to save
+// space for constrained environments. We do evaluate an unused arg to
+// prevent the compiler from bitching about unused values.
+#define nni_stat_init(a, b, c) ((void) (a))
+#define nni_stat_init_scope(a, b, c) ((void) (a))
+#define nni_stat_init_atomic(a, b, c) ((void) (a))
+#define nni_stat_init_id(a, b, c, d) ((void) (a))
+#define nni_stat_init_bool(a, b, c, d) ((void) (a))
+#define nni_stat_init_string(a, b, c, d) ((void) (a))
+#define nni_stat_set_unit(a, b) ((void) (a))
+#define nni_stat_set_type(a, b) ((void) (a))
+#define nni_stat_inc_atomic(stat, inc)
+#define nni_stat_dec_atomic(stat, inc)
+#endif
+
+#if 0
+#define nni_stat_append(a, b)
+#define nni_stat_remove(a)
+#define nni_stat_init(a, b, c)
+#define nni_stat_init_scope(a, b, c)
+#define nni_stat_init_string(a, b, c, d)
+#define nni_stat_init_id(a, b, c, d)
+#define nni_stat_init_bool(a, b, c, d)
+#define nni_stat_dec_atomic(a, b)
+#define nni_stat_set_value(a, b)
+#define nni_stat_set_string(a, b)
+#define nni_stat_set_lock(a, b)
+#define nni_stat_set_update(a, b, c)
+#define nni_stat_set_type(a, b)
+#define nni_stat_set_unit(a, b)
+#endif
+
+int nni_stat_sys_init(void);
+void nni_stat_sys_fini(void);
+
+#endif // CORE_STATS_H