aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-05 20:22:36 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-05 20:22:36 -0700
commit8811317e2da3b5a21d6caab0cc0e12aad417edd6 (patch)
tree3ee093b515d3b6d69554bf7913c3626a5605d178 /src/platform/posix
parent5ee6713c34963ed400c8886213ed2ee53c367c74 (diff)
downloadnng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.gz
nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.bz2
nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.zip
Make ipc work 100% async.
The connect & accept logic for IPC is now fully asynchronous. This will serve as a straight-forward template for TCP. Note that the upper logic still uses a thread to run this "synchronously", but that will be able to be removed once the last transport (TCP) is made fully async. The unified ipcsock is also now separated, and we anticipate being able to remove the posix_sock.c logic shortly. Separating out the endpoint logic from the pipe logic helps makes things clearer, and may faciliate a day where endpoints have multiple addresses (for example with a connect() endpoint that uses a round-robin DNS list and tries to run the entire list in parallel, stopping with the first connection made.) The platform header got a little cleanup while we were here.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_aio.h6
-rw-r--r--src/platform/posix/posix_epdesc.c115
-rw-r--r--src/platform/posix/posix_ipc.c158
-rw-r--r--src/platform/posix/posix_pollq_poll.c1
4 files changed, 181 insertions, 99 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 186f586f..13559c08 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
-extern int nni_posix_epdesc_init(nni_posix_epdesc **, int);
+extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *);
+extern const char *nni_posix_epdesc_url(nni_posix_epdesc *);
+extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int);
+extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int);
extern void nni_posix_epdesc_fini(nni_posix_epdesc *);
extern void nni_posix_epdesc_close(nni_posix_epdesc *);
extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *);
+extern int nni_posix_epdesc_listen(nni_posix_epdesc *);
extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index f419d25a..29420c26 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -42,6 +42,7 @@ struct nni_posix_epdesc {
struct sockaddr_storage remaddr;
socklen_t loclen;
socklen_t remlen;
+ const char * url;
nni_mtx mtx;
};
@@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
if (rv != 0) {
(void) close(newfd);
} else {
- aio->a_pipe = pipe;
+ aio->a_pipe = pd;
}
}
// Abuse the count to hold our new fd. This is only for accept.
@@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
return;
default:
+ if (rv == ENOENT) {
+ rv = ECONNREFUSED;
+ }
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
close(ed->fd);
ed->fd = -1;
@@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed)
}
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len)
-{
- int fd;
- struct sockaddr_un *sun = (void *) sa;
-
- if ((len == 0) || (sun->sun_family != AF_UNIX)) {
- return (0);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) sun, len) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(sun->sun_path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-
int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
+nni_posix_epdesc_listen(nni_posix_epdesc *ed)
{
int len;
- struct sockaddr_storage ss;
+ struct sockaddr_storage *ss;
int rv;
int fd;
- if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
- return (NNG_EADDRINVAL);
- }
+ nni_mtx_lock(&ed->mtx);
+ ss = &ed->locaddr;
+ len = ed->loclen;
- if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
- rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len);
- if (rv != 0) {
- (void) close(fd);
- return (rv);
- }
-
- if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ if (bind(fd, (struct sockaddr *) ss, len) < 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
@@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
// Listen -- 128 depth is probably sufficient. If it isn't, other
// bad things are going to happen.
if (listen(fd, 128) != 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
}
ed->fd = fd;
+ ed->node.fd = fd;
+ nni_mtx_unlock(&ed->mtx);
return (0);
}
@@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
nni_posix_epdesc_finish(aio, rv, 0);
return;
}
+ ed->node.fd = ed->fd;
// Possibly bind.
if (ed->loclen != 0) {
@@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
+
nni_posix_epdesc_finish(aio, 0, ed->fd);
ed->fd = -1;
nni_mtx_unlock(&ed->mtx);
@@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
+ if (errno == ENOENT) {
+ errno = ECONNREFUSED;
+ }
(void) close(ed->fd);
ed->fd = -1;
nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
@@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
{
nni_posix_epdesc *ed;
int rv;
@@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the ed to a single pollq we may get some kind of cache warmth.
- ed->pq = nni_posix_pollq_get(fd);
- ed->fd = fd;
+ ed->pq = nni_posix_pollq_get((int) nni_random());
+ ed->fd = -1;
ed->node.index = 0;
- ed->node.cb = NULL; // XXXX:
+ ed->node.cb = nni_posix_epdesc_cb;
ed->node.data = ed;
-
- // Ensure we are in non-blocking mode.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ ed->url = url;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
@@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
}
+const char *
+nni_posix_epdesc_url(nni_posix_epdesc *ed)
+{
+ return (ed->url);
+}
+
+
+void
+nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->locaddr, sa, len);
+ ed->loclen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
+void
+nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->remaddr, sa, len);
+ ed->remlen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index c1911a05..5bd42190 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -25,105 +25,181 @@
#include <unistd.h>
#include <netdb.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_STREAM_SOCKTYPE SOCK_STREAM
+#endif
+
+
// Solaris/SunOS systems define this, which collides with our symbol
// names. Just undefine it now.
#ifdef sun
#undef sun
#endif
-// We alias nni_posix_pipdedesc to nni_plat_ipcsock.
+// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
+// We alias nni_posix_epdesc to nni_plat_ipc_ep.
static int
-nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path)
+nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path)
{
- nng_sockaddr_path *spath;
size_t len;
- memset(addr, 0, sizeof (*addr));
- spath = &addr->s_un.s_path;
+ memset(sun, 0, sizeof (*sun));
// TODO: abstract sockets, including autobind sockets.
len = strlen(path);
- if ((len >= sizeof (spath->sa_path)) || (len < 1)) {
+ if ((len >= sizeof (sun->sun_path)) || (len < 1)) {
return (NNG_EADDRINVAL);
}
- (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path);
- spath->sa_family = NNG_AF_IPC;
+ (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path);
+ sun->sun_family = AF_UNIX;
+ return (0);
+}
+
+
+int
+nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url)
+{
+ nni_posix_epdesc *ed;
+ int rv;
+
+ if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+ url += strlen("ipc://"); // skip the prefix.
+ if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) {
+ return (rv);
+ }
+
+ *epp = (void *) ed;
return (0);
}
void
-nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_posix_sock_aio_send((void *) s, aio);
+ nni_posix_epdesc_fini((void *) ep);
}
void
-nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
+{
+ nni_posix_epdesc_close((void *) ep);
+}
+
+
+// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
+// We are going to check to see if there was a name already there.
+// If there was, and nothing is listening (ECONNREFUSED), then we
+// will just try to cleanup the old socket. Note that this is not
+// perfect in all scenarios, so use this with caution.
+static int
+nni_plat_ipc_remove_stale(struct sockaddr_un *sun)
{
- nni_posix_sock_aio_recv((void *) s, aio);
+ int fd;
+ int rv;
+
+ if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) sun, sizeof (*sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(sun->sun_path);
+ }
+ }
+ (void) close(fd);
+ return (0);
}
int
-nni_plat_ipc_init(nni_plat_ipcsock **sp)
+nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
{
- nni_posix_sock *s;
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
int rv;
- if ((rv = nni_posix_sock_init(&s)) == 0) {
- *sp = (void *) s;
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ return (rv);
+ }
+
+ if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) {
+ return (rv);
}
- return (rv);
+
+ nni_posix_epdesc_set_local(ed, &sun, sizeof (sun));
+
+ return (nni_posix_epdesc_listen(ed));
}
void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_fini((void *) s);
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
+ int rv;
+
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun));
+
+ nni_posix_epdesc_connect(ed, aio);
}
void
-nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_shutdown((void *) s);
+ nni_posix_epdesc_accept((void *) ep, aio);
}
-int
-nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
-
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_listen((void *) s, &addr));
+ nni_posix_pipedesc_fini((void *) p);
}
-int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
+ nni_posix_pipedesc_close((void *) p);
+}
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_connect_sync((void *) s, &addr, NULL));
+
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
+{
+ nni_posix_pipedesc_send((void *) p, aio);
}
-int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+void
+nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
{
- return (nni_posix_sock_accept_sync((void *) s, (void *) server));
+ nni_posix_pipedesc_recv((void *) p, aio);
}
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index a70c902d..d20e7b44 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg)
// Clear the index for the next time around.
node->index = 0;
-
node->revents = fds[index].revents;
// Now we move this node to the callback list.