diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-11-04 23:20:14 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-11-04 23:20:14 -0800 |
| commit | a19dfb14c66d39c44a04aebd3b2b170a3111fd0f (patch) | |
| tree | abee02c42249704566a83074306ec7392ce6266c /src/sp | |
| parent | 75eb6f3cefe7979ede8c58cd6fde01ea1618cbbb (diff) | |
| download | nng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.tar.gz nng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.tar.bz2 nng-a19dfb14c66d39c44a04aebd3b2b170a3111fd0f.zip | |
UDP: prefer new packets (drop older)
Adds test cases for this and for remote pipe as well.
Diffstat (limited to 'src/sp')
| -rw-r--r-- | src/sp/transport/udp/udp.c | 8 | ||||
| -rw-r--r-- | src/sp/transport/udp/udp_tran_test.c | 102 |
2 files changed, 109 insertions, 1 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 165b118d..25899070 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -676,9 +676,15 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) return; } + // We have a choice to make. Drop this message (easiest), or + // drop the oldest. We drop the oldest because generally we + // find that applications prefer to have more recent data rather + // than keeping stale data. if (nni_lmq_full(&p->rx_mq)) { + nni_msg *old; + (void) nni_lmq_get(&p->rx_mq, &old); + nni_msg_free(old); nni_stat_inc(&ep->st_rcv_nobuf, 1); - return; } // Short message, just alloc and copy diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c index 38a9a15e..151c2783 100644 --- a/src/sp/transport/udp/udp_tran_test.c +++ b/src/sp/transport/udp/udp_tran_test.c @@ -377,6 +377,106 @@ test_udp_multi_small_burst(void) NUTS_CLOSE(s1); } +// this test is designed to overwhelm the receiver, and then +// make sure that the last packet makes it through. +void +test_udp_crush(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l; + nng_dialer d; + size_t sz; + char *addr; + + NUTS_ADDR(addr, "udp"); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100)); + NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz)); + NUTS_TRUE(sz == 100); + NUTS_PASS(nng_listener_start(l, 0)); + + NUTS_OPEN(s1); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_dialer_create(&d, s1, addr)); + NUTS_PASS(nng_dialer_set_size(d, NNG_OPT_UDP_COPY_MAX, 100)); + NUTS_PASS(nng_dialer_get_size(d, NNG_OPT_UDP_COPY_MAX, &sz)); + NUTS_PASS(nng_dialer_start(d, 0)); + nng_msleep(100); + + (void) snprintf(msg, sizeof(msg), "Garbage in, Garbage out"); + + for (int i = 0; i < 1000; i++) { + (void) nng_send(s1, msg, strlen(msg) + 1, 0); + } + + (void) snprintf(msg, sizeof(msg), "All Ok"); + NUTS_PASS(nng_send(s1, msg, strlen(msg) + 1, 0)); + + memset(buf, 0, sizeof(buf)); + for (int i = 0; i < 1000; i++) { + size_t sz; + int rv = nng_recv(s0, buf, &sz, 0); + if (rv == 0) { + continue; + } + if (rv == NNG_ETIMEDOUT) { + break; + } + NUTS_PASS(rv); + } + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_udp_pipe(void) +{ + nng_socket s0; + nng_socket s1; + nng_listener l; + size_t sz; + char *addr; + nng_msg *msg; + nng_pipe p; + nng_sockaddr sa0; + nng_sockaddr sa1; + + NUTS_ADDR(addr, "udp4"); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100)); + NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz)); + NUTS_TRUE(sz == 100); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa0)); + + NUTS_OPEN(s1); + NUTS_PASS(nng_dial(s1, addr, NULL, 0)); + nng_msleep(100); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa1)); + + NUTS_TRUE(memcmp(&sa0.s_in, &sa1.s_in, sizeof(sa0.s_in)) == 0); + nng_msg_free(msg); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} void test_udp_stats(void) { @@ -438,6 +538,8 @@ NUTS_TESTS = { { "udp recv copy", test_udp_recv_copy }, { "udp multi send recv", test_udp_multi_send_recv }, { "udp multi small burst", test_udp_multi_small_burst }, + { "udp crush", test_udp_crush }, + { "udp pipe", test_udp_pipe }, { "udp stats", test_udp_stats }, { NULL, NULL }, }; |
