diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/core/platform.h | 29 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 12 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockaddr.c | 93 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.c (renamed from src/platform/posix/posix_net.c) | 39 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 333 |
6 files changed, 471 insertions, 39 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 18d4932f..31abc042 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -87,13 +87,15 @@ set (NNG_SOURCES platform/posix/posix_debug.c platform/posix/posix_epdesc.c platform/posix/posix_ipc.c - platform/posix/posix_net.c platform/posix/posix_pipe.c platform/posix/posix_pipedesc.c platform/posix/posix_pollq_poll.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c + platform/posix/posix_sockaddr.c + platform/posix/posix_tcp.c platform/posix/posix_thread.c + platform/posix/posix_udp.c platform/windows/win_impl.h platform/windows/win_clock.c diff --git a/src/core/platform.h b/src/core/platform.h index ed32c550..fa93916c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -289,6 +289,35 @@ extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); // +// UDP support. UDP is not connection oriented, and only has the notion +// of being bound, sendto, and recvfrom. (It is possible to set up a +// connect call that semantically acts as a filter on recvfrom, but we +// don't use that.) Outbound packets will include the destination address +// in the AIO, and inbound packets include the source address in the AIO. +// For now we don't have more sophisticated options like setting the TTL. +// +typedef struct nni_plat_udp nni_plat_udp; + +// nni_plat_udp_open initializes a UDP socket, binding to the local +// address specified specified in the AIO. The remote address is +// not used. The resulting nni_plat_udp structure is returned in the +// the aio's a_pipe. +extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *); + +// nni_plat_udp_close closes the underlying UDP socket. +extern void nni_plat_udp_close(nni_plat_udp *); + +// nni_plat_udp_send sends the data in the aio to the the +// destination specified in the nni_aio. The iovs are the +// UDP payload. +extern void nni_plat_udp_send(nni_plat_udp *, nni_aio *); + +// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs +// from the UDP payload. If the UDP payload will not fit, then +// NNG_EMSGSIZE results. +extern void nni_plat_udp_recv(nni_plat_udp *, nni_aio *); + +// // Notification Pipe Pairs // diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 72842076..46ebbc1d 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -21,17 +21,25 @@ #define PLATFORM_POSIX_DEBUG #define PLATFORM_POSIX_CLOCK #define PLATFORM_POSIX_IPC -#define PLATFORM_POSIX_NET +#define PLATFORM_POSIX_TCP #define PLATFORM_POSIX_PIPE #define PLATFORM_POSIX_RANDOM #define PLATFORM_POSIX_SOCKET #define PLATFORM_POSIX_THREAD #define PLATFORM_POSIX_PIPEDESC #define PLATFORM_POSIX_EPDESC +#define PLATFORM_POSIX_SOCKADDR +#define PLATFORM_POSIX_UDP #include "platform/posix/posix_config.h" #endif +#ifdef PLATFORM_POSIX_SOCKADDR +#include <sys/socket.h> +extern int nni_posix_sockaddr2nn(nni_sockaddr *, const void *); +extern int nni_posix_nn2sockaddr(void *, const nni_sockaddr *); +#endif + #ifdef PLATFORM_POSIX_DEBUG extern int nni_plat_errno(int); @@ -40,8 +48,6 @@ extern int nni_plat_errno(int); // Define types that this platform uses. #ifdef PLATFORM_POSIX_THREAD -extern int nni_plat_devnull; // open descriptor on /dev/null - #include <pthread.h> // These types are provided for here, to permit them to be directly inlined diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c new file mode 100644 index 00000000..d3ab9c6d --- /dev/null +++ b/src/platform/posix/posix_sockaddr.c @@ -0,0 +1,93 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_SOCKADDR + +#include <arpa/inet.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> + +int +nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na) +{ + struct sockaddr_in * sin; + struct sockaddr_in6 * sin6; + const nng_sockaddr_in * nsin; + const nng_sockaddr_in6 *nsin6; + + switch (na->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) sa; + nsin = &na->s_un.s_in; + memset(sin, 0, sizeof(*sin)); + sin->sin_family = PF_INET; + sin->sin_port = nsin->sa_port; + sin->sin_addr.s_addr = nsin->sa_addr; + return (sizeof(*sin)); + + case NNG_AF_INET6: + sin6 = (void *) sa; + nsin6 = &na->s_un.s_in6; + memset(sin6, 0, sizeof(*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof(*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = nsin6->sa_port; + memcpy(sin6->sin6_addr.s6_addr, nsin6->sa_addr, 16); + return (sizeof(*sin6)); + } + return (-1); +} + +int +nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa) +{ + const struct sockaddr_in * sin; + const struct sockaddr_in6 *sin6; + nng_sockaddr_in * nsin; + nng_sockaddr_in6 * nsin6; + + switch (((struct sockaddr *) sa)->sa_family) { + case AF_INET: + sin = (void *) sa; + nsin = &na->s_un.s_in; + nsin->sa_family = NNG_AF_INET; + nsin->sa_port = sin->sin_port; + nsin->sa_addr = sin->sin_addr.s_addr; + break; + case AF_INET6: + sin6 = (void *) sa; + nsin6 = &na->s_un.s_in6; + nsin6->sa_family = NNG_AF_INET6; + nsin6->sa_port = sin6->sin6_port; + memcpy(nsin6->sa_addr, sin6->sin6_addr.s6_addr, 16); + break; + default: + // We should never see this - the OS should always be + // specific about giving us either AF_INET or AF_INET6. + // Other address families are not handled here. + return (-1); + } + return (0); +} + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_sockaddr_not_used = 0; + +#endif // PLATFORM_POSIX_SOCKADDR diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_tcp.c index 69c0e772..4efbf47d 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_tcp.c @@ -9,14 +9,12 @@ #include "core/nng_impl.h" -#ifdef PLATFORM_POSIX_NET +#ifdef PLATFORM_POSIX_TCP #include "platform/posix/posix_aio.h" -#include <arpa/inet.h> #include <errno.h> #include <fcntl.h> #include <netinet/in.h> -#include <netinet/tcp.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -25,35 +23,6 @@ #include <sys/uio.h> #include <unistd.h> -static int -nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in * sin; - struct sockaddr_in6 *sin6; - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof(*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof(*sin)); - - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof(*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof(*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof(*sin6)); - } - return (-1); -} - extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); int @@ -112,7 +81,7 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) if ((rv = nni_aio_result(&aio)) != 0) { goto done; } - len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]); nni_posix_epdesc_set_remote(ed, &ss, len); } @@ -122,7 +91,7 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) if ((rv = nni_aio_result(&aio)) != 0) { goto done; } - len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]); nni_posix_epdesc_set_local(ed, &ss, len); } nni_aio_fini(&aio); @@ -196,4 +165,4 @@ nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) // Suppress empty symbols warnings in ranlib. int nni_posix_net_not_used = 0; -#endif // PLATFORM_POSIX_NET +#endif // PLATFORM_POSIX_TCP diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c new file mode 100644 index 00000000..49e06814 --- /dev/null +++ b/src/platform/posix/posix_udp.c @@ -0,0 +1,333 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_UDP +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" + +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +// UDP support. + +// If we can suppress SIGPIPE on send, please do so. +#ifdef MSG_NOSIGNAL +#define NNI_MSG_NOSIGNAL MSG_NOSIGNAL +#else +#define NNI_MSG_NOSIGNAL 0 +#endif + +struct nni_plat_udp { + nni_posix_pollq_node udp_pitem; + int udp_fd; + int udp_closed; + nni_list udp_recvq; + nni_list udp_sendq; + nni_mtx udp_mtx; +}; + +static void +nni_posix_udp_doclose(nni_plat_udp *udp) +{ + nni_aio *aio; + + udp->udp_closed = 1; + while ((aio = nni_list_first(&udp->udp_recvq)) != NULL) { + nni_list_remove(&udp->udp_recvq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + while ((aio = nni_list_first(&udp->udp_sendq)) != NULL) { + nni_list_remove(&udp->udp_recvq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + // Underlying socket left open until close API called. +} + +static void +nni_posix_udp_dorecv(nni_plat_udp *udp) +{ + nni_aio * aio; + struct sockaddr_storage ss; + struct msghdr hdr; + int niov; + int rv; + nni_list * q = &udp->udp_recvq; + + // While we're able to recv, do so. + while ((aio = nni_list_first(q)) != NULL) { + nni_list_remove(q, aio); + + for (niov = 0; niov < aio->a_niov; niov++) { + hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf; + hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = sizeof(ss); + hdr.msg_flags = 0; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + rv = recvmsg(udp->udp_fd, &hdr, 0); + if (rv < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // No data available at socket. Return + // the AIO to the head of the queue. + nni_list_prepend(q, aio); + return; + } + rv = nni_plat_errno(errno); + nni_aio_finish(aio, rv, 0); + continue; + } + + // We need to store the address information. + // It is incumbent on the AIO submitter to supply + // storage for the address. + if (aio->a_naddrs > 0) { + nni_posix_sockaddr2nn(&aio->a_addrs[0], (void *) &ss); + } + + nni_aio_finish(aio, 0, rv); + } +} + +static void +nni_posix_udp_dosend(nni_plat_udp *udp) +{ + // XXX: TBD. + nni_aio * aio; + struct sockaddr_storage ss; + struct msghdr hdr; + int niov; + int rv; + int len; + nni_list * q = &udp->udp_sendq; + + // While we're able to recv, do so. + while ((aio = nni_list_first(q)) != NULL) { + nni_list_remove(q, aio); + + if (aio->a_naddrs < 1) { + // No incoming address? + nni_aio_finish(aio, NNG_EADDRINVAL, 0); + return; + } + len = nni_posix_nn2sockaddr(&ss, &aio->a_addrs[0]); + if (len < 0) { + nni_aio_finish(aio, NNG_EADDRINVAL, 0); + return; + } + + for (niov = 0; niov < aio->a_niov; niov++) { + hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf; + hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = len; + hdr.msg_flags = NNI_MSG_NOSIGNAL; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + + rv = sendmsg(udp->udp_fd, &hdr, 0); + if (rv < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // Cannot send (buffers full), return to + // head of queue. + nni_list_prepend(q, aio); + return; + } + rv = nni_plat_errno(errno); + nni_aio_finish(aio, rv, 0); + continue; + } + + nni_aio_finish(aio, 0, rv); + } +} + +// This function is called by the poller on activity on the FD. +static void +nni_posix_udp_cb(void *arg) +{ + nni_plat_udp *udp = arg; + int revents; + int events = 0; + + nni_mtx_lock(&udp->udp_mtx); + revents = udp->udp_pitem.revents; + if (revents & POLLIN) { + nni_posix_udp_dorecv(udp); + } + if (revents & POLLOUT) { + nni_posix_udp_dosend(udp); + } + if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + nni_posix_udp_doclose(udp); + } else { + if (!nni_list_empty(&udp->udp_sendq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&udp->udp_recvq)) { + events |= POLLIN; + } + if (events) { + nni_posix_pollq_arm(&udp->udp_pitem, events); + } + } + nni_mtx_unlock(&udp->udp_mtx); +} + +int +nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) +{ + nni_plat_udp * udp; + int salen; + struct sockaddr_storage sa; + int rv; + + if ((salen = nni_posix_nn2sockaddr(&sa, bindaddr)) < 0) { + return (NNG_EADDRINVAL); + } + + // UDP opens can actually run synchronously. + if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { + NNI_FREE_STRUCT(udp); + return (rv); + } + + udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (udp->udp_fd < 0) { + rv = nni_plat_errno(errno); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + if (bind(udp->udp_fd, (void *) &sa, salen) != 0) { + rv = nni_plat_errno(errno); + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + udp->udp_pitem.fd = udp->udp_fd; + udp->udp_pitem.cb = nni_posix_udp_cb; + udp->udp_pitem.data = udp; + + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); + + rv = nni_posix_pollq_add( + nni_posix_pollq_get(udp->udp_fd), &udp->udp_pitem); + if (rv != 0) { + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); + return (rv); + } + + *upp = udp; + return (0); +} + +void +nni_plat_udp_close(nni_plat_udp *udp) +{ + nni_aio *aio; + + nni_mtx_lock(&udp->udp_mtx); + if (udp->udp_closed) { + // The only way this happens is in response to a callback that + // is being canceled. Double close from user code is a bug. + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + // We're no longer interested in events. + nni_posix_pollq_remove(&udp->udp_pitem); + + nni_posix_udp_doclose(udp); + nni_mtx_unlock(&udp->udp_mtx); + + (void) close(udp->udp_fd); + nni_mtx_fini(&udp->udp_mtx); + NNI_FREE_STRUCT(udp); +} + +void +nni_plat_udp_cancel(nni_aio *aio) +{ + nni_plat_udp *udp = aio->a_prov_data; + + nni_mtx_lock(&udp->udp_mtx); + nni_aio_list_remove(aio); + nni_mtx_unlock(&udp->udp_mtx); +} + +void +nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) +{ + nni_mtx_lock(&udp->udp_mtx); + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + if (udp->udp_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + nni_list_append(&udp->udp_recvq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + nni_mtx_unlock(&udp->udp_mtx); +} + +void +nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) +{ + nni_mtx_lock(&udp->udp_mtx); + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + if (udp->udp_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&udp->udp_mtx); + return; + } + + nni_list_append(&udp->udp_sendq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + nni_mtx_unlock(&udp->udp_mtx); +} + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_udp_not_used = 0; + +#endif // PLATFORM_POSIX_UDP |
