diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
| commit | 3730260da3744b549aaa1fe13946a674f924f63c (patch) | |
| tree | 902866876ee71246a299370cbe8f6580d758525c /src/platform/posix | |
| parent | 3b19940dfcd5d3585b1fb1dcf7915a748ae67289 (diff) | |
| download | nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.gz nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.bz2 nng-3730260da3744b549aaa1fe13946a674f924f63c.zip | |
TCP asynchronous working now.
It turns out that I had to fix a number of subtle asynchronous
handling bugs, but now TCP is fully asynchronous. We need to
change the high-level dial and listen interfaces to be async
as well.
Some of the transport APIs have changed here, and I've elected
to change what we expose to consumers as endpoints into seperate
dialers and listeners. Under the hood they are the same, but
it turns out that its helpful to know the intended use of the
endpoint at initialization time.
Scalability still occasionally hangs on Linux. Investigation
pending.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 98 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 80 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 176 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 422 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.h | 45 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 3 |
7 files changed, 259 insertions, 571 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 3d703e97..f64e25e9 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -8,11 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" -#include "platform/posix/posix_socket.h" #ifdef PLATFORM_POSIX_EPDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -180,6 +179,30 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) static void +nni_posix_epdesc_doerror(nni_posix_epdesc *ed) +{ + nni_aio *aio; + int rv = 1; + socklen_t sz = sizeof (rv); + + if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == 0) { + return; + } + rv = nni_plat_errno(rv); + + while ((aio = nni_list_first(&ed->acceptq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } + while ((aio = nni_list_first(&ed->connectq)) != NULL) { + nni_posix_epdesc_finish(aio, rv, 0); + } +} + + +static void nni_posix_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; @@ -191,6 +214,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { (void) unlink(sun->sun_path); } + (void) close(ed->fd); + ed->fd = -1; } while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); @@ -214,7 +239,10 @@ nni_posix_epdesc_cb(void *arg) if (ed->node.revents & POLLOUT) { nni_posix_epdesc_doconnect(ed); } - if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) { + if (ed->node.revents & (POLLERR|POLLHUP)) { + nni_posix_epdesc_doerror(ed); + } + if (ed->node.revents & POLLNVAL) { nni_posix_epdesc_doclose(ed); } ed->node.revents = 0; @@ -241,6 +269,58 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } +static int +nni_posix_epdesc_parseaddr(char *pair, char **hostp, uint16_t *portp) +{ + char *host, *port, *end; + char c; + int val; + + if (pair[0] == '[') { + host = pair+1; + // IP address enclosed ... for IPv6 usually. + if ((end = strchr(host, ']')) == NULL) { + return (NNG_EADDRINVAL); + } + *end = '\0'; + port = end + 1; + if (*port == ':') { + port++; + } else if (port != '\0') { + return (NNG_EADDRINVAL); + } + } else { + host = pair; + port = strchr(host, ':'); + if (port != NULL) { + *port = '\0'; + port++; + } + } + val = 0; + while ((c = *port) != '\0') { + val *= 10; + if ((c >= '0') && (c <= '9')) { + val += (c - '0'); + } else { + return (NNG_EADDRINVAL); + } + if (val > 65535) { + return (NNG_EADDRINVAL); + } + port++; + } + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; + } else { + *hostp = host; + } + // Stash the port in big endian (network) byte order. + NNI_PUT16((uint8_t *) portp, val); + return (0); +} + + int nni_posix_epdesc_listen(nni_posix_epdesc *ed) { @@ -250,6 +330,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) int fd; nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; len = ed->loclen; @@ -274,6 +355,8 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) return (rv); } + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->fd = fd; ed->node.fd = fd; nni_mtx_unlock(&ed->mtx); @@ -349,7 +432,10 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) } } + (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK); + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. @@ -460,7 +546,9 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - // XXX: MORE WORK HERE. + if (ed->fd >= 0) { + (void) close(ed->fd); + } nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index 5bd42190..706ef15f 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -11,7 +11,6 @@ #ifdef PLATFORM_POSIX_IPC #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> #include <stdlib.h> @@ -41,37 +40,40 @@ // We alias nni_posix_pipedesc to nni_plat_ipc_pipe. // We alias nni_posix_epdesc to nni_plat_ipc_ep. -static int -nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) -{ - size_t len; - - memset(sun, 0, sizeof (*sun)); - - // TODO: abstract sockets, including autobind sockets. - len = strlen(path); - if ((len >= sizeof (sun->sun_path)) || (len < 1)) { - return (NNG_EADDRINVAL); - } - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); - sun->sun_family = AF_UNIX; - return (0); -} - - int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url, int mode) { nni_posix_epdesc *ed; int rv; + struct sockaddr_un sun; + const char *path; if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { return (NNG_EADDRINVAL); } - url += strlen("ipc://"); // skip the prefix. + path = url + strlen("ipc://"); // skip the prefix. + + // prepare the sockaddr_un + sun.sun_family = AF_UNIX; + if (strlen(url) >= sizeof (sun.sun_path)) { + return (NNG_EADDRINVAL); + } + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { return (rv); } + switch (mode) { + case NNI_EP_MODE_DIAL: + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + break; + case NNI_EP_MODE_LISTEN: + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + break; + default: + nni_posix_epdesc_fini(ed); + return (NNG_EINVAL); + } *epp = (void *) ed; return (0); @@ -98,10 +100,14 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) // will just try to cleanup the old socket. Note that this is not // perfect in all scenarios, so use this with caution. static int -nni_plat_ipc_remove_stale(struct sockaddr_un *sun) +nni_plat_ipc_remove_stale(const char *path) { int fd; int rv; + struct sockaddr_un sun; + + sun.sun_family = AF_UNIX; + snprintf(sun.sun_path, sizeof (sun.sun_path), "%s", path); if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); @@ -113,9 +119,9 @@ nni_plat_ipc_remove_stale(struct sockaddr_un *sun) // then the cleanup will fail. As this is supposed to be an // exceptional case, don't worry. (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (connect(fd, (void *) &sun, sizeof (sun)) < 0) { if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); + (void) unlink(path); } } (void) close(fd); @@ -132,17 +138,11 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) int rv; path = nni_posix_epdesc_url(ed); + path += strlen("ipc://"); - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - return (rv); - } - - if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + if ((rv = nni_plat_ipc_remove_stale(path)) != 0) { return (rv); } - - nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); - return (nni_posix_epdesc_listen(ed)); } @@ -150,21 +150,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - const char *path; - nni_posix_epdesc *ed = (void *) ep; - struct sockaddr_un sun; - int rv; - - path = nni_posix_epdesc_url(ed); - - if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { - nni_aio_finish(aio, rv, 0); - return; - } - - nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); - - nni_posix_epdesc_connect(ed, aio); + nni_posix_epdesc_connect((void *) ep, aio); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 6d0d7c08..26ec3632 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -11,9 +11,9 @@ #ifdef PLATFORM_POSIX_NET #include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" #include <errno.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> @@ -24,101 +24,181 @@ #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> -#include <netdb.h> -// We alias nni_plat_tcpsock to an nni_posix_sock. +static int +nni_posix_tcp_addr(struct sockaddr_storage *ss, const nni_sockaddr *sa) +{ + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof (*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof (*sin)); + + + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(sin6, 0, sizeof (*sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof (*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof (*sin6)); + } + return (-1); +} + + +extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); int -nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) { - struct addrinfo hint; - struct addrinfo *ai; - - memset(&hint, 0, sizeof (hint)); - hint.ai_flags = AI_PASSIVE | AI_ADDRCONFIG | AI_NUMERICSERV; - hint.ai_family = PF_UNSPEC; - hint.ai_socktype = SOCK_STREAM; - hint.ai_protocol = IPPROTO_TCP; - if (flags & NNI_FLAG_IPV4ONLY) { - hint.ai_family = PF_INET; + nni_posix_epdesc *ed; + char buf[NNG_MAXADDRLEN]; + int rv; + char *lhost, *rhost; + char *lserv, *rserv; + char *sep; + struct sockaddr_storage ss; + int len; + int passive; + nni_aio aio; + + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); } - if (getaddrinfo(host, "1", &hint, &ai) != 0) { - return (NNG_EADDRINVAL); + // Make a local copy. + snprintf(buf, sizeof (buf), "%s", url); + nni_aio_init(&aio, NULL, NULL); + + if (mode == NNI_EP_MODE_DIAL) { + rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); + if (rv != 0) { + goto done; + } + + // We have to have a remote destination! + if ((rhost == NULL) || (rserv == NULL)) { + rv = NNG_EADDRINVAL; + goto done; + } + } else { + rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); + if (rv != 0) { + goto done; + } + if ((rhost != NULL) || (rserv != NULL)) { + // remotes are nonsensical here. + rv = NNG_EADDRINVAL; + goto done; + } + if (lserv == NULL) { + // missing port to listen on! + rv = NNG_EADDRINVAL; + goto done; + } } - if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) { - freeaddrinfo(ai); - return (NNG_EADDRINVAL); + if ((rserv != NULL) || (rhost != NULL)) { + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, 0, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_remote(ed, &ss, len); } - freeaddrinfo(ai); + + if ((lserv != NULL) || (lhost != NULL)) { + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, 1, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto done; + } + len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); + nni_posix_epdesc_set_local(ed, &ss, len); + } + *epp = (void *) ed; return (0); + +done: + if (rv != 0) { + nni_posix_epdesc_fini(ed); + } + nni_aio_fini(&aio); + return (rv); } void -nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) { - nni_posix_sock_aio_recv((void *) s, aio); + nni_posix_epdesc_close((void *) ep); } int -nni_plat_tcp_init(nni_plat_tcpsock **sp) +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) { - nni_posix_sock *s; - int rv; + return (nni_posix_epdesc_listen((void *) ep)); +} - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; - } - return (rv); + +void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + return (nni_posix_epdesc_connect((void *) ep, aio)); } void -nni_plat_tcp_fini(nni_plat_tcpsock *s) +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + return (nni_posix_epdesc_accept((void *) ep, aio)); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) { - nni_posix_sock_shutdown((void *) s); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +void +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) { - return (nni_posix_sock_listen((void *) s, addr)); + nni_posix_pipedesc_close((void *) p); } -// nni_plat_tcp_connect establishes an outbound connection. It the -// bind address is not null, then it will attempt to bind to the local -// address specified first. -int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) +void +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr)); + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +void +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index c3e29c33..c509643e 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -8,10 +8,10 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" #ifdef PLATFORM_POSIX_PIPEDESC +#include "platform/posix/posix_aio.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <stdlib.h> @@ -183,6 +183,8 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) if (pd->fd != -1) { // Let any peer know we are closing. (void) shutdown(pd->fd, SHUT_RDWR); + close(pd->fd); + pd->fd = -1; } while ((aio = nni_list_first(&pd->readq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c deleted file mode 100644 index 9e054c42..00000000 --- a/src/platform/posix/posix_socket.c +++ /dev/null @@ -1,422 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef PLATFORM_POSIX_SOCKET -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_socket.h" - -#include <errno.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/uio.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <sys/un.h> -#include <fcntl.h> -#include <unistd.h> -#include <netdb.h> - -// Solaris/SunOS systems define this, which collides with our symbol -// names. Just undefine it now. -#ifdef sun -#undef sun -#endif - - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -struct nni_posix_sock { - int fd; - char * unlink; // path to unlink at unbind - nni_posix_pipedesc * pd; - int tcpnodelay; -}; - -int -nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) -{ - struct sockaddr_in *sin; - struct sockaddr_un *sun; - -#ifdef PF_INET6 - struct sockaddr_in6 *sin6; -#endif - - switch (sa->s_un.s_family) { - case NNG_AF_INET: - sin = (void *) ss; - memset(sin, 0, sizeof (*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; - sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof (*sin)); - -#ifdef PF_INET6 - // Not every platform can do IPv6. Amazingly. - case NNG_AF_INET6: - sin6 = (void *) ss; - memset(sin6, 0, sizeof (*sin6)); -#ifdef SIN6_LEN - sin6->sin6_len = sizeof (*sin6); -#endif - sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; - memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof (*sin6)); - -#endif // PF_INET6 - - case NNG_AF_IPC: - sun = (void *) ss; - memset(sun, 0, sizeof (*sun)); - // NB: This logic does not support abstract sockets, which - // have their first byte NULL, and rely on length instead. - // Probably for dealing with abstract sockets we will just - // handle @ specially in the future. - if (strlen(sa->s_un.s_path.sa_path) >= - sizeof (sun->sun_path)) { - return (-1); // caller converts to NNG_EADDRINVAL - } - - sun->sun_family = PF_UNIX; - (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", - sa->s_un.s_path.sa_path); - return (sizeof (*sun)); - } - return (-1); -} - - -int -nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) -{ - const struct sockaddr_in *sin; - const struct sockaddr_un *sun; - -#ifdef PF_INET6 - const struct sockaddr_in6 *sin6; -#endif - - memset(sa, 0, sizeof (*sa)); - switch (ss->sa_family) { - case PF_INET: - sin = (const void *) ss; - sa->s_un.s_in.sa_family = NNG_AF_INET; - sa->s_un.s_in.sa_port = sin->sin_port; - sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; - return (0); - -#ifdef PF_INET6 - case PF_INET6: - sin6 = (const void *) ss; - sa->s_un.s_in6.sa_family = NNG_AF_INET6; - sa->s_un.s_in6.sa_port = sin6->sin6_port; - memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - return (0); - -#endif // PF_INET6 - - case PF_UNIX: - // NB: This doesn't handle abstract sockets! - sun = (const void *) ss; - sa->s_un.s_path.sa_family = NNG_AF_IPC; - snprintf(sa->s_un.s_path.sa_path, - sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path); - return (0); - } - return (-1); -} - - -void -nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_send(s->pd, aio); -} - - -void -nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio) -{ - nni_posix_pipedesc_recv(s->pd, aio); -} - - -static void -nni_posix_sock_setopts_fd(int fd, int tcpnodelay) -{ - int one; - - // Try to ensure that both CLOEXEC is set, and that we don't - // generate SIGPIPE. (Note that SIGPIPE suppression in this way - // only works on BSD systems. Linux wants us to use sendmsg().) - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#if defined(F_SETNOSIGPIPE) - (void) fcntl(fd, F_SETNOSIGPIPE, 1); -#elif defined(SO_NOSIGPIPE) - one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); -#endif - - // Also disable Nagle. We are careful to group data with writev, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - - // It's unclear whether this is safe for UNIX domain sockets. It - // *should* be. - if (tcpnodelay) { - one = 1; - (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, - sizeof (one)); - } -} - - -int -nni_posix_sock_init(nni_posix_sock **sp) -{ - nni_posix_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - s->fd = -1; - *sp = s; - return (0); -} - - -void -nni_posix_sock_fini(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; - } - if (s->pd != NULL) { - nni_posix_pipedesc_fini(s->pd); - } - if (s->unlink != NULL) { - (void) unlink(s->unlink); - nni_free(s->unlink, strlen(s->unlink) + 1); - } - NNI_FREE_STRUCT(s); -} - - -void -nni_posix_sock_shutdown(nni_posix_sock *s) -{ - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); - // This causes the equivalent of a close. Hopefully waking - // up anything that didn't get the hint with the shutdown. - // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); - } - if (s->pd != NULL) { - nni_posix_pipedesc_close(s->pd); - } -} - - -int -nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr) -{ - int len; - struct sockaddr_storage ss; - int rv; - int fd; - - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - if ((saddr->s_un.s_family == NNG_AF_INET) || - (saddr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - // UNIX DOMAIN SOCKETS -- these have names in the file namespace. - // We are going to check to see if there was a name already there. - // If there was, and nothing is listening (ECONNREFUSED), then we - // will just try to cleanup the old socket. Note that this is not - // perfect in all scenarios, so use this with caution. - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - int chkfd; - if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - (void) close(fd); - return (nni_plat_errno(errno)); - } - - // Nonblocking; we don't want to wait for remote server. - (void) fcntl(chkfd, F_SETFL, O_NONBLOCK); - if (connect(chkfd, (struct sockaddr *) &ss, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(saddr->s_un.s_path.sa_path); - } - } - (void) close(chkfd); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - // For IPC, record the path so we unlink it later - if ((saddr->s_un.s_family == NNG_AF_IPC) && - (saddr->s_un.s_path.sa_path[0] != 0)) { - s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1); - if (s->unlink == NULL) { - (void) close(fd); - (void) unlink(saddr->s_un.s_path.sa_path); - return (NNG_ENOMEM); - } - strcpy(s->unlink, saddr->s_un.s_path.sa_path); - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - - s->fd = fd; - - return (0); -} - - -// These functions will need to be removed in the future. They are -// transition functions for now. - -int -nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server) -{ - int fd; - int rv; - - for (;;) { -#ifdef NNG_USE_ACCEPT4 - fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); - if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - fd = accept(server->fd, NULL, NULL); - } -#else - fd = accept(server->fd, NULL, NULL); -#endif - - if (fd < 0) { - return (nni_plat_errno(errno)); - } else { - break; - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -int -nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) -{ - int fd; - int len; - struct sockaddr_storage ss; - struct sockaddr_storage bss; - int rv; - - if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - if ((addr->s_un.s_family == NNG_AF_INET) || - (addr->s_un.s_family == NNG_AF_INET6)) { - s->tcpnodelay = 1; - } - - if (bindaddr != NULL) { - if (bindaddr->s_un.s_family != addr->s_un.s_family) { - return (NNG_EINVAL); - } - if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) { - return (NNG_EADDRINVAL); - } - if (bind(fd, (struct sockaddr *) &bss, len) < 0) { - rv = nni_plat_errno(errno); - (void) close(fd); - return (rv); - } - } - - nni_posix_sock_setopts_fd(fd, s->tcpnodelay); - - if (connect(fd, (struct sockaddr *) &ss, len) != 0) { - rv = nni_plat_errno(errno); - // Unix domain sockets return ENOENT when nothing is there. - // Massage this into ECONNREFUSED, to provide more consistent - // behavior. - if (rv == NNG_ENOENT) { - rv = NNG_ECONNREFUSED; - } - (void) close(fd); - return (rv); - } - if (s->pd != NULL) { - // If we had a prior pipedesc hanging around, nuke it. - nni_posix_pipedesc_fini(s->pd); - s->pd = NULL; - } - if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { - (void) close(fd); - return (rv); - } - s->fd = fd; - return (0); -} - - -#else - -// Suppress empty symbols warnings in ranlib. -int nni_posix_socket_not_used = 0; - -#endif // PLATFORM_POSIX_SOCKET diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h deleted file mode 100644 index f3fb169c..00000000 --- a/src/platform/posix/posix_socket.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef PLATFORM_POSIX_SOCKET_H -#define PLATFORM_POSIX_SOCKET_H - -// This file provides declarations for comment socket handling functions on -// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all -// work using mostly comment socket handling routines. - -#include "core/nng_impl.h" - -#include "platform/posix/posix_aio.h" - -#include <sys/types.h> -#include <sys/socket.h> - -typedef struct nni_posix_sock nni_posix_sock; - -extern int nni_posix_to_sockaddr(struct sockaddr_storage *, - const nni_sockaddr *); -extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *); -extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *); -extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *); -extern int nni_posix_sock_init(nni_posix_sock **); -extern void nni_posix_sock_fini(nni_posix_sock *); -extern void nni_posix_sock_shutdown(nni_posix_sock *); -extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *); - -// These functions will need to be removed in the future. They are -// transition functions for now. - -extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int); -extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *); -extern int nni_posix_sock_connect_sync(nni_posix_sock *, - const nni_sockaddr *, const nni_sockaddr *); - -#endif // PLATFORM_POSIX_SOCKET_H diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 4137984f..a2f24f8a 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -297,7 +297,7 @@ nni_plat_init(int (*helper)(void)) // probably get by with even just 8k, but Linux usually wants 16k // as a minimum. If this fails, its not fatal, just we won't be // as scalable / thrifty with our use of VM. - (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); if ((rv = nni_posix_pollq_sysinit()) != 0) { pthread_mutex_unlock(&nni_plat_lock); @@ -316,7 +316,6 @@ nni_plat_init(int (*helper)(void)) pthread_condattr_destroy(&nni_cvattr); pthread_attr_destroy(&nni_pthread_attr); return (rv); - } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { |
