diff options
| -rw-r--r-- | src/core/platform.h | 114 | ||||
| -rw-r--r-- | src/platform/posix/posix_aio.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 115 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 158 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 1 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 196 |
6 files changed, 379 insertions, 211 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index 7e6dfbbe..971c6a5d 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -37,6 +37,10 @@ // they do not need in the child. (Note that posix_spawn() does *NOT* // arrange for pthread_atfork() handlers to be called on some platforms.) +// +// Debugging Support +// + // nni_plat_abort crashes the system; it should do whatever is appropriate // for abnormal programs on the platform, such as calling abort(). extern void nni_plat_abort(void); @@ -46,6 +50,18 @@ extern void nni_plat_abort(void); // not contain newlines, but the output will add them. extern void nni_plat_println(const char *); +// nni_plat_strerror allows the platform to use additional error messages +// for additional error codes. The err code passed in should be the +// equivalent of errno or GetLastError, without the NNG_ESYSERR component. +// The platform should make sure that the returned value will be valid +// after the call returns. (If necessary, thread-local storage can be +// used.) +extern const char *nni_plat_strerror(int); + +// +// Memory Management +// + // nni_alloc allocates memory. In most cases this can just be malloc(). // However, you may provide a different allocator, for example it is // possible to use a slab allocator or somesuch. It is permissible for this @@ -64,9 +80,10 @@ typedef struct nni_plat_mtx nni_plat_mtx; typedef struct nni_plat_cv nni_plat_cv; typedef struct nni_plat_thr nni_plat_thr; typedef struct nni_plat_tcpsock nni_plat_tcpsock; -typedef struct nni_plat_ipcsock nni_plat_ipcsock; -// Mutex handling. +// +// Threading & Synchronization Support +// // nni_plat_mtx_init initializes a mutex structure. This may require dynamic // allocation, depending on the platform. It can return NNG_ENOMEM if that @@ -127,6 +144,10 @@ extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *); // is an error to reference the thread in any further way. extern void nni_plat_thr_fini(nni_plat_thr *); +// +// Clock Support +// + // nn_plat_clock returns a number of microseconds since some arbitrary time // in the past. The values returned by nni_clock must use the same base // as the times used in nni_plat_cond_waituntil. The nni_plat_clock() must @@ -138,6 +159,16 @@ extern nni_time nni_plat_clock(void); // nni_plat_usleep sleeps for the specified number of microseconds (at least). extern void nni_plat_usleep(nni_duration); +// +// Entropy Support +// + +// nni_plat_seed_prng seeds the PRNG subsystem. The specified number +// of bytes of entropy should be stashed. When possible, cryptographic +// quality entropy sources should be used. Note that today we prefer +// to seed up to 256 bytes of data. +extern void nni_plat_seed_prng(void *, size_t); + // nni_plat_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, // and threadsafe, and will be called before any other API calls, and @@ -157,13 +188,6 @@ extern int nni_plat_init(int (*)(void)); // will be called until nni_platform_init is called. extern void nni_plat_fini(void); -// nni_plat_strerror allows the platform to use additional error messages -// for additional error codes. The err code passed in should be the -// equivalent of errno or GetLastError, without the NNG_ESYSERR component. -// The platform should make sure that the returned value will be valid -// after the call returns. (If necessary, thread-local storage can be -// used.) -extern const char *nni_plat_strerror(int); // nni_plat_lookup_host looks up a hostname in DNS, or the local hosts // file, or whatever. If your platform lacks support for naming, it must @@ -172,6 +196,10 @@ extern const char *nni_plat_strerror(int); // returned on dual stack machines. extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); +// +// TCP Support. +// + // nni_plat_tcp_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. extern int nni_plat_tcp_init(nni_plat_tcpsock **); @@ -214,48 +242,56 @@ extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *); // full, or an error condition occurs. extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); -// nni_plat_ipc_init initializes the socket, for example it can -// set underlying file descriptors to -1, etc. -extern int nni_plat_ipc_init(nni_plat_ipcsock **); +// +// IPC (UNIX Domain Sockets & Named Pipes) Support. +// -// nni_plat_ipc_fini just closes an IPC socket, and releases any related -// resources. -extern void nni_plat_ipc_fini(nni_plat_ipcsock *); +typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; +typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; -// nni_plat_ipc_shutdown performs a shutdown of the socket. For -// BSD sockets, this closes both sides of the IPC connection gracefully, -// but the underlying file descriptor is left open. (This part is critical -// to prevention of close() related races.) -extern void nni_plat_ipc_shutdown(nni_plat_ipcsock *); +// nni_plat_ipc_ep_init creates a new endpoint associated with the path. +extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *); + +// nni_plat_ipc_ep_fini closes the endpoint and releases resources. +extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); + +// nni_plat_ipc_ep_close closes the endpoint; this might not close the +// actual underlying socket, but it should call shutdown on it. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); // nni_plat_tcp_listen creates an IPC socket in listening mode, bound // to the specified path. Note that nni_plat_ipcsock should be defined // to whatever your platform uses. For most systems its just "int". -extern int nni_plat_ipc_listen(nni_plat_ipcsock *, const char *); +extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); -// nni_plat_ipc_accept does the accept to accept an inbound connection. -// The ipcsock used for the server will have been set up with the -// nni_plat_ipc_listen. -extern int nni_plat_ipc_accept(nni_plat_ipcsock *, nni_plat_ipcsock *); +// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *); // nni_plat_ipc_connect is the client side. -extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *); +// An accepted connection will be passed back in the a_pipe member. +extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *); -// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible -// for attempting to send all of the data. The iov count will never be -// larger than 4. The platform may modify the iovs. -extern void nni_plat_ipc_send(nni_plat_ipcsock *, nni_aio *); +// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources +// associated with it. +extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *); -// nni_plat_ipc_aio_recv recvs data into the buffers provided by the -// iovs. The implementation does not return until the iovs are completely -// full, or an error condition occurs. -extern void nni_plat_ipc_recv(nni_plat_ipcsock *, nni_aio *); +// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down. +// Further operations on the pipe should return NNG_ECLOSED. +extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *); -// nni_plat_seed_prng seeds the PRNG subsystem. The specified number -// of bytes of entropy should be stashed. When possible, cryptographic -// quality entropy sources should be used. Note that today we prefer -// to seed up to 256 bytes of data. -extern void nni_plat_seed_prng(void *, size_t); +// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); + +// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs. +// The platform may modify the iovs. +extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); + +// +// Notification Pipe Pairs +// // nni_plat_pipe creates a pair of linked file descriptors that are // suitable for notification via SENDFD/RECVFD. These are platform diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 186f586f..13559c08 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_close(nni_posix_pipedesc *); -extern int nni_posix_epdesc_init(nni_posix_epdesc **, int); +extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *); +extern const char *nni_posix_epdesc_url(nni_posix_epdesc *); +extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int); +extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int); extern void nni_posix_epdesc_fini(nni_posix_epdesc *); extern void nni_posix_epdesc_close(nni_posix_epdesc *); extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *); +extern int nni_posix_epdesc_listen(nni_posix_epdesc *); extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index f419d25a..29420c26 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -42,6 +42,7 @@ struct nni_posix_epdesc { struct sockaddr_storage remaddr; socklen_t loclen; socklen_t remlen; + const char * url; nni_mtx mtx; }; @@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) if (rv != 0) { (void) close(newfd); } else { - aio->a_pipe = pipe; + aio->a_pipe = pd; } } // Abuse the count to hold our new fd. This is only for accept. @@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) return; default: + if (rv == ENOENT) { + rv = ECONNREFUSED; + } nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); close(ed->fd); ed->fd = -1; @@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed) } -// UNIX DOMAIN SOCKETS -- these have names in the file namespace. -// We are going to check to see if there was a name already there. -// If there was, and nothing is listening (ECONNREFUSED), then we -// will just try to cleanup the old socket. Note that this is not -// perfect in all scenarios, so use this with caution. -static int -nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len) -{ - int fd; - struct sockaddr_un *sun = (void *) sa; - - if ((len == 0) || (sun->sun_family != AF_UNIX)) { - return (0); - } - - if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // There is an assumption here that connect() returns immediately - // (even when non-blocking) when a server is absent. This seems - // to be true for the platforms we've tried. If it doesn't work, - // then the cleanup will fail. As this is supposed to be an - // exceptional case, don't worry. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) sun, len) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(sun->sun_path); - } - } - (void) close(fd); - return (0); -} - - int -nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) +nni_posix_epdesc_listen(nni_posix_epdesc *ed) { int len; - struct sockaddr_storage ss; + struct sockaddr_storage *ss; int rv; int fd; - if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { - return (NNG_EADDRINVAL); - } + nni_mtx_lock(&ed->mtx); + ss = &ed->locaddr; + len = ed->loclen; - if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { return (nni_plat_errno(errno)); } (void) fcntl(fd, F_SETFD, FD_CLOEXEC); - rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len); - if (rv != 0) { - (void) close(fd); - return (rv); - } - - if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + if (bind(fd, (struct sockaddr *) ss, len) < 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); @@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) // Listen -- 128 depth is probably sufficient. If it isn't, other // bad things are going to happen. if (listen(fd, 128) != 0) { + nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); (void) close(fd); return (rv); } ed->fd = fd; + ed->node.fd = fd; + nni_mtx_unlock(&ed->mtx); return (0); } @@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) nni_posix_epdesc_finish(aio, rv, 0); return; } + ed->node.fd = ed->fd; // Possibly bind. if (ed->loclen != 0) { @@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, ed->fd); ed->fd = -1; nni_mtx_unlock(&ed->mtx); @@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno != EINPROGRESS) { // Some immediate failure occurred. + if (errno == ENOENT) { + errno = ECONNREFUSED; + } (void) close(ed->fd); ed->fd = -1; nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); @@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int -nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) +nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) { nni_posix_epdesc *ed; int rv; @@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) // one. For now we just have a global pollq. Note that by tying // the ed to a single pollq we may get some kind of cache warmth. - ed->pq = nni_posix_pollq_get(fd); - ed->fd = fd; + ed->pq = nni_posix_pollq_get((int) nni_random()); + ed->fd = -1; ed->node.index = 0; - ed->node.cb = NULL; // XXXX: + ed->node.cb = nni_posix_epdesc_cb; ed->node.data = ed; - - // Ensure we are in non-blocking mode. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); + ed->url = url; nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); @@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) } +const char * +nni_posix_epdesc_url(nni_posix_epdesc *ed) +{ + return (ed->url); +} + + +void +nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->locaddr, sa, len); + ed->loclen = len; + nni_mtx_unlock(&ed->mtx); +} + + +void +nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) +{ + if ((len < 0) || (len > sizeof (struct sockaddr_storage))) { + return; + } + nni_mtx_lock(&ed->mtx); + memcpy(&ed->remaddr, sa, len); + ed->remlen = len; + nni_mtx_unlock(&ed->mtx); +} + + void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index c1911a05..5bd42190 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -25,105 +25,181 @@ #include <unistd.h> #include <netdb.h> +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + + // Solaris/SunOS systems define this, which collides with our symbol // names. Just undefine it now. #ifdef sun #undef sun #endif -// We alias nni_posix_pipdedesc to nni_plat_ipcsock. +// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. +// We alias nni_posix_epdesc to nni_plat_ipc_ep. static int -nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path) +nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path) { - nng_sockaddr_path *spath; size_t len; - memset(addr, 0, sizeof (*addr)); - spath = &addr->s_un.s_path; + memset(sun, 0, sizeof (*sun)); // TODO: abstract sockets, including autobind sockets. len = strlen(path); - if ((len >= sizeof (spath->sa_path)) || (len < 1)) { + if ((len >= sizeof (sun->sun_path)) || (len < 1)) { return (NNG_EADDRINVAL); } - (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path); - spath->sa_family = NNG_AF_IPC; + (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path); + sun->sun_family = AF_UNIX; + return (0); +} + + +int +nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url) +{ + nni_posix_epdesc *ed; + int rv; + + if (strncmp(url, "ipc://", strlen("ipc://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("ipc://"); // skip the prefix. + if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) { + return (rv); + } + + *epp = (void *) ed; return (0); } void -nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) { - nni_posix_sock_aio_send((void *) s, aio); + nni_posix_epdesc_fini((void *) ep); } void -nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio) +nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) +{ + nni_posix_epdesc_close((void *) ep); +} + + +// UNIX DOMAIN SOCKETS -- these have names in the file namespace. +// We are going to check to see if there was a name already there. +// If there was, and nothing is listening (ECONNREFUSED), then we +// will just try to cleanup the old socket. Note that this is not +// perfect in all scenarios, so use this with caution. +static int +nni_plat_ipc_remove_stale(struct sockaddr_un *sun) { - nni_posix_sock_aio_recv((void *) s, aio); + int fd; + int rv; + + if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) sun, sizeof (*sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(sun->sun_path); + } + } + (void) close(fd); + return (0); } int -nni_plat_ipc_init(nni_plat_ipcsock **sp) +nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) { - nni_posix_sock *s; + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; int rv; - if ((rv = nni_posix_sock_init(&s)) == 0) { - *sp = (void *) s; + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + return (rv); + } + + if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) { + return (rv); } - return (rv); + + nni_posix_epdesc_set_local(ed, &sun, sizeof (sun)); + + return (nni_posix_epdesc_listen(ed)); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_fini((void *) s); + const char *path; + nni_posix_epdesc *ed = (void *) ep; + struct sockaddr_un sun; + int rv; + + path = nni_posix_epdesc_url(ed); + + if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } + + nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun)); + + nni_posix_epdesc_connect(ed, aio); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) { - nni_posix_sock_shutdown((void *) s); + nni_posix_epdesc_accept((void *) ep, aio); } -int -nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; - - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_listen((void *) s, &addr)); + nni_posix_pipedesc_fini((void *) p); } -int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +void +nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) { - int rv; - nni_sockaddr addr; + nni_posix_pipedesc_close((void *) p); +} - if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) { - return (rv); - } - return (nni_posix_sock_connect_sync((void *) s, &addr, NULL)); + +void +nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) +{ + nni_posix_pipedesc_send((void *) p, aio); } -int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +void +nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) { - return (nni_posix_sock_accept_sync((void *) s, (void *) server)); + nni_posix_pipedesc_recv((void *) p, aio); } diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index a70c902d..d20e7b44 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg) // Clear the index for the next time around. node->index = 0; - node->revents = fds[index].revents; // Now we move this node to the callback list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6ad9da75..d9ddeb2d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { const char * addr; - nni_plat_ipcsock * isp; + nni_plat_ipc_pipe * ipp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -47,16 +47,21 @@ struct nni_ipc_pipe { struct nni_ipc_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipcsock * isp; + nni_plat_ipc_ep * iep; int closed; uint16_t proto; size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); +static void nni_ipc_ep_cb(void *); + static int nni_ipc_tran_init(void) @@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_shutdown(pipe->isp); + nni_plat_ipc_pipe_close(pipe->ipp); } @@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->isp != NULL) { - nni_plat_ipc_fini(pipe->isp); + if (pipe->ipp != NULL) { + nni_plat_ipc_pipe_fini(pipe->ipp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg) static int -nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) +nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; int rv; @@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... - nni_plat_ipc_send(pipe->isp, aio); + nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; - nni_plat_ipc_recv(pipe->isp, aio); + nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_ipc_send(pipe->isp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); pipe->rxaio.a_niov = 1; - nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_send(pipe->isp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } @@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) } +static void +nni_ipc_ep_fini(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_plat_ipc_ep_fini(ep->iep); + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_ipc_ep *ep; int rv; - if (strlen(url) > NNG_MAXADDRLEN-1) { + if ((strlen(url) > NNG_MAXADDRLEN-1) || + (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || + ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + nni_ipc_ep_fini(ep); + return (rv); + } ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) static void -nni_ipc_ep_fini(void *arg) +nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_fini(ep->isp); - NNI_FREE_STRUCT(ep); + nni_plat_ipc_ep_close(ep->iep); } -static void -nni_ipc_ep_close(void *arg) +static int +nni_ipc_ep_bind(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_shutdown(ep->isp); + return (nni_plat_ipc_ep_listen(ep->iep)); } -static int -nni_ipc_ep_connect_sync(void *arg, void **pipep) +static void +nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_ipc_ep *ep = arg; + nni_aio *aio = ep->user_aio; nni_ipc_pipe *pipe; int rv; - const char *path; - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } - path = ep->addr + strlen("ipc://"); + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_ipc_pipe_fini(ep->aio.a_pipe); + goto done; } + aio->a_pipe = pipe; - rv = nni_plat_ipc_connect(pipe->isp, path); - if (rv != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); - } +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} - *pipep = pipe; - return (0); + +static void +nni_ipc_ep_cb(void *arg) +{ + nni_ipc_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_ipc_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_bind(void *arg) +static void +nni_ipc_cancel_ep(nni_aio *aio) +{ + nni_ipc_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; int rv; - const char *path; - // We want to strok this, so make a copy. Skip the scheme. - if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) { - return (NNG_EADDRINVAL); - } - path = ep->addr + strlen("ipc://"); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; - if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) { - return (rv); + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - return (0); + + nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } -static int -nni_ipc_ep_accept_sync(void *arg, void **pipep) +static void +nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - nni_ipc_pipe *pipe; int rv; - if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) { - return (rv); - } - if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { - nni_ipc_pipe_fini(pipe); - return (rv); + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; } - *pipep = pipe; - return (0); + + nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = { }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect_sync = nni_ipc_ep_connect_sync, - .ep_bind = nni_ipc_ep_bind, - .ep_accept_sync = nni_ipc_ep_accept_sync, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global |
