aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-22 08:56:53 -0700
committerGarrett D'Amore <garrett@damore.org>2018-09-03 22:57:23 -0400
commitd83b96faeb02d7a3574e63880141d6b23f31ced1 (patch)
treeb6cd2feca3513dccba012b9da2ac230e94d09ac0 /src/core
parent1b2a93503e0ed108f7c4add4bcf4b201a363bb80 (diff)
downloadnng-d83b96faeb02d7a3574e63880141d6b23f31ced1.tar.gz
nng-d83b96faeb02d7a3574e63880141d6b23f31ced1.tar.bz2
nng-d83b96faeb02d7a3574e63880141d6b23f31ced1.zip
fixes #4 Statistics support
This introduces new public APIs for obtaining statistics, and adds some generic stats for dialers, listeners, pipes, and sockets. Also added are stats for inproc and pairv1 protocol. The other protocols and transports will have stats added incrementally as time goes on. A simple test program, and man pages are provided for this. Start by looking at nng_stat(5). Statistics does have some impact, and they can be disabled by using the advanced NNG_ENABLE_STATS (setting it to OFF, it's ON by default) if you need to build a minimized configuration.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/dialer.c87
-rw-r--r--src/core/dialer.h1
-rw-r--r--src/core/init.c4
-rw-r--r--src/core/listener.c82
-rw-r--r--src/core/listener.h1
-rw-r--r--src/core/msgqueue.c100
-rw-r--r--src/core/msgqueue.h9
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/pipe.c50
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/socket.c214
-rw-r--r--src/core/socket.h9
-rw-r--r--src/core/sockimpl.h51
-rw-r--r--src/core/stats.c525
-rw-r--r--src/core/stats.h108
15 files changed, 1200 insertions, 45 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 00ebb513..11faed7c 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -23,6 +23,8 @@ static void dialer_timer_cb(void *);
static nni_idhash *dialers;
static nni_mtx dialers_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_dialer_sys_init(void)
{
@@ -32,8 +34,7 @@ nni_dialer_sys_init(void)
return (rv);
}
nni_mtx_init(&dialers_lk);
- nni_idhash_set_limits(
- dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(dialers, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_dialer_destroy(nni_dialer *d)
NNI_FREE_STRUCT(d);
}
+static void
+dialer_stats_init(nni_dialer *d)
+{
+ nni_dialer_stats *st = &d->d_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "dialer statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "dialer id", d->d_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for dialer",
+ nni_sock_id(d->d_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "dialer url", d->d_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_connok, "connok", "connections made");
+ nni_stat_append(root, &st->s_connok);
+
+ nni_stat_init_atomic(
+ &st->s_canceled, "canceled", "connections canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(&st->s_refused, "refused", "connections refused");
+ nni_stat_append(root, &st->s_refused);
+
+ nni_stat_init_atomic(
+ &st->s_timedout, "timedout", "connections timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other connection errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
{
@@ -110,6 +160,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_mtx_init(&d->d_mtx);
+ dialer_stats_init(d);
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
@@ -119,6 +170,10 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
return (rv);
}
+ snprintf(d->d_stats.s_scope, sizeof(d->d_stats.s_scope), "dialer%u",
+ d->d_id);
+ nni_stat_set_value(&d->d_stats.s_id, d->d_id);
+ nni_stat_append(NULL, &d->d_stats.s_root);
*dp = d;
return (0);
}
@@ -167,6 +222,7 @@ nni_dialer_rele(nni_dialer *d)
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_stat_remove(&d->d_stats.s_root);
nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
@@ -242,12 +298,33 @@ dialer_connect_cb(void *arg)
switch ((rv = nni_aio_result(aio))) {
case 0:
+ BUMPSTAT(&d->d_stats.s_connok);
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
+ BUMPSTAT(&d->d_stats.s_canceled);
break;
+ case NNG_ECONNREFUSED:
+ BUMPSTAT(&d->d_stats.s_refused);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
+ case NNG_ETIMEDOUT:
+ BUMPSTAT(&d->d_stats.s_timedout);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
default:
+ BUMPSTAT(&d->d_stats.s_othererr);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
@@ -389,3 +466,9 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
+
+void
+nni_dialer_add_stat(nni_dialer *d, nni_stat_item *stat)
+{
+ nni_stat_append(&d->d_stats.s_root, stat);
+}
diff --git a/src/core/dialer.h b/src/core/dialer.h
index 361c6d5e..da2cc992 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -26,5 +26,6 @@ extern int nni_dialer_setopt(
nni_dialer *, const char *, const void *, size_t, nni_opt_type);
extern int nni_dialer_getopt(
nni_dialer *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_dialer_add_stat(nni_dialer *, nni_stat_item *);
#endif // CORE_DIALER_H
diff --git a/src/core/init.c b/src/core/init.c
index c66e54d2..60736fb7 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -27,7 +27,8 @@ nni_init_helper(void)
NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node);
nni_inited = true;
- if (((rv = nni_taskq_sys_init()) != 0) ||
+ if (((rv = nni_stat_sys_init()) != 0) ||
+ ((rv = nni_taskq_sys_init()) != 0) ||
((rv = nni_reap_sys_init()) != 0) ||
((rv = nni_timer_sys_init()) != 0) ||
((rv = nni_aio_sys_init()) != 0) ||
@@ -81,6 +82,7 @@ nni_fini(void)
nni_timer_sys_fini();
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
+ nni_stat_sys_fini();
nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
index 1fe6faab..135478de 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -24,6 +24,8 @@ static void listener_timer_cb(void *);
static nni_idhash *listeners;
static nni_mtx listeners_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_listener_sys_init(void)
{
@@ -33,8 +35,7 @@ nni_listener_sys_init(void)
return (rv);
}
nni_mtx_init(&listeners_lk);
- nni_idhash_set_limits(
- listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(listeners, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_listener_destroy(nni_listener *l)
NNI_FREE_STRUCT(l);
}
+static void
+listener_stats_init(nni_listener *l)
+{
+ nni_listener_stats *st = &l->l_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "listener statistics");
+
+ // NB: This will be updated later.
+ nni_stat_init_id(&st->s_id, "id", "listener id", l->l_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for listener",
+ nni_sock_id(l->l_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "listener url", l->l_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_accept, "accept", "connections accepted");
+ nni_stat_append(root, &st->s_accept);
+
+ nni_stat_init_atomic(
+ &st->s_aborted, "aborted", "accepts aborted remotely");
+ nni_stat_append(root, &st->s_aborted);
+
+ nni_stat_init_atomic(&st->s_timedout, "timedout", "accepts timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(&st->s_canceled, "canceled", "accepts canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other accept errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
{
@@ -107,6 +157,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
+ listener_stats_init(l);
if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
@@ -117,6 +168,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
return (rv);
}
+ // Update a few stat bits, and register them.
+ snprintf(l->l_stats.s_scope, sizeof(l->l_stats.s_scope), "listener%u",
+ l->l_id);
+ nni_stat_set_value(&l->l_stats.s_id, l->l_id);
+ nni_stat_append(NULL, &l->l_stats.s_root);
+
*lp = l;
return (0);
}
@@ -165,6 +222,7 @@ nni_listener_rele(nni_listener *l)
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
if ((l->l_refcnt == 0) && (l->l_closed)) {
+ nni_stat_remove(&l->l_stats.s_root);
nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
}
nni_mtx_unlock(&listeners_lk);
@@ -233,18 +291,30 @@ listener_accept_cb(void *arg)
switch (nni_aio_result(aio)) {
case 0:
+ BUMPSTAT(&l->l_stats.s_accept);
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
- case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_aborted);
+ listener_accept_start(l);
+ break;
+ case NNG_ETIMEDOUT:
+ // No need to sleep since we timed out already.
+ BUMPSTAT(&l->l_stats.s_timedout);
+ listener_accept_start(l);
+ break;
+ case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_othererr);
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
+ BUMPSTAT(&l->l_stats.s_canceled);
break;
default:
+ BUMPSTAT(&l->l_stats.s_othererr);
// We don't really know why we failed, but we backoff
// here. This is because errors here are probably due
// to system failures (resource exhaustion) and we hope
@@ -339,3 +409,9 @@ nni_listener_getopt(
return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
}
+
+void
+nni_listener_add_stat(nni_listener *l, nni_stat_item *stat)
+{
+ nni_stat_append(&l->l_stats.s_root, stat);
+}
diff --git a/src/core/listener.h b/src/core/listener.h
index 67a782bd..828102a2 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -26,5 +26,6 @@ extern int nni_listener_setopt(
nni_listener *, const char *, const void *, size_t, nni_opt_type);
extern int nni_listener_getopt(
nni_listener *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_listener_add_stat(nni_listener *, nni_stat_item *);
#endif // CORE_LISTENER_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 62a3893b..4b59aead 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -37,6 +37,15 @@ struct nni_msgq {
// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
+
+ // Statistics.
+ nni_atomic_u64 mq_get_msgs;
+ nni_atomic_u64 mq_put_msgs;
+ nni_atomic_u64 mq_get_bytes;
+ nni_atomic_u64 mq_put_bytes;
+ nni_atomic_u64 mq_get_errs;
+ nni_atomic_u64 mq_put_errs;
+ nni_atomic_u64 mq_discards;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_geterr = 0;
*mqp = mq;
+ nni_atomic_init64(&mq->mq_get_bytes);
+ nni_atomic_init64(&mq->mq_put_bytes);
+ nni_atomic_init64(&mq->mq_get_msgs);
+ nni_atomic_init64(&mq->mq_put_msgs);
+ nni_atomic_init64(&mq->mq_get_errs);
+ nni_atomic_init64(&mq->mq_put_errs);
+ nni_atomic_init64(&mq->mq_discards);
+
return (0);
}
@@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
nni_msgq_run_notify(mq);
@@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
nni_aio_finish(waio, 0, len);
@@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ size_t len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
}
@@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
- nni_aio_finish(waio, 0, len);
continue;
}
@@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
@@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_mtx_unlock(&mq->mq_lock);
nni_aio_finish_error(aio, rv);
return;
@@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
@@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -392,6 +436,7 @@ int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
+ size_t len;
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_ECLOSED);
}
+ len = nni_msg_len(msg);
+
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
nni_msgq_run_notify(mq);
@@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
if (newq == NULL) {
@@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
*sp = mq->mq_sendable;
return (0);
}
+
+uint64_t
+nni_msgq_stat_get_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_bytes));
+}
+
+uint64_t
+nni_msgq_stat_put_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_bytes));
+}
+
+uint64_t
+nni_msgq_stat_get_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_msgs));
+}
+
+uint64_t
+nni_msgq_stat_put_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_msgs));
+}
+
+uint64_t
+nni_msgq_stat_get_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_errs));
+}
+
+uint64_t
+nni_msgq_stat_put_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_errs));
+}
+
+uint64_t
+nni_msgq_stat_discards(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_discards));
+}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 7dc5800d..bd9b3682 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -99,4 +99,13 @@ extern int nni_msgq_len(nni_msgq *mq);
extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **);
extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **);
+// message queues keep statistics
+extern uint64_t nni_msgq_stat_get_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_discards(nni_msgq *);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 9af12720..f28ddf8c 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -41,6 +41,7 @@
#include "core/protocol.h"
#include "core/random.h"
#include "core/reap.h"
+#include "core/stats.h"
#include "core/strs.h"
#include "core/taskq.h"
#include "core/thread.h"
diff --git a/src/core/pipe.c b/src/core/pipe.c
index bdbc76e0..d623e158 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -11,6 +11,7 @@
#include "core/nng_impl.h"
#include "sockimpl.h"
+#include <stdio.h>
#include <string.h>
// This file contains functions relating to pipes.
@@ -82,6 +83,7 @@ pipe_destroy(nni_pipe *p)
p->p_tran_ops.p_stop(p->p_tran_data);
}
+ nni_stat_remove(&p->p_stats.s_root);
nni_pipe_remove(p);
if (p->p_proto_data != NULL) {
@@ -177,6 +179,28 @@ nni_pipe_peer(nni_pipe *p)
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+void
+nni_pipe_stats_init(nni_pipe *p)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_pipe_stats *st = &p->p_stats;
+
+ if (p->p_listener != NULL) {
+ st->s_ep_id.si_name = "listener";
+ st->s_ep_id.si_desc = "listener for pipe";
+ st->s_ep_id.si_value = nni_listener_id(p->p_listener);
+ } else {
+ st->s_ep_id.si_name = "dialer";
+ st->s_ep_id.si_desc = "dialer for pipe";
+ st->s_ep_id.si_value = nni_dialer_id(p->p_dialer);
+ }
+
+ nni_stat_append(NULL, &st->s_root);
+#else
+ NNI_ARG_UNUSED(p);
+#endif
+}
+
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -184,7 +208,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -210,12 +233,27 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
+ if ((rv = nni_idhash_alloc32(nni_pipes, &p->p_id, p)) == 0) {
p->p_refcnt = 1;
}
nni_mtx_unlock(&nni_pipe_lk);
+ nni_pipe_stats *st = &p->p_stats;
+ snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id);
+
+ nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id);
+ nni_stat_append(&st->s_root, &st->s_id);
+
+ // name and description fleshed out later.
+ nni_stat_init_id(&st->s_ep_id, "", "", 0);
+ nni_stat_append(&st->s_root, &st->s_ep_id);
+
+ nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
+ nni_sock_id(p->p_sock));
+ nni_stat_append(&st->s_root, &st->s_sock_id);
+
if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
@@ -274,3 +312,9 @@ nni_pipe_dialer_id(nni_pipe *p)
{
return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}
+
+void
+nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
+{
+ nni_stat_append(&p->p_stats.s_root, item);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1e2f2b5d..5a83059f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -62,4 +62,7 @@ extern uint32_t nni_pipe_dialer_id(nni_pipe *);
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
+// nni_pipe_add_stat adds a statistic to the pipe
+extern void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index b2f331fb..63696b36 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb {
void * cb_arg;
} nni_sock_pipe_cb;
+typedef struct sock_stats {
+ nni_stat_item s_root; // socket scope
+ nni_stat_item s_id; // socket id
+ nni_stat_item s_name; // socket name
+ nni_stat_item s_protocol; // socket protocol
+ nni_stat_item s_ndialers; // number of dialers
+ nni_stat_item s_nlisteners; // number of listeners
+ nni_stat_item s_npipes; // number of pipes
+ nni_stat_item s_rxbytes; // number of bytes received
+ nni_stat_item s_txbytes; // number of bytes received
+ nni_stat_item s_rxmsgs; // number of msgs received
+ nni_stat_item s_txmsgs; // number of msgs sent
+ nni_stat_item s_protorej; // pipes rejected by protocol
+ nni_stat_item s_apprej; // pipes rejected by application
+} sock_stats;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -82,6 +98,7 @@ struct nni_socket {
size_t s_rcvmaxsz; // max receive size
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
+ char s_scope[24]; // socket scope ("socket%u", 32 bits max)
nni_list s_listeners; // active listeners
nni_list s_dialers; // active dialers
@@ -94,6 +111,8 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+
+ sock_stats s_stats;
};
static void nni_ctx_destroy(nni_ctx *);
@@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s)
}
static void
+sock_stats_fini(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats *st = &s->s_stats;
+ nni_stat_remove(&st->s_root);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
+sock_stats_init(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats * st = &s->s_stats;
+ nni_stat_item *root = &s->s_stats.s_root;
+
+ // To make collection cheap and atomic for the socket,
+ // we just use a single lock for the entire chain.
+
+ nni_stat_init_scope(root, s->s_scope, "socket statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name);
+ nni_stat_set_lock(&st->s_name, &s->s_mx);
+ nni_stat_append(root, &st->s_name);
+
+ nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol",
+ nni_sock_proto_name(s));
+ nni_stat_append(root, &st->s_protocol);
+
+ nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers");
+ nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_ndialers);
+
+ nni_stat_init_atomic(
+ &st->s_nlisteners, "nlisteners", "open listeners");
+ nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_nlisteners);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
+ nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_rxbytes);
+
+ nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
+ nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_txbytes);
+
+ nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
+ nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_rxmsgs);
+
+ nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
+ nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_txmsgs);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -418,6 +512,7 @@ sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
+ sock_stats_fini(s);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &sock_lk);
+ sock_stats_init(s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
@@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
// Set the sockname.
(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
- return (rv);
+ // Set up basic stat values.
+ (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id);
+ nni_stat_set_value(&s->s_stats.s_id, s->s_id);
+
+ // Add our stats chain.
+ nni_stat_append(NULL, &s->s_stats.s_root);
+
+ return (0);
}
// nni_sock_shutdown shuts down the socket; after this point no
@@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s)
// is idempotent.
nni_sock_shutdown(s);
+ nni_stat_remove(&s->s_stats.s_root);
+
nni_mtx_lock(&sock_lk);
if (s->s_closed) {
// Some other thread called close. All we need to do
@@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l)
}
nni_list_append(&s->s_listeners, l);
+
+ nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1);
+
nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
}
nni_list_append(&s->s_dialers, d);
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-void
-nni_sock_remove_listener(nni_sock *s, nni_listener *l)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_listeners, l)) {
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
- }
- nni_mtx_unlock(&s->s_mx);
-}
+ nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1);
-void
-nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_dialers, d)) {
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
- }
nni_mtx_unlock(&s->s_mx);
+ return (0);
}
int
@@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
return;
}
+
p->p_listener = l;
nni_list_append(&l->l_pipes, p);
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p)
nni_dialer *d = p->p_dialer;
nni_mtx_lock(&s->s_mx);
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_stat_dec_atomic(&s->s_stats.s_npipes, 1);
+ }
+ if (p->p_listener != NULL) {
+ nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1);
+ }
+ if (p->p_dialer != NULL) {
+ nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1);
+ }
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
p->p_listener = NULL;
@@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p)
}
nni_mtx_unlock(&s->s_mx);
}
+
+void
+nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_append(&s->s_stats.s_root, stat);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(stat);
+#endif
+}
+
+void
+nni_sock_bump_tx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
+
+void
+nni_sock_bump_rx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4b9c4642..5486918e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -25,6 +25,7 @@ extern uint16_t nni_sock_peer_id(nni_sock *);
extern const char *nni_sock_proto_name(nni_sock *);
extern const char *nni_sock_peer_name(nni_sock *);
extern void * nni_sock_proto_data(nni_sock *);
+extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
@@ -107,4 +108,12 @@ extern int nni_ctx_getopt(
extern int nni_ctx_setopt(
nni_ctx *, const char *, const void *, size_t, nni_opt_type);
+// nni_sock_bump_rx is called by a protocol when a message is received by
+// a consuming app. It bumps the rxmsgs by one and rxbytes by the size.
+extern void nni_sock_bump_rx(nni_sock *s, uint64_t sz);
+
+// nni_sock_bump_rx is called by a protocol when a message is sent by
+// a consuming app. It bumps the txmsgs by one and txbytes by the size.
+extern void nni_sock_bump_tx(nni_sock *s, uint64_t sz);
+
#endif // CORE_SOCKET_H
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index be454d8a..ae1fd92e 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -15,6 +15,22 @@
// and pipes. This must not be exposed to other subsystems -- these internals
// are subject to change at any time.
+typedef struct nni_dialer_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_connok;
+ nni_stat_item s_refused;
+ nni_stat_item s_canceled;
+ nni_stat_item s_timedout;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_dialer_stats;
+
struct nni_dialer {
nni_tran_dialer_ops d_ops; // transport ops
nni_tran * d_tran; // transport pointer
@@ -38,8 +54,25 @@ struct nni_dialer {
nni_duration d_inirtime; // initial time for reconnect
nni_time d_conntime; // time of last good connect
nni_reap_item d_reap;
+ nni_dialer_stats d_stats;
};
+typedef struct nni_listener_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_accept;
+ nni_stat_item s_aborted; // aborted remotely
+ nni_stat_item s_timedout;
+ nni_stat_item s_canceled;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_listener_stats;
+
struct nni_listener {
nni_tran_listener_ops l_ops; // transport ops
nni_tran * l_tran; // transport pointer
@@ -56,8 +89,17 @@ struct nni_listener {
nni_aio * l_acc_aio;
nni_aio * l_tmo_aio;
nni_reap_item l_reap;
+ nni_listener_stats l_stats;
};
+typedef struct nni_pipe_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_ep_id;
+ nni_stat_item s_sock_id;
+ char s_scope[16]; // scope name for stats ("pipe" is short)
+} nni_pipe_stats;
+
struct nni_pipe {
uint32_t p_id;
nni_tran_pipe_ops p_tran_ops;
@@ -76,13 +118,11 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
+ nni_pipe_stats p_stats;
};
-extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
-extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
-
-extern int nni_sock_add_listener(nni_sock *, nni_listener *);
-extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
@@ -101,5 +141,6 @@ extern void nni_pipe_remove(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *);
extern void nni_pipe_start(nni_pipe *);
+extern void nni_pipe_stats_init(nni_pipe *);
#endif // CORE_SOCKIMPL_H
diff --git a/src/core/stats.c b/src/core/stats.c
new file mode 100644
index 00000000..0363a932
--- /dev/null
+++ b/src/core/stats.c
@@ -0,0 +1,525 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+typedef struct nng_stat nni_stat;
+
+struct nng_stat {
+ char * s_name;
+ const char * s_desc;
+ const char * s_string;
+ uint64_t s_value;
+ nni_time s_time;
+ nni_stat_type s_type;
+ nni_stat_unit s_unit;
+ nni_stat_item *s_item; // Used during snapshot collection
+ nni_list s_children;
+ nni_stat * s_parent;
+ nni_list_node s_node;
+};
+
+#ifdef NNG_ENABLE_STATS
+static nni_stat_item stats_root;
+static nni_mtx stats_lock;
+static nni_mtx * stats_held = NULL;
+#endif
+
+void
+nni_stat_append(nni_stat_item *parent, nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ if (parent == NULL) {
+ parent = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ // Make sure that the lists for both children and parents
+ // are correctly initialized.
+ if (parent->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&parent->si_children, nni_stat_item, si_node);
+ }
+ if (child->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&child->si_children, nni_stat_item, si_node);
+ }
+ nni_list_append(&parent->si_children, child);
+ child->si_parent = parent;
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(parent);
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+void
+nni_stat_remove(nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_item *parent;
+ nni_mtx_lock(&stats_lock);
+ if ((parent = child->si_parent) != NULL) {
+ nni_list_remove(&parent->si_children, child);
+ child->si_parent = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_init(nni_stat_item *stat, const char *name, const char *desc)
+{
+ NNI_LIST_INIT(&stat->si_children, nni_stat_item, si_node);
+ stat->si_parent = NULL;
+ stat->si_name = name;
+ stat->si_desc = desc;
+ stat->si_lock = NULL;
+ stat->si_update = NULL;
+ stat->si_private = NULL;
+ stat->si_string = NULL;
+ stat->si_value = 0;
+ stat->si_type = NNG_STAT_COUNTER;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_scope(nni_stat_item *stat, const char *name, const char *desc)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_type = NNG_STAT_SCOPE;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_string(
+ nni_stat_item *stat, const char *name, const char *desc, const char *str)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_string = str;
+ stat->si_type = NNG_STAT_STRING;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_id(
+ nni_stat_item *stat, const char *name, const char *desc, uint64_t id)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = id;
+ stat->si_type = NNG_STAT_ID;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_bool(
+ nni_stat_item *stat, const char *name, const char *desc, bool v)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = v ? 1 : 0;
+ stat->si_type = NNG_STAT_BOOLEAN;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+static void
+stat_atomic_update(nni_stat_item *stat, void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+ stat->si_value = nni_atomic_get64(&stat->si_atomic);
+}
+
+void
+nni_stat_init_atomic(nni_stat_item *stat, const char *name, const char *desc)
+{
+
+ nni_stat_init(stat, name, desc);
+ stat->si_value = 0;
+ stat->si_private = NULL;
+ stat->si_update = stat_atomic_update;
+ nni_atomic_init64(&stat->si_atomic);
+}
+
+void
+nni_stat_inc_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_inc64(&stat->si_atomic, inc);
+}
+
+void
+nni_stat_dec_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_dec64(&stat->si_atomic, inc);
+}
+#endif
+
+void
+nni_stat_set_value(nni_stat_item *stat, uint64_t v)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_value = v;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(v);
+#endif
+}
+
+void
+nni_stat_set_string(nni_stat_item *stat, const char *str)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_string = str;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(str);
+#endif
+}
+
+void
+nni_stat_set_lock(nni_stat_item *stat, nni_mtx *mtx)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_lock = mtx;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(mtx);
+#endif
+}
+
+void
+nni_stat_set_update(nni_stat_item *stat, nni_stat_update f, void *a)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_update = f;
+ stat->si_private = a;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(f);
+ NNI_ARG_UNUSED(a);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_set_type(nni_stat_item *stat, int type)
+{
+ stat->si_type = type;
+}
+
+void
+nni_stat_set_unit(nni_stat_item *stat, int unit)
+{
+ stat->si_unit = unit;
+}
+#endif
+
+void
+nng_stats_free(nni_stat *st)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat *child;
+
+ while ((child = nni_list_first(&st->s_children)) != NULL) {
+ nni_list_remove(&st->s_children, child);
+ nng_stats_free(child);
+ }
+ nni_strfree(st->s_name);
+ NNI_FREE_STRUCT(st);
+#else
+ NNI_ARG_UNUSED(st);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+static int
+stat_make_tree(nni_stat_item *item, nni_stat **sp)
+{
+ nni_stat * stat;
+ nni_stat_item *child;
+
+ if ((stat = NNI_ALLOC_STRUCT(stat)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((stat->s_name = nni_strdup(item->si_name)) == NULL) {
+ NNI_FREE_STRUCT(stat);
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&stat->s_children, nni_stat, s_node);
+ stat->s_item = item;
+ stat->s_type = item->si_type;
+ stat->s_unit = item->si_unit;
+ stat->s_desc = item->si_desc;
+ stat->s_parent = NULL;
+
+ NNI_LIST_FOREACH (&item->si_children, child) {
+ nni_stat *cs;
+ int rv;
+ if ((rv = stat_make_tree(child, &cs)) != 0) {
+ nng_stats_free(stat);
+ return (rv);
+ }
+ nni_list_append(&stat->s_children, cs);
+ cs->s_parent = stat;
+ }
+ *sp = stat;
+ return (0);
+}
+
+static void
+stat_update(nni_stat *stat)
+{
+ nni_stat_item *item = stat->s_item;
+
+ if (item->si_lock != stats_held) {
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ if (item->si_lock != NULL) {
+ nni_mtx_lock(item->si_lock);
+ stats_held = item->si_lock;
+ }
+ }
+ if (item->si_update != NULL) {
+ item->si_update(item, item->si_private);
+ }
+ stat->s_value = item->si_value;
+ stat->s_string = item->si_string;
+ stat->s_time = nni_clock();
+}
+
+static void
+stat_update_tree(nni_stat *stat)
+{
+ nni_stat *child;
+ stat_update(stat);
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ stat_update_tree(child);
+ }
+}
+
+int
+nni_stat_snapshot(nni_stat **statp, nni_stat_item *item)
+{
+ int rv;
+ nni_stat *stat;
+
+ if (item == NULL) {
+ item = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ if ((rv = stat_make_tree(item, &stat)) != 0) {
+ nni_mtx_unlock(&stats_lock);
+ return (rv);
+ }
+ stat_update_tree(stat);
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+ *statp = stat;
+ return (0);
+}
+#endif
+
+int
+nng_stats_get(nng_stat **statp)
+{
+#ifdef NNG_ENABLE_STATS
+ return (nni_stat_snapshot(statp, &stats_root));
+#else
+ NNI_ARG_UNUSED(statp);
+ return (NNG_ENOTSUP);
+#endif
+}
+
+nng_stat *
+nng_stat_parent(nng_stat *stat)
+{
+ return (stat->s_parent);
+}
+
+nng_stat *
+nng_stat_next(nng_stat *stat)
+{
+ if (stat->s_parent == NULL) {
+ return (NULL); // Root node, no siblings.
+ }
+ return (nni_list_next(&stat->s_parent->s_children, stat));
+}
+
+nng_stat *
+nng_stat_child(nng_stat *stat)
+{
+ return (nni_list_first(&stat->s_children));
+}
+
+const char *
+nng_stat_name(nni_stat *stat)
+{
+ return (stat->s_name);
+}
+
+uint64_t
+nng_stat_value(nni_stat *stat)
+{
+ return (stat->s_value);
+}
+
+const char *
+nng_stat_string(nng_stat *stat)
+{
+ return (stat->s_string);
+}
+
+uint64_t
+nng_stat_timestamp(nng_stat *stat)
+{
+ return ((uint64_t) stat->s_time);
+}
+
+int
+nng_stat_type(nng_stat *stat)
+{
+ return (stat->s_type);
+}
+
+int
+nng_stat_unit(nng_stat *stat)
+{
+ return (stat->s_unit);
+}
+
+const char *
+nng_stat_desc(nng_stat *stat)
+{
+ return (stat->s_desc);
+}
+
+int
+nni_stat_sys_init(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_init(&stats_lock);
+ NNI_LIST_INIT(&stats_root.si_children, nni_stat_item, si_node);
+ stats_root.si_name = "";
+ stats_root.si_desc = "all statistsics";
+#endif
+
+ return (0);
+}
+
+void
+nni_stat_sys_fini(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_fini(&stats_lock);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+stat_sprint_scope(nni_stat *stat, char **scope, int *lenp)
+{
+ if (stat->s_parent != NULL) {
+ stat_sprint_scope(stat->s_parent, scope, lenp);
+ }
+ if (strlen(stat->s_name) > 0) {
+ snprintf(*scope, *lenp, "%s.", stat->s_name);
+ } else {
+ (*scope)[0] = '\0';
+ }
+ *lenp -= strlen(*scope);
+ *scope += strlen(*scope);
+}
+#endif
+
+void
+nng_stats_dump(nng_stat *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ static char buf[128]; // to minimize recursion, not thread safe
+ static char line[128];
+ int len;
+ char * scope;
+ char * indent = " ";
+ unsigned long long val;
+ nni_stat * child;
+
+ switch (nng_stat_type(stat)) {
+ case NNG_STAT_SCOPE:
+ scope = buf;
+ len = sizeof(buf);
+ stat_sprint_scope(stat, &scope, &len);
+ len = strlen(buf);
+ if (len > 0) {
+ if (buf[len - 1] == '.') {
+ buf[--len] = '\0';
+ }
+ }
+ if (len > 0) {
+ snprintf(line, sizeof(line), "\n%s:", buf);
+ }
+ break;
+ case NNG_STAT_STRING:
+ snprintf(line, sizeof(line), "%s%-32s\"%s\"", indent,
+ nng_stat_name(stat), nng_stat_string(stat));
+ break;
+ case NNG_STAT_BOOLEAN:
+ val = nng_stat_value(stat);
+ snprintf(line, sizeof(line), "%s%-32s%s", indent,
+ nng_stat_name(stat), val != 0 ? "true" : "false");
+ break;
+ case NNG_STAT_LEVEL:
+ case NNG_STAT_COUNTER:
+ val = nng_stat_value(stat);
+ switch (nng_stat_unit(stat)) {
+ case NNG_UNIT_BYTES:
+ snprintf(line, sizeof(line), "%s%-32s%llu bytes",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MESSAGES:
+ snprintf(line, sizeof(line), "%s%-32s%llu msgs",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MILLIS:
+ snprintf(line, sizeof(line), "%s%-32s%llu msec",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_NONE:
+ case NNG_UNIT_EVENTS:
+ default:
+ snprintf(line, sizeof(line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ }
+ break;
+ case NNG_STAT_ID:
+ val = nng_stat_value(stat);
+ snprintf(line, (sizeof line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ default:
+ snprintf(line, (sizeof line), "%s%-32s<?>", indent,
+ nng_stat_name(stat));
+ break;
+ }
+ nni_plat_println(line);
+
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ nng_stats_dump(child);
+ }
+#else
+ NNI_ARG_UNUSED(stat);
+#endif
+}
diff --git a/src/core/stats.h b/src/core/stats.h
new file mode 100644
index 00000000..8cfe14de
--- /dev/null
+++ b/src/core/stats.h
@@ -0,0 +1,108 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_STATS_H
+#define CORE_STATS_H
+
+#include "core/defs.h"
+
+// Statistics support. This is inspired in part by the Solaris
+// kstats framework, but we've simplified and tuned it for our use.
+//
+// Collection of the stats will be done in two steps. First we
+// will walk the list of stats, with the chain held, allocating
+// a user local copy of the stat and pointers.
+//
+// In phase 2, we run the update, and copy the values. We conditionally
+// acquire the lock on the stat first though.
+
+typedef struct nni_stat_item nni_stat_item;
+
+typedef void (*nni_stat_update)(nni_stat_item *, void *);
+typedef enum nng_stat_type_enum nni_stat_type;
+typedef enum nng_unit_enum nni_stat_unit;
+
+// nni_stat_item is used by providers
+struct nni_stat_item {
+#ifdef NNG_ENABLE_STATS
+ nni_list_node si_node; // list node, framework use only
+ nni_stat_item * si_parent; // link back to parent, framework use only
+ nni_list si_children; // children, framework use only
+ const char * si_name; // name of statistic
+ const char * si_desc; // description of statistic (English)
+ nni_mtx * si_lock; // lock for accessing, can be NULL
+ void * si_private; // provider private pointer
+ nni_stat_type si_type; // type of stat, e.g. NNG_STAT_LEVEL
+ nni_stat_unit si_unit; // units, e.g. NNG_UNIT_MILLIS
+ nni_stat_update si_update; // update function (can be NULL)
+ const char * si_string; // string value (NULL for numerics)
+ uint64_t si_value; // numeric value
+ nni_atomic_u64 si_atomic; // atomic value
+#endif
+};
+
+void nni_stat_append(nni_stat_item *, nni_stat_item *);
+void nni_stat_remove(nni_stat_item *);
+
+void nni_stat_set_value(nni_stat_item *, uint64_t);
+void nni_stat_set_string(nni_stat_item *, const char *);
+void nni_stat_set_lock(nni_stat_item *, nni_mtx *);
+void nni_stat_set_update(nni_stat_item *, nni_stat_update, void *);
+
+#ifdef NNG_ENABLE_STATS
+void nni_stat_init(nni_stat_item *, const char *, const char *);
+void nni_stat_init_scope(nni_stat_item *, const char *, const char *);
+void nni_stat_init_string(
+ nni_stat_item *, const char *, const char *, const char *);
+void nni_stat_init_id(nni_stat_item *, const char *, const char *, uint64_t);
+void nni_stat_init_bool(nni_stat_item *, const char *, const char *, bool);
+void nni_stat_init_atomic(nni_stat_item *, const char *, const char *);
+void nni_stat_inc_atomic(nni_stat_item *, uint64_t);
+void nni_stat_dec_atomic(nni_stat_item *, uint64_t);
+void nni_stat_set_type(nni_stat_item *, int);
+void nni_stat_set_unit(nni_stat_item *, int);
+#else
+// We override initialization so that we can avoid compiling static strings
+// into the binary. Presumably if stats are disabled, we are trying to save
+// space for constrained environments. We do evaluate an unused arg to
+// prevent the compiler from bitching about unused values.
+#define nni_stat_init(a, b, c) ((void) (a))
+#define nni_stat_init_scope(a, b, c) ((void) (a))
+#define nni_stat_init_atomic(a, b, c) ((void) (a))
+#define nni_stat_init_id(a, b, c, d) ((void) (a))
+#define nni_stat_init_bool(a, b, c, d) ((void) (a))
+#define nni_stat_init_string(a, b, c, d) ((void) (a))
+#define nni_stat_set_unit(a, b) ((void) (a))
+#define nni_stat_set_type(a, b) ((void) (a))
+#define nni_stat_inc_atomic(stat, inc)
+#define nni_stat_dec_atomic(stat, inc)
+#endif
+
+#if 0
+#define nni_stat_append(a, b)
+#define nni_stat_remove(a)
+#define nni_stat_init(a, b, c)
+#define nni_stat_init_scope(a, b, c)
+#define nni_stat_init_string(a, b, c, d)
+#define nni_stat_init_id(a, b, c, d)
+#define nni_stat_init_bool(a, b, c, d)
+#define nni_stat_dec_atomic(a, b)
+#define nni_stat_set_value(a, b)
+#define nni_stat_set_string(a, b)
+#define nni_stat_set_lock(a, b)
+#define nni_stat_set_update(a, b, c)
+#define nni_stat_set_type(a, b)
+#define nni_stat_set_unit(a, b)
+#endif
+
+int nni_stat_sys_init(void);
+void nni_stat_sys_fini(void);
+
+#endif // CORE_STATS_H