summaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c156
1 files changed, 105 insertions, 51 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index b93d9f64..8ba35b2b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -58,7 +58,7 @@ pipe_destroy(nni_pipe *p)
nni_id_remove(&pipes, p->p_id);
}
// This wait guarantees that all callers are done with us.
- while (p->p_refcnt != 0) {
+ while (p->p_ref != 0) {
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&pipes_lk);
@@ -70,7 +70,9 @@ pipe_destroy(nni_pipe *p)
p->p_tran_ops.p_stop(p->p_tran_data);
}
- nni_stat_unregister(&p->p_stats.s_root);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_unregister(&p->st_root);
+#endif
nni_pipe_remove(p);
if (p->p_proto_data != NULL) {
@@ -93,9 +95,9 @@ nni_pipe_find(nni_pipe **pp, uint32_t id)
// access to the pipe in order to obtain properties (which may
// be retried during the post-close notification callback) or to
// close the pipe.
- nni_mtx_lock(&pipes_lk);
+ nni_mtx_lock(&pipes_lk);
if ((p = nni_id_get(&pipes, id)) != NULL) {
- p->p_refcnt++;
+ p->p_ref++;
*pp = p;
}
nni_mtx_unlock(&pipes_lk);
@@ -106,8 +108,8 @@ void
nni_pipe_rele(nni_pipe *p)
{
nni_mtx_lock(&pipes_lk);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
+ p->p_ref--;
+ if (p->p_ref == 0) {
nni_cv_wake(&p->p_cv);
}
nni_mtx_unlock(&pipes_lk);
@@ -165,6 +167,75 @@ nni_pipe_peer(nni_pipe *p)
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+#ifdef NNG_ENABLE_STATS
+static void
+pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
+{
+ nni_stat_init(item, info);
+ nni_stat_add(&p->st_root, item);
+}
+
+static void
+pipe_stats_init(nni_pipe *p)
+{
+ static const nni_stat_info root_info = {
+ .si_name = "pipe",
+ .si_desc = "pipe statistics",
+ .si_type = NNG_STAT_SCOPE,
+ };
+ static const nni_stat_info id_info = {
+ .si_name = "id",
+ .si_desc = "pipe id",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info socket_info = {
+ .si_name = "socket",
+ .si_desc = "socket for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info rx_msgs_info = {
+ .si_name = "rx_msgs",
+ .si_desc = "messages received",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info tx_msgs_info = {
+ .si_name = "tx_msgs",
+ .si_desc = "messages sent",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rx_bytes_info = {
+ .si_name = "rx_bytes",
+ .si_desc = "bytes received",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info tx_bytes_info = {
+ .si_name = "tx_bytes",
+ .si_desc = "bytes sent",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
+ };
+
+ nni_stat_init(&p->st_root, &root_info);
+ pipe_stat_init(p, &p->st_id, &id_info);
+ pipe_stat_init(p, &p->st_sock_id, &socket_info);
+ pipe_stat_init(p, &p->st_rx_msgs, &rx_msgs_info);
+ pipe_stat_init(p, &p->st_tx_msgs, &tx_msgs_info);
+ pipe_stat_init(p, &p->st_rx_bytes, &rx_bytes_info);
+ pipe_stat_init(p, &p->st_tx_bytes, &tx_bytes_info);
+
+ nni_stat_set_id(&p->st_root, p->p_id);
+ nni_stat_set_id(&p->st_id, p->p_id);
+ nni_stat_set_id(&p->st_sock_id, nni_sock_id(p->p_sock));
+}
+#endif // NNG_ENABLE_STATS
+
static int
pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -172,7 +243,6 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- nni_pipe_stats * st;
size_t sz;
sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
@@ -191,8 +261,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_sock = sock;
p->p_closed = false;
p->p_cbs = false;
- p->p_refcnt = 0;
- st = &p->p_stats;
+ p->p_ref = 0;
nni_atomic_flag_reset(&p->p_stop);
NNI_LIST_NODE_INIT(&p->p_sock_node);
@@ -203,32 +272,13 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_lock(&pipes_lk);
if ((rv = nni_id_alloc(&pipes, &p->p_id, p)) == 0) {
- p->p_refcnt = 1;
+ p->p_ref = 1;
}
nni_mtx_unlock(&pipes_lk);
- snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id);
-
- nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics");
-
- nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id);
- nni_stat_add(&st->s_root, &st->s_id);
-
- nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
- nni_sock_id(p->p_sock));
- nni_stat_add(&st->s_root, &st->s_sock_id);
- nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
- nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
- nni_stat_add(&st->s_root, &st->s_rxmsgs);
- nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
- nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
- nni_stat_add(&st->s_root, &st->s_txmsgs);
- nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
- nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
- nni_stat_add(&st->s_root, &st->s_rxbytes);
- nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
- nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
- nni_stat_add(&st->s_root, &st->s_txbytes);
+#ifdef NNG_ENABLE_STATS
+ pipe_stats_init(p);
+#endif
if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(p->p_proto_data, p, sdata)) != 0)) {
@@ -247,18 +297,20 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
int rv;
nni_tran * tran = d->d_tran;
nni_pipe * p;
- nni_stat_item *st;
-#ifdef NNG_ENABLE_STATS
- uint64_t id = nni_dialer_id(d);
-#endif
if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
return (rv);
}
- st = &p->p_stats.s_ep_id;
p->p_dialer = d;
- nni_stat_init_id(st, "dialer", "dialer for pipe", id);
- nni_pipe_add_stat(p, st);
+#ifdef NNG_ENABLE_STATS
+ static const nni_stat_info dialer_info = {
+ .si_name = "dialer",
+ .si_desc = "dialer for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ pipe_stat_init(p, &p->st_ep_id, &dialer_info);
+ nni_stat_set_id(&p->st_ep_id, nni_dialer_id(d));
+#endif
*pp = p;
return (0);
}
@@ -269,18 +321,20 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
int rv;
nni_tran * tran = l->l_tran;
nni_pipe * p;
- nni_stat_item *st;
-#ifdef NNG_ENABLE_STATS
- uint64_t id = nni_listener_id(l);
-#endif
if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
return (rv);
}
- st = &p->p_stats.s_ep_id;
p->p_listener = l;
- nni_stat_init_id(st, "listener", "listener for pipe", id);
- nni_pipe_add_stat(p, st);
+#if NNG_ENABLE_STATS
+ static const nni_stat_info listener_info = {
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ pipe_stat_init(p, &p->st_ep_id, &listener_info);
+ nni_stat_set_id(&p->st_ep_id, nni_listener_id(l));
+#endif
*pp = p;
return (0);
}
@@ -334,15 +388,15 @@ nni_pipe_dialer_id(nni_pipe *p)
void
nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
{
- nni_stat_add(&p->p_stats.s_root, item);
+ nni_stat_add(&p->st_root, item);
}
void
nni_pipe_bump_rx(nni_pipe *p, size_t nbytes)
{
#ifdef NNG_ENABLE_STATS
- nni_stat_inc_atomic(&p->p_stats.s_rxbytes, nbytes);
- nni_stat_inc_atomic(&p->p_stats.s_rxmsgs, 1);
+ nni_stat_inc(&p->st_rx_bytes, nbytes);
+ nni_stat_inc(&p->st_rx_msgs, 1);
#else
NNI_ARG_UNUSED(p);
NNI_ARG_UNUSED(nbytes);
@@ -353,8 +407,8 @@ void
nni_pipe_bump_tx(nni_pipe *p, size_t nbytes)
{
#ifdef NNG_ENABLE_STATS
- nni_stat_inc_atomic(&p->p_stats.s_txbytes, nbytes);
- nni_stat_inc_atomic(&p->p_stats.s_txmsgs, 1);
+ nni_stat_inc(&p->st_tx_bytes, nbytes);
+ nni_stat_inc(&p->st_tx_msgs, 1);
#else
NNI_ARG_UNUSED(p);
NNI_ARG_UNUSED(nbytes);