aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt16
-rw-r--r--src/core/platform.h143
-rw-r--r--src/platform/posix/posix_epdesc.c597
-rw-r--r--src/platform/posix/posix_ipc.c250
-rw-r--r--src/platform/posix/posix_ipc.h48
-rw-r--r--src/platform/posix/posix_ipcconn.c494
-rw-r--r--src/platform/posix/posix_ipcdial.c238
-rw-r--r--src/platform/posix/posix_ipclisten.c373
-rw-r--r--src/platform/posix/posix_pipedesc.c503
-rw-r--r--src/platform/posix/posix_resolv_gai.c1
-rw-r--r--src/platform/posix/posix_tcpconn.c1
-rw-r--r--src/platform/posix/posix_tcpdial.c1
-rw-r--r--src/platform/posix/posix_tcplisten.c5
-rw-r--r--src/platform/posix/posix_udp.c3
-rw-r--r--src/platform/windows/win_impl.h3
-rw-r--r--src/platform/windows/win_io.c11
-rw-r--r--src/platform/windows/win_ipc.c673
-rw-r--r--src/platform/windows/win_ipc.h62
-rw-r--r--src/platform/windows/win_ipcconn.c388
-rw-r--r--src/platform/windows/win_ipcdial.c265
-rw-r--r--src/platform/windows/win_ipclisten.c296
-rw-r--r--src/platform/windows/win_tcpconn.c15
-rw-r--r--src/platform/windows/win_tcpdial.c7
-rw-r--r--src/platform/windows/win_tcplisten.c12
-rw-r--r--src/transport/ipc/ipc.c1010
-rw-r--r--src/transport/tcp/tcp.c43
-rw-r--r--src/transport/tls/tls.c21
27 files changed, 2913 insertions, 2566 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 11c7d82e..ed96b6bc 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -87,19 +87,20 @@ set (NNG_SOURCES
if (NNG_PLATFORM_POSIX)
set (NNG_SOURCES ${NNG_SOURCES}
platform/posix/posix_impl.h
+ platform/posix/posix_ipc.h
platform/posix/posix_config.h
- platform/posix/posix_aio.h
platform/posix/posix_pollq.h
+ platform/posix/posix_tcp.h
platform/posix/posix_alloc.c
platform/posix/posix_atomic.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
- platform/posix/posix_epdesc.c
platform/posix/posix_file.c
- platform/posix/posix_ipc.c
+ platform/posix/posix_ipcconn.c
+ platform/posix/posix_ipcdial.c
+ platform/posix/posix_ipclisten.c
platform/posix/posix_pipe.c
- platform/posix/posix_pipedesc.c
platform/posix/posix_rand.c
platform/posix/posix_resolv_gai.c
platform/posix/posix_sockaddr.c
@@ -132,12 +133,17 @@ endif()
if (NNG_PLATFORM_WINDOWS)
set (NNG_SOURCES ${NNG_SOURCES}
platform/windows/win_impl.h
+ platform/windows/win_ipc.h
+ platform/windows/win_tcp.h
+
platform/windows/win_clock.c
platform/windows/win_debug.c
platform/windows/win_file.c
platform/windows/win_io.c
platform/windows/win_iocp.c
- platform/windows/win_ipc.c
+ platform/windows/win_ipcconn.c
+ platform/windows/win_ipcdial.c
+ platform/windows/win_ipclisten.c
platform/windows/win_pipe.c
platform/windows/win_rand.c
platform/windows/win_resolv.c
diff --git a/src/core/platform.h b/src/core/platform.h
index 5045b172..2ae0fd0b 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -215,7 +215,7 @@ typedef struct nni_tcp_listener nni_tcp_listener;
extern void nni_tcp_conn_fini(nni_tcp_conn *);
-// nni_tcp_dialer_close closes the dialer, which might actually be
+// nni_tcp_conn_close closes the connection, which might actually be
// implemented as a shutdown() call.
// Further operations on it should return NNG_ECLOSED.
extern void nni_tcp_conn_close(nni_tcp_conn *);
@@ -248,7 +248,7 @@ extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool);
// keepalive probes. Tuning of these keepalives is current unsupported.
extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool);
-// nni_tcp_listener_init creates a new dialer object.
+// nni_tcp_dialer_init creates a new dialer object.
extern int nni_tcp_dialer_init(nni_tcp_dialer **);
// nni_tcp_dialer_fini finalizes the dialer, closing it and freeing
@@ -311,81 +311,102 @@ extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *);
// IPC (UNIX Domain Sockets & Named Pipes) Support.
//
-typedef struct nni_plat_ipc_ep nni_plat_ipc_ep;
-typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe;
+typedef struct nni_ipc_conn nni_ipc_conn;
+typedef struct nni_ipc_dialer nni_ipc_dialer;
+typedef struct nni_ipc_listener nni_ipc_listener;
-// nni_plat_ipc_ep_init creates a new endpoint associated with the url.
-// The final field is the mode, either for dialing (NNI_EP_MODE_DIAL) or
-// listening (NNI_EP_MODE_LISTEN).
-extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const nni_sockaddr *, int);
+// nni_ipc_conn_fini disposes of the connection.
+extern void nni_ipc_conn_fini(nni_ipc_conn *);
-// nni_plat_ipc_ep_fini closes the endpoint and releases resources.
-extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *);
+// nni_ipc_conn_close closes the connection, which might actually be
+// implemented as a shutdown() call.
+// Further operations on it should return NNG_ECLOSED.
+extern void nni_ipc_conn_close(nni_ipc_conn *);
-// nni_plat_ipc_ep_close closes the endpoint; this might not close the
-// actual underlying socket, but it should call shutdown on it.
-// Further operations on the pipe should return NNG_ECLOSED.
-extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *);
+// nni_ipc_conn_send sends data in the iov buffers to the peer.
+// The platform may modify the iovs.
+extern void nni_ipc_conn_send(nni_ipc_conn *, nni_aio *);
-// nni_plat_ipc_listen creates an IPC socket in listening mode, bound
-// to the specified path.
-extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *);
+// nni_ipc_conn_recv receives data into the buffers provided by the
+// I/O vector (iovs). The platform should attempt to scatter the received
+// data into the iovs if possible.
+//
+// It is possible for the reader to return less data than is requested,
+// in which case the caller is responsible for resubmitting. The platform
+// must not return "zero" data however. (It is an error to attempt to
+// receive zero bytes.) The platform may modify the iovs.
+extern void nni_ipc_conn_recv(nni_ipc_conn *, nni_aio *);
-// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection.
-// An accepted connection will be passed back in the a_pipe member.
-extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *);
+// nni_ipc_conn_get_peer_uid obtains the peer user id, if possible.
+// NB: Only POSIX systems support user IDs.
+extern int nni_ipc_conn_get_peer_uid(nni_ipc_conn *, uint64_t *);
-// nni_plat_ipc_connect is the client side.
-// An accepted connection will be passed back in the a_pipe member.
-extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *);
+// nni_ipc_conn_get_peer_gid obtains the peer group id, if possible.
+// NB: Only POSIX systems support group IDs.
+extern int nni_ipc_conn_get_peer_gid(nni_ipc_conn *, uint64_t *);
-// nni_plat_ipc_ep_set_security_descriptor sets the Windows security
-// descriptor. This is *only* supported for Windows platforms. All
-// others return NNG_ENOTSUP. The void argument is a pointer to
-// a SECURITY_DESCRIPTOR object, and must be valid.
-extern int nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *, void *);
+// nni_ipc_conn_get_peer_pid obtains the peer process id, if possible.
+extern int nni_ipc_conn_get_peer_pid(nni_ipc_conn *, uint64_t *);
-// nni_plat_ipc_ep_set_permissions sets UNIX style permissions
-// on the named pipes. This basically just does a chmod() on the
-// named pipe, and is only supported o the server side, and only on
-// systems that support this (POSIX, not Windows). Note that changing
-// ownership is not supported at this time. Most systems use only
-// 16-bits, the lower 12 of which are user, group, and other, e.g.
-// 0640 gives read/write access to user, read to group, and prevents
-// any other user from accessing it. This option only has meaning
-// for listeners, on dialers it is ignored.
-extern int nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *, uint32_t);
+// nni_ipc_conn_get_peer_zoneid obtains the peer zone id, if possible.
+// NB: Only illumos & SunOS systems have the notion of "zones".
+extern int nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *, uint64_t *);
+
+// nni_ipc_dialer_init creates a new dialer object.
+extern int nni_ipc_dialer_init(nni_ipc_dialer **);
-// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources
-// associated with it.
-extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *);
+// nni_ipc_dialer_fini finalizes the dialer, closing it and freeing
+// all resources.
+extern void nni_ipc_dialer_fini(nni_ipc_dialer *);
-// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down.
-// Further operations on the pipe should return NNG_ECLOSED.
-extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *);
+// nni_ipc_dialer_close closes the dialer.
+// Further operations on it should return NNG_ECLOSED. Any in-progress
+// connection will be aborted.
+extern void nni_ipc_dialer_close(nni_ipc_dialer *);
-// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer.
-// The platform may modify the iovs.
-extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *);
+// nni_ipc_dialer_dial attempts to create an outgoing connection,
+// asynchronously, to the address specified. On success, the first (and only)
+// output will be an nni_ipc_conn * associated with the remote server.
+extern void nni_ipc_dialer_dial(
+ nni_ipc_dialer *, const nni_sockaddr *, nni_aio *);
-// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs.
-// The platform may modify the iovs.
-extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *);
+// nni_ipc_listener_init creates a new listener object, unbound.
+extern int nni_ipc_listener_init(nni_ipc_listener **);
-// nni_plat_ipc_pipe_get_peer_uid obtains the peer user id, if possible.
-// NB: Only POSIX systems support user IDs.
-extern int nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *, uint64_t *);
+// nni_ipc_listener_fini frees the listener and all associated resources.
+// It implictly closes the listener as well.
+extern void nni_ipc_listener_fini(nni_ipc_listener *);
-// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible.
-// NB: Only POSIX systems support group IDs.
-extern int nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *, uint64_t *);
+// nni_ipc_listener_close closes the listener. This will unbind
+// any bound socket, and further operations will result in NNG_ECLOSED.
+extern void nni_ipc_listener_close(nni_ipc_listener *);
-// nni_plat_ipc_pipe_get_peer_pid obtains the peer process id, if possible.
-extern int nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *, uint64_t *);
+// nni_ipc_listener_listen creates the socket in listening mode, bound
+// to the specified address. Unlike TCP, this address does not change.
+extern int nni_ipc_listener_listen(nni_ipc_listener *, const nni_sockaddr *);
-// nni_plat_ipc_pipe_get_peer_zoneid obtains the peer zone id, if possible.
-// NB: Only illumos & SunOS systems have the notion of "zones".
-extern int nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *, uint64_t *);
+// nni_ipc_listener_accept accepts in incoming connect, asynchronously.
+// On success, the first (and only) output will be an nni_ipc_conn *
+// associated with the remote peer.
+extern void nni_ipc_listener_accept(nni_ipc_listener *, nni_aio *);
+
+// nni_ipc_listener_set_permissions sets UNIX style permissions
+// on the named pipes. This basically just does a chmod() on the
+// named pipe, and is only supported o the server side, and only on
+// systems that support this (POSIX, not Windows). Note that changing
+// ownership is not supported at this time. Most systems use only
+// 16-bits, the lower 12 of which are user, group, and other, e.g.
+// 0640 gives read/write access to user, read to group, and prevents
+// any other user from accessing it. On platforms without this support,
+// ENOTSUP is returned.
+extern int nni_ipc_listener_set_permissions(nni_ipc_listener *, int);
+
+// nni_ipc_listener_set_security_descriptor sets the Windows security
+// descriptor. This is *only* supported for Windows platforms. All
+// others return NNG_ENOTSUP. The void argument is a pointer to
+// a SECURITY_DESCRIPTOR object, and must be valid.
+extern int nni_ipc_listener_set_security_descriptor(
+ nni_ipc_listener *, void *);
//
// UDP support. UDP is not connection oriented, and only has the notion
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
deleted file mode 100644
index d796765a..00000000
--- a/src/platform/posix/posix_epdesc.c
+++ /dev/null
@@ -1,597 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef sun
-#undef sun
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-struct nni_posix_epdesc {
- nni_posix_pfd * pfd;
- nni_list connectq;
- nni_list acceptq;
- bool closed;
- bool started;
- bool ipcbound; // if true unlink socket on exit
- struct sockaddr_storage locaddr;
- struct sockaddr_storage remaddr;
- socklen_t loclen;
- socklen_t remlen;
- mode_t perms; // UNIX sockets only
- int mode; // end point mode (dialer/listener)
- nni_mtx mtx;
-};
-
-static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
-static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
-
-static void
-nni_epdesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
- nni_posix_pfd * pfd = NULL;
-
- NNI_ASSERT(rv != 0);
- nni_mtx_lock(&ed->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
- ((pfd = ed->pfd) != NULL)) {
- nni_posix_pfd_close(pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
-{
- nni_posix_pipedesc *pd = NULL;
-
- // acceptq or connectq.
- nni_aio_list_remove(aio);
-
- if (rv != 0) {
- NNI_ASSERT(newpfd == NULL);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(newpfd != NULL);
- if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
- nni_posix_pfd_fini(newpfd);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_epdesc_doaccept(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
-
- fd = nni_posix_pfd_fd(ed->pfd);
-
-#ifdef NNG_USE_ACCEPT4
- newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
- if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(fd, NULL, NULL);
- }
-#else
- newfd = accept(fd, NULL, NULL);
-#endif
- if (newfd < 0) {
- switch (errno) {
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
- if (rv != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- // Come back later...
- return;
- case ECONNABORTED:
- case ECONNRESET:
- // Eat them, they aren't interesting.
- continue;
- default:
- // Error this one, but keep moving to the next.
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nni_epdesc_finish(aio, rv, NULL);
- continue;
- }
-
- nni_epdesc_finish(aio, 0, pfd);
- }
-}
-
-static void
-nni_epdesc_doclose(nni_posix_epdesc *ed)
-{
- nni_aio *aio;
-
- ed->closed = true;
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_epdesc_finish(aio, NNG_ECLOSED, 0);
- }
-
- if (ed->pfd != NULL) {
-
- nni_posix_pfd_close(ed->pfd);
- }
-
- // clean up stale UNIX socket when closing the server.
- if (ed->ipcbound) {
- struct sockaddr_un *sun = (void *) &ed->locaddr;
- (void) unlink(sun->sun_path);
- }
-}
-
-static void
-nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
-
- NNI_ARG_UNUSED(pfd);
-
- nni_mtx_lock(&ed->mtx);
- if (events & POLLNVAL) {
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- // Anything else will turn up in accept.
- nni_epdesc_doaccept(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_close(nni_posix_epdesc *ed)
-{
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_listen(nni_posix_epdesc *ed)
-{
- int len;
- struct sockaddr_storage *ss;
- int rv;
- int fd;
- nni_posix_pfd * pfd;
-
- nni_mtx_lock(&ed->mtx);
-
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ESTATE);
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ECLOSED);
- }
- if ((len = ed->loclen) == 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EADDRINVAL);
- }
-
- ss = &ed->locaddr;
- len = ed->loclen;
-
- if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- nni_mtx_unlock(&ed->mtx);
- return (nni_plat_errno(errno));
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
-#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL)
- if (ss->ss_family != AF_UNIX) {
- int on = 1;
- // If for some reason this doesn't work, it's probably ok.
- // Second bind will fail.
- (void) setsockopt(
- fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- }
-#endif
-
- if (bind(fd, (struct sockaddr *) ss, len) < 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- // For UNIX domain sockets, optionally set the permission bits.
- // This is done after the bind and before listen, and on the file
- // rather than the file descriptor.
- // Experiments have shown that chmod() works correctly, provided that
- // it is done *before* the listen() operation, whereas fchmod seems to
- // have no impact. This behavior was observed on both macOS and Linux.
- // YMMV on other platforms.
- if (ss->ss_family == AF_UNIX) {
- ed->ipcbound = true;
- if (ed->perms != 0) {
- struct sockaddr_un *sun = (void *) ss;
- mode_t perms = ed->perms & ~(S_IFMT);
- if ((rv = chmod(sun->sun_path, perms)) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
- }
- }
-
- // Listen -- 128 depth is probably sufficient. If it isn't, other
- // bad things are going to happen.
- if (listen(fd, 128) != 0) {
- rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_posix_pfd_fini(pfd);
- return (rv);
- }
-
- nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
-
- ed->pfd = pfd;
- ed->started = true;
- nni_mtx_unlock(&ed->mtx);
-
- return (0);
-}
-
-void
-nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- // Accept is simpler than the connect case. With accept we just
- // need to wait for the socket to be readable to indicate an incoming
- // connection is ready for us. There isn't anything else for us to
- // do really, as that will have been done in listen.
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
-
- if (!ed->started) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&ed->acceptq, aio);
- if (nni_list_first(&ed->acceptq) == aio) {
- nni_epdesc_doaccept(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = -1;
-
- nni_mtx_lock(&ed->mtx);
- if (ed->pfd != NULL) {
- fd = nni_posix_pfd_fd(ed->pfd);
- }
- nni_mtx_unlock(&ed->mtx);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-static void
-nni_epdesc_connect_start(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
- int fd;
- int rv;
- nni_aio * aio;
-
-loop:
- if ((aio = nni_list_first(&ed->connectq)) == NULL) {
- return;
- }
-
- NNI_ASSERT(ed->pfd == NULL);
- if (ed->closed) {
- nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
- goto loop;
- }
- ed->started = true;
-
- if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
-
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- nni_epdesc_finish(aio, rv, NULL);
- goto loop;
- }
- // Possibly bind.
- if ((ed->loclen != 0) &&
- (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
- rv = nni_plat_errno(errno);
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
-
- if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
- // Immediate connect, cool! This probably only happens on
- // loopback, and probably not on every platform.
- nni_epdesc_finish(aio, 0, pfd);
- goto loop;
- }
-
- if (errno != EINPROGRESS) {
- // Some immediate failure occurred.
- if (errno == ENOENT) { // For UNIX domain sockets
- rv = NNG_ECONNREFUSED;
- } else {
- rv = nni_plat_errno(errno);
- }
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
- if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
- nni_epdesc_finish(aio, rv, NULL);
- nni_posix_pfd_fini(pfd);
- goto loop;
- }
- ed->pfd = pfd;
- // all done... wait for this to signal via callback
-}
-
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- if (ed->closed) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
- if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- ed->started = true;
- nni_list_append(&ed->connectq, aio);
- if (nni_list_first(&ed->connectq) == aio) {
- // If there was a stale pfd (probably from an aborted or
- // canceled connect attempt), discard it so we start fresh.
- if (ed->pfd != NULL) {
- nni_posix_pfd_fini(ed->pfd);
- ed->pfd = NULL;
- }
- nni_epdesc_connect_start(ed);
- }
- nni_mtx_unlock(&ed->mtx);
-}
-
-static void
-nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_epdesc *ed = arg;
- nni_aio * aio;
- socklen_t sz;
- int rv;
- int fd;
-
- nni_mtx_lock(&ed->mtx);
- if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
- (pfd != ed->pfd)) {
- // Spurious completion. Just ignore it.
- nni_mtx_unlock(&ed->mtx);
- return;
- }
-
- fd = nni_posix_pfd_fd(pfd);
- sz = sizeof(rv);
-
- if ((events & POLLNVAL) != 0) {
- rv = EBADF;
-
- } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
-
- switch (rv) {
- case 0:
- // Good connect!
- ed->pfd = NULL;
- nni_epdesc_finish(aio, 0, pfd);
- break;
- case EINPROGRESS: // still connecting... come back later
- nni_mtx_unlock(&ed->mtx);
- return;
- default:
- ed->pfd = NULL;
- nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
- nni_posix_pfd_fini(pfd);
- break;
- }
-
- // Start another connect running, if any is waiting.
- nni_epdesc_connect_start(ed);
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
-{
- nni_posix_epdesc *ed;
-
- if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&ed->mtx);
-
- ed->pfd = NULL;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
-
- nni_aio_list_init(&ed->connectq);
- nni_aio_list_init(&ed->acceptq);
- *edp = ed;
- return (0);
-}
-
-void
-nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->locaddr, sa, len);
- ed->loclen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-void
-nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len)
-{
- if ((len < 1) || (len > sizeof(struct sockaddr_storage))) {
- return;
- }
- nni_mtx_lock(&ed->mtx);
- memcpy(&ed->remaddr, sa, len);
- ed->remlen = len;
- nni_mtx_unlock(&ed->mtx);
-}
-
-int
-nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
-{
- nni_mtx_lock(&ed->mtx);
- if (ed->mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_ENOTSUP);
- }
- if (ed->started) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EBUSY);
- }
- if ((mode & S_IFMT) != 0) {
- nni_mtx_unlock(&ed->mtx);
- return (NNG_EINVAL);
- }
- ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero
- nni_mtx_unlock(&ed->mtx);
- return (0);
-}
-
-void
-nni_posix_epdesc_fini(nni_posix_epdesc *ed)
-{
- nni_posix_pfd *pfd;
-
- nni_mtx_lock(&ed->mtx);
- nni_epdesc_doclose(ed);
- pfd = ed->pfd;
- nni_mtx_unlock(&ed->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
deleted file mode 100644
index 5ba3c8fb..00000000
--- a/src/platform/posix/posix_ipc.c
+++ /dev/null
@@ -1,250 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#ifdef SOCK_CLOEXEC
-#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define NNI_STREAM_SOCKTYPE SOCK_STREAM
-#endif
-
-// Solaris/SunOS systems define this, which collides with our symbol
-// names. Just undefine it now.
-#ifdef sun
-#undef sun
-#endif
-
-static int nni_plat_ipc_remove_stale(const char *path);
-
-// We alias nni_posix_pipedesc to nni_plat_ipc_pipe.
-// We alias nni_posix_epdesc to nni_plat_ipc_ep.
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- nni_posix_epdesc * ed;
- int rv;
- struct sockaddr_un sun;
-
- if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) {
- return (rv);
- }
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun));
- break;
- case NNI_EP_MODE_LISTEN:
-
- if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) {
- return (rv);
- }
-
- nni_posix_nn2sockaddr(&sun, sa);
- nni_posix_epdesc_set_local(ed, &sun, sizeof(sun));
- break;
- default:
- nni_posix_epdesc_fini(ed);
- return (NNG_EINVAL);
- }
-
- *epp = (void *) ed;
- return (0);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_fini((void *) ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc_close((void *) ep);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits));
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(attr);
- return (NNG_ENOTSUP);
-}
-
-// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
-// We are going to check to see if there was a name already there.
-// If there was, and nothing is listening (ECONNREFUSED), then we
-// will just try to cleanup the old socket. Note that this is not
-// perfect in all scenarios, so use this with caution.
-static int
-nni_plat_ipc_remove_stale(const char *path)
-{
- int fd;
- struct sockaddr_un sun;
- size_t sz;
-
- sun.sun_family = AF_UNIX;
- sz = sizeof(sun.sun_path);
-
- if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
- return (NNG_EADDRINVAL);
- }
-
- if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
- return (nni_plat_errno(errno));
- }
-
- // There is an assumption here that connect() returns immediately
- // (even when non-blocking) when a server is absent. This seems
- // to be true for the platforms we've tried. If it doesn't work,
- // then the cleanup will fail. As this is supposed to be an
- // exceptional case, don't worry.
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
- if (errno == ECONNREFUSED) {
- (void) unlink(path);
- }
- }
- (void) close(fd);
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- nni_posix_epdesc *ed = (void *) ep;
-
- return (nni_posix_epdesc_listen(ed));
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_connect((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_posix_epdesc_accept((void *) ep, aio);
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_fini((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p)
-{
- nni_posix_pipedesc_close((void *) p);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_send((void *) p, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio)
-{
- nni_posix_pipedesc_recv((void *) p, aio);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, uid, &ignore, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid)
-{
- int rv;
- uint64_t ignore;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, gid, &ignore, &ignore)) != 0) {
- return (rv);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &ignore, &id)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal zone id (illumos/Solaris)
- return (NNG_ENOTSUP);
- }
- *zid = id;
- return (0);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid)
-{
- int rv;
- uint64_t ignore;
- uint64_t id;
-
- if ((rv = nni_posix_pipedesc_get_peerid(
- (void *) p, &ignore, &ignore, &id, &ignore)) != 0) {
- return (rv);
- }
- if (id == (uint64_t) -1) {
- // NB: -1 is not a legal process id
- return (NNG_ENOTSUP);
- }
- *pid = id;
- return (0);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
new file mode 100644
index 00000000..caf7ed7b
--- /dev/null
+++ b/src/platform/posix/posix_ipc.h
@@ -0,0 +1,48 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+#include "platform/posix/posix_aio.h"
+
+#include <sys/types.h> // For mode_t
+
+struct nni_ipc_conn {
+ nni_posix_pfd * pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_aio * dial_aio;
+ nni_ipc_dialer *dialer;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+};
+
+struct nni_ipc_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+};
+
+extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
new file mode 100644
index 00000000..2b46fb12
--- /dev/null
+++ b/src/platform/posix/posix_ipcconn.c
@@ -0,0 +1,494 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+#if defined(NNG_HAVE_GETPEERUCRED)
+#include <ucred.h>
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+#include <sys/ucred.h>
+#endif
+
+#ifdef NNG_HAVE_ALLOCA
+#include <alloca.h>
+#endif
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+#include "posix_ipc.h"
+
+static void
+ipc_conn_dowrite(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->writeq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov * aiov;
+ struct msghdr hdr;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ memset(&hdr, 0, sizeof(hdr));
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len > 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ hdr.msg_iovlen = niov;
+ hdr.msg_iov = iovec;
+
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ nni_aio_bump_count(aio, n);
+ // We completed the entire operation on this aio.
+ // (Sendmsg never returns a partial result.)
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+static void
+ipc_conn_doread(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ int fd;
+
+ if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ return;
+ }
+
+ while ((aio = nni_list_first(&c->readq)) != NULL) {
+ unsigned i;
+ int n;
+ int niov;
+ unsigned naiov;
+ nni_iov *aiov;
+#ifdef NNG_HAVE_ALLOCA
+ struct iovec *iovec;
+#else
+ struct iovec iovec[16];
+#endif
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+#ifdef NNG_HAVE_ALLOCA
+ if (naiov > 64) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+ iovec = alloca(naiov * sizeof(*iovec));
+#else
+ if (naiov > NNI_NUM_ELEMENTS(iovec)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ continue;
+ }
+#endif
+ for (niov = 0, i = 0; i < naiov; i++) {
+ if (aiov[i].iov_len != 0) {
+ iovec[niov].iov_len = aiov[i].iov_len;
+ iovec[niov].iov_base = aiov[i].iov_buf;
+ niov++;
+ }
+ }
+
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(
+ aio, nni_plat_errno(errno));
+ return;
+ }
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ // This implicitly completes this (all!) aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+
+ nni_aio_bump_count(aio, n);
+
+ // We completed the entire operation on this aio.
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ nni_aio *aio;
+ c->closed = true;
+ while (((aio = nni_list_first(&c->readq)) != NULL) ||
+ ((aio = nni_list_first(&c->writeq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_posix_pfd_close(c->pfd);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_conn *c = arg;
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
+ nni_ipc_conn_close(c);
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (events & POLLIN) {
+ ipc_conn_doread(c);
+ }
+ if (events & POLLOUT) {
+ ipc_conn_dowrite(c);
+ }
+ events = 0;
+ if (!nni_list_empty(&c->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&c->readq)) {
+ events |= POLLIN;
+ }
+ if ((!c->closed) && (events != 0)) {
+ nni_posix_pfd_arm(pfd, events);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->writeq, aio);
+
+ if (nni_list_first(&c->writeq) == aio) {
+ ipc_conn_dowrite(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->writeq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLOUT);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+
+ if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&c->readq, aio);
+
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
+ if (nni_list_first(&c->readq) == aio) {
+ ipc_conn_doread(c);
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
+ if (nni_list_first(&c->readq) == aio) {
+ nni_posix_pfd_arm(c->pfd, POLLIN);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
+ uint64_t *prid, uint64_t *znid)
+{
+ int fd = nni_posix_pfd_fd(c->pfd);
+#if defined(NNG_HAVE_GETPEEREID)
+ uid_t uid;
+ gid_t gid;
+
+ if (getpeereid(fd, &uid, &gid) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uid;
+ *egid = gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_GETPEERUCRED)
+ ucred_t *ucp = NULL;
+ if (getpeerucred(fd, &ucp) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = ucred_geteuid(ucp);
+ *egid = ucred_getegid(ucp);
+ *prid = ucred_getpid(ucp);
+ *znid = ucred_getzoneid(ucp);
+ ucred_free(ucp);
+ return (0);
+#elif defined(NNG_HAVE_SOPEERCRED)
+ struct ucred uc;
+ socklen_t len = sizeof(uc);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = uc.uid;
+ *egid = uc.gid;
+ *prid = uc.pid;
+ *znid = (uint64_t) -1;
+ return (0);
+#elif defined(NNG_HAVE_LOCALPEERCRED)
+ struct xucred xu;
+ socklen_t len = sizeof(xu);
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
+ return (nni_plat_errno(errno));
+ }
+ *euid = xu.cr_uid;
+ *egid = xu.cr_gid;
+ *prid = (uint64_t) -1;
+ *znid = (uint64_t) -1;
+#if defined(LOCAL_PEERPID) // present (undocumented) on macOS
+ {
+ pid_t pid;
+ if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) ==
+ 0) {
+ *prid = (uint64_t) pid;
+ }
+ }
+#endif // LOCAL_PEERPID
+ return (0);
+#else
+ if (fd < 0) {
+ return (NNG_ECLOSED);
+ }
+ NNI_ARG_UNUSED(euid);
+ NNI_ARG_UNUSED(egid);
+ NNI_ARG_UNUSED(prid);
+ NNI_ARG_UNUSED(znid);
+ return (NNG_ENOTSUP);
+#endif
+}
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid)
+{
+ int rv;
+ uint64_t ignore;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal zone id (illumos/Solaris)
+ return (NNG_ENOTSUP);
+ }
+ *zid = id;
+ return (0);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
+
+ if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ return (rv);
+ }
+ if (id == (uint64_t) -1) {
+ // NB: -1 is not a legal process id
+ return (NNG_ENOTSUP);
+ }
+ *pid = id;
+ return (0);
+}
+
+int
+nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
+{
+ nni_ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
+
+void
+nni_posix_ipc_conn_start(nni_ipc_conn *c)
+{
+ nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
new file mode 100644
index 00000000..13732911
--- /dev/null
+++ b/src/platform/posix/posix_ipcdial.c
@@ -0,0 +1,238 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+// Dialer stuff.
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ d->closed = false;
+ nni_aio_list_init(&d->connq);
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ nni_mtx_lock(&d->mtx);
+ if (!d->closed) {
+ nni_aio *aio;
+ d->closed = true;
+ while ((aio = nni_list_first(&d->connq)) != NULL) {
+ nni_ipc_conn *c;
+ nni_list_remove(&d->connq, aio);
+ if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_ipc_conn_close(c);
+ nni_reap(
+ &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ }
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+ipc_dialer_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+ nni_ipc_conn * c;
+
+ nni_mtx_lock(&d->mtx);
+ if ((!nni_aio_list_active(aio)) ||
+ ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ c->dial_aio = NULL;
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_ipc_conn_fini(c);
+}
+
+static void
+ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
+{
+ nni_ipc_conn * c = arg;
+ nni_ipc_dialer *d = c->dialer;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = c->dial_aio;
+ if ((aio == NULL) || (!nni_aio_list_active(aio))) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ if (ev & POLLNVAL) {
+ rv = EBADF;
+
+ } else {
+ socklen_t sz = sizeof(int);
+ int fd = nni_posix_pfd_fd(pfd);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+ if (rv == EINPROGRESS) {
+ // Connection still in progress, come back
+ // later.
+ nni_mtx_unlock(&d->mtx);
+ return;
+ } else if (rv != 0) {
+ rv = nni_plat_errno(rv);
+ }
+ }
+
+ c->dial_aio = NULL;
+ nni_aio_list_remove(aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+
+ if (rv != 0) {
+ nni_ipc_conn_close(c);
+ nni_ipc_conn_fini(c);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+// We don't give local address binding support. Outbound dialers always
+// get an ephemeral port.
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ nni_ipc_conn * c;
+ nni_posix_pfd * pfd = NULL;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int fd;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+
+ if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ nni_aio_finish_error(aio, nni_plat_errno(errno));
+ return;
+ }
+
+ // This arranges for the fd to be in nonblocking mode, and adds the
+ // pollfd to the list.
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ c->dialer = d;
+ nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) {
+ goto error;
+ }
+ if ((rv = connect(fd, (void *) &ss, sslen)) != 0) {
+ if (errno != EINPROGRESS) {
+ if (errno == ENOENT) {
+ // No socket present means nobody listening.
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
+ }
+ goto error;
+ }
+ // Asynchronous connect.
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ goto error;
+ }
+ c->dial_aio = aio;
+ nni_aio_set_prov_extra(aio, 0, c);
+ nni_list_append(&d->connq, aio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ // Immediate connect, cool! This probably only happens
+ // on loopback, and probably not on every platform.
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+error:
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_mtx_unlock(&d->mtx);
+ nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nni_aio_finish_error(aio, rv);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
new file mode 100644
index 00000000..4e33a08f
--- /dev/null
+++ b/src/platform/posix/posix_ipclisten.c
@@ -0,0 +1,373 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+
+#include "posix_ipc.h"
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+
+ nni_aio_list_init(&l->acceptq);
+ *lp = l;
+ return (0);
+}
+
+static void
+ipc_listener_doclose(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+ char * path;
+
+ l->closed = true;
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ if (l->pfd != NULL) {
+ nni_posix_pfd_close(l->pfd);
+ }
+ if (l->started && ((path = l->path) != NULL)) {
+ l->path = NULL;
+ (void) unlink(path);
+ nni_strfree(path);
+ }
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_doaccept(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&l->acceptq)) != NULL) {
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+ nni_ipc_conn * c;
+
+ fd = nni_posix_pfd_fd(l->pfd);
+
+#ifdef NNG_USE_ACCEPT4
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
+ if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
+ newfd = accept(fd, NULL, NULL);
+ }
+#else
+ newfd = accept(fd, NULL, NULL);
+#endif
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(l->pfd, POLLIN);
+ if (rv != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ NNI_ASSERT(rv != 0);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ }
+
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ nni_posix_pfd_fini(pfd);
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_posix_ipc_conn_start(c);
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+static void
+ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_ipc_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
+
+ nni_mtx_lock(&l->mtx);
+ if (events & POLLNVAL) {
+ ipc_listener_doclose(l);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+
+ // Anything else will turn up in accept.
+ ipc_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ // This is dead easy, because we'll ignore the completion if there
+ // isn't anything to do the accept on!
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ipc_remove_stale(const char *path)
+{
+ int fd;
+ struct sockaddr_un sun;
+ size_t sz;
+
+ sun.sun_family = AF_UNIX;
+ sz = sizeof(sun.sun_path);
+
+ if (nni_strlcpy(sun.sun_path, path, sz) >= sz) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) &sun, sizeof(sun)) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(path);
+ }
+ }
+ (void) close(fd);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode)
+{
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(sd);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ socklen_t len;
+ struct sockaddr_storage ss;
+ int rv;
+ int fd;
+ nni_posix_pfd * pfd;
+ char * path;
+
+ if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ (ss.ss_family != AF_UNIX)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ path = nni_strdup(sa->s_ipc.sa_path);
+ if (path == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ if (((rv = ipc_remove_stale(path)) != 0) ||
+ ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ (void) close(fd);
+ return (rv);
+ }
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) ||
+ (listen(fd, 128) != 0)) {
+ rv = nni_plat_errno(errno);
+ (void) unlink(path);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
+
+ nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
+
+ l->pfd = pfd;
+ l->started = true;
+ l->path = path;
+ nni_mtx_unlock(&l->mtx);
+
+ return (0);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ int rv;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_aio_list_append(&l->acceptq, aio);
+ if (nni_list_first(&l->acceptq) == aio) {
+ ipc_listener_doaccept(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
deleted file mode 100644
index b11036ea..00000000
--- a/src/platform/posix/posix_pipedesc.c
+++ /dev/null
@@ -1,503 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
-#include "platform/posix/posix_pollq.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#if defined(NNG_HAVE_GETPEERUCRED)
-#include <ucred.h>
-#elif defined(NNG_HAVE_LOCALPEERCRED)
-#include <sys/ucred.h>
-#include <sys/un.h>
-#endif
-#ifdef NNG_HAVE_ALLOCA
-#include <alloca.h>
-#endif
-
-// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
-// file descriptor for TCP socket, etc.) This contains the list of pending
-// aios for that underlying socket, as well as the socket itself.
-struct nni_posix_pipedesc {
- nni_posix_pfd *pfd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
-};
-
-static void
-nni_posix_pipedesc_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, nni_aio_count(aio));
-}
-
-static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- pd->closed = true;
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- nni_posix_pfd_close(pd->pfd);
-}
-
-static void
-nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov * aiov;
- struct msghdr hdr;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- memset(&hdr, 0, sizeof(hdr));
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
-
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len > 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
-#ifndef MSG_NOSIGNAL
-#define MSG_NOSIGNAL 0
-#endif
-
- hdr.msg_iovlen = niov;
- hdr.msg_iov = iovec;
-
- if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
-#ifdef EWOULDBLOCK
-#if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
-#endif
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- nni_aio_bump_count(aio, n);
- // We completed the entire operation on this aioq.
- // (Sendmsg never returns a partial result.)
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
- int fd;
-
- fd = nni_posix_pfd_fd(pd->pfd);
- if ((fd < 0) || (pd->closed)) {
- return;
- }
-
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- unsigned i;
- int n;
- int niov;
- unsigned naiov;
- nni_iov *aiov;
-#ifdef NNG_HAVE_ALLOCA
- struct iovec *iovec;
-#else
- struct iovec iovec[16];
-#endif
-
- nni_aio_get_iov(aio, &naiov, &aiov);
-#ifdef NNG_HAVE_ALLOCA
- if (naiov > 64) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
- iovec = alloca(naiov * sizeof(*iovec));
-#else
- if (naiov > NNI_NUM_ELEMENTS(iovec)) {
- nni_posix_pipedesc_finish(aio, NNG_EINVAL);
- continue;
- }
-#endif
- for (niov = 0, i = 0; i < naiov; i++) {
- if (aiov[i].iov_len != 0) {
- iovec[niov].iov_len = aiov[i].iov_len;
- iovec[niov].iov_base = aiov[i].iov_buf;
- niov++;
- }
- }
-
- if ((n = readv(fd, iovec, niov)) < 0) {
- switch (errno) {
- case EINTR:
- continue;
- case EAGAIN:
- return;
- default:
- nni_posix_pipedesc_finish(
- aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
- }
- }
-
- if (n == 0) {
- // No bytes indicates a closed descriptor.
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- nni_posix_pipedesc_doclose(pd);
- return;
- }
-
- nni_aio_bump_count(aio, n);
-
- // We completed the entire operation on this aioq.
- nni_posix_pipedesc_finish(aio, 0);
-
- // Go back to start of loop to see if there is another
- // aio ready for us to process.
- }
-}
-
-static void
-nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
-{
- nni_posix_pipedesc *pd = arg;
-
- nni_mtx_lock(&pd->mtx);
- if (events & POLLIN) {
- nni_posix_pipedesc_doread(pd);
- }
- if (events & POLLOUT) {
- nni_posix_pipedesc_dowrite(pd);
- }
- if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_posix_pipedesc_doclose(pd);
- } else {
- events = 0;
- if (!nni_list_empty(&pd->writeq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- events |= POLLIN;
- }
- if ((!pd->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
-{
- // NB: Events may still occur.
- nni_mtx_lock(&pd->mtx);
- nni_posix_pipedesc_doclose(pd);
- nni_mtx_unlock(&pd->mtx);
-}
-
-static void
-nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
-{
- nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&pd->mtx);
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->readq, aio);
-
- // If we are only job on the list, go ahead and try to do an
- // immediate transfer. This allows for faster completions in
- // many cases. We also need not arm a list if it was already
- // armed.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLIN);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-void
-nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
-{
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&pd->mtx);
-
- if (pd->closed) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_list_append(&pd->writeq, aio);
-
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that
- // means we didn't finish the job, so arm the poller to
- // complete us.
- if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pfd_arm(pd->pfd, POLLOUT);
- }
- }
- nni_mtx_unlock(&pd->mtx);
-}
-
-int
-nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
-{
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(pd->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
- uint64_t *egid, uint64_t *prid, uint64_t *znid)
-{
- int fd = nni_posix_pfd_fd(pd->pfd);
-#if defined(NNG_HAVE_GETPEEREID)
- uid_t uid;
- gid_t gid;
-
- if (getpeereid(fd, &uid, &gid) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uid;
- *egid = gid;
- *prid = (uint64_t) -1;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_GETPEERUCRED)
- ucred_t *ucp = NULL;
- if (getpeerucred(fd, &ucp) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = ucred_geteuid(ucp);
- *egid = ucred_getegid(ucp);
- *prid = ucred_getpid(ucp);
- *znid = ucred_getzoneid(ucp);
- ucred_free(ucp);
- return (0);
-#elif defined(NNG_HAVE_SOPEERCRED)
- struct ucred uc;
- socklen_t len = sizeof(uc);
- if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = uc.uid;
- *egid = uc.gid;
- *prid = uc.pid;
- *znid = (uint64_t) -1;
- return (0);
-#elif defined(NNG_HAVE_LOCALPEERCRED)
- struct xucred xu;
- socklen_t len = sizeof(xu);
- if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) {
- return (nni_plat_errno(errno));
- }
- *euid = xu.cr_uid;
- *egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented
- // LOCAL_PEERPID...
- *znid = (uint64_t) -1;
- return (0);
-#else
- if (fd < 0) {
- return (NNG_ECLOSED);
- }
- NNI_ARG_UNUSED(euid);
- NNI_ARG_UNUSED(egid);
- NNI_ARG_UNUSED(prid);
- NNI_ARG_UNUSED(znid);
- return (NNG_ENOTSUP);
-#endif
-}
-
-int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
-{
- nni_posix_pipedesc *pd;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- pd->closed = false;
- pd->pfd = pfd;
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
-
- *pdp = pd;
- return (0);
-}
-
-void
-nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
-{
- nni_posix_pipedesc_close(pd);
- nni_posix_pfd_fini(pd->pfd);
- pd->pfd = NULL;
- nni_mtx_fini(&pd->mtx);
-
- NNI_FREE_STRUCT(pd);
-}
-
-#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 63d858c2..f78d0b6b 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_USE_POSIX_RESOLV_GAI
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <ctype.h>
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 5cd1887a..c0352c55 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 7f627361..21f6ecfe 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index cdcbecd9..8c186885 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -151,6 +150,7 @@ static void
tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_listener *l = arg;
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if (events & POLLNVAL) {
@@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
nni_mtx_unlock(&l->mtx);
return;
}
- NNI_ASSERT(pfd == l->pfd);
// Anything else will turn up in accept.
tcp_listener_doaccept(l);
@@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa)
if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
+ (void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 759bdb96..873a02a1 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -11,7 +11,6 @@
#include "core/nng_impl.h"
#ifdef NNG_PLATFORM_POSIX
-#include "platform/posix/posix_aio.h"
#include "platform/posix/posix_pollq.h"
#include <errno.h>
@@ -177,7 +176,7 @@ static void
nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- NNI_ASSERT(pfd == udp->udp_pfd);
+ NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & POLLIN) {
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 93e45423..b8741b52 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);
-extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
+extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *);
extern void nni_win_io_fini(nni_win_io *);
-extern void nni_win_io_cancel(nni_win_io *);
extern int nni_win_io_register(HANDLE);
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 1179b603..50b22343 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h)
}
int
-nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
+nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
{
ZeroMemory(&io->olpd, sizeof(io->olpd));
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
- io->f = f;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
@@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr)
}
void
-nni_win_io_cancel(nni_win_io *io)
-{
- if (io->f != INVALID_HANDLE_VALUE) {
- CancelIoEx(io->f, &io->olpd);
- }
-}
-
-void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
deleted file mode 100644
index 169a2e00..00000000
--- a/src/platform/windows/win_ipc.c
+++ /dev/null
@@ -1,673 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#ifdef NNG_PLATFORM_WINDOWS
-
-#include <stdio.h>
-
-struct nni_plat_ipc_pipe {
- HANDLE p;
- int mode;
- nni_win_event rcv_ev;
- nni_win_event snd_ev;
-};
-
-struct nni_plat_ipc_ep {
- char path[NNG_MAXADDRLEN + 16];
- nni_sockaddr addr;
- int mode;
- bool started;
- HANDLE p; // accept side only
- nni_win_event acc_ev; // accept side only
- nni_aio * con_aio; // conn side only
- nni_list_node node; // conn side uses this
- SECURITY_ATTRIBUTES sec_attr;
-};
-
-static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_pipe_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_pipe_ops = {
- .wev_start = nni_win_ipc_pipe_start,
- .wev_finish = nni_win_ipc_pipe_finish,
- .wev_cancel = nni_win_ipc_pipe_cancel,
-};
-
-static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *);
-static void nni_win_ipc_acc_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_ipc_acc_ops = {
- .wev_start = nni_win_ipc_acc_start,
- .wev_finish = nni_win_ipc_acc_finish,
- .wev_cancel = nni_win_ipc_acc_cancel,
-};
-
-static int
-nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
-{
- void * buf;
- DWORD len;
- BOOL ok;
- int rv;
- nni_plat_ipc_pipe *pipe = evt->ptr;
- unsigned idx;
- unsigned naiov;
- nni_iov * aiov;
-
- NNI_ASSERT(aio != NULL);
-
- if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
- }
-
- nni_aio_get_iov(aio, &naiov, &aiov);
- idx = 0;
- while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
- idx++;
- }
- NNI_ASSERT(idx < naiov);
- // Now start a transfer. We assume that only one send can be
- // outstanding on a pipe at a time. This is important to avoid
- // scrambling the data anyway. Note that Windows named pipes do
- // not appear to support scatter/gather, so we have to process
- // each element in turn.
- buf = aiov[idx].iov_buf;
- len = (DWORD) aiov[idx].iov_len;
- NNI_ASSERT(buf != NULL);
- NNI_ASSERT(len != 0);
-
- // We limit ourselves to writing 16MB at a time. Named Pipes
- // on Windows have limits of between 31 and 64MB.
- if (len > 0x1000000) {
- len = 0x1000000;
- }
-
- evt->count = 0;
- if (evt == &pipe->snd_ev) {
- ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd);
- } else {
- ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd);
- }
- if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
-}
-
-static void
-nni_win_ipc_pipe_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_pipe *pipe = evt->ptr;
-
- CancelIoEx(pipe->p, &evt->olpd);
-}
-
-static void
-nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_aio_finish(aio, evt->status, evt->count);
-}
-
-static int
-nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode)
-{
- nni_plat_ipc_pipe *pipe;
- int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- pipe->mode = mode;
- rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
- rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe);
- if (rv != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- return (rv);
- }
-
- pipe->p = p;
- *pipep = pipe;
- return (0);
-}
-
-void
-nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->snd_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio)
-{
- nni_win_event_submit(&pipe->rcv_ev, aio);
-}
-
-void
-nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe)
-{
- HANDLE p;
-
- nni_win_event_close(&pipe->snd_ev);
- nni_win_event_close(&pipe->rcv_ev);
-
- if ((p = pipe->p) != INVALID_HANDLE_VALUE) {
- pipe->p = INVALID_HANDLE_VALUE;
- CloseHandle(p);
- }
-}
-
-void
-nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe)
-{
- nni_plat_ipc_pipe_close(pipe);
-
- nni_win_event_fini(&pipe->snd_ev);
- nni_win_event_fini(&pipe->rcv_ev);
- NNI_FREE_STRUCT(pipe);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id)
-{
- NNI_ARG_UNUSED(pipe);
- NNI_ARG_UNUSED(id);
- return (NNG_ENOTSUP);
-}
-
-// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible.
-// NB: Only POSIX systems support group IDs.
-int
-nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid)
-{
- ULONG id;
- switch (pipe->mode) {
- case NNI_EP_MODE_DIAL:
- if (!GetNamedPipeServerProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- case NNI_EP_MODE_LISTEN:
- if (!GetNamedPipeClientProcessId(pipe->p, &id)) {
- return (nni_win_error(GetLastError()));
- }
- *pid = id;
- break;
- default:
- // Should never occur!
- return (NNG_EINVAL);
- }
- return (0);
-}
-
-int
-nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode)
-{
- const char * path;
- nni_plat_ipc_ep *ep;
-
- path = sa->s_ipc.sa_path;
- if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) {
- return (NNG_EINVAL);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- ZeroMemory(ep, sizeof(*ep));
-
- ep->mode = mode;
- ep->sec_attr.nLength = sizeof(ep->sec_attr);
- ep->sec_attr.lpSecurityDescriptor = NULL;
- ep->sec_attr.bInheritHandle = FALSE;
- NNI_LIST_NODE_INIT(&ep->node);
-
- ep->addr = *sa;
- (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path);
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits)
-{
- NNI_ARG_UNUSED(ep);
- NNI_ARG_UNUSED(bits);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc)
-{
- if (ep->started) {
- return (NNG_EBUSY);
- }
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_ENOTSUP);
- }
- if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
- return (NNG_EINVAL);
- }
- ep->sec_attr.lpSecurityDescriptor = desc;
- return (0);
-}
-
-int
-nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep)
-{
- int rv;
- HANDLE p;
-
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- return (NNG_EINVAL);
- }
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
- // We create the first named pipe, and we make sure that it is
- // properly ours.
- p = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
- FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (p == INVALID_HANDLE_VALUE) {
- if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
- rv = NNG_EADDRINUSE;
- } else {
- rv = nni_win_error(rv);
- }
- goto failed;
- }
- rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep);
- if (rv != 0) {
- goto failed;
- }
-
- if ((rv = nni_win_iocp_register(p)) != 0) {
- goto failed;
- }
-
- ep->p = p;
- ep->started = true;
- return (0);
-
-failed:
-
- if (p != INVALID_HANDLE_VALUE) {
- (void) CloseHandle(p);
- }
-
- return (rv);
-}
-
-static void
-nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep * ep = evt->ptr;
- nni_plat_ipc_pipe *pipe;
- int rv;
- HANDLE newp, oldp;
-
- if ((rv = evt->status) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- newp = CreateNamedPipeA(ep->path,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
- PIPE_REJECT_REMOTE_CLIENTS,
- PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr);
- if (newp == INVALID_HANDLE_VALUE) {
- rv = nni_win_error(GetLastError());
- // We connected, but as we cannot get a new pipe,
- // we have to disconnect the old one.
- DisconnectNamedPipe(ep->p);
- nni_aio_finish_error(aio, rv);
- return;
- }
- if ((rv = nni_win_iocp_register(newp)) != 0) {
- // Disconnect the old pipe.
- DisconnectNamedPipe(ep->p);
- // And discard the half-baked new one.
- DisconnectNamedPipe(newp);
- (void) CloseHandle(newp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- oldp = ep->p;
- ep->p = newp;
-
- if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) !=
- 0) {
- // The new pipe is already fine for us. Discard
- // the old one, since failed to be able to use it.
- DisconnectNamedPipe(oldp);
- (void) CloseHandle(oldp);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
-}
-
-static void
-nni_win_ipc_acc_cancel(nni_win_event *evt)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
-
- (void) CancelIoEx(ep->p, &evt->olpd);
- // Just to be sure.
- (void) DisconnectNamedPipe(ep->p);
-}
-
-static int
-nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
-{
- nni_plat_ipc_ep *ep = evt->ptr;
- NNI_ARG_UNUSED(aio);
-
- if (!ConnectNamedPipe(ep->p, &evt->olpd)) {
- int rv = GetLastError();
- switch (rv) {
- case ERROR_PIPE_CONNECTED:
- // Kind of like success, but as this is technically
- // an "error", we have to complete it ourself.
- evt->status = 0;
- evt->count = 0;
- return (1);
-
- case ERROR_IO_PENDING:
- // Normal asynchronous operation. Wait for
- // completion.
- return (0);
-
- default:
- // Fast-fail (synchronous).
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
- }
-
- // Synchronous success - the I/O completion packet should still
- // be delivered.
- return (0);
-}
-
-void
-nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_event_submit(&ep->acc_ev, aio);
-}
-
-// So Windows IPC is a bit different on the client side. There is no
-// support for asynchronous connection, but we can fake it with a
-// single thread that runs to establish the connection. That thread
-// will run keep looping, sleeping for 10 ms between attempts. It
-// performs non-blocking attempts to connect.
-typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work;
-struct nni_win_ipc_conn_work {
- nni_list waiters;
- nni_list workers;
- nni_mtx mtx;
- nni_cv cv;
- nni_thr thr;
- int exit;
-};
-
-static nni_win_ipc_conn_work nni_win_ipc_connecter;
-
-static void
-nni_win_ipc_conn_thr(void *arg)
-{
- nni_win_ipc_conn_work *w = arg;
- nni_plat_ipc_ep * ep;
- nni_plat_ipc_pipe * pipe;
- nni_aio * aio;
- HANDLE p;
- int rv;
-
- nni_mtx_lock(&w->mtx);
- for (;;) {
- if (w->exit) {
- break;
- }
- while ((ep = nni_list_first(&w->waiters)) != NULL) {
- nni_list_remove(&w->waiters, ep);
- nni_list_append(&w->workers, ep);
- }
-
- while ((ep = nni_list_first(&w->workers)) != NULL) {
-
- nni_list_remove(&w->workers, ep);
-
- if ((aio = ep->con_aio) == NULL) {
- continue;
- }
- ep->con_aio = NULL;
-
- pipe = NULL;
-
- p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (p == INVALID_HANDLE_VALUE) {
- switch ((rv = GetLastError())) {
- case ERROR_PIPE_BUSY:
- // Still in progress. This
- // shouldn't happen unless the
- // other side aborts the
- // connection.
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- continue;
-
- case ERROR_FILE_NOT_FOUND:
- rv = NNG_ECONNREFUSED;
- break;
- default:
- rv = nni_win_error(rv);
- break;
- }
- goto fail;
- }
- if (((rv = nni_win_ipc_pipe_init(
- &pipe, p, NNI_EP_MODE_DIAL)) != 0) ||
- ((rv = nni_win_iocp_register(p)) != 0)) {
- goto fail;
- }
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- continue;
-
- fail:
- if (p != INVALID_HANDLE_VALUE) {
- DisconnectNamedPipe(p);
- CloseHandle(p);
- }
- if (pipe != NULL) {
- nni_plat_ipc_pipe_fini(pipe);
- }
- nni_aio_finish_error(aio, rv);
- }
-
- if (nni_list_empty(&w->waiters)) {
- // Wait until an endpoint is added.
- nni_cv_wait(&w->cv);
- } else {
- // Wait 10 ms, unless woken earlier.
- nni_cv_until(&w->cv, nni_clock() + 10);
- }
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-static void
-nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&w->mtx);
- if (aio == ep->con_aio) {
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- }
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&w->mtx);
- if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
- nni_mtx_unlock(&w->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- NNI_ASSERT(!nni_list_active(&w->waiters, ep));
-
- ep->con_aio = aio;
- nni_list_append(&w->waiters, ep);
- nni_cv_wake(&w->cv);
- nni_mtx_unlock(&w->mtx);
-}
-
-void
-nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep)
-{
- nni_plat_ipc_ep_close(ep);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- nni_win_event_close(&ep->acc_ev);
- nni_win_event_fini(&ep->acc_ev);
- NNI_FREE_STRUCT(ep);
-}
-
-void
-nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
-{
- nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
- nni_aio * aio;
-
- switch (ep->mode) {
- case NNI_EP_MODE_DIAL:
- nni_mtx_lock(&w->mtx);
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- }
- if ((aio = ep->con_aio) != NULL) {
- ep->con_aio = NULL;
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- nni_mtx_unlock(&w->mtx);
- break;
-
- case NNI_EP_MODE_LISTEN:
- nni_win_event_close(&ep->acc_ev);
- if (ep->p != INVALID_HANDLE_VALUE) {
- CloseHandle(ep->p);
- ep->p = INVALID_HANDLE_VALUE;
- }
- break;
- }
-}
-
-int
-nni_win_ipc_sysinit(void)
-{
- int rv;
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
- NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
-
- nni_mtx_init(&worker->mtx);
- nni_cv_init(&worker->cv, &worker->mtx);
-
- rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
- if (rv != 0) {
- return (rv);
- }
-
- nni_thr_run(&worker->thr);
-
- return (0);
-}
-
-void
-nni_win_ipc_sysfini(void)
-{
- nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter;
-
- nni_mtx_lock(&worker->mtx);
- worker->exit = 1;
- nni_cv_wake(&worker->cv);
- nni_mtx_unlock(&worker->mtx);
- nni_thr_fini(&worker->thr);
- nni_cv_fini(&worker->cv);
- nni_mtx_fini(&worker->mtx);
-}
-
-#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h
new file mode 100644
index 00000000..51ce5548
--- /dev/null
+++ b/src/platform/windows/win_ipc.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_WIN_WINIPC_H
+#define PLATFORM_WIN_WINIPC_H
+
+// This header file is private to the IPC (named pipes) support for Windows.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+struct nni_ipc_conn {
+ HANDLE f;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ nni_ipc_dialer * dialer;
+ nni_ipc_listener *listener;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_reap_item reap;
+};
+
+struct nni_ipc_dialer {
+ bool closed; // dialers are locked by the worker lock
+ nni_list aios;
+ nni_list_node node; // node on worker list
+};
+
+struct nni_ipc_listener {
+ char * path;
+ bool started;
+ bool closed;
+ HANDLE f;
+ SECURITY_ATTRIBUTES sec_attr;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_win_io io;
+ int rv;
+};
+
+extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE);
+
+#endif // NNG_PLATFORM_WINDOWS
+
+#endif // NNG_PLATFORM_WIN_WINIPC_H
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
new file mode 100644
index 00000000..d8ef4e4e
--- /dev/null
+++ b/src/platform/windows/win_ipcconn.c
@@ -0,0 +1,388 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_recv_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
+ nni_list_remove(&c->recv_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->recv_rv != 0) {
+ rv = c->recv_rv;
+ c->recv_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_recv_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+static void
+ipc_recv_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ c->recv_rv = rv;
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->recv_aios, aio);
+ if (aio == nni_list_first(&c->recv_aios)) {
+ ipc_recv_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_send_start(nni_ipc_conn *c)
+{
+ nni_aio *aio;
+ unsigned idx;
+ unsigned naiov;
+ nni_iov *aiov;
+ void * buf;
+ DWORD len;
+ int rv;
+
+ if (c->closed) {
+ while ((aio = nni_list_first(&c->send_aios)) != NULL) {
+ nni_list_remove(&c->send_aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&c->cv);
+ }
+again:
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_get_iov(aio, &naiov, &aiov);
+
+ idx = 0;
+ while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < naiov);
+ // Now start a transfer. We assume that only one send can be
+ // outstanding on a pipe at a time. This is important to avoid
+ // scrambling the data anyway. Note that Windows named pipes do
+ // not appear to support scatter/gather, so we have to process
+ // each element in turn.
+ buf = aiov[idx].iov_buf;
+ len = (DWORD) aiov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
+
+ // We limit ourselves to writing 16MB at a time. Named Pipes
+ // on Windows have limits of between 31 and 64MB.
+ if (len > 0x1000000) {
+ len = 0x1000000;
+ }
+
+ if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
+ ((rv = GetLastError()) != ERROR_IO_PENDING)) {
+ // Synchronous failure.
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
+ }
+}
+
+static void
+ipc_send_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_aio * aio;
+ nni_ipc_conn *c = io->ptr;
+ nni_mtx_lock(&c->mtx);
+ if ((aio = nni_list_first(&c->send_aios)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ if (c->send_rv != 0) {
+ rv = c->send_rv;
+ c->send_rv = 0;
+ }
+ nni_aio_list_remove(aio);
+ ipc_send_start(c);
+ if (c->closed) {
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ if ((rv == 0) && (num == 0)) {
+ // A zero byte receive is a remote close from the peer.
+ rv = NNG_ECLOSED;
+ }
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+ipc_send_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_conn *c = nni_aio_get_prov_data(aio);
+ nni_mtx_lock(&c->mtx);
+ if (aio == nni_list_first(&c->send_aios)) {
+ c->send_rv = rv;
+ CancelIoEx(c->f, &c->send_io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ if (c->closed) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&c->send_aios, aio);
+ if (aio == nni_list_first(&c->send_aios)) {
+ ipc_send_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+int
+nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p)
+{
+ nni_ipc_conn *c;
+ int rv;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->f = INVALID_HANDLE_VALUE;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+
+ if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
+ nni_ipc_conn_fini(c);
+ return (rv);
+ }
+
+ c->f = p;
+ *connp = c;
+ return (0);
+}
+
+void
+nni_ipc_conn_close(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ if (!c->closed) {
+ c->closed = true;
+ if (!nni_list_empty(&c->recv_aios)) {
+ CancelIoEx(c->f, &c->recv_io.olpd);
+ }
+ if (!nni_list_empty(&c->send_aios)) {
+ CancelIoEx(c->f, &c->send_io.olpd);
+ }
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ // NB: closing the pipe is dangerous at this point.
+ DisconnectNamedPipe(c->f);
+ }
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+static void
+ipc_conn_reap(nni_ipc_conn *c)
+{
+ nni_mtx_lock(&c->mtx);
+ while ((!nni_list_empty(&c->recv_aios)) ||
+ (!nni_list_empty(&c->send_aios))) {
+ nni_cv_wait(&c->cv);
+ }
+ nni_mtx_unlock(&c->mtx);
+
+ nni_win_io_fini(&c->recv_io);
+ nni_win_io_fini(&c->send_io);
+ nni_win_io_fini(&c->conn_io);
+
+ if (c->f != INVALID_HANDLE_VALUE) {
+ CloseHandle(c->f);
+ }
+ nni_cv_fini(&c->cv);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+void
+nni_ipc_conn_fini(nni_ipc_conn *c)
+{
+ nni_ipc_conn_close(c);
+
+ nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c);
+}
+
+int
+nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id)
+{
+ NNI_ARG_UNUSED(c);
+ NNI_ARG_UNUSED(id);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid)
+{
+ ULONG id;
+ if (c->dialer) {
+ if (!GetNamedPipeServerProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ } else {
+ if (!GetNamedPipeClientProcessId(c->f, &id)) {
+ return (nni_win_error(GetLastError()));
+ }
+ }
+ *pid = id;
+ return (0);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
new file mode 100644
index 00000000..429bcedf
--- /dev/null
+++ b/src/platform/windows/win_ipcdial.c
@@ -0,0 +1,265 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+int
+nni_ipc_dialer_init(nni_ipc_dialer **dp)
+{
+ nni_ipc_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ d->closed = false;
+ nni_aio_list_init(&d->aios);
+ *dp = d;
+ return (0);
+}
+
+// Windows IPC is a bit different on the client side. There is no
+// support for asynchronous connection, but we can fake it with a
+// single thread that runs to establish the connection. That thread
+// will run keep looping, sleeping for 10 ms between attempts. It
+// performs non-blocking attempts to connect.
+typedef struct ipc_dial_work {
+ nni_list waiters;
+ nni_list workers;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_thr thr;
+ int exit;
+} ipc_dial_work;
+
+static ipc_dial_work ipc_connecter;
+
+static void
+ipc_dial_thr(void *arg)
+{
+ ipc_dial_work *w = arg;
+
+ nni_mtx_lock(&w->mtx);
+ for (;;) {
+ nni_ipc_dialer *d;
+
+ if (w->exit) {
+ break;
+ }
+ while ((d = nni_list_first(&w->waiters)) != NULL) {
+ nni_list_remove(&w->waiters, d);
+ nni_list_append(&w->workers, d);
+ }
+
+ while ((d = nni_list_first(&w->workers)) != NULL) {
+ nni_ipc_conn *c;
+ nni_aio * aio;
+ HANDLE f;
+ int rv;
+ char * path;
+
+ if ((aio = nni_list_first(&d->aios)) == NULL) {
+ nni_list_remove(&w->workers, d);
+ continue;
+ }
+
+ path = nni_aio_get_prov_extra(aio, 0);
+
+ f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0,
+ NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+
+ if (f == INVALID_HANDLE_VALUE) {
+ switch ((rv = GetLastError())) {
+ case ERROR_PIPE_BUSY:
+ // Still in progress. This
+ // shouldn't happen unless the
+ // other side aborts the
+ // connection.
+ // back at the head of the list
+ nni_list_remove(&w->workers, d);
+ nni_list_prepend(&w->waiters, d);
+ continue;
+
+ case ERROR_FILE_NOT_FOUND:
+ rv = NNG_ECONNREFUSED;
+ break;
+ default:
+ rv = nni_win_error(rv);
+ break;
+ }
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ nni_list_remove(&d->aios, aio);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) {
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ c->dialer = d;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+ }
+
+ if (nni_list_empty(&w->waiters)) {
+ // Wait until an endpoint is added.
+ nni_cv_wait(&w->cv);
+ } else {
+ // Wait 10 ms, unless woken earlier.
+ nni_cv_until(&w->cv, nni_clock() + 10);
+ }
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+static void
+ipc_dial_cancel(nni_aio *aio, int rv)
+{
+ ipc_dial_work * w = &ipc_connecter;
+ nni_ipc_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&w->mtx);
+ if (nni_aio_list_active(aio)) {
+ char *path;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_list_remove(aio);
+ path = nni_aio_get_prov_extra(aio, 0);
+ nni_aio_set_prov_extra(aio, 0, NULL);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ char * path;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if (sa->s_family != NNG_AF_IPC) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
+ }
+ if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) !=
+ 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (d->closed) {
+ nni_mtx_unlock(&w->mtx);
+ nni_strfree(path);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ nni_aio_set_prov_extra(aio, 0, path);
+ nni_list_append(&d->aios, aio);
+ if (nni_list_first(&d->aios) == aio) {
+ nni_list_append(&w->waiters, d);
+ nni_cv_wake(&w->cv);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+void
+nni_ipc_dialer_fini(nni_ipc_dialer *d)
+{
+ nni_ipc_dialer_close(d);
+ NNI_FREE_STRUCT(d);
+}
+
+void
+nni_ipc_dialer_close(nni_ipc_dialer *d)
+{
+ ipc_dial_work *w = &ipc_connecter;
+ nni_aio * aio;
+
+ nni_mtx_lock(&w->mtx);
+ d->closed = true;
+ if (nni_list_active(&w->waiters, d)) {
+ nni_list_remove(&w->waiters, d);
+ }
+ while ((aio = nni_list_first(&d->aios)) != NULL) {
+ nni_list_remove(&d->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&w->mtx);
+}
+
+int
+nni_win_ipc_sysinit(void)
+{
+ int rv;
+ ipc_dial_work *worker = &ipc_connecter;
+
+ NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node);
+ NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node);
+
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
+ rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_thr_run(&worker->thr);
+
+ return (0);
+}
+
+void
+nni_win_ipc_sysfini(void)
+{
+ ipc_dial_work *worker = &ipc_connecter;
+
+ nni_reap_drain(); // so that listeners get cleaned up.
+
+ nni_mtx_lock(&worker->mtx);
+ worker->exit = 1;
+ nni_cv_wake(&worker->cv);
+ nni_mtx_unlock(&worker->mtx);
+ nni_thr_fini(&worker->thr);
+ nni_cv_fini(&worker->cv);
+ nni_mtx_fini(&worker->mtx);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
new file mode 100644
index 00000000..20bb8548
--- /dev/null
+++ b/src/platform/windows/win_ipclisten.c
@@ -0,0 +1,296 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_WINDOWS
+
+#include "win_ipc.h"
+
+#include <stdio.h>
+
+static void
+ipc_accept_done(nni_ipc_listener *l, int rv)
+{
+ nni_aio * aio;
+ HANDLE f;
+ nni_ipc_conn *c;
+
+ aio = nni_list_first(&l->aios);
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+
+ if (l->closed) {
+ // Closed, so bail.
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ // Create a replacement pipe.
+ f = CreateNamedPipeA(l->path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ // We couldn't create a replacement pipe, so we have to
+ // abort the client from our side, so that we can keep
+ // our server pipe available.
+ rv = nni_win_error(GetLastError());
+ DisconnectNamedPipe(l->f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (((rv = nni_win_io_register(f)) != 0) ||
+ ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) {
+ DisconnectNamedPipe(l->f);
+ DisconnectNamedPipe(f);
+ CloseHandle(f);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ l->f = f;
+ c->listener = l;
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+ipc_accept_start(nni_ipc_listener *l)
+{
+ nni_aio *aio;
+
+ if (l->closed) {
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&l->cv);
+ }
+
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ int rv;
+
+ if ((ConnectNamedPipe(l->f, &l->io.olpd)) ||
+ ((rv = GetLastError()) == ERROR_IO_PENDING)) {
+ // Success, or pending, handled via completion pkt.
+ return;
+ }
+ if (rv == ERROR_PIPE_CONNECTED) {
+ // Kind of like success, but as this is technically
+ // an "error", we have to complete it ourself.
+ // Fake a completion.
+ ipc_accept_done(l, 0);
+ } else {
+ // Fast-fail (synchronous).
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ }
+ }
+}
+
+static void
+ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
+{
+ nni_ipc_listener *l = io->ptr;
+
+ NNI_ARG_UNUSED(cnt);
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_list_empty(&l->aios)) {
+ // We canceled this somehow. We no longer care.
+ DisconnectNamedPipe(l->f);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->rv != 0) {
+ rv = l->rv;
+ l->rv = 0;
+ }
+ ipc_accept_done(l, rv);
+ ipc_accept_start(l);
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ipc_listener_init(nni_ipc_listener **lp)
+{
+ nni_ipc_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->started = false;
+ l->closed = false;
+ l->sec_attr.nLength = sizeof(l->sec_attr);
+ l->sec_attr.lpSecurityDescriptor = NULL;
+ l->sec_attr.bInheritHandle = FALSE;
+ nni_aio_list_init(&l->aios);
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ *lp = l;
+ return (0);
+}
+
+int
+nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits)
+{
+ NNI_ARG_UNUSED(l);
+ NNI_ARG_UNUSED(bits);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc)
+{
+ if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+int
+nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+{
+ int rv;
+ HANDLE f;
+ char * path;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ f = CreateNamedPipeA(path,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS,
+ PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr);
+ if (f == INVALID_HANDLE_VALUE) {
+ if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) {
+ rv = NNG_EADDRINUSE;
+ } else {
+ rv = nni_win_error(rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+ if ((rv = nni_win_io_register(f)) != 0) {
+ CloseHandle(f);
+ nni_mtx_unlock(&l->mtx);
+ nni_strfree(path);
+ return (rv);
+ }
+
+ l->f = f;
+ l->path = path;
+ l->started = true;
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+static void
+ipc_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ipc_listener *l = nni_aio_get_prov_data(aio);
+
+ nni_mtx_unlock(&l->mtx);
+ if (aio == nni_list_first(&l->aios)) {
+ l->rv = rv;
+ CancelIoEx(l->f, &l->io.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_list_remove(&l->aios, aio);
+ nni_cv_wake(&l->cv);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&l->aios, aio);
+ if (nni_list_first(&l->aios) == aio) {
+ ipc_accept_start(l);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_close(nni_ipc_listener *l)
+{
+
+ nni_mtx_lock(&l->mtx);
+ if (!l->closed) {
+ l->closed = true;
+ if (!nni_list_empty(&l->aios)) {
+ CancelIoEx(l->f, &l->io.olpd);
+ }
+ DisconnectNamedPipe(l->f);
+ CloseHandle(l->f);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ipc_listener_fini(nni_ipc_listener *l)
+{
+ nni_mtx_lock(&l->mtx);
+ while (!nni_list_empty(&l->aios)) {
+ nni_cv_wait(&l->cv);
+ }
+ nni_mtx_unlock(&l->mtx);
+ nni_win_io_fini(&l->io);
+ nni_strfree(l->path);
+ nni_cv_fini(&l->cv);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
+
+#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index c3a4a5d8..08b759ca 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->recv_aios)) {
c->recv_rv = rv;
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv)
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->send_aios)) {
c->send_rv = rv;
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
} else if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
- if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
- 0) ||
- ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
- 0) ||
+ if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
((rv = nni_win_io_register((HANDLE) s)) != 0)) {
nni_tcp_conn_fini(c);
return (rv);
@@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c)
if (!c->closed) {
c->closed = true;
if (!nni_list_empty(&c->recv_aios)) {
- nni_win_io_cancel(&c->recv_io);
+ CancelIoEx((HANDLE) c->s, &c->recv_io.olpd);
}
if (!nni_list_empty(&c->send_aios)) {
- nni_win_io_cancel(&c->send_io);
+ CancelIoEx((HANDLE) c->s, &c->send_io.olpd);
}
if (c->s != INVALID_SOCKET) {
shutdown(c->s, SD_BOTH);
@@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c)
while ((!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
- nni_mtx_unlock(&c->mtx);
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 4a3e9f2f..5283ea81 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
}
@@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&d->mtx);
}
@@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
- 0) {
+ if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
nni_tcp_conn_fini(c);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 5055c32d..1f197246 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->conn_rv = NNG_ECLOSED;
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
}
closesocket(l->s);
@@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv)
if (c->conn_rv == 0) {
c->conn_rv = rv;
}
- nni_win_io_cancel(&c->conn_io);
+ CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&l->mtx);
}
@@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
+ if (!l->started) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (l->closed) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_extra(aio, 0, c);
- if (((rv = nni_win_io_init(
- &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) ||
+ if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&l->mtx);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7d99e507..ee72d6d8 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -20,16 +20,17 @@
// Windows named pipes. Other platforms could use other mechanisms,
// but all implementations on the platform must use the same mechanism.
-typedef struct ipc_pipe ipc_pipe;
-typedef struct ipc_ep ipc_ep;
+typedef struct ipctran_pipe ipctran_pipe;
+typedef struct ipctran_dialer ipctran_dialer;
+typedef struct ipctran_listener ipctran_listener;
// ipc_pipe is one end of an IPC connection.
-struct ipc_pipe {
- nni_plat_ipc_pipe *ipp;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- nni_sockaddr sa;
+struct ipctran_pipe {
+ nni_ipc_conn *conn;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
+ nni_sockaddr sa;
uint8_t txhead[1 + sizeof(uint64_t)];
uint8_t rxhead[1 + sizeof(uint64_t)];
@@ -48,189 +49,201 @@ struct ipc_pipe {
nni_mtx mtx;
};
-struct ipc_ep {
- nni_sockaddr sa;
- nni_plat_ipc_ep *iep;
- uint16_t proto;
- size_t rcvmax;
- nni_aio * aio;
- nni_aio * user_aio;
- nni_mtx mtx;
+struct ipctran_dialer {
+ nni_sockaddr sa;
+ nni_ipc_dialer *dialer;
+ uint16_t proto;
+ size_t rcvmax;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
-static void ipc_pipe_dosend(ipc_pipe *, nni_aio *);
-static void ipc_pipe_dorecv(ipc_pipe *);
-static void ipc_pipe_send_cb(void *);
-static void ipc_pipe_recv_cb(void *);
-static void ipc_pipe_nego_cb(void *);
-static void ipc_ep_cb(void *);
+struct ipctran_listener {
+ nni_sockaddr sa;
+ nni_ipc_listener *listener;
+ uint16_t proto;
+ size_t rcvmax;
+ nni_aio * aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
+};
+
+static void ipctran_pipe_send_start(ipctran_pipe *);
+static void ipctran_pipe_recv_start(ipctran_pipe *);
+static void ipctran_pipe_send_cb(void *);
+static void ipctran_pipe_recv_cb(void *);
+static void ipctran_pipe_nego_cb(void *);
+static void ipctran_dialer_cb(void *);
+static void ipctran_listener_cb(void *);
static int
-ipc_tran_init(void)
+ipctran_init(void)
{
return (0);
}
static void
-ipc_tran_fini(void)
+ipctran_fini(void)
{
}
static void
-ipc_pipe_close(void *arg)
+ipctran_pipe_close(void *arg)
{
- ipc_pipe *pipe = arg;
+ ipctran_pipe *p = arg;
- nni_aio_close(pipe->rxaio);
- nni_aio_close(pipe->txaio);
- nni_aio_close(pipe->negaio);
+ nni_aio_close(p->rxaio);
+ nni_aio_close(p->txaio);
+ nni_aio_close(p->negaio);
- nni_plat_ipc_pipe_close(pipe->ipp);
+ nni_ipc_conn_close(p->conn);
}
static void
-ipc_pipe_stop(void *arg)
+ipctran_pipe_stop(void *arg)
{
- ipc_pipe *pipe = arg;
+ ipctran_pipe *p = arg;
- nni_aio_stop(pipe->rxaio);
- nni_aio_stop(pipe->txaio);
- nni_aio_stop(pipe->negaio);
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+ nni_aio_stop(p->negaio);
}
static void
-ipc_pipe_fini(void *arg)
+ipctran_pipe_fini(void *arg)
{
- ipc_pipe *pipe = arg;
+ ipctran_pipe *p = arg;
- nni_aio_fini(pipe->rxaio);
- nni_aio_fini(pipe->txaio);
- nni_aio_fini(pipe->negaio);
- if (pipe->ipp != NULL) {
- nni_plat_ipc_pipe_fini(pipe->ipp);
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+ nni_aio_fini(p->negaio);
+ if (p->conn != NULL) {
+ nni_ipc_conn_fini(p->conn);
}
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
+ if (p->rxmsg) {
+ nni_msg_free(p->rxmsg);
}
- nni_mtx_fini(&pipe->mtx);
- NNI_FREE_STRUCT(pipe);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-ipc_pipe_init(ipc_pipe **pipep, ipc_ep *ep, void *ipp)
+ipctran_pipe_init(ipctran_pipe **pipep, void *conn)
{
- ipc_pipe *p;
- int rv;
+ ipctran_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->txaio, ipc_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, ipc_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, ipc_pipe_nego_cb, p)) != 0)) {
- ipc_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, ipctran_pipe_nego_cb, p)) != 0)) {
+ ipctran_pipe_fini(p);
return (rv);
}
nni_aio_list_init(&p->sendq);
nni_aio_list_init(&p->recvq);
+ p->conn = conn;
+#if 0
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
- p->ipp = ipp;
p->sa.s_ipc.sa_family = NNG_AF_IPC;
p->sa = ep->sa;
-
+#endif
*pipep = p;
return (0);
}
static void
-ipc_cancel_start(nni_aio *aio, int rv)
+ipctran_pipe_nego_cancel(nni_aio *aio, int rv)
{
- ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ ipctran_pipe *p = nni_aio_get_prov_data(aio);
- nni_mtx_lock(&pipe->mtx);
- if (pipe->user_negaio != aio) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if (p->user_negaio != aio) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_negaio = NULL;
- nni_mtx_unlock(&pipe->mtx);
+ p->user_negaio = NULL;
+ nni_mtx_unlock(&p->mtx);
- nni_aio_abort(pipe->negaio, rv);
+ nni_aio_abort(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
-ipc_pipe_nego_cb(void *arg)
+ipctran_pipe_nego_cb(void *arg)
{
- ipc_pipe *pipe = arg;
- nni_aio * aio = pipe->negaio;
- int rv;
+ ipctran_pipe *p = arg;
+ nni_aio * aio = p->negaio;
+ int rv;
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
goto done;
}
// We start transmitting before we receive.
- if (pipe->gottxhead < pipe->wanttxhead) {
- pipe->gottxhead += nni_aio_count(aio);
- } else if (pipe->gotrxhead < pipe->wantrxhead) {
- pipe->gotrxhead += nni_aio_count(aio);
+ if (p->gottxhead < p->wanttxhead) {
+ p->gottxhead += nni_aio_count(aio);
+ } else if (p->gotrxhead < p->wantrxhead) {
+ p->gotrxhead += nni_aio_count(aio);
}
- if (pipe->gottxhead < pipe->wanttxhead) {
+ if (p->gottxhead < p->wanttxhead) {
nni_iov iov;
- iov.iov_len = pipe->wanttxhead - pipe->gottxhead;
- iov.iov_buf = &pipe->txhead[pipe->gottxhead];
+ iov.iov_len = p->wanttxhead - p->gottxhead;
+ iov.iov_buf = &p->txhead[p->gottxhead];
nni_aio_set_iov(aio, 1, &iov);
// send it down...
- nni_plat_ipc_pipe_send(pipe->ipp, aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_send(p->conn, aio);
+ nni_mtx_unlock(&p->mtx);
return;
}
- if (pipe->gotrxhead < pipe->wantrxhead) {
+ if (p->gotrxhead < p->wantrxhead) {
nni_iov iov;
- iov.iov_len = pipe->wantrxhead - pipe->gotrxhead;
- iov.iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ iov.iov_len = p->wantrxhead - p->gotrxhead;
+ iov.iov_buf = &p->rxhead[p->gotrxhead];
nni_aio_set_iov(aio, 1, &iov);
- nni_plat_ipc_pipe_recv(pipe->ipp, aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_recv(p->conn, aio);
+ nni_mtx_unlock(&p->mtx);
return;
}
// We have both sent and received the headers. Lets check the
// receive side header.
- if ((pipe->rxhead[0] != 0) || (pipe->rxhead[1] != 'S') ||
- (pipe->rxhead[2] != 'P') || (pipe->rxhead[3] != 0) ||
- (pipe->rxhead[6] != 0) || (pipe->rxhead[7] != 0)) {
+ if ((p->rxhead[0] != 0) || (p->rxhead[1] != 'S') ||
+ (p->rxhead[2] != 'P') || (p->rxhead[3] != 0) ||
+ (p->rxhead[6] != 0) || (p->rxhead[7] != 0)) {
rv = NNG_EPROTO;
goto done;
}
- NNI_GET16(&pipe->rxhead[4], pipe->peer);
+ NNI_GET16(&p->rxhead[4], p->peer);
done:
- if ((aio = pipe->user_negaio) != NULL) {
- pipe->user_negaio = NULL;
+ if ((aio = p->user_negaio) != NULL) {
+ p->user_negaio = NULL;
nni_aio_finish(aio, rv, 0);
}
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
-ipc_pipe_send_cb(void *arg)
+ipctran_pipe_send_cb(void *arg)
{
- ipc_pipe *pipe = arg;
- nni_aio * aio;
- nni_aio * txaio = pipe->txaio;
- nni_msg * msg;
- int rv;
- size_t n;
+ ipctran_pipe *p = arg;
+ int rv;
+ nni_aio * aio;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * txaio = p->txaio;
- nni_mtx_lock(&pipe->mtx);
- aio = nni_list_first(&pipe->sendq);
+ nni_mtx_lock(&p->mtx);
+ aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
// Intentionally we do not queue up another transfer.
@@ -239,7 +252,7 @@ ipc_pipe_send_cb(void *arg)
// The protocol should see this error, and close the
// pipe itself, we hope.
nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
@@ -247,17 +260,15 @@ ipc_pipe_send_cb(void *arg)
n = nni_aio_count(txaio);
nni_aio_iov_advance(txaio, n);
if (nni_aio_iov_count(txaio) != 0) {
- nni_plat_ipc_pipe_send(pipe->ipp, txaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_send(p->conn, txaio);
+ nni_mtx_unlock(&p->mtx);
return;
}
nni_aio_list_remove(aio);
- if (!nni_list_empty(&pipe->sendq)) {
- // schedule next send
- ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq));
- }
- nni_mtx_unlock(&pipe->mtx);
+ ipctran_pipe_send_start(p);
+
+ nni_mtx_unlock(&p->mtx);
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
@@ -267,17 +278,17 @@ ipc_pipe_send_cb(void *arg)
}
static void
-ipc_pipe_recv_cb(void *arg)
+ipctran_pipe_recv_cb(void *arg)
{
- ipc_pipe *pipe = arg;
- nni_aio * aio;
- int rv;
- size_t n;
- nni_msg * msg;
- nni_aio * rxaio = pipe->rxaio;
+ ipctran_pipe *p = arg;
+ nni_aio * aio;
+ int rv;
+ size_t n;
+ nni_msg * msg;
+ nni_aio * rxaio = p->rxaio;
- nni_mtx_lock(&pipe->mtx);
- aio = nni_list_first(&pipe->recvq);
+ nni_mtx_lock(&p->mtx);
+ aio = nni_list_first(&p->recvq);
if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
@@ -290,29 +301,29 @@ ipc_pipe_recv_cb(void *arg)
nni_aio_iov_advance(rxaio, n);
if (nni_aio_iov_count(rxaio) != 0) {
// Was this a partial read? If so then resubmit for the rest.
- nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_recv(p->conn, rxaio);
+ nni_mtx_unlock(&p->mtx);
return;
}
// If we don't have a message yet, we were reading the message
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
- if (pipe->rxmsg == NULL) {
+ if (p->rxmsg == NULL) {
uint64_t len;
// Check to make sure we got msg type 1.
- if (pipe->rxhead[0] != 1) {
+ if (p->rxhead[0] != 1) {
rv = NNG_EPROTO;
goto recv_error;
}
// We should have gotten a message header.
- NNI_GET64(pipe->rxhead + 1, len);
+ NNI_GET64(p->rxhead + 1, len);
// Make sure the message payload is not too big. If it is
// the caller will shut down the pipe.
- if ((len > pipe->rcvmax) && (pipe->rcvmax > 0)) {
+ if ((len > p->rcvmax) && (p->rcvmax > 0)) {
rv = NNG_EMSGSIZE;
goto recv_error;
}
@@ -322,7 +333,7 @@ ipc_pipe_recv_cb(void *arg)
// lock for the read side in the future, so that we allow
// transmits to proceed normally. In practice this is
// unlikely to be much of an issue though.
- if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
@@ -330,11 +341,12 @@ ipc_pipe_recv_cb(void *arg)
nni_iov iov;
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- iov.iov_buf = nni_msg_body(pipe->rxmsg);
+ iov.iov_buf = nni_msg_body(p->rxmsg);
iov.iov_len = (size_t) len;
+
nni_aio_set_iov(rxaio, 1, &iov);
- nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_recv(p->conn, rxaio);
+ nni_mtx_unlock(&p->mtx);
return;
}
}
@@ -343,12 +355,12 @@ ipc_pipe_recv_cb(void *arg)
// good news.
nni_aio_list_remove(aio);
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
- if (!nni_list_empty(&pipe->recvq)) {
- ipc_pipe_dorecv(pipe);
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
+ if (!nni_list_empty(&p->recvq)) {
+ ipctran_pipe_recv_start(p);
}
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, msg);
nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
@@ -356,59 +368,65 @@ ipc_pipe_recv_cb(void *arg)
recv_error:
nni_aio_list_remove(aio);
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
// Intentionally, we do not queue up another receive.
// The protocol should notice this error and close the pipe.
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
nni_msg_free(msg);
nni_aio_finish_error(aio, rv);
}
static void
-ipc_cancel_tx(nni_aio *aio, int rv)
+ipctran_pipe_send_cancel(nni_aio *aio, int rv)
{
- ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ ipctran_pipe *p = nni_aio_get_prov_data(aio);
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
// If this is being sent, then cancel the pending transfer.
// The callback on the txaio will cause the user aio to
// be canceled too.
- if (nni_list_first(&pipe->sendq) == aio) {
- nni_aio_abort(pipe->txaio, rv);
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_list_first(&p->sendq) == aio) {
+ nni_aio_abort(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
return;
}
nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
+
nni_aio_finish_error(aio, rv);
}
static void
-ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio)
+ipctran_pipe_send_start(ipctran_pipe *p)
{
+ nni_aio *aio;
nni_aio *txaio;
nni_msg *msg;
int niov;
nni_iov iov[3];
uint64_t len;
+ if ((aio = nni_list_first(&p->sendq)) == NULL) {
+ return;
+ }
+
// This runs to send the message.
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- pipe->txhead[0] = 1; // message type, 1.
- NNI_PUT64(pipe->txhead + 1, len);
+ p->txhead[0] = 1; // message type, 1.
+ NNI_PUT64(p->txhead + 1, len);
- txaio = pipe->txaio;
+ txaio = p->txaio;
niov = 0;
- iov[0].iov_buf = pipe->txhead;
- iov[0].iov_len = sizeof(pipe->txhead);
+ iov[0].iov_buf = p->txhead;
+ iov[0].iov_len = sizeof(p->txhead);
niov++;
if (nni_msg_header_len(msg) > 0) {
iov[niov].iov_buf = nni_msg_header(msg);
@@ -421,504 +439,643 @@ ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio)
niov++;
}
nni_aio_set_iov(txaio, niov, iov);
- nni_plat_ipc_pipe_send(pipe->ipp, txaio);
+ nni_ipc_conn_send(p->conn, txaio);
}
static void
-ipc_pipe_send(void *arg, nni_aio *aio)
+ipctran_pipe_send(void *arg, nni_aio *aio)
{
- ipc_pipe *pipe = arg;
- int rv;
+ ipctran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_cancel_tx, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, ipctran_pipe_send_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- nni_list_append(&pipe->sendq, aio);
- if (nni_list_first(&pipe->sendq) == aio) {
- ipc_pipe_dosend(pipe, aio);
+ nni_list_append(&p->sendq, aio);
+ if (nni_list_first(&p->sendq) == aio) {
+ ipctran_pipe_send_start(p);
}
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
-ipc_cancel_rx(nni_aio *aio, int rv)
+ipctran_pipe_recv_cancel(nni_aio *aio, int rv)
{
- ipc_pipe *pipe = nni_aio_get_prov_data(aio);
+ ipctran_pipe *p = nni_aio_get_prov_data(aio);
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
if (!nni_aio_list_active(aio)) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
// If receive in progress, then cancel the pending transfer.
// The callback on the rxaio will cause the user aio to
// be canceled too.
- if (nni_list_first(&pipe->recvq) == aio) {
- nni_aio_abort(pipe->rxaio, rv);
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_list_first(&p->recvq) == aio) {
+ nni_aio_abort(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
return;
}
nni_aio_list_remove(aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
}
static void
-ipc_pipe_dorecv(ipc_pipe *pipe)
+ipctran_pipe_recv_start(ipctran_pipe *p)
{
nni_aio *rxaio;
nni_iov iov;
- NNI_ASSERT(pipe->rxmsg == NULL);
+ NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the IPC header.
- rxaio = pipe->rxaio;
- iov.iov_buf = pipe->rxhead;
- iov.iov_len = sizeof(pipe->rxhead);
+ rxaio = p->rxaio;
+ iov.iov_buf = p->rxhead;
+ iov.iov_len = sizeof(p->rxhead);
nni_aio_set_iov(rxaio, 1, &iov);
- nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+ nni_ipc_conn_recv(p->conn, rxaio);
}
static void
-ipc_pipe_recv(void *arg, nni_aio *aio)
+ipctran_pipe_recv(void *arg, nni_aio *aio)
{
- ipc_pipe *pipe = arg;
- int rv;
+ ipctran_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_cancel_rx, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if ((rv = nni_aio_schedule(aio, ipctran_pipe_recv_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- nni_list_append(&pipe->recvq, aio);
- if (nni_list_first(&pipe->recvq) == aio) {
- ipc_pipe_dorecv(pipe);
+ nni_list_append(&p->recvq, aio);
+ if (nni_list_first(&p->recvq) == aio) {
+ ipctran_pipe_recv_start(p);
}
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
-ipc_pipe_start(void *arg, nni_aio *aio)
+ipctran_pipe_start(void *arg, nni_aio *aio)
{
- ipc_pipe *pipe = arg;
- nni_aio * negaio;
- nni_iov iov;
- int rv;
+ ipctran_pipe *p = arg;
+ nni_aio * negaio;
+ nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&pipe->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_cancel_start, pipe)) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, ipctran_pipe_nego_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- pipe->txhead[0] = 0;
- pipe->txhead[1] = 'S';
- pipe->txhead[2] = 'P';
- pipe->txhead[3] = 0;
- NNI_PUT16(&pipe->txhead[4], pipe->proto);
- NNI_PUT16(&pipe->txhead[6], 0);
-
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- negaio = pipe->negaio;
- iov.iov_len = 8;
- iov.iov_buf = &pipe->txhead[0];
+ p->txhead[0] = 0;
+ p->txhead[1] = 'S';
+ p->txhead[2] = 'P';
+ p->txhead[3] = 0;
+ NNI_PUT16(&p->txhead[4], p->proto);
+ NNI_PUT16(&p->txhead[6], 0);
+
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_plat_ipc_pipe_send(pipe->ipp, negaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_ipc_conn_send(p->conn, negaio);
+ nni_mtx_unlock(&p->mtx);
}
static uint16_t
-ipc_pipe_peer(void *arg)
+ipctran_pipe_peer(void *arg)
{
- ipc_pipe *pipe = arg;
+ ipctran_pipe *p = arg;
- return (pipe->peer);
+ return (p->peer);
}
static int
-ipc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
+ipctran_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_pipe *p = arg;
+ ipctran_pipe *p = arg;
return (nni_copyout_sockaddr(&p->sa, buf, szp, t));
}
static int
-ipc_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t)
+ipctran_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_pipe *p = arg;
- uint64_t id;
- int rv;
- if ((rv = nni_plat_ipc_pipe_get_peer_uid(p->ipp, &id)) != 0) {
+ ipctran_pipe *p = arg;
+ uint64_t id;
+ int rv;
+ if ((rv = nni_ipc_conn_get_peer_uid(p->conn, &id)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t)
+ipctran_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_pipe *p = arg;
- uint64_t id;
- int rv;
- if ((rv = nni_plat_ipc_pipe_get_peer_gid(p->ipp, &id)) != 0) {
+ ipctran_pipe *p = arg;
+ uint64_t id;
+ int rv;
+ if ((rv = nni_ipc_conn_get_peer_gid(p->conn, &id)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t)
+ipctran_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_pipe *p = arg;
- uint64_t id;
- int rv;
- if ((rv = nni_plat_ipc_pipe_get_peer_pid(p->ipp, &id)) != 0) {
+ ipctran_pipe *p = arg;
+ uint64_t id;
+ int rv;
+ if ((rv = nni_ipc_conn_get_peer_pid(p->conn, &id)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t)
+ipctran_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_pipe *p = arg;
- uint64_t id;
- int rv;
- if ((rv = nni_plat_ipc_pipe_get_peer_zoneid(p->ipp, &id)) != 0) {
+ ipctran_pipe *p = arg;
+ uint64_t id;
+ int rv;
+ if ((rv = nni_ipc_conn_get_peer_zoneid(p->conn, &id)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static void
-ipc_ep_fini(void *arg)
+ipctran_dialer_fini(void *arg)
{
- ipc_ep *ep = arg;
+ ipctran_dialer *d = arg;
- nni_aio_stop(ep->aio);
- nni_plat_ipc_ep_fini(ep->iep);
- nni_aio_fini(ep->aio);
- nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
+ nni_aio_stop(d->aio);
+ if (d->dialer != NULL) {
+ nni_ipc_dialer_fini(d->dialer);
+ }
+ nni_aio_fini(d->aio);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+static void
+ipctran_dialer_close(void *arg)
+{
+ ipctran_dialer *d = arg;
+
+ nni_aio_close(d->aio);
+ nni_ipc_dialer_close(d->dialer);
}
static int
-ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+ipctran_dialer_init(void **dp, nni_url *url, nni_sock *sock)
{
- ipc_ep *ep;
- int rv;
- size_t sz;
+ ipctran_dialer *d;
+ int rv;
+ size_t sz;
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&ep->mtx);
+ nni_mtx_init(&d->mtx);
- sz = sizeof(ep->sa.s_ipc.sa_path);
- ep->sa.s_ipc.sa_family = NNG_AF_IPC;
+ sz = sizeof(d->sa.s_ipc.sa_path);
+ d->sa.s_ipc.sa_family = NNG_AF_IPC;
- if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
- ipc_ep_fini(ep);
+ if (nni_strlcpy(d->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
+ ipctran_dialer_fini(d);
return (NNG_EADDRINVAL);
}
- if ((rv = nni_plat_ipc_ep_init(&ep->iep, &ep->sa, mode)) != 0) {
- ipc_ep_fini(ep);
+ if (((rv = nni_ipc_dialer_init(&d->dialer)) != 0) ||
+ ((rv = nni_aio_init(&d->aio, ipctran_dialer_cb, d)) != 0)) {
+ ipctran_dialer_fini(d);
return (rv);
}
- if ((rv = nni_aio_init(&ep->aio, ipc_ep_cb, ep)) != 0) {
- ipc_ep_fini(ep);
- return (rv);
- }
- ep->proto = nni_sock_proto_id(sock);
+ d->proto = nni_sock_proto_id(sock);
- *epp = ep;
+ *dp = d;
return (0);
}
-static int
-ipc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+static void
+ipctran_dialer_cb(void *arg)
{
- return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ ipctran_dialer *d = arg;
+ ipctran_pipe * p;
+ nni_ipc_conn * conn;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ aio = d->user_aio;
+ rv = nni_aio_result(d->aio);
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&d->mtx);
+ if (rv == 0) {
+ conn = nni_aio_get_output(d->aio, 0);
+ nni_ipc_conn_fini(conn);
+ }
+ return;
+ }
+
+ if (rv != 0) {
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ d->user_aio = NULL;
+ conn = nni_aio_get_output(d->aio, 0);
+ NNI_ASSERT(conn != NULL);
+ if ((rv = ipctran_pipe_init(&p, conn)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_ipc_conn_fini(conn);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ p->proto = d->proto;
+ p->rcvmax = d->rcvmax;
+ p->sa = d->sa;
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
-static int
-ipc_listener_init(void **epp, nni_url *url, nni_sock *sock)
+static void
+ipctran_dialer_cancel(nni_aio *aio, int rv)
{
- return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ ipctran_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&d->mtx);
+ if (d->user_aio != aio) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ d->user_aio = NULL;
+ nni_mtx_unlock(&d->mtx);
+
+ nni_aio_abort(d->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
-ipc_ep_close(void *arg)
+ipctran_dialer_connect(void *arg, nni_aio *aio)
{
- ipc_ep *ep = arg;
+ ipctran_dialer *d = arg;
+ int rv;
- nni_aio_close(ep->aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&d->mtx);
+ NNI_ASSERT(d->user_aio == NULL);
+
+ if ((rv = nni_aio_schedule(aio, ipctran_dialer_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ d->user_aio = aio;
- nni_mtx_lock(&ep->mtx);
- nni_plat_ipc_ep_close(ep->iep);
- nni_mtx_unlock(&ep->mtx);
+ nni_ipc_dialer_dial(d->dialer, &d->sa, d->aio);
+ nni_mtx_unlock(&d->mtx);
}
static int
-ipc_ep_bind(void *arg)
+ipctran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- ipc_ep *ep = arg;
- int rv;
+ ipctran_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_size(d->rcvmax, v, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
- nni_mtx_lock(&ep->mtx);
- rv = nni_plat_ipc_ep_listen(ep->iep);
- nni_mtx_unlock(&ep->mtx);
+static int
+ipctran_dialer_set_recvmaxsz(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ipctran_dialer *d = arg;
+ size_t val;
+ int rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->rcvmax = val;
+ nni_mtx_unlock(&d->mtx);
+ }
return (rv);
}
static void
-ipc_ep_finish(ipc_ep *ep)
+ipctran_listener_fini(void *arg)
{
- nni_aio * aio;
- int rv;
- ipc_pipe *pipe = NULL;
+ ipctran_listener *l = arg;
- if ((rv = nni_aio_result(ep->aio)) != 0) {
- goto done;
+ nni_aio_stop(l->aio);
+ if (l->listener != NULL) {
+ nni_ipc_listener_fini(l->listener);
}
- NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
-
- // Attempt to allocate the parent pipe. If this fails we'll
- // drop the connection (ENOMEM probably).
- rv = ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
+ nni_aio_fini(l->aio);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
-done:
- aio = ep->user_aio;
- ep->user_aio = NULL;
+static int
+ipctran_listener_init(void **lp, nni_url *url, nni_sock *sock)
+{
+ ipctran_listener *l;
+ int rv;
+ size_t sz;
- if ((aio != NULL) && (rv == 0)) {
- NNI_ASSERT(pipe != NULL);
- nni_aio_set_output(aio, 0, pipe);
- nni_aio_finish(aio, 0, 0);
- return;
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
}
+ nni_mtx_init(&l->mtx);
+
+ sz = sizeof(l->sa.s_ipc.sa_path);
+ l->sa.s_ipc.sa_family = NNG_AF_IPC;
- if (pipe != NULL) {
- ipc_pipe_fini(pipe);
+ if (nni_strlcpy(l->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
+ ipctran_listener_fini(l);
+ return (NNG_EADDRINVAL);
}
- if (aio != NULL) {
- NNI_ASSERT(rv != 0);
- nni_aio_finish_error(aio, rv);
+
+ if (((rv = nni_ipc_listener_init(&l->listener)) != 0) ||
+ ((rv = nni_aio_init(&l->aio, ipctran_listener_cb, l)) != 0)) {
+ ipctran_listener_fini(l);
+ return (rv);
}
+
+ l->proto = nni_sock_proto_id(sock);
+
+ *lp = l;
+ return (0);
}
static void
-ipc_ep_cb(void *arg)
+ipctran_listener_close(void *arg)
{
- ipc_ep *ep = arg;
+ ipctran_listener *l = arg;
- nni_mtx_lock(&ep->mtx);
- ipc_ep_finish(ep);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_close(l->aio);
+ nni_ipc_listener_close(l->listener);
}
-static void
-ipc_cancel_ep(nni_aio *aio, int rv)
+static int
+ipctran_listener_bind(void *arg)
{
- ipc_ep *ep = nni_aio_get_prov_data(aio);
+ ipctran_listener *l = arg;
+ int rv;
- NNI_ASSERT(rv != 0);
- nni_mtx_lock(&ep->mtx);
- if (ep->user_aio != aio) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- ep->user_aio = NULL;
- nni_mtx_unlock(&ep->mtx);
-
- nni_aio_abort(ep->aio, rv);
- nni_aio_finish_error(aio, rv);
+ nni_mtx_lock(&l->mtx);
+ rv = nni_ipc_listener_listen(l->listener, &l->sa);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
}
static void
-ipc_ep_accept(void *arg, nni_aio *aio)
+ipctran_listener_cb(void *arg)
{
- ipc_ep *ep = arg;
- int rv;
+ ipctran_listener *l = arg;
+ nni_aio * aio;
+ int rv;
+ ipctran_pipe * p = NULL;
+ nni_ipc_conn * conn;
+
+ nni_mtx_lock(&l->mtx);
+ rv = nni_aio_result(l->aio);
+ aio = l->user_aio;
+ l->user_aio = NULL;
+
+ if (aio == NULL) {
+ nni_mtx_unlock(&l->mtx);
+ if (rv == 0) {
+ conn = nni_aio_get_output(l->aio, 0);
+ nni_ipc_conn_fini(conn);
+ }
+ return;
+ }
- if (nni_aio_begin(aio) != 0) {
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ conn = nni_aio_get_output(l->aio, 0);
+ NNI_ASSERT(conn != NULL);
+
+ // Attempt to allocate the parent pipe. If this fails we'll
+ // drop the connection (ENOMEM probably).
+ if ((rv = ipctran_pipe_init(&p, conn)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ nni_ipc_conn_fini(conn);
nni_aio_finish_error(aio, rv);
return;
}
- ep->user_aio = aio;
+ p->proto = l->proto;
+ p->rcvmax = l->rcvmax;
+ p->sa = l->sa;
+ nni_mtx_unlock(&l->mtx);
+
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+ipctran_listener_cancel(nni_aio *aio, int rv)
+{
+ ipctran_listener *l = nni_aio_get_prov_data(aio);
+
+ NNI_ASSERT(rv != 0);
+ nni_mtx_lock(&l->mtx);
+ if (l->user_aio != aio) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ l->user_aio = NULL;
+ nni_mtx_unlock(&l->mtx);
- nni_plat_ipc_ep_accept(ep->iep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_abort(l->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
-ipc_ep_connect(void *arg, nni_aio *aio)
+ipctran_listener_accept(void *arg, nni_aio *aio)
{
- ipc_ep *ep = arg;
- int rv;
+ ipctran_listener *l = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
+ nni_mtx_lock(&l->mtx);
+ NNI_ASSERT(l->user_aio == NULL);
- if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if ((rv = nni_aio_schedule(aio, ipctran_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- ep->user_aio = aio;
+ l->user_aio = aio;
- nni_plat_ipc_ep_connect(ep->iep, ep->aio);
- nni_mtx_unlock(&ep->mtx);
+ nni_ipc_listener_accept(l->listener, l->aio);
+ nni_mtx_unlock(&l->mtx);
}
static int
-ipc_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_opt_type t)
+ipctran_listener_set_recvmaxsz(
+ void *arg, const void *data, size_t sz, nni_opt_type t)
{
- ipc_ep *ep = arg;
- size_t val;
- int rv;
+ ipctran_listener *l = arg;
+ size_t val;
+ int rv;
if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->rcvmax = val;
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ l->rcvmax = val;
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
static int
-ipc_ep_chk_recvmaxsz(const void *data, size_t sz, nni_opt_type t)
+ipctran_listener_get_recvmaxsz(
+ void *arg, void *data, size_t *szp, nni_opt_type t)
{
- return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t));
+ ipctran_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_size(l->rcvmax, data, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
}
static int
-ipc_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t)
+ipctran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- ipc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_size(ep->rcvmax, data, szp, t);
- nni_mtx_unlock(&ep->mtx);
+ ipctran_listener *l = arg;
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_sockaddr(&l->sa, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
return (rv);
}
static int
-ipc_ep_get_addr(void *arg, void *data, size_t *szp, nni_opt_type t)
+ipctran_check_recvmaxsz(const void *data, size_t sz, nni_opt_type t)
{
- ipc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- rv = nni_copyout_sockaddr(&ep->sa, data, szp, t);
- nni_mtx_unlock(&ep->mtx);
- return (rv);
+ return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t));
}
static int
-ipc_ep_set_perms(void *arg, const void *data, size_t sz, nni_opt_type t)
+ipctran_listener_set_perms(
+ void *arg, const void *data, size_t sz, nni_opt_type t)
{
- ipc_ep *ep = arg;
- int val;
- int rv;
+ ipctran_listener *l = arg;
+ int val;
+ int rv;
// Probably we could further limit this -- most systems don't have
// meaningful chmod beyond the lower 9 bits.
if ((rv = nni_copyin_int(&val, data, sz, 0, 0x7FFFFFFF, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- rv = nni_plat_ipc_ep_set_permissions(ep->iep, val);
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ rv = nni_ipc_listener_set_permissions(l->listener, val);
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
static int
-ipc_ep_chk_perms(const void *data, size_t sz, nni_opt_type t)
+ipctran_check_perms(const void *data, size_t sz, nni_opt_type t)
{
return (nni_copyin_int(NULL, data, sz, 0, 0x7FFFFFFF, t));
}
static int
-ipc_ep_set_sec_desc(void *arg, const void *data, size_t sz, nni_opt_type t)
+ipctran_listener_set_sec_desc(
+ void *arg, const void *data, size_t sz, nni_opt_type t)
{
- ipc_ep *ep = arg;
- void * ptr;
- int rv;
+ ipctran_listener *l = arg;
+ void * ptr;
+ int rv;
if ((rv = nni_copyin_ptr(&ptr, data, sz, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- rv = nni_plat_ipc_ep_set_security_descriptor(ep->iep, ptr);
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ rv =
+ nni_ipc_listener_set_security_descriptor(l->listener, ptr);
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
static int
-ipc_ep_chk_sec_desc(const void *data, size_t sz, nni_opt_type t)
+ipctran_check_sec_desc(const void *data, size_t sz, nni_opt_type t)
{
return (nni_copyin_ptr(NULL, data, sz, t));
}
-static nni_tran_option ipc_pipe_options[] = {
+static nni_tran_option ipctran_pipe_options[] = {
{
.o_name = NNG_OPT_REMADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = ipc_pipe_get_addr,
+ .o_get = ipctran_pipe_get_addr,
},
{
.o_name = NNG_OPT_LOCADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = ipc_pipe_get_addr,
+ .o_get = ipctran_pipe_get_addr,
},
{
.o_name = NNG_OPT_IPC_PEER_UID,
.o_type = NNI_TYPE_UINT64,
- .o_get = ipc_pipe_get_peer_uid,
+ .o_get = ipctran_pipe_get_peer_uid,
},
{
.o_name = NNG_OPT_IPC_PEER_GID,
.o_type = NNI_TYPE_UINT64,
- .o_get = ipc_pipe_get_peer_gid,
+ .o_get = ipctran_pipe_get_peer_gid,
},
{
.o_name = NNG_OPT_IPC_PEER_PID,
.o_type = NNI_TYPE_UINT64,
- .o_get = ipc_pipe_get_peer_pid,
+ .o_get = ipctran_pipe_get_peer_pid,
},
{
.o_name = NNG_OPT_IPC_PEER_ZONEID,
.o_type = NNI_TYPE_UINT64,
- .o_get = ipc_pipe_get_peer_zoneid,
+ .o_get = ipctran_pipe_get_peer_zoneid,
},
// terminate list
{
@@ -926,29 +1083,24 @@ static nni_tran_option ipc_pipe_options[] = {
},
};
-static nni_tran_pipe_ops ipc_pipe_ops = {
- .p_fini = ipc_pipe_fini,
- .p_start = ipc_pipe_start,
- .p_stop = ipc_pipe_stop,
- .p_send = ipc_pipe_send,
- .p_recv = ipc_pipe_recv,
- .p_close = ipc_pipe_close,
- .p_peer = ipc_pipe_peer,
- .p_options = ipc_pipe_options,
+static nni_tran_pipe_ops ipctran_pipe_ops = {
+ .p_fini = ipctran_pipe_fini,
+ .p_start = ipctran_pipe_start,
+ .p_stop = ipctran_pipe_stop,
+ .p_send = ipctran_pipe_send,
+ .p_recv = ipctran_pipe_recv,
+ .p_close = ipctran_pipe_close,
+ .p_peer = ipctran_pipe_peer,
+ .p_options = ipctran_pipe_options,
};
-static nni_tran_option ipc_dialer_options[] = {
+static nni_tran_option ipctran_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = ipc_ep_get_recvmaxsz,
- .o_set = ipc_ep_set_recvmaxsz,
- .o_chk = ipc_ep_chk_recvmaxsz,
- },
- {
- .o_name = NNG_OPT_LOCADDR,
- .o_type = NNI_TYPE_SOCKADDR,
- .o_get = ipc_ep_get_addr,
+ .o_get = ipctran_dialer_get_recvmaxsz,
+ .o_set = ipctran_dialer_set_recvmaxsz,
+ .o_chk = ipctran_check_recvmaxsz,
},
// terminate list
{
@@ -956,32 +1108,32 @@ static nni_tran_option ipc_dialer_options[] = {
},
};
-static nni_tran_option ipc_listener_options[] = {
+static nni_tran_option ipctran_listener_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = ipc_ep_get_recvmaxsz,
- .o_set = ipc_ep_set_recvmaxsz,
- .o_chk = ipc_ep_chk_recvmaxsz,
+ .o_get = ipctran_listener_get_recvmaxsz,
+ .o_set = ipctran_listener_set_recvmaxsz,
+ .o_chk = ipctran_check_recvmaxsz,
},
{
.o_name = NNG_OPT_LOCADDR,
.o_type = NNI_TYPE_SOCKADDR,
- .o_get = ipc_ep_get_addr,
+ .o_get = ipctran_listener_get_locaddr,
},
{
.o_name = NNG_OPT_IPC_SECURITY_DESCRIPTOR,
.o_type = NNI_TYPE_POINTER,
.o_get = NULL,
- .o_set = ipc_ep_set_sec_desc,
- .o_chk = ipc_ep_chk_sec_desc,
+ .o_set = ipctran_listener_set_sec_desc,
+ .o_chk = ipctran_check_sec_desc,
},
{
.o_name = NNG_OPT_IPC_PERMISSIONS,
.o_type = NNI_TYPE_INT32,
.o_get = NULL,
- .o_set = ipc_ep_set_perms,
- .o_chk = ipc_ep_chk_perms,
+ .o_set = ipctran_listener_set_perms,
+ .o_chk = ipctran_check_perms,
},
// terminate list
{
@@ -989,31 +1141,31 @@ static nni_tran_option ipc_listener_options[] = {
},
};
-static nni_tran_dialer_ops ipc_dialer_ops = {
- .d_init = ipc_dialer_init,
- .d_fini = ipc_ep_fini,
- .d_connect = ipc_ep_connect,
- .d_close = ipc_ep_close,
- .d_options = ipc_dialer_options,
+static nni_tran_dialer_ops ipctran_dialer_ops = {
+ .d_init = ipctran_dialer_init,
+ .d_fini = ipctran_dialer_fini,
+ .d_connect = ipctran_dialer_connect,
+ .d_close = ipctran_dialer_close,
+ .d_options = ipctran_dialer_options,
};
-static nni_tran_listener_ops ipc_listener_ops = {
- .l_init = ipc_listener_init,
- .l_fini = ipc_ep_fini,
- .l_bind = ipc_ep_bind,
- .l_accept = ipc_ep_accept,
- .l_close = ipc_ep_close,
- .l_options = ipc_listener_options,
+static nni_tran_listener_ops ipctran_listener_ops = {
+ .l_init = ipctran_listener_init,
+ .l_fini = ipctran_listener_fini,
+ .l_bind = ipctran_listener_bind,
+ .l_accept = ipctran_listener_accept,
+ .l_close = ipctran_listener_close,
+ .l_options = ipctran_listener_options,
};
static nni_tran ipc_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "ipc",
- .tran_dialer = &ipc_dialer_ops,
- .tran_listener = &ipc_listener_ops,
- .tran_pipe = &ipc_pipe_ops,
- .tran_init = ipc_tran_init,
- .tran_fini = ipc_tran_fini,
+ .tran_dialer = &ipctran_dialer_ops,
+ .tran_listener = &ipctran_listener_ops,
+ .tran_pipe = &ipctran_pipe_ops,
+ .tran_init = ipctran_init,
+ .tran_fini = ipctran_fini,
};
int
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index c323da29..0d47529e 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -129,6 +129,7 @@ tcptran_pipe_fini(void *arg)
nni_tcp_conn_fini(p->conn);
}
nni_msg_free(p->rxmsg);
+ nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
}
@@ -780,7 +781,12 @@ static int
tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcptran_dialer *d = arg;
- return (nni_copyout_size(d->rcvmax, v, szp, t));
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_size(d->rcvmax, v, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
}
static int
@@ -922,15 +928,12 @@ tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock)
return (rv);
}
- if ((rv = nni_tcp_listener_init(&l->listener)) != 0) {
+ if (((rv = nni_tcp_listener_init(&l->listener)) != 0) ||
+ ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0)) {
tcptran_listener_fini(l);
return (rv);
}
- if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) {
- tcptran_listener_fini(l);
- return (rv);
- }
l->proto = nni_sock_proto_id(sock);
l->nodelay = true;
l->keepalive = false;
@@ -955,9 +958,9 @@ tcptran_listener_bind(void *arg)
tcptran_listener *l = arg;
int rv;
- l->bsa = l->sa;
nni_mtx_lock(&l->mtx);
- rv = nni_tcp_listener_listen(l->listener, &l->bsa);
+ l->bsa = l->sa;
+ rv = nni_tcp_listener_listen(l->listener, &l->bsa);
nni_mtx_unlock(&l->mtx);
return (rv);
@@ -1139,7 +1142,24 @@ static int
tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcptran_listener *l = arg;
- return (nni_copyout_size(l->rcvmax, v, szp, t));
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_size(l->rcvmax, v, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static int
+tcptran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ tcptran_listener *l = arg;
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
}
static int
@@ -1234,6 +1254,11 @@ static nni_tran_option tcptran_listener_options[] = {
.o_chk = tcptran_check_recvmaxsz,
},
{
+ .o_name = NNG_OPT_LOCADDR,
+ .o_type = NNI_TYPE_SOCKADDR,
+ .o_get = tcptran_listener_get_locaddr,
+ },
+ {
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
.o_get = tcptran_listener_get_url,
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index b1bdddb8..e6701f5f 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -913,9 +913,9 @@ tlstran_listener_bind(void *arg)
tlstran_listener *l = arg;
int rv;
- l->bsa = l->sa;
nni_mtx_lock(&l->ep.mtx);
- rv = nni_tcp_listener_listen(l->listener, &l->bsa);
+ l->bsa = l->sa;
+ rv = nni_tcp_listener_listen(l->listener, &l->bsa);
nni_mtx_unlock(&l->ep.mtx);
return (rv);
@@ -1122,6 +1122,18 @@ tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
+tlstran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ tlstran_listener *l = arg;
+ int rv;
+
+ nni_mtx_lock(&l->ep.mtx);
+ rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t);
+ nni_mtx_unlock(&l->ep.mtx);
+ return (rv);
+}
+
+static int
tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
{
return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
@@ -1381,6 +1393,11 @@ static nni_tran_option tlstran_listener_options[] = {
.o_get = tlstran_listener_get_url,
},
{
+ .o_name = NNG_OPT_LOCADDR,
+ .o_type = NNI_TYPE_SOCKADDR,
+ .o_get = tlstran_listener_get_locaddr,
+ },
+ {
.o_name = NNG_OPT_TLS_CONFIG,
.o_type = NNI_TYPE_POINTER,
.o_get = tlstran_ep_get_config,