aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c141
1 files changed, 139 insertions, 2 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index ab6486bd..6199e949 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -37,6 +37,16 @@ struct nni_inproc_pipe {
uint16_t peer;
uint16_t proto;
size_t rcvmax;
+ nni_stat_item st_rxbytes;
+ nni_stat_item st_txbytes;
+ nni_stat_item st_rxmsgs;
+ nni_stat_item st_txmsgs;
+ nni_stat_item st_rxdiscards;
+ nni_stat_item st_txdiscards;
+ nni_stat_item st_rxerrs;
+ nni_stat_item st_txerrs;
+ nni_stat_item st_rxoversize;
+ nni_stat_item st_rcvmaxsz;
};
// nni_inproc_pair represents a pair of pipes. Because we control both
@@ -60,6 +70,7 @@ struct nni_inproc_ep {
nni_mtx mtx;
nni_dialer * ndialer;
nni_listener *nlistener;
+ nni_stat_item st_rcvmaxsz;
};
// nni_inproc is our global state - this contains the list of active endpoints
@@ -123,11 +134,120 @@ nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
return (0);
}
+#ifdef NNG_ENABLE_STATS
+static void
+inproc_get_rxbytes(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq));
+}
+
+static void
+inproc_get_rxmsgs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq));
+}
+
+static void
+inproc_get_txbytes(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq));
+}
+
+static void
+inproc_get_txmsgs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq));
+}
+
+static void
+inproc_get_discards(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_discards(mq));
+}
+
+static void
+inproc_get_txerrs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_put_errs(mq));
+}
+
+static void
+inproc_get_rxerrs(nni_stat_item *st, void *arg)
+{
+ nni_msgq *mq = arg;
+ nni_stat_set_value(st, nni_msgq_stat_get_errs(mq));
+}
+#else
+#undef nni_stat_set_update
+#define nni_stat_set_update(p, x, f)
+#endif
+
static int
nni_inproc_pipe_init(void *arg, nni_pipe *p)
{
nni_inproc_pipe *pipe = arg;
pipe->npipe = p;
+
+ nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)");
+ nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES);
+ nni_pipe_add_stat(p, &pipe->st_rxbytes);
+
+ nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)");
+ nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES);
+ nni_pipe_add_stat(p, &pipe->st_txbytes);
+
+ nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received");
+ nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxmsgs);
+
+ nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent");
+ nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txmsgs);
+
+ nni_stat_init(
+ &pipe->st_rxdiscards, "rxdiscards", "receives discarded");
+ nni_stat_set_update(
+ &pipe->st_rxdiscards, inproc_get_discards, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxdiscards);
+
+ nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded");
+ nni_stat_set_update(
+ &pipe->st_txdiscards, inproc_get_discards, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txdiscards);
+
+ nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors");
+ nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq);
+ nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxerrs);
+
+ nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors");
+ nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq);
+ nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_txerrs);
+
+ nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize",
+ "oversize msgs received (dropped)");
+ nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES);
+ nni_pipe_add_stat(p, &pipe->st_rxoversize);
+
+ nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax);
+ nni_pipe_add_stat(p, &pipe->st_rcvmaxsz);
+
return (0);
}
@@ -225,7 +345,15 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer)
nni_aio_list_init(&ep->aios);
ep->addr = url->u_rawurl; // we match on the full URL.
- *epp = ep;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
+
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
+
+ *epp = ep;
return (0);
}
@@ -248,7 +376,14 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener)
nni_aio_list_init(&ep->aios);
ep->addr = url->u_rawurl; // we match on the full URL.
- *epp = ep;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+ nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
+
+ *epp = ep;
return (0);
}
@@ -284,6 +419,7 @@ inproc_filter(void *arg, nni_msg *msg)
{
nni_inproc_pipe *p = arg;
if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) {
+ nni_stat_inc_atomic(&p->st_rxoversize, 1);
nni_msg_free(msg);
return (NULL);
}
@@ -509,6 +645,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);