aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h114
-rw-r--r--src/platform/posix/posix_aio.h6
-rw-r--r--src/platform/posix/posix_epdesc.c115
-rw-r--r--src/platform/posix/posix_ipc.c158
-rw-r--r--src/platform/posix/posix_pollq_poll.c1
-rw-r--r--src/transport/ipc/ipc.c196
6 files changed, 379 insertions, 211 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index 7e6dfbbe..971c6a5d 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -37,6 +37,10 @@
// they do not need in the child. (Note that posix_spawn() does *NOT*
// arrange for pthread_atfork() handlers to be called on some platforms.)
+//
+// Debugging Support
+//
+
// nni_plat_abort crashes the system; it should do whatever is appropriate
// for abnormal programs on the platform, such as calling abort().
extern void nni_plat_abort(void);
@@ -46,6 +50,18 @@ extern void nni_plat_abort(void);
// not contain newlines, but the output will add them.
extern void nni_plat_println(const char *);
+// nni_plat_strerror allows the platform to use additional error messages
+// for additional error codes. The err code passed in should be the
+// equivalent of errno or GetLastError, without the NNG_ESYSERR component.
+// The platform should make sure that the returned value will be valid
+// after the call returns. (If necessary, thread-local storage can be
+// used.)
+extern const char *nni_plat_strerror(int);
+
+//
+// Memory Management
+//
+
// nni_alloc allocates memory. In most cases this can just be malloc().
// However, you may provide a different allocator, for example it is
// possible to use a slab allocator or somesuch. It is permissible for this
@@ -64,9 +80,10 @@ typedef struct nni_plat_mtx nni_plat_mtx;
typedef struct nni_plat_cv nni_plat_cv;
typedef struct nni_plat_thr nni_plat_thr;
typedef struct nni_plat_tcpsock nni_plat_tcpsock;
-typedef struct nni_plat_ipcsock nni_plat_ipcsock;
-// Mutex handling.
+//
+// Threading & Synchronization Support
+//
// nni_plat_mtx_init initializes a mutex structure. This may require dynamic
// allocation, depending on the platform. It can return NNG_ENOMEM if that
@@ -127,6 +144,10 @@ extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
// is an error to reference the thread in any further way.
extern void nni_plat_thr_fini(nni_plat_thr *);
+//
+// Clock Support
+//
+
// nn_plat_clock returns a number of microseconds since some arbitrary time
// in the past. The values returned by nni_clock must use the same base
// as the times used in nni_plat_cond_waituntil. The nni_plat_clock() must
@@ -138,6 +159,16 @@ extern nni_time nni_plat_clock(void);
// nni_plat_usleep sleeps for the specified number of microseconds (at least).
extern void nni_plat_usleep(nni_duration);
+//
+// Entropy Support
+//
+
+// nni_plat_seed_prng seeds the PRNG subsystem. The specified number
+// of bytes of entropy should be stashed. When possible, cryptographic
+// quality entropy sources should be used. Note that today we prefer
+// to seed up to 256 bytes of data.
+extern void nni_plat_seed_prng(void *, size_t);
+
// nni_plat_init is called to allow the platform the chance to
// do any necessary initialization. This routine MUST be idempotent,
// and threadsafe, and will be called before any other API calls, and
@@ -157,13 +188,6 @@ extern int nni_plat_init(int (*)(void));
// will be called until nni_platform_init is called.
extern void nni_plat_fini(void);
-// nni_plat_strerror allows the platform to use additional error messages
-// for additional error codes. The err code passed in should be the
-// equivalent of errno or GetLastError, without the NNG_ESYSERR component.
-// The platform should make sure that the returned value will be valid
-// after the call returns. (If necessary, thread-local storage can be
-// used.)
-extern const char *nni_plat_strerror(int);
// nni_plat_lookup_host looks up a hostname in DNS, or the local hosts
// file, or whatever. If your platform lacks support for naming, it must
@@ -172,6 +196,10 @@ extern const char *nni_plat_strerror(int);
// returned on dual stack machines.
extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
+//
+// TCP Support.
+//
+
// nni_plat_tcp_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
extern int nni_plat_tcp_init(nni_plat_tcpsock **);
@@ -214,48 +242,56 @@ extern void nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
// full, or an error condition occurs.
extern void nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
-// nni_plat_ipc_init initializes the socket, for example it can
-// set underlying file descriptors to -1, etc.
-extern int nni_plat_ipc_init(nni_plat_ipcsock **);
+//
+// IPC (UNIX Domain Sockets & Named Pipes) Support.
+//
-// nni_plat_ipc_fini just closes an IPC socket, and releases any related
-// resources.
-extern void nni_plat_ipc_fini(nni_plat_ipcsock *);
+typedef struct nni_plat_ipc_ep nni_plat_ipc_ep;
+typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe;
-// nni_plat_ipc_shutdown performs a shutdown of the socket. For
-// BSD sockets, this closes both sides of the IPC connection gracefully,
-// but the underlying file descriptor is left open. (This part is critical
-// to prevention of close() related races.)
-extern void nni_plat_ipc_shutdown(nni_plat_ipcsock *);
+// nni_plat_ipc_ep_init creates a new endpoint associated with the path.
+extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const char *);
+
+// nni_plat_ipc_ep_fini closes the endpoint and releases resources.
+extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
+
+// nni_plat_ipc_ep_close closes the endpoint; this might not close the
+// actual underlying socket, but it should call shutdown on it.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *);
// nni_plat_tcp_listen creates an IPC socket in listening mode, bound
// to the specified path. Note that nni_plat_ipcsock should be defined
// to whatever your platform uses. For most systems its just "int".
-extern int nni_plat_ipc_listen(nni_plat_ipcsock *, const char *);
+extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *);
-// nni_plat_ipc_accept does the accept to accept an inbound connection.
-// The ipcsock used for the server will have been set up with the
-// nni_plat_ipc_listen.
-extern int nni_plat_ipc_accept(nni_plat_ipcsock *, nni_plat_ipcsock *);
+// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection.
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *);
// nni_plat_ipc_connect is the client side.
-extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *);
+// An accepted connection will be passed back in the a_pipe member.
+extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *);
-// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible
-// for attempting to send all of the data. The iov count will never be
-// larger than 4. The platform may modify the iovs.
-extern void nni_plat_ipc_send(nni_plat_ipcsock *, nni_aio *);
+// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources
+// associated with it.
+extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *);
-// nni_plat_ipc_aio_recv recvs data into the buffers provided by the
-// iovs. The implementation does not return until the iovs are completely
-// full, or an error condition occurs.
-extern void nni_plat_ipc_recv(nni_plat_ipcsock *, nni_aio *);
+// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down.
+// Further operations on the pipe should return NNG_ECLOSED.
+extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *);
-// nni_plat_seed_prng seeds the PRNG subsystem. The specified number
-// of bytes of entropy should be stashed. When possible, cryptographic
-// quality entropy sources should be used. Note that today we prefer
-// to seed up to 256 bytes of data.
-extern void nni_plat_seed_prng(void *, size_t);
+// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer.
+// The platform may modify the iovs.
+extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *);
+
+// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs.
+// The platform may modify the iovs.
+extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *);
+
+//
+// Notification Pipe Pairs
+//
// nni_plat_pipe creates a pair of linked file descriptors that are
// suitable for notification via SENDFD/RECVFD. These are platform
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 186f586f..13559c08 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -30,10 +30,14 @@ extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
-extern int nni_posix_epdesc_init(nni_posix_epdesc **, int);
+extern int nni_posix_epdesc_init(nni_posix_epdesc **, const char *);
+extern const char *nni_posix_epdesc_url(nni_posix_epdesc *);
+extern void nni_posix_epdesc_set_local(nni_posix_epdesc *, void *, int);
+extern void nni_posix_epdesc_set_remote(nni_posix_epdesc *, void *, int);
extern void nni_posix_epdesc_fini(nni_posix_epdesc *);
extern void nni_posix_epdesc_close(nni_posix_epdesc *);
extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *);
+extern int nni_posix_epdesc_listen(nni_posix_epdesc *);
extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index f419d25a..29420c26 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -42,6 +42,7 @@ struct nni_posix_epdesc {
struct sockaddr_storage remaddr;
socklen_t loclen;
socklen_t remlen;
+ const char * url;
nni_mtx mtx;
};
@@ -71,7 +72,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
if (rv != 0) {
(void) close(newfd);
} else {
- aio->a_pipe = pipe;
+ aio->a_pipe = pd;
}
}
// Abuse the count to hold our new fd. This is only for accept.
@@ -110,6 +111,9 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
return;
default:
+ if (rv == ENOENT) {
+ rv = ECONNREFUSED;
+ }
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
close(ed->fd);
ed->fd = -1;
@@ -232,65 +236,25 @@ nni_posix_epdesc_close(nni_posix_epdesc *ed)
}
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len)
-{
- int fd;
- struct sockaddr_un *sun = (void *) sa;
-
- if ((len == 0) || (sun->sun_family != AF_UNIX)) {
- return (0);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) sun, len) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(sun->sun_path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-
int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
+nni_posix_epdesc_listen(nni_posix_epdesc *ed)
{
int len;
- struct sockaddr_storage ss;
+ struct sockaddr_storage *ss;
int rv;
int fd;
- if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
- return (NNG_EADDRINVAL);
- }
+ nni_mtx_lock(&ed->mtx);
+ ss = &ed->locaddr;
+ len = ed->loclen;
- if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
return (nni_plat_errno(errno));
}
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
- rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len);
- if (rv != 0) {
- (void) close(fd);
- return (rv);
- }
-
- if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ if (bind(fd, (struct sockaddr *) ss, len) < 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
@@ -299,12 +263,15 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
// Listen -- 128 depth is probably sufficient. If it isn't, other
// bad things are going to happen.
if (listen(fd, 128) != 0) {
+ nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
(void) close(fd);
return (rv);
}
ed->fd = fd;
+ ed->node.fd = fd;
+ nni_mtx_unlock(&ed->mtx);
return (0);
}
@@ -363,6 +330,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
nni_posix_epdesc_finish(aio, rv, 0);
return;
}
+ ed->node.fd = ed->fd;
// Possibly bind.
if (ed->loclen != 0) {
@@ -380,6 +348,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
+
nni_posix_epdesc_finish(aio, 0, ed->fd);
ed->fd = -1;
nni_mtx_unlock(&ed->mtx);
@@ -388,6 +357,9 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
+ if (errno == ENOENT) {
+ errno = ECONNREFUSED;
+ }
(void) close(ed->fd);
ed->fd = -1;
nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
@@ -413,7 +385,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
{
nni_posix_epdesc *ed;
int rv;
@@ -432,14 +404,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the ed to a single pollq we may get some kind of cache warmth.
- ed->pq = nni_posix_pollq_get(fd);
- ed->fd = fd;
+ ed->pq = nni_posix_pollq_get((int) nni_random());
+ ed->fd = -1;
ed->node.index = 0;
- ed->node.cb = NULL; // XXXX:
+ ed->node.cb = nni_posix_epdesc_cb;
ed->node.data = ed;
-
- // Ensure we are in non-blocking mode.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ ed->url = url;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
@@ -449,6 +419,39 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
}
+const char *
+nni_posix_epdesc_url(nni_posix_epdesc *ed)
+{
+ return (ed->url);
+}
+
+
+void
+nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->locaddr, sa, len);
+ ed->loclen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
+void
+nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
+{
+ if ((len < 0) || (len > sizeof (struct sockaddr_storage))) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ memcpy(&ed->remaddr, sa, len);
+ ed->remlen = len;
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index c1911a05..5bd42190 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -25,105 +25,181 @@
#include <unistd.h>
#include <netdb.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_STREAM_SOCKTYPE SOCK_STREAM
+#endif
+
+
// Solaris/SunOS systems define this, which collides with our symbol
// names. Just undefine it now.
#ifdef sun
#undef sun
#endif
-// We alias nni_posix_pipdedesc to nni_plat_ipcsock.
+// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
+// We alias nni_posix_epdesc to nni_plat_ipc_ep.
static int
-nni_plat_ipc_path_resolve(nni_sockaddr *addr, const char *path)
+nni_plat_ipc_path_resolve(struct sockaddr_un *sun, const char *path)
{
- nng_sockaddr_path *spath;
size_t len;
- memset(addr, 0, sizeof (*addr));
- spath = &addr->s_un.s_path;
+ memset(sun, 0, sizeof (*sun));
// TODO: abstract sockets, including autobind sockets.
len = strlen(path);
- if ((len >= sizeof (spath->sa_path)) || (len < 1)) {
+ if ((len >= sizeof (sun->sun_path)) || (len < 1)) {
return (NNG_EADDRINVAL);
}
- (void) snprintf(spath->sa_path, sizeof (spath->sa_path), "%s", path);
- spath->sa_family = NNG_AF_IPC;
+ (void) snprintf(sun->sun_path, sizeof (sun->sun_path), "%s", path);
+ sun->sun_family = AF_UNIX;
+ return (0);
+}
+
+
+int
+nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const char *url)
+{
+ nni_posix_epdesc *ed;
+ int rv;
+
+ if (strncmp(url, "ipc://", strlen("ipc://")) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+ url += strlen("ipc://"); // skip the prefix.
+ if ((rv = nni_posix_epdesc_init(&ed, url)) != 0) {
+ return (rv);
+ }
+
+ *epp = (void *) ed;
return (0);
}
void
-nni_plat_ipc_send(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
{
- nni_posix_sock_aio_send((void *) s, aio);
+ nni_posix_epdesc_fini((void *) ep);
}
void
-nni_plat_ipc_recv(nni_plat_ipcsock *s, nni_aio *aio)
+nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
+{
+ nni_posix_epdesc_close((void *) ep);
+}
+
+
+// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
+// We are going to check to see if there was a name already there.
+// If there was, and nothing is listening (ECONNREFUSED), then we
+// will just try to cleanup the old socket. Note that this is not
+// perfect in all scenarios, so use this with caution.
+static int
+nni_plat_ipc_remove_stale(struct sockaddr_un *sun)
{
- nni_posix_sock_aio_recv((void *) s, aio);
+ int fd;
+ int rv;
+
+ if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) sun, sizeof (*sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(sun->sun_path);
+ }
+ }
+ (void) close(fd);
+ return (0);
}
int
-nni_plat_ipc_init(nni_plat_ipcsock **sp)
+nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
{
- nni_posix_sock *s;
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
int rv;
- if ((rv = nni_posix_sock_init(&s)) == 0) {
- *sp = (void *) s;
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ return (rv);
+ }
+
+ if ((rv = nni_plat_ipc_remove_stale(&sun)) != 0) {
+ return (rv);
}
- return (rv);
+
+ nni_posix_epdesc_set_local(ed, &sun, sizeof (sun));
+
+ return (nni_posix_epdesc_listen(ed));
}
void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_fini((void *) s);
+ const char *path;
+ nni_posix_epdesc *ed = (void *) ep;
+ struct sockaddr_un sun;
+ int rv;
+
+ path = nni_posix_epdesc_url(ed);
+
+ if ((rv = nni_plat_ipc_path_resolve(&sun, path)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ nni_posix_epdesc_set_remote(ed, &sun, sizeof (sun));
+
+ nni_posix_epdesc_connect(ed, aio);
}
void
-nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
+nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
{
- nni_posix_sock_shutdown((void *) s);
+ nni_posix_epdesc_accept((void *) ep, aio);
}
-int
-nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
-
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_listen((void *) s, &addr));
+ nni_posix_pipedesc_fini((void *) p);
}
-int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+void
+nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
{
- int rv;
- nni_sockaddr addr;
+ nni_posix_pipedesc_close((void *) p);
+}
- if ((rv = nni_plat_ipc_path_resolve(&addr, path)) != 0) {
- return (rv);
- }
- return (nni_posix_sock_connect_sync((void *) s, &addr, NULL));
+
+void
+nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
+{
+ nni_posix_pipedesc_send((void *) p, aio);
}
-int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+void
+nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
{
- return (nni_posix_sock_accept_sync((void *) s, (void *) server));
+ nni_posix_pipedesc_recv((void *) p, aio);
}
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index a70c902d..d20e7b44 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -180,7 +180,6 @@ nni_posix_poll_thr(void *arg)
// Clear the index for the next time around.
node->index = 0;
-
node->revents = fds[index].revents;
// Now we move this node to the callback list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 6ad9da75..d9ddeb2d 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -23,7 +23,7 @@ typedef struct nni_ipc_ep nni_ipc_ep;
// nni_ipc_pipe is one end of an IPC connection.
struct nni_ipc_pipe {
const char * addr;
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_pipe * ipp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -47,16 +47,21 @@ struct nni_ipc_pipe {
struct nni_ipc_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_ipcsock * isp;
+ nni_plat_ipc_ep * iep;
int closed;
uint16_t proto;
size_t rcvmax;
+ nni_aio aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
static void nni_ipc_pipe_send_cb(void *);
static void nni_ipc_pipe_recv_cb(void *);
static void nni_ipc_pipe_nego_cb(void *);
+static void nni_ipc_ep_cb(void *);
+
static int
nni_ipc_tran_init(void)
@@ -76,7 +81,7 @@ nni_ipc_pipe_close(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_plat_ipc_shutdown(pipe->isp);
+ nni_plat_ipc_pipe_close(pipe->ipp);
}
@@ -88,8 +93,8 @@ nni_ipc_pipe_fini(void *arg)
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
- if (pipe->isp != NULL) {
- nni_plat_ipc_fini(pipe->isp);
+ if (pipe->ipp != NULL) {
+ nni_plat_ipc_pipe_fini(pipe->ipp);
}
if (pipe->rxmsg) {
nni_msg_free(pipe->rxmsg);
@@ -100,7 +105,7 @@ nni_ipc_pipe_fini(void *arg)
static int
-nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
+nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *pipe;
int rv;
@@ -111,9 +116,6 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
goto fail;
}
- if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
if (rv != 0) {
goto fail;
@@ -129,6 +131,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep)
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
+ pipe->ipp = ipp;
+ pipe->addr = ep->addr;
*pipep = pipe;
return (0);
@@ -177,7 +181,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
// send it down...
- nni_plat_ipc_send(pipe->isp, aio);
+ nni_plat_ipc_pipe_send(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -185,7 +189,7 @@ nni_ipc_pipe_nego_cb(void *arg)
aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
- nni_plat_ipc_recv(pipe->isp, aio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -310,7 +314,7 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -367,7 +371,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
pipe->txaio.a_niov = 3;
- nni_plat_ipc_send(pipe->isp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -406,7 +410,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
pipe->rxaio.a_niov = 1;
- nni_plat_ipc_recv(pipe->isp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -438,7 +442,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_send(pipe->isp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -476,26 +480,41 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
}
+static void
+nni_ipc_ep_fini(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_plat_ipc_ep_fini(ep->iep);
+ nni_aio_fini(&ep->aio);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+
static int
nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_ipc_ep *ep;
int rv;
- if (strlen(url) > NNG_MAXADDRLEN-1) {
+ if ((strlen(url) > NNG_MAXADDRLEN-1) ||
+ (strncmp(url, "ipc://", strlen("ipc://")) != 0)) {
return (NNG_EADDRINVAL);
}
+
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
+ ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
+ ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) {
+ nni_ipc_ep_fini(ep);
+ return (rv);
+ }
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
-
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
@@ -504,89 +523,120 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
static void
-nni_ipc_ep_fini(void *arg)
+nni_ipc_ep_close(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_fini(ep->isp);
- NNI_FREE_STRUCT(ep);
+ nni_plat_ipc_ep_close(ep->iep);
}
-static void
-nni_ipc_ep_close(void *arg)
+static int
+nni_ipc_ep_bind(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_shutdown(ep->isp);
+ return (nni_plat_ipc_ep_listen(ep->iep));
}
-static int
-nni_ipc_ep_connect_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_finish(nni_ipc_ep *ep)
{
- nni_ipc_ep *ep = arg;
+ nni_aio *aio = ep->user_aio;
nni_ipc_pipe *pipe;
int rv;
- const char *path;
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
+ if ((aio = ep->user_aio) == NULL) {
+ return;
+ }
+ ep->user_aio = NULL;
+ if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ goto done;
}
- path = ep->addr + strlen("ipc://");
+ NNI_ASSERT(ep->aio.a_pipe != NULL);
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
+ // Attempt to allocate the parent pipe. If this fails we'll
+ // drop the connection (ENOMEM probably).
+ if ((rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) {
+ nni_plat_ipc_pipe_fini(ep->aio.a_pipe);
+ goto done;
}
+ aio->a_pipe = pipe;
- rv = nni_plat_ipc_connect(pipe->isp, path);
- if (rv != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
- }
+done:
+ ep->aio.a_pipe = NULL;
+ nni_aio_finish(aio, rv, 0);
+}
- *pipep = pipe;
- return (0);
+
+static void
+nni_ipc_ep_cb(void *arg)
+{
+ nni_ipc_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ nni_ipc_ep_finish(ep);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_bind(void *arg)
+static void
+nni_ipc_cancel_ep(nni_aio *aio)
+{
+ nni_ipc_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio == aio) {
+ ep->user_aio = NULL;
+ }
+ nni_aio_stop(&ep->aio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+
+static void
+nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
int rv;
- const char *path;
- // We want to strok this, so make a copy. Skip the scheme.
- if (strncmp(ep->addr, "ipc://", strlen("ipc://")) != 0) {
- return (NNG_EADDRINVAL);
- }
- path = ep->addr + strlen("ipc://");
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
- if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) {
- return (rv);
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- return (0);
+
+ nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
-static int
-nni_ipc_ep_accept_sync(void *arg, void **pipep)
+static void
+nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe;
int rv;
- if ((rv = nni_ipc_pipe_init(&pipe, ep)) != 0) {
- return (rv);
- }
- if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
- nni_ipc_pipe_fini(pipe);
- return (rv);
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ ep->user_aio = aio;
+
+ // If we can't start, then its dying and we can't report either,
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
+ nni_mtx_unlock(&ep->mtx);
+ return;
}
- *pipep = pipe;
- return (0);
+
+ nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_mtx_unlock(&ep->mtx);
}
@@ -601,14 +651,14 @@ static nni_tran_pipe nni_ipc_pipe_ops = {
};
static nni_tran_ep nni_ipc_ep_ops = {
- .ep_init = nni_ipc_ep_init,
- .ep_fini = nni_ipc_ep_fini,
- .ep_connect_sync = nni_ipc_ep_connect_sync,
- .ep_bind = nni_ipc_ep_bind,
- .ep_accept_sync = nni_ipc_ep_accept_sync,
- .ep_close = nni_ipc_ep_close,
- .ep_setopt = NULL,
- .ep_getopt = NULL,
+ .ep_init = nni_ipc_ep_init,
+ .ep_fini = nni_ipc_ep_fini,
+ .ep_connect = nni_ipc_ep_connect,
+ .ep_bind = nni_ipc_ep_bind,
+ .ep_accept = nni_ipc_ep_accept,
+ .ep_close = nni_ipc_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
};
// This is the IPC transport linkage, and should be the only global