aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.h4
-rw-r--r--src/platform/posix/CMakeLists.txt2
-rw-r--r--src/platform/posix/posix_udp.c159
-rw-r--r--src/platform/udp_test.c70
-rw-r--r--src/sp/transport/udp/udp.c28
5 files changed, 218 insertions, 45 deletions
diff --git a/src/core/aio.h b/src/core/aio.h
index 5c74691d..cbf2c919 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -194,6 +194,8 @@ extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
+#define NNI_AIO_MAX_IOV 8
+
// nng_aio is an async I/O handle. The details of this aio structure
// are private to the AIO framework. The structure has the public name
// (nng_aio) so that we minimize the pollution in the public API namespace.
@@ -216,7 +218,7 @@ struct nng_aio {
nni_task a_task;
// Read/write operations.
- nni_iov a_iov[8];
+ nni_iov a_iov[NNI_AIO_MAX_IOV];
unsigned a_nio;
// Message operations.
diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt
index 95d9a56a..b8fb665a 100644
--- a/src/platform/posix/CMakeLists.txt
+++ b/src/platform/posix/CMakeLists.txt
@@ -29,6 +29,8 @@ if (NNG_PLATFORM_POSIX)
nng_check_func(getentropy NNG_HAVE_GETENTROPY)
nng_check_func(getrandom NNG_HAVE_GETRANDOM)
nng_check_func(arc4random_buf NNG_HAVE_ARC4RANDOM)
+ nng_check_func(recvmsg NNG_HAVE_RECVMSG)
+ nng_check_func(sendmsg NNG_HAVE_SENDMSG)
nng_check_func(clock_gettime NNG_HAVE_CLOCK_GETTIME_LIBC)
if (NNG_HAVE_CLOCK_GETTIME_LIBC)
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 4bb50656..a1601874 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -78,6 +78,54 @@ nni_posix_udp_doclose(nni_plat_udp *udp)
nni_posix_udp_doerror(udp, NNG_ECLOSED);
}
+// If we need to do sendmsg or recvmsg, then we do this fairly
+// awful thing to do a "pull" up. It is important that in such a
+// case we must have only a single poller, because we have only
+// this single buffer. Performance will be worse, as data copies
+// are involved.
+#if !defined(NNG_HAVE_RECVMSG) || !defined(NNG_HAVE_SENDMSG)
+static uint8_t bouncebuf[65536];
+#endif
+
+#if !defined(NNG_HAVE_RECVMSG)
+static int
+copy_to_bounce(nng_iov *iov, int niov)
+{
+ int room = sizeof(bouncebuf);
+ uint8_t *buf = bouncebuf;
+ int len = 0;
+
+ for (int i = 0; i < niov && room; i++) {
+ int n = iov[i].iov_len;
+ if (n > room) {
+ n = room;
+ }
+ memcpy(buf, iov[i].iov_buf, n);
+ room -= n;
+ buf += n;
+ len += n;
+ }
+ return (len);
+}
+#endif
+
+#if !defined(NNG_HAVE_SENDMSG)
+static void
+copy_from_bounce(nng_iov *iov, int niov, int len)
+{
+ uint8_t *buf = bouncebuf;
+ for (int i = 0; i < niov && len; i++) {
+ int n = iov[i].iov_len;
+ if (n > len) {
+ n = len;
+ }
+ memcpy(iov[i].iov_buf, buf, n);
+ len -= n;
+ buf += n;
+ }
+}
+#endif
+
static void
nni_posix_udp_dorecv(nni_plat_udp *udp)
{
@@ -85,16 +133,19 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
nni_list *q = &udp->udp_recvq;
// While we're able to recv, do so.
while ((aio = nni_list_first(q)) != NULL) {
- struct iovec iov[4];
unsigned niov;
- nni_iov *aiov;
+ nng_iov *aiov;
struct sockaddr_storage ss;
nng_sockaddr *sa;
- struct msghdr hdr = { .msg_name = NULL };
int rv = 0;
int cnt = 0;
nni_aio_get_iov(aio, &niov, &aiov);
+ NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);
+
+#ifdef NNG_HAVE_RECVMSG
+ struct iovec iov[NNI_AIO_MAX_IOV];
+ struct msghdr hdr = { .msg_name = NULL };
for (unsigned i = 0; i < niov; i++) {
iov[i].iov_base = aiov[i].iov_buf;
@@ -119,6 +170,32 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
nni_posix_sockaddr2nn(
sa, (void *) &ss, hdr.msg_namelen);
}
+#else // !NNG_HAVE_RECVMSG
+ // Here we have to use a bounce buffer
+ uint8_t *buf;
+ size_t len;
+ socklen_t salen;
+ if (niov == 1) {
+ buf = aiov[0].iov_buf;
+ len = aiov[0].iov_len;
+ } else {
+ buf = bouncebuf;
+ len = sizeof(bouncebuf);
+ }
+ salen = sizeof(ss);
+ if ((cnt = recvfrom(udp->udp_fd, buf, len, 0, (void *) &ss,
+ &salen)) < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return;
+ }
+ rv = nni_plat_errno(errno);
+ } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ nni_posix_sockaddr2nn(sa, (void *) &ss, salen);
+ }
+ if (niov != 1) {
+ copy_from_bounce(aiov, niov, cnt);
+ }
+#endif
nni_list_remove(q, aio);
nni_aio_finish(aio, rv, cnt);
}
@@ -134,43 +211,61 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
while ((aio = nni_list_first(q)) != NULL) {
struct sockaddr_storage ss;
- int len;
- int rv = 0;
- int cnt = 0;
+ int salen;
+ int rv = 0;
+ int cnt = 0;
+ unsigned niov;
+ nni_iov *aiov;
- len = nni_posix_nn2sockaddr(&ss, nni_aio_get_input(aio, 0));
- if (len < 1) {
+ nni_aio_get_iov(aio, &niov, &aiov);
+ NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);
+ if ((salen = nni_posix_nn2sockaddr(
+ &ss, nni_aio_get_input(aio, 0))) < 1) {
rv = NNG_EADDRINVAL;
} else {
- unsigned niov;
- nni_iov *aiov;
- struct iovec iov[16];
+#ifdef NNG_HAVE_SENDMSG
- nni_aio_get_iov(aio, &niov, &aiov);
- if (niov > NNI_NUM_ELEMENTS(iov)) {
- rv = NNG_EINVAL;
+ struct iovec iov[NNI_AIO_MAX_IOV];
+ struct msghdr hdr = { .msg_name = NULL };
+ for (unsigned i = 0; i < niov; i++) {
+ iov[i].iov_base = aiov[i].iov_buf;
+ iov[i].iov_len = aiov[i].iov_len;
}
- if (rv == 0) {
- struct msghdr hdr = { .msg_name = NULL };
- for (unsigned i = 0; i < niov; i++) {
- iov[i].iov_base = aiov[i].iov_buf;
- iov[i].iov_len = aiov[i].iov_len;
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = salen;
+
+ cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
+ if (cnt < 0) {
+ if ((errno == EAGAIN) ||
+ (errno == EWOULDBLOCK)) {
+ // Cannot send now, leave.
+ return;
}
- hdr.msg_iov = iov;
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = len;
-
- cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
- if (cnt < 0) {
- if ((errno == EAGAIN) ||
- (errno == EWOULDBLOCK)) {
- // Cannot send now, leave.
- return;
- }
- rv = nni_plat_errno(errno);
+ rv = nni_plat_errno(errno);
+ }
+#else // !NNG_HAVE_SENDMSG
+ uint8_t *buf;
+ size_t len;
+ if (niov == 1) {
+ buf = aiov[0].iov_buf;
+ len = aiov[0].iov_len;
+ } else {
+ len = copy_to_bounce(aiov, niov);
+ buf = bouncebuf;
+ }
+ cnt = sendto(
+ udp->udp_fd, buf, len, 0, (void *) &ss, salen);
+ if (cnt < 0) {
+ if ((errno == EAGAIN) ||
+ (errno == EWOULDBLOCK)) {
+ // Cannot send now, leave.
+ return;
}
+ rv = nni_plat_errno(errno);
}
+#endif
}
nni_list_remove(q, aio);
diff --git a/src/platform/udp_test.c b/src/platform/udp_test.c
index 211c985c..fa1faba9 100644
--- a/src/platform/udp_test.c
+++ b/src/platform/udp_test.c
@@ -86,6 +86,75 @@ test_udp_pair(void)
}
void
+test_udp_scatter_gather(void)
+{
+ nng_sockaddr sa1;
+ nng_sockaddr sa2;
+ nng_udp *u1;
+ nng_udp *u2;
+ uint32_t loopback;
+ nng_aio *aio1;
+ nng_aio *aio2;
+ nng_iov iov1[2], iov2[2];
+ char rbuf[1024];
+ nng_sockaddr to;
+ nng_sockaddr from;
+
+ loopback = htonl(0x7f000001); // 127.0.0.1
+
+ sa1.s_in.sa_family = NNG_AF_INET;
+ sa1.s_in.sa_addr = loopback;
+ sa1.s_in.sa_port = 0; // wild card port binding
+
+ sa2.s_in.sa_family = NNG_AF_INET;
+ sa2.s_in.sa_addr = loopback;
+ sa2.s_in.sa_port = 0;
+
+ NUTS_PASS(nng_udp_open(&u1, &sa1));
+ NUTS_PASS(nng_udp_open(&u2, &sa2));
+
+ NUTS_PASS(nng_udp_sockname(u1, &sa1));
+ NUTS_PASS(nng_udp_sockname(u2, &sa2));
+
+ NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+
+ to = sa2;
+ iov1[0].iov_buf = "abc";
+ iov1[0].iov_len = 3;
+ iov1[1].iov_buf = "def";
+ iov1[1].iov_len = 4; // include nul
+ NUTS_PASS(nng_aio_set_iov(aio1, 2, iov1));
+ NUTS_PASS(nng_aio_set_input(aio1, 0, &to));
+
+ iov2[0].iov_buf = rbuf;
+ iov2[0].iov_len = 4;
+ iov2[1].iov_len = 1020;
+ iov2[1].iov_buf = rbuf + 4;
+
+ NUTS_PASS(nng_aio_set_iov(aio2, 2, iov2));
+ NUTS_PASS(nng_aio_set_input(aio2, 0, &from));
+
+ nng_udp_recv(u2, aio2);
+ nng_udp_send(u1, aio1);
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+
+ NUTS_PASS(nng_aio_result(aio1));
+ NUTS_PASS(nng_aio_result(aio2));
+ NUTS_TRUE(nng_aio_count(aio2) == strlen("abcdef") + 1);
+ NUTS_TRUE(strcmp(rbuf, "abcdef") == 0);
+ NUTS_TRUE(from.s_in.sa_family == sa1.s_in.sa_family);
+ NUTS_TRUE(from.s_in.sa_addr == sa1.s_in.sa_addr);
+ NUTS_TRUE(from.s_in.sa_port == sa1.s_in.sa_port);
+
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+ nng_udp_close(u1);
+ nng_udp_close(u2);
+}
+
+void
test_udp_multi_send_recv(void)
{
nng_sockaddr sa1, sa2, sa3, sa4;
@@ -411,6 +480,7 @@ test_udp_send_v6_from_v4(void)
NUTS_TESTS = {
{ "udp pair", test_udp_pair },
+ { "udp scatter gather", test_udp_scatter_gather },
{ "udp send recv multi", test_udp_multi_send_recv },
{ "udp send no address", test_udp_send_no_addr },
{ "udp send ipc address", test_udp_send_ipc },
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index fee142e3..45260a1b 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -214,7 +214,7 @@ struct udp_ep {
nni_list connaios; // aios from accept waiting for a client peer
nni_list connpipes; // pipes waiting to be connected
nng_duration refresh; // refresh interval for connections in seconds
- udp_sp_msg rx_msg; // contains the received message header
+ udp_sp_msg *rx_msg; // contains the received message header
uint16_t rcvmax; // max payload, trimmed to uint16_t
uint16_t copymax;
udp_txring tx_ring;
@@ -385,14 +385,20 @@ udp_pipe_schedule(udp_pipe *p)
static void
udp_start_rx(udp_ep *ep)
{
- nni_iov iov[2];
+ nni_iov iov;
+
+ // We use this trick to collect the message header so that we can
+ // do the entire message in a single iov, which avoids the need to
+ // scatter/gather (which can be problematic for platforms that cannot
+ // do scatter/gather due to missing recvmsg.)
+ (void) nni_msg_insert(ep->rx_payload, NULL, sizeof(udp_sp_msg));
+ iov.iov_buf = nni_msg_body(ep->rx_payload);
+ iov.iov_len = nni_msg_len(ep->rx_payload);
+ ep->rx_msg = nni_msg_body(ep->rx_payload);
+ nni_msg_trim(ep->rx_payload, sizeof(udp_sp_msg));
- iov[0].iov_buf = &ep->rx_msg;
- iov[0].iov_len = sizeof(ep->rx_msg);
- iov[1].iov_buf = nni_msg_body(ep->rx_payload);
- iov[1].iov_len = nni_msg_len(ep->rx_payload);
nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa);
- nni_aio_set_iov(&ep->rx_aio, 2, iov);
+ nni_aio_set_iov(&ep->rx_aio, 1, &iov);
nni_udp_recv(ep->udp, &ep->rx_aio);
}
@@ -678,8 +684,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa)
nni_stat_inc(&ep->st_rcv_nocopy, 1);
// Message size larger than copy break, do zero copy
msg = ep->rx_payload;
- if (nng_msg_alloc(&ep->rx_payload,
- ep->rcvmax + sizeof(ep->rx_msg)) != 0) {
+ if (nng_msg_alloc(&ep->rx_payload, ep->rcvmax) != 0) {
ep->rx_payload = msg; // make sure we put it back
if (p->npipe != NULL) {
nni_pipe_bump_error(p->npipe, NNG_ENOMEM);
@@ -892,7 +897,7 @@ udp_rx_cb(void *arg)
}
// Received message will be in the ep rx header.
- hdr = &ep->rx_msg;
+ hdr = ep->rx_msg;
sa = &ep->rx_sa;
n = nng_aio_count(aio);
@@ -1281,8 +1286,7 @@ udp_ep_init(
ep->refresh = NNG_UDP_REFRESH; // one minute by default
ep->rcvmax = NNG_UDP_RECVMAX;
ep->copymax = NNG_UDP_COPYMAX;
- if ((rv = nni_msg_alloc(&ep->rx_payload,
- ep->rcvmax + sizeof(ep->rx_msg)) != 0)) {
+ if ((rv = nni_msg_alloc(&ep->rx_payload, ep->rcvmax) != 0)) {
NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN);
return (rv);
}