aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/dialer.c87
-rw-r--r--src/core/dialer.h1
-rw-r--r--src/core/init.c4
-rw-r--r--src/core/listener.c82
-rw-r--r--src/core/listener.h1
-rw-r--r--src/core/msgqueue.c100
-rw-r--r--src/core/msgqueue.h9
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/pipe.c50
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/socket.c214
-rw-r--r--src/core/socket.h9
-rw-r--r--src/core/sockimpl.h51
-rw-r--r--src/core/stats.c525
-rw-r--r--src/core/stats.h108
-rw-r--r--src/nng.c68
-rw-r--r--src/nng.h102
-rw-r--r--src/protocol/pair1/pair.c68
-rw-r--r--src/transport/inproc/inproc.c141
20 files changed, 1451 insertions, 175 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 14894db8..305922db 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,8 @@ set (NNG_SOURCES
core/reap.h
core/socket.c
core/socket.h
+ core/stats.c
+ core/stats.h
core/strs.c
core/strs.h
core/taskq.c
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 00ebb513..11faed7c 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -23,6 +23,8 @@ static void dialer_timer_cb(void *);
static nni_idhash *dialers;
static nni_mtx dialers_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_dialer_sys_init(void)
{
@@ -32,8 +34,7 @@ nni_dialer_sys_init(void)
return (rv);
}
nni_mtx_init(&dialers_lk);
- nni_idhash_set_limits(
- dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(dialers, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_dialer_destroy(nni_dialer *d)
NNI_FREE_STRUCT(d);
}
+static void
+dialer_stats_init(nni_dialer *d)
+{
+ nni_dialer_stats *st = &d->d_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "dialer statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "dialer id", d->d_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for dialer",
+ nni_sock_id(d->d_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "dialer url", d->d_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_connok, "connok", "connections made");
+ nni_stat_append(root, &st->s_connok);
+
+ nni_stat_init_atomic(
+ &st->s_canceled, "canceled", "connections canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(&st->s_refused, "refused", "connections refused");
+ nni_stat_append(root, &st->s_refused);
+
+ nni_stat_init_atomic(
+ &st->s_timedout, "timedout", "connections timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other connection errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
{
@@ -110,6 +160,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_mtx_init(&d->d_mtx);
+ dialer_stats_init(d);
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
@@ -119,6 +170,10 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
return (rv);
}
+ snprintf(d->d_stats.s_scope, sizeof(d->d_stats.s_scope), "dialer%u",
+ d->d_id);
+ nni_stat_set_value(&d->d_stats.s_id, d->d_id);
+ nni_stat_append(NULL, &d->d_stats.s_root);
*dp = d;
return (0);
}
@@ -167,6 +222,7 @@ nni_dialer_rele(nni_dialer *d)
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_stat_remove(&d->d_stats.s_root);
nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
@@ -242,12 +298,33 @@ dialer_connect_cb(void *arg)
switch ((rv = nni_aio_result(aio))) {
case 0:
+ BUMPSTAT(&d->d_stats.s_connok);
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
+ BUMPSTAT(&d->d_stats.s_canceled);
break;
+ case NNG_ECONNREFUSED:
+ BUMPSTAT(&d->d_stats.s_refused);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
+ case NNG_ETIMEDOUT:
+ BUMPSTAT(&d->d_stats.s_timedout);
+ if (uaio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+
default:
+ BUMPSTAT(&d->d_stats.s_othererr);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
@@ -389,3 +466,9 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
+
+void
+nni_dialer_add_stat(nni_dialer *d, nni_stat_item *stat)
+{
+ nni_stat_append(&d->d_stats.s_root, stat);
+}
diff --git a/src/core/dialer.h b/src/core/dialer.h
index 361c6d5e..da2cc992 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -26,5 +26,6 @@ extern int nni_dialer_setopt(
nni_dialer *, const char *, const void *, size_t, nni_opt_type);
extern int nni_dialer_getopt(
nni_dialer *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_dialer_add_stat(nni_dialer *, nni_stat_item *);
#endif // CORE_DIALER_H
diff --git a/src/core/init.c b/src/core/init.c
index c66e54d2..60736fb7 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -27,7 +27,8 @@ nni_init_helper(void)
NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node);
nni_inited = true;
- if (((rv = nni_taskq_sys_init()) != 0) ||
+ if (((rv = nni_stat_sys_init()) != 0) ||
+ ((rv = nni_taskq_sys_init()) != 0) ||
((rv = nni_reap_sys_init()) != 0) ||
((rv = nni_timer_sys_init()) != 0) ||
((rv = nni_aio_sys_init()) != 0) ||
@@ -81,6 +82,7 @@ nni_fini(void)
nni_timer_sys_fini();
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
+ nni_stat_sys_fini();
nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
index 1fe6faab..135478de 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -24,6 +24,8 @@ static void listener_timer_cb(void *);
static nni_idhash *listeners;
static nni_mtx listeners_lk;
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
int
nni_listener_sys_init(void)
{
@@ -33,8 +35,7 @@ nni_listener_sys_init(void)
return (rv);
}
nni_mtx_init(&listeners_lk);
- nni_idhash_set_limits(
- listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(listeners, 1, 0x7fffffff, 1);
return (0);
}
@@ -70,6 +71,55 @@ nni_listener_destroy(nni_listener *l)
NNI_FREE_STRUCT(l);
}
+static void
+listener_stats_init(nni_listener *l)
+{
+ nni_listener_stats *st = &l->l_stats;
+ nni_stat_item * root = &st->s_root;
+
+ nni_stat_init_scope(root, st->s_scope, "listener statistics");
+
+ // NB: This will be updated later.
+ nni_stat_init_id(&st->s_id, "id", "listener id", l->l_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_id(&st->s_sock, "socket", "socket for listener",
+ nni_sock_id(l->l_sock));
+ nni_stat_append(root, &st->s_sock);
+
+ nni_stat_init_string(
+ &st->s_url, "url", "listener url", l->l_url->u_rawurl);
+ nni_stat_append(root, &st->s_url);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_accept, "accept", "connections accepted");
+ nni_stat_append(root, &st->s_accept);
+
+ nni_stat_init_atomic(
+ &st->s_aborted, "aborted", "accepts aborted remotely");
+ nni_stat_append(root, &st->s_aborted);
+
+ nni_stat_init_atomic(&st->s_timedout, "timedout", "accepts timed out");
+ nni_stat_append(root, &st->s_timedout);
+
+ nni_stat_init_atomic(&st->s_canceled, "canceled", "accepts canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(
+ &st->s_othererr, "othererr", "other accept errors");
+ nni_stat_append(root, &st->s_othererr);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+}
+
int
nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
{
@@ -107,6 +157,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
+ listener_stats_init(l);
if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
@@ -117,6 +168,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
return (rv);
}
+ // Update a few stat bits, and register them.
+ snprintf(l->l_stats.s_scope, sizeof(l->l_stats.s_scope), "listener%u",
+ l->l_id);
+ nni_stat_set_value(&l->l_stats.s_id, l->l_id);
+ nni_stat_append(NULL, &l->l_stats.s_root);
+
*lp = l;
return (0);
}
@@ -165,6 +222,7 @@ nni_listener_rele(nni_listener *l)
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
if ((l->l_refcnt == 0) && (l->l_closed)) {
+ nni_stat_remove(&l->l_stats.s_root);
nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
}
nni_mtx_unlock(&listeners_lk);
@@ -233,18 +291,30 @@ listener_accept_cb(void *arg)
switch (nni_aio_result(aio)) {
case 0:
+ BUMPSTAT(&l->l_stats.s_accept);
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
- case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_aborted);
+ listener_accept_start(l);
+ break;
+ case NNG_ETIMEDOUT:
+ // No need to sleep since we timed out already.
+ BUMPSTAT(&l->l_stats.s_timedout);
+ listener_accept_start(l);
+ break;
+ case NNG_EPEERAUTH: // peer validation failure
+ BUMPSTAT(&l->l_stats.s_othererr);
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
+ BUMPSTAT(&l->l_stats.s_canceled);
break;
default:
+ BUMPSTAT(&l->l_stats.s_othererr);
// We don't really know why we failed, but we backoff
// here. This is because errors here are probably due
// to system failures (resource exhaustion) and we hope
@@ -339,3 +409,9 @@ nni_listener_getopt(
return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
}
+
+void
+nni_listener_add_stat(nni_listener *l, nni_stat_item *stat)
+{
+ nni_stat_append(&l->l_stats.s_root, stat);
+}
diff --git a/src/core/listener.h b/src/core/listener.h
index 67a782bd..828102a2 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -26,5 +26,6 @@ extern int nni_listener_setopt(
nni_listener *, const char *, const void *, size_t, nni_opt_type);
extern int nni_listener_getopt(
nni_listener *, const char *, void *, size_t *, nni_opt_type);
+extern void nni_listener_add_stat(nni_listener *, nni_stat_item *);
#endif // CORE_LISTENER_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 62a3893b..4b59aead 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -37,6 +37,15 @@ struct nni_msgq {
// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
+
+ // Statistics.
+ nni_atomic_u64 mq_get_msgs;
+ nni_atomic_u64 mq_put_msgs;
+ nni_atomic_u64 mq_get_bytes;
+ nni_atomic_u64 mq_put_bytes;
+ nni_atomic_u64 mq_get_errs;
+ nni_atomic_u64 mq_put_errs;
+ nni_atomic_u64 mq_discards;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -76,6 +85,14 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_geterr = 0;
*mqp = mq;
+ nni_atomic_init64(&mq->mq_get_bytes);
+ nni_atomic_init64(&mq->mq_put_bytes);
+ nni_atomic_init64(&mq->mq_get_msgs);
+ nni_atomic_init64(&mq->mq_put_msgs);
+ nni_atomic_init64(&mq->mq_get_errs);
+ nni_atomic_init64(&mq->mq_put_errs);
+ nni_atomic_init64(&mq->mq_discards);
+
return (0);
}
@@ -185,6 +202,7 @@ nni_msgq_flush(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
nni_msgq_run_notify(mq);
@@ -215,13 +233,20 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
nni_aio_finish(waio, 0, len);
@@ -230,6 +255,8 @@ nni_msgq_run_putq(nni_msgq *mq)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -265,8 +292,13 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ size_t len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
continue;
}
@@ -278,18 +310,26 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
+ } else {
+ nni_atomic_inc64(&mq->mq_discards, 1);
}
- nni_aio_finish(waio, 0, len);
continue;
}
@@ -338,6 +378,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
@@ -348,6 +389,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_mtx_unlock(&mq->mq_lock);
nni_aio_finish_error(aio, rv);
return;
@@ -370,6 +412,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
@@ -377,6 +420,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_get_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -392,6 +436,7 @@ int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
+ size_t len;
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -399,15 +444,22 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_ECLOSED);
}
+ len = nni_msg_len(msg);
+
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
+ len = nni_msg_len(msg);
+ nni_atomic_inc64(&mq->mq_get_bytes, len);
+ nni_atomic_inc64(&mq->mq_get_msgs, 1);
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
nni_msgq_run_notify(mq);
@@ -418,6 +470,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// Otherwise if we have room in the buffer, just queue it.
if (mq->mq_len < mq->mq_cap) {
+ nni_atomic_inc64(&mq->mq_put_bytes, len);
+ nni_atomic_inc64(&mq->mq_put_msgs, 1);
+
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -514,6 +569,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_get = 0;
}
mq->mq_len--;
+ nni_atomic_inc64(&mq->mq_discards, 1);
nni_msg_free(msg);
}
if (newq == NULL) {
@@ -586,3 +642,45 @@ nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
*sp = mq->mq_sendable;
return (0);
}
+
+uint64_t
+nni_msgq_stat_get_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_bytes));
+}
+
+uint64_t
+nni_msgq_stat_put_bytes(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_bytes));
+}
+
+uint64_t
+nni_msgq_stat_get_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_msgs));
+}
+
+uint64_t
+nni_msgq_stat_put_msgs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_msgs));
+}
+
+uint64_t
+nni_msgq_stat_get_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_get_errs));
+}
+
+uint64_t
+nni_msgq_stat_put_errs(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_put_errs));
+}
+
+uint64_t
+nni_msgq_stat_discards(nni_msgq *mq)
+{
+ return (nni_atomic_get64(&mq->mq_discards));
+}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 7dc5800d..bd9b3682 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -99,4 +99,13 @@ extern int nni_msgq_len(nni_msgq *mq);
extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **);
extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **);
+// message queues keep statistics
+extern uint64_t nni_msgq_stat_get_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_bytes(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_msgs(nni_msgq *);
+extern uint64_t nni_msgq_stat_get_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_put_errs(nni_msgq *);
+extern uint64_t nni_msgq_stat_discards(nni_msgq *);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 9af12720..f28ddf8c 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -41,6 +41,7 @@
#include "core/protocol.h"
#include "core/random.h"
#include "core/reap.h"
+#include "core/stats.h"
#include "core/strs.h"
#include "core/taskq.h"
#include "core/thread.h"
diff --git a/src/core/pipe.c b/src/core/pipe.c
index bdbc76e0..d623e158 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -11,6 +11,7 @@
#include "core/nng_impl.h"
#include "sockimpl.h"
+#include <stdio.h>
#include <string.h>
// This file contains functions relating to pipes.
@@ -82,6 +83,7 @@ pipe_destroy(nni_pipe *p)
p->p_tran_ops.p_stop(p->p_tran_data);
}
+ nni_stat_remove(&p->p_stats.s_root);
nni_pipe_remove(p);
if (p->p_proto_data != NULL) {
@@ -177,6 +179,28 @@ nni_pipe_peer(nni_pipe *p)
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+void
+nni_pipe_stats_init(nni_pipe *p)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_pipe_stats *st = &p->p_stats;
+
+ if (p->p_listener != NULL) {
+ st->s_ep_id.si_name = "listener";
+ st->s_ep_id.si_desc = "listener for pipe";
+ st->s_ep_id.si_value = nni_listener_id(p->p_listener);
+ } else {
+ st->s_ep_id.si_name = "dialer";
+ st->s_ep_id.si_desc = "dialer for pipe";
+ st->s_ep_id.si_value = nni_dialer_id(p->p_dialer);
+ }
+
+ nni_stat_append(NULL, &st->s_root);
+#else
+ NNI_ARG_UNUSED(p);
+#endif
+}
+
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -184,7 +208,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -210,12 +233,27 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
+ if ((rv = nni_idhash_alloc32(nni_pipes, &p->p_id, p)) == 0) {
p->p_refcnt = 1;
}
nni_mtx_unlock(&nni_pipe_lk);
+ nni_pipe_stats *st = &p->p_stats;
+ snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id);
+
+ nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id);
+ nni_stat_append(&st->s_root, &st->s_id);
+
+ // name and description fleshed out later.
+ nni_stat_init_id(&st->s_ep_id, "", "", 0);
+ nni_stat_append(&st->s_root, &st->s_ep_id);
+
+ nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
+ nni_sock_id(p->p_sock));
+ nni_stat_append(&st->s_root, &st->s_sock_id);
+
if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
@@ -274,3 +312,9 @@ nni_pipe_dialer_id(nni_pipe *p)
{
return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}
+
+void
+nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
+{
+ nni_stat_append(&p->p_stats.s_root, item);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1e2f2b5d..5a83059f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -62,4 +62,7 @@ extern uint32_t nni_pipe_dialer_id(nni_pipe *);
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
+// nni_pipe_add_stat adds a statistic to the pipe
+extern void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index b2f331fb..63696b36 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,6 +53,22 @@ typedef struct nni_sock_pipe_cb {
void * cb_arg;
} nni_sock_pipe_cb;
+typedef struct sock_stats {
+ nni_stat_item s_root; // socket scope
+ nni_stat_item s_id; // socket id
+ nni_stat_item s_name; // socket name
+ nni_stat_item s_protocol; // socket protocol
+ nni_stat_item s_ndialers; // number of dialers
+ nni_stat_item s_nlisteners; // number of listeners
+ nni_stat_item s_npipes; // number of pipes
+ nni_stat_item s_rxbytes; // number of bytes received
+ nni_stat_item s_txbytes; // number of bytes received
+ nni_stat_item s_rxmsgs; // number of msgs received
+ nni_stat_item s_txmsgs; // number of msgs sent
+ nni_stat_item s_protorej; // pipes rejected by protocol
+ nni_stat_item s_apprej; // pipes rejected by application
+} sock_stats;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -82,6 +98,7 @@ struct nni_socket {
size_t s_rcvmaxsz; // max receive size
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
+ char s_scope[24]; // socket scope ("socket%u", 32 bits max)
nni_list s_listeners; // active listeners
nni_list s_dialers; // active dialers
@@ -94,6 +111,8 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+
+ sock_stats s_stats;
};
static void nni_ctx_destroy(nni_ctx *);
@@ -400,6 +419,81 @@ nni_sock_rele(nni_sock *s)
}
static void
+sock_stats_fini(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats *st = &s->s_stats;
+ nni_stat_remove(&st->s_root);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
+sock_stats_init(nni_sock *s)
+{
+#ifdef NNG_ENABLE_STATS
+ sock_stats * st = &s->s_stats;
+ nni_stat_item *root = &s->s_stats.s_root;
+
+ // To make collection cheap and atomic for the socket,
+ // we just use a single lock for the entire chain.
+
+ nni_stat_init_scope(root, s->s_scope, "socket statistics");
+
+ nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id);
+ nni_stat_append(root, &st->s_id);
+
+ nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name);
+ nni_stat_set_lock(&st->s_name, &s->s_mx);
+ nni_stat_append(root, &st->s_name);
+
+ nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol",
+ nni_sock_proto_name(s));
+ nni_stat_append(root, &st->s_protocol);
+
+ nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers");
+ nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_ndialers);
+
+ nni_stat_init_atomic(
+ &st->s_nlisteners, "nlisteners", "open listeners");
+ nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_nlisteners);
+
+ nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
+ nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL);
+ nni_stat_append(root, &st->s_npipes);
+
+ nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
+ nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_rxbytes);
+
+ nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
+ nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
+ nni_stat_append(root, &st->s_txbytes);
+
+ nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
+ nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_rxmsgs);
+
+ nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
+ nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(root, &st->s_txmsgs);
+
+ nni_stat_init_atomic(
+ &st->s_protorej, "protoreject", "pipes rejected by protocol");
+ nni_stat_append(root, &st->s_protorej);
+
+ nni_stat_init_atomic(
+ &st->s_apprej, "appreject", "pipes rejected by application");
+ nni_stat_append(root, &st->s_apprej);
+#else
+ NNI_ARG_UNUSED(s);
+#endif
+}
+
+static void
sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -418,6 +512,7 @@ sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
+ sock_stats_fini(s);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -469,6 +564,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &sock_lk);
+ sock_stats_init(s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
@@ -567,7 +663,14 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
// Set the sockname.
(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
- return (rv);
+ // Set up basic stat values.
+ (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id);
+ nni_stat_set_value(&s->s_stats.s_id, s->s_id);
+
+ // Add our stats chain.
+ nni_stat_append(NULL, &s->s_stats.s_root);
+
+ return (0);
}
// nni_sock_shutdown shuts down the socket; after this point no
@@ -699,6 +802,8 @@ nni_sock_close(nni_sock *s)
// is idempotent.
nni_sock_shutdown(s);
+ nni_stat_remove(&s->s_stats.s_root);
+
nni_mtx_lock(&sock_lk);
if (s->s_closed) {
// Some other thread called close. All we need to do
@@ -830,6 +935,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l)
}
nni_list_append(&s->s_listeners, l);
+
+ nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1);
+
nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -856,34 +964,11 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
}
nni_list_append(&s->s_dialers, d);
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-void
-nni_sock_remove_listener(nni_sock *s, nni_listener *l)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_listeners, l)) {
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
- }
- nni_mtx_unlock(&s->s_mx);
-}
+ nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1);
-void
-nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_dialers, d)) {
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
- }
nni_mtx_unlock(&s->s_mx);
+ return (0);
}
int
@@ -1398,13 +1483,25 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&d->d_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1493,17 +1590,30 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
return;
}
+
p->p_listener = l;
nni_list_append(&l->l_pipes, p);
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+
+ nni_pipe_stats_init(p);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
- if ((p->p_closed) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_pipe_rele(p);
+ return;
+ }
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_stat_inc_atomic(&l->l_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1616,6 +1726,15 @@ nni_pipe_remove(nni_pipe *p)
nni_dialer *d = p->p_dialer;
nni_mtx_lock(&s->s_mx);
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_stat_dec_atomic(&s->s_stats.s_npipes, 1);
+ }
+ if (p->p_listener != NULL) {
+ nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1);
+ }
+ if (p->p_dialer != NULL) {
+ nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1);
+ }
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
p->p_listener = NULL;
@@ -1629,3 +1748,38 @@ nni_pipe_remove(nni_pipe *p)
}
nni_mtx_unlock(&s->s_mx);
}
+
+void
+nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_append(&s->s_stats.s_root, stat);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(stat);
+#endif
+}
+
+void
+nni_sock_bump_tx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
+
+void
+nni_sock_bump_rx(nni_sock *s, uint64_t sz)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz);
+#else
+ NNI_ARG_UNUSED(s);
+ NNI_ARG_UNUSED(sz);
+#endif
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4b9c4642..5486918e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -25,6 +25,7 @@ extern uint16_t nni_sock_peer_id(nni_sock *);
extern const char *nni_sock_proto_name(nni_sock *);
extern const char *nni_sock_peer_name(nni_sock *);
extern void * nni_sock_proto_data(nni_sock *);
+extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
@@ -107,4 +108,12 @@ extern int nni_ctx_getopt(
extern int nni_ctx_setopt(
nni_ctx *, const char *, const void *, size_t, nni_opt_type);
+// nni_sock_bump_rx is called by a protocol when a message is received by
+// a consuming app. It bumps the rxmsgs by one and rxbytes by the size.
+extern void nni_sock_bump_rx(nni_sock *s, uint64_t sz);
+
+// nni_sock_bump_rx is called by a protocol when a message is sent by
+// a consuming app. It bumps the txmsgs by one and txbytes by the size.
+extern void nni_sock_bump_tx(nni_sock *s, uint64_t sz);
+
#endif // CORE_SOCKET_H
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index be454d8a..ae1fd92e 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -15,6 +15,22 @@
// and pipes. This must not be exposed to other subsystems -- these internals
// are subject to change at any time.
+typedef struct nni_dialer_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_connok;
+ nni_stat_item s_refused;
+ nni_stat_item s_canceled;
+ nni_stat_item s_timedout;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_dialer_stats;
+
struct nni_dialer {
nni_tran_dialer_ops d_ops; // transport ops
nni_tran * d_tran; // transport pointer
@@ -38,8 +54,25 @@ struct nni_dialer {
nni_duration d_inirtime; // initial time for reconnect
nni_time d_conntime; // time of last good connect
nni_reap_item d_reap;
+ nni_dialer_stats d_stats;
};
+typedef struct nni_listener_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_sock;
+ nni_stat_item s_url;
+ nni_stat_item s_npipes;
+ nni_stat_item s_accept;
+ nni_stat_item s_aborted; // aborted remotely
+ nni_stat_item s_timedout;
+ nni_stat_item s_canceled;
+ nni_stat_item s_othererr;
+ nni_stat_item s_protorej;
+ nni_stat_item s_apprej;
+ char s_scope[24]; // scope name for stats
+} nni_listener_stats;
+
struct nni_listener {
nni_tran_listener_ops l_ops; // transport ops
nni_tran * l_tran; // transport pointer
@@ -56,8 +89,17 @@ struct nni_listener {
nni_aio * l_acc_aio;
nni_aio * l_tmo_aio;
nni_reap_item l_reap;
+ nni_listener_stats l_stats;
};
+typedef struct nni_pipe_stats {
+ nni_stat_item s_root;
+ nni_stat_item s_id;
+ nni_stat_item s_ep_id;
+ nni_stat_item s_sock_id;
+ char s_scope[16]; // scope name for stats ("pipe" is short)
+} nni_pipe_stats;
+
struct nni_pipe {
uint32_t p_id;
nni_tran_pipe_ops p_tran_ops;
@@ -76,13 +118,11 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
+ nni_pipe_stats p_stats;
};
-extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
-extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
-
-extern int nni_sock_add_listener(nni_sock *, nni_listener *);
-extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
@@ -101,5 +141,6 @@ extern void nni_pipe_remove(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *);
extern void nni_pipe_start(nni_pipe *);
+extern void nni_pipe_stats_init(nni_pipe *);
#endif // CORE_SOCKIMPL_H
diff --git a/src/core/stats.c b/src/core/stats.c
new file mode 100644
index 00000000..0363a932
--- /dev/null
+++ b/src/core/stats.c
@@ -0,0 +1,525 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+typedef struct nng_stat nni_stat;
+
+struct nng_stat {
+ char * s_name;
+ const char * s_desc;
+ const char * s_string;
+ uint64_t s_value;
+ nni_time s_time;
+ nni_stat_type s_type;
+ nni_stat_unit s_unit;
+ nni_stat_item *s_item; // Used during snapshot collection
+ nni_list s_children;
+ nni_stat * s_parent;
+ nni_list_node s_node;
+};
+
+#ifdef NNG_ENABLE_STATS
+static nni_stat_item stats_root;
+static nni_mtx stats_lock;
+static nni_mtx * stats_held = NULL;
+#endif
+
+void
+nni_stat_append(nni_stat_item *parent, nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ if (parent == NULL) {
+ parent = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ // Make sure that the lists for both children and parents
+ // are correctly initialized.
+ if (parent->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&parent->si_children, nni_stat_item, si_node);
+ }
+ if (child->si_children.ll_head.ln_next == NULL) {
+ NNI_LIST_INIT(&child->si_children, nni_stat_item, si_node);
+ }
+ nni_list_append(&parent->si_children, child);
+ child->si_parent = parent;
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(parent);
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+void
+nni_stat_remove(nni_stat_item *child)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_item *parent;
+ nni_mtx_lock(&stats_lock);
+ if ((parent = child->si_parent) != NULL) {
+ nni_list_remove(&parent->si_children, child);
+ child->si_parent = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+#else
+ NNI_ARG_UNUSED(child);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_init(nni_stat_item *stat, const char *name, const char *desc)
+{
+ NNI_LIST_INIT(&stat->si_children, nni_stat_item, si_node);
+ stat->si_parent = NULL;
+ stat->si_name = name;
+ stat->si_desc = desc;
+ stat->si_lock = NULL;
+ stat->si_update = NULL;
+ stat->si_private = NULL;
+ stat->si_string = NULL;
+ stat->si_value = 0;
+ stat->si_type = NNG_STAT_COUNTER;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_scope(nni_stat_item *stat, const char *name, const char *desc)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_type = NNG_STAT_SCOPE;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_string(
+ nni_stat_item *stat, const char *name, const char *desc, const char *str)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_string = str;
+ stat->si_type = NNG_STAT_STRING;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_id(
+ nni_stat_item *stat, const char *name, const char *desc, uint64_t id)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = id;
+ stat->si_type = NNG_STAT_ID;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+void
+nni_stat_init_bool(
+ nni_stat_item *stat, const char *name, const char *desc, bool v)
+{
+ nni_stat_init(stat, name, desc);
+ stat->si_value = v ? 1 : 0;
+ stat->si_type = NNG_STAT_BOOLEAN;
+ stat->si_unit = NNG_UNIT_NONE;
+}
+
+static void
+stat_atomic_update(nni_stat_item *stat, void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+ stat->si_value = nni_atomic_get64(&stat->si_atomic);
+}
+
+void
+nni_stat_init_atomic(nni_stat_item *stat, const char *name, const char *desc)
+{
+
+ nni_stat_init(stat, name, desc);
+ stat->si_value = 0;
+ stat->si_private = NULL;
+ stat->si_update = stat_atomic_update;
+ nni_atomic_init64(&stat->si_atomic);
+}
+
+void
+nni_stat_inc_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_inc64(&stat->si_atomic, inc);
+}
+
+void
+nni_stat_dec_atomic(nni_stat_item *stat, uint64_t inc)
+{
+ nni_atomic_dec64(&stat->si_atomic, inc);
+}
+#endif
+
+void
+nni_stat_set_value(nni_stat_item *stat, uint64_t v)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_value = v;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(v);
+#endif
+}
+
+void
+nni_stat_set_string(nni_stat_item *stat, const char *str)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_string = str;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(str);
+#endif
+}
+
+void
+nni_stat_set_lock(nni_stat_item *stat, nni_mtx *mtx)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_lock = mtx;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(mtx);
+#endif
+}
+
+void
+nni_stat_set_update(nni_stat_item *stat, nni_stat_update f, void *a)
+{
+#ifdef NNG_ENABLE_STATS
+ stat->si_update = f;
+ stat->si_private = a;
+#else
+ NNI_ARG_UNUSED(stat);
+ NNI_ARG_UNUSED(f);
+ NNI_ARG_UNUSED(a);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+nni_stat_set_type(nni_stat_item *stat, int type)
+{
+ stat->si_type = type;
+}
+
+void
+nni_stat_set_unit(nni_stat_item *stat, int unit)
+{
+ stat->si_unit = unit;
+}
+#endif
+
+void
+nng_stats_free(nni_stat *st)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat *child;
+
+ while ((child = nni_list_first(&st->s_children)) != NULL) {
+ nni_list_remove(&st->s_children, child);
+ nng_stats_free(child);
+ }
+ nni_strfree(st->s_name);
+ NNI_FREE_STRUCT(st);
+#else
+ NNI_ARG_UNUSED(st);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+static int
+stat_make_tree(nni_stat_item *item, nni_stat **sp)
+{
+ nni_stat * stat;
+ nni_stat_item *child;
+
+ if ((stat = NNI_ALLOC_STRUCT(stat)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((stat->s_name = nni_strdup(item->si_name)) == NULL) {
+ NNI_FREE_STRUCT(stat);
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&stat->s_children, nni_stat, s_node);
+ stat->s_item = item;
+ stat->s_type = item->si_type;
+ stat->s_unit = item->si_unit;
+ stat->s_desc = item->si_desc;
+ stat->s_parent = NULL;
+
+ NNI_LIST_FOREACH (&item->si_children, child) {
+ nni_stat *cs;
+ int rv;
+ if ((rv = stat_make_tree(child, &cs)) != 0) {
+ nng_stats_free(stat);
+ return (rv);
+ }
+ nni_list_append(&stat->s_children, cs);
+ cs->s_parent = stat;
+ }
+ *sp = stat;
+ return (0);
+}
+
+static void
+stat_update(nni_stat *stat)
+{
+ nni_stat_item *item = stat->s_item;
+
+ if (item->si_lock != stats_held) {
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ if (item->si_lock != NULL) {
+ nni_mtx_lock(item->si_lock);
+ stats_held = item->si_lock;
+ }
+ }
+ if (item->si_update != NULL) {
+ item->si_update(item, item->si_private);
+ }
+ stat->s_value = item->si_value;
+ stat->s_string = item->si_string;
+ stat->s_time = nni_clock();
+}
+
+static void
+stat_update_tree(nni_stat *stat)
+{
+ nni_stat *child;
+ stat_update(stat);
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ stat_update_tree(child);
+ }
+}
+
+int
+nni_stat_snapshot(nni_stat **statp, nni_stat_item *item)
+{
+ int rv;
+ nni_stat *stat;
+
+ if (item == NULL) {
+ item = &stats_root;
+ }
+ nni_mtx_lock(&stats_lock);
+ if ((rv = stat_make_tree(item, &stat)) != 0) {
+ nni_mtx_unlock(&stats_lock);
+ return (rv);
+ }
+ stat_update_tree(stat);
+ if (stats_held != NULL) {
+ nni_mtx_unlock(stats_held);
+ stats_held = NULL;
+ }
+ nni_mtx_unlock(&stats_lock);
+ *statp = stat;
+ return (0);
+}
+#endif
+
+int
+nng_stats_get(nng_stat **statp)
+{
+#ifdef NNG_ENABLE_STATS
+ return (nni_stat_snapshot(statp, &stats_root));
+#else
+ NNI_ARG_UNUSED(statp);
+ return (NNG_ENOTSUP);
+#endif
+}
+
+nng_stat *
+nng_stat_parent(nng_stat *stat)
+{
+ return (stat->s_parent);
+}
+
+nng_stat *
+nng_stat_next(nng_stat *stat)
+{
+ if (stat->s_parent == NULL) {
+ return (NULL); // Root node, no siblings.
+ }
+ return (nni_list_next(&stat->s_parent->s_children, stat));
+}
+
+nng_stat *
+nng_stat_child(nng_stat *stat)
+{
+ return (nni_list_first(&stat->s_children));
+}
+
+const char *
+nng_stat_name(nni_stat *stat)
+{
+ return (stat->s_name);
+}
+
+uint64_t
+nng_stat_value(nni_stat *stat)
+{
+ return (stat->s_value);
+}
+
+const char *
+nng_stat_string(nng_stat *stat)
+{
+ return (stat->s_string);
+}
+
+uint64_t
+nng_stat_timestamp(nng_stat *stat)
+{
+ return ((uint64_t) stat->s_time);
+}
+
+int
+nng_stat_type(nng_stat *stat)
+{
+ return (stat->s_type);
+}
+
+int
+nng_stat_unit(nng_stat *stat)
+{
+ return (stat->s_unit);
+}
+
+const char *
+nng_stat_desc(nng_stat *stat)
+{
+ return (stat->s_desc);
+}
+
+int
+nni_stat_sys_init(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_init(&stats_lock);
+ NNI_LIST_INIT(&stats_root.si_children, nni_stat_item, si_node);
+ stats_root.si_name = "";
+ stats_root.si_desc = "all statistsics";
+#endif
+
+ return (0);
+}
+
+void
+nni_stat_sys_fini(void)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_mtx_fini(&stats_lock);
+#endif
+}
+
+#ifdef NNG_ENABLE_STATS
+void
+stat_sprint_scope(nni_stat *stat, char **scope, int *lenp)
+{
+ if (stat->s_parent != NULL) {
+ stat_sprint_scope(stat->s_parent, scope, lenp);
+ }
+ if (strlen(stat->s_name) > 0) {
+ snprintf(*scope, *lenp, "%s.", stat->s_name);
+ } else {
+ (*scope)[0] = '\0';
+ }
+ *lenp -= strlen(*scope);
+ *scope += strlen(*scope);
+}
+#endif
+
+void
+nng_stats_dump(nng_stat *stat)
+{
+#ifdef NNG_ENABLE_STATS
+ static char buf[128]; // to minimize recursion, not thread safe
+ static char line[128];
+ int len;
+ char * scope;
+ char * indent = " ";
+ unsigned long long val;
+ nni_stat * child;
+
+ switch (nng_stat_type(stat)) {
+ case NNG_STAT_SCOPE:
+ scope = buf;
+ len = sizeof(buf);
+ stat_sprint_scope(stat, &scope, &len);
+ len = strlen(buf);
+ if (len > 0) {
+ if (buf[len - 1] == '.') {
+ buf[--len] = '\0';
+ }
+ }
+ if (len > 0) {
+ snprintf(line, sizeof(line), "\n%s:", buf);
+ }
+ break;
+ case NNG_STAT_STRING:
+ snprintf(line, sizeof(line), "%s%-32s\"%s\"", indent,
+ nng_stat_name(stat), nng_stat_string(stat));
+ break;
+ case NNG_STAT_BOOLEAN:
+ val = nng_stat_value(stat);
+ snprintf(line, sizeof(line), "%s%-32s%s", indent,
+ nng_stat_name(stat), val != 0 ? "true" : "false");
+ break;
+ case NNG_STAT_LEVEL:
+ case NNG_STAT_COUNTER:
+ val = nng_stat_value(stat);
+ switch (nng_stat_unit(stat)) {
+ case NNG_UNIT_BYTES:
+ snprintf(line, sizeof(line), "%s%-32s%llu bytes",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MESSAGES:
+ snprintf(line, sizeof(line), "%s%-32s%llu msgs",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_MILLIS:
+ snprintf(line, sizeof(line), "%s%-32s%llu msec",
+ indent, nng_stat_name(stat), val);
+ break;
+ case NNG_UNIT_NONE:
+ case NNG_UNIT_EVENTS:
+ default:
+ snprintf(line, sizeof(line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ }
+ break;
+ case NNG_STAT_ID:
+ val = nng_stat_value(stat);
+ snprintf(line, (sizeof line), "%s%-32s%llu", indent,
+ nng_stat_name(stat), val);
+ break;
+ default:
+ snprintf(line, (sizeof line), "%s%-32s<?>", indent,
+ nng_stat_name(stat));
+ break;
+ }
+ nni_plat_println(line);
+
+ NNI_LIST_FOREACH (&stat->s_children, child) {
+ nng_stats_dump(child);
+ }
+#else
+ NNI_ARG_UNUSED(stat);
+#endif
+}
diff --git a/src/core/stats.h b/src/core/stats.h
new file mode 100644
index 00000000..8cfe14de
--- /dev/null
+++ b/src/core/stats.h
@@ -0,0 +1,108 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_STATS_H
+#define CORE_STATS_H
+
+#include "core/defs.h"
+
+// Statistics support. This is inspired in part by the Solaris
+// kstats framework, but we've simplified and tuned it for our use.
+//
+// Collection of the stats will be done in two steps. First we
+// will walk the list of stats, with the chain held, allocating
+// a user local copy of the stat and pointers.
+//
+// In phase 2, we run the update, and copy the values. We conditionally
+// acquire the lock on the stat first though.
+
+typedef struct nni_stat_item nni_stat_item;
+
+typedef void (*nni_stat_update)(nni_stat_item *, void *);
+typedef enum nng_stat_type_enum nni_stat_type;
+typedef enum nng_unit_enum nni_stat_unit;
+
+// nni_stat_item is used by providers
+struct nni_stat_item {
+#ifdef NNG_ENABLE_STATS
+ nni_list_node si_node; // list node, framework use only
+ nni_stat_item * si_parent; // link back to parent, framework use only
+ nni_list si_children; // children, framework use only
+ const char * si_name; // name of statistic
+ const char * si_desc; // description of statistic (English)
+ nni_mtx * si_lock; // lock for accessing, can be NULL
+ void * si_private; // provider private pointer
+ nni_stat_type si_type; // type of stat, e.g. NNG_STAT_LEVEL
+ nni_stat_unit si_unit; // units, e.g. NNG_UNIT_MILLIS
+ nni_stat_update si_update; // update function (can be NULL)
+ const char * si_string; // string value (NULL for numerics)
+ uint64_t si_value; // numeric value
+ nni_atomic_u64 si_atomic; // atomic value
+#endif
+};
+
+void nni_stat_append(nni_stat_item *, nni_stat_item *);
+void nni_stat_remove(nni_stat_item *);
+
+void nni_stat_set_value(nni_stat_item *, uint64_t);
+void nni_stat_set_string(nni_stat_item *, const char *);
+void nni_stat_set_lock(nni_stat_item *, nni_mtx *);
+void nni_stat_set_update(nni_stat_item *, nni_stat_update, void *);
+
+#ifdef NNG_ENABLE_STATS
+void nni_stat_init(nni_stat_item *, const char *, const char *);
+void nni_stat_init_scope(nni_stat_item *, const char *, const char *);
+void nni_stat_init_string(
+ nni_stat_item *, const char *, const char *, const char *);
+void nni_stat_init_id(nni_stat_item *, const char *, const char *, uint64_t);
+void nni_stat_init_bool(nni_stat_item *, const char *, const char *, bool);
+void nni_stat_init_atomic(nni_stat_item *, const char *, const char *);
+void nni_stat_inc_atomic(nni_stat_item *, uint64_t);
+void nni_stat_dec_atomic(nni_stat_item *, uint64_t);
+void nni_stat_set_type(nni_stat_item *, int);
+void nni_stat_set_unit(nni_stat_item *, int);
+#else
+// We override initialization so that we can avoid compiling static strings
+// into the binary. Presumably if stats are disabled, we are trying to save
+// space for constrained environments. We do evaluate an unused arg to
+// prevent the compiler from bitching about unused values.
+#define nni_stat_init(a, b, c) ((void) (a))
+#define nni_stat_init_scope(a, b, c) ((void) (a))
+#define nni_stat_init_atomic(a, b, c) ((void) (a))
+#define nni_stat_init_id(a, b, c, d) ((void) (a))
+#define nni_stat_init_bool(a, b, c, d) ((void) (a))
+#define nni_stat_init_string(a, b, c, d) ((void) (a))
+#define nni_stat_set_unit(a, b) ((void) (a))
+#define nni_stat_set_type(a, b) ((void) (a))
+#define nni_stat_inc_atomic(stat, inc)
+#define nni_stat_dec_atomic(stat, inc)
+#endif
+
+#if 0
+#define nni_stat_append(a, b)
+#define nni_stat_remove(a)
+#define nni_stat_init(a, b, c)
+#define nni_stat_init_scope(a, b, c)
+#define nni_stat_init_string(a, b, c, d)
+#define nni_stat_init_id(a, b, c, d)
+#define nni_stat_init_bool(a, b, c, d)
+#define nni_stat_dec_atomic(a, b)
+#define nni_stat_set_value(a, b)
+#define nni_stat_set_string(a, b)
+#define nni_stat_set_lock(a, b)
+#define nni_stat_set_update(a, b, c)
+#define nni_stat_set_type(a, b)
+#define nni_stat_set_unit(a, b)
+#endif
+
+int nni_stat_sys_init(void);
+void nni_stat_sys_fini(void);
+
+#endif // CORE_STATS_H
diff --git a/src/nng.c b/src/nng.c
index 64968cae..f98727ce 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1594,74 +1594,6 @@ nng_aio_begin(nng_aio *aio)
return (true);
}
-#if 0
-int
-nng_snapshot_create(nng_socket sock, nng_snapshot **snapp)
-{
- // Stats TBD.
- NNI_ARG_UNUSED(sock)
- NNI_ARG_UNUSED(snapp)
- return (NNG_ENOTSUP);
-}
-
-void
-nng_snapshot_free(nng_snapshot *snap)
-{
- NNI_ARG_UNUSED(snap)
- // Stats TBD.
-}
-
-int
-nng_snapshot_update(nng_snapshot *snap)
-{
- NNI_ARG_UNUSED(snap)
- // Stats TBD.
- return (NNG_ENOTSUP);
-}
-
-int
-nng_snapshot_next(nng_snapshot *snap, nng_stat **statp)
-{
- NNI_ARG_UNUSED(snap)
- NNI_ARG_UNUSED(statp)
- // Stats TBD.
- *statp = NULL;
- return (NNG_ENOTSUP);
-}
-
-const char *
-nng_stat_name(nng_stat *stat)
-{
- NNI_ARG_UNUSED(stat)
- // Stats TBD.
- return (NULL);
-}
-
-int
-nng_stat_type(nng_stat *stat)
-{
- NNI_ARG_UNUSED(stat)
- // Stats TBD.
- return (0);
-}
-
-int
-nng_stat_unit(nng_stat *stat)
-{
- NNI_ARG_UNUSED(stat)
- // Stats TBD.
- return (0);
-}
-
-int64_t
-nng_stat_value(nng_stat *stat)
-{
- NNI_ARG_UNUSED(stat)
- // Stats TBD.
- return (0);
-}
-#endif
-
int
nng_url_parse(nng_url **result, const char *ustr)
{
diff --git a/src/nng.h b/src/nng.h
index 863ce363..0af21fc7 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -90,11 +90,10 @@ typedef struct nng_socket_s {
uint32_t id;
} nng_socket;
-typedef int32_t nng_duration; // in milliseconds
-typedef struct nng_msg nng_msg;
-typedef struct nng_snapshot nng_snapshot;
-typedef struct nng_stat nng_stat;
-typedef struct nng_aio nng_aio;
+typedef int32_t nng_duration; // in milliseconds
+typedef struct nng_msg nng_msg;
+typedef struct nng_stat nng_stat;
+typedef struct nng_aio nng_aio;
// Initializers.
// clang-format off
@@ -762,36 +761,37 @@ enum nng_flag_enum {
// but the individual statistic names, values, and meanings are all
// subject to change.
-// nng_snapshot_create creates a statistics snapshot. The snapshot
-// object must be deallocated expressly by the user, and may persist beyond
-// the lifetime of any socket object used to update it. Note that the
-// values of the statistics are initially unset.
-// NNG_DECL int nng_snapshot_create(nng_socket, nng_snapshot **);
-
-// nng_snapshot_free frees a snapshot object. All statistic objects
-// contained therein are destroyed as well.
-// NNG_DECL void nng_snapshot_free(nng_snapshot *);
-
-// nng_snapshot_update updates a snapshot of all the statistics
-// relevant to a particular socket. All prior values are overwritten.
-// NNG_DECL int nng_snapshot_update(nng_snapshot *);
-
-// nng_snapshot_next is used to iterate over the individual statistic
-// objects inside the snapshot. Note that the statistic object, and the
-// meta-data for the object (name, type, units) is fixed, and does not
-// change for the entire life of the snapshot. Only the value
-// is subject to change, and then only when a snapshot is updated.
-//
-// Iteration begins by providing NULL in the value referenced. Successive
-// calls will update this value, returning NULL when no more statistics
-// are available in the snapshot.
-// NNG_DECL int nng_snapshot_next(nng_snapshot *, nng_stat **);
+// nng_stats_get takes a snapshot of the entire set of statistics.
+// While the operation can be somewhat expensive (allocations), it
+// is done in a way that minimizes impact to running operations.
+// Note that the statistics are provided as tree, with parents
+// used for grouping, and with child statistics underneath. The
+// top stat returned will be of type NNG_STAT_SCOPE with name "".
+// Applications may choose to consider this root scope as "root", if
+// the empty string is not suitable.
+NNG_DECL int nng_stats_get(nng_stat **);
+
+// nng_stats_free frees a previous list of snapshots. This should only
+// be called on the parent statistic that obtained via nng_stats_get.
+NNG_DECL void nng_stats_free(nng_stat *);
+
+// nng_stats_dump is a debugging function that dumps the entire set of
+// statistics to stdout.
+NNG_DECL void nng_stats_dump(nng_stat *);
+
+// nng_stat_next finds the next sibling for the current stat. If there
+// are no more siblings, it returns NULL.
+NNG_DECL nng_stat *nng_stat_next(nng_stat *);
+
+// nng_stat_child finds the first child of the current stat. If no children
+// exist, then NULL is returned.
+NNG_DECL nng_stat *nng_stat_child(nng_stat *);
// nng_stat_name is used to determine the name of the statistic.
// This is a human readable name. Statistic names, as well as the presence
// or absence or semantic of any particular statistic are not part of any
// stable API, and may be changed without notice in future updates.
-// NNG_DECL const char *nng_stat_name(nng_stat *);
+NNG_DECL const char *nng_stat_name(nng_stat *);
// nng_stat_type is used to determine the type of the statistic.
// At present, only NNG_STAT_TYPE_LEVEL and and NNG_STAT_TYPE_COUNTER
@@ -799,32 +799,48 @@ enum nng_flag_enum {
// value over time are likely more interesting than the actual level. Level
// values reflect some absolute state however, and should be presented to the
// user as is.
-// NNG_DECL int nng_stat_type(nng_stat *);
+NNG_DECL int nng_stat_type(nng_stat *);
enum nng_stat_type_enum {
- NNG_STAT_LEVEL = 0, // Numeric "absolute" value, diffs meaningless
- NNG_STAT_COUNTER = 1 // Incrementing value (diffs are meaningful)
+ NNG_STAT_SCOPE = 0, // Stat is for scoping, and carries no value
+ NNG_STAT_LEVEL = 1, // Numeric "absolute" value, diffs meaningless
+ NNG_STAT_COUNTER = 2, // Incrementing value (diffs are meaningful)
+ NNG_STAT_STRING = 3, // Value is a string
+ NNG_STAT_BOOLEAN = 4, // Value is a boolean
+ NNG_STAT_ID = 5, // Value is a numeric ID
};
// nng_stat_unit provides information about the unit for the statistic,
// such as NNG_UNIT_BYTES or NNG_UNIT_BYTES. If no specific unit is
-// applicable, such as a relative priority, then NN_UNIT_NONE is
-// returned.
-// NNG_DECL int nng_stat_unit(nng_stat *);
+// applicable, such as a relative priority, then NN_UNIT_NONE is returned.
+NNG_DECL int nng_stat_unit(nng_stat *);
enum nng_unit_enum {
- NNG_UNIT_NONE = 0,
- NNG_UNIT_BYTES = 1,
- NNG_UNIT_MESSAGES = 2,
- NNG_UNIT_BOOLEAN = 3,
- NNG_UNIT_MILLIS = 4,
- NNG_UNIT_EVENTS = 5
+ NNG_UNIT_NONE = 0, // No special units
+ NNG_UNIT_BYTES = 1, // Bytes, e.g. bytes sent, etc.
+ NNG_UNIT_MESSAGES = 2, // Messages, one per message
+ NNG_UNIT_MILLIS = 3, // Milliseconds
+ NNG_UNIT_EVENTS = 4 // Some other type of event
};
// nng_stat_value returns returns the actual value of the statistic.
// Statistic values reflect their value at the time that the corresponding
// snapshot was updated, and are undefined until an update is performed.
-// NNG_DECL int64_t nng_stat_value(nng_stat *);
+NNG_DECL uint64_t nng_stat_value(nng_stat *);
+
+// nng_stat_string returns the string associated with a string statistic,
+// or NULL if the statistic is not part of the string. The value returned
+// is valid until the associated statistic is freed.
+NNG_DECL const char *nng_stat_string(nng_stat *);
+
+// nng_stat_desc returns a human readable description of the statistic.
+// This may be useful for display in diagnostic interfaces, etc.
+NNG_DECL const char *nng_stat_desc(nng_stat *);
+
+// nng_stat_timestamp returns a timestamp (milliseconds) when the statistic
+// was captured. The base offset is the same as used by nng_clock().
+// We don't use nng_time though, because that's in the supplemental header.
+NNG_DECL uint64_t nng_stat_timestamp(nng_stat *);
// Device functionality. This connects two sockets together in a device,
// which means that messages from one side are forwarded to the other.
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index dc250943..3033b196 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -23,6 +23,8 @@
#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1)
#endif
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
@@ -35,16 +37,21 @@ static void pair1_pipe_fini(void *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
- bool raw;
- int ttl;
- nni_mtx mtx;
- nni_idhash *pipes;
- nni_list plist;
- bool started;
- bool poly;
- nni_aio * aio_getq;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_sock * nsock;
+ bool raw;
+ int ttl;
+ nni_mtx mtx;
+ nni_idhash * pipes;
+ nni_list plist;
+ bool started;
+ bool poly;
+ nni_aio * aio_getq;
+ nni_stat_item stat_poly;
+ nni_stat_item stat_raw;
+ nni_stat_item stat_rejmismatch;
+ nni_stat_item stat_rejinuse;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -94,12 +101,29 @@ pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw)
return (rv);
}
- s->raw = raw;
- s->poly = false;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
+ nni_stat_init_bool(
+ &s->stat_poly, "polyamorous", "polyamorous mode?", false);
+ nni_stat_set_lock(&s->stat_poly, &s->mtx);
+ nni_sock_add_stat(nsock, &s->stat_poly);
+
+ nni_stat_init_bool(&s->stat_raw, "raw", "raw mode?", raw);
+ nni_sock_add_stat(nsock, &s->stat_raw);
+
+ nni_stat_init_atomic(&s->stat_rejmismatch, "mismatch",
+ "pipes rejected (protocol mismatch)");
+ nni_sock_add_stat(nsock, &s->stat_rejmismatch);
+
+ nni_stat_init_atomic(&s->stat_rejinuse, "already",
+ "pipes rejected (already connected)");
+ nni_sock_add_stat(nsock, &s->stat_rejinuse);
+
+ s->nsock = nsock;
+ s->raw = raw;
+ s->poly = false;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
return (0);
}
@@ -173,13 +197,15 @@ pair1_pipe_start(void *arg)
uint32_t id;
int rv;
+ nni_mtx_lock(&s->mtx);
if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) {
+ nni_mtx_unlock(&s->mtx);
+ BUMPSTAT(&s->stat_rejmismatch);
// Peer protocol mismatch.
return (NNG_EPROTO);
}
id = nni_pipe_id(p->npipe);
- nni_mtx_lock(&s->mtx);
if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
nni_mtx_unlock(&s->mtx);
return (rv);
@@ -188,6 +214,7 @@ pair1_pipe_start(void *arg)
if (!nni_list_empty(&s->plist)) {
nni_idhash_remove(s->pipes, id);
nni_mtx_unlock(&s->mtx);
+ BUMPSTAT(&s->stat_rejinuse);
return (NNG_EBUSY);
}
} else {
@@ -242,6 +269,7 @@ pair1_pipe_recv_cb(void *arg)
uint32_t hdr;
nni_pipe * npipe = p->npipe;
int rv;
+ size_t len;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
@@ -266,10 +294,12 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe_close(npipe);
return;
}
+ len = nni_msg_len(msg);
// If we bounced too many times, discard the message, but
// keep getting more.
if (hdr > (unsigned) s->ttl) {
+ // STAT: bump TTLdrop
nni_msg_free(msg);
nni_pipe_recv(npipe, p->aio_recv);
return;
@@ -277,6 +307,7 @@ pair1_pipe_recv_cb(void *arg)
// Store the hop count in the header.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
+ // STAT: bump allocfail
nni_msg_free(msg);
nni_pipe_recv(npipe, p->aio_recv);
return;
@@ -284,6 +315,7 @@ pair1_pipe_recv_cb(void *arg)
// Send the message up.
nni_aio_set_msg(p->aio_putq, msg);
+ nni_sock_bump_rx(s->nsock, len);
nni_msgq_aio_put(s->urq, p->aio_putq);
}
@@ -452,6 +484,7 @@ pair1_sock_set_poly(void *arg, const void *buf, size_t sz, nni_opt_type t)
int rv;
nni_mtx_lock(&s->mtx);
rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->poly, buf, sz, t);
+ nni_stat_set_value(&s->stat_poly, s->poly);
nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -468,6 +501,7 @@ pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ nni_sock_bump_tx(s->nsock, nni_msg_len(nni_aio_get_msg(aio)));
nni_msgq_aio_put(s->uwq, aio);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index ab6486bd..6199e949 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -37,6 +37,16 @@ struct nni_inproc_pipe {
uint16_t peer;
uint16_t proto;
size_t rcvmax;
+ nni_stat_item st_rxbytes;
+ nni_stat_item st_txbytes;
+ nni_stat_item st_rxmsgs;
+ nni_stat_item st_txmsgs;
+ nni_stat_item st_rxdiscards;
+ nni_stat_item st_txdiscards;
+ nni_stat_item st_rxerrs;
+ nni_stat_item st_txerrs;
+ nni_stat_item st_rxoversize;
+ nni_stat_item st_rcvmaxsz;
};
// nni_inproc_pair represents a pair of pipes. Because we control both
@@ -60,6 +70,7 @@ struct nni_inproc_ep {
nni_mtx mtx;
nni_dialer * ndialer;
nni_listener *nlistener;
+ nni_stat_item st_rcvmaxsz;
};
// nni_inproc is our global state - this contains the list of active endpoints
@@ -123,11 +134,120 @@ nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
return (0);
}
+#ifdef NNG_ENABLE_STATS
+static void
+inproc_get_rxbytes(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq));
+}
+
+static void
+inproc_get_rxmsgs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq));
+}
+
+static void
+inproc_get_txbytes(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq));
+}
+
+static void
+inproc_get_txmsgs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq));
+}
+
+static void
+inproc_get_discards(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_discards(mq));
+}
+
+static void
+inproc_get_txerrs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_errs(mq));
+}
+
+static void
+inproc_get_rxerrs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_errs(mq));
+}
+#else
+#undef nni_stat_set_update
+#define nni_stat_set_update(p, x, f)
+#endif
+
static int
nni_inproc_pipe_init(void *arg, nni_pipe *p)
{
nni_inproc_pipe *pipe = arg;
pipe->npipe = p;
+
+ nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)");
+ nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES);
+ nni_pipe_add_stat(p, &pipe->st_rxbytes);
+
+ nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)");
+ nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES);
+ nni_pipe_add_stat(p, &pipe->st_txbytes);
+
+ nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received");
+ nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxmsgs);
+
+ nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent");
+ nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txmsgs);
+
+ nni_stat_init(
+ &pipe->st_rxdiscards, "rxdiscards", "receives discarded");
+ nni_stat_set_update(
+ &pipe->st_rxdiscards, inproc_get_discards, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxdiscards);
+
+ nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded");
+ nni_stat_set_update(
+ &pipe->st_txdiscards, inproc_get_discards, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txdiscards);
+
+ nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors");
+ nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxerrs);
+
+ nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors");
+ nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txerrs);
+
+ nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize",
+ "oversize msgs received (dropped)");
+ nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxoversize);
+
+ nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax);
+ nni_pipe_add_stat(p, &pipe->st_rcvmaxsz);
+
return (0);
}
@@ -225,7 +345,15 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer)
nni_aio_list_init(&ep->aios);
ep->addr = url->u_rawurl; // we match on the full URL.
- *epp = ep;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
+
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
+
+ *epp = ep;
return (0);
}
@@ -248,7 +376,14 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener)
nni_aio_list_init(&ep->aios);
ep->addr = url->u_rawurl; // we match on the full URL.
- *epp = ep;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
+
+ *epp = ep;
return (0);
}
@@ -284,6 +419,7 @@ inproc_filter(void *arg, nni_msg *msg)
{
nni_inproc_pipe *p = arg;
if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) {
+ nni_stat_inc_atomic(&p->st_rxoversize, 1);
nni_msg_free(msg);
return (NULL);
}
@@ -509,6 +645,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);