diff options
Diffstat (limited to 'src')
27 files changed, 2913 insertions, 2566 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 11c7d82e..ed96b6bc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -87,19 +87,20 @@ set (NNG_SOURCES if (NNG_PLATFORM_POSIX) set (NNG_SOURCES ${NNG_SOURCES} platform/posix/posix_impl.h + platform/posix/posix_ipc.h platform/posix/posix_config.h - platform/posix/posix_aio.h platform/posix/posix_pollq.h + platform/posix/posix_tcp.h platform/posix/posix_alloc.c platform/posix/posix_atomic.c platform/posix/posix_clock.c platform/posix/posix_debug.c - platform/posix/posix_epdesc.c platform/posix/posix_file.c - platform/posix/posix_ipc.c + platform/posix/posix_ipcconn.c + platform/posix/posix_ipcdial.c + platform/posix/posix_ipclisten.c platform/posix/posix_pipe.c - platform/posix/posix_pipedesc.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c platform/posix/posix_sockaddr.c @@ -132,12 +133,17 @@ endif() if (NNG_PLATFORM_WINDOWS) set (NNG_SOURCES ${NNG_SOURCES} platform/windows/win_impl.h + platform/windows/win_ipc.h + platform/windows/win_tcp.h + platform/windows/win_clock.c platform/windows/win_debug.c platform/windows/win_file.c platform/windows/win_io.c platform/windows/win_iocp.c - platform/windows/win_ipc.c + platform/windows/win_ipcconn.c + platform/windows/win_ipcdial.c + platform/windows/win_ipclisten.c platform/windows/win_pipe.c platform/windows/win_rand.c platform/windows/win_resolv.c diff --git a/src/core/platform.h b/src/core/platform.h index 5045b172..2ae0fd0b 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -215,7 +215,7 @@ typedef struct nni_tcp_listener nni_tcp_listener; extern void nni_tcp_conn_fini(nni_tcp_conn *); -// nni_tcp_dialer_close closes the dialer, which might actually be +// nni_tcp_conn_close closes the connection, which might actually be // implemented as a shutdown() call. // Further operations on it should return NNG_ECLOSED. extern void nni_tcp_conn_close(nni_tcp_conn *); @@ -248,7 +248,7 @@ extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool); // keepalive probes. Tuning of these keepalives is current unsupported. extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool); -// nni_tcp_listener_init creates a new dialer object. +// nni_tcp_dialer_init creates a new dialer object. extern int nni_tcp_dialer_init(nni_tcp_dialer **); // nni_tcp_dialer_fini finalizes the dialer, closing it and freeing @@ -311,81 +311,102 @@ extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *); // IPC (UNIX Domain Sockets & Named Pipes) Support. // -typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; -typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; +typedef struct nni_ipc_conn nni_ipc_conn; +typedef struct nni_ipc_dialer nni_ipc_dialer; +typedef struct nni_ipc_listener nni_ipc_listener; -// nni_plat_ipc_ep_init creates a new endpoint associated with the url. -// The final field is the mode, either for dialing (NNI_EP_MODE_DIAL) or -// listening (NNI_EP_MODE_LISTEN). -extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const nni_sockaddr *, int); +// nni_ipc_conn_fini disposes of the connection. +extern void nni_ipc_conn_fini(nni_ipc_conn *); -// nni_plat_ipc_ep_fini closes the endpoint and releases resources. -extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); +// nni_ipc_conn_close closes the connection, which might actually be +// implemented as a shutdown() call. +// Further operations on it should return NNG_ECLOSED. +extern void nni_ipc_conn_close(nni_ipc_conn *); -// nni_plat_ipc_ep_close closes the endpoint; this might not close the -// actual underlying socket, but it should call shutdown on it. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); +// nni_ipc_conn_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_ipc_conn_send(nni_ipc_conn *, nni_aio *); -// nni_plat_ipc_listen creates an IPC socket in listening mode, bound -// to the specified path. -extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); +// nni_ipc_conn_recv receives data into the buffers provided by the +// I/O vector (iovs). The platform should attempt to scatter the received +// data into the iovs if possible. +// +// It is possible for the reader to return less data than is requested, +// in which case the caller is responsible for resubmitting. The platform +// must not return "zero" data however. (It is an error to attempt to +// receive zero bytes.) The platform may modify the iovs. +extern void nni_ipc_conn_recv(nni_ipc_conn *, nni_aio *); -// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *); +// nni_ipc_conn_get_peer_uid obtains the peer user id, if possible. +// NB: Only POSIX systems support user IDs. +extern int nni_ipc_conn_get_peer_uid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_connect is the client side. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *); +// nni_ipc_conn_get_peer_gid obtains the peer group id, if possible. +// NB: Only POSIX systems support group IDs. +extern int nni_ipc_conn_get_peer_gid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_ep_set_security_descriptor sets the Windows security -// descriptor. This is *only* supported for Windows platforms. All -// others return NNG_ENOTSUP. The void argument is a pointer to -// a SECURITY_DESCRIPTOR object, and must be valid. -extern int nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *, void *); +// nni_ipc_conn_get_peer_pid obtains the peer process id, if possible. +extern int nni_ipc_conn_get_peer_pid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_ep_set_permissions sets UNIX style permissions -// on the named pipes. This basically just does a chmod() on the -// named pipe, and is only supported o the server side, and only on -// systems that support this (POSIX, not Windows). Note that changing -// ownership is not supported at this time. Most systems use only -// 16-bits, the lower 12 of which are user, group, and other, e.g. -// 0640 gives read/write access to user, read to group, and prevents -// any other user from accessing it. This option only has meaning -// for listeners, on dialers it is ignored. -extern int nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *, uint32_t); +// nni_ipc_conn_get_peer_zoneid obtains the peer zone id, if possible. +// NB: Only illumos & SunOS systems have the notion of "zones". +extern int nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *, uint64_t *); + +// nni_ipc_dialer_init creates a new dialer object. +extern int nni_ipc_dialer_init(nni_ipc_dialer **); -// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources -// associated with it. -extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *); +// nni_ipc_dialer_fini finalizes the dialer, closing it and freeing +// all resources. +extern void nni_ipc_dialer_fini(nni_ipc_dialer *); -// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *); +// nni_ipc_dialer_close closes the dialer. +// Further operations on it should return NNG_ECLOSED. Any in-progress +// connection will be aborted. +extern void nni_ipc_dialer_close(nni_ipc_dialer *); -// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer. -// The platform may modify the iovs. -extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); +// nni_ipc_dialer_dial attempts to create an outgoing connection, +// asynchronously, to the address specified. On success, the first (and only) +// output will be an nni_ipc_conn * associated with the remote server. +extern void nni_ipc_dialer_dial( + nni_ipc_dialer *, const nni_sockaddr *, nni_aio *); -// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs. -// The platform may modify the iovs. -extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); +// nni_ipc_listener_init creates a new listener object, unbound. +extern int nni_ipc_listener_init(nni_ipc_listener **); -// nni_plat_ipc_pipe_get_peer_uid obtains the peer user id, if possible. -// NB: Only POSIX systems support user IDs. -extern int nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_fini frees the listener and all associated resources. +// It implictly closes the listener as well. +extern void nni_ipc_listener_fini(nni_ipc_listener *); -// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible. -// NB: Only POSIX systems support group IDs. -extern int nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_close closes the listener. This will unbind +// any bound socket, and further operations will result in NNG_ECLOSED. +extern void nni_ipc_listener_close(nni_ipc_listener *); -// nni_plat_ipc_pipe_get_peer_pid obtains the peer process id, if possible. -extern int nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_listen creates the socket in listening mode, bound +// to the specified address. Unlike TCP, this address does not change. +extern int nni_ipc_listener_listen(nni_ipc_listener *, const nni_sockaddr *); -// nni_plat_ipc_pipe_get_peer_zoneid obtains the peer zone id, if possible. -// NB: Only illumos & SunOS systems have the notion of "zones". -extern int nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_accept accepts in incoming connect, asynchronously. +// On success, the first (and only) output will be an nni_ipc_conn * +// associated with the remote peer. +extern void nni_ipc_listener_accept(nni_ipc_listener *, nni_aio *); + +// nni_ipc_listener_set_permissions sets UNIX style permissions +// on the named pipes. This basically just does a chmod() on the +// named pipe, and is only supported o the server side, and only on +// systems that support this (POSIX, not Windows). Note that changing +// ownership is not supported at this time. Most systems use only +// 16-bits, the lower 12 of which are user, group, and other, e.g. +// 0640 gives read/write access to user, read to group, and prevents +// any other user from accessing it. On platforms without this support, +// ENOTSUP is returned. +extern int nni_ipc_listener_set_permissions(nni_ipc_listener *, int); + +// nni_ipc_listener_set_security_descriptor sets the Windows security +// descriptor. This is *only* supported for Windows platforms. All +// others return NNG_ENOTSUP. The void argument is a pointer to +// a SECURITY_DESCRIPTOR object, and must be valid. +extern int nni_ipc_listener_set_security_descriptor( + nni_ipc_listener *, void *); // // UDP support. UDP is not connection oriented, and only has the notion diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c deleted file mode 100644 index d796765a..00000000 --- a/src/platform/posix/posix_epdesc.c +++ /dev/null @@ -1,597 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" - -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <poll.h> -#include <stdbool.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <sys/un.h> -#include <unistd.h> - -#ifdef sun -#undef sun -#endif - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -struct nni_posix_epdesc { - nni_posix_pfd * pfd; - nni_list connectq; - nni_list acceptq; - bool closed; - bool started; - bool ipcbound; // if true unlink socket on exit - struct sockaddr_storage locaddr; - struct sockaddr_storage remaddr; - socklen_t loclen; - socklen_t remlen; - mode_t perms; // UNIX sockets only - int mode; // end point mode (dialer/listener) - nni_mtx mtx; -}; - -static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *); -static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *); - -static void -nni_epdesc_cancel(nni_aio *aio, int rv) -{ - nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); - nni_posix_pfd * pfd = NULL; - - NNI_ASSERT(rv != 0); - nni_mtx_lock(&ed->mtx); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) && - ((pfd = ed->pfd) != NULL)) { - nni_posix_pfd_close(pfd); - } - nni_mtx_unlock(&ed->mtx); -} - -static void -nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd) -{ - nni_posix_pipedesc *pd = NULL; - - // acceptq or connectq. - nni_aio_list_remove(aio); - - if (rv != 0) { - NNI_ASSERT(newpfd == NULL); - nni_aio_finish_error(aio, rv); - return; - } - - NNI_ASSERT(newpfd != NULL); - if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) { - nni_posix_pfd_fini(newpfd); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_set_output(aio, 0, pd); - nni_aio_finish(aio, 0, 0); -} - -static void -nni_epdesc_doaccept(nni_posix_epdesc *ed) -{ - nni_aio *aio; - - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - nni_posix_pfd *pfd; - - fd = nni_posix_pfd_fd(ed->pfd); - -#ifdef NNG_USE_ACCEPT4 - newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); - if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(fd, NULL, NULL); - } -#else - newfd = accept(fd, NULL, NULL); -#endif - if (newfd < 0) { - switch (errno) { - case EAGAIN: -#ifdef EWOULDBLOCK -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif -#endif - rv = nni_posix_pfd_arm(ed->pfd, POLLIN); - if (rv != 0) { - nni_epdesc_finish(aio, rv, NULL); - continue; - } - // Come back later... - return; - case ECONNABORTED: - case ECONNRESET: - // Eat them, they aren't interesting. - continue; - default: - // Error this one, but keep moving to the next. - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - continue; - } - } - - if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { - close(newfd); - nni_epdesc_finish(aio, rv, NULL); - continue; - } - - nni_epdesc_finish(aio, 0, pfd); - } -} - -static void -nni_epdesc_doclose(nni_posix_epdesc *ed) -{ - nni_aio *aio; - - ed->closed = true; - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_epdesc_finish(aio, NNG_ECLOSED, 0); - } - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_epdesc_finish(aio, NNG_ECLOSED, 0); - } - - if (ed->pfd != NULL) { - - nni_posix_pfd_close(ed->pfd); - } - - // clean up stale UNIX socket when closing the server. - if (ed->ipcbound) { - struct sockaddr_un *sun = (void *) &ed->locaddr; - (void) unlink(sun->sun_path); - } -} - -static void -nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_epdesc *ed = arg; - - NNI_ARG_UNUSED(pfd); - - nni_mtx_lock(&ed->mtx); - if (events & POLLNVAL) { - nni_epdesc_doclose(ed); - nni_mtx_unlock(&ed->mtx); - return; - } - - // Anything else will turn up in accept. - nni_epdesc_doaccept(ed); - nni_mtx_unlock(&ed->mtx); -} - -void -nni_posix_epdesc_close(nni_posix_epdesc *ed) -{ - nni_mtx_lock(&ed->mtx); - nni_epdesc_doclose(ed); - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_listen(nni_posix_epdesc *ed) -{ - int len; - struct sockaddr_storage *ss; - int rv; - int fd; - nni_posix_pfd * pfd; - - nni_mtx_lock(&ed->mtx); - - if (ed->started) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ESTATE); - } - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ECLOSED); - } - if ((len = ed->loclen) == 0) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EADDRINVAL); - } - - ss = &ed->locaddr; - len = ed->loclen; - - if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - nni_mtx_unlock(&ed->mtx); - return (nni_plat_errno(errno)); - } - - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - -#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) - if (ss->ss_family != AF_UNIX) { - int on = 1; - // If for some reason this doesn't work, it's probably ok. - // Second bind will fail. - (void) setsockopt( - fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - } -#endif - - if (bind(fd, (struct sockaddr *) ss, len) < 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - - // For UNIX domain sockets, optionally set the permission bits. - // This is done after the bind and before listen, and on the file - // rather than the file descriptor. - // Experiments have shown that chmod() works correctly, provided that - // it is done *before* the listen() operation, whereas fchmod seems to - // have no impact. This behavior was observed on both macOS and Linux. - // YMMV on other platforms. - if (ss->ss_family == AF_UNIX) { - ed->ipcbound = true; - if (ed->perms != 0) { - struct sockaddr_un *sun = (void *) ss; - mode_t perms = ed->perms & ~(S_IFMT); - if ((rv = chmod(sun->sun_path, perms)) != 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - } - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - - nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed); - - ed->pfd = pfd; - ed->started = true; - nni_mtx_unlock(&ed->mtx); - - return (0); -} - -void -nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) -{ - int rv; - - // Accept is simpler than the connect case. With accept we just - // need to wait for the socket to be readable to indicate an incoming - // connection is ready for us. There isn't anything else for us to - // do really, as that will have been done in listen. - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ed->mtx); - - if (!ed->started) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&ed->acceptq, aio); - if (nni_list_first(&ed->acceptq) == aio) { - nni_epdesc_doaccept(ed); - } - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = -1; - - nni_mtx_lock(&ed->mtx); - if (ed->pfd != NULL) { - fd = nni_posix_pfd_fd(ed->pfd); - } - nni_mtx_unlock(&ed->mtx); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -static void -nni_epdesc_connect_start(nni_posix_epdesc *ed) -{ - nni_posix_pfd *pfd; - int fd; - int rv; - nni_aio * aio; - -loop: - if ((aio = nni_list_first(&ed->connectq)) == NULL) { - return; - } - - NNI_ASSERT(ed->pfd == NULL); - if (ed->closed) { - nni_epdesc_finish(aio, NNG_ECLOSED, NULL); - goto loop; - } - ed->started = true; - - if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - goto loop; - } - - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - (void) close(fd); - nni_epdesc_finish(aio, rv, NULL); - goto loop; - } - // Possibly bind. - if ((ed->loclen != 0) && - (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) { - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - - if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) { - // Immediate connect, cool! This probably only happens on - // loopback, and probably not on every platform. - nni_epdesc_finish(aio, 0, pfd); - goto loop; - } - - if (errno != EINPROGRESS) { - // Some immediate failure occurred. - if (errno == ENOENT) { // For UNIX domain sockets - rv = NNG_ECONNREFUSED; - } else { - rv = nni_plat_errno(errno); - } - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed); - if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - ed->pfd = pfd; - // all done... wait for this to signal via callback -} - -void -nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ed->mtx); - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - ed->started = true; - nni_list_append(&ed->connectq, aio); - if (nni_list_first(&ed->connectq) == aio) { - // If there was a stale pfd (probably from an aborted or - // canceled connect attempt), discard it so we start fresh. - if (ed->pfd != NULL) { - nni_posix_pfd_fini(ed->pfd); - ed->pfd = NULL; - } - nni_epdesc_connect_start(ed); - } - nni_mtx_unlock(&ed->mtx); -} - -static void -nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_epdesc *ed = arg; - nni_aio * aio; - socklen_t sz; - int rv; - int fd; - - nni_mtx_lock(&ed->mtx); - if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) || - (pfd != ed->pfd)) { - // Spurious completion. Just ignore it. - nni_mtx_unlock(&ed->mtx); - return; - } - - fd = nni_posix_pfd_fd(pfd); - sz = sizeof(rv); - - if ((events & POLLNVAL) != 0) { - rv = EBADF; - - } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - rv = errno; - } - - switch (rv) { - case 0: - // Good connect! - ed->pfd = NULL; - nni_epdesc_finish(aio, 0, pfd); - break; - case EINPROGRESS: // still connecting... come back later - nni_mtx_unlock(&ed->mtx); - return; - default: - ed->pfd = NULL; - nni_epdesc_finish(aio, nni_plat_errno(rv), NULL); - nni_posix_pfd_fini(pfd); - break; - } - - // Start another connect running, if any is waiting. - nni_epdesc_connect_start(ed); - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) -{ - nni_posix_epdesc *ed; - - if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { - return (NNG_ENOMEM); - } - - nni_mtx_init(&ed->mtx); - - ed->pfd = NULL; - ed->closed = false; - ed->started = false; - ed->perms = 0; // zero means use default (no change) - ed->mode = mode; - - nni_aio_list_init(&ed->connectq); - nni_aio_list_init(&ed->acceptq); - *edp = ed; - return (0); -} - -void -nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len) -{ - if ((len < 1) || (len > sizeof(struct sockaddr_storage))) { - return; - } - nni_mtx_lock(&ed->mtx); - memcpy(&ed->locaddr, sa, len); - ed->loclen = len; - nni_mtx_unlock(&ed->mtx); -} - -void -nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len) -{ - if ((len < 1) || (len > sizeof(struct sockaddr_storage))) { - return; - } - nni_mtx_lock(&ed->mtx); - memcpy(&ed->remaddr, sa, len); - ed->remlen = len; - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode) -{ - nni_mtx_lock(&ed->mtx); - if (ed->mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ENOTSUP); - } - if (ed->started) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EBUSY); - } - if ((mode & S_IFMT) != 0) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EINVAL); - } - ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero - nni_mtx_unlock(&ed->mtx); - return (0); -} - -void -nni_posix_epdesc_fini(nni_posix_epdesc *ed) -{ - nni_posix_pfd *pfd; - - nni_mtx_lock(&ed->mtx); - nni_epdesc_doclose(ed); - pfd = ed->pfd; - nni_mtx_unlock(&ed->mtx); - - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } - nni_mtx_fini(&ed->mtx); - NNI_FREE_STRUCT(ed); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c deleted file mode 100644 index 5ba3c8fb..00000000 --- a/src/platform/posix/posix_ipc.c +++ /dev/null @@ -1,250 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" - -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <sys/un.h> -#include <unistd.h> - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -// Solaris/SunOS systems define this, which collides with our symbol -// names. Just undefine it now. -#ifdef sun -#undef sun -#endif - -static int nni_plat_ipc_remove_stale(const char *path); - -// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. -// We alias nni_posix_epdesc to nni_plat_ipc_ep. - -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_un sun; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - switch (mode) { - case NNI_EP_MODE_DIAL: - nni_posix_nn2sockaddr(&sun, sa); - nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun)); - break; - case NNI_EP_MODE_LISTEN: - - if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) { - return (rv); - } - - nni_posix_nn2sockaddr(&sun, sa); - nni_posix_epdesc_set_local(ed, &sun, sizeof(sun)); - break; - default: - nni_posix_epdesc_fini(ed); - return (NNG_EINVAL); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits) -{ - return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits)); -} - -int -nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr) -{ - NNI_ARG_UNUSED(ep); - NNI_ARG_UNUSED(attr); - return (NNG_ENOTSUP); -} - -// UNIX DOMAIN SOCKETS -- these have names in the file namespace. -// We are going to check to see if there was a name already there. -// If there was, and nothing is listening (ECONNREFUSED), then we -// will just try to cleanup the old socket. Note that this is not -// perfect in all scenarios, so use this with caution. -static int -nni_plat_ipc_remove_stale(const char *path) -{ - int fd; - struct sockaddr_un sun; - size_t sz; - - sun.sun_family = AF_UNIX; - sz = sizeof(sun.sun_path); - - if (nni_strlcpy(sun.sun_path, path, sz) >= sz) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // There is an assumption here that connect() returns immediately - // (even when non-blocking) when a server is absent. This seems - // to be true for the platforms we've tried. If it doesn't work, - // then the cleanup will fail. As this is supposed to be an - // exceptional case, don't worry. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) &sun, sizeof(sun)) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(path); - } - } - (void) close(fd); - return (0); -} - -int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc *ed = (void *) ep; - - return (nni_posix_epdesc_listen(ed)); -} - -void -nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid) -{ - int rv; - uint64_t ignore; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, uid, &ignore, &ignore, &ignore)) != 0) { - return (rv); - } - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid) -{ - int rv; - uint64_t ignore; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, gid, &ignore, &ignore)) != 0) { - return (rv); - } - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid) -{ - int rv; - uint64_t ignore; - uint64_t id; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, &ignore, &ignore, &id)) != 0) { - return (rv); - } - if (id == (uint64_t) -1) { - // NB: -1 is not a legal zone id (illumos/Solaris) - return (NNG_ENOTSUP); - } - *zid = id; - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid) -{ - int rv; - uint64_t ignore; - uint64_t id; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, &ignore, &id, &ignore)) != 0) { - return (rv); - } - if (id == (uint64_t) -1) { - // NB: -1 is not a legal process id - return (NNG_ENOTSUP); - } - *pid = id; - return (0); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h new file mode 100644 index 00000000..caf7ed7b --- /dev/null +++ b/src/platform/posix/posix_ipc.h @@ -0,0 +1,48 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <sys/types.h> // For mode_t + +struct nni_ipc_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_ipc_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_ipc_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_ipc_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + char * path; + mode_t perms; + nni_mtx mtx; +}; + +extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *); +extern void nni_posix_ipc_conn_start(nni_ipc_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c new file mode 100644 index 00000000..2b46fb12 --- /dev/null +++ b/src/platform/posix/posix_ipcconn.c @@ -0,0 +1,494 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/un.h> +#include <unistd.h> +#if defined(NNG_HAVE_GETPEERUCRED) +#include <ucred.h> +#elif defined(NNG_HAVE_LOCALPEERCRED) +#include <sys/ucred.h> +#endif + +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_ipc.h" + +static void +ipc_conn_dowrite(nni_ipc_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +ipc_conn_doread(nni_ipc_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_ipc_conn_close(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_ipc_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_ipc_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + ipc_conn_doread(c); + } + if (events & POLLOUT) { + ipc_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + ipc_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + ipc_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, + uint64_t *prid, uint64_t *znid) +{ + int fd = nni_posix_pfd_fd(c->pfd); +#if defined(NNG_HAVE_GETPEEREID) + uid_t uid; + gid_t gid; + + if (getpeereid(fd, &uid, &gid) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uid; + *egid = gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_GETPEERUCRED) + ucred_t *ucp = NULL; + if (getpeerucred(fd, &ucp) != 0) { + return (nni_plat_errno(errno)); + } + *euid = ucred_geteuid(ucp); + *egid = ucred_getegid(ucp); + *prid = ucred_getpid(ucp); + *znid = ucred_getzoneid(ucp); + ucred_free(ucp); + return (0); +#elif defined(NNG_HAVE_SOPEERCRED) + struct ucred uc; + socklen_t len = sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_LOCALPEERCRED) + struct xucred xu; + socklen_t len = sizeof(xu); + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = xu.cr_uid; + *egid = xu.cr_gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; +#if defined(LOCAL_PEERPID) // present (undocumented) on macOS + { + pid_t pid; + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) == + 0) { + *prid = (uint64_t) pid; + } + } +#endif // LOCAL_PEERPID + return (0); +#else + if (fd < 0) { + return (NNG_ECLOSED); + } + NNI_ARG_UNUSED(euid); + NNI_ARG_UNUSED(egid); + NNI_ARG_UNUSED(prid); + NNI_ARG_UNUSED(znid); + return (NNG_ENOTSUP); +#endif +} +int +nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid) +{ + int rv; + uint64_t ignore; + + if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) { + return (rv); + } + return (0); +} + +int +nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid) +{ + int rv; + uint64_t ignore; + + if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) { + return (rv); + } + return (0); +} + +int +nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid) +{ + int rv; + uint64_t ignore; + uint64_t id; + + if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal zone id (illumos/Solaris) + return (NNG_ENOTSUP); + } + *zid = id; + return (0); +} + +int +nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid) +{ + int rv; + uint64_t ignore; + uint64_t id; + + if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal process id + return (NNG_ENOTSUP); + } + *pid = id; + return (0); +} + +int +nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd) +{ + nni_ipc_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_ipc_conn_start(nni_ipc_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c); +} + +void +nni_ipc_conn_fini(nni_ipc_conn *c) +{ + nni_ipc_conn_close(c); + nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c new file mode 100644 index 00000000..13732911 --- /dev/null +++ b/src/platform/posix/posix_ipcdial.c @@ -0,0 +1,238 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_ipc.h" + +// Dialer stuff. +int +nni_ipc_dialer_init(nni_ipc_dialer **dp) +{ + nni_ipc_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_ipc_dialer_close(nni_ipc_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_ipc_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_ipc_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_ipc_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_ipc_dialer_fini(nni_ipc_dialer *d) +{ + nni_ipc_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +ipc_dialer_cancel(nni_aio *aio, int rv) +{ + nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + nni_ipc_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_ipc_conn_fini(c); +} + +static void +ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_ipc_conn * c = arg; + nni_ipc_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_ipc_conn_close(c); + nni_ipc_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_ipc_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + (ss.ss_family != AF_UNIX)) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + if (errno == ENOENT) { + // No socket present means nobody listening. + rv = NNG_ECONNREFUSED; + } else { + rv = nni_plat_errno(errno); + } + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c new file mode 100644 index 00000000..4e33a08f --- /dev/null +++ b/src/platform/posix/posix_ipclisten.c @@ -0,0 +1,373 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/un.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_ipc.h" + +int +nni_ipc_listener_init(nni_ipc_listener **lp) +{ + nni_ipc_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + l->perms = 0; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +ipc_listener_doclose(nni_ipc_listener *l) +{ + nni_aio *aio; + char * path; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } + if (l->started && ((path = l->path) != NULL)) { + l->path = NULL; + (void) unlink(path); + nni_strfree(path); + } +} + +void +nni_ipc_listener_close(nni_ipc_listener *l) +{ + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_doaccept(nni_ipc_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_ipc_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_ipc_listener *l = arg; + NNI_ARG_UNUSED(pfd); + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + ipc_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + + // Anything else will turn up in accept. + ipc_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_cancel(nni_aio *aio, int rv) +{ + nni_ipc_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static int +ipc_remove_stale(const char *path) +{ + int fd; + struct sockaddr_un sun; + size_t sz; + + sun.sun_family = AF_UNIX; + sz = sizeof(sun.sun_path); + + if (nni_strlcpy(sun.sun_path, path, sz) >= sz) { + return (NNG_EADDRINVAL); + } + + if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) &sun, sizeof(sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(path); + } + } + (void) close(fd); + return (0); +} + +int +nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode) +{ + if ((mode & S_IFMT) != 0) { + return (NNG_EINVAL); + } + mode |= S_IFSOCK; // set IFSOCK to ensure non-zero + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + l->perms = mode; + nni_mtx_unlock(&l->mtx); + return (0); +} + +int +nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd) +{ + NNI_ARG_UNUSED(l); + NNI_ARG_UNUSED(sd); + return (NNG_ENOTSUP); +} + +int +nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + char * path; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + (ss.ss_family != AF_UNIX)) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + path = nni_strdup(sa->s_ipc.sa_path); + if (path == NULL) { + return (NNG_ENOMEM); + } + + if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + + if (((rv = ipc_remove_stale(path)) != 0) || + ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) { + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + (void) close(fd); + return (rv); + } + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + nni_posix_pfd_fini(pfd); + return (rv); + } + + if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) || + (listen(fd, 128) != 0)) { + rv = nni_plat_errno(errno); + (void) unlink(path); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + nni_posix_pfd_fini(pfd); + return (rv); + } + + nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l); + + l->pfd = pfd; + l->started = true; + l->path = path; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_ipc_listener_fini(nni_ipc_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + ipc_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c deleted file mode 100644 index b11036ea..00000000 --- a/src/platform/posix/posix_pipedesc.c +++ /dev/null @@ -1,503 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" - -#include <errno.h> -#include <fcntl.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <poll.h> -#include <stdbool.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <unistd.h> -#if defined(NNG_HAVE_GETPEERUCRED) -#include <ucred.h> -#elif defined(NNG_HAVE_LOCALPEERCRED) -#include <sys/ucred.h> -#include <sys/un.h> -#endif -#ifdef NNG_HAVE_ALLOCA -#include <alloca.h> -#endif - -// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open -// file descriptor for TCP socket, etc.) This contains the list of pending -// aios for that underlying socket, as well as the socket itself. -struct nni_posix_pipedesc { - nni_posix_pfd *pfd; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; -}; - -static void -nni_posix_pipedesc_finish(nni_aio *aio, int rv) -{ - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, nni_aio_count(aio)); -} - -static void -nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - - pd->closed = true; - while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - nni_posix_pfd_close(pd->pfd); -} - -static void -nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - int fd; - - fd = nni_posix_pfd_fd(pd->pfd); - if ((fd < 0) || (pd->closed)) { - return; - } - - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - unsigned i; - int n; - int niov; - unsigned naiov; - nni_iov * aiov; - struct msghdr hdr; -#ifdef NNG_HAVE_ALLOCA - struct iovec *iovec; -#else - struct iovec iovec[16]; -#endif - - memset(&hdr, 0, sizeof(hdr)); - - nni_aio_get_iov(aio, &naiov, &aiov); - -#ifdef NNG_HAVE_ALLOCA - if (naiov > 64) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } - iovec = alloca(naiov * sizeof(*iovec)); -#else - if (naiov > NNI_NUM_ELEMENTS(iovec)) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } -#endif - - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len > 0) { - iovec[niov].iov_len = aiov[i].iov_len; - iovec[niov].iov_base = aiov[i].iov_buf; - niov++; - } - } - -#ifndef MSG_NOSIGNAL -#define MSG_NOSIGNAL 0 -#endif - - hdr.msg_iovlen = niov; - hdr.msg_iov = iovec; - - if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { - switch (errno) { - case EINTR: - continue; - case EAGAIN: -#ifdef EWOULDBLOCK -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif -#endif - return; - default: - nni_posix_pipedesc_finish( - aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; - } - } - - nni_aio_bump_count(aio, n); - // We completed the entire operation on this aioq. - // (Sendmsg never returns a partial result.) - nni_posix_pipedesc_finish(aio, 0); - - // Go back to start of loop to see if there is another - // aio ready for us to process. - } -} - -static void -nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - int fd; - - fd = nni_posix_pfd_fd(pd->pfd); - if ((fd < 0) || (pd->closed)) { - return; - } - - while ((aio = nni_list_first(&pd->readq)) != NULL) { - unsigned i; - int n; - int niov; - unsigned naiov; - nni_iov *aiov; -#ifdef NNG_HAVE_ALLOCA - struct iovec *iovec; -#else - struct iovec iovec[16]; -#endif - - nni_aio_get_iov(aio, &naiov, &aiov); -#ifdef NNG_HAVE_ALLOCA - if (naiov > 64) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } - iovec = alloca(naiov * sizeof(*iovec)); -#else - if (naiov > NNI_NUM_ELEMENTS(iovec)) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } -#endif - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len != 0) { - iovec[niov].iov_len = aiov[i].iov_len; - iovec[niov].iov_base = aiov[i].iov_buf; - niov++; - } - } - - if ((n = readv(fd, iovec, niov)) < 0) { - switch (errno) { - case EINTR: - continue; - case EAGAIN: - return; - default: - nni_posix_pipedesc_finish( - aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; - } - } - - if (n == 0) { - // No bytes indicates a closed descriptor. - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - nni_posix_pipedesc_doclose(pd); - return; - } - - nni_aio_bump_count(aio, n); - - // We completed the entire operation on this aioq. - nni_posix_pipedesc_finish(aio, 0); - - // Go back to start of loop to see if there is another - // aio ready for us to process. - } -} - -static void -nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_pipedesc *pd = arg; - - nni_mtx_lock(&pd->mtx); - if (events & POLLIN) { - nni_posix_pipedesc_doread(pd); - } - if (events & POLLOUT) { - nni_posix_pipedesc_dowrite(pd); - } - if (events & (POLLHUP | POLLERR | POLLNVAL)) { - nni_posix_pipedesc_doclose(pd); - } else { - events = 0; - if (!nni_list_empty(&pd->writeq)) { - events |= POLLOUT; - } - if (!nni_list_empty(&pd->readq)) { - events |= POLLIN; - } - if ((!pd->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); - } - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_close(nni_posix_pipedesc *pd) -{ - // NB: Events may still occur. - nni_mtx_lock(&pd->mtx); - nni_posix_pipedesc_doclose(pd); - nni_mtx_unlock(&pd->mtx); -} - -static void -nni_posix_pipedesc_cancel(nni_aio *aio, int rv) -{ - nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio); - - nni_mtx_lock(&pd->mtx); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&pd->mtx); - - if (pd->closed) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - - if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&pd->readq, aio); - - // If we are only job on the list, go ahead and try to do an - // immediate transfer. This allows for faster completions in - // many cases. We also need not arm a list if it was already - // armed. - if (nni_list_first(&pd->readq) == aio) { - nni_posix_pipedesc_doread(pd); - // If we are still the first thing on the list, that - // means we didn't finish the job, so arm the poller to - // complete us. - if (nni_list_first(&pd->readq) == aio) { - nni_posix_pfd_arm(pd->pfd, POLLIN); - } - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&pd->mtx); - - if (pd->closed) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - - if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&pd->writeq, aio); - - if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pipedesc_dowrite(pd); - // If we are still the first thing on the list, that - // means we didn't finish the job, so arm the poller to - // complete us. - if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pfd_arm(pd->pfd, POLLOUT); - } - } - nni_mtx_unlock(&pd->mtx); -} - -int -nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(pd->pfd); - - if (getpeername(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(pd->pfd); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay) -{ - int val = nodelay ? 1 : 0; - int fd = nni_posix_pfd_fd(pd->pfd); - - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep) -{ - int val = keep ? 1 : 0; - int fd = nni_posix_pfd_fd(pd->pfd); - - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, - uint64_t *egid, uint64_t *prid, uint64_t *znid) -{ - int fd = nni_posix_pfd_fd(pd->pfd); -#if defined(NNG_HAVE_GETPEEREID) - uid_t uid; - gid_t gid; - - if (getpeereid(fd, &uid, &gid) != 0) { - return (nni_plat_errno(errno)); - } - *euid = uid; - *egid = gid; - *prid = (uint64_t) -1; - *znid = (uint64_t) -1; - return (0); -#elif defined(NNG_HAVE_GETPEERUCRED) - ucred_t *ucp = NULL; - if (getpeerucred(fd, &ucp) != 0) { - return (nni_plat_errno(errno)); - } - *euid = ucred_geteuid(ucp); - *egid = ucred_getegid(ucp); - *prid = ucred_getpid(ucp); - *znid = ucred_getzoneid(ucp); - ucred_free(ucp); - return (0); -#elif defined(NNG_HAVE_SOPEERCRED) - struct ucred uc; - socklen_t len = sizeof(uc); - if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { - return (nni_plat_errno(errno)); - } - *euid = uc.uid; - *egid = uc.gid; - *prid = uc.pid; - *znid = (uint64_t) -1; - return (0); -#elif defined(NNG_HAVE_LOCALPEERCRED) - struct xucred xu; - socklen_t len = sizeof(xu); - if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { - return (nni_plat_errno(errno)); - } - *euid = xu.cr_uid; - *egid = xu.cr_gid; - *prid = (uint64_t) -1; // XXX: macOS has undocumented - // LOCAL_PEERPID... - *znid = (uint64_t) -1; - return (0); -#else - if (fd < 0) { - return (NNG_ECLOSED); - } - NNI_ARG_UNUSED(euid); - NNI_ARG_UNUSED(egid); - NNI_ARG_UNUSED(prid); - NNI_ARG_UNUSED(znid); - return (NNG_ENOTSUP); -#endif -} - -int -nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd) -{ - nni_posix_pipedesc *pd; - - if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { - return (NNG_ENOMEM); - } - - pd->closed = false; - pd->pfd = pfd; - - nni_mtx_init(&pd->mtx); - nni_aio_list_init(&pd->readq); - nni_aio_list_init(&pd->writeq); - - nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd); - - *pdp = pd; - return (0); -} - -void -nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) -{ - nni_posix_pipedesc_close(pd); - nni_posix_pfd_fini(pd->pfd); - pd->pfd = NULL; - nni_mtx_fini(&pd->mtx); - - NNI_FREE_STRUCT(pd); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 63d858c2..f78d0b6b 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_USE_POSIX_RESOLV_GAI -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <ctype.h> diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 5cd1887a..c0352c55 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 7f627361..21f6ecfe 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index cdcbecd9..8c186885 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> @@ -151,6 +150,7 @@ static void tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_tcp_listener *l = arg; + NNI_ARG_UNUSED(pfd); nni_mtx_lock(&l->mtx); if (events & POLLNVAL) { @@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) nni_mtx_unlock(&l->mtx); return; } - NNI_ASSERT(pfd == l->pfd); // Anything else will turn up in accept. tcp_listener_doaccept(l); @@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { nni_mtx_unlock(&l->mtx); - nni_posix_pfd_fini(pfd); + (void) close(fd); return (rv); } diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 759bdb96..873a02a1 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include "platform/posix/posix_pollq.h" #include <errno.h> @@ -177,7 +176,7 @@ static void nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_plat_udp *udp = arg; - NNI_ASSERT(pfd == udp->udp_pfd); + NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); if (events & POLLIN) { diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 93e45423..b8741b52 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void); extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); -extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *); +extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *); extern void nni_win_io_fini(nni_win_io *); -extern void nni_win_io_cancel(nni_win_io *); extern int nni_win_io_register(HANDLE); diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index 1179b603..50b22343 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h) } int -nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) +nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr) { ZeroMemory(&io->olpd, sizeof(io->olpd)); io->cb = cb; io->ptr = ptr; io->aio = NULL; - io->f = f; io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if (io->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); @@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) } void -nni_win_io_cancel(nni_win_io *io) -{ - if (io->f != INVALID_HANDLE_VALUE) { - CancelIoEx(io->f, &io->olpd); - } -} - -void nni_win_io_fini(nni_win_io *io) { if (io->olpd.hEvent != NULL) { diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c deleted file mode 100644 index 169a2e00..00000000 --- a/src/platform/windows/win_ipc.c +++ /dev/null @@ -1,673 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_WINDOWS - -#include <stdio.h> - -struct nni_plat_ipc_pipe { - HANDLE p; - int mode; - nni_win_event rcv_ev; - nni_win_event snd_ev; -}; - -struct nni_plat_ipc_ep { - char path[NNG_MAXADDRLEN + 16]; - nni_sockaddr addr; - int mode; - bool started; - HANDLE p; // accept side only - nni_win_event acc_ev; // accept side only - nni_aio * con_aio; // conn side only - nni_list_node node; // conn side uses this - SECURITY_ATTRIBUTES sec_attr; -}; - -static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_ipc_pipe_ops = { - .wev_start = nni_win_ipc_pipe_start, - .wev_finish = nni_win_ipc_pipe_finish, - .wev_cancel = nni_win_ipc_pipe_cancel, -}; - -static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); -static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_ipc_acc_ops = { - .wev_start = nni_win_ipc_acc_start, - .wev_finish = nni_win_ipc_acc_finish, - .wev_cancel = nni_win_ipc_acc_cancel, -}; - -static int -nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - void * buf; - DWORD len; - BOOL ok; - int rv; - nni_plat_ipc_pipe *pipe = evt->ptr; - unsigned idx; - unsigned naiov; - nni_iov * aiov; - - NNI_ASSERT(aio != NULL); - - if (pipe->p == INVALID_HANDLE_VALUE) { - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - nni_aio_get_iov(aio, &naiov, &aiov); - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } - - evt->count = 0; - if (evt == &pipe->snd_ev) { - ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd); - } else { - ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd); - } - if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_ipc_pipe_cancel(nni_win_event *evt) -{ - nni_plat_ipc_pipe *pipe = evt->ptr; - - CancelIoEx(pipe->p, &evt->olpd); -} - -static void -nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_aio_finish(aio, evt->status, evt->count); -} - -static int -nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode) -{ - nni_plat_ipc_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - pipe->mode = mode; - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - - pipe->p = p; - *pipep = pipe; - return (0); -} - -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) -{ - HANDLE p; - - nni_win_event_close(&pipe->snd_ev); - nni_win_event_close(&pipe->rcv_ev); - - if ((p = pipe->p) != INVALID_HANDLE_VALUE) { - pipe->p = INVALID_HANDLE_VALUE; - CloseHandle(p); - } -} - -void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) -{ - nni_plat_ipc_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible. -// NB: Only POSIX systems support group IDs. -int -nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid) -{ - ULONG id; - switch (pipe->mode) { - case NNI_EP_MODE_DIAL: - if (!GetNamedPipeServerProcessId(pipe->p, &id)) { - return (nni_win_error(GetLastError())); - } - *pid = id; - break; - case NNI_EP_MODE_LISTEN: - if (!GetNamedPipeClientProcessId(pipe->p, &id)) { - return (nni_win_error(GetLastError())); - } - *pid = id; - break; - default: - // Should never occur! - return (NNG_EINVAL); - } - return (0); -} - -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode) -{ - const char * path; - nni_plat_ipc_ep *ep; - - path = sa->s_ipc.sa_path; - if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) { - return (NNG_EINVAL); - } - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(*ep)); - - ep->mode = mode; - ep->sec_attr.nLength = sizeof(ep->sec_attr); - ep->sec_attr.lpSecurityDescriptor = NULL; - ep->sec_attr.bInheritHandle = FALSE; - NNI_LIST_NODE_INIT(&ep->node); - - ep->addr = *sa; - (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path); - - *epp = ep; - return (0); -} - -int -nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits) -{ - NNI_ARG_UNUSED(ep); - NNI_ARG_UNUSED(bits); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc) -{ - if (ep->started) { - return (NNG_EBUSY); - } - if (ep->mode != NNI_EP_MODE_LISTEN) { - return (NNG_ENOTSUP); - } - if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) { - return (NNG_EINVAL); - } - ep->sec_attr.lpSecurityDescriptor = desc; - return (0); -} - -int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - int rv; - HANDLE p; - - if (ep->mode != NNI_EP_MODE_LISTEN) { - return (NNG_EINVAL); - } - if (ep->started) { - return (NNG_EBUSY); - } - - // We create the first named pipe, and we make sure that it is - // properly ours. - p = CreateNamedPipeA(ep->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | - FILE_FLAG_FIRST_PIPE_INSTANCE, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr); - if (p == INVALID_HANDLE_VALUE) { - if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { - rv = NNG_EADDRINUSE; - } else { - rv = nni_win_error(rv); - } - goto failed; - } - rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep); - if (rv != 0) { - goto failed; - } - - if ((rv = nni_win_iocp_register(p)) != 0) { - goto failed; - } - - ep->p = p; - ep->started = true; - return (0); - -failed: - - if (p != INVALID_HANDLE_VALUE) { - (void) CloseHandle(p); - } - - return (rv); -} - -static void -nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_ipc_ep * ep = evt->ptr; - nni_plat_ipc_pipe *pipe; - int rv; - HANDLE newp, oldp; - - if ((rv = evt->status) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - - newp = CreateNamedPipeA(ep->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr); - if (newp == INVALID_HANDLE_VALUE) { - rv = nni_win_error(GetLastError()); - // We connected, but as we cannot get a new pipe, - // we have to disconnect the old one. - DisconnectNamedPipe(ep->p); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_win_iocp_register(newp)) != 0) { - // Disconnect the old pipe. - DisconnectNamedPipe(ep->p); - // And discard the half-baked new one. - DisconnectNamedPipe(newp); - (void) CloseHandle(newp); - nni_aio_finish_error(aio, rv); - return; - } - - oldp = ep->p; - ep->p = newp; - - if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) != - 0) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - DisconnectNamedPipe(oldp); - (void) CloseHandle(oldp); - nni_aio_finish_error(aio, rv); - return; - } - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static void -nni_win_ipc_acc_cancel(nni_win_event *evt) -{ - nni_plat_ipc_ep *ep = evt->ptr; - - (void) CancelIoEx(ep->p, &evt->olpd); - // Just to be sure. - (void) DisconnectNamedPipe(ep->p); -} - -static int -nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_ipc_ep *ep = evt->ptr; - NNI_ARG_UNUSED(aio); - - if (!ConnectNamedPipe(ep->p, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_CONNECTED: - // Kind of like success, but as this is technically - // an "error", we have to complete it ourself. - evt->status = 0; - evt->count = 0; - return (1); - - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synchronous success - the I/O completion packet should still - // be delivered. - return (0); -} - -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->acc_ev, aio); -} - -// So Windows IPC is a bit different on the client side. There is no -// support for asynchronous connection, but we can fake it with a -// single thread that runs to establish the connection. That thread -// will run keep looping, sleeping for 10 ms between attempts. It -// performs non-blocking attempts to connect. -typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work; -struct nni_win_ipc_conn_work { - nni_list waiters; - nni_list workers; - nni_mtx mtx; - nni_cv cv; - nni_thr thr; - int exit; -}; - -static nni_win_ipc_conn_work nni_win_ipc_connecter; - -static void -nni_win_ipc_conn_thr(void *arg) -{ - nni_win_ipc_conn_work *w = arg; - nni_plat_ipc_ep * ep; - nni_plat_ipc_pipe * pipe; - nni_aio * aio; - HANDLE p; - int rv; - - nni_mtx_lock(&w->mtx); - for (;;) { - if (w->exit) { - break; - } - while ((ep = nni_list_first(&w->waiters)) != NULL) { - nni_list_remove(&w->waiters, ep); - nni_list_append(&w->workers, ep); - } - - while ((ep = nni_list_first(&w->workers)) != NULL) { - - nni_list_remove(&w->workers, ep); - - if ((aio = ep->con_aio) == NULL) { - continue; - } - ep->con_aio = NULL; - - pipe = NULL; - - p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, - NULL); - - if (p == INVALID_HANDLE_VALUE) { - switch ((rv = GetLastError())) { - case ERROR_PIPE_BUSY: - // Still in progress. This - // shouldn't happen unless the - // other side aborts the - // connection. - ep->con_aio = aio; - nni_list_append(&w->waiters, ep); - continue; - - case ERROR_FILE_NOT_FOUND: - rv = NNG_ECONNREFUSED; - break; - default: - rv = nni_win_error(rv); - break; - } - goto fail; - } - if (((rv = nni_win_ipc_pipe_init( - &pipe, p, NNI_EP_MODE_DIAL)) != 0) || - ((rv = nni_win_iocp_register(p)) != 0)) { - goto fail; - } - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - continue; - - fail: - if (p != INVALID_HANDLE_VALUE) { - DisconnectNamedPipe(p); - CloseHandle(p); - } - if (pipe != NULL) { - nni_plat_ipc_pipe_fini(pipe); - } - nni_aio_finish_error(aio, rv); - } - - if (nni_list_empty(&w->waiters)) { - // Wait until an endpoint is added. - nni_cv_wait(&w->cv); - } else { - // Wait 10 ms, unless woken earlier. - nni_cv_until(&w->cv, nni_clock() + 10); - } - } - nni_mtx_unlock(&w->mtx); -} - -static void -nni_win_ipc_conn_cancel(nni_aio *aio, int rv) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio); - - nni_mtx_lock(&w->mtx); - if (aio == ep->con_aio) { - ep->con_aio = NULL; - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - nni_cv_wake(&w->cv); - } - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&w->mtx); -} - -void -nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&w->mtx); - if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) { - nni_mtx_unlock(&w->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - NNI_ASSERT(!nni_list_active(&w->waiters, ep)); - - ep->con_aio = aio; - nni_list_append(&w->waiters, ep); - nni_cv_wake(&w->cv); - nni_mtx_unlock(&w->mtx); -} - -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) -{ - nni_plat_ipc_ep_close(ep); - if (ep->p != INVALID_HANDLE_VALUE) { - CloseHandle(ep->p); - ep->p = INVALID_HANDLE_VALUE; - } - nni_win_event_close(&ep->acc_ev); - nni_win_event_fini(&ep->acc_ev); - NNI_FREE_STRUCT(ep); -} - -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - nni_aio * aio; - - switch (ep->mode) { - case NNI_EP_MODE_DIAL: - nni_mtx_lock(&w->mtx); - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - } - if ((aio = ep->con_aio) != NULL) { - ep->con_aio = NULL; - nni_aio_finish_error(aio, NNG_ECLOSED); - } - nni_mtx_unlock(&w->mtx); - break; - - case NNI_EP_MODE_LISTEN: - nni_win_event_close(&ep->acc_ev); - if (ep->p != INVALID_HANDLE_VALUE) { - CloseHandle(ep->p); - ep->p = INVALID_HANDLE_VALUE; - } - break; - } -} - -int -nni_win_ipc_sysinit(void) -{ - int rv; - nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; - - NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); - NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - - nni_mtx_init(&worker->mtx); - nni_cv_init(&worker->cv, &worker->mtx); - - rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); - if (rv != 0) { - return (rv); - } - - nni_thr_run(&worker->thr); - - return (0); -} - -void -nni_win_ipc_sysfini(void) -{ - nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; - - nni_mtx_lock(&worker->mtx); - worker->exit = 1; - nni_cv_wake(&worker->cv); - nni_mtx_unlock(&worker->mtx); - nni_thr_fini(&worker->thr); - nni_cv_fini(&worker->cv); - nni_mtx_fini(&worker->mtx); -} - -#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h new file mode 100644 index 00000000..51ce5548 --- /dev/null +++ b/src/platform/windows/win_ipc.h @@ -0,0 +1,62 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_WIN_WINIPC_H +#define PLATFORM_WIN_WINIPC_H + +// This header file is private to the IPC (named pipes) support for Windows. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +struct nni_ipc_conn { + HANDLE f; + nni_win_io recv_io; + nni_win_io send_io; + nni_win_io conn_io; + nni_list recv_aios; + nni_list send_aios; + nni_aio * conn_aio; + nni_ipc_dialer * dialer; + nni_ipc_listener *listener; + int recv_rv; + int send_rv; + int conn_rv; + bool closed; + nni_mtx mtx; + nni_cv cv; + nni_reap_item reap; +}; + +struct nni_ipc_dialer { + bool closed; // dialers are locked by the worker lock + nni_list aios; + nni_list_node node; // node on worker list +}; + +struct nni_ipc_listener { + char * path; + bool started; + bool closed; + HANDLE f; + SECURITY_ATTRIBUTES sec_attr; + nni_list aios; + nni_mtx mtx; + nni_cv cv; + nni_win_io io; + int rv; +}; + +extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE); + +#endif // NNG_PLATFORM_WINDOWS + +#endif // NNG_PLATFORM_WIN_WINIPC_H diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c new file mode 100644 index 00000000..d8ef4e4e --- /dev/null +++ b/src/platform/windows/win_ipcconn.c @@ -0,0 +1,388 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +static void +ipc_recv_start(nni_ipc_conn *c) +{ + nni_aio *aio; + unsigned idx; + unsigned naiov; + nni_iov *aiov; + void * buf; + DWORD len; + int rv; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +ipc_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + ipc_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} +static void +ipc_recv_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + CancelIoEx(c->f, &c->recv_io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + ipc_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_send_start(nni_ipc_conn *c) +{ + nni_aio *aio; + unsigned idx; + unsigned naiov; + nni_iov *aiov; + void * buf; + DWORD len; + int rv; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +ipc_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); + ipc_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +ipc_send_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + CancelIoEx(c->f, &c->send_io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + ipc_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p) +{ + nni_ipc_conn *c; + int rv; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->f = INVALID_HANDLE_VALUE; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + + if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) { + nni_ipc_conn_fini(c); + return (rv); + } + + c->f = p; + *connp = c; + return (0); +} + +void +nni_ipc_conn_close(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + CancelIoEx(c->f, &c->recv_io.olpd); + } + if (!nni_list_empty(&c->send_aios)) { + CancelIoEx(c->f, &c->send_io.olpd); + } + + if (c->f != INVALID_HANDLE_VALUE) { + // NB: closing the pipe is dangerous at this point. + DisconnectNamedPipe(c->f); + } + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_reap(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->f != INVALID_HANDLE_VALUE) { + CloseHandle(c->f); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +void +nni_ipc_conn_fini(nni_ipc_conn *c) +{ + nni_ipc_conn_close(c); + + nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c); +} + +int +nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid) +{ + ULONG id; + if (c->dialer) { + if (!GetNamedPipeServerProcessId(c->f, &id)) { + return (nni_win_error(GetLastError())); + } + } else { + if (!GetNamedPipeClientProcessId(c->f, &id)) { + return (nni_win_error(GetLastError())); + } + } + *pid = id; + return (0); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c new file mode 100644 index 00000000..429bcedf --- /dev/null +++ b/src/platform/windows/win_ipcdial.c @@ -0,0 +1,265 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +int +nni_ipc_dialer_init(nni_ipc_dialer **dp) +{ + nni_ipc_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + d->closed = false; + nni_aio_list_init(&d->aios); + *dp = d; + return (0); +} + +// Windows IPC is a bit different on the client side. There is no +// support for asynchronous connection, but we can fake it with a +// single thread that runs to establish the connection. That thread +// will run keep looping, sleeping for 10 ms between attempts. It +// performs non-blocking attempts to connect. +typedef struct ipc_dial_work { + nni_list waiters; + nni_list workers; + nni_mtx mtx; + nni_cv cv; + nni_thr thr; + int exit; +} ipc_dial_work; + +static ipc_dial_work ipc_connecter; + +static void +ipc_dial_thr(void *arg) +{ + ipc_dial_work *w = arg; + + nni_mtx_lock(&w->mtx); + for (;;) { + nni_ipc_dialer *d; + + if (w->exit) { + break; + } + while ((d = nni_list_first(&w->waiters)) != NULL) { + nni_list_remove(&w->waiters, d); + nni_list_append(&w->workers, d); + } + + while ((d = nni_list_first(&w->workers)) != NULL) { + nni_ipc_conn *c; + nni_aio * aio; + HANDLE f; + int rv; + char * path; + + if ((aio = nni_list_first(&d->aios)) == NULL) { + nni_list_remove(&w->workers, d); + continue; + } + + path = nni_aio_get_prov_extra(aio, 0); + + f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0, + NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); + + if (f == INVALID_HANDLE_VALUE) { + switch ((rv = GetLastError())) { + case ERROR_PIPE_BUSY: + // Still in progress. This + // shouldn't happen unless the + // other side aborts the + // connection. + // back at the head of the list + nni_list_remove(&w->workers, d); + nni_list_prepend(&w->waiters, d); + continue; + + case ERROR_FILE_NOT_FOUND: + rv = NNG_ECONNREFUSED; + break; + default: + rv = nni_win_error(rv); + break; + } + nni_list_remove(&d->aios, aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_list_remove(&d->aios, aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + + if (((rv = nni_win_io_register(f)) != 0) || + ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) { + DisconnectNamedPipe(f); + CloseHandle(f); + nni_aio_finish_error(aio, rv); + continue; + } + c->dialer = d; + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } + + if (nni_list_empty(&w->waiters)) { + // Wait until an endpoint is added. + nni_cv_wait(&w->cv); + } else { + // Wait 10 ms, unless woken earlier. + nni_cv_until(&w->cv, nni_clock() + 10); + } + } + nni_mtx_unlock(&w->mtx); +} + +static void +ipc_dial_cancel(nni_aio *aio, int rv) +{ + ipc_dial_work * w = &ipc_connecter; + nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&w->mtx); + if (nni_aio_list_active(aio)) { + char *path; + if (nni_list_active(&w->waiters, d)) { + nni_list_remove(&w->waiters, d); + nni_cv_wake(&w->cv); + } + nni_aio_list_remove(aio); + path = nni_aio_get_prov_extra(aio, 0); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&w->mtx); +} + +void +nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + ipc_dial_work *w = &ipc_connecter; + char * path; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + if (sa->s_family != NNG_AF_IPC) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) != + 0) { + nni_aio_finish_error(aio, rv); + return; + } + + nni_mtx_lock(&w->mtx); + if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) { + nni_mtx_unlock(&w->mtx); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + return; + } + + if (d->closed) { + nni_mtx_unlock(&w->mtx); + nni_strfree(path); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + nni_aio_set_prov_extra(aio, 0, path); + nni_list_append(&d->aios, aio); + if (nni_list_first(&d->aios) == aio) { + nni_list_append(&w->waiters, d); + nni_cv_wake(&w->cv); + } + nni_mtx_unlock(&w->mtx); +} + +void +nni_ipc_dialer_fini(nni_ipc_dialer *d) +{ + nni_ipc_dialer_close(d); + NNI_FREE_STRUCT(d); +} + +void +nni_ipc_dialer_close(nni_ipc_dialer *d) +{ + ipc_dial_work *w = &ipc_connecter; + nni_aio * aio; + + nni_mtx_lock(&w->mtx); + d->closed = true; + if (nni_list_active(&w->waiters, d)) { + nni_list_remove(&w->waiters, d); + } + while ((aio = nni_list_first(&d->aios)) != NULL) { + nni_list_remove(&d->aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&w->mtx); +} + +int +nni_win_ipc_sysinit(void) +{ + int rv; + ipc_dial_work *worker = &ipc_connecter; + + NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node); + NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node); + + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + + rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker); + if (rv != 0) { + return (rv); + } + + nni_thr_run(&worker->thr); + + return (0); +} + +void +nni_win_ipc_sysfini(void) +{ + ipc_dial_work *worker = &ipc_connecter; + + nni_reap_drain(); // so that listeners get cleaned up. + + nni_mtx_lock(&worker->mtx); + worker->exit = 1; + nni_cv_wake(&worker->cv); + nni_mtx_unlock(&worker->mtx); + nni_thr_fini(&worker->thr); + nni_cv_fini(&worker->cv); + nni_mtx_fini(&worker->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c new file mode 100644 index 00000000..20bb8548 --- /dev/null +++ b/src/platform/windows/win_ipclisten.c @@ -0,0 +1,296 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +static void +ipc_accept_done(nni_ipc_listener *l, int rv) +{ + nni_aio * aio; + HANDLE f; + nni_ipc_conn *c; + + aio = nni_list_first(&l->aios); + nni_list_remove(&l->aios, aio); + nni_cv_wake(&l->cv); + + if (l->closed) { + // Closed, so bail. + DisconnectNamedPipe(l->f); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + // Create a replacement pipe. + f = CreateNamedPipeA(l->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr); + if (f == INVALID_HANDLE_VALUE) { + // We couldn't create a replacement pipe, so we have to + // abort the client from our side, so that we can keep + // our server pipe available. + rv = nni_win_error(GetLastError()); + DisconnectNamedPipe(l->f); + nni_aio_finish_error(aio, rv); + return; + } + + if (((rv = nni_win_io_register(f)) != 0) || + ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) { + DisconnectNamedPipe(l->f); + DisconnectNamedPipe(f); + CloseHandle(f); + nni_aio_finish_error(aio, rv); + return; + } + l->f = f; + c->listener = l; + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +static void +ipc_accept_start(nni_ipc_listener *l) +{ + nni_aio *aio; + + if (l->closed) { + while ((aio = nni_list_first(&l->aios)) != NULL) { + nni_list_remove(&l->aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&l->cv); + } + + while ((aio = nni_list_first(&l->aios)) != NULL) { + int rv; + + if ((ConnectNamedPipe(l->f, &l->io.olpd)) || + ((rv = GetLastError()) == ERROR_IO_PENDING)) { + // Success, or pending, handled via completion pkt. + return; + } + if (rv == ERROR_PIPE_CONNECTED) { + // Kind of like success, but as this is technically + // an "error", we have to complete it ourself. + // Fake a completion. + ipc_accept_done(l, 0); + } else { + // Fast-fail (synchronous). + nni_aio_finish_error(aio, nni_win_error(rv)); + } + } +} + +static void +ipc_accept_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_ipc_listener *l = io->ptr; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&l->mtx); + if (nni_list_empty(&l->aios)) { + // We canceled this somehow. We no longer care. + DisconnectNamedPipe(l->f); + nni_mtx_unlock(&l->mtx); + return; + } + if (l->rv != 0) { + rv = l->rv; + l->rv = 0; + } + ipc_accept_done(l, rv); + ipc_accept_start(l); + nni_mtx_unlock(&l->mtx); +} + +int +nni_ipc_listener_init(nni_ipc_listener **lp) +{ + nni_ipc_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) { + NNI_FREE_STRUCT(l); + return (rv); + } + l->started = false; + l->closed = false; + l->sec_attr.nLength = sizeof(l->sec_attr); + l->sec_attr.lpSecurityDescriptor = NULL; + l->sec_attr.bInheritHandle = FALSE; + nni_aio_list_init(&l->aios); + nni_mtx_init(&l->mtx); + nni_cv_init(&l->cv, &l->mtx); + *lp = l; + return (0); +} + +int +nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits) +{ + NNI_ARG_UNUSED(l); + NNI_ARG_UNUSED(bits); + return (NNG_ENOTSUP); +} + +int +nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc) +{ + if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) { + return (NNG_EINVAL); + } + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + l->sec_attr.lpSecurityDescriptor = desc; + nni_mtx_unlock(&l->mtx); + return (0); +} + +int +nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) +{ + int rv; + HANDLE f; + char * path; + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path); + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + return (rv); + } + + f = CreateNamedPipeA(path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | + FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr); + if (f == INVALID_HANDLE_VALUE) { + if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { + rv = NNG_EADDRINUSE; + } else { + rv = nni_win_error(rv); + } + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + if ((rv = nni_win_io_register(f)) != 0) { + CloseHandle(f); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + + l->f = f; + l->path = path; + l->started = true; + nni_mtx_unlock(&l->mtx); + return (0); +} + +static void +ipc_accept_cancel(nni_aio *aio, int rv) +{ + nni_ipc_listener *l = nni_aio_get_prov_data(aio); + + nni_mtx_unlock(&l->mtx); + if (aio == nni_list_first(&l->aios)) { + l->rv = rv; + CancelIoEx(l->f, &l->io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_list_remove(&l->aios, aio); + nni_cv_wake(&l->cv); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) +{ + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + nni_list_append(&l->aios, aio); + if (nni_list_first(&l->aios) == aio) { + ipc_accept_start(l); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_close(nni_ipc_listener *l) +{ + + nni_mtx_lock(&l->mtx); + if (!l->closed) { + l->closed = true; + if (!nni_list_empty(&l->aios)) { + CancelIoEx(l->f, &l->io.olpd); + } + DisconnectNamedPipe(l->f); + CloseHandle(l->f); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_fini(nni_ipc_listener *l) +{ + nni_mtx_lock(&l->mtx); + while (!nni_list_empty(&l->aios)) { + nni_cv_wait(&l->cv); + } + nni_mtx_unlock(&l->mtx); + nni_win_io_fini(&l->io); + nni_strfree(l->path); + nni_cv_fini(&l->cv); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index c3a4a5d8..08b759ca 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv) nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; - nni_win_io_cancel(&c->recv_io); + CancelIoEx((HANDLE) c->s, &c->recv_io.olpd); } else if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv) nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; - nni_win_io_cancel(&c->send_io); + CancelIoEx((HANDLE) c->s, &c->send_io.olpd); } else if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) nni_aio_list_init(&c->send_aios); c->conn_aio = NULL; - if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != - 0) || - ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != - 0) || + if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) || ((rv = nni_win_io_register((HANDLE) s)) != 0)) { nni_tcp_conn_fini(c); return (rv); @@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c) if (!c->closed) { c->closed = true; if (!nni_list_empty(&c->recv_aios)) { - nni_win_io_cancel(&c->recv_io); + CancelIoEx((HANDLE) c->s, &c->recv_io.olpd); } if (!nni_list_empty(&c->send_aios)) { - nni_win_io_cancel(&c->send_io); + CancelIoEx((HANDLE) c->s, &c->send_io.olpd); } if (c->s != INVALID_SOCKET) { shutdown(c->s, SD_BOTH); @@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c) while ((!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { nni_cv_wait(&c->cv); - nni_mtx_unlock(&c->mtx); } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 4a3e9f2f..5283ea81 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->conn_rv = NNG_ECLOSED; - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } } } @@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv) if (c->conn_rv == 0) { c->conn_rv = rv; } - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } nni_mtx_unlock(&d->mtx); } @@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != - 0) { + if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) { nni_tcp_conn_fini(c); nni_aio_finish_error(aio, rv); return; diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 5055c32d..1f197246 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->conn_rv = NNG_ECLOSED; - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } } closesocket(l->s); @@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv) if (c->conn_rv == 0) { c->conn_rv = rv; } - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } nni_mtx_unlock(&l->mtx); } @@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) return; } nni_mtx_lock(&l->mtx); + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } if (l->closed) { nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) c->listener = l; c->conn_aio = aio; nni_aio_set_prov_extra(aio, 0, c); - if (((rv = nni_win_io_init( - &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) || + if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) || ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { nni_aio_set_prov_extra(aio, 0, NULL); nni_mtx_unlock(&l->mtx); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7d99e507..ee72d6d8 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -20,16 +20,17 @@ // Windows named pipes. Other platforms could use other mechanisms, // but all implementations on the platform must use the same mechanism. -typedef struct ipc_pipe ipc_pipe; -typedef struct ipc_ep ipc_ep; +typedef struct ipctran_pipe ipctran_pipe; +typedef struct ipctran_dialer ipctran_dialer; +typedef struct ipctran_listener ipctran_listener; // ipc_pipe is one end of an IPC connection. -struct ipc_pipe { - nni_plat_ipc_pipe *ipp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - nni_sockaddr sa; +struct ipctran_pipe { + nni_ipc_conn *conn; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + nni_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -48,189 +49,201 @@ struct ipc_pipe { nni_mtx mtx; }; -struct ipc_ep { - nni_sockaddr sa; - nni_plat_ipc_ep *iep; - uint16_t proto; - size_t rcvmax; - nni_aio * aio; - nni_aio * user_aio; - nni_mtx mtx; +struct ipctran_dialer { + nni_sockaddr sa; + nni_ipc_dialer *dialer; + uint16_t proto; + size_t rcvmax; + nni_aio * aio; + nni_aio * user_aio; + nni_mtx mtx; }; -static void ipc_pipe_dosend(ipc_pipe *, nni_aio *); -static void ipc_pipe_dorecv(ipc_pipe *); -static void ipc_pipe_send_cb(void *); -static void ipc_pipe_recv_cb(void *); -static void ipc_pipe_nego_cb(void *); -static void ipc_ep_cb(void *); +struct ipctran_listener { + nni_sockaddr sa; + nni_ipc_listener *listener; + uint16_t proto; + size_t rcvmax; + nni_aio * aio; + nni_aio * user_aio; + nni_mtx mtx; +}; + +static void ipctran_pipe_send_start(ipctran_pipe *); +static void ipctran_pipe_recv_start(ipctran_pipe *); +static void ipctran_pipe_send_cb(void *); +static void ipctran_pipe_recv_cb(void *); +static void ipctran_pipe_nego_cb(void *); +static void ipctran_dialer_cb(void *); +static void ipctran_listener_cb(void *); static int -ipc_tran_init(void) +ipctran_init(void) { return (0); } static void -ipc_tran_fini(void) +ipctran_fini(void) { } static void -ipc_pipe_close(void *arg) +ipctran_pipe_close(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_close(pipe->rxaio); - nni_aio_close(pipe->txaio); - nni_aio_close(pipe->negaio); + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + nni_aio_close(p->negaio); - nni_plat_ipc_pipe_close(pipe->ipp); + nni_ipc_conn_close(p->conn); } static void -ipc_pipe_stop(void *arg) +ipctran_pipe_stop(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_stop(pipe->rxaio); - nni_aio_stop(pipe->txaio); - nni_aio_stop(pipe->negaio); + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); + nni_aio_stop(p->negaio); } static void -ipc_pipe_fini(void *arg) +ipctran_pipe_fini(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_fini(pipe->rxaio); - nni_aio_fini(pipe->txaio); - nni_aio_fini(pipe->negaio); - if (pipe->ipp != NULL) { - nni_plat_ipc_pipe_fini(pipe->ipp); + nni_aio_fini(p->rxaio); + nni_aio_fini(p->txaio); + nni_aio_fini(p->negaio); + if (p->conn != NULL) { + nni_ipc_conn_fini(p->conn); } - if (pipe->rxmsg) { - nni_msg_free(pipe->rxmsg); + if (p->rxmsg) { + nni_msg_free(p->rxmsg); } - nni_mtx_fini(&pipe->mtx); - NNI_FREE_STRUCT(pipe); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); } static int -ipc_pipe_init(ipc_pipe **pipep, ipc_ep *ep, void *ipp) +ipctran_pipe_init(ipctran_pipe **pipep, void *conn) { - ipc_pipe *p; - int rv; + ipctran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, ipc_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, ipc_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, ipc_pipe_nego_cb, p)) != 0)) { - ipc_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, ipctran_pipe_nego_cb, p)) != 0)) { + ipctran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->sendq); nni_aio_list_init(&p->recvq); + p->conn = conn; +#if 0 p->proto = ep->proto; p->rcvmax = ep->rcvmax; - p->ipp = ipp; p->sa.s_ipc.sa_family = NNG_AF_IPC; p->sa = ep->sa; - +#endif *pipep = p; return (0); } static void -ipc_cancel_start(nni_aio *aio, int rv) +ipctran_pipe_nego_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); - if (pipe->user_negaio != aio) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if (p->user_negaio != aio) { + nni_mtx_unlock(&p->mtx); return; } - pipe->user_negaio = NULL; - nni_mtx_unlock(&pipe->mtx); + p->user_negaio = NULL; + nni_mtx_unlock(&p->mtx); - nni_aio_abort(pipe->negaio, rv); + nni_aio_abort(p->negaio, rv); nni_aio_finish_error(aio, rv); } static void -ipc_pipe_nego_cb(void *arg) +ipctran_pipe_nego_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio = pipe->negaio; - int rv; + ipctran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto done; } // We start transmitting before we receive. - if (pipe->gottxhead < pipe->wanttxhead) { - pipe->gottxhead += nni_aio_count(aio); - } else if (pipe->gotrxhead < pipe->wantrxhead) { - pipe->gotrxhead += nni_aio_count(aio); + if (p->gottxhead < p->wanttxhead) { + p->gottxhead += nni_aio_count(aio); + } else if (p->gotrxhead < p->wantrxhead) { + p->gotrxhead += nni_aio_count(aio); } - if (pipe->gottxhead < pipe->wanttxhead) { + if (p->gottxhead < p->wanttxhead) { nni_iov iov; - iov.iov_len = pipe->wanttxhead - pipe->gottxhead; - iov.iov_buf = &pipe->txhead[pipe->gottxhead]; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txhead[p->gottxhead]; nni_aio_set_iov(aio, 1, &iov); // send it down... - nni_plat_ipc_pipe_send(pipe->ipp, aio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, aio); + nni_mtx_unlock(&p->mtx); return; } - if (pipe->gotrxhead < pipe->wantrxhead) { + if (p->gotrxhead < p->wantrxhead) { nni_iov iov; - iov.iov_len = pipe->wantrxhead - pipe->gotrxhead; - iov.iov_buf = &pipe->rxhead[pipe->gotrxhead]; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxhead[p->gotrxhead]; nni_aio_set_iov(aio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, aio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, aio); + nni_mtx_unlock(&p->mtx); return; } // We have both sent and received the headers. Lets check the // receive side header. - if ((pipe->rxhead[0] != 0) || (pipe->rxhead[1] != 'S') || - (pipe->rxhead[2] != 'P') || (pipe->rxhead[3] != 0) || - (pipe->rxhead[6] != 0) || (pipe->rxhead[7] != 0)) { + if ((p->rxhead[0] != 0) || (p->rxhead[1] != 'S') || + (p->rxhead[2] != 'P') || (p->rxhead[3] != 0) || + (p->rxhead[6] != 0) || (p->rxhead[7] != 0)) { rv = NNG_EPROTO; goto done; } - NNI_GET16(&pipe->rxhead[4], pipe->peer); + NNI_GET16(&p->rxhead[4], p->peer); done: - if ((aio = pipe->user_negaio) != NULL) { - pipe->user_negaio = NULL; + if ((aio = p->user_negaio) != NULL) { + p->user_negaio = NULL; nni_aio_finish(aio, rv, 0); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_pipe_send_cb(void *arg) +ipctran_pipe_send_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio; - nni_aio * txaio = pipe->txaio; - nni_msg * msg; - int rv; - size_t n; + ipctran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; - nni_mtx_lock(&pipe->mtx); - aio = nni_list_first(&pipe->sendq); + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->sendq); if ((rv = nni_aio_result(txaio)) != 0) { // Intentionally we do not queue up another transfer. @@ -239,7 +252,7 @@ ipc_pipe_send_cb(void *arg) // The protocol should see this error, and close the // pipe itself, we hope. nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } @@ -247,17 +260,15 @@ ipc_pipe_send_cb(void *arg) n = nni_aio_count(txaio); nni_aio_iov_advance(txaio, n); if (nni_aio_iov_count(txaio) != 0) { - nni_plat_ipc_pipe_send(pipe->ipp, txaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, txaio); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&pipe->sendq)) { - // schedule next send - ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq)); - } - nni_mtx_unlock(&pipe->mtx); + ipctran_pipe_send_start(p); + + nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); @@ -267,17 +278,17 @@ ipc_pipe_send_cb(void *arg) } static void -ipc_pipe_recv_cb(void *arg) +ipctran_pipe_recv_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = pipe->rxaio; + ipctran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; - nni_mtx_lock(&pipe->mtx); - aio = nni_list_first(&pipe->recvq); + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->recvq); if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back @@ -290,29 +301,29 @@ ipc_pipe_recv_cb(void *arg) nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) != 0) { // Was this a partial read? If so then resubmit for the rest. - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); return; } // If we don't have a message yet, we were reading the message // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. - if (pipe->rxmsg == NULL) { + if (p->rxmsg == NULL) { uint64_t len; // Check to make sure we got msg type 1. - if (pipe->rxhead[0] != 1) { + if (p->rxhead[0] != 1) { rv = NNG_EPROTO; goto recv_error; } // We should have gotten a message header. - NNI_GET64(pipe->rxhead + 1, len); + NNI_GET64(p->rxhead + 1, len); // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. - if ((len > pipe->rcvmax) && (pipe->rcvmax > 0)) { + if ((len > p->rcvmax) && (p->rcvmax > 0)) { rv = NNG_EMSGSIZE; goto recv_error; } @@ -322,7 +333,7 @@ ipc_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { goto recv_error; } @@ -330,11 +341,12 @@ ipc_pipe_recv_cb(void *arg) nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - iov.iov_buf = nni_msg_body(pipe->rxmsg); + iov.iov_buf = nni_msg_body(p->rxmsg); iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); return; } } @@ -343,12 +355,12 @@ ipc_pipe_recv_cb(void *arg) // good news. nni_aio_list_remove(aio); - msg = pipe->rxmsg; - pipe->rxmsg = NULL; - if (!nni_list_empty(&pipe->recvq)) { - ipc_pipe_dorecv(pipe); + msg = p->rxmsg; + p->rxmsg = NULL; + if (!nni_list_empty(&p->recvq)) { + ipctran_pipe_recv_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); @@ -356,59 +368,65 @@ ipc_pipe_recv_cb(void *arg) recv_error: nni_aio_list_remove(aio); - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + msg = p->rxmsg; + p->rxmsg = NULL; // Intentionally, we do not queue up another receive. // The protocol should notice this error and close the pipe. - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_msg_free(msg); nni_aio_finish_error(aio, rv); } static void -ipc_cancel_tx(nni_aio *aio, int rv) +ipctran_pipe_send_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); return; } // If this is being sent, then cancel the pending transfer. // The callback on the txaio will cause the user aio to // be canceled too. - if (nni_list_first(&pipe->sendq) == aio) { - nni_aio_abort(pipe->txaio, rv); - nni_mtx_unlock(&pipe->mtx); + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(p->txaio, rv); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); } static void -ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio) +ipctran_pipe_send_start(ipctran_pipe *p) { + nni_aio *aio; nni_aio *txaio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + // This runs to send the message. msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - pipe->txhead[0] = 1; // message type, 1. - NNI_PUT64(pipe->txhead + 1, len); + p->txhead[0] = 1; // message type, 1. + NNI_PUT64(p->txhead + 1, len); - txaio = pipe->txaio; + txaio = p->txaio; niov = 0; - iov[0].iov_buf = pipe->txhead; - iov[0].iov_len = sizeof(pipe->txhead); + iov[0].iov_buf = p->txhead; + iov[0].iov_len = sizeof(p->txhead); niov++; if (nni_msg_header_len(msg) > 0) { iov[niov].iov_buf = nni_msg_header(msg); @@ -421,504 +439,643 @@ ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_ipc_pipe_send(pipe->ipp, txaio); + nni_ipc_conn_send(p->conn, txaio); } static void -ipc_pipe_send(void *arg, nni_aio *aio) +ipctran_pipe_send(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - int rv; + ipctran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_tx, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&pipe->sendq, aio); - if (nni_list_first(&pipe->sendq) == aio) { - ipc_pipe_dosend(pipe, aio); + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + ipctran_pipe_send_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_cancel_rx(nni_aio *aio, int rv) +ipctran_pipe_recv_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); return; } // If receive in progress, then cancel the pending transfer. // The callback on the rxaio will cause the user aio to // be canceled too. - if (nni_list_first(&pipe->recvq) == aio) { - nni_aio_abort(pipe->rxaio, rv); - nni_mtx_unlock(&pipe->mtx); + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(p->rxaio, rv); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); } static void -ipc_pipe_dorecv(ipc_pipe *pipe) +ipctran_pipe_recv_start(ipctran_pipe *p) { nni_aio *rxaio; nni_iov iov; - NNI_ASSERT(pipe->rxmsg == NULL); + NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the IPC header. - rxaio = pipe->rxaio; - iov.iov_buf = pipe->rxhead; - iov.iov_len = sizeof(pipe->rxhead); + rxaio = p->rxaio; + iov.iov_buf = p->rxhead; + iov.iov_len = sizeof(p->rxhead); nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); + nni_ipc_conn_recv(p->conn, rxaio); } static void -ipc_pipe_recv(void *arg, nni_aio *aio) +ipctran_pipe_recv(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - int rv; + ipctran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_rx, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&pipe->recvq, aio); - if (nni_list_first(&pipe->recvq) == aio) { - ipc_pipe_dorecv(pipe); + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + ipctran_pipe_recv_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_pipe_start(void *arg, nni_aio *aio) +ipctran_pipe_start(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + ipctran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_start, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_nego_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - pipe->txhead[0] = 0; - pipe->txhead[1] = 'S'; - pipe->txhead[2] = 'P'; - pipe->txhead[3] = 0; - NNI_PUT16(&pipe->txhead[4], pipe->proto); - NNI_PUT16(&pipe->txhead[6], 0); - - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - negaio = pipe->negaio; - iov.iov_len = 8; - iov.iov_buf = &pipe->txhead[0]; + p->txhead[0] = 0; + p->txhead[1] = 'S'; + p->txhead[2] = 'P'; + p->txhead[3] = 0; + NNI_PUT16(&p->txhead[4], p->proto); + NNI_PUT16(&p->txhead[6], 0); + + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + iov.iov_len = 8; + iov.iov_buf = &p->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_plat_ipc_pipe_send(pipe->ipp, negaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, negaio); + nni_mtx_unlock(&p->mtx); } static uint16_t -ipc_pipe_peer(void *arg) +ipctran_pipe_peer(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - return (pipe->peer); + return (p->peer); } static int -ipc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; + ipctran_pipe *p = arg; return (nni_copyout_sockaddr(&p->sa, buf, szp, t)); } static int -ipc_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_uid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_uid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_gid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_gid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_pid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_pid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_zoneid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_zoneid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static void -ipc_ep_fini(void *arg) +ipctran_dialer_fini(void *arg) { - ipc_ep *ep = arg; + ipctran_dialer *d = arg; - nni_aio_stop(ep->aio); - nni_plat_ipc_ep_fini(ep->iep); - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_ipc_dialer_fini(d->dialer); + } + nni_aio_fini(d->aio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +ipctran_dialer_close(void *arg) +{ + ipctran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_ipc_dialer_close(d->dialer); } static int -ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +ipctran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - ipc_ep *ep; - int rv; - size_t sz; + ipctran_dialer *d; + int rv; + size_t sz; - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&ep->mtx); + nni_mtx_init(&d->mtx); - sz = sizeof(ep->sa.s_ipc.sa_path); - ep->sa.s_ipc.sa_family = NNG_AF_IPC; + sz = sizeof(d->sa.s_ipc.sa_path); + d->sa.s_ipc.sa_family = NNG_AF_IPC; - if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { - ipc_ep_fini(ep); + if (nni_strlcpy(d->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { + ipctran_dialer_fini(d); return (NNG_EADDRINVAL); } - if ((rv = nni_plat_ipc_ep_init(&ep->iep, &ep->sa, mode)) != 0) { - ipc_ep_fini(ep); + if (((rv = nni_ipc_dialer_init(&d->dialer)) != 0) || + ((rv = nni_aio_init(&d->aio, ipctran_dialer_cb, d)) != 0)) { + ipctran_dialer_fini(d); return (rv); } - if ((rv = nni_aio_init(&ep->aio, ipc_ep_cb, ep)) != 0) { - ipc_ep_fini(ep); - return (rv); - } - ep->proto = nni_sock_proto_id(sock); + d->proto = nni_sock_proto_id(sock); - *epp = ep; + *dp = d; return (0); } -static int -ipc_dialer_init(void **epp, nni_url *url, nni_sock *sock) +static void +ipctran_dialer_cb(void *arg) { - return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + ipctran_dialer *d = arg; + ipctran_pipe * p; + nni_ipc_conn * conn; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->mtx); + if (rv == 0) { + conn = nni_aio_get_output(d->aio, 0); + nni_ipc_conn_fini(conn); + } + return; + } + + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); + if ((rv = ipctran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_ipc_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; + } + + p->proto = d->proto; + p->rcvmax = d->rcvmax; + p->sa = d->sa; + nni_mtx_unlock(&d->mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } -static int -ipc_listener_init(void **epp, nni_url *url, nni_sock *sock) +static void +ipctran_dialer_cancel(nni_aio *aio, int rv) { - return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + ipctran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->mtx); + return; + } + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); } static void -ipc_ep_close(void *arg) +ipctran_dialer_connect(void *arg, nni_aio *aio) { - ipc_ep *ep = arg; + ipctran_dialer *d = arg; + int rv; - nni_aio_close(ep->aio); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&d->mtx); + NNI_ASSERT(d->user_aio == NULL); + + if ((rv = nni_aio_schedule(aio, ipctran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; - nni_mtx_lock(&ep->mtx); - nni_plat_ipc_ep_close(ep->iep); - nni_mtx_unlock(&ep->mtx); + nni_ipc_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->mtx); } static int -ipc_ep_bind(void *arg) +ipctran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; + ipctran_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_size(d->rcvmax, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); +} - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_listen(ep->iep); - nni_mtx_unlock(&ep->mtx); +static int +ipctran_dialer_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ipctran_dialer *d = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->rcvmax = val; + nni_mtx_unlock(&d->mtx); + } return (rv); } static void -ipc_ep_finish(ipc_ep *ep) +ipctran_listener_fini(void *arg) { - nni_aio * aio; - int rv; - ipc_pipe *pipe = NULL; + ipctran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_ipc_listener_fini(l->listener); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + nni_aio_fini(l->aio); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} -done: - aio = ep->user_aio; - ep->user_aio = NULL; +static int +ipctran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + ipctran_listener *l; + int rv; + size_t sz; - if ((aio != NULL) && (rv == 0)) { - NNI_ASSERT(pipe != NULL); - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); } + nni_mtx_init(&l->mtx); + + sz = sizeof(l->sa.s_ipc.sa_path); + l->sa.s_ipc.sa_family = NNG_AF_IPC; - if (pipe != NULL) { - ipc_pipe_fini(pipe); + if (nni_strlcpy(l->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { + ipctran_listener_fini(l); + return (NNG_EADDRINVAL); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + if (((rv = nni_ipc_listener_init(&l->listener)) != 0) || + ((rv = nni_aio_init(&l->aio, ipctran_listener_cb, l)) != 0)) { + ipctran_listener_fini(l); + return (rv); } + + l->proto = nni_sock_proto_id(sock); + + *lp = l; + return (0); } static void -ipc_ep_cb(void *arg) +ipctran_listener_close(void *arg) { - ipc_ep *ep = arg; + ipctran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - ipc_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + nni_aio_close(l->aio); + nni_ipc_listener_close(l->listener); } -static void -ipc_cancel_ep(nni_aio *aio, int rv) +static int +ipctran_listener_bind(void *arg) { - ipc_ep *ep = nni_aio_get_prov_data(aio); + ipctran_listener *l = arg; + int rv; - NNI_ASSERT(rv != 0); - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); - return; - } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); - - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->mtx); + rv = nni_ipc_listener_listen(l->listener, &l->sa); + nni_mtx_unlock(&l->mtx); + return (rv); } static void -ipc_ep_accept(void *arg, nni_aio *aio) +ipctran_listener_cb(void *arg) { - ipc_ep *ep = arg; - int rv; + ipctran_listener *l = arg; + nni_aio * aio; + int rv; + ipctran_pipe * p = NULL; + nni_ipc_conn * conn; + + nni_mtx_lock(&l->mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_ipc_conn_fini(conn); + } + return; + } - if (nni_aio_begin(aio) != 0) { + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + NNI_ASSERT(conn != NULL); + + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = ipctran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_ipc_conn_fini(conn); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; + p->proto = l->proto; + p->rcvmax = l->rcvmax; + p->sa = l->sa; + nni_mtx_unlock(&l->mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +ipctran_listener_cancel(nni_aio *aio, int rv) +{ + ipctran_listener *l = nni_aio_get_prov_data(aio); + + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->mtx); + return; + } + l->user_aio = NULL; + nni_mtx_unlock(&l->mtx); - nni_plat_ipc_ep_accept(ep->iep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } static void -ipc_ep_connect(void *arg, nni_aio *aio) +ipctran_listener_accept(void *arg, nni_aio *aio) { - ipc_ep *ep = arg; - int rv; + ipctran_listener *l = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); + nni_mtx_lock(&l->mtx); + NNI_ASSERT(l->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; + l->user_aio = aio; - nni_plat_ipc_ep_connect(ep->iep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_ipc_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->mtx); } static int -ipc_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_recvmaxsz( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - size_t val; - int rv; + ipctran_listener *l = arg; + size_t val; + int rv; if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->rcvmax = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_recvmaxsz(const void *data, size_t sz, nni_opt_type t) +ipctran_listener_get_recvmaxsz( + void *arg, void *data, size_t *szp, nni_opt_type t) { - return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t)); + ipctran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_size(l->rcvmax, data, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int -ipc_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t) +ipctran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, data, szp, t); - nni_mtx_unlock(&ep->mtx); + ipctran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_sockaddr(&l->sa, buf, szp, t); + nni_mtx_unlock(&l->mtx); return (rv); } static int -ipc_ep_get_addr(void *arg, void *data, size_t *szp, nni_opt_type t) +ipctran_check_recvmaxsz(const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_sockaddr(&ep->sa, data, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t)); } static int -ipc_ep_set_perms(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_perms( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - int val; - int rv; + ipctran_listener *l = arg; + int val; + int rv; // Probably we could further limit this -- most systems don't have // meaningful chmod beyond the lower 9 bits. if ((rv = nni_copyin_int(&val, data, sz, 0, 0x7FFFFFFF, t)) == 0) { - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_set_permissions(ep->iep, val); - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + rv = nni_ipc_listener_set_permissions(l->listener, val); + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_perms(const void *data, size_t sz, nni_opt_type t) +ipctran_check_perms(const void *data, size_t sz, nni_opt_type t) { return (nni_copyin_int(NULL, data, sz, 0, 0x7FFFFFFF, t)); } static int -ipc_ep_set_sec_desc(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_sec_desc( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - void * ptr; - int rv; + ipctran_listener *l = arg; + void * ptr; + int rv; if ((rv = nni_copyin_ptr(&ptr, data, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_set_security_descriptor(ep->iep, ptr); - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + rv = + nni_ipc_listener_set_security_descriptor(l->listener, ptr); + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_sec_desc(const void *data, size_t sz, nni_opt_type t) +ipctran_check_sec_desc(const void *data, size_t sz, nni_opt_type t) { return (nni_copyin_ptr(NULL, data, sz, t)); } -static nni_tran_option ipc_pipe_options[] = { +static nni_tran_option ipctran_pipe_options[] = { { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_pipe_get_addr, + .o_get = ipctran_pipe_get_addr, }, { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_pipe_get_addr, + .o_get = ipctran_pipe_get_addr, }, { .o_name = NNG_OPT_IPC_PEER_UID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_uid, + .o_get = ipctran_pipe_get_peer_uid, }, { .o_name = NNG_OPT_IPC_PEER_GID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_gid, + .o_get = ipctran_pipe_get_peer_gid, }, { .o_name = NNG_OPT_IPC_PEER_PID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_pid, + .o_get = ipctran_pipe_get_peer_pid, }, { .o_name = NNG_OPT_IPC_PEER_ZONEID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_zoneid, + .o_get = ipctran_pipe_get_peer_zoneid, }, // terminate list { @@ -926,29 +1083,24 @@ static nni_tran_option ipc_pipe_options[] = { }, }; -static nni_tran_pipe_ops ipc_pipe_ops = { - .p_fini = ipc_pipe_fini, - .p_start = ipc_pipe_start, - .p_stop = ipc_pipe_stop, - .p_send = ipc_pipe_send, - .p_recv = ipc_pipe_recv, - .p_close = ipc_pipe_close, - .p_peer = ipc_pipe_peer, - .p_options = ipc_pipe_options, +static nni_tran_pipe_ops ipctran_pipe_ops = { + .p_fini = ipctran_pipe_fini, + .p_start = ipctran_pipe_start, + .p_stop = ipctran_pipe_stop, + .p_send = ipctran_pipe_send, + .p_recv = ipctran_pipe_recv, + .p_close = ipctran_pipe_close, + .p_peer = ipctran_pipe_peer, + .p_options = ipctran_pipe_options, }; -static nni_tran_option ipc_dialer_options[] = { +static nni_tran_option ipctran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ipc_ep_get_recvmaxsz, - .o_set = ipc_ep_set_recvmaxsz, - .o_chk = ipc_ep_chk_recvmaxsz, - }, - { - .o_name = NNG_OPT_LOCADDR, - .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_ep_get_addr, + .o_get = ipctran_dialer_get_recvmaxsz, + .o_set = ipctran_dialer_set_recvmaxsz, + .o_chk = ipctran_check_recvmaxsz, }, // terminate list { @@ -956,32 +1108,32 @@ static nni_tran_option ipc_dialer_options[] = { }, }; -static nni_tran_option ipc_listener_options[] = { +static nni_tran_option ipctran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ipc_ep_get_recvmaxsz, - .o_set = ipc_ep_set_recvmaxsz, - .o_chk = ipc_ep_chk_recvmaxsz, + .o_get = ipctran_listener_get_recvmaxsz, + .o_set = ipctran_listener_set_recvmaxsz, + .o_chk = ipctran_check_recvmaxsz, }, { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_ep_get_addr, + .o_get = ipctran_listener_get_locaddr, }, { .o_name = NNG_OPT_IPC_SECURITY_DESCRIPTOR, .o_type = NNI_TYPE_POINTER, .o_get = NULL, - .o_set = ipc_ep_set_sec_desc, - .o_chk = ipc_ep_chk_sec_desc, + .o_set = ipctran_listener_set_sec_desc, + .o_chk = ipctran_check_sec_desc, }, { .o_name = NNG_OPT_IPC_PERMISSIONS, .o_type = NNI_TYPE_INT32, .o_get = NULL, - .o_set = ipc_ep_set_perms, - .o_chk = ipc_ep_chk_perms, + .o_set = ipctran_listener_set_perms, + .o_chk = ipctran_check_perms, }, // terminate list { @@ -989,31 +1141,31 @@ static nni_tran_option ipc_listener_options[] = { }, }; -static nni_tran_dialer_ops ipc_dialer_ops = { - .d_init = ipc_dialer_init, - .d_fini = ipc_ep_fini, - .d_connect = ipc_ep_connect, - .d_close = ipc_ep_close, - .d_options = ipc_dialer_options, +static nni_tran_dialer_ops ipctran_dialer_ops = { + .d_init = ipctran_dialer_init, + .d_fini = ipctran_dialer_fini, + .d_connect = ipctran_dialer_connect, + .d_close = ipctran_dialer_close, + .d_options = ipctran_dialer_options, }; -static nni_tran_listener_ops ipc_listener_ops = { - .l_init = ipc_listener_init, - .l_fini = ipc_ep_fini, - .l_bind = ipc_ep_bind, - .l_accept = ipc_ep_accept, - .l_close = ipc_ep_close, - .l_options = ipc_listener_options, +static nni_tran_listener_ops ipctran_listener_ops = { + .l_init = ipctran_listener_init, + .l_fini = ipctran_listener_fini, + .l_bind = ipctran_listener_bind, + .l_accept = ipctran_listener_accept, + .l_close = ipctran_listener_close, + .l_options = ipctran_listener_options, }; static nni_tran ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", - .tran_dialer = &ipc_dialer_ops, - .tran_listener = &ipc_listener_ops, - .tran_pipe = &ipc_pipe_ops, - .tran_init = ipc_tran_init, - .tran_fini = ipc_tran_fini, + .tran_dialer = &ipctran_dialer_ops, + .tran_listener = &ipctran_listener_ops, + .tran_pipe = &ipctran_pipe_ops, + .tran_init = ipctran_init, + .tran_fini = ipctran_fini, }; int diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index c323da29..0d47529e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -129,6 +129,7 @@ tcptran_pipe_fini(void *arg) nni_tcp_conn_fini(p->conn); } nni_msg_free(p->rxmsg); + nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); } @@ -780,7 +781,12 @@ static int tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tcptran_dialer *d = arg; - return (nni_copyout_size(d->rcvmax, v, szp, t)); + int rv; + + nni_mtx_lock(&d->mtx); + rv = nni_copyout_size(d->rcvmax, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); } static int @@ -922,15 +928,12 @@ tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock) return (rv); } - if ((rv = nni_tcp_listener_init(&l->listener)) != 0) { + if (((rv = nni_tcp_listener_init(&l->listener)) != 0) || + ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0)) { tcptran_listener_fini(l); return (rv); } - if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) { - tcptran_listener_fini(l); - return (rv); - } l->proto = nni_sock_proto_id(sock); l->nodelay = true; l->keepalive = false; @@ -955,9 +958,9 @@ tcptran_listener_bind(void *arg) tcptran_listener *l = arg; int rv; - l->bsa = l->sa; nni_mtx_lock(&l->mtx); - rv = nni_tcp_listener_listen(l->listener, &l->bsa); + l->bsa = l->sa; + rv = nni_tcp_listener_listen(l->listener, &l->bsa); nni_mtx_unlock(&l->mtx); return (rv); @@ -1139,7 +1142,24 @@ static int tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tcptran_listener *l = arg; - return (nni_copyout_size(l->rcvmax, v, szp, t)); + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_size(l->rcvmax, v, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); +} + +static int +tcptran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + tcptran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int @@ -1234,6 +1254,11 @@ static nni_tran_option tcptran_listener_options[] = { .o_chk = tcptran_check_recvmaxsz, }, { + .o_name = NNG_OPT_LOCADDR, + .o_type = NNI_TYPE_SOCKADDR, + .o_get = tcptran_listener_get_locaddr, + }, + { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, .o_get = tcptran_listener_get_url, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index b1bdddb8..e6701f5f 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -913,9 +913,9 @@ tlstran_listener_bind(void *arg) tlstran_listener *l = arg; int rv; - l->bsa = l->sa; nni_mtx_lock(&l->ep.mtx); - rv = nni_tcp_listener_listen(l->listener, &l->bsa); + l->bsa = l->sa; + rv = nni_tcp_listener_listen(l->listener, &l->bsa); nni_mtx_unlock(&l->ep.mtx); return (rv); @@ -1122,6 +1122,18 @@ tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) } static int +tlstran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + tlstran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->ep.mtx); + rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t); + nni_mtx_unlock(&l->ep.mtx); + return (rv); +} + +static int tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); @@ -1381,6 +1393,11 @@ static nni_tran_option tlstran_listener_options[] = { .o_get = tlstran_listener_get_url, }, { + .o_name = NNG_OPT_LOCADDR, + .o_type = NNI_TYPE_SOCKADDR, + .o_get = tlstran_listener_get_locaddr, + }, + { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, .o_get = tlstran_ep_get_config, |
