diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-29 16:30:31 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-29 17:36:46 -0800 |
| commit | aabe3ed301081aacce11408749c26b6baae3faaa (patch) | |
| tree | c7e72e15c94221d0326204513fb666e6a7bd45a1 /src | |
| parent | 60ff324b1e6e5124dbbfefec732940512ed40f87 (diff) | |
| download | nng-aabe3ed301081aacce11408749c26b6baae3faaa.tar.gz nng-aabe3ed301081aacce11408749c26b6baae3faaa.tar.bz2 nng-aabe3ed301081aacce11408749c26b6baae3faaa.zip | |
udp: use a bounce buffer if we lack sendmsg or recvmsg
This includes checks to determine if those functions are present,
and a test case to verify that scatter gather with UDP works.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 159 | ||||
| -rw-r--r-- | src/platform/udp_test.c | 70 | ||||
| -rw-r--r-- | src/sp/transport/udp/udp.c | 28 |
5 files changed, 218 insertions, 45 deletions
diff --git a/src/core/aio.h b/src/core/aio.h index 5c74691d..cbf2c919 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -194,6 +194,8 @@ extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; +#define NNI_AIO_MAX_IOV 8 + // nng_aio is an async I/O handle. The details of this aio structure // are private to the AIO framework. The structure has the public name // (nng_aio) so that we minimize the pollution in the public API namespace. @@ -216,7 +218,7 @@ struct nng_aio { nni_task a_task; // Read/write operations. - nni_iov a_iov[8]; + nni_iov a_iov[NNI_AIO_MAX_IOV]; unsigned a_nio; // Message operations. diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index 95d9a56a..b8fb665a 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -29,6 +29,8 @@ if (NNG_PLATFORM_POSIX) nng_check_func(getentropy NNG_HAVE_GETENTROPY) nng_check_func(getrandom NNG_HAVE_GETRANDOM) nng_check_func(arc4random_buf NNG_HAVE_ARC4RANDOM) + nng_check_func(recvmsg NNG_HAVE_RECVMSG) + nng_check_func(sendmsg NNG_HAVE_SENDMSG) nng_check_func(clock_gettime NNG_HAVE_CLOCK_GETTIME_LIBC) if (NNG_HAVE_CLOCK_GETTIME_LIBC) diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 4bb50656..a1601874 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -78,6 +78,54 @@ nni_posix_udp_doclose(nni_plat_udp *udp) nni_posix_udp_doerror(udp, NNG_ECLOSED); } +// If we need to do sendmsg or recvmsg, then we do this fairly +// awful thing to do a "pull" up. It is important that in such a +// case we must have only a single poller, because we have only +// this single buffer. Performance will be worse, as data copies +// are involved. +#if !defined(NNG_HAVE_RECVMSG) || !defined(NNG_HAVE_SENDMSG) +static uint8_t bouncebuf[65536]; +#endif + +#if !defined(NNG_HAVE_RECVMSG) +static int +copy_to_bounce(nng_iov *iov, int niov) +{ + int room = sizeof(bouncebuf); + uint8_t *buf = bouncebuf; + int len = 0; + + for (int i = 0; i < niov && room; i++) { + int n = iov[i].iov_len; + if (n > room) { + n = room; + } + memcpy(buf, iov[i].iov_buf, n); + room -= n; + buf += n; + len += n; + } + return (len); +} +#endif + +#if !defined(NNG_HAVE_SENDMSG) +static void +copy_from_bounce(nng_iov *iov, int niov, int len) +{ + uint8_t *buf = bouncebuf; + for (int i = 0; i < niov && len; i++) { + int n = iov[i].iov_len; + if (n > len) { + n = len; + } + memcpy(iov[i].iov_buf, buf, n); + len -= n; + buf += n; + } +} +#endif + static void nni_posix_udp_dorecv(nni_plat_udp *udp) { @@ -85,16 +133,19 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) nni_list *q = &udp->udp_recvq; // While we're able to recv, do so. while ((aio = nni_list_first(q)) != NULL) { - struct iovec iov[4]; unsigned niov; - nni_iov *aiov; + nng_iov *aiov; struct sockaddr_storage ss; nng_sockaddr *sa; - struct msghdr hdr = { .msg_name = NULL }; int rv = 0; int cnt = 0; nni_aio_get_iov(aio, &niov, &aiov); + NNI_ASSERT(niov <= NNI_AIO_MAX_IOV); + +#ifdef NNG_HAVE_RECVMSG + struct iovec iov[NNI_AIO_MAX_IOV]; + struct msghdr hdr = { .msg_name = NULL }; for (unsigned i = 0; i < niov; i++) { iov[i].iov_base = aiov[i].iov_buf; @@ -119,6 +170,32 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) nni_posix_sockaddr2nn( sa, (void *) &ss, hdr.msg_namelen); } +#else // !NNG_HAVE_RECVMSG + // Here we have to use a bounce buffer + uint8_t *buf; + size_t len; + socklen_t salen; + if (niov == 1) { + buf = aiov[0].iov_buf; + len = aiov[0].iov_len; + } else { + buf = bouncebuf; + len = sizeof(bouncebuf); + } + salen = sizeof(ss); + if ((cnt = recvfrom(udp->udp_fd, buf, len, 0, (void *) &ss, + &salen)) < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } + rv = nni_plat_errno(errno); + } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + nni_posix_sockaddr2nn(sa, (void *) &ss, salen); + } + if (niov != 1) { + copy_from_bounce(aiov, niov, cnt); + } +#endif nni_list_remove(q, aio); nni_aio_finish(aio, rv, cnt); } @@ -134,43 +211,61 @@ nni_posix_udp_dosend(nni_plat_udp *udp) while ((aio = nni_list_first(q)) != NULL) { struct sockaddr_storage ss; - int len; - int rv = 0; - int cnt = 0; + int salen; + int rv = 0; + int cnt = 0; + unsigned niov; + nni_iov *aiov; - len = nni_posix_nn2sockaddr(&ss, nni_aio_get_input(aio, 0)); - if (len < 1) { + nni_aio_get_iov(aio, &niov, &aiov); + NNI_ASSERT(niov <= NNI_AIO_MAX_IOV); + if ((salen = nni_posix_nn2sockaddr( + &ss, nni_aio_get_input(aio, 0))) < 1) { rv = NNG_EADDRINVAL; } else { - unsigned niov; - nni_iov *aiov; - struct iovec iov[16]; +#ifdef NNG_HAVE_SENDMSG - nni_aio_get_iov(aio, &niov, &aiov); - if (niov > NNI_NUM_ELEMENTS(iov)) { - rv = NNG_EINVAL; + struct iovec iov[NNI_AIO_MAX_IOV]; + struct msghdr hdr = { .msg_name = NULL }; + for (unsigned i = 0; i < niov; i++) { + iov[i].iov_base = aiov[i].iov_buf; + iov[i].iov_len = aiov[i].iov_len; } - if (rv == 0) { - struct msghdr hdr = { .msg_name = NULL }; - for (unsigned i = 0; i < niov; i++) { - iov[i].iov_base = aiov[i].iov_buf; - iov[i].iov_len = aiov[i].iov_len; + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = salen; + + cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL); + if (cnt < 0) { + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) { + // Cannot send now, leave. + return; } - hdr.msg_iov = iov; - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = len; - - cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL); - if (cnt < 0) { - if ((errno == EAGAIN) || - (errno == EWOULDBLOCK)) { - // Cannot send now, leave. - return; - } - rv = nni_plat_errno(errno); + rv = nni_plat_errno(errno); + } +#else // !NNG_HAVE_SENDMSG + uint8_t *buf; + size_t len; + if (niov == 1) { + buf = aiov[0].iov_buf; + len = aiov[0].iov_len; + } else { + len = copy_to_bounce(aiov, niov); + buf = bouncebuf; + } + cnt = sendto( + udp->udp_fd, buf, len, 0, (void *) &ss, salen); + if (cnt < 0) { + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) { + // Cannot send now, leave. + return; } + rv = nni_plat_errno(errno); } +#endif } nni_list_remove(q, aio); diff --git a/src/platform/udp_test.c b/src/platform/udp_test.c index 211c985c..fa1faba9 100644 --- a/src/platform/udp_test.c +++ b/src/platform/udp_test.c @@ -86,6 +86,75 @@ test_udp_pair(void) } void +test_udp_scatter_gather(void) +{ + nng_sockaddr sa1; + nng_sockaddr sa2; + nng_udp *u1; + nng_udp *u2; + uint32_t loopback; + nng_aio *aio1; + nng_aio *aio2; + nng_iov iov1[2], iov2[2]; + char rbuf[1024]; + nng_sockaddr to; + nng_sockaddr from; + + loopback = htonl(0x7f000001); // 127.0.0.1 + + sa1.s_in.sa_family = NNG_AF_INET; + sa1.s_in.sa_addr = loopback; + sa1.s_in.sa_port = 0; // wild card port binding + + sa2.s_in.sa_family = NNG_AF_INET; + sa2.s_in.sa_addr = loopback; + sa2.s_in.sa_port = 0; + + NUTS_PASS(nng_udp_open(&u1, &sa1)); + NUTS_PASS(nng_udp_open(&u2, &sa2)); + + NUTS_PASS(nng_udp_sockname(u1, &sa1)); + NUTS_PASS(nng_udp_sockname(u2, &sa2)); + + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + + to = sa2; + iov1[0].iov_buf = "abc"; + iov1[0].iov_len = 3; + iov1[1].iov_buf = "def"; + iov1[1].iov_len = 4; // include nul + NUTS_PASS(nng_aio_set_iov(aio1, 2, iov1)); + NUTS_PASS(nng_aio_set_input(aio1, 0, &to)); + + iov2[0].iov_buf = rbuf; + iov2[0].iov_len = 4; + iov2[1].iov_len = 1020; + iov2[1].iov_buf = rbuf + 4; + + NUTS_PASS(nng_aio_set_iov(aio2, 2, iov2)); + NUTS_PASS(nng_aio_set_input(aio2, 0, &from)); + + nng_udp_recv(u2, aio2); + nng_udp_send(u1, aio1); + nng_aio_wait(aio1); + nng_aio_wait(aio2); + + NUTS_PASS(nng_aio_result(aio1)); + NUTS_PASS(nng_aio_result(aio2)); + NUTS_TRUE(nng_aio_count(aio2) == strlen("abcdef") + 1); + NUTS_TRUE(strcmp(rbuf, "abcdef") == 0); + NUTS_TRUE(from.s_in.sa_family == sa1.s_in.sa_family); + NUTS_TRUE(from.s_in.sa_addr == sa1.s_in.sa_addr); + NUTS_TRUE(from.s_in.sa_port == sa1.s_in.sa_port); + + nng_aio_free(aio1); + nng_aio_free(aio2); + nng_udp_close(u1); + nng_udp_close(u2); +} + +void test_udp_multi_send_recv(void) { nng_sockaddr sa1, sa2, sa3, sa4; @@ -411,6 +480,7 @@ test_udp_send_v6_from_v4(void) NUTS_TESTS = { { "udp pair", test_udp_pair }, + { "udp scatter gather", test_udp_scatter_gather }, { "udp send recv multi", test_udp_multi_send_recv }, { "udp send no address", test_udp_send_no_addr }, { "udp send ipc address", test_udp_send_ipc }, diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index fee142e3..45260a1b 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -214,7 +214,7 @@ struct udp_ep { nni_list connaios; // aios from accept waiting for a client peer nni_list connpipes; // pipes waiting to be connected nng_duration refresh; // refresh interval for connections in seconds - udp_sp_msg rx_msg; // contains the received message header + udp_sp_msg *rx_msg; // contains the received message header uint16_t rcvmax; // max payload, trimmed to uint16_t uint16_t copymax; udp_txring tx_ring; @@ -385,14 +385,20 @@ udp_pipe_schedule(udp_pipe *p) static void udp_start_rx(udp_ep *ep) { - nni_iov iov[2]; + nni_iov iov; + + // We use this trick to collect the message header so that we can + // do the entire message in a single iov, which avoids the need to + // scatter/gather (which can be problematic for platforms that cannot + // do scatter/gather due to missing recvmsg.) + (void) nni_msg_insert(ep->rx_payload, NULL, sizeof(udp_sp_msg)); + iov.iov_buf = nni_msg_body(ep->rx_payload); + iov.iov_len = nni_msg_len(ep->rx_payload); + ep->rx_msg = nni_msg_body(ep->rx_payload); + nni_msg_trim(ep->rx_payload, sizeof(udp_sp_msg)); - iov[0].iov_buf = &ep->rx_msg; - iov[0].iov_len = sizeof(ep->rx_msg); - iov[1].iov_buf = nni_msg_body(ep->rx_payload); - iov[1].iov_len = nni_msg_len(ep->rx_payload); nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); - nni_aio_set_iov(&ep->rx_aio, 2, iov); + nni_aio_set_iov(&ep->rx_aio, 1, &iov); nni_udp_recv(ep->udp, &ep->rx_aio); } @@ -678,8 +684,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) nni_stat_inc(&ep->st_rcv_nocopy, 1); // Message size larger than copy break, do zero copy msg = ep->rx_payload; - if (nng_msg_alloc(&ep->rx_payload, - ep->rcvmax + sizeof(ep->rx_msg)) != 0) { + if (nng_msg_alloc(&ep->rx_payload, ep->rcvmax) != 0) { ep->rx_payload = msg; // make sure we put it back if (p->npipe != NULL) { nni_pipe_bump_error(p->npipe, NNG_ENOMEM); @@ -892,7 +897,7 @@ udp_rx_cb(void *arg) } // Received message will be in the ep rx header. - hdr = &ep->rx_msg; + hdr = ep->rx_msg; sa = &ep->rx_sa; n = nng_aio_count(aio); @@ -1281,8 +1286,7 @@ udp_ep_init( ep->refresh = NNG_UDP_REFRESH; // one minute by default ep->rcvmax = NNG_UDP_RECVMAX; ep->copymax = NNG_UDP_COPYMAX; - if ((rv = nni_msg_alloc(&ep->rx_payload, - ep->rcvmax + sizeof(ep->rx_msg)) != 0)) { + if ((rv = nni_msg_alloc(&ep->rx_payload, ep->rcvmax) != 0)) { NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); return (rv); } |
