diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-29 17:39:41 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-29 17:39:41 -0700 |
| commit | f8ac106494444962788dc94e147bd33a3e8cdab6 (patch) | |
| tree | a26dbcb45c4d8bfefbb3f1761b3938eab48bfabe /src | |
| parent | 723e39f6c03e241994a2e26b907e41f5bf5db3e7 (diff) | |
| download | nng-f8ac106494444962788dc94e147bd33a3e8cdab6.tar.gz nng-f8ac106494444962788dc94e147bd33a3e8cdab6.tar.bz2 nng-f8ac106494444962788dc94e147bd33a3e8cdab6.zip | |
Use common socket handling on POSIX (tcp done, ipc pending.)
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 351 | ||||
| -rw-r--r-- | src/platform/posix/posix_poll.c | 18 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 491 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.h | 45 |
8 files changed, 579 insertions, 334 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4c32d5ce..cfe66dd5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -80,6 +80,7 @@ set (NNG_SOURCES platform/posix/posix_impl.h platform/posix/posix_config.h platform/posix/posix_aio.h + platform/posix/posix_socket.h platform/posix/posix_alloc.c platform/posix/posix_clock.c @@ -89,6 +90,7 @@ set (NNG_SOURCES platform/posix/posix_pipe.c platform/posix/posix_poll.c platform/posix/posix_rand.c + platform/posix/posix_socket.c platform/posix/posix_thread.c platform/windows/win_impl.h diff --git a/src/core/aio.h b/src/core/aio.h index a290281f..6aad8b00 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -38,6 +38,9 @@ struct nni_aio { // Message operations. nni_msg * a_msg; + // Connect operations. + nni_sockaddr * a_sockaddr; + // TBD: Resolver operations. // Provider-use fields. @@ -458,7 +458,7 @@ NNG_DECL void nng_thread_destroy(void *); // These structures can be obtained via property lookups, etc. struct nng_sockaddr_path { uint16_t sa_family; - uint8_t sa_path[NNG_MAXADDRLEN]; + char sa_path[NNG_MAXADDRLEN]; }; typedef struct nng_sockaddr_path nng_sockaddr_path; typedef struct nng_sockaddr_path nng_sockaddr_ipc; diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 9c8d00dc..83b4914e 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -24,6 +24,7 @@ #define PLATFORM_POSIX_NET #define PLATFORM_POSIX_PIPE #define PLATFORM_POSIX_RANDOM +#define PLATFORM_POSIX_SOCKET #define PLATFORM_POSIX_THREAD #include "platform/posix/posix_config.h" diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 1d44435e..161ffc7c 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -11,6 +11,7 @@ #ifdef PLATFORM_POSIX_NET #include "platform/posix/posix_aio.h" +#include "platform/posix/posix_socket.h" #include <errno.h> #include <stdlib.h> @@ -25,73 +26,7 @@ #include <unistd.h> #include <netdb.h> -#ifdef SOCK_CLOEXEC -#define NNI_TCP_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_TCP_SOCKTYPE SOCK_STREAM -#endif - -struct nni_plat_tcpsock { - int fd; - int devnull; // for shutting down accept() - nni_posix_pipedesc * pd; -}; - -static int -nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in *sin; - struct sockaddr_in6 *sin6; - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof (*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof (*sin)); - - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof (*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof (*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof (*sin6)); - } - return (-1); -} - - -static int -nni_plat_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) -{ - const struct sockaddr_in *sin; - const struct sockaddr_in6 *sin6; - - memset(sa, 0, sizeof (*sa)); - switch (ss->sa_family) { - case PF_INET: - sin = (const void *) ss; - sa->s_un.s_in.sa_family = NNG_AF_INET; - sa->s_un.s_in.sa_port = sin->sin_port; - sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; - return (0); - - case PF_INET6: - sin6 = (const void *) ss; - sa->s_un.s_in6.sa_family = NNG_AF_INET6; - sa->s_un.s_in6.sa_port = sin6->sin6_port; - memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - return (0); - } - return (-1); -} - +// We alias nni_plat_tcpsock to an nni_posix_sock. int nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) @@ -112,7 +47,7 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) return (NNG_EADDRINVAL); } - if (nni_plat_from_sockaddr(addr, ai->ai_addr) < 0) { + if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) { freeaddrinfo(ai); return (NNG_EADDRINVAL); } @@ -124,228 +59,62 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) int nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) { - struct iovec iov[4]; // We never have more than 3 at present - int i; - int offset; - int resid = 0; - int rv; - - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; - iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; - } + return (nni_posix_sock_send_sync((void *) s, iovs, cnt)); +} - i = 0; - while (resid) { - rv = writev(s->fd, &iov[i], cnt); - if (rv < 0) { - if (rv == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - if (rv > resid) { - nni_panic("writev says it wrote too much!"); - } - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } - } - } - return (0); +int +nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) +{ + return (nni_posix_sock_recv_sync((void *) s, iovs, cnt)); } void nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) { - nni_posix_pipedesc_write(s->pd, aio); + nni_posix_sock_aio_send((void *) s, aio); } void nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) { - nni_posix_pipedesc_read(s->pd, aio); -} - - -int -nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) -{ - struct iovec iov[4]; // We never have more than 3 at present - int i; - int offset; - int resid = 0; - int rv; - - if (cnt > 4) { - return (NNG_EINVAL); - } - - for (i = 0; i < cnt; i++) { - iov[i].iov_base = iovs[i].iov_buf; - iov[i].iov_len = iovs[i].iov_len; - resid += iov[i].iov_len; - } - - i = 0; - while (resid) { - rv = readv(s->fd, &iov[i], cnt); - if (rv < 0) { - if (errno == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - if (rv == 0) { - return (NNG_ECLOSED); - } - if (rv > resid) { - nni_panic("readv says it read too much!"); - } - - resid -= rv; - while (rv) { - if (iov[i].iov_len <= rv) { - rv -= iov[i].iov_len; - i++; - cnt--; - } else { - iov[i].iov_len -= rv; - iov[i].iov_base += rv; - rv = 0; - } - } - } - - return (0); -} - - -static void -nni_plat_tcp_setopts(int fd) -{ - int one; - - // Try to ensure that both CLOEXEC is set, and that we don't - // generate SIGPIPE. (Note that SIGPIPE suppression in this way - // only works on BSD systems. Linux wants us to use sendmsg().) - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#if defined(F_SETNOSIGPIPE) - (void) fcntl(fd, F_SETNOSIGPIPE, 1); -#elif defined(SO_NOSIGPIPE) - one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); -#endif - - // Also disable Nagle. We are careful to group data with writev, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - one = 1; - (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one)); + nni_posix_sock_aio_recv((void *) s, aio); } int nni_plat_tcp_init(nni_plat_tcpsock **tspp) { - nni_plat_tcpsock *tsp; + nni_posix_sock *s; + int rv; - if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) { - return (NNG_ENOMEM); + if ((rv = nni_posix_sock_init(&s)) == 0) { + *tspp = (void *) s; } - tsp->fd = -1; - *tspp = tsp; - return (0); + return (rv); } void -nni_plat_tcp_fini(nni_plat_tcpsock *tsp) +nni_plat_tcp_fini(nni_plat_tcpsock *s) { - if (tsp->fd != -1) { - (void) close(tsp->fd); - tsp->fd = -1; - } - if (tsp->pd != NULL) { - nni_posix_pipedesc_fini(tsp->pd); - } - NNI_FREE_STRUCT(tsp); + nni_posix_sock_fini((void *) s); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp) +nni_plat_tcp_shutdown(nni_plat_tcpsock *s) { - if (tsp->fd != -1) { - (void) shutdown(tsp->fd, SHUT_RDWR); - // This causes the equivalent of a close. Hopefully waking - // up anything that didn't get the hint with the shutdown. - // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, tsp->fd); - } - if (tsp->pd != NULL) { - nni_posix_pipedesc_close(tsp->pd); - } + nni_posix_sock_shutdown((void *) s); } -// nni_plat_tcp_bind creates a file descriptor bound to the given address. -// This basically does the equivalent of socket, bind, and listen. We have -// chosen a default value for the listen backlog of 128, which should be -// plenty. (If it isn't, then the accept thread can't get enough resources -// to keep up, and your clients are going to experience bad things. Normally -// the actual backlog should hover near 0 anyway.) int -nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr) +nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) { - int fd; - int len; - struct sockaddr_storage ss; - int rv; - - len = nni_plat_to_sockaddr(&ss, addr); - if (len < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - nni_plat_tcp_setopts(fd); - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - tsp->fd = fd; - return (0); + return (nni_posix_sock_listen((void *) s, addr)); } @@ -353,85 +122,17 @@ nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr) // bind address is not null, then it will attempt to bind to the local // address specified first. int -nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr, +nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, const nni_sockaddr *bindaddr) { - int fd; - int len; - struct sockaddr_storage ss; - struct sockaddr_storage bss; - int rv; - - len = nni_plat_to_sockaddr(&ss, addr); - if (len < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - if (bindaddr != NULL) { - if (bindaddr->s_un.s_family != addr->s_un.s_family) { - return (NNG_EINVAL); - } - if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) { - return (NNG_EADDRINVAL); - } - if (bind(fd, (struct sockaddr *) &bss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - } - - nni_plat_tcp_setopts(fd); - - if (connect(fd, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) { - (void) close(fd); - return (rv); - } - tsp->fd = fd; - return (0); + return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr)); } int -nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server) +nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) { - int fd; - int rv; - - for (;;) { -#ifdef NNG_USE_ACCEPT4 - fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); - if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - fd = accept(server->fd, NULL, NULL); - } -#else - fd = accept(server->fd, NULL, NULL); -#endif - - if (fd < 0) { - return (nni_plat_errno(errno)); - } else { - break; - } - } - - nni_plat_tcp_setopts(fd); - - if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) { - close(fd); - return (rv); - } - tsp->fd = fd; - return (0); + return (nni_posix_sock_accept_sync((void *) s, (void *) server)); } diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c index d309babf..c5c494f6 100644 --- a/src/platform/posix/posix_poll.c +++ b/src/platform/posix/posix_poll.c @@ -53,6 +53,8 @@ struct nni_posix_epdesc { nni_list acceptq; nni_list_node node; nni_posix_pollq * pq; + struct sockaddr_storage locaddr; + struct sockaddr_storage remaddr; }; @@ -78,7 +80,7 @@ struct nni_posix_pollq { static nni_posix_pollq nni_posix_global_pollq; static void -nni_posix_poll_finish(nni_aio *aio, int rv) +nni_posix_pipedesc_finish(nni_aio *aio, int rv) { nni_posix_pipedesc *pd; @@ -120,7 +122,7 @@ nni_posix_poll_write(nni_posix_pipedesc *pd) rv = nni_plat_errno(errno); nni_list_remove(&pd->writeq, aio); - nni_posix_poll_finish(aio, rv); + nni_posix_pipedesc_finish(aio, rv); return; } @@ -148,7 +150,7 @@ nni_posix_poll_write(nni_posix_pipedesc *pd) // We completed the entire operation on this aioq. nni_list_remove(&pd->writeq, aio); - nni_posix_poll_finish(aio, 0); + nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another // aioq ready for us to process. @@ -183,13 +185,13 @@ nni_posix_poll_read(nni_posix_pipedesc *pd) } rv = nni_plat_errno(errno); - nni_posix_poll_finish(aio, rv); + nni_posix_pipedesc_finish(aio, rv); return; } if (n == 0) { // No bytes indicates a closed descriptor. - nni_posix_poll_finish(aio, NNG_ECLOSED); + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); return; } @@ -216,7 +218,7 @@ nni_posix_poll_read(nni_posix_pipedesc *pd) } // We completed the entire operation on this aioq. - nni_posix_poll_finish(aio, 0); + nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another // aioq ready for us to process. @@ -230,10 +232,10 @@ nni_posix_poll_close(nni_posix_pipedesc *pd) nni_aio *aio; while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_poll_finish(aio, NNG_ECLOSED); + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); } while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_poll_finish(aio, NNG_ECLOSED); + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); } } diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c new file mode 100644 index 00000000..b50bd65f --- /dev/null +++ b/src/platform/posix/posix_socket.c @@ -0,0 +1,491 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_SOCKET +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_socket.h" + +#include <errno.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <sys/un.h> +#include <fcntl.h> +#include <unistd.h> +#include <netdb.h> + +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + +struct nni_posix_sock { + int fd; + int devnull; // for shutting down accept() + char * unlink; // path to unlink at unbind + nni_posix_pipedesc * pd; +}; + +int +nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) +{ + struct sockaddr_in *sin; + struct sockaddr_un *sun; + +#ifdef PF_INET6 + struct sockaddr_in6 *sin6; +#endif + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof (*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof (*sin)); + +#ifdef PF_INET6 + // Not every platform can do IPv6. Amazingly. + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(sin6, 0, sizeof (*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof (*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof (*sin6)); + +#endif // PF_INET6 + + case NNG_AF_IPC: + sun = (void *) ss; + memset(sun, 0, sizeof (*sun)); + // NB: This logic does not support abstract sockets, which + // have their first byte NULL, and rely on length instead. + // Probably for dealing with abstract sockets we will just + // handle @ specially in the future. + if (strlen(sa->s_un.s_path.sa_path) >= + sizeof (sun->sun_path)) { + return (-1); // caller converts to NNG_EADDRINVAL + } + + sun->sun_family = PF_UNIX; + (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", + sa->s_un.s_path.sa_path); + // Some systems (Linux!) have sun_len, while others do not. + // The lack of a length field means systems without it cannot + // use abstract sockets. +#ifdef SUN_LEN + sun->sun_len = SUN_LEN(sun); + return (sun->sun_len); + +#else + return (sizeof (*sun)); + +#endif + } + return (-1); +} + + +int +nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) +{ + const struct sockaddr_in *sin; + const struct sockaddr_un *sun; + +#ifdef PF_INET6 + const struct sockaddr_in6 *sin6; +#endif + + memset(sa, 0, sizeof (*sa)); + switch (ss->sa_family) { + case PF_INET: + sin = (const void *) ss; + sa->s_un.s_in.sa_family = NNG_AF_INET; + sa->s_un.s_in.sa_port = sin->sin_port; + sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; + return (0); + +#ifdef PF_INET6 + case PF_INET6: + sin6 = (const void *) ss; + sa->s_un.s_in6.sa_family = NNG_AF_INET6; + sa->s_un.s_in6.sa_port = sin6->sin6_port; + memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); + return (0); + +#endif // PF_INET6 + + case PF_UNIX: + // NB: This doesn't handle abstract sockets! + sun = (const void *) ss; + sa->s_un.s_path.sa_family = NNG_AF_IPC; + snprintf(sa->s_un.s_path.sa_path, + sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path); + return (0); + } + return (-1); +} + + +void +nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio) +{ + nni_posix_pipedesc_write(s->pd, aio); +} + + +void +nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio) +{ + nni_posix_pipedesc_read(s->pd, aio); +} + + +static void +nni_posix_sock_setopts_fd(int fd) +{ + int one; + + // Try to ensure that both CLOEXEC is set, and that we don't + // generate SIGPIPE. (Note that SIGPIPE suppression in this way + // only works on BSD systems. Linux wants us to use sendmsg().) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); +#if defined(F_SETNOSIGPIPE) + (void) fcntl(fd, F_SETNOSIGPIPE, 1); +#elif defined(SO_NOSIGPIPE) + one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); +#endif + + // Also disable Nagle. We are careful to group data with writev, + // and latency is king for most of our users. (Consider adding + // a method to enable this later.) + + // It's unclear whether this is safe for UNIX domain sockets. It + // *should* be. + one = 1; + (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one)); +} + + +int +nni_posix_sock_init(nni_posix_sock **sp) +{ + nni_posix_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + s->fd = -1; + *sp = s; + return (0); +} + + +void +nni_posix_sock_fini(nni_posix_sock *s) +{ + if (s->fd != -1) { + (void) close(s->fd); + s->fd = -1; + } + if (s->pd != NULL) { + nni_posix_pipedesc_fini(s->pd); + } + NNI_FREE_STRUCT(s); +} + + +void +nni_posix_sock_shutdown(nni_posix_sock *s) +{ + if (s->fd != -1) { + (void) shutdown(s->fd, SHUT_RDWR); + // This causes the equivalent of a close. Hopefully waking + // up anything that didn't get the hint with the shutdown. + // (macOS does not see the shtudown). + (void) dup2(nni_plat_devnull, s->fd); + } + if (s->pd != NULL) { + nni_posix_pipedesc_close(s->pd); + } +} + + +int +nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr) +{ + int len; + struct sockaddr_storage ss; + int rv; + int fd; + + if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { + return (NNG_EADDRINVAL); + } + + if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + nni_posix_sock_setopts_fd(fd); + + // UNIX DOMAIN SOCKETS -- these have names in the file namespace. + // We are going to check to see if there was a name already there. + // If there was, and nothing is listening (ECONNREFUSED), then we + // will just try to cleanup the old socket. Note that this is not + // perfect in all scenarios, so use this with caution. + if ((ss.ss_family == AF_UNIX) && + (saddr->s_un.s_path.sa_path[0] != 0)) { + int chkfd; + if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + (void) close(fd); + return (nni_plat_errno(errno)); + } + + // Nonblocking; we don't want to wait for remote server. + (void) fcntl(chkfd, F_SETFL, O_NONBLOCK); + if (connect(chkfd, (struct sockaddr *) &ss, sizeof (ss)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(saddr->s_un.s_path.sa_path); + } + } + (void) close(chkfd); + + // Record the path so we unlink it later + s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1); + if (s->unlink == NULL) { + (void) close(fd); + return (NNG_ENOMEM); + } + } + + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + + s->fd = fd; + return (0); +} + + +// These functions will need to be removed in the future. They are +// transition functions for now. + +int +nni_posix_sock_send_sync(nni_posix_sock *s, nni_iov *iovs, int cnt) +{ + struct iovec iov[4]; // We never have more than 3 at present + int i; + int offset; + int resid = 0; + int rv; + + if (cnt > 4) { + return (NNG_EINVAL); + } + + for (i = 0; i < cnt; i++) { + iov[i].iov_base = iovs[i].iov_buf; + iov[i].iov_len = iovs[i].iov_len; + resid += iov[i].iov_len; + } + + i = 0; + while (resid) { + rv = writev(s->fd, &iov[i], cnt); + if (rv < 0) { + if (rv == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + NNI_ASSERT(rv <= resid); + resid -= rv; + while (rv) { + if (iov[i].iov_len <= rv) { + rv -= iov[i].iov_len; + i++; + cnt--; + } else { + iov[i].iov_len -= rv; + iov[i].iov_base += rv; + rv = 0; + } + } + } + + return (0); +} + + +int +nni_posix_sock_recv_sync(nni_posix_sock *s, nni_iov *iovs, int cnt) +{ + struct iovec iov[4]; // We never have more than 3 at present + int i; + int offset; + int resid = 0; + int rv; + + if (cnt > 4) { + return (NNG_EINVAL); + } + + for (i = 0; i < cnt; i++) { + iov[i].iov_base = iovs[i].iov_buf; + iov[i].iov_len = iovs[i].iov_len; + resid += iov[i].iov_len; + } + + i = 0; + while (resid) { + rv = readv(s->fd, &iov[i], cnt); + if (rv < 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + if (rv == 0) { + return (NNG_ECLOSED); + } + NNI_ASSERT(rv <= resid); + + resid -= rv; + while (rv) { + if (iov[i].iov_len <= rv) { + rv -= iov[i].iov_len; + i++; + cnt--; + } else { + iov[i].iov_len -= rv; + iov[i].iov_base += rv; + rv = 0; + } + } + } + + return (0); +} + + +int +nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server) +{ + int fd; + int rv; + + for (;;) { +#ifdef NNG_USE_ACCEPT4 + fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); + if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + fd = accept(server->fd, NULL, NULL); + } +#else + fd = accept(server->fd, NULL, NULL); +#endif + + if (fd < 0) { + return (nni_plat_errno(errno)); + } else { + break; + } + } + + nni_posix_sock_setopts_fd(fd); + + if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { + close(fd); + return (rv); + } + s->fd = fd; + return (0); +} + + +int +nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, + const nni_sockaddr *bindaddr) +{ + int fd; + int len; + struct sockaddr_storage ss; + struct sockaddr_storage bss; + int rv; + + if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) { + return (NNG_EADDRINVAL); + } + + if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + if (bindaddr != NULL) { + if (bindaddr->s_un.s_family != addr->s_un.s_family) { + return (NNG_EINVAL); + } + if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) { + return (NNG_EADDRINVAL); + } + if (bind(fd, (struct sockaddr *) &bss, len) < 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + } + + nni_posix_sock_setopts_fd(fd); + + if (connect(fd, (struct sockaddr *) &ss, len) != 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { + (void) close(fd); + return (rv); + } + s->fd = fd; + return (0); +} + + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_socket_not_used = 0; + +#endif // PLATFORM_POSIX_SOCKET diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h new file mode 100644 index 00000000..f3fb169c --- /dev/null +++ b/src/platform/posix/posix_socket.h @@ -0,0 +1,45 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_POSIX_SOCKET_H +#define PLATFORM_POSIX_SOCKET_H + +// This file provides declarations for comment socket handling functions on +// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all +// work using mostly comment socket handling routines. + +#include "core/nng_impl.h" + +#include "platform/posix/posix_aio.h" + +#include <sys/types.h> +#include <sys/socket.h> + +typedef struct nni_posix_sock nni_posix_sock; + +extern int nni_posix_to_sockaddr(struct sockaddr_storage *, + const nni_sockaddr *); +extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *); +extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *); +extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *); +extern int nni_posix_sock_init(nni_posix_sock **); +extern void nni_posix_sock_fini(nni_posix_sock *); +extern void nni_posix_sock_shutdown(nni_posix_sock *); +extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *); + +// These functions will need to be removed in the future. They are +// transition functions for now. + +extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int); +extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int); +extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *); +extern int nni_posix_sock_connect_sync(nni_posix_sock *, + const nni_sockaddr *, const nni_sockaddr *); + +#endif // PLATFORM_POSIX_SOCKET_H |
