aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-10-13 08:38:11 -0700
committerGarrett D'Amore <garrett@damore.org>2024-10-13 08:38:11 -0700
commit0bef8c0aa19042d618f9953f643b9150a5ae0ea5 (patch)
tree54d3011bef9a947f83a3f7cf97410696db3bacc6 /src
parent6d0143d66feec9f9f8418a8758807c503eea87e1 (diff)
downloadnng-0bef8c0aa19042d618f9953f643b9150a5ae0ea5.tar.gz
nng-0bef8c0aa19042d618f9953f643b9150a5ae0ea5.tar.bz2
nng-0bef8c0aa19042d618f9953f643b9150a5ae0ea5.zip
UDP: numerous fixes, added test for copy break
There were several bugs here, including use-after-free, a problem when the copy limit was exceeded, and uninitialized receive thresholds.
Diffstat (limited to 'src')
-rw-r--r--src/sp/transport/udp/udp.c132
-rw-r--r--src/sp/transport/udp/udp_tran_test.c38
2 files changed, 130 insertions, 40 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 10e40397..b55fcdb1 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -66,6 +66,10 @@ typedef enum udp_disc_reason {
#define NNG_UDP_RECVMAX 65000 // largest permitted by spec
#endif
+#ifndef NNG_UDP_COPYMAX // threshold for copying instead of loan up
+#define NNG_UDP_COPYMAX 1024
+#endif
+
#ifndef NNG_UDP_REFRESH
#define NNG_UDP_REFRESH (5 * NNI_SECOND)
#endif
@@ -213,12 +217,11 @@ struct udp_ep {
nng_duration refresh; // refresh interval for connections in seconds
udp_sp_msg rx_msg; // contains the received message header
uint16_t rcvmax; // max payload, trimmed to uint16_t
- uint16_t short_msg;
+ uint16_t copymax;
udp_txring tx_ring;
nni_time next_wake;
nni_aio_completions complq;
-#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
nni_stat_item st_rcv_reorder;
nni_stat_item st_rcv_toobig;
@@ -229,7 +232,7 @@ struct udp_ep {
nni_stat_item st_snd_toobig;
nni_stat_item st_snd_nobuf;
nni_stat_item st_peer_inactive;
-#endif
+ nni_stat_item st_copy_max;
};
static void udp_ep_hold(udp_ep *ep);
@@ -253,8 +256,10 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa)
NNI_FREE_STRUCT(p);
return (rv);
}
+ udp_ep_hold(ep);
now = nni_clock();
nni_aio_list_init(&p->rx_aios);
+ nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
p->ep = ep;
p->dialer = ep->dialer;
p->self_seq = nni_random();
@@ -266,8 +271,6 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa)
p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p));
p->rcvmax = ep->rcvmax;
*pp = p;
- nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
- udp_ep_hold(ep);
return (0);
}
@@ -325,6 +328,12 @@ udp_pipe_destroy(udp_pipe *p)
{
nng_msg *m;
+ if (p->self_id != 0) {
+ nni_id_remove(&p->ep->pipes, p->self_id);
+ p->self_id = 0;
+ }
+ nni_list_node_remove(&p->node);
+
// call with ep->mtx lock held
while (!nni_lmq_empty(&p->rx_mq)) {
nni_lmq_get(&p->rx_mq, &m);
@@ -343,8 +352,6 @@ udp_pipe_fini(void *arg)
udp_ep *ep = p->ep;
nni_mtx_lock(&ep->mtx);
- nni_id_remove(&ep->pipes, p->self_id);
-
udp_pipe_destroy(p);
udp_ep_rele(ep); // releases lock
}
@@ -605,8 +612,7 @@ udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, nng_sockaddr *sa)
// For now we aren't validating the sequence numbers.
// This allows for an out of order DISC to cause the
// connection to be dropped, but it should self heal.
- p->closed = true;
- p->self_id = 0; // prevent it from being identified later
+ p->closed = true;
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -641,6 +647,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
}
if (p->peer_id == 0) {
// connection isn't formed yet ... send another CREQ
+ nni_stat_inc(&ep->st_rcv_nomatch, 1);
udp_send_creq(ep, p);
return;
}
@@ -675,7 +682,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
}
// Short message, just alloc and copy
- if (len <= ep->short_msg) {
+ if (len <= ep->copymax) {
nni_stat_inc(&ep->st_rcv_copy, 1);
if (nng_msg_alloc(&msg, len) != 0) {
if (p->npipe != NULL) {
@@ -684,9 +691,9 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
return;
}
nni_msg_set_address(msg, sa);
- nni_msg_clear(msg);
- nni_msg_append(msg, nni_msg_body(ep->rx_payload), len);
+ memcpy(nni_msg_body(msg), nni_msg_body(ep->rx_payload), len);
nni_lmq_put(&p->rx_mq, msg);
+ nni_msg_realloc(ep->rx_payload, ep->rcvmax);
} else {
nni_stat_inc(&ep->st_rcv_nocopy, 1);
// Message size larger than copy break, do zero copy
@@ -998,7 +1005,8 @@ udp_pipe_send(void *arg, nni_aio *aio)
// Just queue it, or fail it.
udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg);
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish(aio, 0, dreq.us_length);
+ nni_aio_finish(
+ aio, 0, msg ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0);
}
static void
@@ -1190,21 +1198,22 @@ udp_ep_close(void *arg)
// close all pipes
uint32_t cursor = 0;
- while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {
- p->closed = true;
+ uint64_t id;
+
+ // first we grab the connpipes that are not closed upstream
+ while ((p = nni_list_first(&ep->connpipes)) != NULL) {
+ udp_pipe_destroy(p);
+ ep->refcnt--;
+ }
+ while (nni_id_visit(&ep->pipes, &id, (void **) &p, &cursor)) {
if (p->peer_id != 0) {
udp_send_disc(ep, p, DISC_CLOSED);
}
+ p->closed = true;
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (p->npipe == NULL) {
- nni_list_remove(&ep->connpipes, p);
- nni_id_remove(&ep->pipes, p->self_id);
- udp_pipe_destroy(p);
- ep->refcnt--;
- }
}
nni_aio_close(&ep->resaio);
nni_mtx_unlock(&ep->mtx);
@@ -1233,28 +1242,29 @@ udp_timer_cb(void *arg)
ep->next_wake = NNI_TIME_NEVER;
while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {
- if (p->closed) {
- if (p->npipe == NULL) {
- // pipe closed, but we have to clean it up
- // ourselves
- nni_id_remove(&ep->pipes, p->self_id);
- udp_pipe_destroy(p);
- ep->refcnt--;
- }
- continue;
- }
-
if (now > p->expire) {
char buf[128];
nni_aio *aio;
nng_log_info("NNG-UDP-INACTIVE",
"Pipe peer %s timed out due to inactivity",
nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf)));
+
+ // Possibly alert the dialer, so it can restart a new
+ // attempt.
if ((ep->dialer) && (p->peer_id == 0) &&
(aio = nni_list_first(&ep->connaios))) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ETIMEDOUT);
}
+
+ // If we're still on the connect list, then we need to
+ // take responsibility for cleaning this up.
+ if (nni_list_node_active(&p->node)) {
+ udp_pipe_destroy(p);
+ ep->refcnt--;
+ continue;
+ }
+
// This will probably not be received by the peer,
// since we aren't getting anything from them. But
// having it on the wire may help debugging later.
@@ -1312,6 +1322,7 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
ep->url = url;
ep->refresh = NNG_UDP_REFRESH; // one minute by default
ep->rcvmax = NNG_UDP_RECVMAX;
+ ep->copymax = NNG_UDP_COPYMAX;
ep->refcnt = 1;
if ((rv = nni_msg_alloc(&ep->rx_payload,
ep->rcvmax + sizeof(ep->rx_msg)) != 0)) {
@@ -1331,7 +1342,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
nni_aio_init(&ep->resaio, udp_resolv_cb, ep);
nni_aio_completions_init(&ep->complq);
-#ifdef NNG_ENABLE_STATS
static const nni_stat_info rcv_max_info = {
.si_name = "rcv_max",
.si_desc = "maximum receive size",
@@ -1402,8 +1412,16 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
.si_unit = NNG_UNIT_EVENTS,
.si_atomic = true,
};
+ static const nni_stat_info copy_max_info = {
+ .si_name = "rcv_copy_max",
+ .si_desc = "threshold to copy instead of loan-up",
+ .si_type = NNG_STAT_LEVEL,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
+ };
nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
+ nni_stat_init(&ep->st_copy_max, &copy_max_info);
nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info);
nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info);
nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info);
@@ -1413,7 +1431,8 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock)
nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info);
nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info);
nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info);
-#endif
+
+ nni_stat_set_value(&ep->st_rcv_max, ep->rcvmax);
// schedule our timer callback - forever for now
// adjusted automatically as we add pipes or other
@@ -1459,9 +1478,7 @@ udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (rv);
}
-#ifdef NNG_ENABLE_STATS
nni_dialer_add_stat(ndialer, &ep->st_rcv_max);
-#endif
*dp = ep;
return (0);
}
@@ -1485,9 +1502,7 @@ udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
}
ep->self_sa = sa;
-#ifdef NNG_ENABLE_STATS
nni_listener_add_stat(nlistener, &ep->st_rcv_max);
-#endif
*lp = ep;
return (0);
@@ -1730,6 +1745,9 @@ udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
size_t val;
int rv;
if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) {
+ if ((val == 0) || (val > 65000)) {
+ val = 65000;
+ }
nni_mtx_lock(&ep->mtx);
if (ep->started) {
nni_mtx_unlock(&ep->mtx);
@@ -1737,9 +1755,38 @@ udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
}
ep->rcvmax = (uint16_t) val;
nni_mtx_unlock(&ep->mtx);
-#ifdef NNG_ENABLE_STATS
nni_stat_set_value(&ep->st_rcv_max, val);
-#endif
+ }
+ return (rv);
+}
+
+static int
+udp_ep_get_copymax(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ udp_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->copymax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static int
+udp_ep_set_copymax(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ udp_ep *ep = arg;
+ size_t val;
+ int rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+ ep->copymax = (uint16_t) val;
+ nni_mtx_unlock(&ep->mtx);
+ nni_stat_set_value(&ep->st_copy_max, val);
}
return (rv);
}
@@ -1835,6 +1882,11 @@ static const nni_option udp_ep_opts[] = {
.o_set = udp_ep_set_recvmaxsz,
},
{
+ .o_name = NNG_OPT_UDP_COPY_MAX,
+ .o_get = udp_ep_get_copymax,
+ .o_set = udp_ep_set_copymax,
+ },
+ {
.o_name = NNG_OPT_URL,
.o_get = udp_ep_get_url,
},
diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c
index b99c5af1..c2b515e8 100644
--- a/src/sp/transport/udp/udp_tran_test.c
+++ b/src/sp/transport/udp/udp_tran_test.c
@@ -143,6 +143,8 @@ test_udp_recv_max(void)
NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz));
NUTS_TRUE(sz == 200);
NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100));
+ NUTS_PASS(nng_listener_get_size(l, NNG_OPT_RECVMAXSZ, &sz));
+ NUTS_TRUE(sz == 100);
NUTS_PASS(nng_listener_start(l, 0));
NUTS_OPEN(s1);
@@ -158,6 +160,41 @@ test_udp_recv_max(void)
NUTS_CLOSE(s1);
}
+void
+test_udp_recv_copy(void)
+{
+ char msg[256];
+ char buf[256];
+ nng_socket s0;
+ nng_socket s1;
+ nng_listener l;
+ size_t sz;
+ char *addr;
+
+ NUTS_ADDR(addr, "udp");
+
+ NUTS_OPEN(s0);
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100));
+ NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz));
+ NUTS_TRUE(sz == 100);
+ NUTS_PASS(nng_listener_start(l, 0));
+
+ NUTS_OPEN(s1);
+ NUTS_PASS(nng_dial(s1, addr, NULL, 0));
+ nng_msleep(100);
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_send(s1, msg, 95, 0));
+ NUTS_PASS(nng_recv(s0, buf, &sz, 0));
+ NUTS_TRUE(sz == 95);
+ NUTS_PASS(nng_send(s1, msg, 150, 0));
+ NUTS_PASS(nng_recv(s0, buf, &sz, 0));
+ NUTS_TRUE(sz == 150);
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+}
+
NUTS_TESTS = {
{ "udp wild card connect fail", test_udp_wild_card_connect_fail },
@@ -167,5 +204,6 @@ NUTS_TESTS = {
{ "udp non-local address", test_udp_non_local_address },
{ "udp malformed address", test_udp_malformed_address },
{ "udp recv max", test_udp_recv_max },
+ { "udp recv copy", test_udp_recv_copy },
{ NULL, NULL },
};