aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_udp.c151
1 files changed, 137 insertions, 14 deletions
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index a1601874..9a1cd9b8 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -1,5 +1,5 @@
//
-// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -58,6 +58,8 @@ struct nni_plat_udp {
nni_list udp_recvq;
nni_list udp_sendq;
nni_mtx udp_mtx;
+ bool udp_connected; // if true its a connected socket
+ nng_sockaddr udp_peer; // only valid if udp_connected
};
static void
@@ -151,10 +153,12 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
}
- hdr.msg_iov = iov;
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = sizeof(ss);
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ if (!udp->udp_connected) {
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = sizeof(ss);
+ }
if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
@@ -163,7 +167,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
return;
}
rv = nni_plat_errno(errno);
- } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ } else if ((!udp->udp_connected) &&
+ (sa = nni_aio_get_input(aio, 0)) != NULL) {
// We need to store the address information.
// It is incumbent on the AIO submitter to supply
// storage for the address.
@@ -189,7 +194,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
return;
}
rv = nni_plat_errno(errno);
- } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ } else if (udp->udp_connected &&
+ (sa = nni_aio_get_input(aio, 0)) != NULL) {
nni_posix_sockaddr2nn(sa, (void *) &ss, salen);
}
if (niov != 1) {
@@ -219,7 +225,8 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
nni_aio_get_iov(aio, &niov, &aiov);
NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);
- if ((salen = nni_posix_nn2sockaddr(
+ if ((!udp->udp_connected) &&
+ (salen = nni_posix_nn2sockaddr(
&ss, nni_aio_get_input(aio, 0))) < 1) {
rv = NNG_EADDRINVAL;
} else {
@@ -231,10 +238,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
}
- hdr.msg_iov = iov;
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = salen;
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ if (!udp->udp_connected) {
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = salen;
+ }
cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
if (cnt < 0) {
@@ -255,8 +264,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
len = copy_to_bounce(aiov, niov);
buf = bouncebuf;
}
- cnt = sendto(
- udp->udp_fd, buf, len, 0, (void *) &ss, salen);
+ if (udp->udp_connected) {
+ cnt = send(udp->udp_fd, buf, len, 0);
+ } else {
+ cnt = sendto(udp->udp_fd, buf, len, 0,
+ (void *) &ss, salen);
+ }
if (cnt < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
@@ -360,6 +373,106 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
return (0);
}
+// nni_plat_udp_connect is like nni_plat_udp_open, but it creates a *connected*
+// socket by doing UDP connect. As a special case, if the peer address is
+// NULL, the socket is connected, but this is a server socket, with
+// SO_REUSEADDR set, in anticipation of "faking" accept and having other
+// connected sockets set up.
+int
+nni_plat_udp_connect(
+ nni_plat_udp **upp, nni_sockaddr *bindaddr, nni_sockaddr *peeraddr)
+{
+ nni_plat_udp *udp;
+ int selflen;
+ int peerlen;
+ struct sockaddr_storage self;
+ struct sockaddr_storage peer;
+ int rv;
+
+ if (bindaddr == NULL) {
+ return (NNG_EADDRINVAL);
+ }
+ if (peeraddr != NULL) {
+ if (bindaddr->s_family != peeraddr->s_family) {
+ return (NNG_EADDRINVAL);
+ }
+ }
+ switch (bindaddr->s_family) {
+ case NNG_AF_INET:
+#ifdef NNG_ENABLE_IPV6
+ case NNG_AF_INET6:
+#endif
+ break;
+ default:
+ return (NNG_EADDRINVAL);
+ }
+ selflen = nni_posix_nn2sockaddr(&self, bindaddr);
+ NNI_ASSERT(selflen > 1);
+ if (peeraddr != NULL) {
+ peerlen = nni_posix_nn2sockaddr(&peer, peeraddr);
+ NNI_ASSERT(peerlen > 1);
+ NNI_ASSERT(selflen == peerlen);
+ }
+
+ // UDP opens can actually run synchronously.
+ if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&udp->udp_mtx);
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
+
+ udp->udp_fd = socket(self.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (udp->udp_fd < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+ int on = 1;
+ if (setsockopt(
+ udp->udp_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+#if defined(SO_REUSEPORT)
+ if (setsockopt(
+ udp->udp_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+#endif
+
+ if (bind(udp->udp_fd, (void *) &self, selflen) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+
+ // NB: This is a non-blocking call!
+ if (peeraddr != NULL) {
+ udp->udp_connected = true;
+ udp->udp_peer = *peeraddr;
+ if (connect(udp->udp_fd, (void *) &peer, peerlen) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ }
+ }
+
+ nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp);
+
+ *upp = udp;
+ return (0);
+}
+
void
nni_plat_udp_close(nni_plat_udp *udp)
{
@@ -444,6 +557,16 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
return (nni_posix_sockaddr2nn(sa, &ss, sz));
}
+int
+nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa)
+{
+ if (!udp->udp_connected) {
+ return (NNG_ENOTCONN);
+ }
+ *sa = udp->udp_peer;
+ return (0);
+}
+
// Joining a multicast group is different than binding to a multicast
// group. This allows to receive both unicast and multicast at the given
// address.