aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-29 17:39:41 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-29 17:39:41 -0700
commitf8ac106494444962788dc94e147bd33a3e8cdab6 (patch)
treea26dbcb45c4d8bfefbb3f1761b3938eab48bfabe
parent723e39f6c03e241994a2e26b907e41f5bf5db3e7 (diff)
downloadnng-f8ac106494444962788dc94e147bd33a3e8cdab6.tar.gz
nng-f8ac106494444962788dc94e147bd33a3e8cdab6.tar.bz2
nng-f8ac106494444962788dc94e147bd33a3e8cdab6.zip
Use common socket handling on POSIX (tcp done, ipc pending.)
-rw-r--r--CMakeLists.txt4
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/aio.h3
-rw-r--r--src/nng.h2
-rw-r--r--src/platform/posix/posix_impl.h1
-rw-r--r--src/platform/posix/posix_net.c351
-rw-r--r--src/platform/posix/posix_poll.c18
-rw-r--r--src/platform/posix/posix_socket.c491
-rw-r--r--src/platform/posix/posix_socket.h45
9 files changed, 581 insertions, 336 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3ccf895a..c91beed4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -138,9 +138,9 @@ elseif (CMAKE_SYSTEM_NAME MATCHES "Windows")
list (APPEND CMAKE_REQUIRED_DEFINITIONS -D_WIN32_WINNT=0x0600)
else ()
- message (AUTHOR_WARNING "WARNING: This platform may or may not be supported: ${CMAKE_SYSTEM_NAME}")
+ message (AUTHOR_WARNING "WARNING: This platform may not be supported: ${CMAKE_SYSTEM_NAME}")
message (AUTHOR_WARNING "${ISSUE_REPORT_MSG}")
- # blithely hope for POSIX t work
+ # blithely hope for POSIX to work
find_package (Threads REQUIRED)
add_definitions (-DPLATFORM_POSIX)
endif ()
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4c32d5ce..cfe66dd5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -80,6 +80,7 @@ set (NNG_SOURCES
platform/posix/posix_impl.h
platform/posix/posix_config.h
platform/posix/posix_aio.h
+ platform/posix/posix_socket.h
platform/posix/posix_alloc.c
platform/posix/posix_clock.c
@@ -89,6 +90,7 @@ set (NNG_SOURCES
platform/posix/posix_pipe.c
platform/posix/posix_poll.c
platform/posix/posix_rand.c
+ platform/posix/posix_socket.c
platform/posix/posix_thread.c
platform/windows/win_impl.h
diff --git a/src/core/aio.h b/src/core/aio.h
index a290281f..6aad8b00 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -38,6 +38,9 @@ struct nni_aio {
// Message operations.
nni_msg * a_msg;
+ // Connect operations.
+ nni_sockaddr * a_sockaddr;
+
// TBD: Resolver operations.
// Provider-use fields.
diff --git a/src/nng.h b/src/nng.h
index 875bf995..208df8e9 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -458,7 +458,7 @@ NNG_DECL void nng_thread_destroy(void *);
// These structures can be obtained via property lookups, etc.
struct nng_sockaddr_path {
uint16_t sa_family;
- uint8_t sa_path[NNG_MAXADDRLEN];
+ char sa_path[NNG_MAXADDRLEN];
};
typedef struct nng_sockaddr_path nng_sockaddr_path;
typedef struct nng_sockaddr_path nng_sockaddr_ipc;
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 9c8d00dc..83b4914e 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -24,6 +24,7 @@
#define PLATFORM_POSIX_NET
#define PLATFORM_POSIX_PIPE
#define PLATFORM_POSIX_RANDOM
+#define PLATFORM_POSIX_SOCKET
#define PLATFORM_POSIX_THREAD
#include "platform/posix/posix_config.h"
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 1d44435e..161ffc7c 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -11,6 +11,7 @@
#ifdef PLATFORM_POSIX_NET
#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_socket.h"
#include <errno.h>
#include <stdlib.h>
@@ -25,73 +26,7 @@
#include <unistd.h>
#include <netdb.h>
-#ifdef SOCK_CLOEXEC
-#define NNI_TCP_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_TCP_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_plat_tcpsock {
- int fd;
- int devnull; // for shutting down accept()
- nni_posix_pipedesc * pd;
-};
-
-static int
-nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
-{
- struct sockaddr_in *sin;
- struct sockaddr_in6 *sin6;
-
- switch (sa->s_un.s_family) {
- case NNG_AF_INET:
- sin = (void *) ss;
- memset(sin, 0, sizeof (*sin));
- sin->sin_family = PF_INET;
- sin->sin_port = sa->s_un.s_in.sa_port;
- sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
- return (sizeof (*sin));
-
- case NNG_AF_INET6:
- sin6 = (void *) ss;
- memset(sin6, 0, sizeof (*sin6));
-#ifdef SIN6_LEN
- sin6->sin6_len = sizeof (*sin6);
-#endif
- sin6->sin6_family = PF_INET6;
- sin6->sin6_port = sa->s_un.s_in6.sa_port;
- memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
- return (sizeof (*sin6));
- }
- return (-1);
-}
-
-
-static int
-nni_plat_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss)
-{
- const struct sockaddr_in *sin;
- const struct sockaddr_in6 *sin6;
-
- memset(sa, 0, sizeof (*sa));
- switch (ss->sa_family) {
- case PF_INET:
- sin = (const void *) ss;
- sa->s_un.s_in.sa_family = NNG_AF_INET;
- sa->s_un.s_in.sa_port = sin->sin_port;
- sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr;
- return (0);
-
- case PF_INET6:
- sin6 = (const void *) ss;
- sa->s_un.s_in6.sa_family = NNG_AF_INET6;
- sa->s_un.s_in6.sa_port = sin6->sin6_port;
- memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
- return (0);
- }
- return (-1);
-}
-
+// We alias nni_plat_tcpsock to an nni_posix_sock.
int
nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
@@ -112,7 +47,7 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
return (NNG_EADDRINVAL);
}
- if (nni_plat_from_sockaddr(addr, ai->ai_addr) < 0) {
+ if (nni_posix_from_sockaddr(addr, ai->ai_addr) < 0) {
freeaddrinfo(ai);
return (NNG_EADDRINVAL);
}
@@ -124,228 +59,62 @@ nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
int
nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
- struct iovec iov[4]; // We never have more than 3 at present
- int i;
- int offset;
- int resid = 0;
- int rv;
-
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
- iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
- }
+ return (nni_posix_sock_send_sync((void *) s, iovs, cnt));
+}
- i = 0;
- while (resid) {
- rv = writev(s->fd, &iov[i], cnt);
- if (rv < 0) {
- if (rv == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- if (rv > resid) {
- nni_panic("writev says it wrote too much!");
- }
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
- }
- }
- return (0);
+int
+nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
+{
+ return (nni_posix_sock_recv_sync((void *) s, iovs, cnt));
}
void
nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
{
- nni_posix_pipedesc_write(s->pd, aio);
+ nni_posix_sock_aio_send((void *) s, aio);
}
void
nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
{
- nni_posix_pipedesc_read(s->pd, aio);
-}
-
-
-int
-nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
-{
- struct iovec iov[4]; // We never have more than 3 at present
- int i;
- int offset;
- int resid = 0;
- int rv;
-
- if (cnt > 4) {
- return (NNG_EINVAL);
- }
-
- for (i = 0; i < cnt; i++) {
- iov[i].iov_base = iovs[i].iov_buf;
- iov[i].iov_len = iovs[i].iov_len;
- resid += iov[i].iov_len;
- }
-
- i = 0;
- while (resid) {
- rv = readv(s->fd, &iov[i], cnt);
- if (rv < 0) {
- if (errno == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- if (rv == 0) {
- return (NNG_ECLOSED);
- }
- if (rv > resid) {
- nni_panic("readv says it read too much!");
- }
-
- resid -= rv;
- while (rv) {
- if (iov[i].iov_len <= rv) {
- rv -= iov[i].iov_len;
- i++;
- cnt--;
- } else {
- iov[i].iov_len -= rv;
- iov[i].iov_base += rv;
- rv = 0;
- }
- }
- }
-
- return (0);
-}
-
-
-static void
-nni_plat_tcp_setopts(int fd)
-{
- int one;
-
- // Try to ensure that both CLOEXEC is set, and that we don't
- // generate SIGPIPE. (Note that SIGPIPE suppression in this way
- // only works on BSD systems. Linux wants us to use sendmsg().)
- (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
-#if defined(F_SETNOSIGPIPE)
- (void) fcntl(fd, F_SETNOSIGPIPE, 1);
-#elif defined(SO_NOSIGPIPE)
- one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one));
-#endif
-
- // Also disable Nagle. We are careful to group data with writev,
- // and latency is king for most of our users. (Consider adding
- // a method to enable this later.)
- one = 1;
- (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one));
+ nni_posix_sock_aio_recv((void *) s, aio);
}
int
nni_plat_tcp_init(nni_plat_tcpsock **tspp)
{
- nni_plat_tcpsock *tsp;
+ nni_posix_sock *s;
+ int rv;
- if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = nni_posix_sock_init(&s)) == 0) {
+ *tspp = (void *) s;
}
- tsp->fd = -1;
- *tspp = tsp;
- return (0);
+ return (rv);
}
void
-nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
+nni_plat_tcp_fini(nni_plat_tcpsock *s)
{
- if (tsp->fd != -1) {
- (void) close(tsp->fd);
- tsp->fd = -1;
- }
- if (tsp->pd != NULL) {
- nni_posix_pipedesc_fini(tsp->pd);
- }
- NNI_FREE_STRUCT(tsp);
+ nni_posix_sock_fini((void *) s);
}
void
-nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp)
+nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
{
- if (tsp->fd != -1) {
- (void) shutdown(tsp->fd, SHUT_RDWR);
- // This causes the equivalent of a close. Hopefully waking
- // up anything that didn't get the hint with the shutdown.
- // (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, tsp->fd);
- }
- if (tsp->pd != NULL) {
- nni_posix_pipedesc_close(tsp->pd);
- }
+ nni_posix_sock_shutdown((void *) s);
}
-// nni_plat_tcp_bind creates a file descriptor bound to the given address.
-// This basically does the equivalent of socket, bind, and listen. We have
-// chosen a default value for the listen backlog of 128, which should be
-// plenty. (If it isn't, then the accept thread can't get enough resources
-// to keep up, and your clients are going to experience bad things. Normally
-// the actual backlog should hover near 0 anyway.)
int
-nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr)
+nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
{
- int fd;
- int len;
- struct sockaddr_storage ss;
- int rv;
-
- len = nni_plat_to_sockaddr(&ss, addr);
- if (len < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- nni_plat_tcp_setopts(fd);
-
- if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
-
- tsp->fd = fd;
- return (0);
+ return (nni_posix_sock_listen((void *) s, addr));
}
@@ -353,85 +122,17 @@ nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr)
// bind address is not null, then it will attempt to bind to the local
// address specified first.
int
-nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
+nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
const nni_sockaddr *bindaddr)
{
- int fd;
- int len;
- struct sockaddr_storage ss;
- struct sockaddr_storage bss;
- int rv;
-
- len = nni_plat_to_sockaddr(&ss, addr);
- if (len < 0) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(ss.ss_family, NNI_TCP_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- if (bindaddr != NULL) {
- if (bindaddr->s_un.s_family != addr->s_un.s_family) {
- return (NNG_EINVAL);
- }
- if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) {
- return (NNG_EADDRINVAL);
- }
- if (bind(fd, (struct sockaddr *) &bss, len) < 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
- }
-
- nni_plat_tcp_setopts(fd);
-
- if (connect(fd, (struct sockaddr *) &ss, len) != 0) {
- rv = nni_plat_errno(errno);
- (void) close(fd);
- return (rv);
- }
- if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
- (void) close(fd);
- return (rv);
- }
- tsp->fd = fd;
- return (0);
+ return (nni_posix_sock_connect_sync((void *) s, addr, bindaddr));
}
int
-nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
+nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
{
- int fd;
- int rv;
-
- for (;;) {
-#ifdef NNG_USE_ACCEPT4
- fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC);
- if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- fd = accept(server->fd, NULL, NULL);
- }
-#else
- fd = accept(server->fd, NULL, NULL);
-#endif
-
- if (fd < 0) {
- return (nni_plat_errno(errno));
- } else {
- break;
- }
- }
-
- nni_plat_tcp_setopts(fd);
-
- if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
- close(fd);
- return (rv);
- }
- tsp->fd = fd;
- return (0);
+ return (nni_posix_sock_accept_sync((void *) s, (void *) server));
}
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index d309babf..c5c494f6 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -53,6 +53,8 @@ struct nni_posix_epdesc {
nni_list acceptq;
nni_list_node node;
nni_posix_pollq * pq;
+ struct sockaddr_storage locaddr;
+ struct sockaddr_storage remaddr;
};
@@ -78,7 +80,7 @@ struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
static void
-nni_posix_poll_finish(nni_aio *aio, int rv)
+nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
nni_posix_pipedesc *pd;
@@ -120,7 +122,7 @@ nni_posix_poll_write(nni_posix_pipedesc *pd)
rv = nni_plat_errno(errno);
nni_list_remove(&pd->writeq, aio);
- nni_posix_poll_finish(aio, rv);
+ nni_posix_pipedesc_finish(aio, rv);
return;
}
@@ -148,7 +150,7 @@ nni_posix_poll_write(nni_posix_pipedesc *pd)
// We completed the entire operation on this aioq.
nni_list_remove(&pd->writeq, aio);
- nni_posix_poll_finish(aio, 0);
+ nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
// aioq ready for us to process.
@@ -183,13 +185,13 @@ nni_posix_poll_read(nni_posix_pipedesc *pd)
}
rv = nni_plat_errno(errno);
- nni_posix_poll_finish(aio, rv);
+ nni_posix_pipedesc_finish(aio, rv);
return;
}
if (n == 0) {
// No bytes indicates a closed descriptor.
- nni_posix_poll_finish(aio, NNG_ECLOSED);
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
return;
}
@@ -216,7 +218,7 @@ nni_posix_poll_read(nni_posix_pipedesc *pd)
}
// We completed the entire operation on this aioq.
- nni_posix_poll_finish(aio, 0);
+ nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
// aioq ready for us to process.
@@ -230,10 +232,10 @@ nni_posix_poll_close(nni_posix_pipedesc *pd)
nni_aio *aio;
while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_poll_finish(aio, NNG_ECLOSED);
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_poll_finish(aio, NNG_ECLOSED);
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
}
diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c
new file mode 100644
index 00000000..b50bd65f
--- /dev/null
+++ b/src/platform/posix/posix_socket.c
@@ -0,0 +1,491 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef PLATFORM_POSIX_SOCKET
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_socket.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <netdb.h>
+
+#ifdef SOCK_CLOEXEC
+#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_STREAM_SOCKTYPE SOCK_STREAM
+#endif
+
+struct nni_posix_sock {
+ int fd;
+ int devnull; // for shutting down accept()
+ char * unlink; // path to unlink at unbind
+ nni_posix_pipedesc * pd;
+};
+
+int
+nni_posix_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
+{
+ struct sockaddr_in *sin;
+ struct sockaddr_un *sun;
+
+#ifdef PF_INET6
+ struct sockaddr_in6 *sin6;
+#endif
+
+ switch (sa->s_un.s_family) {
+ case NNG_AF_INET:
+ sin = (void *) ss;
+ memset(sin, 0, sizeof (*sin));
+ sin->sin_family = PF_INET;
+ sin->sin_port = sa->s_un.s_in.sa_port;
+ sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
+ return (sizeof (*sin));
+
+#ifdef PF_INET6
+ // Not every platform can do IPv6. Amazingly.
+ case NNG_AF_INET6:
+ sin6 = (void *) ss;
+ memset(sin6, 0, sizeof (*sin6));
+#ifdef SIN6_LEN
+ sin6->sin6_len = sizeof (*sin6);
+#endif
+ sin6->sin6_family = PF_INET6;
+ sin6->sin6_port = sa->s_un.s_in6.sa_port;
+ memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
+ return (sizeof (*sin6));
+
+#endif // PF_INET6
+
+ case NNG_AF_IPC:
+ sun = (void *) ss;
+ memset(sun, 0, sizeof (*sun));
+ // NB: This logic does not support abstract sockets, which
+ // have their first byte NULL, and rely on length instead.
+ // Probably for dealing with abstract sockets we will just
+ // handle @ specially in the future.
+ if (strlen(sa->s_un.s_path.sa_path) >=
+ sizeof (sun->sun_path)) {
+ return (-1); // caller converts to NNG_EADDRINVAL
+ }
+
+ sun->sun_family = PF_UNIX;
+ (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s",
+ sa->s_un.s_path.sa_path);
+ // Some systems (Linux!) have sun_len, while others do not.
+ // The lack of a length field means systems without it cannot
+ // use abstract sockets.
+#ifdef SUN_LEN
+ sun->sun_len = SUN_LEN(sun);
+ return (sun->sun_len);
+
+#else
+ return (sizeof (*sun));
+
+#endif
+ }
+ return (-1);
+}
+
+
+int
+nni_posix_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss)
+{
+ const struct sockaddr_in *sin;
+ const struct sockaddr_un *sun;
+
+#ifdef PF_INET6
+ const struct sockaddr_in6 *sin6;
+#endif
+
+ memset(sa, 0, sizeof (*sa));
+ switch (ss->sa_family) {
+ case PF_INET:
+ sin = (const void *) ss;
+ sa->s_un.s_in.sa_family = NNG_AF_INET;
+ sa->s_un.s_in.sa_port = sin->sin_port;
+ sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr;
+ return (0);
+
+#ifdef PF_INET6
+ case PF_INET6:
+ sin6 = (const void *) ss;
+ sa->s_un.s_in6.sa_family = NNG_AF_INET6;
+ sa->s_un.s_in6.sa_port = sin6->sin6_port;
+ memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
+ return (0);
+
+#endif // PF_INET6
+
+ case PF_UNIX:
+ // NB: This doesn't handle abstract sockets!
+ sun = (const void *) ss;
+ sa->s_un.s_path.sa_family = NNG_AF_IPC;
+ snprintf(sa->s_un.s_path.sa_path,
+ sizeof (sa->s_un.s_path.sa_path), "%s", sun->sun_path);
+ return (0);
+ }
+ return (-1);
+}
+
+
+void
+nni_posix_sock_aio_send(nni_posix_sock *s, nni_aio *aio)
+{
+ nni_posix_pipedesc_write(s->pd, aio);
+}
+
+
+void
+nni_posix_sock_aio_recv(nni_posix_sock *s, nni_aio *aio)
+{
+ nni_posix_pipedesc_read(s->pd, aio);
+}
+
+
+static void
+nni_posix_sock_setopts_fd(int fd)
+{
+ int one;
+
+ // Try to ensure that both CLOEXEC is set, and that we don't
+ // generate SIGPIPE. (Note that SIGPIPE suppression in this way
+ // only works on BSD systems. Linux wants us to use sendmsg().)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+#if defined(F_SETNOSIGPIPE)
+ (void) fcntl(fd, F_SETNOSIGPIPE, 1);
+#elif defined(SO_NOSIGPIPE)
+ one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one));
+#endif
+
+ // Also disable Nagle. We are careful to group data with writev,
+ // and latency is king for most of our users. (Consider adding
+ // a method to enable this later.)
+
+ // It's unclear whether this is safe for UNIX domain sockets. It
+ // *should* be.
+ one = 1;
+ (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one));
+}
+
+
+int
+nni_posix_sock_init(nni_posix_sock **sp)
+{
+ nni_posix_sock *s;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ s->fd = -1;
+ *sp = s;
+ return (0);
+}
+
+
+void
+nni_posix_sock_fini(nni_posix_sock *s)
+{
+ if (s->fd != -1) {
+ (void) close(s->fd);
+ s->fd = -1;
+ }
+ if (s->pd != NULL) {
+ nni_posix_pipedesc_fini(s->pd);
+ }
+ NNI_FREE_STRUCT(s);
+}
+
+
+void
+nni_posix_sock_shutdown(nni_posix_sock *s)
+{
+ if (s->fd != -1) {
+ (void) shutdown(s->fd, SHUT_RDWR);
+ // This causes the equivalent of a close. Hopefully waking
+ // up anything that didn't get the hint with the shutdown.
+ // (macOS does not see the shtudown).
+ (void) dup2(nni_plat_devnull, s->fd);
+ }
+ if (s->pd != NULL) {
+ nni_posix_pipedesc_close(s->pd);
+ }
+}
+
+
+int
+nni_posix_sock_listen(nni_posix_sock *s, const nni_sockaddr *saddr)
+{
+ int len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+
+ if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ nni_posix_sock_setopts_fd(fd);
+
+ // UNIX DOMAIN SOCKETS -- these have names in the file namespace.
+ // We are going to check to see if there was a name already there.
+ // If there was, and nothing is listening (ECONNREFUSED), then we
+ // will just try to cleanup the old socket. Note that this is not
+ // perfect in all scenarios, so use this with caution.
+ if ((ss.ss_family == AF_UNIX) &&
+ (saddr->s_un.s_path.sa_path[0] != 0)) {
+ int chkfd;
+ if ((chkfd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ (void) close(fd);
+ return (nni_plat_errno(errno));
+ }
+
+ // Nonblocking; we don't want to wait for remote server.
+ (void) fcntl(chkfd, F_SETFL, O_NONBLOCK);
+ if (connect(chkfd, (struct sockaddr *) &ss, sizeof (ss)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(saddr->s_un.s_path.sa_path);
+ }
+ }
+ (void) close(chkfd);
+
+ // Record the path so we unlink it later
+ s->unlink = nni_alloc(strlen(saddr->s_un.s_path.sa_path) + 1);
+ if (s->unlink == NULL) {
+ (void) close(fd);
+ return (NNG_ENOMEM);
+ }
+ }
+
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+
+ // Listen -- 128 depth is probably sufficient. If it isn't, other
+ // bad things are going to happen.
+ if (listen(fd, 128) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+
+ s->fd = fd;
+ return (0);
+}
+
+
+// These functions will need to be removed in the future. They are
+// transition functions for now.
+
+int
+nni_posix_sock_send_sync(nni_posix_sock *s, nni_iov *iovs, int cnt)
+{
+ struct iovec iov[4]; // We never have more than 3 at present
+ int i;
+ int offset;
+ int resid = 0;
+ int rv;
+
+ if (cnt > 4) {
+ return (NNG_EINVAL);
+ }
+
+ for (i = 0; i < cnt; i++) {
+ iov[i].iov_base = iovs[i].iov_buf;
+ iov[i].iov_len = iovs[i].iov_len;
+ resid += iov[i].iov_len;
+ }
+
+ i = 0;
+ while (resid) {
+ rv = writev(s->fd, &iov[i], cnt);
+ if (rv < 0) {
+ if (rv == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ NNI_ASSERT(rv <= resid);
+ resid -= rv;
+ while (rv) {
+ if (iov[i].iov_len <= rv) {
+ rv -= iov[i].iov_len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].iov_len -= rv;
+ iov[i].iov_base += rv;
+ rv = 0;
+ }
+ }
+ }
+
+ return (0);
+}
+
+
+int
+nni_posix_sock_recv_sync(nni_posix_sock *s, nni_iov *iovs, int cnt)
+{
+ struct iovec iov[4]; // We never have more than 3 at present
+ int i;
+ int offset;
+ int resid = 0;
+ int rv;
+
+ if (cnt > 4) {
+ return (NNG_EINVAL);
+ }
+
+ for (i = 0; i < cnt; i++) {
+ iov[i].iov_base = iovs[i].iov_buf;
+ iov[i].iov_len = iovs[i].iov_len;
+ resid += iov[i].iov_len;
+ }
+
+ i = 0;
+ while (resid) {
+ rv = readv(s->fd, &iov[i], cnt);
+ if (rv < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ if (rv == 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ASSERT(rv <= resid);
+
+ resid -= rv;
+ while (rv) {
+ if (iov[i].iov_len <= rv) {
+ rv -= iov[i].iov_len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].iov_len -= rv;
+ iov[i].iov_base += rv;
+ rv = 0;
+ }
+ }
+ }
+
+ return (0);
+}
+
+
+int
+nni_posix_sock_accept_sync(nni_posix_sock *s, nni_posix_sock *server)
+{
+ int fd;
+ int rv;
+
+ for (;;) {
+#ifdef NNG_USE_ACCEPT4
+ fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((fd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ fd = accept(server->fd, NULL, NULL);
+ }
+#else
+ fd = accept(server->fd, NULL, NULL);
+#endif
+
+ if (fd < 0) {
+ return (nni_plat_errno(errno));
+ } else {
+ break;
+ }
+ }
+
+ nni_posix_sock_setopts_fd(fd);
+
+ if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) {
+ close(fd);
+ return (rv);
+ }
+ s->fd = fd;
+ return (0);
+}
+
+
+int
+nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr,
+ const nni_sockaddr *bindaddr)
+{
+ int fd;
+ int len;
+ struct sockaddr_storage ss;
+ struct sockaddr_storage bss;
+ int rv;
+
+ if ((len = nni_posix_to_sockaddr(&ss, addr)) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ if (bindaddr != NULL) {
+ if (bindaddr->s_un.s_family != addr->s_un.s_family) {
+ return (NNG_EINVAL);
+ }
+ if (nni_posix_to_sockaddr(&bss, bindaddr) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ if (bind(fd, (struct sockaddr *) &bss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+ }
+
+ nni_posix_sock_setopts_fd(fd);
+
+ if (connect(fd, (struct sockaddr *) &ss, len) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+ if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) {
+ (void) close(fd);
+ return (rv);
+ }
+ s->fd = fd;
+ return (0);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_socket_not_used = 0;
+
+#endif // PLATFORM_POSIX_SOCKET
diff --git a/src/platform/posix/posix_socket.h b/src/platform/posix/posix_socket.h
new file mode 100644
index 00000000..f3fb169c
--- /dev/null
+++ b/src/platform/posix/posix_socket.h
@@ -0,0 +1,45 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_POSIX_SOCKET_H
+#define PLATFORM_POSIX_SOCKET_H
+
+// This file provides declarations for comment socket handling functions on
+// POSIX platforms. We assume that TCP and Unix domain socket (IPC) all
+// work using mostly comment socket handling routines.
+
+#include "core/nng_impl.h"
+
+#include "platform/posix/posix_aio.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+typedef struct nni_posix_sock nni_posix_sock;
+
+extern int nni_posix_to_sockaddr(struct sockaddr_storage *,
+ const nni_sockaddr *);
+extern int nni_posix_from_sockaddr(nni_sockaddr *, const struct sockaddr *);
+extern void nni_posix_sock_aio_send(nni_posix_sock *, nni_aio *);
+extern void nni_posix_sock_aio_recv(nni_posix_sock *, nni_aio *);
+extern int nni_posix_sock_init(nni_posix_sock **);
+extern void nni_posix_sock_fini(nni_posix_sock *);
+extern void nni_posix_sock_shutdown(nni_posix_sock *);
+extern int nni_posix_sock_listen(nni_posix_sock *, const nni_sockaddr *);
+
+// These functions will need to be removed in the future. They are
+// transition functions for now.
+
+extern int nni_posix_sock_send_sync(nni_posix_sock *, nni_iov *, int);
+extern int nni_posix_sock_recv_sync(nni_posix_sock *, nni_iov *, int);
+extern int nni_posix_sock_accept_sync(nni_posix_sock *, nni_posix_sock *);
+extern int nni_posix_sock_connect_sync(nni_posix_sock *,
+ const nni_sockaddr *, const nni_sockaddr *);
+
+#endif // PLATFORM_POSIX_SOCKET_H