diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-05 20:22:36 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-05 20:22:36 -0700 |
| commit | 8811317e2da3b5a21d6caab0cc0e12aad417edd6 (patch) | |
| tree | 3ee093b515d3b6d69554bf7913c3626a5605d178 /src/platform/posix/posix_ipc.c | |
| parent | 5ee6713c34963ed400c8886213ed2ee53c367c74 (diff) | |
| download | nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.gz nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.tar.bz2 nng-8811317e2da3b5a21d6caab0cc0e12aad417edd6.zip | |
Make ipc work 100% async.
The connect & accept logic for IPC is now fully asynchronous.
This will serve as a straight-forward template for TCP. Note that
the upper logic still uses a thread to run this "synchronously", but
that will be able to be removed once the last transport (TCP) is made
fully async.
The unified ipcsock is also now separated, and we anticipate being
able to remove the posix_sock.c logic shortly. Separating out the
endpoint logic from the pipe logic helps makes things clearer, and
may faciliate a day where endpoints have multiple addresses (for
example with a connect() endpoint that uses a round-robin DNS list
and tries to run the entire list in parallel, stopping with the first
connection made.)
The platform header got a little cleanup while we were here.
Diffstat (limited to 'src/platform/posix/posix_ipc.c')
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 158 |
1 files changed, 117 insertions, 41 deletions
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index c1911a05..5bd42190 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -25,105 +25,181 @@ #include <unistd.h> #include <netdb.h> +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + + // Solaris/SunOS systems define this, which collides with our symbol // names. Just undefine it now. #ifdef sun #undef sun #endif -// We alias nni_posix_pipdedesc to nni_plat_ipcsock. +// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. +// We alias nni_posix_epdesc to nni_plat_ipc_ep. static int -nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path) +nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) { - nng_sockaddr_path *spath; size_t len; - memset(addr, 0, sizeof (*addr)); - spath = &addr->s_un.s_path; + memset(sun, 0, sizeof (*sun)); // TODO: abstract sockets, including autobind sockets. len = strlen(path); - if ((len >= sizeof (spath->sa_path)) || (len < 1)) { + if ((len >= sizeof (sun->sun_path)) || (len < 1)) { return (NNG_EADDRINVAL); } - (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path); - spath->sa_family = NNG_AF_IPC; + (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); + sun->sun_family = AF_UNIX; + return (0); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +{ + nni_posix_epdesc *ed; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("ipc://"); // skip the prefix. + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); + } + + *epp = (void *) ed; return (0); } void -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) +{ + nni_posix_epdesc_close((void *) ep); +} + + +// UNIX DOMAIN SOCKETS -- these have names in the file namespace. +// We are going to check to see if there was a name already there. +// If there was, and nothing is listening (ECONNREFUSED), then we +// will just try to cleanup the old socket. Note that this is not +// perfect in all scenarios, so use this with caution. +static int +nni_plat_ipc_remove_stale(struct sockaddr_un *sun) { - nni_posix_sock_aio_recv((void *) s, aio); + int fd; + int rv; + + if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(sun->sun_path); + } + } + (void) close(fd); + return (0); } int -nni_plat_ipc_init(nni_plat_ipcsock **sp) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { - nni_posix_sock *s; + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; int rv; - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + return (rv); + } + + if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + return (rv); } - return (rv); + + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + + return (nni_posix_epdesc_listen(ed)); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; + int rv; + + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + + nni_posix_epdesc_connect(ed, aio); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_shutdown((void *) s); + nni_posix_epdesc_accept((void *) ep, aio); } -int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; - - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_listen((void *) s, &addr)); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; + nni_posix_pipedesc_close((void *) p); +} - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_connect_sync((void *) s, &addr, NULL)); + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } |
