From 8811317e2da3b5a21d6caab0cc0e12aad417edd6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 5 Jul 2017 20:22:36 -0700 Subject: Make ipc work 100% async. The connect & accept logic for IPC is now fully asynchronous. This will serve as a straight-forward template for TCP. Note that the upper logic still uses a thread to run this "synchronously", but that will be able to be removed once the last transport (TCP) is made fully async. The unified ipcsock is also now separated, and we anticipate being able to remove the posix_sock.c logic shortly. Separating out the endpoint logic from the pipe logic helps makes things clearer, and may faciliate a day where endpoints have multiple addresses (for example with a connect() endpoint that uses a round-robin DNS list and tries to run the entire list in parallel, stopping with the first connection made.) The platform header got a little cleanup while we were here. --- src/core/platform.h | 114 +++++++++++++------- src/platform/posix/posix_aio.h | 6 +- src/platform/posix/posix_epdesc.c | 115 ++++++++++---------- src/platform/posix/posix_ipc.c | 158 ++++++++++++++++++++------- src/platform/posix/posix_pollq_poll.c | 1 - src/transport/ipc/ipc.c | 196 +++++++++++++++++++++------------- 6 files changed, 379 insertions(+), 211 deletions(-) (limited to 'src') diff --git a/src/core/platform.h b/src/core/platform.h index 7e6dfbbe..971c6a5d 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -37,6 +37,10 @@ // they do not need in the child. (Note that posix_spawn() does *NOT* // arrange for pthread_atfork() handlers to be called on some platforms.) +// +// Debugging Support +// + // nni_plat_abort crashes the system; it should do whatever is appropriate // for abnormal programs on the platform, such as calling abort(). extern void nni_plat_abort(void); @@ -46,6 +50,18 @@ extern void nni_plat_abort(void); // not contain newlines, but the output will add them. extern void nni_plat_println(const char *); +// nni_plat_strerror allows the platform to use additional error messages +// for additional error codes. The err code passed in should be the +// equivalent of errno or GetLastError, without the NNG_ESYSERR component. +// The platform should make sure that the returned value will be valid +// after the call returns. (If necessary, thread-local storage can be +// used.) +extern const char *nni_plat_strerror(int); + +// +// Memory Management +// + // nni_alloc allocates memory. In most cases this can just be malloc(). // However, you may provide a different allocator, for example it is // possible to use a slab allocator or somesuch. It is permissible for this @@ -64,9 +80,10 @@ typedef struct nni_plat_mtx nni_plat_mtx; typedef struct nni_plat_cv nni_plat_cv; typedef struct nni_plat_thr nni_plat_thr; typedef struct nni_plat_tcpsock nni_plat_tcpsock; -typedef struct nni_plat_ipcsock nni_plat_ipcsock; -// Mutex handling. +// +// Threading & Synchronization Support +// // nni_plat_mtx_init initializes a mutex structure. This may require dynamic // allocation, depending on the platform. It can return NNG_ENOMEM if that @@ -127,6 +144,10 @@ extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *); // is an error to reference the thread in any further way. extern void nni_plat_thr_fini(nni_plat_thr *); +// +// Clock Support +// + // nn_plat_clock returns a number of microseconds since some arbitrary time // in the past. The values returned by nni_clock must use the same base // as the times used in nni_plat_cond_waituntil. The nni_plat_clock() must @@ -138,6 +159,16 @@ extern nni_time nni_plat_clock(void); // nni_plat_usleep sleeps for the specified number of microseconds (at least). extern void nni_plat_usleep(nni_duration); +// +// Entropy Support +// + +// nni_plat_seed_prng seeds the PRNG subsystem. The specified number +// of bytes of entropy should be stashed. When possible, cryptographic +// quality entropy sources should be used. Note that today we prefer +// to seed up to 256 bytes of data. +extern void nni_plat_seed_prng(void *, size_t); + // nni_plat_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, // and threadsafe, and will be called before any other API calls, and @@ -157,13 +188,6 @@ extern int nni_plat_init(int (*)(void)); // will be called until nni_platform_init is called. extern void nni_plat_fini(void); -// nni_plat_strerror allows the platform to use additional error messages -// for additional error codes. The err code passed in should be the -// equivalent of errno or GetLastError, without the NNG_ESYSERR component. -// The platform should make sure that the returned value will be valid -// after the call returns. (If necessary, thread-local storage can be -// used.) -extern const char *nni_plat_strerror(int); // nni_plat_lookup_host looks up a hostname in DNS, or the local hosts // file, or whatever. If your platform lacks support for naming, it must @@ -172,6 +196,10 @@ extern const char *nni_plat_strerror(int); // returned on dual stack machines. extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); +// +// TCP Support. +// + // nni_plat_tcp_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. extern int nni_plat_tcp_init(nni_plat_tcpsock **); @@ -214,48 +242,56 @@ extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *); // full, or an error condition occurs. extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); -// nni_plat_ipc_init initializes the socket, for example it can -// set underlying file descriptors to -1, etc. -extern int nni_plat_ipc_init(nni_plat_ipcsock **); +// +// IPC (UNIX Domain Sockets & Named Pipes) Support. +// -// nni_plat_ipc_fini just closes an IPC socket, and releases any related -// resources. -extern void nni_plat_ipc_fini(nni_plat_ipcsock *); +typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; +typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; -// nni_plat_ipc_shutdown performs a shutdown of the socket. For -// BSD sockets, this closes both sides of the IPC connection gracefully, -// but the underlying file descriptor is left open. (This part is critical -// to prevention of close() related races.) -extern void nni_plat_ipc_shutdown(nni_plat_ipcsock *); +// nni_plat_ipc_ep_init creates a new endpoint associated with the path. +extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *); + +// nni_plat_ipc_ep_fini closes the endpoint and releases resources. +extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); + +// nni_plat_ipc_ep_close closes the endpoint; this might not close the +// actual underlying socket, but it should call shutdown on it. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); // nni_plat_tcp_listen creates an IPC socket in listening mode, bound // to the specified path. Note that nni_plat_ipcsock should be defined // to whatever your platform uses. For most systems its just "int". -extern int nni_plat_ipc_listen(nni_plat_ipcsock *, const char *); +extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); -// nni_plat_ipc_accept does the accept to accept an inbound connection. -// The ipcsock used for the server will have been set up with the -// nni_plat_ipc_listen. -extern int nni_plat_ipc_accept(nni_plat_ipcsock *, nni_plat_ipcsock *); +// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *); // nni_plat_ipc_connect is the client side. -extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *); +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *); -// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible -// for attempting to send all of the data. The iov count will never be -// larger than 4. The platform may modify the iovs. -extern void nni_plat_ipc_send(nni_plat_ipcsock *, nni_aio *); +// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources +// associated with it. +extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *); -// nni_plat_ipc_aio_recv recvs data into the buffers provided by the -// iovs. The implementation does not return until the iovs are completely -// full, or an error condition occurs. -extern void nni_plat_ipc_recv(nni_plat_ipcsock *, nni_aio *); +// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *); -// nni_plat_seed_prng seeds the PRNG subsystem. The specified number -// of bytes of entropy should be stashed. When possible, cryptographic -// quality entropy sources should be used. Note that today we prefer -// to seed up to 256 bytes of data. -extern void nni_plat_seed_prng(void *, size_t); +// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); + +// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs. +// The platform may modify the iovs. +extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); + +// +// Notification Pipe Pairs +// // nni_plat_pipe creates a pair of linked file descriptors that are // suitable for notification via SENDFD/RECVFD. These are platform diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 186f586f..13559c08 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_close(nni_posix_pipedesc *); -extern int nni_posix_epdesc_init(nni_posix_epdesc **, int); +extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *); +extern const char *nni_posix_epdesc_url(nni_posix_epdesc *); +extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int); +extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int); extern void nni_posix_epdesc_fini(nni_posix_epdesc *); extern void nni_posix_epdesc_close(nni_posix_epdesc *); extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *); +extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index f419d25a..29420c26 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -42,6 +42,7 @@ struct nni_posix_epdesc { struct sockaddr_storage remaddr; socklen_t loclen; socklen_t remlen; + const char * url; nni_mtx mtx; }; @@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) if (rv != 0) { (void) close(newfd); } else { - aio->a_pipe = pipe; + aio->a_pipe = pd; } } // Abuse the count to hold our new fd. This is only for accept. @@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) return; default: + if (rv == ENOENT) { + rv = ECONNREFUSED; + } nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); close(ed->fd); ed->fd = -1; @@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } -// UNIX DOMAIN SOCKETS -- these have names in the file namespace. -// We are going to check to see if there was a name already there. -// If there was, and nothing is listening (ECONNREFUSED), then we -// will just try to cleanup the old socket. Note that this is not -// perfect in all scenarios, so use this with caution. -static int -nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len) -{ - int fd; - struct sockaddr_un *sun = (void *) sa; - - if ((len == 0) || (sun->sun_family != AF_UNIX)) { - return (0); - } - - if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // There is an assumption here that connect() returns immediately - // (even when non-blocking) when a server is absent. This seems - // to be true for the platforms we've tried. If it doesn't work, - // then the cleanup will fail. As this is supposed to be an - // exceptional case, don't worry. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); - } - } - (void) close(fd); - return (0); -} - - int -nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) +nni_posix_epdesc_listen(nni_posix_epdesc *ed) { int len; - struct sockaddr_storage ss; + struct sockaddr_storage *ss; int rv; int fd; - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } + nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; + len = ed->loclen; - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); } (void) fcntl(fd, F_SETFD, FD_CLOEXEC); - rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len); - if (rv != 0) { - (void) close(fd); - return (rv); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + if (bind(fd, (struct sockaddr *) ss, len) < 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); @@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) // Listen -- 128 depth is probably sufficient. If it isn't, other // bad things are going to happen. if (listen(fd, 128) != 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); } ed->fd = fd; + ed->node.fd = fd; + nni_mtx_unlock(&ed->mtx); return (0); } @@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) nni_posix_epdesc_finish(aio, rv, 0); return; } + ed->node.fd = ed->fd; // Possibly bind. if (ed->loclen != 0) { @@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, ed->fd); ed->fd = -1; nni_mtx_unlock(&ed->mtx); @@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno != EINPROGRESS) { // Some immediate failure occurred. + if (errno == ENOENT) { + errno = ECONNREFUSED; + } (void) close(ed->fd); ed->fd = -1; nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); @@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int -nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) +nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) { nni_posix_epdesc *ed; int rv; @@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) // one. For now we just have a global pollq. Note that by tying // the ed to a single pollq we may get some kind of cache warmth. - ed->pq = nni_posix_pollq_get(fd); - ed->fd = fd; + ed->pq = nni_posix_pollq_get((int) nni_random()); + ed->fd = -1; ed->node.index = 0; - ed->node.cb = NULL; // XXXX: + ed->node.cb = nni_posix_epdesc_cb; ed->node.data = ed; - - // Ensure we are in non-blocking mode. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->url = url; nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); @@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) } +const char * +nni_posix_epdesc_url(nni_posix_epdesc *ed) +{ + return (ed->url); +} + + +void +nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->locaddr, sa, len); + ed->loclen = len; + nni_mtx_unlock(&ed->mtx); +} + + +void +nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->remaddr, sa, len); + ed->remlen = len; + nni_mtx_unlock(&ed->mtx); +} + + void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index c1911a05..5bd42190 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -25,105 +25,181 @@ #include #include +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + + // Solaris/SunOS systems define this, which collides with our symbol // names. Just undefine it now. #ifdef sun #undef sun #endif -// We alias nni_posix_pipdedesc to nni_plat_ipcsock. +// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. +// We alias nni_posix_epdesc to nni_plat_ipc_ep. static int -nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path) +nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) { - nng_sockaddr_path *spath; size_t len; - memset(addr, 0, sizeof (*addr)); - spath = &addr->s_un.s_path; + memset(sun, 0, sizeof (*sun)); // TODO: abstract sockets, including autobind sockets. len = strlen(path); - if ((len >= sizeof (spath->sa_path)) || (len < 1)) { + if ((len >= sizeof (sun->sun_path)) || (len < 1)) { return (NNG_EADDRINVAL); } - (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path); - spath->sa_family = NNG_AF_IPC; + (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); + sun->sun_family = AF_UNIX; + return (0); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +{ + nni_posix_epdesc *ed; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("ipc://"); // skip the prefix. + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); + } + + *epp = (void *) ed; return (0); } void -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) +{ + nni_posix_epdesc_close((void *) ep); +} + + +// UNIX DOMAIN SOCKETS -- these have names in the file namespace. +// We are going to check to see if there was a name already there. +// If there was, and nothing is listening (ECONNREFUSED), then we +// will just try to cleanup the old socket. Note that this is not +// perfect in all scenarios, so use this with caution. +static int +nni_plat_ipc_remove_stale(struct sockaddr_un *sun) { - nni_posix_sock_aio_recv((void *) s, aio); + int fd; + int rv; + + if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(sun->sun_path); + } + } + (void) close(fd); + return (0); } int -nni_plat_ipc_init(nni_plat_ipcsock **sp) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { - nni_posix_sock *s; + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; int rv; - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + return (rv); + } + + if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + return (rv); } - return (rv); + + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + + return (nni_posix_epdesc_listen(ed)); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; + int rv; + + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + + nni_posix_epdesc_connect(ed, aio); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_shutdown((void *) s); + nni_posix_epdesc_accept((void *) ep, aio); } -int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; - - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_listen((void *) s, &addr)); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; + nni_posix_pipedesc_close((void *) p); +} - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_connect_sync((void *) s, &addr, NULL)); + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index a70c902d..d20e7b44 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg) // Clear the index for the next time around. node->index = 0; - node->revents = fds[index].revents; // Now we move this node to the callback list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6ad9da75..d9ddeb2d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { const char * addr; - nni_plat_ipcsock * isp; + nni_plat_ipc_pipe * ipp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -47,16 +47,21 @@ struct nni_ipc_pipe { struct nni_ipc_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipcsock * isp; + nni_plat_ipc_ep * iep; int closed; uint16_t proto; size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); +static void nni_ipc_ep_cb(void *); + static int nni_ipc_tran_init(void) @@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_shutdown(pipe->isp); + nni_plat_ipc_pipe_close(pipe->ipp); } @@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->isp != NULL) { - nni_plat_ipc_fini(pipe->isp); + if (pipe->ipp != NULL) { + nni_plat_ipc_pipe_fini(pipe->ipp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; int rv; @@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... - nni_plat_ipc_send(pipe->isp, aio); + nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; - nni_plat_ipc_recv(pipe->isp, aio); + nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_ipc_send(pipe->isp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_send(pipe->isp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } @@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) } +static void +nni_ipc_ep_fini(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_plat_ipc_ep_fini(ep->iep); + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_ipc_ep *ep; int rv; - if (strlen(url) > NNG_MAXADDRLEN-1) { + if ((strlen(url) > NNG_MAXADDRLEN-1) || + (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || + ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + nni_ipc_ep_fini(ep); + return (rv); + } ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) static void -nni_ipc_ep_fini(void *arg) +nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_fini(ep->isp); - NNI_FREE_STRUCT(ep); + nni_plat_ipc_ep_close(ep->iep); } -static void -nni_ipc_ep_close(void *arg) +static int +nni_ipc_ep_bind(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_shutdown(ep->isp); + return (nni_plat_ipc_ep_listen(ep->iep)); } -static int -nni_ipc_ep_connect_sync(void *arg, void **pipep) +static void +nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_ipc_ep *ep = arg; + nni_aio *aio = ep->user_aio; nni_ipc_pipe *pipe; int rv; - const char *path; - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } - path = ep->addr + strlen("ipc://"); + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_ipc_pipe_fini(ep->aio.a_pipe); + goto done; } + aio->a_pipe = pipe; - rv = nni_plat_ipc_connect(pipe->isp, path); - if (rv != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} - *pipep = pipe; - return (0); + +static void +nni_ipc_ep_cb(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_ipc_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_bind(void *arg) +static void +nni_ipc_cancel_ep(nni_aio *aio) +{ + nni_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; int rv; - const char *path; - // We want to strok this, so make a copy. Skip the scheme. - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); - } - path = ep->addr + strlen("ipc://"); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; - if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) { - return (rv); + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - return (0); + + nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_accept_sync(void *arg, void **pipep) +static void +nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); - } - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - *pipep = pipe; - return (0); + + nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = { }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect_sync = nni_ipc_ep_connect_sync, - .ep_bind = nni_ipc_ep_bind, - .ep_accept_sync = nni_ipc_ep_accept_sync, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global -- cgit v1.2.3-70-g09d2