aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-11-04 23:20:14 -0800
committerGarrett D'Amore <garrett@damore.org>2024-11-04 23:20:14 -0800
commita19dfb14c66d39c44a04aebd3b2b170a3111fd0f (patch)
treeabee02c42249704566a83074306ec7392ce6266c
parent75eb6f3cefe7979ede8c58cd6fde01ea1618cbbb (diff)
downloadnng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.tar.gz
nng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.tar.bz2
nng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.zip
UDP: prefer new packets (drop older)
Adds test cases for this and for remote pipe as well.
-rw-r--r--src/sp/transport/udp/udp.c8
-rw-r--r--src/sp/transport/udp/udp_tran_test.c102
2 files changed, 109 insertions, 1 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 165b118d..25899070 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -676,9 +676,15 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
return;
}
+ // We have a choice to make. Drop this message (easiest), or
+ // drop the oldest. We drop the oldest because generally we
+ // find that applications prefer to have more recent data rather
+ // than keeping stale data.
if (nni_lmq_full(&p->rx_mq)) {
+ nni_msg *old;
+ (void) nni_lmq_get(&p->rx_mq, &old);
+ nni_msg_free(old);
nni_stat_inc(&ep->st_rcv_nobuf, 1);
- return;
}
// Short message, just alloc and copy
diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c
index 38a9a15e..151c2783 100644
--- a/src/sp/transport/udp/udp_tran_test.c
+++ b/src/sp/transport/udp/udp_tran_test.c
@@ -377,6 +377,106 @@ test_udp_multi_small_burst(void)
NUTS_CLOSE(s1);
}
+// this test is designed to overwhelm the receiver, and then
+// make sure that the last packet makes it through.
+void
+test_udp_crush(void)
+{
+ char msg[256];
+ char buf[256];
+ nng_socket s0;
+ nng_socket s1;
+ nng_listener l;
+ nng_dialer d;
+ size_t sz;
+ char *addr;
+
+ NUTS_ADDR(addr, "udp");
+
+ NUTS_OPEN(s0);
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
+ NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
+ NUTS_TRUE(sz == 100);
+ NUTS_PASS(nng_listener_start(l, 0));
+
+ NUTS_OPEN(s1);
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_dialer_create(&d, s1, addr));
+ NUTS_PASS(nng_dialer_set_size(d, NNG_OPT_UDP_COPY_MAX, 100));
+ NUTS_PASS(nng_dialer_get_size(d, NNG_OPT_UDP_COPY_MAX, &sz));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ nng_msleep(100);
+
+ (void) snprintf(msg, sizeof(msg), "Garbage in, Garbage out");
+
+ for (int i = 0; i < 1000; i++) {
+ (void) nng_send(s1, msg, strlen(msg) + 1, 0);
+ }
+
+ (void) snprintf(msg, sizeof(msg), "All Ok");
+ NUTS_PASS(nng_send(s1, msg, strlen(msg) + 1, 0));
+
+ memset(buf, 0, sizeof(buf));
+ for (int i = 0; i < 1000; i++) {
+ size_t sz;
+ int rv = nng_recv(s0, buf, &sz, 0);
+ if (rv == 0) {
+ continue;
+ }
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
+ NUTS_PASS(rv);
+ }
+
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+}
+
+void
+test_udp_pipe(void)
+{
+ nng_socket s0;
+ nng_socket s1;
+ nng_listener l;
+ size_t sz;
+ char *addr;
+ nng_msg *msg;
+ nng_pipe p;
+ nng_sockaddr sa0;
+ nng_sockaddr sa1;
+
+ NUTS_ADDR(addr, "udp4");
+
+ NUTS_OPEN(s0);
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
+ NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
+ NUTS_TRUE(sz == 100);
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa0));
+
+ NUTS_OPEN(s1);
+ NUTS_PASS(nng_dial(s1, addr, NULL, 0));
+ nng_msleep(100);
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_sendmsg(s0, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ p = nng_msg_get_pipe(msg);
+ NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa1));
+
+ NUTS_TRUE(memcmp(&sa0.s_in, &sa1.s_in, sizeof(sa0.s_in)) == 0);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+}
void
test_udp_stats(void)
{
@@ -438,6 +538,8 @@ NUTS_TESTS = {
{ "udp recv copy", test_udp_recv_copy },
{ "udp multi send recv", test_udp_multi_send_recv },
{ "udp multi small burst", test_udp_multi_small_burst },
+ { "udp crush", test_udp_crush },
+ { "udp pipe", test_udp_pipe },
{ "udp stats", test_udp_stats },
{ NULL, NULL },
};