aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/nng/nng.h2
-rw-r--r--src/sp/transport/udp/udp.c111
2 files changed, 103 insertions, 10 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h
index 17965010..1b3eae20 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -818,6 +818,8 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// low order 16 bits will be set. This is provided in native byte order,
// which makes it more convenient than using the NNG_OPT_LOCADDR option.
#define NNG_OPT_TCP_BOUND_PORT "tcp-bound-port"
+// UDP alias for convenience uses the same value
+#define NNG_OPT_UDP_BOUND_PORT NNG_OPT_TCP_BOUND_PORT
// IPC options. These will largely vary depending on the platform,
// as POSIX systems have very different options than Windows.
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 8bf5a4d9..10e40397 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -12,6 +12,7 @@
#include "core/message.h"
#include "core/nng_impl.h"
#include "core/options.h"
+#include "core/pipe.h"
#include "core/platform.h"
#include "nng/nng.h"
@@ -219,6 +220,15 @@ struct udp_ep {
#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
+ nni_stat_item st_rcv_reorder;
+ nni_stat_item st_rcv_toobig;
+ nni_stat_item st_rcv_nomatch;
+ nni_stat_item st_rcv_copy;
+ nni_stat_item st_rcv_nocopy;
+ nni_stat_item st_rcv_nobuf;
+ nni_stat_item st_snd_toobig;
+ nni_stat_item st_snd_nobuf;
+ nni_stat_item st_peer_inactive;
#endif
};
@@ -363,9 +373,12 @@ udp_check_pipe_sequence(udp_pipe *p, uint32_t seq)
delta = (int32_t) (seq - p->peer_seq);
if (delta < 0) {
// out of order delivery
+ nni_stat_inc(&p->ep->st_rcv_reorder, 1);
return (false);
}
- // TODO: bump a stat for misses if delta > 0.
+ if (delta > 0) {
+ nni_stat_inc(&p->ep->st_rcv_reorder, 1);
+ }
p->peer_seq = seq + 1; // expected next sequence number
return (true);
}
@@ -452,7 +465,7 @@ udp_queue_tx(udp_ep *ep, nng_sockaddr *sa, udp_sp_msg *msg, nni_msg *payload)
if (ring->count == ring->size || !ep->started) {
// ring is full
- // TODO: bump a stat
+ nni_stat_inc(&ep->st_snd_nobuf, 1);
if (payload != NULL) {
nni_msg_free(payload);
}
@@ -621,7 +634,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
// NB: Peer ID endianness does not matter, as long we use it
// consistently.
if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) {
- // TODO: Bump a stat...
+ nni_stat_inc(&ep->st_rcv_nomatch, 1);
udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN);
// Question: how do we store the sockaddr for that?
return;
@@ -637,6 +650,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
// Make sure the message wasn't truncated, and that it fits within
// our maximum agreed upon payload.
if ((dreq->us_length > len) || (dreq->us_length > p->rcvmax)) {
+ nni_stat_inc(&ep->st_rcv_toobig, 1);
udp_send_disc(ep, p, DISC_MSGSIZE);
return;
}
@@ -652,19 +666,21 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
if (!udp_check_pipe_sequence(p, dreq->us_sequence)) {
// out of order delivery, drop it
- // TODO: bump a stat
return;
}
if (nni_lmq_full(&p->rx_mq)) {
- // bump a NOBUF stat
+ nni_stat_inc(&ep->st_rcv_nobuf, 1);
return;
}
// Short message, just alloc and copy
if (len <= ep->short_msg) {
+ nni_stat_inc(&ep->st_rcv_copy, 1);
if (nng_msg_alloc(&msg, len) != 0) {
- // TODO: bump a stat
+ if (p->npipe != NULL) {
+ nni_pipe_bump_error(p->npipe, NNG_ENOMEM);
+ }
return;
}
nni_msg_set_address(msg, sa);
@@ -672,12 +688,15 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
nni_msg_append(msg, nni_msg_body(ep->rx_payload), len);
nni_lmq_put(&p->rx_mq, msg);
} else {
+ nni_stat_inc(&ep->st_rcv_nocopy, 1);
// Message size larger than copy break, do zero copy
msg = ep->rx_payload;
if (nng_msg_alloc(&ep->rx_payload,
ep->rcvmax + sizeof(ep->rx_msg)) != 0) {
- // TODO: bump a stat
ep->rx_payload = msg; // make sure we put it back
+ if (p->npipe != NULL) {
+ nni_pipe_bump_error(p->npipe, NNG_ENOMEM);
+ }
return;
}
@@ -900,8 +919,6 @@ udp_rx_cb(void *arg)
hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length);
#endif
- // TODO: verify that incoming type matches us!
-
switch (hdr->data.us_op_code) {
case OPCODE_DATA:
udp_recv_data(ep, &hdr->data, n, sa);
@@ -964,7 +981,7 @@ udp_pipe_send(void *arg, nni_aio *aio)
// floor. this is on the sender, so there isn't a compelling
// need to disconnect the pipe, since it we're not being
// "ill-behaved" to our peer.
- // TODO: bump a stat
+ nni_stat_inc(&ep->st_snd_toobig, 1);
nni_msg_free(msg);
return;
}
@@ -1241,6 +1258,7 @@ udp_timer_cb(void *arg)
// This will probably not be received by the peer,
// since we aren't getting anything from them. But
// having it on the wire may help debugging later.
+ nni_stat_inc(&ep->st_peer_inactive, 1);
udp_send_disc(ep, p, DISC_INACTIVE);
continue;
}
@@ -1321,7 +1339,80 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
+ static const nni_stat_info rcv_reorder_info = {
+ .si_name = "rcv_reorder",
+ .si_desc = "messages received out of order",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rcv_toobig_info = {
+ .si_name = "rcv_toobig",
+ .si_desc = "received messages rejected because too big",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rcv_nomatch_info = {
+ .si_name = "rcv_nomatch",
+ .si_desc = "received messages without a matching connection",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rcv_copy_info = {
+ .si_name = "rcv_copy",
+ .si_desc = "received messages copied (small)",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rcv_nocopy_info = {
+ .si_name = "rcv_nocopy",
+ .si_desc = "received messages zero copy (large)",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rcv_nobuf_info = {
+ .si_name = "rcv_nobuf",
+ .si_desc = "received messages dropped no buffer",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info snd_toobig_info = {
+ .si_name = "snd_toobig",
+ .si_desc = "sent messages rejected because too big",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info snd_nobuf_info = {
+ .si_name = "snd_nobuf",
+ .si_desc = "sent messages dropped no buffer",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info peer_inactive_info = {
+ .si_name = "peer_inactive",
+ .si_desc = "connections closed due to inactive peer",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_EVENTS,
+ .si_atomic = true,
+ };
+
nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
+ nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info);
+ nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info);
+ nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info);
+ nni_stat_init(&ep->st_rcv_toobig, &rcv_toobig_info);
+ nni_stat_init(&ep->st_rcv_nomatch, &rcv_nomatch_info);
+ nni_stat_init(&ep->st_rcv_nobuf, &rcv_nobuf_info);
+ nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info);
+ nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info);
+ nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info);
#endif
// schedule our timer callback - forever for now