aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/core/platform.h29
-rw-r--r--src/platform/posix/posix_impl.h12
-rw-r--r--src/platform/posix/posix_sockaddr.c93
-rw-r--r--src/platform/posix/posix_tcp.c (renamed from src/platform/posix/posix_net.c)39
-rw-r--r--src/platform/posix/posix_udp.c333
6 files changed, 471 insertions, 39 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 18d4932f..31abc042 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -87,13 +87,15 @@ set (NNG_SOURCES
platform/posix/posix_debug.c
platform/posix/posix_epdesc.c
platform/posix/posix_ipc.c
- platform/posix/posix_net.c
platform/posix/posix_pipe.c
platform/posix/posix_pipedesc.c
platform/posix/posix_pollq_poll.c
platform/posix/posix_rand.c
platform/posix/posix_resolv_gai.c
+ platform/posix/posix_sockaddr.c
+ platform/posix/posix_tcp.c
platform/posix/posix_thread.c
+ platform/posix/posix_udp.c
platform/windows/win_impl.h
platform/windows/win_clock.c
diff --git a/src/core/platform.h b/src/core/platform.h
index ed32c550..fa93916c 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -289,6 +289,35 @@ extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *);
extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *);
//
+// UDP support. UDP is not connection oriented, and only has the notion
+// of being bound, sendto, and recvfrom. (It is possible to set up a
+// connect call that semantically acts as a filter on recvfrom, but we
+// don't use that.) Outbound packets will include the destination address
+// in the AIO, and inbound packets include the source address in the AIO.
+// For now we don't have more sophisticated options like setting the TTL.
+//
+typedef struct nni_plat_udp nni_plat_udp;
+
+// nni_plat_udp_open initializes a UDP socket, binding to the local
+// address specified specified in the AIO. The remote address is
+// not used. The resulting nni_plat_udp structure is returned in the
+// the aio's a_pipe.
+extern int nni_plat_udp_open(nni_plat_udp **, nni_sockaddr *);
+
+// nni_plat_udp_close closes the underlying UDP socket.
+extern void nni_plat_udp_close(nni_plat_udp *);
+
+// nni_plat_udp_send sends the data in the aio to the the
+// destination specified in the nni_aio. The iovs are the
+// UDP payload.
+extern void nni_plat_udp_send(nni_plat_udp *, nni_aio *);
+
+// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs
+// from the UDP payload. If the UDP payload will not fit, then
+// NNG_EMSGSIZE results.
+extern void nni_plat_udp_recv(nni_plat_udp *, nni_aio *);
+
+//
// Notification Pipe Pairs
//
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 72842076..46ebbc1d 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -21,17 +21,25 @@
#define PLATFORM_POSIX_DEBUG
#define PLATFORM_POSIX_CLOCK
#define PLATFORM_POSIX_IPC
-#define PLATFORM_POSIX_NET
+#define PLATFORM_POSIX_TCP
#define PLATFORM_POSIX_PIPE
#define PLATFORM_POSIX_RANDOM
#define PLATFORM_POSIX_SOCKET
#define PLATFORM_POSIX_THREAD
#define PLATFORM_POSIX_PIPEDESC
#define PLATFORM_POSIX_EPDESC
+#define PLATFORM_POSIX_SOCKADDR
+#define PLATFORM_POSIX_UDP
#include "platform/posix/posix_config.h"
#endif
+#ifdef PLATFORM_POSIX_SOCKADDR
+#include <sys/socket.h>
+extern int nni_posix_sockaddr2nn(nni_sockaddr *, const void *);
+extern int nni_posix_nn2sockaddr(void *, const nni_sockaddr *);
+#endif
+
#ifdef PLATFORM_POSIX_DEBUG
extern int nni_plat_errno(int);
@@ -40,8 +48,6 @@ extern int nni_plat_errno(int);
// Define types that this platform uses.
#ifdef PLATFORM_POSIX_THREAD
-extern int nni_plat_devnull; // open descriptor on /dev/null
-
#include <pthread.h>
// These types are provided for here, to permit them to be directly inlined
diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c
new file mode 100644
index 00000000..d3ab9c6d
--- /dev/null
+++ b/src/platform/posix/posix_sockaddr.c
@@ -0,0 +1,93 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef PLATFORM_POSIX_SOCKADDR
+
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+int
+nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na)
+{
+ struct sockaddr_in * sin;
+ struct sockaddr_in6 * sin6;
+ const nng_sockaddr_in * nsin;
+ const nng_sockaddr_in6 *nsin6;
+
+ switch (na->s_un.s_family) {
+ case NNG_AF_INET:
+ sin = (void *) sa;
+ nsin = &na->s_un.s_in;
+ memset(sin, 0, sizeof(*sin));
+ sin->sin_family = PF_INET;
+ sin->sin_port = nsin->sa_port;
+ sin->sin_addr.s_addr = nsin->sa_addr;
+ return (sizeof(*sin));
+
+ case NNG_AF_INET6:
+ sin6 = (void *) sa;
+ nsin6 = &na->s_un.s_in6;
+ memset(sin6, 0, sizeof(*sin6));
+#ifdef SIN6_LEN
+ sin6->sin6_len = sizeof(*sin6);
+#endif
+ sin6->sin6_family = PF_INET6;
+ sin6->sin6_port = nsin6->sa_port;
+ memcpy(sin6->sin6_addr.s6_addr, nsin6->sa_addr, 16);
+ return (sizeof(*sin6));
+ }
+ return (-1);
+}
+
+int
+nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa)
+{
+ const struct sockaddr_in * sin;
+ const struct sockaddr_in6 *sin6;
+ nng_sockaddr_in * nsin;
+ nng_sockaddr_in6 * nsin6;
+
+ switch (((struct sockaddr *) sa)->sa_family) {
+ case AF_INET:
+ sin = (void *) sa;
+ nsin = &na->s_un.s_in;
+ nsin->sa_family = NNG_AF_INET;
+ nsin->sa_port = sin->sin_port;
+ nsin->sa_addr = sin->sin_addr.s_addr;
+ break;
+ case AF_INET6:
+ sin6 = (void *) sa;
+ nsin6 = &na->s_un.s_in6;
+ nsin6->sa_family = NNG_AF_INET6;
+ nsin6->sa_port = sin6->sin6_port;
+ memcpy(nsin6->sa_addr, sin6->sin6_addr.s6_addr, 16);
+ break;
+ default:
+ // We should never see this - the OS should always be
+ // specific about giving us either AF_INET or AF_INET6.
+ // Other address families are not handled here.
+ return (-1);
+ }
+ return (0);
+}
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_sockaddr_not_used = 0;
+
+#endif // PLATFORM_POSIX_SOCKADDR
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_tcp.c
index 69c0e772..4efbf47d 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_tcp.c
@@ -9,14 +9,12 @@
#include "core/nng_impl.h"
-#ifdef PLATFORM_POSIX_NET
+#ifdef PLATFORM_POSIX_TCP
#include "platform/posix/posix_aio.h"
-#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
-#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -25,35 +23,6 @@
#include <sys/uio.h>
#include <unistd.h>
-static int
-nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
-{
- struct sockaddr_in * sin;
- struct sockaddr_in6 *sin6;
-
- switch (sa->s_un.s_family) {
- case NNG_AF_INET:
- sin = (void *) ss;
- memset(sin, 0, sizeof(*sin));
- sin->sin_family = PF_INET;
- sin->sin_port = sa->s_un.s_in.sa_port;
- sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
- return (sizeof(*sin));
-
- case NNG_AF_INET6:
- sin6 = (void *) ss;
- memset(sin6, 0, sizeof(*sin6));
-#ifdef SIN6_LEN
- sin6->sin6_len = sizeof(*sin6);
-#endif
- sin6->sin6_family = PF_INET6;
- sin6->sin6_port = sa->s_un.s_in6.sa_port;
- memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
- return (sizeof(*sin6));
- }
- return (-1);
-}
-
extern int nni_tcp_parse_url(char *, char **, char **, char **, char **);
int
@@ -112,7 +81,7 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode)
if ((rv = nni_aio_result(&aio)) != 0) {
goto done;
}
- len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]);
+ len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]);
nni_posix_epdesc_set_remote(ed, &ss, len);
}
@@ -122,7 +91,7 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode)
if ((rv = nni_aio_result(&aio)) != 0) {
goto done;
}
- len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]);
+ len = nni_posix_nn2sockaddr((void *) &ss, &aio.a_addrs[0]);
nni_posix_epdesc_set_local(ed, &ss, len);
}
nni_aio_fini(&aio);
@@ -196,4 +165,4 @@ nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio)
// Suppress empty symbols warnings in ranlib.
int nni_posix_net_not_used = 0;
-#endif // PLATFORM_POSIX_NET
+#endif // PLATFORM_POSIX_TCP
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
new file mode 100644
index 00000000..49e06814
--- /dev/null
+++ b/src/platform/posix/posix_udp.c
@@ -0,0 +1,333 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef PLATFORM_POSIX_UDP
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+// UDP support.
+
+// If we can suppress SIGPIPE on send, please do so.
+#ifdef MSG_NOSIGNAL
+#define NNI_MSG_NOSIGNAL MSG_NOSIGNAL
+#else
+#define NNI_MSG_NOSIGNAL 0
+#endif
+
+struct nni_plat_udp {
+ nni_posix_pollq_node udp_pitem;
+ int udp_fd;
+ int udp_closed;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
+};
+
+static void
+nni_posix_udp_doclose(nni_plat_udp *udp)
+{
+ nni_aio *aio;
+
+ udp->udp_closed = 1;
+ while ((aio = nni_list_first(&udp->udp_recvq)) != NULL) {
+ nni_list_remove(&udp->udp_recvq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+ while ((aio = nni_list_first(&udp->udp_sendq)) != NULL) {
+ nni_list_remove(&udp->udp_recvq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+ // Underlying socket left open until close API called.
+}
+
+static void
+nni_posix_udp_dorecv(nni_plat_udp *udp)
+{
+ nni_aio * aio;
+ struct sockaddr_storage ss;
+ struct msghdr hdr;
+ int niov;
+ int rv;
+ nni_list * q = &udp->udp_recvq;
+
+ // While we're able to recv, do so.
+ while ((aio = nni_list_first(q)) != NULL) {
+ nni_list_remove(q, aio);
+
+ for (niov = 0; niov < aio->a_niov; niov++) {
+ hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf;
+ hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len;
+ }
+ hdr.msg_iovlen = niov;
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = sizeof(ss);
+ hdr.msg_flags = 0;
+ hdr.msg_control = NULL;
+ hdr.msg_controllen = 0;
+ rv = recvmsg(udp->udp_fd, &hdr, 0);
+ if (rv < 0) {
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+ // No data available at socket. Return
+ // the AIO to the head of the queue.
+ nni_list_prepend(q, aio);
+ return;
+ }
+ rv = nni_plat_errno(errno);
+ nni_aio_finish(aio, rv, 0);
+ continue;
+ }
+
+ // We need to store the address information.
+ // It is incumbent on the AIO submitter to supply
+ // storage for the address.
+ if (aio->a_naddrs > 0) {
+ nni_posix_sockaddr2nn(&aio->a_addrs[0], (void *) &ss);
+ }
+
+ nni_aio_finish(aio, 0, rv);
+ }
+}
+
+static void
+nni_posix_udp_dosend(nni_plat_udp *udp)
+{
+ // XXX: TBD.
+ nni_aio * aio;
+ struct sockaddr_storage ss;
+ struct msghdr hdr;
+ int niov;
+ int rv;
+ int len;
+ nni_list * q = &udp->udp_sendq;
+
+ // While we're able to recv, do so.
+ while ((aio = nni_list_first(q)) != NULL) {
+ nni_list_remove(q, aio);
+
+ if (aio->a_naddrs < 1) {
+ // No incoming address?
+ nni_aio_finish(aio, NNG_EADDRINVAL, 0);
+ return;
+ }
+ len = nni_posix_nn2sockaddr(&ss, &aio->a_addrs[0]);
+ if (len < 0) {
+ nni_aio_finish(aio, NNG_EADDRINVAL, 0);
+ return;
+ }
+
+ for (niov = 0; niov < aio->a_niov; niov++) {
+ hdr.msg_iov[niov].iov_base = aio->a_iov[niov].iov_buf;
+ hdr.msg_iov[niov].iov_len = aio->a_iov[niov].iov_len;
+ }
+ hdr.msg_iovlen = niov;
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = len;
+ hdr.msg_flags = NNI_MSG_NOSIGNAL;
+ hdr.msg_control = NULL;
+ hdr.msg_controllen = 0;
+
+ rv = sendmsg(udp->udp_fd, &hdr, 0);
+ if (rv < 0) {
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+ // Cannot send (buffers full), return to
+ // head of queue.
+ nni_list_prepend(q, aio);
+ return;
+ }
+ rv = nni_plat_errno(errno);
+ nni_aio_finish(aio, rv, 0);
+ continue;
+ }
+
+ nni_aio_finish(aio, 0, rv);
+ }
+}
+
+// This function is called by the poller on activity on the FD.
+static void
+nni_posix_udp_cb(void *arg)
+{
+ nni_plat_udp *udp = arg;
+ int revents;
+ int events = 0;
+
+ nni_mtx_lock(&udp->udp_mtx);
+ revents = udp->udp_pitem.revents;
+ if (revents & POLLIN) {
+ nni_posix_udp_dorecv(udp);
+ }
+ if (revents & POLLOUT) {
+ nni_posix_udp_dosend(udp);
+ }
+ if (revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_posix_udp_doclose(udp);
+ } else {
+ if (!nni_list_empty(&udp->udp_sendq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&udp->udp_recvq)) {
+ events |= POLLIN;
+ }
+ if (events) {
+ nni_posix_pollq_arm(&udp->udp_pitem, events);
+ }
+ }
+ nni_mtx_unlock(&udp->udp_mtx);
+}
+
+int
+nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
+{
+ nni_plat_udp * udp;
+ int salen;
+ struct sockaddr_storage sa;
+ int rv;
+
+ if ((salen = nni_posix_nn2sockaddr(&sa, bindaddr)) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+ // UDP opens can actually run synchronously.
+ if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) {
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+
+ udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (udp->udp_fd < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+
+ if (bind(udp->udp_fd, (void *) &sa, salen) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+ udp->udp_pitem.fd = udp->udp_fd;
+ udp->udp_pitem.cb = nni_posix_udp_cb;
+ udp->udp_pitem.data = udp;
+
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
+
+ rv = nni_posix_pollq_add(
+ nni_posix_pollq_get(udp->udp_fd), &udp->udp_pitem);
+ if (rv != 0) {
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+ return (rv);
+ }
+
+ *upp = udp;
+ return (0);
+}
+
+void
+nni_plat_udp_close(nni_plat_udp *udp)
+{
+ nni_aio *aio;
+
+ nni_mtx_lock(&udp->udp_mtx);
+ if (udp->udp_closed) {
+ // The only way this happens is in response to a callback that
+ // is being canceled. Double close from user code is a bug.
+ nni_mtx_unlock(&udp->udp_mtx);
+ return;
+ }
+
+ // We're no longer interested in events.
+ nni_posix_pollq_remove(&udp->udp_pitem);
+
+ nni_posix_udp_doclose(udp);
+ nni_mtx_unlock(&udp->udp_mtx);
+
+ (void) close(udp->udp_fd);
+ nni_mtx_fini(&udp->udp_mtx);
+ NNI_FREE_STRUCT(udp);
+}
+
+void
+nni_plat_udp_cancel(nni_aio *aio)
+{
+ nni_plat_udp *udp = aio->a_prov_data;
+
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&udp->udp_mtx);
+}
+
+void
+nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
+{
+ nni_mtx_lock(&udp->udp_mtx);
+ if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ return;
+ }
+
+ if (udp->udp_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&udp->udp_mtx);
+ return;
+ }
+
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ nni_mtx_unlock(&udp->udp_mtx);
+}
+
+void
+nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
+{
+ nni_mtx_lock(&udp->udp_mtx);
+ if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ return;
+ }
+
+ if (udp->udp_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&udp->udp_mtx);
+ return;
+ }
+
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ nni_mtx_unlock(&udp->udp_mtx);
+}
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_udp_not_used = 0;
+
+#endif // PLATFORM_POSIX_UDP