From df3059dd130ce22f2326abeb41149bdf35e0c38d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 14 Apr 2025 00:03:47 -0700 Subject: UDP: Add support for connected mode UDP sockets. --- src/core/platform.h | 15 ++-- src/nng.c | 13 ++++ src/platform/posix/posix_udp.c | 151 +++++++++++++++++++++++++++++++++++++---- src/platform/udp_test.c | 88 ++++++++++++++++++++++++ src/platform/windows/win_udp.c | 141 ++++++++++++++++++++++++++++++++++---- 5 files changed, 377 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/core/platform.h b/src/core/platform.h index bdb8beb4..3658020c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -353,11 +353,15 @@ extern int nni_ipc_listener_alloc(nng_stream_listener **, const nng_url *); // typedef struct nni_plat_udp nni_plat_udp; -// nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified in the AIO. The remote address is -// not used. The resulting nni_plat_udp structure is returned in the -// aio's a_pipe. -extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); +// nni_plat_udp_open initializes a UDP socket, binding to the specified local +// address. +extern int nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *self); + +// nni_plat_udp_connected initializes a connected UDP socket, binding to the +// local address specified in the AIO, and connecting to the remote one. +// This is a non-blocking operation. +extern int nni_plat_udp_connect( + nni_plat_udp **udpp, nni_sockaddr *self, nni_sockaddr *peer); // nni_plat_udp_close closes the underlying UDP socket. extern void nni_plat_udp_close(nni_plat_udp *); @@ -405,6 +409,7 @@ extern void nni_plat_pipe_clear(int); extern void nni_plat_pipe_close(int, int); extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *); +extern int nni_plat_udp_peername(nni_plat_udp *, nni_sockaddr *); // nni_socket_pair is used to create a socket pair using socketpair() // on POSIX systems. (Windows might provide a similar solution, using diff --git a/src/nng.c b/src/nng.c index 68820675..96591678 100644 --- a/src/nng.c +++ b/src/nng.c @@ -1268,6 +1268,7 @@ static const struct { { NNG_EPEERAUTH, "Peer could not be authenticated" }, { NNG_EBADTYPE, "Incorrect type" }, { NNG_ECONNSHUT, "Connection shutdown" }, + { NNG_ENOTCONN, "Not connected"}, { NNG_ESTOPPED, "Operation stopped"}, { NNG_EINTERNAL, "Internal error detected" }, { 0, NULL }, @@ -2147,6 +2148,12 @@ nng_udp_open(nng_udp **udp, nng_sockaddr *sa) return (nni_plat_udp_open((nni_plat_udp **) udp, sa)); } +int +nng_udp_connect(nng_udp **udp, nng_sockaddr *self, nng_sockaddr *peer) +{ + return (nni_plat_udp_connect((nni_plat_udp **) udp, self, peer)); +} + void nng_udp_close(nng_udp *udp) { @@ -2159,6 +2166,12 @@ nng_udp_sockname(nng_udp *udp, nng_sockaddr *sa) return (nni_plat_udp_sockname((nni_plat_udp *) udp, sa)); } +int +nng_udp_peername(nng_udp *udp, nng_sockaddr *sa) +{ + return (nni_plat_udp_peername((nni_plat_udp *) udp, sa)); +} + void nng_udp_send(nng_udp *udp, nng_aio *aio) { diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index a1601874..9a1cd9b8 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -1,5 +1,5 @@ // -// Copyright 2024 Staysail Systems, Inc. +// Copyright 2025 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -58,6 +58,8 @@ struct nni_plat_udp { nni_list udp_recvq; nni_list udp_sendq; nni_mtx udp_mtx; + bool udp_connected; // if true its a connected socket + nng_sockaddr udp_peer; // only valid if udp_connected }; static void @@ -151,10 +153,12 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } - hdr.msg_iov = iov; - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = sizeof(ss); + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + if (!udp->udp_connected) { + hdr.msg_name = &ss; + hdr.msg_namelen = sizeof(ss); + } if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -163,7 +167,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) return; } rv = nni_plat_errno(errno); - } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + } else if ((!udp->udp_connected) && + (sa = nni_aio_get_input(aio, 0)) != NULL) { // We need to store the address information. // It is incumbent on the AIO submitter to supply // storage for the address. @@ -189,7 +194,8 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) return; } rv = nni_plat_errno(errno); - } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + } else if (udp->udp_connected && + (sa = nni_aio_get_input(aio, 0)) != NULL) { nni_posix_sockaddr2nn(sa, (void *) &ss, salen); } if (niov != 1) { @@ -219,7 +225,8 @@ nni_posix_udp_dosend(nni_plat_udp *udp) nni_aio_get_iov(aio, &niov, &aiov); NNI_ASSERT(niov <= NNI_AIO_MAX_IOV); - if ((salen = nni_posix_nn2sockaddr( + if ((!udp->udp_connected) && + (salen = nni_posix_nn2sockaddr( &ss, nni_aio_get_input(aio, 0))) < 1) { rv = NNG_EADDRINVAL; } else { @@ -231,10 +238,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp) iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } - hdr.msg_iov = iov; - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = salen; + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + if (!udp->udp_connected) { + hdr.msg_name = &ss; + hdr.msg_namelen = salen; + } cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL); if (cnt < 0) { @@ -255,8 +264,12 @@ nni_posix_udp_dosend(nni_plat_udp *udp) len = copy_to_bounce(aiov, niov); buf = bouncebuf; } - cnt = sendto( - udp->udp_fd, buf, len, 0, (void *) &ss, salen); + if (udp->udp_connected) { + cnt = send(udp->udp_fd, buf, len, 0); + } else { + cnt = sendto(udp->udp_fd, buf, len, 0, + (void *) &ss, salen); + } if (cnt < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -360,6 +373,106 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) return (0); } +// nni_plat_udp_connect is like nni_plat_udp_open, but it creates a *connected* +// socket by doing UDP connect. As a special case, if the peer address is +// NULL, the socket is connected, but this is a server socket, with +// SO_REUSEADDR set, in anticipation of "faking" accept and having other +// connected sockets set up. +int +nni_plat_udp_connect( + nni_plat_udp **upp, nni_sockaddr *bindaddr, nni_sockaddr *peeraddr) +{ + nni_plat_udp *udp; + int selflen; + int peerlen; + struct sockaddr_storage self; + struct sockaddr_storage peer; + int rv; + + if (bindaddr == NULL) { + return (NNG_EADDRINVAL); + } + if (peeraddr != NULL) { + if (bindaddr->s_family != peeraddr->s_family) { + return (NNG_EADDRINVAL); + } + } + switch (bindaddr->s_family) { + case NNG_AF_INET: +#ifdef NNG_ENABLE_IPV6 + case NNG_AF_INET6: +#endif + break; + default: + return (NNG_EADDRINVAL); + } + selflen = nni_posix_nn2sockaddr(&self, bindaddr); + NNI_ASSERT(selflen > 1); + if (peeraddr != NULL) { + peerlen = nni_posix_nn2sockaddr(&peer, peeraddr); + NNI_ASSERT(peerlen > 1); + NNI_ASSERT(selflen == peerlen); + } + + // UDP opens can actually run synchronously. + if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&udp->udp_mtx); + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); + + udp->udp_fd = socket(self.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (udp->udp_fd < 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + int on = 1; + if (setsockopt( + udp->udp_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } +#if defined(SO_REUSEPORT) + if (setsockopt( + udp->udp_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } +#endif + + if (bind(udp->udp_fd, (void *) &self, selflen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + // NB: This is a non-blocking call! + if (peeraddr != NULL) { + udp->udp_connected = true; + udp->udp_peer = *peeraddr; + if (connect(udp->udp_fd, (void *) &peer, peerlen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + } + } + + nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp); + + *upp = udp; + return (0); +} + void nni_plat_udp_close(nni_plat_udp *udp) { @@ -444,6 +557,16 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa) return (nni_posix_sockaddr2nn(sa, &ss, sz)); } +int +nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa) +{ + if (!udp->udp_connected) { + return (NNG_ENOTCONN); + } + *sa = udp->udp_peer; + return (0); +} + // Joining a multicast group is different than binding to a multicast // group. This allows to receive both unicast and multicast at the given // address. diff --git a/src/platform/udp_test.c b/src/platform/udp_test.c index fa1faba9..d8090e99 100644 --- a/src/platform/udp_test.c +++ b/src/platform/udp_test.c @@ -85,6 +85,93 @@ test_udp_pair(void) nng_udp_close(u2); } +void +test_udp_connected(void) +{ + nng_sockaddr sa1; + nng_sockaddr sa2; + nng_udp *u1; + nng_udp *u2; + nng_udp *u3; + nng_udp *u4; + uint32_t loopback; + nng_aio *aio1; + nng_aio *aio2; + nng_iov iov1, iov2; + char msg[] = "hello"; + char rbuf[1024]; + nng_sockaddr to; + nng_sockaddr from; + + loopback = htonl(0x7f000001); // 127.0.0.1 + + sa1.s_in.sa_family = NNG_AF_INET; + sa1.s_in.sa_addr = loopback; + sa1.s_in.sa_port = 0; // wild card port binding + + sa2.s_in.sa_family = NNG_AF_INET; + sa2.s_in.sa_addr = loopback; + sa2.s_in.sa_port = 0; + + NUTS_PASS(nng_udp_connect(&u1, &sa1, NULL)); + NUTS_PASS(nng_udp_sockname(u1, &sa1)); + + NUTS_PASS(nng_udp_connect(&u2, &sa2, NULL)); + NUTS_PASS(nng_udp_sockname(u2, &sa2)); + + sa2.s_in.sa_family = NNG_AF_INET; + sa2.s_in.sa_addr = loopback; + sa2.s_in.sa_port = 0; + + NUTS_PASS(nng_udp_sockname(u1, &sa1)); + NUTS_PASS(nng_udp_sockname(u2, &sa2)); + + NUTS_FAIL(nng_udp_peername(u1, &to), NNG_ENOTCONN); + NUTS_FAIL(nng_udp_peername(u2, &to), NNG_ENOTCONN); + + NUTS_PASS(nng_udp_connect(&u3, &sa1, &sa2)); + NUTS_PASS(nng_udp_connect(&u4, &sa2, &sa1)); + + nng_msleep(500); + + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + + iov1.iov_buf = msg; + iov1.iov_len = strlen(msg) + 1; + NUTS_PASS(nng_aio_set_iov(aio1, 1, &iov1)); + NUTS_PASS(nng_aio_set_input(aio1, 0, &to)); + + iov2.iov_buf = rbuf; + iov2.iov_len = 1024; + NUTS_PASS(nng_aio_set_iov(aio2, 1, &iov2)); + NUTS_PASS(nng_aio_set_input(aio2, 0, &from)); + + nng_udp_recv(u3, aio2); + nng_udp_send(u4, aio1); + nng_aio_wait(aio1); + nng_aio_wait(aio2); + + NUTS_PASS(nng_aio_result(aio1)); + NUTS_PASS(nng_aio_result(aio2)); + NUTS_ASSERT(nng_aio_count(aio2) == strlen(msg) + 1); + NUTS_ASSERT(strcmp(rbuf, msg) == 0); + + NUTS_PASS(nng_udp_peername(u3, &from)); + NUTS_PASS(nng_udp_peername(u4, &to)); + + NUTS_ASSERT(from.s_in.sa_family == sa2.s_in.sa_family); + NUTS_ASSERT(from.s_in.sa_addr == sa2.s_in.sa_addr); + NUTS_ASSERT(to.s_in.sa_port == sa1.s_in.sa_port); + + nng_aio_free(aio1); + nng_aio_free(aio2); + nng_udp_close(u1); + nng_udp_close(u2); + nng_udp_close(u3); + nng_udp_close(u4); +} + void test_udp_scatter_gather(void) { @@ -480,6 +567,7 @@ test_udp_send_v6_from_v4(void) NUTS_TESTS = { { "udp pair", test_udp_pair }, + { "udp connected", test_udp_connected }, { "udp scatter gather", test_udp_scatter_gather }, { "udp send recv multi", test_udp_multi_send_recv }, { "udp send no address", test_udp_send_no_addr }, diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c index a280e116..08041f01 100644 --- a/src/platform/windows/win_udp.c +++ b/src/platform/windows/win_udp.c @@ -1,5 +1,5 @@ // -// Copyright 2024 Staysail Systems, Inc. +// Copyright 2025 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -28,6 +28,9 @@ struct nni_plat_udp { bool closed; SOCKADDR_STORAGE rxsa; int rxsalen; + bool connected; // is the socket a connected socket? + nni_sockaddr peer; // only valid for connected + int peerlen; }; static void udp_recv_cb(nni_win_io *, int, size_t); @@ -85,6 +88,90 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) return (rv); } +// nni_plat_udp_connect initializes a connected UDP socket, binding to the +// local address specified specified, and then connecting to the remote one. +// If the peer address is NULL, then its not connected, but intended to be +// compatible with connected sockets (by using SO_REUSEADDR.) +int +nni_plat_udp_connect( + nni_plat_udp **udpp, nni_sockaddr *self, nni_sockaddr *peer) +{ + nni_plat_udp *u; + SOCKADDR_STORAGE ss; + SOCKADDR_STORAGE ps; + int sslen; + int pslen; + DWORD no; + DWORD yes; + int rv; + + if ((sslen = nni_win_nn2sockaddr(&ss, self)) < 0) { + return (NNG_EADDRINVAL); + } + if (peer != NULL) { + if (self->s_family != peer->s_family) { + return (NNG_EADDRINVAL); + } + + if ((pslen = nni_win_nn2sockaddr(&ps, peer)) < 0) { + return (NNG_EADDRINVAL); + } + } + + if ((u = NNI_ALLOC_STRUCT(u)) == NULL) { + return (NNG_ENOMEM); + } + nni_aio_list_init(&u->rxq); + nni_mtx_init(&u->lk); + nni_cv_init(&u->cv, &u->lk); + + u->s = socket(ss.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (u->s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + nni_plat_udp_close(u); + return (rv); + } + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) u->s, HANDLE_FLAG_INHERIT, 0); + no = 0; + (void) setsockopt( + u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + + nni_win_io_init(&u->rxio, udp_recv_cb, u); + + if ((rv = nni_win_io_register((HANDLE) u->s)) != 0) { + nni_plat_udp_close(u); + return (rv); + } + + // We need to allow the concurrent until we switch to connected mode. + yes = 1; + (void) setsockopt( + u->s, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, sizeof(yes)); + + // Bind the local address + if (bind(u->s, (struct sockaddr *) &ss, sslen) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + nni_plat_udp_close(u); + return (rv); + } + + // Connect, if we have a peer address + if (peer != NULL) { + if (connect(u->s, (struct sockaddr *) &ps, pslen) == + SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + nni_plat_udp_close(u); + return (rv); + } + u->connected = true; + u->peer = *peer; + } + + *udpp = u; + return (rv); +} + // nni_plat_udp_close closes the underlying UDP socket. void nni_plat_udp_close(nni_plat_udp *u) @@ -123,10 +210,13 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio) DWORD nsent; nni_aio_reset(aio); - sa = nni_aio_get_input(aio, 0); - if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) { - nni_aio_finish_error(aio, NNG_EADDRINVAL); - return; + + if (!u->connected) { + sa = nni_aio_get_input(aio, 0); + if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } } nni_aio_get_iov(aio, &naiov, &aiov); @@ -150,10 +240,14 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio) iov[i].len = (ULONG) aiov[i].iov_len; } - // We can use a "non-overlapping" send; there is little point in - // handling UDP send completions asynchronously. - rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0, - (struct sockaddr *) &to, tolen, NULL, NULL); + // We can use a "non-overlapping" send; there is little point + // in handling UDP send completions asynchronously. + if (u->connected) { + rv = WSASend(u->s, iov, (DWORD) naiov, &nsent, 0, NULL, NULL); + } else { + rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0, + (struct sockaddr *) &to, tolen, NULL, NULL); + } if (rv == SOCKET_ERROR) { rv = nni_win_error(GetLastError()); @@ -203,7 +297,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num) } // convert address from Windows form... - if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + if (((sa = nni_aio_get_input(aio, 0)) != NULL) && !u->connected) { if (nni_win_sockaddr2nn(sa, &u->rxsa, sizeof(u->rxsa)) != 0) { rv = NNG_EADDRINVAL; num = 0; @@ -259,8 +353,14 @@ again: // already. The actual aio's iov array we don't touch. flags = 0; - rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags, - (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd, NULL); + if (u->connected) { + rv = WSARecv(u->s, iov, (DWORD) naiov, NULL, &flags, + &u->rxio.olpd, NULL); + } else { + rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags, + (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd, + NULL); + } _freea(iov); @@ -309,6 +409,23 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa) return (nni_win_sockaddr2nn(sa, &ss, sz)); } +int +nni_plat_udp_peername(nni_plat_udp *udp, nni_sockaddr *sa) +{ + SOCKADDR_STORAGE ss; + int sz; + + if (!udp->connected) { + return (NNG_ENOTCONN); + } + *sa = udp->peer; + sz = sizeof(*sa); + if (getsockname(udp->s, (SOCKADDR *) &ss, &sz) < 0) { + return (nni_win_error(GetLastError())); + } + return (nni_win_sockaddr2nn(sa, &ss, sz)); +} + // Joining a multicast group is different than binding to a multicast // group. This allows to receive both unicast and multicast at the given // address. -- cgit v1.2.3-70-g09d2