From 9e5149973d5d16efec14ee12e62de23198b325ce Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 21 Jul 2017 14:03:08 -0700 Subject: Initial swag at UDP (POSIX only) low level handling. This includes async send and recv, driven from the poller. This will be requierd to support the underlying UDP and ZeroTier transports in the future. (ZeroTier is getting done first.) --- src/CMakeLists.txt | 4 +- src/core/platform.h | 29 ++++ src/platform/posix/posix_impl.h | 12 +- src/platform/posix/posix_net.c | 199 --------------------- src/platform/posix/posix_sockaddr.c | 93 ++++++++++ src/platform/posix/posix_tcp.c | 168 ++++++++++++++++++ src/platform/posix/posix_udp.c | 333 ++++++++++++++++++++++++++++++++++++ 7 files changed, 635 insertions(+), 203 deletions(-) delete mode 100644 src/platform/posix/posix_net.c create mode 100644 src/platform/posix/posix_sockaddr.c create mode 100644 src/platform/posix/posix_tcp.c create mode 100644 src/platform/posix/posix_udp.c (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 18d4932f..31abc042 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -87,13 +87,15 @@ set (NNG_SOURCES platform/posix/posix_debug.c platform/posix/posix_epdesc.c platform/posix/posix_ipc.c - platform/posix/posix_net.c platform/posix/posix_pipe.c platform/posix/posix_pipedesc.c platform/posix/posix_pollq_poll.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c + platform/posix/posix_sockaddr.c + platform/posix/posix_tcp.c platform/posix/posix_thread.c + platform/posix/posix_udp.c platform/windows/win_impl.h platform/windows/win_clock.c diff --git a/src/core/platform.h b/src/core/platform.h index ed32c550..fa93916c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -288,6 +288,35 @@ extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); // The platform may modify the iovs. extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); +// +// UDP support. UDP is not connection oriented, and only has the notion +// of being bound, sendto, and recvfrom. (It is possible to set up a +// connect call that semantically acts as a filter on recvfrom, but we +// don't use that.) Outbound packets will include the destination address +// in the AIO, and inbound packets include the source address in the AIO. +// For now we don't have more sophisticated options like setting the TTL. +// +typedef struct nni_plat_udp nni_plat_udp; + +// nni_plat_udp_open initializes a UDP socket, binding to the local +// address specified specified in the AIO. The remote address is +// not used. The resulting nni_plat_udp structure is returned in the +// the aio's a_pipe. +extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); + +// nni_plat_udp_close closes the underlying UDP socket. +extern void nni_plat_udp_close(nni_plat_udp *); + +// nni_plat_udp_send sends the data in the aio to the the +// destination specified in the nni_aio. The iovs are the +// UDP payload. +extern void nni_plat_udp_send(nni_plat_udp *, nni_aio *); + +// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs +// from the UDP payload. If the UDP payload will not fit, then +// NNG_EMSGSIZE results. +extern void nni_plat_udp_recv(nni_plat_udp *, nni_aio *); + // // Notification Pipe Pairs // diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 72842076..46ebbc1d 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -21,17 +21,25 @@ #define PLATFORM_POSIX_DEBUG #define PLATFORM_POSIX_CLOCK #define PLATFORM_POSIX_IPC -#define PLATFORM_POSIX_NET +#define PLATFORM_POSIX_TCP #define PLATFORM_POSIX_PIPE #define PLATFORM_POSIX_RANDOM #define PLATFORM_POSIX_SOCKET #define PLATFORM_POSIX_THREAD #define PLATFORM_POSIX_PIPEDESC #define PLATFORM_POSIX_EPDESC +#define PLATFORM_POSIX_SOCKADDR +#define PLATFORM_POSIX_UDP #include "platform/posix/posix_config.h" #endif +#ifdef PLATFORM_POSIX_SOCKADDR +#include +extern int nni_posix_sockaddr2nn(nni_sockaddr *, const void *); +extern int nni_posix_nn2sockaddr(void *, const nni_sockaddr *); +#endif + #ifdef PLATFORM_POSIX_DEBUG extern int nni_plat_errno(int); @@ -40,8 +48,6 @@ extern int nni_plat_errno(int); // Define types that this platform uses. #ifdef PLATFORM_POSIX_THREAD -extern int nni_plat_devnull; // open descriptor on /dev/null - #include // These types are provided for here, to permit them to be directly inlined diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c deleted file mode 100644 index 69c0e772..00000000 --- a/src/platform/posix/posix_net.c +++ /dev/null @@ -1,199 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef PLATFORM_POSIX_NET -#include "platform/posix/posix_aio.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static int -nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in * sin; - struct sockaddr_in6 *sin6; - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof(*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof(*sin)); - - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof(*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof(*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof(*sin6)); - } - return (-1); -} - -extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); - -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) -{ - nni_posix_epdesc * ed; - char buf[NNG_MAXADDRLEN]; - int rv; - char * lhost, *rhost; - char * lserv, *rserv; - char * sep; - struct sockaddr_storage ss; - int len; - int passive; - nni_aio aio; - - if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { - return (rv); - } - - // Make a local copy. - snprintf(buf, sizeof(buf), "%s", url); - nni_aio_init(&aio, NULL, NULL); - - if (mode == NNI_EP_MODE_DIAL) { - rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); - if (rv != 0) { - goto done; - } - - // We have to have a remote destination! - if ((rhost == NULL) || (rserv == NULL)) { - rv = NNG_EADDRINVAL; - goto done; - } - } else { - rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); - if (rv != 0) { - goto done; - } - if ((rhost != NULL) || (rserv != NULL)) { - // remotes are nonsensical here. - rv = NNG_EADDRINVAL; - goto done; - } - if (lserv == NULL) { - // missing port to listen on! - rv = NNG_EADDRINVAL; - goto done; - } - } - - if ((rserv != NULL) || (rhost != NULL)) { - nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { - goto done; - } - len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); - nni_posix_epdesc_set_remote(ed, &ss, len); - } - - if ((lserv != NULL) || (lhost != NULL)) { - nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { - goto done; - } - len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); - nni_posix_epdesc_set_local(ed, &ss, len); - } - nni_aio_fini(&aio); - *epp = (void *) ed; - return (0); - -done: - if (rv != 0) { - nni_posix_epdesc_fini(ed); - } - nni_aio_fini(&aio); - return (rv); -} - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) -{ - return (nni_posix_epdesc_listen((void *) ep)); -} - -void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - return (nni_posix_epdesc_connect((void *) ep, aio)); -} - -void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) -{ - return (nni_posix_epdesc_accept((void *) ep, aio)); -} - -void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -#else - -// Suppress empty symbols warnings in ranlib. -int nni_posix_net_not_used = 0; - -#endif // PLATFORM_POSIX_NET diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c new file mode 100644 index 00000000..d3ab9c6d --- /dev/null +++ b/src/platform/posix/posix_sockaddr.c @@ -0,0 +1,93 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_SOCKADDR + +#include +#include +#include +#include +#include +#include +#include + +int +nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na) +{ + struct sockaddr_in * sin; + struct sockaddr_in6 * sin6; + const nng_sockaddr_in * nsin; + const nng_sockaddr_in6 *nsin6; + + switch (na->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) sa; + nsin = &na->s_un.s_in; + memset(sin, 0, sizeof(*sin)); + sin->sin_family = PF_INET; + sin->sin_port = nsin->sa_port; + sin->sin_addr.s_addr = nsin->sa_addr; + return (sizeof(*sin)); + + case NNG_AF_INET6: + sin6 = (void *) sa; + nsin6 = &na->s_un.s_in6; + memset(sin6, 0, sizeof(*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof(*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = nsin6->sa_port; + memcpy(sin6->sin6_addr.s6_addr, nsin6->sa_addr, 16); + return (sizeof(*sin6)); + } + return (-1); +} + +int +nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa) +{ + const struct sockaddr_in * sin; + const struct sockaddr_in6 *sin6; + nng_sockaddr_in * nsin; + nng_sockaddr_in6 * nsin6; + + switch (((struct sockaddr *) sa)->sa_family) { + case AF_INET: + sin = (void *) sa; + nsin = &na->s_un.s_in; + nsin->sa_family = NNG_AF_INET; + nsin->sa_port = sin->sin_port; + nsin->sa_addr = sin->sin_addr.s_addr; + break; + case AF_INET6: + sin6 = (void *) sa; + nsin6 = &na->s_un.s_in6; + nsin6->sa_family = NNG_AF_INET6; + nsin6->sa_port = sin6->sin6_port; + memcpy(nsin6->sa_addr, sin6->sin6_addr.s6_addr, 16); + break; + default: + // We should never see this - the OS should always be + // specific about giving us either AF_INET or AF_INET6. + // Other address families are not handled here. + return (-1); + } + return (0); +} + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_sockaddr_not_used = 0; + +#endif // PLATFORM_POSIX_SOCKADDR diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c new file mode 100644 index 00000000..4efbf47d --- /dev/null +++ b/src/platform/posix/posix_tcp.c @@ -0,0 +1,168 @@ +// +// Copyright 2017 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_TCP +#include "platform/posix/posix_aio.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); + +int +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) +{ + nni_posix_epdesc * ed; + char buf[NNG_MAXADDRLEN]; + int rv; + char * lhost, *rhost; + char * lserv, *rserv; + char * sep; + struct sockaddr_storage ss; + int len; + int passive; + nni_aio aio; + + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); + } + + // Make a local copy. + snprintf(buf, sizeof(buf), "%s", url); + nni_aio_init(&aio, NULL, NULL); + + if (mode == NNI_EP_MODE_DIAL) { + rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); + if (rv != 0) { + goto done; + } + + // We have to have a remote destination! + if ((rhost == NULL) || (rserv == NULL)) { + rv = NNG_EADDRINVAL; + goto done; + } + } else { + rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); + if (rv != 0) { + goto done; + } + if ((rhost != NULL) || (rserv != NULL)) { + // remotes are nonsensical here. + rv = NNG_EADDRINVAL; + goto done; + } + if (lserv == NULL) { + // missing port to listen on! + rv = NNG_EADDRINVAL; + goto done; + } + } + + if ((rserv != NULL) || (rhost != NULL)) { + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_remote(ed, &ss, len); + } + + if ((lserv != NULL) || (lhost != NULL)) { + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_local(ed, &ss, len); + } + nni_aio_fini(&aio); + *epp = (void *) ed; + return (0); + +done: + if (rv != 0) { + nni_posix_epdesc_fini(ed); + } + nni_aio_fini(&aio); + return (rv); +} + +void +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) +{ + nni_posix_epdesc_fini((void *) ep); +} + +void +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) +{ + nni_posix_epdesc_close((void *) ep); +} + +int +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) +{ + return (nni_posix_epdesc_listen((void *) ep)); +} + +void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + return (nni_posix_epdesc_connect((void *) ep, aio)); +} + +void +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + return (nni_posix_epdesc_accept((void *) ep, aio)); +} + +void +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) +{ + nni_posix_pipedesc_fini((void *) p); +} + +void +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) +{ + nni_posix_pipedesc_close((void *) p); +} + +void +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_send((void *) p, aio); +} + +void +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_recv((void *) p, aio); +} + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_net_not_used = 0; + +#endif // PLATFORM_POSIX_TCP diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c new file mode 100644 index 00000000..49e06814 --- /dev/null +++ b/src/platform/posix/posix_udp.c @@ -0,0 +1,333 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_UDP +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// UDP support. + +// If we can suppress SIGPIPE on send, please do so. +#ifdef MSG_NOSIGNAL +#define NNI_MSG_NOSIGNAL MSG_NOSIGNAL +#else +#define NNI_MSG_NOSIGNAL 0 +#endif + +struct nni_plat_udp { + nni_posix_pollq_node udp_pitem; + int udp_fd; + int udp_closed; + nni_list udp_recvq; + nni_list udp_sendq; + nni_mtx udp_mtx; +}; + +static void +nni_posix_udp_doclose(nni_plat_udp *udp) +{ + nni_aio *aio; + + udp->udp_closed = 1; + while ((aio = nni_list_first(&udp->udp_recvq)) != NULL) { + nni_list_remove(&udp->udp_recvq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + while ((aio = nni_list_first(&udp->udp_sendq)) != NULL) { + nni_list_remove(&udp->udp_recvq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + // Underlying socket left open until close API called. +} + +static void +nni_posix_udp_dorecv(nni_plat_udp *udp) +{ + nni_aio * aio; + struct sockaddr_storage ss; + struct msghdr hdr; + int niov; + int rv; + nni_list * q = &udp->udp_recvq; + + // While we're able to recv, do so. + while ((aio = nni_list_first(q)) != NULL) { + nni_list_remove(q, aio); + + for (niov = 0; niov < aio->a_niov; niov++) { + hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf; + hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = sizeof(ss); + hdr.msg_flags = 0; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + rv = recvmsg(udp->udp_fd, &hdr, 0); + if (rv < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // No data available at socket. Return + // the AIO to the head of the queue. + nni_list_prepend(q, aio); + return; + } + rv = nni_plat_errno(errno); + nni_aio_finish(aio, rv, 0); + continue; + } + + // We need to store the address information. + // It is incumbent on the AIO submitter to supply + // storage for the address. + if (aio->a_naddrs > 0) { + nni_posix_sockaddr2nn(&aio->a_addrs[0], (void *) &ss); + } + + nni_aio_finish(aio, 0, rv); + } +} + +static void +nni_posix_udp_dosend(nni_plat_udp *udp) +{ + // XXX: TBD. + nni_aio * aio; + struct sockaddr_storage ss; + struct msghdr hdr; + int niov; + int rv; + int len; + nni_list * q = &udp->udp_sendq; + + // While we're able to recv, do so. + while ((aio = nni_list_first(q)) != NULL) { + nni_list_remove(q, aio); + + if (aio->a_naddrs < 1) { + // No incoming address? + nni_aio_finish(aio, NNG_EADDRINVAL, 0); + return; + } + len = nni_posix_nn2sockaddr(&ss, &aio->a_addrs[0]); + if (len < 0) { + nni_aio_finish(aio, NNG_EADDRINVAL, 0); + return; + } + + for (niov = 0; niov < aio->a_niov; niov++) { + hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf; + hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = len; + hdr.msg_flags = NNI_MSG_NOSIGNAL; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + + rv = sendmsg(udp->udp_fd, &hdr, 0); + if (rv < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // Cannot send (buffers full), return to + // head of queue. + nni_list_prepend(q, aio); + return; + } + rv = nni_plat_errno(errno); + nni_aio_finish(aio, rv, 0); + continue; + } + + nni_aio_finish(aio, 0, rv); + } +} + +// This function is called by the poller on activity on the FD. +static void +nni_posix_udp_cb(void *arg) +{ + nni_plat_udp *udp = arg; + int revents; + int events = 0; + + nni_mtx_lock(&udp->udp_mtx); + revents = udp->udp_pitem.revents; + if (revents & POLLIN) { + nni_posix_udp_dorecv(udp); + } + if (revents & POLLOUT) { + nni_posix_udp_dosend(udp); + } + if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + nni_posix_udp_doclose(udp); + } else { + if (!nni_list_empty(&udp->udp_sendq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&udp->udp_recvq)) { + events |= POLLIN; + } + if (events) { + nni_posix_pollq_arm(&udp->udp_pitem, events); + } + } + nni_mtx_unlock(&udp->udp_mtx); +} + +int +nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) +{ + nni_plat_udp * udp; + int salen; + struct sockaddr_storage sa; + int rv; + + if ((salen = nni_posix_nn2sockaddr(&sa, bindaddr)) < 0) { + return (NNG_EADDRINVAL); + } + + // UDP opens can actually run synchronously. + if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { + NNI_FREE_STRUCT(udp); + return (rv); + } + + udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (udp->udp_fd < 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + if (bind(udp->udp_fd, (void *) &sa, salen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + udp->udp_pitem.fd = udp->udp_fd; + udp->udp_pitem.cb = nni_posix_udp_cb; + udp->udp_pitem.data = udp; + + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); + + rv = nni_posix_pollq_add( + nni_posix_pollq_get(udp->udp_fd), &udp->udp_pitem); + if (rv != 0) { + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + *upp = udp; + return (0); +} + +void +nni_plat_udp_close(nni_plat_udp *udp) +{ + nni_aio *aio; + + nni_mtx_lock(&udp->udp_mtx); + if (udp->udp_closed) { + // The only way this happens is in response to a callback that + // is being canceled. Double close from user code is a bug. + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + // We're no longer interested in events. + nni_posix_pollq_remove(&udp->udp_pitem); + + nni_posix_udp_doclose(udp); + nni_mtx_unlock(&udp->udp_mtx); + + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); +} + +void +nni_plat_udp_cancel(nni_aio *aio) +{ + nni_plat_udp *udp = aio->a_prov_data; + + nni_mtx_lock(&udp->udp_mtx); + nni_aio_list_remove(aio); + nni_mtx_unlock(&udp->udp_mtx); +} + +void +nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) +{ + nni_mtx_lock(&udp->udp_mtx); + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + if (udp->udp_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + nni_list_append(&udp->udp_recvq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + nni_mtx_unlock(&udp->udp_mtx); +} + +void +nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) +{ + nni_mtx_lock(&udp->udp_mtx); + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + if (udp->udp_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + nni_list_append(&udp->udp_sendq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + nni_mtx_unlock(&udp->udp_mtx); +} + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_udp_not_used = 0; + +#endif // PLATFORM_POSIX_UDP -- cgit v1.2.3-70-g09d2