diff options
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_udp.c | 151 |
1 files changed, 137 insertions, 14 deletions
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index a1601874..9a1cd9b8 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -1,5 +1,5 @@ // -// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -58,6 +58,8 @@ struct nni_plat_udp { nni_list udp_recvq; nni_list udp_sendq; nni_mtx udp_mtx; + bool udp_connected; // if true its a connected socket + nng_sockaddr udp_peer; // only valid if udp_connected }; static void @@ -151,10 +153,12 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } - hdr.msg_iov = iov; - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = sizeof(ss); + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + if (!udp->udp_connected) { + hdr.msg_name = &ss; + hdr.msg_namelen = sizeof(ss); + } if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -163,7 +167,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) return; } rv = nni_plat_errno(errno); - } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + } else if ((!udp->udp_connected) && + (sa = nni_aio_get_input(aio, 0)) != NULL) { // We need to store the address information. // It is incumbent on the AIO submitter to supply // storage for the address. @@ -189,7 +194,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) return; } rv = nni_plat_errno(errno); - } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + } else if (udp->udp_connected && + (sa = nni_aio_get_input(aio, 0)) != NULL) { nni_posix_sockaddr2nn(sa, (void *) &ss, salen); } if (niov != 1) { @@ -219,7 +225,8 @@ nni_posix_udp_dosend(nni_plat_udp *udp) nni_aio_get_iov(aio, &niov, &aiov); NNI_ASSERT(niov <= NNI_AIO_MAX_IOV); - if ((salen = nni_posix_nn2sockaddr( + if ((!udp->udp_connected) && + (salen = nni_posix_nn2sockaddr( &ss, nni_aio_get_input(aio, 0))) < 1) { rv = NNG_EADDRINVAL; } else { @@ -231,10 +238,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp) iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } - hdr.msg_iov = iov; - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = salen; + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + if (!udp->udp_connected) { + hdr.msg_name = &ss; + hdr.msg_namelen = salen; + } cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL); if (cnt < 0) { @@ -255,8 +264,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp) len = copy_to_bounce(aiov, niov); buf = bouncebuf; } - cnt = sendto( - udp->udp_fd, buf, len, 0, (void *) &ss, salen); + if (udp->udp_connected) { + cnt = send(udp->udp_fd, buf, len, 0); + } else { + cnt = sendto(udp->udp_fd, buf, len, 0, + (void *) &ss, salen); + } if (cnt < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -360,6 +373,106 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) return (0); } +// nni_plat_udp_connect is like nni_plat_udp_open, but it creates a *connected* +// socket by doing UDP connect. As a special case, if the peer address is +// NULL, the socket is connected, but this is a server socket, with +// SO_REUSEADDR set, in anticipation of "faking" accept and having other +// connected sockets set up. +int +nni_plat_udp_connect( + nni_plat_udp **upp, nni_sockaddr *bindaddr, nni_sockaddr *peeraddr) +{ + nni_plat_udp *udp; + int selflen; + int peerlen; + struct sockaddr_storage self; + struct sockaddr_storage peer; + int rv; + + if (bindaddr == NULL) { + return (NNG_EADDRINVAL); + } + if (peeraddr != NULL) { + if (bindaddr->s_family != peeraddr->s_family) { + return (NNG_EADDRINVAL); + } + } + switch (bindaddr->s_family) { + case NNG_AF_INET: +#ifdef NNG_ENABLE_IPV6 + case NNG_AF_INET6: +#endif + break; + default: + return (NNG_EADDRINVAL); + } + selflen = nni_posix_nn2sockaddr(&self, bindaddr); + NNI_ASSERT(selflen > 1); + if (peeraddr != NULL) { + peerlen = nni_posix_nn2sockaddr(&peer, peeraddr); + NNI_ASSERT(peerlen > 1); + NNI_ASSERT(selflen == peerlen); + } + + // UDP opens can actually run synchronously. + if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&udp->udp_mtx); + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); + + udp->udp_fd = socket(self.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (udp->udp_fd < 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + int on = 1; + if (setsockopt( + udp->udp_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } +#if defined(SO_REUSEPORT) + if (setsockopt( + udp->udp_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } +#endif + + if (bind(udp->udp_fd, (void *) &self, selflen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + // NB: This is a non-blocking call! + if (peeraddr != NULL) { + udp->udp_connected = true; + udp->udp_peer = *peeraddr; + if (connect(udp->udp_fd, (void *) &peer, peerlen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + } + } + + nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp); + + *upp = udp; + return (0); +} + void nni_plat_udp_close(nni_plat_udp *udp) { @@ -444,6 +557,16 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa) return (nni_posix_sockaddr2nn(sa, &ss, sz)); } +int +nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa) +{ + if (!udp->udp_connected) { + return (NNG_ENOTCONN); + } + *sa = udp->udp_peer; + return (0); +} + // Joining a multicast group is different than binding to a multicast // group. This allows to receive both unicast and multicast at the given // address. |
