aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2025-04-14 00:03:47 -0700
committerGarrett D'Amore <garrett@damore.org>2025-04-19 20:42:59 -0700
commitdf3059dd130ce22f2326abeb41149bdf35e0c38d (patch)
tree83ce0a27ac4a29f058977e378cc9f321e5995ef7 /src/platform
parent55925438bc8b8fd243ab995c48c8996ac49a6652 (diff)
downloadnng-df3059dd130ce22f2326abeb41149bdf35e0c38d.tar.gz
nng-df3059dd130ce22f2326abeb41149bdf35e0c38d.tar.bz2
nng-df3059dd130ce22f2326abeb41149bdf35e0c38d.zip
UDP: Add support for connected mode UDP sockets.gdamore/udpconn
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_udp.c151
-rw-r--r--src/platform/udp_test.c88
-rw-r--r--src/platform/windows/win_udp.c141
3 files changed, 354 insertions, 26 deletions
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index a1601874..9a1cd9b8 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -1,5 +1,5 @@
//
-// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -58,6 +58,8 @@ struct nni_plat_udp {
nni_list udp_recvq;
nni_list udp_sendq;
nni_mtx udp_mtx;
+ bool udp_connected; // if true its a connected socket
+ nng_sockaddr udp_peer; // only valid if udp_connected
};
static void
@@ -151,10 +153,12 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
}
- hdr.msg_iov = iov;
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = sizeof(ss);
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ if (!udp->udp_connected) {
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = sizeof(ss);
+ }
if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
@@ -163,7 +167,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
return;
}
rv = nni_plat_errno(errno);
- } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ } else if ((!udp->udp_connected) &&
+ (sa = nni_aio_get_input(aio, 0)) != NULL) {
// We need to store the address information.
// It is incumbent on the AIO submitter to supply
// storage for the address.
@@ -189,7 +194,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
return;
}
rv = nni_plat_errno(errno);
- } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ } else if (udp->udp_connected &&
+ (sa = nni_aio_get_input(aio, 0)) != NULL) {
nni_posix_sockaddr2nn(sa, (void *) &ss, salen);
}
if (niov != 1) {
@@ -219,7 +225,8 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
nni_aio_get_iov(aio, &niov, &aiov);
NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);
- if ((salen = nni_posix_nn2sockaddr(
+ if ((!udp->udp_connected) &&
+ (salen = nni_posix_nn2sockaddr(
&ss, nni_aio_get_input(aio, 0))) < 1) {
rv = NNG_EADDRINVAL;
} else {
@@ -231,10 +238,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
}
- hdr.msg_iov = iov;
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = salen;
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ if (!udp->udp_connected) {
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = salen;
+ }
cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
if (cnt < 0) {
@@ -255,8 +264,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
len = copy_to_bounce(aiov, niov);
buf = bouncebuf;
}
- cnt = sendto(
- udp->udp_fd, buf, len, 0, (void *) &ss, salen);
+ if (udp->udp_connected) {
+ cnt = send(udp->udp_fd, buf, len, 0);
+ } else {
+ cnt = sendto(udp->udp_fd, buf, len, 0,
+ (void *) &ss, salen);
+ }
if (cnt < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
@@ -360,6 +373,106 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
return (0);
}
+// nni_plat_udp_connect is like nni_plat_udp_open, but it creates a *connected*
+// socket by doing UDP connect. As a special case, if the peer address is
+// NULL, the socket is connected, but this is a server socket, with
+// SO_REUSEADDR set, in anticipation of "faking" accept and having other
+// connected sockets set up.
+int
+nni_plat_udp_connect(
+ nni_plat_udp **upp, nni_sockaddr *bindaddr, nni_sockaddr *peeraddr)
+{
+ nni_plat_udp *udp;
+ int selflen;
+ int peerlen;
+ struct sockaddr_storage self;
+ struct sockaddr_storage peer;
+ int rv;
+
+ if (bindaddr == NULL) {
+ return (NNG_EADDRINVAL);
+ }
+ if (peeraddr != NULL) {
+ if (bindaddr->s_family != peeraddr->s_family) {
+ return (NNG_EADDRINVAL);
+ }
+ }
+ switch (bindaddr->s_family) {
+ case NNG_AF_INET:
+#ifdef NNG_ENABLE_IPV6
+ case NNG_AF_INET6:
+#endif
+ break;
+ default:
+ return (NNG_EADDRINVAL);
+ }
+ selflen = nni_posix_nn2sockaddr(&self, bindaddr);
+ NNI_ASSERT(selflen > 1);
+ if (peeraddr != NULL) {
+ peerlen = nni_posix_nn2sockaddr(&peer, peeraddr);
+ NNI_ASSERT(peerlen > 1);
+ NNI_ASSERT(selflen == peerlen);
+ }
+
+ // UDP opens can actually run synchronously.
+ if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&udp->udp_mtx);
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
+
+ udp->udp_fd = socket(self.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (udp->udp_fd < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+ int on = 1;
+ if (setsockopt(
+ udp->udp_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+#if defined(SO_REUSEPORT)
+ if (setsockopt(
+ udp->udp_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+#endif
+
+ if (bind(udp->udp_fd, (void *) &self, selflen) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+
+ // NB: This is a non-blocking call!
+ if (peeraddr != NULL) {
+ udp->udp_connected = true;
+ udp->udp_peer = *peeraddr;
+ if (connect(udp->udp_fd, (void *) &peer, peerlen) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ }
+ }
+
+ nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp);
+
+ *upp = udp;
+ return (0);
+}
+
void
nni_plat_udp_close(nni_plat_udp *udp)
{
@@ -444,6 +557,16 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
return (nni_posix_sockaddr2nn(sa, &ss, sz));
}
+int
+nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa)
+{
+ if (!udp->udp_connected) {
+ return (NNG_ENOTCONN);
+ }
+ *sa = udp->udp_peer;
+ return (0);
+}
+
// Joining a multicast group is different than binding to a multicast
// group. This allows to receive both unicast and multicast at the given
// address.
diff --git a/src/platform/udp_test.c b/src/platform/udp_test.c
index fa1faba9..d8090e99 100644
--- a/src/platform/udp_test.c
+++ b/src/platform/udp_test.c
@@ -86,6 +86,93 @@ test_udp_pair(void)
}
void
+test_udp_connected(void)
+{
+ nng_sockaddr sa1;
+ nng_sockaddr sa2;
+ nng_udp *u1;
+ nng_udp *u2;
+ nng_udp *u3;
+ nng_udp *u4;
+ uint32_t loopback;
+ nng_aio *aio1;
+ nng_aio *aio2;
+ nng_iov iov1, iov2;
+ char msg[] = "hello";
+ char rbuf[1024];
+ nng_sockaddr to;
+ nng_sockaddr from;
+
+ loopback = htonl(0x7f000001); // 127.0.0.1
+
+ sa1.s_in.sa_family = NNG_AF_INET;
+ sa1.s_in.sa_addr = loopback;
+ sa1.s_in.sa_port = 0; // wild card port binding
+
+ sa2.s_in.sa_family = NNG_AF_INET;
+ sa2.s_in.sa_addr = loopback;
+ sa2.s_in.sa_port = 0;
+
+ NUTS_PASS(nng_udp_connect(&u1, &sa1, NULL));
+ NUTS_PASS(nng_udp_sockname(u1, &sa1));
+
+ NUTS_PASS(nng_udp_connect(&u2, &sa2, NULL));
+ NUTS_PASS(nng_udp_sockname(u2, &sa2));
+
+ sa2.s_in.sa_family = NNG_AF_INET;
+ sa2.s_in.sa_addr = loopback;
+ sa2.s_in.sa_port = 0;
+
+ NUTS_PASS(nng_udp_sockname(u1, &sa1));
+ NUTS_PASS(nng_udp_sockname(u2, &sa2));
+
+ NUTS_FAIL(nng_udp_peername(u1, &to), NNG_ENOTCONN);
+ NUTS_FAIL(nng_udp_peername(u2, &to), NNG_ENOTCONN);
+
+ NUTS_PASS(nng_udp_connect(&u3, &sa1, &sa2));
+ NUTS_PASS(nng_udp_connect(&u4, &sa2, &sa1));
+
+ nng_msleep(500);
+
+ NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+
+ iov1.iov_buf = msg;
+ iov1.iov_len = strlen(msg) + 1;
+ NUTS_PASS(nng_aio_set_iov(aio1, 1, &iov1));
+ NUTS_PASS(nng_aio_set_input(aio1, 0, &to));
+
+ iov2.iov_buf = rbuf;
+ iov2.iov_len = 1024;
+ NUTS_PASS(nng_aio_set_iov(aio2, 1, &iov2));
+ NUTS_PASS(nng_aio_set_input(aio2, 0, &from));
+
+ nng_udp_recv(u3, aio2);
+ nng_udp_send(u4, aio1);
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+
+ NUTS_PASS(nng_aio_result(aio1));
+ NUTS_PASS(nng_aio_result(aio2));
+ NUTS_ASSERT(nng_aio_count(aio2) == strlen(msg) + 1);
+ NUTS_ASSERT(strcmp(rbuf, msg) == 0);
+
+ NUTS_PASS(nng_udp_peername(u3, &from));
+ NUTS_PASS(nng_udp_peername(u4, &to));
+
+ NUTS_ASSERT(from.s_in.sa_family == sa2.s_in.sa_family);
+ NUTS_ASSERT(from.s_in.sa_addr == sa2.s_in.sa_addr);
+ NUTS_ASSERT(to.s_in.sa_port == sa1.s_in.sa_port);
+
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+ nng_udp_close(u1);
+ nng_udp_close(u2);
+ nng_udp_close(u3);
+ nng_udp_close(u4);
+}
+
+void
test_udp_scatter_gather(void)
{
nng_sockaddr sa1;
@@ -480,6 +567,7 @@ test_udp_send_v6_from_v4(void)
NUTS_TESTS = {
{ "udp pair", test_udp_pair },
+ { "udp connected", test_udp_connected },
{ "udp scatter gather", test_udp_scatter_gather },
{ "udp send recv multi", test_udp_multi_send_recv },
{ "udp send no address", test_udp_send_no_addr },
diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c
index a280e116..08041f01 100644
--- a/src/platform/windows/win_udp.c
+++ b/src/platform/windows/win_udp.c
@@ -1,5 +1,5 @@
//
-// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -28,6 +28,9 @@ struct nni_plat_udp {
bool closed;
SOCKADDR_STORAGE rxsa;
int rxsalen;
+ bool connected; // is the socket a connected socket?
+ nni_sockaddr peer; // only valid for connected
+ int peerlen;
};
static void udp_recv_cb(nni_win_io *, int, size_t);
@@ -85,6 +88,90 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
return (rv);
}
+// nni_plat_udp_connect initializes a connected UDP socket, binding to the
+// local address specified specified, and then connecting to the remote one.
+// If the peer address is NULL, then its not connected, but intended to be
+// compatible with connected sockets (by using SO_REUSEADDR.)
+int
+nni_plat_udp_connect(
+ nni_plat_udp **udpp, nni_sockaddr *self, nni_sockaddr *peer)
+{
+ nni_plat_udp *u;
+ SOCKADDR_STORAGE ss;
+ SOCKADDR_STORAGE ps;
+ int sslen;
+ int pslen;
+ DWORD no;
+ DWORD yes;
+ int rv;
+
+ if ((sslen = nni_win_nn2sockaddr(&ss, self)) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ if (peer != NULL) {
+ if (self->s_family != peer->s_family) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((pslen = nni_win_nn2sockaddr(&ps, peer)) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ }
+
+ if ((u = NNI_ALLOC_STRUCT(u)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_aio_list_init(&u->rxq);
+ nni_mtx_init(&u->lk);
+ nni_cv_init(&u->cv, &u->lk);
+
+ u->s = socket(ss.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (u->s == INVALID_SOCKET) {
+ rv = nni_win_error(GetLastError());
+ nni_plat_udp_close(u);
+ return (rv);
+ }
+ // Don't inherit the handle (CLOEXEC really).
+ SetHandleInformation((HANDLE) u->s, HANDLE_FLAG_INHERIT, 0);
+ no = 0;
+ (void) setsockopt(
+ u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
+
+ nni_win_io_init(&u->rxio, udp_recv_cb, u);
+
+ if ((rv = nni_win_io_register((HANDLE) u->s)) != 0) {
+ nni_plat_udp_close(u);
+ return (rv);
+ }
+
+ // We need to allow the concurrent until we switch to connected mode.
+ yes = 1;
+ (void) setsockopt(
+ u->s, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, sizeof(yes));
+
+ // Bind the local address
+ if (bind(u->s, (struct sockaddr *) &ss, sslen) == SOCKET_ERROR) {
+ rv = nni_win_error(GetLastError());
+ nni_plat_udp_close(u);
+ return (rv);
+ }
+
+ // Connect, if we have a peer address
+ if (peer != NULL) {
+ if (connect(u->s, (struct sockaddr *) &ps, pslen) ==
+ SOCKET_ERROR) {
+ rv = nni_win_error(GetLastError());
+ nni_plat_udp_close(u);
+ return (rv);
+ }
+ u->connected = true;
+ u->peer = *peer;
+ }
+
+ *udpp = u;
+ return (rv);
+}
+
// nni_plat_udp_close closes the underlying UDP socket.
void
nni_plat_udp_close(nni_plat_udp *u)
@@ -123,10 +210,13 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio)
DWORD nsent;
nni_aio_reset(aio);
- sa = nni_aio_get_input(aio, 0);
- if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) {
- nni_aio_finish_error(aio, NNG_EADDRINVAL);
- return;
+
+ if (!u->connected) {
+ sa = nni_aio_get_input(aio, 0);
+ if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
}
nni_aio_get_iov(aio, &naiov, &aiov);
@@ -150,10 +240,14 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio)
iov[i].len = (ULONG) aiov[i].iov_len;
}
- // We can use a "non-overlapping" send; there is little point in
- // handling UDP send completions asynchronously.
- rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0,
- (struct sockaddr *) &to, tolen, NULL, NULL);
+ // We can use a "non-overlapping" send; there is little point
+ // in handling UDP send completions asynchronously.
+ if (u->connected) {
+ rv = WSASend(u->s, iov, (DWORD) naiov, &nsent, 0, NULL, NULL);
+ } else {
+ rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0,
+ (struct sockaddr *) &to, tolen, NULL, NULL);
+ }
if (rv == SOCKET_ERROR) {
rv = nni_win_error(GetLastError());
@@ -203,7 +297,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num)
}
// convert address from Windows form...
- if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ if (((sa = nni_aio_get_input(aio, 0)) != NULL) && !u->connected) {
if (nni_win_sockaddr2nn(sa, &u->rxsa, sizeof(u->rxsa)) != 0) {
rv = NNG_EADDRINVAL;
num = 0;
@@ -259,8 +353,14 @@ again:
// already. The actual aio's iov array we don't touch.
flags = 0;
- rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags,
- (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd, NULL);
+ if (u->connected) {
+ rv = WSARecv(u->s, iov, (DWORD) naiov, NULL, &flags,
+ &u->rxio.olpd, NULL);
+ } else {
+ rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags,
+ (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd,
+ NULL);
+ }
_freea(iov);
@@ -309,6 +409,23 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
return (nni_win_sockaddr2nn(sa, &ss, sz));
}
+int
+nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa)
+{
+ SOCKADDR_STORAGE ss;
+ int sz;
+
+ if (!udp->connected) {
+ return (NNG_ENOTCONN);
+ }
+ *sa = udp->peer;
+ sz = sizeof(*sa);
+ if (getsockname(udp->s, (SOCKADDR *) &ss, &sz) < 0) {
+ return (nni_win_error(GetLastError()));
+ }
+ return (nni_win_sockaddr2nn(sa, &ss, sz));
+}
+
// Joining a multicast group is different than binding to a multicast
// group. This allows to receive both unicast and multicast at the given
// address.