aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_aio.h6
-rw-r--r--src/platform/posix/posix_epdesc.c115
-rw-r--r--src/platform/posix/posix_ipc.c158
-rw-r--r--src/platform/posix/posix_pollq_poll.c1
4 files changed, 181 insertions, 99 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 186f586f..13559c08 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
-extern int nni_posix_epdesc_init(nni_posix_epdesc **, int);
+extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *);
+extern const char *nni_posix_epdesc_url(nni_posix_epdesc *);
+extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int);
+extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int);
extern void nni_posix_epdesc_fini(nni_posix_epdesc *);
extern void nni_posix_epdesc_close(nni_posix_epdesc *);
extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *);
+extern int nni_posix_epdesc_listen(nni_posix_epdesc *);
extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index f419d25a..29420c26 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -42,6 +42,7 @@ struct nni_posix_epdesc {
struct sockaddr_storage remaddr;
socklen_t loclen;
socklen_t remlen;
+ const char * url;
nni_mtx mtx;
};
@@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
if (rv != 0) {
(void) close(newfd);
} else {
- aio->a_pipe = pipe;
+ aio->a_pipe = pd;
}
}
// Abuse the count to hold our new fd. This is only for accept.
@@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
return;
default:
+ if (rv == ENOENT) {
+ rv = ECONNREFUSED;
+ }
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
close(ed->fd);
ed->fd = -1;
@@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed)
}
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len)
-{
- int fd;
- struct sockaddr_un *sun = (void *) sa;
-
- if ((len == 0) || (sun->sun_family != AF_UNIX)) {
- return (0);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) sun, len) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(sun->sun_path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-
int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
+nni_posix_epdesc_listen(nni_posix_epdesc *ed)
{
int len;
- struct sockaddr_storage ss;
+ struct sockaddr_storage *ss;
int rv;
int fd;
- if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
- return (NNG_EADDRINVAL);
- }
+ nni_mtx_lock(&ed->mtx);
+ ss = &ed->locaddr;
+ len = ed->loclen;
- if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
- rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len);
- if (rv != 0) {
- (void) close(fd);
- return (rv);
- }
-
- if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ if (bind(fd, (struct sockaddr *) ss, len) < 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
@@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
// Listen -- 128 depth is probably sufficient. If it isn't, other
// bad things are going to happen.
if (listen(fd, 128) != 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
}
ed->fd = fd;
+ ed->node.fd = fd;
+ nni_mtx_unlock(&ed->mtx);
return (0);
}
@@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
nni_posix_epdesc_finish(aio, rv, 0);
return;
}
+ ed->node.fd = ed->fd;
// Possibly bind.
if (ed->loclen != 0) {
@@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
+
nni_posix_epdesc_finish(aio, 0, ed->fd);
ed->fd = -1;
nni_mtx_unlock(&ed->mtx);
@@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
+ if (errno == ENOENT) {
+ errno = ECONNREFUSED;
+ }
(void) close(ed->fd);
ed->fd = -1;
nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
@@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
{
nni_posix_epdesc *ed;
int rv;
@@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the ed to a single pollq we may get some kind of cache warmth.
- ed->pq = nni_posix_pollq_get(fd);
- ed->fd = fd;
+ ed->pq = nni_posix_pollq_get((int) nni_random());
+ ed->fd = -1;
ed->node.index = 0;
- ed->node.cb = NULL; // XXXX:
+ ed->node.cb = nni_posix_epdesc_cb;
ed->node.data = ed;
-
- // Ensure we are in non-blocking mode.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ ed->url = url;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
@@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
}
+const char *
+nni_posix_epdesc_url(nni_posix_epdesc *ed)
+{
+ return (ed->url);
+}
+
+
+void
+nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->locaddr, sa, len);
+ ed->loclen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
+void
+nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->remaddr, sa, len);
+ ed->remlen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index c1911a05..5bd42190 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -25,105 +25,181 @@
#include <unistd.h>
#include <netdb.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_STREAM_SOCKTYPE SOCK_STREAM
+#endif
+
+
// Solaris/SunOS systems define this, which collides with our symbol
// names. Just undefine it now.
#ifdef sun
#undef sun
#endif
-// We alias nni_posix_pipdedesc to nni_plat_ipcsock.
+// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
+// We alias nni_posix_epdesc to nni_plat_ipc_ep.
static int
-nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path)
+nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path)
{
- nng_sockaddr_path *spath;
size_t len;
- memset(addr, 0, sizeof (*addr));
- spath = &addr->s_un.s_path;
+ memset(sun, 0, sizeof (*sun));
// TODO: abstract sockets, including autobind sockets.
len = strlen(path);
- if ((len >= sizeof (spath->sa_path)) || (len < 1)) {
+ if ((len >= sizeof (sun->sun_path)) || (len < 1)) {
return (NNG_EADDRINVAL);
}
- (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path);
- spath->sa_family = NNG_AF_IPC;
+ (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path);
+ sun->sun_family = AF_UNIX;
+ return (0);
+}
+
+
+int
+nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url)
+{
+ nni_posix_epdesc *ed;
+ int rv;
+
+ if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+ url += strlen("ipc://"); // skip the prefix.
+ if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) {
+ return (rv);
+ }
+
+ *epp = (void *) ed;
return (0);
}
void
-nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_posix_sock_aio_send((void *) s, aio);
+ nni_posix_epdesc_fini((void *) ep);
}
void
-nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
+{
+ nni_posix_epdesc_close((void *) ep);
+}
+
+
+// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
+// We are going to check to see if there was a name already there.
+// If there was, and nothing is listening (ECONNREFUSED), then we
+// will just try to cleanup the old socket. Note that this is not
+// perfect in all scenarios, so use this with caution.
+static int
+nni_plat_ipc_remove_stale(struct sockaddr_un *sun)
{
- nni_posix_sock_aio_recv((void *) s, aio);
+ int fd;
+ int rv;
+
+ if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) sun, sizeof (*sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(sun->sun_path);
+ }
+ }
+ (void) close(fd);
+ return (0);
}
int
-nni_plat_ipc_init(nni_plat_ipcsock **sp)
+nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
{
- nni_posix_sock *s;
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
int rv;
- if ((rv = nni_posix_sock_init(&s)) == 0) {
- *sp = (void *) s;
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ return (rv);
+ }
+
+ if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) {
+ return (rv);
}
- return (rv);
+
+ nni_posix_epdesc_set_local(ed, &sun, sizeof (sun));
+
+ return (nni_posix_epdesc_listen(ed));
}
void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_fini((void *) s);
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
+ int rv;
+
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun));
+
+ nni_posix_epdesc_connect(ed, aio);
}
void
-nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_shutdown((void *) s);
+ nni_posix_epdesc_accept((void *) ep, aio);
}
-int
-nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
-
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_listen((void *) s, &addr));
+ nni_posix_pipedesc_fini((void *) p);
}
-int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
+ nni_posix_pipedesc_close((void *) p);
+}
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_connect_sync((void *) s, &addr, NULL));
+
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
+{
+ nni_posix_pipedesc_send((void *) p, aio);
}
-int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+void
+nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
{
- return (nni_posix_sock_accept_sync((void *) s, (void *) server));
+ nni_posix_pipedesc_recv((void *) p, aio);
}
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index a70c902d..d20e7b44 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg)
// Clear the index for the next time around.
node->index = 0;
-
node->revents = fds[index].revents;
// Now we move this node to the callback list.