diff options
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 115 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 158 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 1 |
4 files changed, 181 insertions, 99 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 186f586f..13559c08 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_close(nni_posix_pipedesc *); -extern int nni_posix_epdesc_init(nni_posix_epdesc **, int); +extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *); +extern const char *nni_posix_epdesc_url(nni_posix_epdesc *); +extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int); +extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int); extern void nni_posix_epdesc_fini(nni_posix_epdesc *); extern void nni_posix_epdesc_close(nni_posix_epdesc *); extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *); +extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index f419d25a..29420c26 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -42,6 +42,7 @@ struct nni_posix_epdesc { struct sockaddr_storage remaddr; socklen_t loclen; socklen_t remlen; + const char * url; nni_mtx mtx; }; @@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) if (rv != 0) { (void) close(newfd); } else { - aio->a_pipe = pipe; + aio->a_pipe = pd; } } // Abuse the count to hold our new fd. This is only for accept. @@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) return; default: + if (rv == ENOENT) { + rv = ECONNREFUSED; + } nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); close(ed->fd); ed->fd = -1; @@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } -// UNIX DOMAIN SOCKETS -- these have names in the file namespace. -// We are going to check to see if there was a name already there. -// If there was, and nothing is listening (ECONNREFUSED), then we -// will just try to cleanup the old socket. Note that this is not -// perfect in all scenarios, so use this with caution. -static int -nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len) -{ - int fd; - struct sockaddr_un *sun = (void *) sa; - - if ((len == 0) || (sun->sun_family != AF_UNIX)) { - return (0); - } - - if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // There is an assumption here that connect() returns immediately - // (even when non-blocking) when a server is absent. This seems - // to be true for the platforms we've tried. If it doesn't work, - // then the cleanup will fail. As this is supposed to be an - // exceptional case, don't worry. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); - } - } - (void) close(fd); - return (0); -} - - int -nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) +nni_posix_epdesc_listen(nni_posix_epdesc *ed) { int len; - struct sockaddr_storage ss; + struct sockaddr_storage *ss; int rv; int fd; - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } + nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; + len = ed->loclen; - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); } (void) fcntl(fd, F_SETFD, FD_CLOEXEC); - rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len); - if (rv != 0) { - (void) close(fd); - return (rv); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + if (bind(fd, (struct sockaddr *) ss, len) < 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); @@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) // Listen -- 128 depth is probably sufficient. If it isn't, other // bad things are going to happen. if (listen(fd, 128) != 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); } ed->fd = fd; + ed->node.fd = fd; + nni_mtx_unlock(&ed->mtx); return (0); } @@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) nni_posix_epdesc_finish(aio, rv, 0); return; } + ed->node.fd = ed->fd; // Possibly bind. if (ed->loclen != 0) { @@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, ed->fd); ed->fd = -1; nni_mtx_unlock(&ed->mtx); @@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno != EINPROGRESS) { // Some immediate failure occurred. + if (errno == ENOENT) { + errno = ECONNREFUSED; + } (void) close(ed->fd); ed->fd = -1; nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); @@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int -nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) +nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) { nni_posix_epdesc *ed; int rv; @@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) // one. For now we just have a global pollq. Note that by tying // the ed to a single pollq we may get some kind of cache warmth. - ed->pq = nni_posix_pollq_get(fd); - ed->fd = fd; + ed->pq = nni_posix_pollq_get((int) nni_random()); + ed->fd = -1; ed->node.index = 0; - ed->node.cb = NULL; // XXXX: + ed->node.cb = nni_posix_epdesc_cb; ed->node.data = ed; - - // Ensure we are in non-blocking mode. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->url = url; nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); @@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) } +const char * +nni_posix_epdesc_url(nni_posix_epdesc *ed) +{ + return (ed->url); +} + + +void +nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->locaddr, sa, len); + ed->loclen = len; + nni_mtx_unlock(&ed->mtx); +} + + +void +nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->remaddr, sa, len); + ed->remlen = len; + nni_mtx_unlock(&ed->mtx); +} + + void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index c1911a05..5bd42190 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -25,105 +25,181 @@ #include <unistd.h> #include <netdb.h> +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + + // Solaris/SunOS systems define this, which collides with our symbol // names. Just undefine it now. #ifdef sun #undef sun #endif -// We alias nni_posix_pipdedesc to nni_plat_ipcsock. +// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. +// We alias nni_posix_epdesc to nni_plat_ipc_ep. static int -nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path) +nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) { - nng_sockaddr_path *spath; size_t len; - memset(addr, 0, sizeof (*addr)); - spath = &addr->s_un.s_path; + memset(sun, 0, sizeof (*sun)); // TODO: abstract sockets, including autobind sockets. len = strlen(path); - if ((len >= sizeof (spath->sa_path)) || (len < 1)) { + if ((len >= sizeof (sun->sun_path)) || (len < 1)) { return (NNG_EADDRINVAL); } - (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path); - spath->sa_family = NNG_AF_IPC; + (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); + sun->sun_family = AF_UNIX; + return (0); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +{ + nni_posix_epdesc *ed; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("ipc://"); // skip the prefix. + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); + } + + *epp = (void *) ed; return (0); } void -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) +{ + nni_posix_epdesc_close((void *) ep); +} + + +// UNIX DOMAIN SOCKETS -- these have names in the file namespace. +// We are going to check to see if there was a name already there. +// If there was, and nothing is listening (ECONNREFUSED), then we +// will just try to cleanup the old socket. Note that this is not +// perfect in all scenarios, so use this with caution. +static int +nni_plat_ipc_remove_stale(struct sockaddr_un *sun) { - nni_posix_sock_aio_recv((void *) s, aio); + int fd; + int rv; + + if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(sun->sun_path); + } + } + (void) close(fd); + return (0); } int -nni_plat_ipc_init(nni_plat_ipcsock **sp) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { - nni_posix_sock *s; + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; int rv; - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + return (rv); + } + + if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + return (rv); } - return (rv); + + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + + return (nni_posix_epdesc_listen(ed)); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; + int rv; + + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + + nni_posix_epdesc_connect(ed, aio); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_shutdown((void *) s); + nni_posix_epdesc_accept((void *) ep, aio); } -int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; - - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_listen((void *) s, &addr)); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; + nni_posix_pipedesc_close((void *) p); +} - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_connect_sync((void *) s, &addr, NULL)); + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index a70c902d..d20e7b44 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg) // Clear the index for the next time around. node->index = 0; - node->revents = fds[index].revents; // Now we move this node to the callback list. |
