diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-16 17:10:47 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-18 13:25:33 -0700 |
| commit | b310f712828962bf3187caf3bfe064c3531c5628 (patch) | |
| tree | f99ceb5851f601c93b0305617d692722f2978dd5 | |
| parent | 3f40a08eab60df77dc61ae0350e59f36e8d0ed16 (diff) | |
| download | nng-b310f712828962bf3187caf3bfe064c3531c5628.tar.gz nng-b310f712828962bf3187caf3bfe064c3531c5628.tar.bz2 nng-b310f712828962bf3187caf3bfe064c3531c5628.zip | |
fixes #595 mutex leak and other minor errors in TCP
fixes #596 POSIX IPC should move away from pipedesc/epdesc
fixes #598 TLS and TCP listeners could support NNG_OPT_LOCADDR
fixes #594 Windows IPC should use "new style" win_io code.
fixes #597 macOS could support PEER PID
This large change set cleans up the IPC support on Windows and
POSIX. This has the beneficial impact of significantly reducing
the complexity of the code, reducing locking, increasing
concurrency (multiple dial and accepts can be outstanding now),
reducing context switches (we complete thins synchronously now).
While here we have added some missing option support, and fixed a
few more bugs that we found in the TCP code changes from last week.
27 files changed, 2913 insertions, 2566 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 11c7d82e..ed96b6bc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -87,19 +87,20 @@ set (NNG_SOURCES if (NNG_PLATFORM_POSIX) set (NNG_SOURCES ${NNG_SOURCES} platform/posix/posix_impl.h + platform/posix/posix_ipc.h platform/posix/posix_config.h - platform/posix/posix_aio.h platform/posix/posix_pollq.h + platform/posix/posix_tcp.h platform/posix/posix_alloc.c platform/posix/posix_atomic.c platform/posix/posix_clock.c platform/posix/posix_debug.c - platform/posix/posix_epdesc.c platform/posix/posix_file.c - platform/posix/posix_ipc.c + platform/posix/posix_ipcconn.c + platform/posix/posix_ipcdial.c + platform/posix/posix_ipclisten.c platform/posix/posix_pipe.c - platform/posix/posix_pipedesc.c platform/posix/posix_rand.c platform/posix/posix_resolv_gai.c platform/posix/posix_sockaddr.c @@ -132,12 +133,17 @@ endif() if (NNG_PLATFORM_WINDOWS) set (NNG_SOURCES ${NNG_SOURCES} platform/windows/win_impl.h + platform/windows/win_ipc.h + platform/windows/win_tcp.h + platform/windows/win_clock.c platform/windows/win_debug.c platform/windows/win_file.c platform/windows/win_io.c platform/windows/win_iocp.c - platform/windows/win_ipc.c + platform/windows/win_ipcconn.c + platform/windows/win_ipcdial.c + platform/windows/win_ipclisten.c platform/windows/win_pipe.c platform/windows/win_rand.c platform/windows/win_resolv.c diff --git a/src/core/platform.h b/src/core/platform.h index 5045b172..2ae0fd0b 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -215,7 +215,7 @@ typedef struct nni_tcp_listener nni_tcp_listener; extern void nni_tcp_conn_fini(nni_tcp_conn *); -// nni_tcp_dialer_close closes the dialer, which might actually be +// nni_tcp_conn_close closes the connection, which might actually be // implemented as a shutdown() call. // Further operations on it should return NNG_ECLOSED. extern void nni_tcp_conn_close(nni_tcp_conn *); @@ -248,7 +248,7 @@ extern int nni_tcp_conn_set_nodelay(nni_tcp_conn *, bool); // keepalive probes. Tuning of these keepalives is current unsupported. extern int nni_tcp_conn_set_keepalive(nni_tcp_conn *, bool); -// nni_tcp_listener_init creates a new dialer object. +// nni_tcp_dialer_init creates a new dialer object. extern int nni_tcp_dialer_init(nni_tcp_dialer **); // nni_tcp_dialer_fini finalizes the dialer, closing it and freeing @@ -311,81 +311,102 @@ extern void nni_udp_resolv(const char *, const char *, int, int, nni_aio *); // IPC (UNIX Domain Sockets & Named Pipes) Support. // -typedef struct nni_plat_ipc_ep nni_plat_ipc_ep; -typedef struct nni_plat_ipc_pipe nni_plat_ipc_pipe; +typedef struct nni_ipc_conn nni_ipc_conn; +typedef struct nni_ipc_dialer nni_ipc_dialer; +typedef struct nni_ipc_listener nni_ipc_listener; -// nni_plat_ipc_ep_init creates a new endpoint associated with the url. -// The final field is the mode, either for dialing (NNI_EP_MODE_DIAL) or -// listening (NNI_EP_MODE_LISTEN). -extern int nni_plat_ipc_ep_init(nni_plat_ipc_ep **, const nni_sockaddr *, int); +// nni_ipc_conn_fini disposes of the connection. +extern void nni_ipc_conn_fini(nni_ipc_conn *); -// nni_plat_ipc_ep_fini closes the endpoint and releases resources. -extern void nni_plat_ipc_ep_fini(nni_plat_ipc_ep *); +// nni_ipc_conn_close closes the connection, which might actually be +// implemented as a shutdown() call. +// Further operations on it should return NNG_ECLOSED. +extern void nni_ipc_conn_close(nni_ipc_conn *); -// nni_plat_ipc_ep_close closes the endpoint; this might not close the -// actual underlying socket, but it should call shutdown on it. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_ipc_ep_close(nni_plat_ipc_ep *); +// nni_ipc_conn_send sends data in the iov buffers to the peer. +// The platform may modify the iovs. +extern void nni_ipc_conn_send(nni_ipc_conn *, nni_aio *); -// nni_plat_ipc_listen creates an IPC socket in listening mode, bound -// to the specified path. -extern int nni_plat_ipc_ep_listen(nni_plat_ipc_ep *); +// nni_ipc_conn_recv receives data into the buffers provided by the +// I/O vector (iovs). The platform should attempt to scatter the received +// data into the iovs if possible. +// +// It is possible for the reader to return less data than is requested, +// in which case the caller is responsible for resubmitting. The platform +// must not return "zero" data however. (It is an error to attempt to +// receive zero bytes.) The platform may modify the iovs. +extern void nni_ipc_conn_recv(nni_ipc_conn *, nni_aio *); -// nni_plat_ipc_ep_accept starts an accept to receive an incoming connection. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *, nni_aio *); +// nni_ipc_conn_get_peer_uid obtains the peer user id, if possible. +// NB: Only POSIX systems support user IDs. +extern int nni_ipc_conn_get_peer_uid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_connect is the client side. -// An accepted connection will be passed back in the a_pipe member. -extern void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *, nni_aio *); +// nni_ipc_conn_get_peer_gid obtains the peer group id, if possible. +// NB: Only POSIX systems support group IDs. +extern int nni_ipc_conn_get_peer_gid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_ep_set_security_descriptor sets the Windows security -// descriptor. This is *only* supported for Windows platforms. All -// others return NNG_ENOTSUP. The void argument is a pointer to -// a SECURITY_DESCRIPTOR object, and must be valid. -extern int nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *, void *); +// nni_ipc_conn_get_peer_pid obtains the peer process id, if possible. +extern int nni_ipc_conn_get_peer_pid(nni_ipc_conn *, uint64_t *); -// nni_plat_ipc_ep_set_permissions sets UNIX style permissions -// on the named pipes. This basically just does a chmod() on the -// named pipe, and is only supported o the server side, and only on -// systems that support this (POSIX, not Windows). Note that changing -// ownership is not supported at this time. Most systems use only -// 16-bits, the lower 12 of which are user, group, and other, e.g. -// 0640 gives read/write access to user, read to group, and prevents -// any other user from accessing it. This option only has meaning -// for listeners, on dialers it is ignored. -extern int nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *, uint32_t); +// nni_ipc_conn_get_peer_zoneid obtains the peer zone id, if possible. +// NB: Only illumos & SunOS systems have the notion of "zones". +extern int nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *, uint64_t *); + +// nni_ipc_dialer_init creates a new dialer object. +extern int nni_ipc_dialer_init(nni_ipc_dialer **); -// nni_plat_ipc_pipe_fini closes the pipe, and releases all resources -// associated with it. -extern void nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *); +// nni_ipc_dialer_fini finalizes the dialer, closing it and freeing +// all resources. +extern void nni_ipc_dialer_fini(nni_ipc_dialer *); -// nni_plat_ipc_pipe_close closes the socket, or at least shuts it down. -// Further operations on the pipe should return NNG_ECLOSED. -extern void nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *); +// nni_ipc_dialer_close closes the dialer. +// Further operations on it should return NNG_ECLOSED. Any in-progress +// connection will be aborted. +extern void nni_ipc_dialer_close(nni_ipc_dialer *); -// nni_plat_ipc_pipe_send sends data in the iov buffers to the peer. -// The platform may modify the iovs. -extern void nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *, nni_aio *); +// nni_ipc_dialer_dial attempts to create an outgoing connection, +// asynchronously, to the address specified. On success, the first (and only) +// output will be an nni_ipc_conn * associated with the remote server. +extern void nni_ipc_dialer_dial( + nni_ipc_dialer *, const nni_sockaddr *, nni_aio *); -// nni_plat_ipc_pipe_recv recvs data into the buffers provided by the iovs. -// The platform may modify the iovs. -extern void nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *, nni_aio *); +// nni_ipc_listener_init creates a new listener object, unbound. +extern int nni_ipc_listener_init(nni_ipc_listener **); -// nni_plat_ipc_pipe_get_peer_uid obtains the peer user id, if possible. -// NB: Only POSIX systems support user IDs. -extern int nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_fini frees the listener and all associated resources. +// It implictly closes the listener as well. +extern void nni_ipc_listener_fini(nni_ipc_listener *); -// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible. -// NB: Only POSIX systems support group IDs. -extern int nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_close closes the listener. This will unbind +// any bound socket, and further operations will result in NNG_ECLOSED. +extern void nni_ipc_listener_close(nni_ipc_listener *); -// nni_plat_ipc_pipe_get_peer_pid obtains the peer process id, if possible. -extern int nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_listen creates the socket in listening mode, bound +// to the specified address. Unlike TCP, this address does not change. +extern int nni_ipc_listener_listen(nni_ipc_listener *, const nni_sockaddr *); -// nni_plat_ipc_pipe_get_peer_zoneid obtains the peer zone id, if possible. -// NB: Only illumos & SunOS systems have the notion of "zones". -extern int nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *, uint64_t *); +// nni_ipc_listener_accept accepts in incoming connect, asynchronously. +// On success, the first (and only) output will be an nni_ipc_conn * +// associated with the remote peer. +extern void nni_ipc_listener_accept(nni_ipc_listener *, nni_aio *); + +// nni_ipc_listener_set_permissions sets UNIX style permissions +// on the named pipes. This basically just does a chmod() on the +// named pipe, and is only supported o the server side, and only on +// systems that support this (POSIX, not Windows). Note that changing +// ownership is not supported at this time. Most systems use only +// 16-bits, the lower 12 of which are user, group, and other, e.g. +// 0640 gives read/write access to user, read to group, and prevents +// any other user from accessing it. On platforms without this support, +// ENOTSUP is returned. +extern int nni_ipc_listener_set_permissions(nni_ipc_listener *, int); + +// nni_ipc_listener_set_security_descriptor sets the Windows security +// descriptor. This is *only* supported for Windows platforms. All +// others return NNG_ENOTSUP. The void argument is a pointer to +// a SECURITY_DESCRIPTOR object, and must be valid. +extern int nni_ipc_listener_set_security_descriptor( + nni_ipc_listener *, void *); // // UDP support. UDP is not connection oriented, and only has the notion diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c deleted file mode 100644 index d796765a..00000000 --- a/src/platform/posix/posix_epdesc.c +++ /dev/null @@ -1,597 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" - -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <poll.h> -#include <stdbool.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <sys/un.h> -#include <unistd.h> - -#ifdef sun -#undef sun -#endif - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -struct nni_posix_epdesc { - nni_posix_pfd * pfd; - nni_list connectq; - nni_list acceptq; - bool closed; - bool started; - bool ipcbound; // if true unlink socket on exit - struct sockaddr_storage locaddr; - struct sockaddr_storage remaddr; - socklen_t loclen; - socklen_t remlen; - mode_t perms; // UNIX sockets only - int mode; // end point mode (dialer/listener) - nni_mtx mtx; -}; - -static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *); -static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *); - -static void -nni_epdesc_cancel(nni_aio *aio, int rv) -{ - nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); - nni_posix_pfd * pfd = NULL; - - NNI_ASSERT(rv != 0); - nni_mtx_lock(&ed->mtx); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) && - ((pfd = ed->pfd) != NULL)) { - nni_posix_pfd_close(pfd); - } - nni_mtx_unlock(&ed->mtx); -} - -static void -nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd) -{ - nni_posix_pipedesc *pd = NULL; - - // acceptq or connectq. - nni_aio_list_remove(aio); - - if (rv != 0) { - NNI_ASSERT(newpfd == NULL); - nni_aio_finish_error(aio, rv); - return; - } - - NNI_ASSERT(newpfd != NULL); - if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) { - nni_posix_pfd_fini(newpfd); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_set_output(aio, 0, pd); - nni_aio_finish(aio, 0, 0); -} - -static void -nni_epdesc_doaccept(nni_posix_epdesc *ed) -{ - nni_aio *aio; - - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - nni_posix_pfd *pfd; - - fd = nni_posix_pfd_fd(ed->pfd); - -#ifdef NNG_USE_ACCEPT4 - newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); - if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(fd, NULL, NULL); - } -#else - newfd = accept(fd, NULL, NULL); -#endif - if (newfd < 0) { - switch (errno) { - case EAGAIN: -#ifdef EWOULDBLOCK -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif -#endif - rv = nni_posix_pfd_arm(ed->pfd, POLLIN); - if (rv != 0) { - nni_epdesc_finish(aio, rv, NULL); - continue; - } - // Come back later... - return; - case ECONNABORTED: - case ECONNRESET: - // Eat them, they aren't interesting. - continue; - default: - // Error this one, but keep moving to the next. - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - continue; - } - } - - if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { - close(newfd); - nni_epdesc_finish(aio, rv, NULL); - continue; - } - - nni_epdesc_finish(aio, 0, pfd); - } -} - -static void -nni_epdesc_doclose(nni_posix_epdesc *ed) -{ - nni_aio *aio; - - ed->closed = true; - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_epdesc_finish(aio, NNG_ECLOSED, 0); - } - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_epdesc_finish(aio, NNG_ECLOSED, 0); - } - - if (ed->pfd != NULL) { - - nni_posix_pfd_close(ed->pfd); - } - - // clean up stale UNIX socket when closing the server. - if (ed->ipcbound) { - struct sockaddr_un *sun = (void *) &ed->locaddr; - (void) unlink(sun->sun_path); - } -} - -static void -nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_epdesc *ed = arg; - - NNI_ARG_UNUSED(pfd); - - nni_mtx_lock(&ed->mtx); - if (events & POLLNVAL) { - nni_epdesc_doclose(ed); - nni_mtx_unlock(&ed->mtx); - return; - } - - // Anything else will turn up in accept. - nni_epdesc_doaccept(ed); - nni_mtx_unlock(&ed->mtx); -} - -void -nni_posix_epdesc_close(nni_posix_epdesc *ed) -{ - nni_mtx_lock(&ed->mtx); - nni_epdesc_doclose(ed); - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_listen(nni_posix_epdesc *ed) -{ - int len; - struct sockaddr_storage *ss; - int rv; - int fd; - nni_posix_pfd * pfd; - - nni_mtx_lock(&ed->mtx); - - if (ed->started) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ESTATE); - } - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ECLOSED); - } - if ((len = ed->loclen) == 0) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EADDRINVAL); - } - - ss = &ed->locaddr; - len = ed->loclen; - - if ((fd = socket(ss->ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - nni_mtx_unlock(&ed->mtx); - return (nni_plat_errno(errno)); - } - - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - -#if defined(SO_REUSEADDR) && !defined(NNG_PLATFORM_WSL) - if (ss->ss_family != AF_UNIX) { - int on = 1; - // If for some reason this doesn't work, it's probably ok. - // Second bind will fail. - (void) setsockopt( - fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - } -#endif - - if (bind(fd, (struct sockaddr *) ss, len) < 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - - // For UNIX domain sockets, optionally set the permission bits. - // This is done after the bind and before listen, and on the file - // rather than the file descriptor. - // Experiments have shown that chmod() works correctly, provided that - // it is done *before* the listen() operation, whereas fchmod seems to - // have no impact. This behavior was observed on both macOS and Linux. - // YMMV on other platforms. - if (ss->ss_family == AF_UNIX) { - ed->ipcbound = true; - if (ed->perms != 0) { - struct sockaddr_un *sun = (void *) ss; - mode_t perms = ed->perms & ~(S_IFMT); - if ((rv = chmod(sun->sun_path, perms)) != 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - } - } - - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(fd, 128) != 0) { - rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_posix_pfd_fini(pfd); - return (rv); - } - - nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed); - - ed->pfd = pfd; - ed->started = true; - nni_mtx_unlock(&ed->mtx); - - return (0); -} - -void -nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) -{ - int rv; - - // Accept is simpler than the connect case. With accept we just - // need to wait for the socket to be readable to indicate an incoming - // connection is ready for us. There isn't anything else for us to - // do really, as that will have been done in listen. - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ed->mtx); - - if (!ed->started) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&ed->acceptq, aio); - if (nni_list_first(&ed->acceptq) == aio) { - nni_epdesc_doaccept(ed); - } - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = -1; - - nni_mtx_lock(&ed->mtx); - if (ed->pfd != NULL) { - fd = nni_posix_pfd_fd(ed->pfd); - } - nni_mtx_unlock(&ed->mtx); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -static void -nni_epdesc_connect_start(nni_posix_epdesc *ed) -{ - nni_posix_pfd *pfd; - int fd; - int rv; - nni_aio * aio; - -loop: - if ((aio = nni_list_first(&ed->connectq)) == NULL) { - return; - } - - NNI_ASSERT(ed->pfd == NULL); - if (ed->closed) { - nni_epdesc_finish(aio, NNG_ECLOSED, NULL); - goto loop; - } - ed->started = true; - - if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - goto loop; - } - - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - (void) close(fd); - nni_epdesc_finish(aio, rv, NULL); - goto loop; - } - // Possibly bind. - if ((ed->loclen != 0) && - (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) { - rv = nni_plat_errno(errno); - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - - if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) { - // Immediate connect, cool! This probably only happens on - // loopback, and probably not on every platform. - nni_epdesc_finish(aio, 0, pfd); - goto loop; - } - - if (errno != EINPROGRESS) { - // Some immediate failure occurred. - if (errno == ENOENT) { // For UNIX domain sockets - rv = NNG_ECONNREFUSED; - } else { - rv = nni_plat_errno(errno); - } - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed); - if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { - nni_epdesc_finish(aio, rv, NULL); - nni_posix_pfd_fini(pfd); - goto loop; - } - ed->pfd = pfd; - // all done... wait for this to signal via callback -} - -void -nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&ed->mtx); - if (ed->closed) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - ed->started = true; - nni_list_append(&ed->connectq, aio); - if (nni_list_first(&ed->connectq) == aio) { - // If there was a stale pfd (probably from an aborted or - // canceled connect attempt), discard it so we start fresh. - if (ed->pfd != NULL) { - nni_posix_pfd_fini(ed->pfd); - ed->pfd = NULL; - } - nni_epdesc_connect_start(ed); - } - nni_mtx_unlock(&ed->mtx); -} - -static void -nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_epdesc *ed = arg; - nni_aio * aio; - socklen_t sz; - int rv; - int fd; - - nni_mtx_lock(&ed->mtx); - if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) || - (pfd != ed->pfd)) { - // Spurious completion. Just ignore it. - nni_mtx_unlock(&ed->mtx); - return; - } - - fd = nni_posix_pfd_fd(pfd); - sz = sizeof(rv); - - if ((events & POLLNVAL) != 0) { - rv = EBADF; - - } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - rv = errno; - } - - switch (rv) { - case 0: - // Good connect! - ed->pfd = NULL; - nni_epdesc_finish(aio, 0, pfd); - break; - case EINPROGRESS: // still connecting... come back later - nni_mtx_unlock(&ed->mtx); - return; - default: - ed->pfd = NULL; - nni_epdesc_finish(aio, nni_plat_errno(rv), NULL); - nni_posix_pfd_fini(pfd); - break; - } - - // Start another connect running, if any is waiting. - nni_epdesc_connect_start(ed); - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) -{ - nni_posix_epdesc *ed; - - if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { - return (NNG_ENOMEM); - } - - nni_mtx_init(&ed->mtx); - - ed->pfd = NULL; - ed->closed = false; - ed->started = false; - ed->perms = 0; // zero means use default (no change) - ed->mode = mode; - - nni_aio_list_init(&ed->connectq); - nni_aio_list_init(&ed->acceptq); - *edp = ed; - return (0); -} - -void -nni_posix_epdesc_set_local(nni_posix_epdesc *ed, void *sa, size_t len) -{ - if ((len < 1) || (len > sizeof(struct sockaddr_storage))) { - return; - } - nni_mtx_lock(&ed->mtx); - memcpy(&ed->locaddr, sa, len); - ed->loclen = len; - nni_mtx_unlock(&ed->mtx); -} - -void -nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, size_t len) -{ - if ((len < 1) || (len > sizeof(struct sockaddr_storage))) { - return; - } - nni_mtx_lock(&ed->mtx); - memcpy(&ed->remaddr, sa, len); - ed->remlen = len; - nni_mtx_unlock(&ed->mtx); -} - -int -nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode) -{ - nni_mtx_lock(&ed->mtx); - if (ed->mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ed->mtx); - return (NNG_ENOTSUP); - } - if (ed->started) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EBUSY); - } - if ((mode & S_IFMT) != 0) { - nni_mtx_unlock(&ed->mtx); - return (NNG_EINVAL); - } - ed->perms = mode | S_IFSOCK; // we set IFSOCK to ensure non-zero - nni_mtx_unlock(&ed->mtx); - return (0); -} - -void -nni_posix_epdesc_fini(nni_posix_epdesc *ed) -{ - nni_posix_pfd *pfd; - - nni_mtx_lock(&ed->mtx); - nni_epdesc_doclose(ed); - pfd = ed->pfd; - nni_mtx_unlock(&ed->mtx); - - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } - nni_mtx_fini(&ed->mtx); - NNI_FREE_STRUCT(ed); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c deleted file mode 100644 index 5ba3c8fb..00000000 --- a/src/platform/posix/posix_ipc.c +++ /dev/null @@ -1,250 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" - -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <sys/un.h> -#include <unistd.h> - -#ifdef SOCK_CLOEXEC -#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define NNI_STREAM_SOCKTYPE SOCK_STREAM -#endif - -// Solaris/SunOS systems define this, which collides with our symbol -// names. Just undefine it now. -#ifdef sun -#undef sun -#endif - -static int nni_plat_ipc_remove_stale(const char *path); - -// We alias nni_posix_pipedesc to nni_plat_ipc_pipe. -// We alias nni_posix_epdesc to nni_plat_ipc_ep. - -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode) -{ - nni_posix_epdesc * ed; - int rv; - struct sockaddr_un sun; - - if ((rv = nni_posix_epdesc_init(&ed, mode)) != 0) { - return (rv); - } - switch (mode) { - case NNI_EP_MODE_DIAL: - nni_posix_nn2sockaddr(&sun, sa); - nni_posix_epdesc_set_remote(ed, &sun, sizeof(sun)); - break; - case NNI_EP_MODE_LISTEN: - - if ((rv = nni_plat_ipc_remove_stale(sa->s_ipc.sa_path)) != 0) { - return (rv); - } - - nni_posix_nn2sockaddr(&sun, sa); - nni_posix_epdesc_set_local(ed, &sun, sizeof(sun)); - break; - default: - nni_posix_epdesc_fini(ed); - return (NNG_EINVAL); - } - - *epp = (void *) ed; - return (0); -} - -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc_fini((void *) ep); -} - -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc_close((void *) ep); -} - -int -nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits) -{ - return (nni_posix_epdesc_set_permissions((void *) ep, (mode_t) bits)); -} - -int -nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *attr) -{ - NNI_ARG_UNUSED(ep); - NNI_ARG_UNUSED(attr); - return (NNG_ENOTSUP); -} - -// UNIX DOMAIN SOCKETS -- these have names in the file namespace. -// We are going to check to see if there was a name already there. -// If there was, and nothing is listening (ECONNREFUSED), then we -// will just try to cleanup the old socket. Note that this is not -// perfect in all scenarios, so use this with caution. -static int -nni_plat_ipc_remove_stale(const char *path) -{ - int fd; - struct sockaddr_un sun; - size_t sz; - - sun.sun_family = AF_UNIX; - sz = sizeof(sun.sun_path); - - if (nni_strlcpy(sun.sun_path, path, sz) >= sz) { - return (NNG_EADDRINVAL); - } - - if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { - return (nni_plat_errno(errno)); - } - - // There is an assumption here that connect() returns immediately - // (even when non-blocking) when a server is absent. This seems - // to be true for the platforms we've tried. If it doesn't work, - // then the cleanup will fail. As this is supposed to be an - // exceptional case, don't worry. - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if (connect(fd, (void *) &sun, sizeof(sun)) < 0) { - if (errno == ECONNREFUSED) { - (void) unlink(path); - } - } - (void) close(fd); - return (0); -} - -int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - nni_posix_epdesc *ed = (void *) ep; - - return (nni_posix_epdesc_listen(ed)); -} - -void -nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_connect((void *) ep, aio); -} - -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_posix_epdesc_accept((void *) ep, aio); -} - -void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *p) -{ - nni_posix_pipedesc_fini((void *) p); -} - -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *p) -{ - nni_posix_pipedesc_close((void *) p); -} - -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_send((void *) p, aio); -} - -void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *p, nni_aio *aio) -{ - nni_posix_pipedesc_recv((void *) p, aio); -} - -int -nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *p, uint64_t *uid) -{ - int rv; - uint64_t ignore; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, uid, &ignore, &ignore, &ignore)) != 0) { - return (rv); - } - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *p, uint64_t *gid) -{ - int rv; - uint64_t ignore; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, gid, &ignore, &ignore)) != 0) { - return (rv); - } - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *p, uint64_t *zid) -{ - int rv; - uint64_t ignore; - uint64_t id; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, &ignore, &ignore, &id)) != 0) { - return (rv); - } - if (id == (uint64_t) -1) { - // NB: -1 is not a legal zone id (illumos/Solaris) - return (NNG_ENOTSUP); - } - *zid = id; - return (0); -} - -int -nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *p, uint64_t *pid) -{ - int rv; - uint64_t ignore; - uint64_t id; - - if ((rv = nni_posix_pipedesc_get_peerid( - (void *) p, &ignore, &ignore, &id, &ignore)) != 0) { - return (rv); - } - if (id == (uint64_t) -1) { - // NB: -1 is not a legal process id - return (NNG_ENOTSUP); - } - *pid = id; - return (0); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h new file mode 100644 index 00000000..caf7ed7b --- /dev/null +++ b/src/platform/posix/posix_ipc.h @@ -0,0 +1,48 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX +#include "platform/posix/posix_aio.h" + +#include <sys/types.h> // For mode_t + +struct nni_ipc_conn { + nni_posix_pfd * pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + nni_ipc_dialer *dialer; + nni_reap_item reap; +}; + +struct nni_ipc_dialer { + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; +}; + +struct nni_ipc_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + char * path; + mode_t perms; + nni_mtx mtx; +}; + +extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *); +extern void nni_posix_ipc_conn_start(nni_ipc_conn *); + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c new file mode 100644 index 00000000..2b46fb12 --- /dev/null +++ b/src/platform/posix/posix_ipcconn.c @@ -0,0 +1,494 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/un.h> +#include <unistd.h> +#if defined(NNG_HAVE_GETPEERUCRED) +#include <ucred.h> +#elif defined(NNG_HAVE_LOCALPEERCRED) +#include <sys/ucred.h> +#endif + +#ifdef NNG_HAVE_ALLOCA +#include <alloca.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include "posix_ipc.h" + +static void +ipc_conn_dowrite(nni_ipc_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov * aiov; + struct msghdr hdr; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + memset(&hdr, 0, sizeof(hdr)); + nni_aio_get_iov(aio, &naiov, &aiov); + +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len > 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + hdr.msg_iovlen = niov; + hdr.msg_iov = iovec; + + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + nni_aio_bump_count(aio, n); + // We completed the entire operation on this aio. + // (Sendmsg never returns a partial result.) + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +static void +ipc_conn_doread(nni_ipc_conn *c) +{ + nni_aio *aio; + int fd; + + if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + return; + } + + while ((aio = nni_list_first(&c->readq)) != NULL) { + unsigned i; + int n; + int niov; + unsigned naiov; + nni_iov *aiov; +#ifdef NNG_HAVE_ALLOCA + struct iovec *iovec; +#else + struct iovec iovec[16]; +#endif + + nni_aio_get_iov(aio, &naiov, &aiov); +#ifdef NNG_HAVE_ALLOCA + if (naiov > 64) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } + iovec = alloca(naiov * sizeof(*iovec)); +#else + if (naiov > NNI_NUM_ELEMENTS(iovec)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_EINVAL); + continue; + } +#endif + for (niov = 0, i = 0; i < naiov; i++) { + if (aiov[i].iov_len != 0) { + iovec[niov].iov_len = aiov[i].iov_len; + iovec[niov].iov_base = aiov[i].iov_buf; + niov++; + } + } + + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error( + aio, nni_plat_errno(errno)); + return; + } + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + // This implicitly completes this (all!) aio. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } +} + +void +nni_ipc_conn_close(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + nni_aio *aio; + c->closed = true; + while (((aio = nni_list_first(&c->readq)) != NULL) || + ((aio = nni_list_first(&c->writeq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_posix_pfd_close(c->pfd); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_ipc_conn *c = arg; + + if (events & (POLLHUP | POLLERR | POLLNVAL)) { + nni_ipc_conn_close(c); + return; + } + nni_mtx_lock(&c->mtx); + if (events & POLLIN) { + ipc_conn_doread(c); + } + if (events & POLLOUT) { + ipc_conn_dowrite(c); + } + events = 0; + if (!nni_list_empty(&c->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&c->readq)) { + events |= POLLIN; + } + if ((!c->closed) && (events != 0)) { + nni_posix_pfd_arm(pfd, events); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) +{ + + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + ipc_conn_dowrite(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->writeq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLOUT); + } + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. + if (nni_list_first(&c->readq) == aio) { + ipc_conn_doread(c); + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. + if (nni_list_first(&c->readq) == aio) { + nni_posix_pfd_arm(c->pfd, POLLIN); + } + } + nni_mtx_unlock(&c->mtx); +} + +int +ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, + uint64_t *prid, uint64_t *znid) +{ + int fd = nni_posix_pfd_fd(c->pfd); +#if defined(NNG_HAVE_GETPEEREID) + uid_t uid; + gid_t gid; + + if (getpeereid(fd, &uid, &gid) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uid; + *egid = gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_GETPEERUCRED) + ucred_t *ucp = NULL; + if (getpeerucred(fd, &ucp) != 0) { + return (nni_plat_errno(errno)); + } + *euid = ucred_geteuid(ucp); + *egid = ucred_getegid(ucp); + *prid = ucred_getpid(ucp); + *znid = ucred_getzoneid(ucp); + ucred_free(ucp); + return (0); +#elif defined(NNG_HAVE_SOPEERCRED) + struct ucred uc; + socklen_t len = sizeof(uc); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = uc.uid; + *egid = uc.gid; + *prid = uc.pid; + *znid = (uint64_t) -1; + return (0); +#elif defined(NNG_HAVE_LOCALPEERCRED) + struct xucred xu; + socklen_t len = sizeof(xu); + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { + return (nni_plat_errno(errno)); + } + *euid = xu.cr_uid; + *egid = xu.cr_gid; + *prid = (uint64_t) -1; + *znid = (uint64_t) -1; +#if defined(LOCAL_PEERPID) // present (undocumented) on macOS + { + pid_t pid; + if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &len) == + 0) { + *prid = (uint64_t) pid; + } + } +#endif // LOCAL_PEERPID + return (0); +#else + if (fd < 0) { + return (NNG_ECLOSED); + } + NNI_ARG_UNUSED(euid); + NNI_ARG_UNUSED(egid); + NNI_ARG_UNUSED(prid); + NNI_ARG_UNUSED(znid); + return (NNG_ENOTSUP); +#endif +} +int +nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *uid) +{ + int rv; + uint64_t ignore; + + if ((rv = ipc_conn_peerid(c, uid, &ignore, &ignore, &ignore)) != 0) { + return (rv); + } + return (0); +} + +int +nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *gid) +{ + int rv; + uint64_t ignore; + + if ((rv = ipc_conn_peerid(c, &ignore, gid, &ignore, &ignore)) != 0) { + return (rv); + } + return (0); +} + +int +nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *zid) +{ + int rv; + uint64_t ignore; + uint64_t id; + + if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal zone id (illumos/Solaris) + return (NNG_ENOTSUP); + } + *zid = id; + return (0); +} + +int +nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid) +{ + int rv; + uint64_t ignore; + uint64_t id; + + if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) { + return (rv); + } + if (id == (uint64_t) -1) { + // NB: -1 is not a legal process id + return (NNG_ENOTSUP); + } + *pid = id; + return (0); +} + +int +nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd) +{ + nni_ipc_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} + +void +nni_posix_ipc_conn_start(nni_ipc_conn *c) +{ + nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c); +} + +void +nni_ipc_conn_fini(nni_ipc_conn *c) +{ + nni_ipc_conn_close(c); + nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c new file mode 100644 index 00000000..13732911 --- /dev/null +++ b/src/platform/posix/posix_ipcdial.c @@ -0,0 +1,238 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_ipc.h" + +// Dialer stuff. +int +nni_ipc_dialer_init(nni_ipc_dialer **dp) +{ + nni_ipc_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + d->closed = false; + nni_aio_list_init(&d->connq); + *dp = d; + return (0); +} + +void +nni_ipc_dialer_close(nni_ipc_dialer *d) +{ + nni_mtx_lock(&d->mtx); + if (!d->closed) { + nni_aio *aio; + d->closed = true; + while ((aio = nni_list_first(&d->connq)) != NULL) { + nni_ipc_conn *c; + nni_list_remove(&d->connq, aio); + if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_ipc_conn_close(c); + nni_reap( + &c->reap, (nni_cb) nni_ipc_conn_fini, c); + } + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_ipc_dialer_fini(nni_ipc_dialer *d) +{ + nni_ipc_dialer_close(d); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +ipc_dialer_cancel(nni_aio *aio, int rv) +{ + nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + nni_ipc_conn * c; + + nni_mtx_lock(&d->mtx); + if ((!nni_aio_list_active(aio)) || + ((c = nni_aio_get_prov_extra(aio, 0)) == NULL)) { + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(aio); + c->dial_aio = NULL; + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + nni_aio_finish_error(aio, rv); + nni_ipc_conn_fini(c); +} + +static void +ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) +{ + nni_ipc_conn * c = arg; + nni_ipc_dialer *d = c->dialer; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = c->dial_aio; + if ((aio == NULL) || (!nni_aio_list_active(aio))) { + nni_mtx_unlock(&d->mtx); + return; + } + + if (ev & POLLNVAL) { + rv = EBADF; + + } else { + socklen_t sz = sizeof(int); + int fd = nni_posix_pfd_fd(pfd); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + if (rv == EINPROGRESS) { + // Connection still in progress, come back + // later. + nni_mtx_unlock(&d->mtx); + return; + } else if (rv != 0) { + rv = nni_plat_errno(rv); + } + } + + c->dial_aio = NULL; + nni_aio_list_remove(aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + + if (rv != 0) { + nni_ipc_conn_close(c); + nni_ipc_conn_fini(c); + nni_aio_finish_error(aio, rv); + return; + } + + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +// We don't give local address binding support. Outbound dialers always +// get an ephemeral port. +void +nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + nni_ipc_conn * c; + nni_posix_pfd * pfd = NULL; + struct sockaddr_storage ss; + size_t sslen; + int fd; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + (ss.ss_family != AF_UNIX)) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + + if ((fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + nni_aio_finish_error(aio, nni_plat_errno(errno)); + return; + } + + // This arranges for the fd to be in nonblocking mode, and adds the + // pollfd to the list. + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; + } + if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_finish_error(aio, rv); + return; + } + c->dialer = d; + nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c); + + nni_mtx_lock(&d->mtx); + if (d->closed) { + rv = NNG_ECLOSED; + goto error; + } + if ((rv = nni_aio_schedule(aio, ipc_dialer_cancel, d)) != 0) { + goto error; + } + if ((rv = connect(fd, (void *) &ss, sslen)) != 0) { + if (errno != EINPROGRESS) { + if (errno == ENOENT) { + // No socket present means nobody listening. + rv = NNG_ECONNREFUSED; + } else { + rv = nni_plat_errno(errno); + } + goto error; + } + // Asynchronous connect. + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + goto error; + } + c->dial_aio = aio; + nni_aio_set_prov_extra(aio, 0, c); + nni_list_append(&d->connq, aio); + nni_mtx_unlock(&d->mtx); + return; + } + // Immediate connect, cool! This probably only happens + // on loopback, and probably not on every platform. + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + return; + +error: + nni_aio_set_prov_extra(aio, 0, NULL); + nni_mtx_unlock(&d->mtx); + nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c); + nni_aio_finish_error(aio, rv); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c new file mode 100644 index 00000000..4e33a08f --- /dev/null +++ b/src/platform/posix/posix_ipclisten.c @@ -0,0 +1,373 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/un.h> +#include <unistd.h> + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif + +#include "posix_ipc.h" + +int +nni_ipc_listener_init(nni_ipc_listener **lp) +{ + nni_ipc_listener *l; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + + l->pfd = NULL; + l->closed = false; + l->started = false; + l->perms = 0; + + nni_aio_list_init(&l->acceptq); + *lp = l; + return (0); +} + +static void +ipc_listener_doclose(nni_ipc_listener *l) +{ + nni_aio *aio; + char * path; + + l->closed = true; + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + if (l->pfd != NULL) { + nni_posix_pfd_close(l->pfd); + } + if (l->started && ((path = l->path) != NULL)) { + l->path = NULL; + (void) unlink(path); + nni_strfree(path); + } +} + +void +nni_ipc_listener_close(nni_ipc_listener *l) +{ + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_doaccept(nni_ipc_listener *l) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&l->acceptq)) != NULL) { + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + nni_ipc_conn * c; + + fd = nni_posix_pfd_fd(l->pfd); + +#ifdef NNG_USE_ACCEPT4 + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); + if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { + newfd = accept(fd, NULL, NULL); + } +#else + newfd = accept(fd, NULL, NULL); +#endif + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(l->pfd, POLLIN); + if (rv != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + NNI_ASSERT(rv != 0); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + } + + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + nni_posix_pfd_fini(pfd); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_aio_list_remove(aio); + nni_posix_ipc_conn_start(c); + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } +} + +static void +ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_ipc_listener *l = arg; + NNI_ARG_UNUSED(pfd); + + nni_mtx_lock(&l->mtx); + if (events & POLLNVAL) { + ipc_listener_doclose(l); + nni_mtx_unlock(&l->mtx); + return; + } + + // Anything else will turn up in accept. + ipc_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_cancel(nni_aio *aio, int rv) +{ + nni_ipc_listener *l = nni_aio_get_prov_data(aio); + + // This is dead easy, because we'll ignore the completion if there + // isn't anything to do the accept on! + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static int +ipc_remove_stale(const char *path) +{ + int fd; + struct sockaddr_un sun; + size_t sz; + + sun.sun_family = AF_UNIX; + sz = sizeof(sun.sun_path); + + if (nni_strlcpy(sun.sun_path, path, sz) >= sz) { + return (NNG_EADDRINVAL); + } + + if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) &sun, sizeof(sun)) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(path); + } + } + (void) close(fd); + return (0); +} + +int +nni_ipc_listener_set_permissions(nni_ipc_listener *l, int mode) +{ + if ((mode & S_IFMT) != 0) { + return (NNG_EINVAL); + } + mode |= S_IFSOCK; // set IFSOCK to ensure non-zero + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + l->perms = mode; + nni_mtx_unlock(&l->mtx); + return (0); +} + +int +nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *sd) +{ + NNI_ARG_UNUSED(l); + NNI_ARG_UNUSED(sd); + return (NNG_ENOTSUP); +} + +int +nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) +{ + socklen_t len; + struct sockaddr_storage ss; + int rv; + int fd; + nni_posix_pfd * pfd; + char * path; + + if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + (ss.ss_family != AF_UNIX)) { + return (NNG_EADDRINVAL); + } + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + path = nni_strdup(sa->s_ipc.sa_path); + if (path == NULL) { + return (NNG_ENOMEM); + } + + if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + + if (((rv = ipc_remove_stale(path)) != 0) || + ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) { + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + (void) close(fd); + return (rv); + } + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + nni_posix_pfd_fini(pfd); + return (rv); + } + + if (((l->perms != 0) && (chmod(path, l->perms & ~S_IFMT) != 0)) || + (listen(fd, 128) != 0)) { + rv = nni_plat_errno(errno); + (void) unlink(path); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + nni_posix_pfd_fini(pfd); + return (rv); + } + + nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l); + + l->pfd = pfd; + l->started = true; + l->path = path; + nni_mtx_unlock(&l->mtx); + + return (0); +} + +void +nni_ipc_listener_fini(nni_ipc_listener *l) +{ + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + pfd = l->pfd; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +void +nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) +{ + int rv; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&l->acceptq, aio); + if (nni_list_first(&l->acceptq) == aio) { + ipc_listener_doaccept(l); + } + nni_mtx_unlock(&l->mtx); +} + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c deleted file mode 100644 index b11036ea..00000000 --- a/src/platform/posix/posix_pipedesc.c +++ /dev/null @@ -1,503 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" -#include "platform/posix/posix_pollq.h" - -#include <errno.h> -#include <fcntl.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <poll.h> -#include <stdbool.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/uio.h> -#include <unistd.h> -#if defined(NNG_HAVE_GETPEERUCRED) -#include <ucred.h> -#elif defined(NNG_HAVE_LOCALPEERCRED) -#include <sys/ucred.h> -#include <sys/un.h> -#endif -#ifdef NNG_HAVE_ALLOCA -#include <alloca.h> -#endif - -// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open -// file descriptor for TCP socket, etc.) This contains the list of pending -// aios for that underlying socket, as well as the socket itself. -struct nni_posix_pipedesc { - nni_posix_pfd *pfd; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; -}; - -static void -nni_posix_pipedesc_finish(nni_aio *aio, int rv) -{ - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, nni_aio_count(aio)); -} - -static void -nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - - pd->closed = true; - while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - nni_posix_pfd_close(pd->pfd); -} - -static void -nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - int fd; - - fd = nni_posix_pfd_fd(pd->pfd); - if ((fd < 0) || (pd->closed)) { - return; - } - - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - unsigned i; - int n; - int niov; - unsigned naiov; - nni_iov * aiov; - struct msghdr hdr; -#ifdef NNG_HAVE_ALLOCA - struct iovec *iovec; -#else - struct iovec iovec[16]; -#endif - - memset(&hdr, 0, sizeof(hdr)); - - nni_aio_get_iov(aio, &naiov, &aiov); - -#ifdef NNG_HAVE_ALLOCA - if (naiov > 64) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } - iovec = alloca(naiov * sizeof(*iovec)); -#else - if (naiov > NNI_NUM_ELEMENTS(iovec)) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } -#endif - - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len > 0) { - iovec[niov].iov_len = aiov[i].iov_len; - iovec[niov].iov_base = aiov[i].iov_buf; - niov++; - } - } - -#ifndef MSG_NOSIGNAL -#define MSG_NOSIGNAL 0 -#endif - - hdr.msg_iovlen = niov; - hdr.msg_iov = iovec; - - if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { - switch (errno) { - case EINTR: - continue; - case EAGAIN: -#ifdef EWOULDBLOCK -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif -#endif - return; - default: - nni_posix_pipedesc_finish( - aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; - } - } - - nni_aio_bump_count(aio, n); - // We completed the entire operation on this aioq. - // (Sendmsg never returns a partial result.) - nni_posix_pipedesc_finish(aio, 0); - - // Go back to start of loop to see if there is another - // aio ready for us to process. - } -} - -static void -nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - int fd; - - fd = nni_posix_pfd_fd(pd->pfd); - if ((fd < 0) || (pd->closed)) { - return; - } - - while ((aio = nni_list_first(&pd->readq)) != NULL) { - unsigned i; - int n; - int niov; - unsigned naiov; - nni_iov *aiov; -#ifdef NNG_HAVE_ALLOCA - struct iovec *iovec; -#else - struct iovec iovec[16]; -#endif - - nni_aio_get_iov(aio, &naiov, &aiov); -#ifdef NNG_HAVE_ALLOCA - if (naiov > 64) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } - iovec = alloca(naiov * sizeof(*iovec)); -#else - if (naiov > NNI_NUM_ELEMENTS(iovec)) { - nni_posix_pipedesc_finish(aio, NNG_EINVAL); - continue; - } -#endif - for (niov = 0, i = 0; i < naiov; i++) { - if (aiov[i].iov_len != 0) { - iovec[niov].iov_len = aiov[i].iov_len; - iovec[niov].iov_base = aiov[i].iov_buf; - niov++; - } - } - - if ((n = readv(fd, iovec, niov)) < 0) { - switch (errno) { - case EINTR: - continue; - case EAGAIN: - return; - default: - nni_posix_pipedesc_finish( - aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; - } - } - - if (n == 0) { - // No bytes indicates a closed descriptor. - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - nni_posix_pipedesc_doclose(pd); - return; - } - - nni_aio_bump_count(aio, n); - - // We completed the entire operation on this aioq. - nni_posix_pipedesc_finish(aio, 0); - - // Go back to start of loop to see if there is another - // aio ready for us to process. - } -} - -static void -nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg) -{ - nni_posix_pipedesc *pd = arg; - - nni_mtx_lock(&pd->mtx); - if (events & POLLIN) { - nni_posix_pipedesc_doread(pd); - } - if (events & POLLOUT) { - nni_posix_pipedesc_dowrite(pd); - } - if (events & (POLLHUP | POLLERR | POLLNVAL)) { - nni_posix_pipedesc_doclose(pd); - } else { - events = 0; - if (!nni_list_empty(&pd->writeq)) { - events |= POLLOUT; - } - if (!nni_list_empty(&pd->readq)) { - events |= POLLIN; - } - if ((!pd->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); - } - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_close(nni_posix_pipedesc *pd) -{ - // NB: Events may still occur. - nni_mtx_lock(&pd->mtx); - nni_posix_pipedesc_doclose(pd); - nni_mtx_unlock(&pd->mtx); -} - -static void -nni_posix_pipedesc_cancel(nni_aio *aio, int rv) -{ - nni_posix_pipedesc *pd = nni_aio_get_prov_data(aio); - - nni_mtx_lock(&pd->mtx); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&pd->mtx); - - if (pd->closed) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - - if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&pd->readq, aio); - - // If we are only job on the list, go ahead and try to do an - // immediate transfer. This allows for faster completions in - // many cases. We also need not arm a list if it was already - // armed. - if (nni_list_first(&pd->readq) == aio) { - nni_posix_pipedesc_doread(pd); - // If we are still the first thing on the list, that - // means we didn't finish the job, so arm the poller to - // complete us. - if (nni_list_first(&pd->readq) == aio) { - nni_posix_pfd_arm(pd->pfd, POLLIN); - } - } - nni_mtx_unlock(&pd->mtx); -} - -void -nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) -{ - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&pd->mtx); - - if (pd->closed) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - - if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_list_append(&pd->writeq, aio); - - if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pipedesc_dowrite(pd); - // If we are still the first thing on the list, that - // means we didn't finish the job, so arm the poller to - // complete us. - if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pfd_arm(pd->pfd, POLLOUT); - } - } - nni_mtx_unlock(&pd->mtx); -} - -int -nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(pd->pfd); - - if (getpeername(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(pd->pfd); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay) -{ - int val = nodelay ? 1 : 0; - int fd = nni_posix_pfd_fd(pd->pfd); - - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep) -{ - int val = keep ? 1 : 0; - int fd = nni_posix_pfd_fd(pd->pfd); - - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, - uint64_t *egid, uint64_t *prid, uint64_t *znid) -{ - int fd = nni_posix_pfd_fd(pd->pfd); -#if defined(NNG_HAVE_GETPEEREID) - uid_t uid; - gid_t gid; - - if (getpeereid(fd, &uid, &gid) != 0) { - return (nni_plat_errno(errno)); - } - *euid = uid; - *egid = gid; - *prid = (uint64_t) -1; - *znid = (uint64_t) -1; - return (0); -#elif defined(NNG_HAVE_GETPEERUCRED) - ucred_t *ucp = NULL; - if (getpeerucred(fd, &ucp) != 0) { - return (nni_plat_errno(errno)); - } - *euid = ucred_geteuid(ucp); - *egid = ucred_getegid(ucp); - *prid = ucred_getpid(ucp); - *znid = ucred_getzoneid(ucp); - ucred_free(ucp); - return (0); -#elif defined(NNG_HAVE_SOPEERCRED) - struct ucred uc; - socklen_t len = sizeof(uc); - if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { - return (nni_plat_errno(errno)); - } - *euid = uc.uid; - *egid = uc.gid; - *prid = uc.pid; - *znid = (uint64_t) -1; - return (0); -#elif defined(NNG_HAVE_LOCALPEERCRED) - struct xucred xu; - socklen_t len = sizeof(xu); - if (getsockopt(fd, SOL_LOCAL, LOCAL_PEERCRED, &xu, &len) != 0) { - return (nni_plat_errno(errno)); - } - *euid = xu.cr_uid; - *egid = xu.cr_gid; - *prid = (uint64_t) -1; // XXX: macOS has undocumented - // LOCAL_PEERPID... - *znid = (uint64_t) -1; - return (0); -#else - if (fd < 0) { - return (NNG_ECLOSED); - } - NNI_ARG_UNUSED(euid); - NNI_ARG_UNUSED(egid); - NNI_ARG_UNUSED(prid); - NNI_ARG_UNUSED(znid); - return (NNG_ENOTSUP); -#endif -} - -int -nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd) -{ - nni_posix_pipedesc *pd; - - if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { - return (NNG_ENOMEM); - } - - pd->closed = false; - pd->pfd = pfd; - - nni_mtx_init(&pd->mtx); - nni_aio_list_init(&pd->readq); - nni_aio_list_init(&pd->writeq); - - nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd); - - *pdp = pd; - return (0); -} - -void -nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) -{ - nni_posix_pipedesc_close(pd); - nni_posix_pfd_fini(pd->pfd); - pd->pfd = NULL; - nni_mtx_fini(&pd->mtx); - - NNI_FREE_STRUCT(pd); -} - -#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 63d858c2..f78d0b6b 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_USE_POSIX_RESOLV_GAI -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <ctype.h> diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 5cd1887a..c0352c55 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 7f627361..21f6ecfe 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index cdcbecd9..8c186885 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include <arpa/inet.h> #include <errno.h> @@ -151,6 +150,7 @@ static void tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_tcp_listener *l = arg; + NNI_ARG_UNUSED(pfd); nni_mtx_lock(&l->mtx); if (events & POLLNVAL) { @@ -158,7 +158,6 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) nni_mtx_unlock(&l->mtx); return; } - NNI_ASSERT(pfd == l->pfd); // Anything else will turn up in accept. tcp_listener_doaccept(l); @@ -212,7 +211,7 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { nni_mtx_unlock(&l->mtx); - nni_posix_pfd_fini(pfd); + (void) close(fd); return (rv); } diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 759bdb96..873a02a1 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX -#include "platform/posix/posix_aio.h" #include "platform/posix/posix_pollq.h" #include <errno.h> @@ -177,7 +176,7 @@ static void nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_plat_udp *udp = arg; - NNI_ASSERT(pfd == udp->udp_pfd); + NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); if (events & POLLIN) { diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 93e45423..b8741b52 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -127,9 +127,8 @@ extern void nni_win_udp_sysfini(void); extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); -extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *); +extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *); extern void nni_win_io_fini(nni_win_io *); -extern void nni_win_io_cancel(nni_win_io *); extern int nni_win_io_register(HANDLE); diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index 1179b603..50b22343 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -61,14 +61,13 @@ nni_win_io_register(HANDLE h) } int -nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) +nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr) { ZeroMemory(&io->olpd, sizeof(io->olpd)); io->cb = cb; io->ptr = ptr; io->aio = NULL; - io->f = f; io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if (io->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); @@ -77,14 +76,6 @@ nni_win_io_init(nni_win_io *io, HANDLE f, nni_win_io_cb cb, void *ptr) } void -nni_win_io_cancel(nni_win_io *io) -{ - if (io->f != INVALID_HANDLE_VALUE) { - CancelIoEx(io->f, &io->olpd); - } -} - -void nni_win_io_fini(nni_win_io *io) { if (io->olpd.hEvent != NULL) { diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c deleted file mode 100644 index 169a2e00..00000000 --- a/src/platform/windows/win_ipc.c +++ /dev/null @@ -1,673 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#ifdef NNG_PLATFORM_WINDOWS - -#include <stdio.h> - -struct nni_plat_ipc_pipe { - HANDLE p; - int mode; - nni_win_event rcv_ev; - nni_win_event snd_ev; -}; - -struct nni_plat_ipc_ep { - char path[NNG_MAXADDRLEN + 16]; - nni_sockaddr addr; - int mode; - bool started; - HANDLE p; // accept side only - nni_win_event acc_ev; // accept side only - nni_aio * con_aio; // conn side only - nni_list_node node; // conn side uses this - SECURITY_ATTRIBUTES sec_attr; -}; - -static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); -static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_pipe_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_ipc_pipe_ops = { - .wev_start = nni_win_ipc_pipe_start, - .wev_finish = nni_win_ipc_pipe_finish, - .wev_cancel = nni_win_ipc_pipe_cancel, -}; - -static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); -static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_acc_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_ipc_acc_ops = { - .wev_start = nni_win_ipc_acc_start, - .wev_finish = nni_win_ipc_acc_finish, - .wev_cancel = nni_win_ipc_acc_cancel, -}; - -static int -nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) -{ - void * buf; - DWORD len; - BOOL ok; - int rv; - nni_plat_ipc_pipe *pipe = evt->ptr; - unsigned idx; - unsigned naiov; - nni_iov * aiov; - - NNI_ASSERT(aio != NULL); - - if (pipe->p == INVALID_HANDLE_VALUE) { - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); - } - - nni_aio_get_iov(aio, &naiov, &aiov); - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } - - evt->count = 0; - if (evt == &pipe->snd_ev) { - ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd); - } else { - ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd); - } - if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); -} - -static void -nni_win_ipc_pipe_cancel(nni_win_event *evt) -{ - nni_plat_ipc_pipe *pipe = evt->ptr; - - CancelIoEx(pipe->p, &evt->olpd); -} - -static void -nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_aio_finish(aio, evt->status, evt->count); -} - -static int -nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p, int mode) -{ - nni_plat_ipc_pipe *pipe; - int rv; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - pipe->mode = mode; - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe); - if (rv != 0) { - nni_plat_ipc_pipe_fini(pipe); - return (rv); - } - - pipe->p = p; - *pipep = pipe; - return (0); -} - -void -nni_plat_ipc_pipe_send(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->snd_ev, aio); -} - -void -nni_plat_ipc_pipe_recv(nni_plat_ipc_pipe *pipe, nni_aio *aio) -{ - nni_win_event_submit(&pipe->rcv_ev, aio); -} - -void -nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) -{ - HANDLE p; - - nni_win_event_close(&pipe->snd_ev); - nni_win_event_close(&pipe->rcv_ev); - - if ((p = pipe->p) != INVALID_HANDLE_VALUE) { - pipe->p = INVALID_HANDLE_VALUE; - CloseHandle(p); - } -} - -void -nni_plat_ipc_pipe_fini(nni_plat_ipc_pipe *pipe) -{ - nni_plat_ipc_pipe_close(pipe); - - nni_win_event_fini(&pipe->snd_ev); - nni_win_event_fini(&pipe->rcv_ev); - NNI_FREE_STRUCT(pipe); -} - -int -nni_plat_ipc_pipe_get_peer_uid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_pipe_get_peer_gid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_pipe_get_peer_zoneid(nni_plat_ipc_pipe *pipe, uint64_t *id) -{ - NNI_ARG_UNUSED(pipe); - NNI_ARG_UNUSED(id); - return (NNG_ENOTSUP); -} - -// nni_plat_ipc_pipe_get_peer_gid obtains the peer group id, if possible. -// NB: Only POSIX systems support group IDs. -int -nni_plat_ipc_pipe_get_peer_pid(nni_plat_ipc_pipe *pipe, uint64_t *pid) -{ - ULONG id; - switch (pipe->mode) { - case NNI_EP_MODE_DIAL: - if (!GetNamedPipeServerProcessId(pipe->p, &id)) { - return (nni_win_error(GetLastError())); - } - *pid = id; - break; - case NNI_EP_MODE_LISTEN: - if (!GetNamedPipeClientProcessId(pipe->p, &id)) { - return (nni_win_error(GetLastError())); - } - *pid = id; - break; - default: - // Should never occur! - return (NNG_EINVAL); - } - return (0); -} - -int -nni_plat_ipc_ep_init(nni_plat_ipc_ep **epp, const nni_sockaddr *sa, int mode) -{ - const char * path; - nni_plat_ipc_ep *ep; - - path = sa->s_ipc.sa_path; - if (nni_strnlen(path, NNG_MAXADDRLEN) >= NNG_MAXADDRLEN) { - return (NNG_EINVAL); - } - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ZeroMemory(ep, sizeof(*ep)); - - ep->mode = mode; - ep->sec_attr.nLength = sizeof(ep->sec_attr); - ep->sec_attr.lpSecurityDescriptor = NULL; - ep->sec_attr.bInheritHandle = FALSE; - NNI_LIST_NODE_INIT(&ep->node); - - ep->addr = *sa; - (void) snprintf(ep->path, sizeof(ep->path), "\\\\.\\pipe\\%s", path); - - *epp = ep; - return (0); -} - -int -nni_plat_ipc_ep_set_permissions(nni_plat_ipc_ep *ep, uint32_t bits) -{ - NNI_ARG_UNUSED(ep); - NNI_ARG_UNUSED(bits); - return (NNG_ENOTSUP); -} - -int -nni_plat_ipc_ep_set_security_descriptor(nni_plat_ipc_ep *ep, void *desc) -{ - if (ep->started) { - return (NNG_EBUSY); - } - if (ep->mode != NNI_EP_MODE_LISTEN) { - return (NNG_ENOTSUP); - } - if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) { - return (NNG_EINVAL); - } - ep->sec_attr.lpSecurityDescriptor = desc; - return (0); -} - -int -nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) -{ - int rv; - HANDLE p; - - if (ep->mode != NNI_EP_MODE_LISTEN) { - return (NNG_EINVAL); - } - if (ep->started) { - return (NNG_EBUSY); - } - - // We create the first named pipe, and we make sure that it is - // properly ours. - p = CreateNamedPipeA(ep->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | - FILE_FLAG_FIRST_PIPE_INSTANCE, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr); - if (p == INVALID_HANDLE_VALUE) { - if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { - rv = NNG_EADDRINUSE; - } else { - rv = nni_win_error(rv); - } - goto failed; - } - rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep); - if (rv != 0) { - goto failed; - } - - if ((rv = nni_win_iocp_register(p)) != 0) { - goto failed; - } - - ep->p = p; - ep->started = true; - return (0); - -failed: - - if (p != INVALID_HANDLE_VALUE) { - (void) CloseHandle(p); - } - - return (rv); -} - -static void -nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_ipc_ep * ep = evt->ptr; - nni_plat_ipc_pipe *pipe; - int rv; - HANDLE newp, oldp; - - if ((rv = evt->status) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - - newp = CreateNamedPipeA(ep->path, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | - PIPE_REJECT_REMOTE_CLIENTS, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &ep->sec_attr); - if (newp == INVALID_HANDLE_VALUE) { - rv = nni_win_error(GetLastError()); - // We connected, but as we cannot get a new pipe, - // we have to disconnect the old one. - DisconnectNamedPipe(ep->p); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_win_iocp_register(newp)) != 0) { - // Disconnect the old pipe. - DisconnectNamedPipe(ep->p); - // And discard the half-baked new one. - DisconnectNamedPipe(newp); - (void) CloseHandle(newp); - nni_aio_finish_error(aio, rv); - return; - } - - oldp = ep->p; - ep->p = newp; - - if ((rv = nni_win_ipc_pipe_init(&pipe, oldp, NNI_EP_MODE_LISTEN)) != - 0) { - // The new pipe is already fine for us. Discard - // the old one, since failed to be able to use it. - DisconnectNamedPipe(oldp); - (void) CloseHandle(oldp); - nni_aio_finish_error(aio, rv); - return; - } - - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); -} - -static void -nni_win_ipc_acc_cancel(nni_win_event *evt) -{ - nni_plat_ipc_ep *ep = evt->ptr; - - (void) CancelIoEx(ep->p, &evt->olpd); - // Just to be sure. - (void) DisconnectNamedPipe(ep->p); -} - -static int -nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) -{ - nni_plat_ipc_ep *ep = evt->ptr; - NNI_ARG_UNUSED(aio); - - if (!ConnectNamedPipe(ep->p, &evt->olpd)) { - int rv = GetLastError(); - switch (rv) { - case ERROR_PIPE_CONNECTED: - // Kind of like success, but as this is technically - // an "error", we have to complete it ourself. - evt->status = 0; - evt->count = 0; - return (1); - - case ERROR_IO_PENDING: - // Normal asynchronous operation. Wait for - // completion. - return (0); - - default: - // Fast-fail (synchronous). - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } - } - - // Synchronous success - the I/O completion packet should still - // be delivered. - return (0); -} - -void -nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_win_event_submit(&ep->acc_ev, aio); -} - -// So Windows IPC is a bit different on the client side. There is no -// support for asynchronous connection, but we can fake it with a -// single thread that runs to establish the connection. That thread -// will run keep looping, sleeping for 10 ms between attempts. It -// performs non-blocking attempts to connect. -typedef struct nni_win_ipc_conn_work nni_win_ipc_conn_work; -struct nni_win_ipc_conn_work { - nni_list waiters; - nni_list workers; - nni_mtx mtx; - nni_cv cv; - nni_thr thr; - int exit; -}; - -static nni_win_ipc_conn_work nni_win_ipc_connecter; - -static void -nni_win_ipc_conn_thr(void *arg) -{ - nni_win_ipc_conn_work *w = arg; - nni_plat_ipc_ep * ep; - nni_plat_ipc_pipe * pipe; - nni_aio * aio; - HANDLE p; - int rv; - - nni_mtx_lock(&w->mtx); - for (;;) { - if (w->exit) { - break; - } - while ((ep = nni_list_first(&w->waiters)) != NULL) { - nni_list_remove(&w->waiters, ep); - nni_list_append(&w->workers, ep); - } - - while ((ep = nni_list_first(&w->workers)) != NULL) { - - nni_list_remove(&w->workers, ep); - - if ((aio = ep->con_aio) == NULL) { - continue; - } - ep->con_aio = NULL; - - pipe = NULL; - - p = CreateFileA(ep->path, GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, - NULL); - - if (p == INVALID_HANDLE_VALUE) { - switch ((rv = GetLastError())) { - case ERROR_PIPE_BUSY: - // Still in progress. This - // shouldn't happen unless the - // other side aborts the - // connection. - ep->con_aio = aio; - nni_list_append(&w->waiters, ep); - continue; - - case ERROR_FILE_NOT_FOUND: - rv = NNG_ECONNREFUSED; - break; - default: - rv = nni_win_error(rv); - break; - } - goto fail; - } - if (((rv = nni_win_ipc_pipe_init( - &pipe, p, NNI_EP_MODE_DIAL)) != 0) || - ((rv = nni_win_iocp_register(p)) != 0)) { - goto fail; - } - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - continue; - - fail: - if (p != INVALID_HANDLE_VALUE) { - DisconnectNamedPipe(p); - CloseHandle(p); - } - if (pipe != NULL) { - nni_plat_ipc_pipe_fini(pipe); - } - nni_aio_finish_error(aio, rv); - } - - if (nni_list_empty(&w->waiters)) { - // Wait until an endpoint is added. - nni_cv_wait(&w->cv); - } else { - // Wait 10 ms, unless woken earlier. - nni_cv_until(&w->cv, nni_clock() + 10); - } - } - nni_mtx_unlock(&w->mtx); -} - -static void -nni_win_ipc_conn_cancel(nni_aio *aio, int rv) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - nni_plat_ipc_ep * ep = nni_aio_get_prov_data(aio); - - nni_mtx_lock(&w->mtx); - if (aio == ep->con_aio) { - ep->con_aio = NULL; - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - nni_cv_wake(&w->cv); - } - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&w->mtx); -} - -void -nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&w->mtx); - if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) { - nni_mtx_unlock(&w->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - NNI_ASSERT(!nni_list_active(&w->waiters, ep)); - - ep->con_aio = aio; - nni_list_append(&w->waiters, ep); - nni_cv_wake(&w->cv); - nni_mtx_unlock(&w->mtx); -} - -void -nni_plat_ipc_ep_fini(nni_plat_ipc_ep *ep) -{ - nni_plat_ipc_ep_close(ep); - if (ep->p != INVALID_HANDLE_VALUE) { - CloseHandle(ep->p); - ep->p = INVALID_HANDLE_VALUE; - } - nni_win_event_close(&ep->acc_ev); - nni_win_event_fini(&ep->acc_ev); - NNI_FREE_STRUCT(ep); -} - -void -nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) -{ - nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; - nni_aio * aio; - - switch (ep->mode) { - case NNI_EP_MODE_DIAL: - nni_mtx_lock(&w->mtx); - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - } - if ((aio = ep->con_aio) != NULL) { - ep->con_aio = NULL; - nni_aio_finish_error(aio, NNG_ECLOSED); - } - nni_mtx_unlock(&w->mtx); - break; - - case NNI_EP_MODE_LISTEN: - nni_win_event_close(&ep->acc_ev); - if (ep->p != INVALID_HANDLE_VALUE) { - CloseHandle(ep->p); - ep->p = INVALID_HANDLE_VALUE; - } - break; - } -} - -int -nni_win_ipc_sysinit(void) -{ - int rv; - nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; - - NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); - NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - - nni_mtx_init(&worker->mtx); - nni_cv_init(&worker->cv, &worker->mtx); - - rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); - if (rv != 0) { - return (rv); - } - - nni_thr_run(&worker->thr); - - return (0); -} - -void -nni_win_ipc_sysfini(void) -{ - nni_win_ipc_conn_work *worker = &nni_win_ipc_connecter; - - nni_mtx_lock(&worker->mtx); - worker->exit = 1; - nni_cv_wake(&worker->cv); - nni_mtx_unlock(&worker->mtx); - nni_thr_fini(&worker->thr); - nni_cv_fini(&worker->cv); - nni_mtx_fini(&worker->mtx); -} - -#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h new file mode 100644 index 00000000..51ce5548 --- /dev/null +++ b/src/platform/windows/win_ipc.h @@ -0,0 +1,62 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef PLATFORM_WIN_WINIPC_H +#define PLATFORM_WIN_WINIPC_H + +// This header file is private to the IPC (named pipes) support for Windows. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +struct nni_ipc_conn { + HANDLE f; + nni_win_io recv_io; + nni_win_io send_io; + nni_win_io conn_io; + nni_list recv_aios; + nni_list send_aios; + nni_aio * conn_aio; + nni_ipc_dialer * dialer; + nni_ipc_listener *listener; + int recv_rv; + int send_rv; + int conn_rv; + bool closed; + nni_mtx mtx; + nni_cv cv; + nni_reap_item reap; +}; + +struct nni_ipc_dialer { + bool closed; // dialers are locked by the worker lock + nni_list aios; + nni_list_node node; // node on worker list +}; + +struct nni_ipc_listener { + char * path; + bool started; + bool closed; + HANDLE f; + SECURITY_ATTRIBUTES sec_attr; + nni_list aios; + nni_mtx mtx; + nni_cv cv; + nni_win_io io; + int rv; +}; + +extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE); + +#endif // NNG_PLATFORM_WINDOWS + +#endif // NNG_PLATFORM_WIN_WINIPC_H diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c new file mode 100644 index 00000000..d8ef4e4e --- /dev/null +++ b/src/platform/windows/win_ipcconn.c @@ -0,0 +1,388 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +static void +ipc_recv_start(nni_ipc_conn *c) +{ + nni_aio *aio; + unsigned idx; + unsigned naiov; + nni_iov *aiov; + void * buf; + DWORD len; + int rv; + + if (c->closed) { + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_list_remove(&c->recv_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +ipc_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->recv_rv != 0) { + rv = c->recv_rv; + c->recv_rv = 0; + } + nni_aio_list_remove(aio); + ipc_recv_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} +static void +ipc_recv_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->recv_aios)) { + c->recv_rv = rv; + CancelIoEx(c->f, &c->recv_io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->recv_aios, aio); + if (aio == nni_list_first(&c->recv_aios)) { + ipc_recv_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_send_start(nni_ipc_conn *c) +{ + nni_aio *aio; + unsigned idx; + unsigned naiov; + nni_iov *aiov; + void * buf; + DWORD len; + int rv; + + if (c->closed) { + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_list_remove(&c->send_aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&c->cv); + } +again: + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes do + // not appear to support scatter/gather, so we have to process + // each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; + } +} + +static void +ipc_send_cb(nni_win_io *io, int rv, size_t num) +{ + nni_aio * aio; + nni_ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&c->mtx); + return; + } + if (c->send_rv != 0) { + rv = c->send_rv; + c->send_rv = 0; + } + nni_aio_list_remove(aio); + ipc_send_start(c); + if (c->closed) { + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + if ((rv == 0) && (num == 0)) { + // A zero byte receive is a remote close from the peer. + rv = NNG_ECLOSED; + } + nni_aio_finish_synch(aio, rv, num); +} + +static void +ipc_send_cancel(nni_aio *aio, int rv) +{ + nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_mtx_lock(&c->mtx); + if (aio == nni_list_first(&c->send_aios)) { + c->send_rv = rv; + CancelIoEx(c->f, &c->send_io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + if (c->closed) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&c->send_aios, aio); + if (aio == nni_list_first(&c->send_aios)) { + ipc_send_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +int +nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p) +{ + nni_ipc_conn *c; + int rv; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->f = INVALID_HANDLE_VALUE; + nni_mtx_init(&c->mtx); + nni_cv_init(&c->cv, &c->mtx); + nni_aio_list_init(&c->recv_aios); + nni_aio_list_init(&c->send_aios); + + if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) { + nni_ipc_conn_fini(c); + return (rv); + } + + c->f = p; + *connp = c; + return (0); +} + +void +nni_ipc_conn_close(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + if (!c->closed) { + c->closed = true; + if (!nni_list_empty(&c->recv_aios)) { + CancelIoEx(c->f, &c->recv_io.olpd); + } + if (!nni_list_empty(&c->send_aios)) { + CancelIoEx(c->f, &c->send_io.olpd); + } + + if (c->f != INVALID_HANDLE_VALUE) { + // NB: closing the pipe is dangerous at this point. + DisconnectNamedPipe(c->f); + } + } + nni_mtx_unlock(&c->mtx); +} + +static void +ipc_conn_reap(nni_ipc_conn *c) +{ + nni_mtx_lock(&c->mtx); + while ((!nni_list_empty(&c->recv_aios)) || + (!nni_list_empty(&c->send_aios))) { + nni_cv_wait(&c->cv); + } + nni_mtx_unlock(&c->mtx); + + nni_win_io_fini(&c->recv_io); + nni_win_io_fini(&c->send_io); + nni_win_io_fini(&c->conn_io); + + if (c->f != INVALID_HANDLE_VALUE) { + CloseHandle(c->f); + } + nni_cv_fini(&c->cv); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +void +nni_ipc_conn_fini(nni_ipc_conn *c) +{ + nni_ipc_conn_close(c); + + nni_reap(&c->reap, (nni_cb) ipc_conn_reap, c); +} + +int +nni_ipc_conn_get_peer_uid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_gid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_zoneid(nni_ipc_conn *c, uint64_t *id) +{ + NNI_ARG_UNUSED(c); + NNI_ARG_UNUSED(id); + return (NNG_ENOTSUP); +} + +int +nni_ipc_conn_get_peer_pid(nni_ipc_conn *c, uint64_t *pid) +{ + ULONG id; + if (c->dialer) { + if (!GetNamedPipeServerProcessId(c->f, &id)) { + return (nni_win_error(GetLastError())); + } + } else { + if (!GetNamedPipeClientProcessId(c->f, &id)) { + return (nni_win_error(GetLastError())); + } + } + *pid = id; + return (0); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c new file mode 100644 index 00000000..429bcedf --- /dev/null +++ b/src/platform/windows/win_ipcdial.c @@ -0,0 +1,265 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +int +nni_ipc_dialer_init(nni_ipc_dialer **dp) +{ + nni_ipc_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + d->closed = false; + nni_aio_list_init(&d->aios); + *dp = d; + return (0); +} + +// Windows IPC is a bit different on the client side. There is no +// support for asynchronous connection, but we can fake it with a +// single thread that runs to establish the connection. That thread +// will run keep looping, sleeping for 10 ms between attempts. It +// performs non-blocking attempts to connect. +typedef struct ipc_dial_work { + nni_list waiters; + nni_list workers; + nni_mtx mtx; + nni_cv cv; + nni_thr thr; + int exit; +} ipc_dial_work; + +static ipc_dial_work ipc_connecter; + +static void +ipc_dial_thr(void *arg) +{ + ipc_dial_work *w = arg; + + nni_mtx_lock(&w->mtx); + for (;;) { + nni_ipc_dialer *d; + + if (w->exit) { + break; + } + while ((d = nni_list_first(&w->waiters)) != NULL) { + nni_list_remove(&w->waiters, d); + nni_list_append(&w->workers, d); + } + + while ((d = nni_list_first(&w->workers)) != NULL) { + nni_ipc_conn *c; + nni_aio * aio; + HANDLE f; + int rv; + char * path; + + if ((aio = nni_list_first(&d->aios)) == NULL) { + nni_list_remove(&w->workers, d); + continue; + } + + path = nni_aio_get_prov_extra(aio, 0); + + f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0, + NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); + + if (f == INVALID_HANDLE_VALUE) { + switch ((rv = GetLastError())) { + case ERROR_PIPE_BUSY: + // Still in progress. This + // shouldn't happen unless the + // other side aborts the + // connection. + // back at the head of the list + nni_list_remove(&w->workers, d); + nni_list_prepend(&w->waiters, d); + continue; + + case ERROR_FILE_NOT_FOUND: + rv = NNG_ECONNREFUSED; + break; + default: + rv = nni_win_error(rv); + break; + } + nni_list_remove(&d->aios, aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + continue; + } + + nni_list_remove(&d->aios, aio); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + + if (((rv = nni_win_io_register(f)) != 0) || + ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) { + DisconnectNamedPipe(f); + CloseHandle(f); + nni_aio_finish_error(aio, rv); + continue; + } + c->dialer = d; + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); + } + + if (nni_list_empty(&w->waiters)) { + // Wait until an endpoint is added. + nni_cv_wait(&w->cv); + } else { + // Wait 10 ms, unless woken earlier. + nni_cv_until(&w->cv, nni_clock() + 10); + } + } + nni_mtx_unlock(&w->mtx); +} + +static void +ipc_dial_cancel(nni_aio *aio, int rv) +{ + ipc_dial_work * w = &ipc_connecter; + nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&w->mtx); + if (nni_aio_list_active(aio)) { + char *path; + if (nni_list_active(&w->waiters, d)) { + nni_list_remove(&w->waiters, d); + nni_cv_wake(&w->cv); + } + nni_aio_list_remove(aio); + path = nni_aio_get_prov_extra(aio, 0); + nni_aio_set_prov_extra(aio, 0, NULL); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&w->mtx); +} + +void +nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +{ + ipc_dial_work *w = &ipc_connecter; + char * path; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + if (sa->s_family != NNG_AF_IPC) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; + } + if ((rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path)) != + 0) { + nni_aio_finish_error(aio, rv); + return; + } + + nni_mtx_lock(&w->mtx); + if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) { + nni_mtx_unlock(&w->mtx); + nni_strfree(path); + nni_aio_finish_error(aio, rv); + return; + } + + if (d->closed) { + nni_mtx_unlock(&w->mtx); + nni_strfree(path); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + nni_aio_set_prov_extra(aio, 0, path); + nni_list_append(&d->aios, aio); + if (nni_list_first(&d->aios) == aio) { + nni_list_append(&w->waiters, d); + nni_cv_wake(&w->cv); + } + nni_mtx_unlock(&w->mtx); +} + +void +nni_ipc_dialer_fini(nni_ipc_dialer *d) +{ + nni_ipc_dialer_close(d); + NNI_FREE_STRUCT(d); +} + +void +nni_ipc_dialer_close(nni_ipc_dialer *d) +{ + ipc_dial_work *w = &ipc_connecter; + nni_aio * aio; + + nni_mtx_lock(&w->mtx); + d->closed = true; + if (nni_list_active(&w->waiters, d)) { + nni_list_remove(&w->waiters, d); + } + while ((aio = nni_list_first(&d->aios)) != NULL) { + nni_list_remove(&d->aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&w->mtx); +} + +int +nni_win_ipc_sysinit(void) +{ + int rv; + ipc_dial_work *worker = &ipc_connecter; + + NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node); + NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node); + + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + + rv = nni_thr_init(&worker->thr, ipc_dial_thr, worker); + if (rv != 0) { + return (rv); + } + + nni_thr_run(&worker->thr); + + return (0); +} + +void +nni_win_ipc_sysfini(void) +{ + ipc_dial_work *worker = &ipc_connecter; + + nni_reap_drain(); // so that listeners get cleaned up. + + nni_mtx_lock(&worker->mtx); + worker->exit = 1; + nni_cv_wake(&worker->cv); + nni_mtx_unlock(&worker->mtx); + nni_thr_fini(&worker->thr); + nni_cv_fini(&worker->cv); + nni_mtx_fini(&worker->mtx); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c new file mode 100644 index 00000000..20bb8548 --- /dev/null +++ b/src/platform/windows/win_ipclisten.c @@ -0,0 +1,296 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_WINDOWS + +#include "win_ipc.h" + +#include <stdio.h> + +static void +ipc_accept_done(nni_ipc_listener *l, int rv) +{ + nni_aio * aio; + HANDLE f; + nni_ipc_conn *c; + + aio = nni_list_first(&l->aios); + nni_list_remove(&l->aios, aio); + nni_cv_wake(&l->cv); + + if (l->closed) { + // Closed, so bail. + DisconnectNamedPipe(l->f); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + // Create a replacement pipe. + f = CreateNamedPipeA(l->path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr); + if (f == INVALID_HANDLE_VALUE) { + // We couldn't create a replacement pipe, so we have to + // abort the client from our side, so that we can keep + // our server pipe available. + rv = nni_win_error(GetLastError()); + DisconnectNamedPipe(l->f); + nni_aio_finish_error(aio, rv); + return; + } + + if (((rv = nni_win_io_register(f)) != 0) || + ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) { + DisconnectNamedPipe(l->f); + DisconnectNamedPipe(f); + CloseHandle(f); + nni_aio_finish_error(aio, rv); + return; + } + l->f = f; + c->listener = l; + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +static void +ipc_accept_start(nni_ipc_listener *l) +{ + nni_aio *aio; + + if (l->closed) { + while ((aio = nni_list_first(&l->aios)) != NULL) { + nni_list_remove(&l->aios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&l->cv); + } + + while ((aio = nni_list_first(&l->aios)) != NULL) { + int rv; + + if ((ConnectNamedPipe(l->f, &l->io.olpd)) || + ((rv = GetLastError()) == ERROR_IO_PENDING)) { + // Success, or pending, handled via completion pkt. + return; + } + if (rv == ERROR_PIPE_CONNECTED) { + // Kind of like success, but as this is technically + // an "error", we have to complete it ourself. + // Fake a completion. + ipc_accept_done(l, 0); + } else { + // Fast-fail (synchronous). + nni_aio_finish_error(aio, nni_win_error(rv)); + } + } +} + +static void +ipc_accept_cb(nni_win_io *io, int rv, size_t cnt) +{ + nni_ipc_listener *l = io->ptr; + + NNI_ARG_UNUSED(cnt); + + nni_mtx_lock(&l->mtx); + if (nni_list_empty(&l->aios)) { + // We canceled this somehow. We no longer care. + DisconnectNamedPipe(l->f); + nni_mtx_unlock(&l->mtx); + return; + } + if (l->rv != 0) { + rv = l->rv; + l->rv = 0; + } + ipc_accept_done(l, rv); + ipc_accept_start(l); + nni_mtx_unlock(&l->mtx); +} + +int +nni_ipc_listener_init(nni_ipc_listener **lp) +{ + nni_ipc_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) { + NNI_FREE_STRUCT(l); + return (rv); + } + l->started = false; + l->closed = false; + l->sec_attr.nLength = sizeof(l->sec_attr); + l->sec_attr.lpSecurityDescriptor = NULL; + l->sec_attr.bInheritHandle = FALSE; + nni_aio_list_init(&l->aios); + nni_mtx_init(&l->mtx); + nni_cv_init(&l->cv, &l->mtx); + *lp = l; + return (0); +} + +int +nni_ipc_listener_set_permissions(nni_ipc_listener *l, int bits) +{ + NNI_ARG_UNUSED(l); + NNI_ARG_UNUSED(bits); + return (NNG_ENOTSUP); +} + +int +nni_ipc_listener_set_security_descriptor(nni_ipc_listener *l, void *desc) +{ + if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) { + return (NNG_EINVAL); + } + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + l->sec_attr.lpSecurityDescriptor = desc; + nni_mtx_unlock(&l->mtx); + return (0); +} + +int +nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) +{ + int rv; + HANDLE f; + char * path; + + nni_mtx_lock(&l->mtx); + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + rv = nni_asprintf(&path, "\\\\.\\pipe\\%s", sa->s_ipc.sa_path); + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + return (rv); + } + + f = CreateNamedPipeA(path, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | + FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, &l->sec_attr); + if (f == INVALID_HANDLE_VALUE) { + if ((rv = GetLastError()) == ERROR_ACCESS_DENIED) { + rv = NNG_EADDRINUSE; + } else { + rv = nni_win_error(rv); + } + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + if ((rv = nni_win_io_register(f)) != 0) { + CloseHandle(f); + nni_mtx_unlock(&l->mtx); + nni_strfree(path); + return (rv); + } + + l->f = f; + l->path = path; + l->started = true; + nni_mtx_unlock(&l->mtx); + return (0); +} + +static void +ipc_accept_cancel(nni_aio *aio, int rv) +{ + nni_ipc_listener *l = nni_aio_get_prov_data(aio); + + nni_mtx_unlock(&l->mtx); + if (aio == nni_list_first(&l->aios)) { + l->rv = rv; + CancelIoEx(l->f, &l->io.olpd); + } else if (nni_aio_list_active(aio)) { + nni_list_remove(&l->aios, aio); + nni_cv_wake(&l->cv); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) +{ + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if (l->closed) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + nni_list_append(&l->aios, aio); + if (nni_list_first(&l->aios) == aio) { + ipc_accept_start(l); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_close(nni_ipc_listener *l) +{ + + nni_mtx_lock(&l->mtx); + if (!l->closed) { + l->closed = true; + if (!nni_list_empty(&l->aios)) { + CancelIoEx(l->f, &l->io.olpd); + } + DisconnectNamedPipe(l->f); + CloseHandle(l->f); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ipc_listener_fini(nni_ipc_listener *l) +{ + nni_mtx_lock(&l->mtx); + while (!nni_list_empty(&l->aios)) { + nni_cv_wait(&l->cv); + } + nni_mtx_unlock(&l->mtx); + nni_win_io_fini(&l->io); + nni_strfree(l->path); + nni_cv_fini(&l->cv); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +#endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index c3a4a5d8..08b759ca 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -102,7 +102,7 @@ tcp_recv_cancel(nni_aio *aio, int rv) nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; - nni_win_io_cancel(&c->recv_io); + CancelIoEx((HANDLE) c->s, &c->recv_io.olpd); } else if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -192,7 +192,7 @@ tcp_send_cancel(nni_aio *aio, int rv) nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; - nni_win_io_cancel(&c->send_io); + CancelIoEx((HANDLE) c->s, &c->send_io.olpd); } else if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -273,10 +273,8 @@ nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s) nni_aio_list_init(&c->send_aios); c->conn_aio = NULL; - if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != - 0) || - ((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != - 0) || + if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) || ((rv = nni_win_io_register((HANDLE) s)) != 0)) { nni_tcp_conn_fini(c); return (rv); @@ -309,10 +307,10 @@ nni_tcp_conn_close(nni_tcp_conn *c) if (!c->closed) { c->closed = true; if (!nni_list_empty(&c->recv_aios)) { - nni_win_io_cancel(&c->recv_io); + CancelIoEx((HANDLE) c->s, &c->recv_io.olpd); } if (!nni_list_empty(&c->send_aios)) { - nni_win_io_cancel(&c->send_io); + CancelIoEx((HANDLE) c->s, &c->send_io.olpd); } if (c->s != INVALID_SOCKET) { shutdown(c->s, SD_BOTH); @@ -372,7 +370,6 @@ nni_tcp_conn_fini(nni_tcp_conn *c) while ((!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { nni_cv_wait(&c->cv); - nni_mtx_unlock(&c->mtx); } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 4a3e9f2f..5283ea81 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -70,7 +70,7 @@ nni_tcp_dialer_close(nni_tcp_dialer *d) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->conn_rv = NNG_ECLOSED; - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } } } @@ -104,7 +104,7 @@ tcp_dial_cancel(nni_aio *aio, int rv) if (c->conn_rv == 0) { c->conn_rv = rv; } - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } nni_mtx_unlock(&d->mtx); } @@ -187,8 +187,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != - 0) { + if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) { nni_tcp_conn_fini(c); nni_aio_finish_error(aio, rv); return; diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 5055c32d..1f197246 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -147,7 +147,7 @@ nni_tcp_listener_close(nni_tcp_listener *l) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->conn_rv = NNG_ECLOSED; - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } } closesocket(l->s); @@ -246,7 +246,7 @@ tcp_accept_cancel(nni_aio *aio, int rv) if (c->conn_rv == 0) { c->conn_rv = rv; } - nni_win_io_cancel(&c->conn_io); + CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } nni_mtx_unlock(&l->mtx); } @@ -263,6 +263,11 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) return; } nni_mtx_lock(&l->mtx); + if (!l->started) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } if (l->closed) { nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -286,8 +291,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) c->listener = l; c->conn_aio = aio; nni_aio_set_prov_extra(aio, 0, c); - if (((rv = nni_win_io_init( - &c->conn_io, (HANDLE) l->s, tcp_accept_cb, c)) != 0) || + if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) || ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { nni_aio_set_prov_extra(aio, 0, NULL); nni_mtx_unlock(&l->mtx); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7d99e507..ee72d6d8 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -20,16 +20,17 @@ // Windows named pipes. Other platforms could use other mechanisms, // but all implementations on the platform must use the same mechanism. -typedef struct ipc_pipe ipc_pipe; -typedef struct ipc_ep ipc_ep; +typedef struct ipctran_pipe ipctran_pipe; +typedef struct ipctran_dialer ipctran_dialer; +typedef struct ipctran_listener ipctran_listener; // ipc_pipe is one end of an IPC connection. -struct ipc_pipe { - nni_plat_ipc_pipe *ipp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - nni_sockaddr sa; +struct ipctran_pipe { + nni_ipc_conn *conn; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + nni_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -48,189 +49,201 @@ struct ipc_pipe { nni_mtx mtx; }; -struct ipc_ep { - nni_sockaddr sa; - nni_plat_ipc_ep *iep; - uint16_t proto; - size_t rcvmax; - nni_aio * aio; - nni_aio * user_aio; - nni_mtx mtx; +struct ipctran_dialer { + nni_sockaddr sa; + nni_ipc_dialer *dialer; + uint16_t proto; + size_t rcvmax; + nni_aio * aio; + nni_aio * user_aio; + nni_mtx mtx; }; -static void ipc_pipe_dosend(ipc_pipe *, nni_aio *); -static void ipc_pipe_dorecv(ipc_pipe *); -static void ipc_pipe_send_cb(void *); -static void ipc_pipe_recv_cb(void *); -static void ipc_pipe_nego_cb(void *); -static void ipc_ep_cb(void *); +struct ipctran_listener { + nni_sockaddr sa; + nni_ipc_listener *listener; + uint16_t proto; + size_t rcvmax; + nni_aio * aio; + nni_aio * user_aio; + nni_mtx mtx; +}; + +static void ipctran_pipe_send_start(ipctran_pipe *); +static void ipctran_pipe_recv_start(ipctran_pipe *); +static void ipctran_pipe_send_cb(void *); +static void ipctran_pipe_recv_cb(void *); +static void ipctran_pipe_nego_cb(void *); +static void ipctran_dialer_cb(void *); +static void ipctran_listener_cb(void *); static int -ipc_tran_init(void) +ipctran_init(void) { return (0); } static void -ipc_tran_fini(void) +ipctran_fini(void) { } static void -ipc_pipe_close(void *arg) +ipctran_pipe_close(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_close(pipe->rxaio); - nni_aio_close(pipe->txaio); - nni_aio_close(pipe->negaio); + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + nni_aio_close(p->negaio); - nni_plat_ipc_pipe_close(pipe->ipp); + nni_ipc_conn_close(p->conn); } static void -ipc_pipe_stop(void *arg) +ipctran_pipe_stop(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_stop(pipe->rxaio); - nni_aio_stop(pipe->txaio); - nni_aio_stop(pipe->negaio); + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); + nni_aio_stop(p->negaio); } static void -ipc_pipe_fini(void *arg) +ipctran_pipe_fini(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - nni_aio_fini(pipe->rxaio); - nni_aio_fini(pipe->txaio); - nni_aio_fini(pipe->negaio); - if (pipe->ipp != NULL) { - nni_plat_ipc_pipe_fini(pipe->ipp); + nni_aio_fini(p->rxaio); + nni_aio_fini(p->txaio); + nni_aio_fini(p->negaio); + if (p->conn != NULL) { + nni_ipc_conn_fini(p->conn); } - if (pipe->rxmsg) { - nni_msg_free(pipe->rxmsg); + if (p->rxmsg) { + nni_msg_free(p->rxmsg); } - nni_mtx_fini(&pipe->mtx); - NNI_FREE_STRUCT(pipe); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); } static int -ipc_pipe_init(ipc_pipe **pipep, ipc_ep *ep, void *ipp) +ipctran_pipe_init(ipctran_pipe **pipep, void *conn) { - ipc_pipe *p; - int rv; + ipctran_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, ipc_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, ipc_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, ipc_pipe_nego_cb, p)) != 0)) { - ipc_pipe_fini(p); + if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, ipctran_pipe_nego_cb, p)) != 0)) { + ipctran_pipe_fini(p); return (rv); } nni_aio_list_init(&p->sendq); nni_aio_list_init(&p->recvq); + p->conn = conn; +#if 0 p->proto = ep->proto; p->rcvmax = ep->rcvmax; - p->ipp = ipp; p->sa.s_ipc.sa_family = NNG_AF_IPC; p->sa = ep->sa; - +#endif *pipep = p; return (0); } static void -ipc_cancel_start(nni_aio *aio, int rv) +ipctran_pipe_nego_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); - if (pipe->user_negaio != aio) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if (p->user_negaio != aio) { + nni_mtx_unlock(&p->mtx); return; } - pipe->user_negaio = NULL; - nni_mtx_unlock(&pipe->mtx); + p->user_negaio = NULL; + nni_mtx_unlock(&p->mtx); - nni_aio_abort(pipe->negaio, rv); + nni_aio_abort(p->negaio, rv); nni_aio_finish_error(aio, rv); } static void -ipc_pipe_nego_cb(void *arg) +ipctran_pipe_nego_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio = pipe->negaio; - int rv; + ipctran_pipe *p = arg; + nni_aio * aio = p->negaio; + int rv; - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto done; } // We start transmitting before we receive. - if (pipe->gottxhead < pipe->wanttxhead) { - pipe->gottxhead += nni_aio_count(aio); - } else if (pipe->gotrxhead < pipe->wantrxhead) { - pipe->gotrxhead += nni_aio_count(aio); + if (p->gottxhead < p->wanttxhead) { + p->gottxhead += nni_aio_count(aio); + } else if (p->gotrxhead < p->wantrxhead) { + p->gotrxhead += nni_aio_count(aio); } - if (pipe->gottxhead < pipe->wanttxhead) { + if (p->gottxhead < p->wanttxhead) { nni_iov iov; - iov.iov_len = pipe->wanttxhead - pipe->gottxhead; - iov.iov_buf = &pipe->txhead[pipe->gottxhead]; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txhead[p->gottxhead]; nni_aio_set_iov(aio, 1, &iov); // send it down... - nni_plat_ipc_pipe_send(pipe->ipp, aio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, aio); + nni_mtx_unlock(&p->mtx); return; } - if (pipe->gotrxhead < pipe->wantrxhead) { + if (p->gotrxhead < p->wantrxhead) { nni_iov iov; - iov.iov_len = pipe->wantrxhead - pipe->gotrxhead; - iov.iov_buf = &pipe->rxhead[pipe->gotrxhead]; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxhead[p->gotrxhead]; nni_aio_set_iov(aio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, aio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, aio); + nni_mtx_unlock(&p->mtx); return; } // We have both sent and received the headers. Lets check the // receive side header. - if ((pipe->rxhead[0] != 0) || (pipe->rxhead[1] != 'S') || - (pipe->rxhead[2] != 'P') || (pipe->rxhead[3] != 0) || - (pipe->rxhead[6] != 0) || (pipe->rxhead[7] != 0)) { + if ((p->rxhead[0] != 0) || (p->rxhead[1] != 'S') || + (p->rxhead[2] != 'P') || (p->rxhead[3] != 0) || + (p->rxhead[6] != 0) || (p->rxhead[7] != 0)) { rv = NNG_EPROTO; goto done; } - NNI_GET16(&pipe->rxhead[4], pipe->peer); + NNI_GET16(&p->rxhead[4], p->peer); done: - if ((aio = pipe->user_negaio) != NULL) { - pipe->user_negaio = NULL; + if ((aio = p->user_negaio) != NULL) { + p->user_negaio = NULL; nni_aio_finish(aio, rv, 0); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_pipe_send_cb(void *arg) +ipctran_pipe_send_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio; - nni_aio * txaio = pipe->txaio; - nni_msg * msg; - int rv; - size_t n; + ipctran_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * txaio = p->txaio; - nni_mtx_lock(&pipe->mtx); - aio = nni_list_first(&pipe->sendq); + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->sendq); if ((rv = nni_aio_result(txaio)) != 0) { // Intentionally we do not queue up another transfer. @@ -239,7 +252,7 @@ ipc_pipe_send_cb(void *arg) // The protocol should see this error, and close the // pipe itself, we hope. nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } @@ -247,17 +260,15 @@ ipc_pipe_send_cb(void *arg) n = nni_aio_count(txaio); nni_aio_iov_advance(txaio, n); if (nni_aio_iov_count(txaio) != 0) { - nni_plat_ipc_pipe_send(pipe->ipp, txaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, txaio); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - if (!nni_list_empty(&pipe->sendq)) { - // schedule next send - ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq)); - } - nni_mtx_unlock(&pipe->mtx); + ipctran_pipe_send_start(p); + + nni_mtx_unlock(&p->mtx); msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); @@ -267,17 +278,17 @@ ipc_pipe_send_cb(void *arg) } static void -ipc_pipe_recv_cb(void *arg) +ipctran_pipe_recv_cb(void *arg) { - ipc_pipe *pipe = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = pipe->rxaio; + ipctran_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rxaio = p->rxaio; - nni_mtx_lock(&pipe->mtx); - aio = nni_list_first(&pipe->recvq); + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->recvq); if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back @@ -290,29 +301,29 @@ ipc_pipe_recv_cb(void *arg) nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) != 0) { // Was this a partial read? If so then resubmit for the rest. - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); return; } // If we don't have a message yet, we were reading the message // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. - if (pipe->rxmsg == NULL) { + if (p->rxmsg == NULL) { uint64_t len; // Check to make sure we got msg type 1. - if (pipe->rxhead[0] != 1) { + if (p->rxhead[0] != 1) { rv = NNG_EPROTO; goto recv_error; } // We should have gotten a message header. - NNI_GET64(pipe->rxhead + 1, len); + NNI_GET64(p->rxhead + 1, len); // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. - if ((len > pipe->rcvmax) && (pipe->rcvmax > 0)) { + if ((len > p->rcvmax) && (p->rcvmax > 0)) { rv = NNG_EMSGSIZE; goto recv_error; } @@ -322,7 +333,7 @@ ipc_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { goto recv_error; } @@ -330,11 +341,12 @@ ipc_pipe_recv_cb(void *arg) nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - iov.iov_buf = nni_msg_body(pipe->rxmsg); + iov.iov_buf = nni_msg_body(p->rxmsg); iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); return; } } @@ -343,12 +355,12 @@ ipc_pipe_recv_cb(void *arg) // good news. nni_aio_list_remove(aio); - msg = pipe->rxmsg; - pipe->rxmsg = NULL; - if (!nni_list_empty(&pipe->recvq)) { - ipc_pipe_dorecv(pipe); + msg = p->rxmsg; + p->rxmsg = NULL; + if (!nni_list_empty(&p->recvq)) { + ipctran_pipe_recv_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); @@ -356,59 +368,65 @@ ipc_pipe_recv_cb(void *arg) recv_error: nni_aio_list_remove(aio); - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + msg = p->rxmsg; + p->rxmsg = NULL; // Intentionally, we do not queue up another receive. // The protocol should notice this error and close the pipe. - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_msg_free(msg); nni_aio_finish_error(aio, rv); } static void -ipc_cancel_tx(nni_aio *aio, int rv) +ipctran_pipe_send_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); return; } // If this is being sent, then cancel the pending transfer. // The callback on the txaio will cause the user aio to // be canceled too. - if (nni_list_first(&pipe->sendq) == aio) { - nni_aio_abort(pipe->txaio, rv); - nni_mtx_unlock(&pipe->mtx); + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(p->txaio, rv); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); } static void -ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio) +ipctran_pipe_send_start(ipctran_pipe *p) { + nni_aio *aio; nni_aio *txaio; nni_msg *msg; int niov; nni_iov iov[3]; uint64_t len; + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + // This runs to send the message. msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - pipe->txhead[0] = 1; // message type, 1. - NNI_PUT64(pipe->txhead + 1, len); + p->txhead[0] = 1; // message type, 1. + NNI_PUT64(p->txhead + 1, len); - txaio = pipe->txaio; + txaio = p->txaio; niov = 0; - iov[0].iov_buf = pipe->txhead; - iov[0].iov_len = sizeof(pipe->txhead); + iov[0].iov_buf = p->txhead; + iov[0].iov_len = sizeof(p->txhead); niov++; if (nni_msg_header_len(msg) > 0) { iov[niov].iov_buf = nni_msg_header(msg); @@ -421,504 +439,643 @@ ipc_pipe_dosend(ipc_pipe *pipe, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_ipc_pipe_send(pipe->ipp, txaio); + nni_ipc_conn_send(p->conn, txaio); } static void -ipc_pipe_send(void *arg, nni_aio *aio) +ipctran_pipe_send(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - int rv; + ipctran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_tx, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&pipe->sendq, aio); - if (nni_list_first(&pipe->sendq) == aio) { - ipc_pipe_dosend(pipe, aio); + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + ipctran_pipe_send_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_cancel_rx(nni_aio *aio, int rv) +ipctran_pipe_recv_cancel(nni_aio *aio, int rv) { - ipc_pipe *pipe = nni_aio_get_prov_data(aio); + ipctran_pipe *p = nni_aio_get_prov_data(aio); - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); return; } // If receive in progress, then cancel the pending transfer. // The callback on the rxaio will cause the user aio to // be canceled too. - if (nni_list_first(&pipe->recvq) == aio) { - nni_aio_abort(pipe->rxaio, rv); - nni_mtx_unlock(&pipe->mtx); + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(p->rxaio, rv); + nni_mtx_unlock(&p->mtx); return; } nni_aio_list_remove(aio); - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); } static void -ipc_pipe_dorecv(ipc_pipe *pipe) +ipctran_pipe_recv_start(ipctran_pipe *p) { nni_aio *rxaio; nni_iov iov; - NNI_ASSERT(pipe->rxmsg == NULL); + NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the IPC header. - rxaio = pipe->rxaio; - iov.iov_buf = pipe->rxhead; - iov.iov_len = sizeof(pipe->rxhead); + rxaio = p->rxaio; + iov.iov_buf = p->rxhead; + iov.iov_len = sizeof(p->rxhead); nni_aio_set_iov(rxaio, 1, &iov); - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); + nni_ipc_conn_recv(p->conn, rxaio); } static void -ipc_pipe_recv(void *arg, nni_aio *aio) +ipctran_pipe_recv(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - int rv; + ipctran_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); + nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_rx, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&pipe->recvq, aio); - if (nni_list_first(&pipe->recvq) == aio) { - ipc_pipe_dorecv(pipe); + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + ipctran_pipe_recv_start(p); } - nni_mtx_unlock(&pipe->mtx); + nni_mtx_unlock(&p->mtx); } static void -ipc_pipe_start(void *arg, nni_aio *aio) +ipctran_pipe_start(void *arg, nni_aio *aio) { - ipc_pipe *pipe = arg; - nni_aio * negaio; - nni_iov iov; - int rv; + ipctran_pipe *p = arg; + nni_aio * negaio; + nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&pipe->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel_start, pipe)) != 0) { - nni_mtx_unlock(&pipe->mtx); + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_pipe_nego_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - pipe->txhead[0] = 0; - pipe->txhead[1] = 'S'; - pipe->txhead[2] = 'P'; - pipe->txhead[3] = 0; - NNI_PUT16(&pipe->txhead[4], pipe->proto); - NNI_PUT16(&pipe->txhead[6], 0); - - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - negaio = pipe->negaio; - iov.iov_len = 8; - iov.iov_buf = &pipe->txhead[0]; + p->txhead[0] = 0; + p->txhead[1] = 'S'; + p->txhead[2] = 'P'; + p->txhead[3] = 0; + NNI_PUT16(&p->txhead[4], p->proto); + NNI_PUT16(&p->txhead[6], 0); + + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + iov.iov_len = 8; + iov.iov_buf = &p->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_plat_ipc_pipe_send(pipe->ipp, negaio); - nni_mtx_unlock(&pipe->mtx); + nni_ipc_conn_send(p->conn, negaio); + nni_mtx_unlock(&p->mtx); } static uint16_t -ipc_pipe_peer(void *arg) +ipctran_pipe_peer(void *arg) { - ipc_pipe *pipe = arg; + ipctran_pipe *p = arg; - return (pipe->peer); + return (p->peer); } static int -ipc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; + ipctran_pipe *p = arg; return (nni_copyout_sockaddr(&p->sa, buf, szp, t)); } static int -ipc_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_uid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_uid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_uid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_gid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_gid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_gid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_pid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_pid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_pid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) +ipctran_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_pipe *p = arg; - uint64_t id; - int rv; - if ((rv = nni_plat_ipc_pipe_get_peer_zoneid(p->ipp, &id)) != 0) { + ipctran_pipe *p = arg; + uint64_t id; + int rv; + if ((rv = nni_ipc_conn_get_peer_zoneid(p->conn, &id)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static void -ipc_ep_fini(void *arg) +ipctran_dialer_fini(void *arg) { - ipc_ep *ep = arg; + ipctran_dialer *d = arg; - nni_aio_stop(ep->aio); - nni_plat_ipc_ep_fini(ep->iep); - nni_aio_fini(ep->aio); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_aio_stop(d->aio); + if (d->dialer != NULL) { + nni_ipc_dialer_fini(d->dialer); + } + nni_aio_fini(d->aio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +ipctran_dialer_close(void *arg) +{ + ipctran_dialer *d = arg; + + nni_aio_close(d->aio); + nni_ipc_dialer_close(d->dialer); } static int -ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +ipctran_dialer_init(void **dp, nni_url *url, nni_sock *sock) { - ipc_ep *ep; - int rv; - size_t sz; + ipctran_dialer *d; + int rv; + size_t sz; - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&ep->mtx); + nni_mtx_init(&d->mtx); - sz = sizeof(ep->sa.s_ipc.sa_path); - ep->sa.s_ipc.sa_family = NNG_AF_IPC; + sz = sizeof(d->sa.s_ipc.sa_path); + d->sa.s_ipc.sa_family = NNG_AF_IPC; - if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { - ipc_ep_fini(ep); + if (nni_strlcpy(d->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { + ipctran_dialer_fini(d); return (NNG_EADDRINVAL); } - if ((rv = nni_plat_ipc_ep_init(&ep->iep, &ep->sa, mode)) != 0) { - ipc_ep_fini(ep); + if (((rv = nni_ipc_dialer_init(&d->dialer)) != 0) || + ((rv = nni_aio_init(&d->aio, ipctran_dialer_cb, d)) != 0)) { + ipctran_dialer_fini(d); return (rv); } - if ((rv = nni_aio_init(&ep->aio, ipc_ep_cb, ep)) != 0) { - ipc_ep_fini(ep); - return (rv); - } - ep->proto = nni_sock_proto_id(sock); + d->proto = nni_sock_proto_id(sock); - *epp = ep; + *dp = d; return (0); } -static int -ipc_dialer_init(void **epp, nni_url *url, nni_sock *sock) +static void +ipctran_dialer_cb(void *arg) { - return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + ipctran_dialer *d = arg; + ipctran_pipe * p; + nni_ipc_conn * conn; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + aio = d->user_aio; + rv = nni_aio_result(d->aio); + + if (aio == NULL) { + nni_mtx_unlock(&d->mtx); + if (rv == 0) { + conn = nni_aio_get_output(d->aio, 0); + nni_ipc_conn_fini(conn); + } + return; + } + + if (rv != 0) { + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + d->user_aio = NULL; + conn = nni_aio_get_output(d->aio, 0); + NNI_ASSERT(conn != NULL); + if ((rv = ipctran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_ipc_conn_fini(conn); + nni_aio_finish_error(aio, rv); + return; + } + + p->proto = d->proto; + p->rcvmax = d->rcvmax; + p->sa = d->sa; + nni_mtx_unlock(&d->mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } -static int -ipc_listener_init(void **epp, nni_url *url, nni_sock *sock) +static void +ipctran_dialer_cancel(nni_aio *aio, int rv) { - return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + ipctran_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->mtx); + if (d->user_aio != aio) { + nni_mtx_unlock(&d->mtx); + return; + } + d->user_aio = NULL; + nni_mtx_unlock(&d->mtx); + + nni_aio_abort(d->aio, rv); + nni_aio_finish_error(aio, rv); } static void -ipc_ep_close(void *arg) +ipctran_dialer_connect(void *arg, nni_aio *aio) { - ipc_ep *ep = arg; + ipctran_dialer *d = arg; + int rv; - nni_aio_close(ep->aio); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&d->mtx); + NNI_ASSERT(d->user_aio == NULL); + + if ((rv = nni_aio_schedule(aio, ipctran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + d->user_aio = aio; - nni_mtx_lock(&ep->mtx); - nni_plat_ipc_ep_close(ep->iep); - nni_mtx_unlock(&ep->mtx); + nni_ipc_dialer_dial(d->dialer, &d->sa, d->aio); + nni_mtx_unlock(&d->mtx); } static int -ipc_ep_bind(void *arg) +ipctran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; + ipctran_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_size(d->rcvmax, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); +} - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_listen(ep->iep); - nni_mtx_unlock(&ep->mtx); +static int +ipctran_dialer_set_recvmaxsz( + void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ipctran_dialer *d = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->rcvmax = val; + nni_mtx_unlock(&d->mtx); + } return (rv); } static void -ipc_ep_finish(ipc_ep *ep) +ipctran_listener_fini(void *arg) { - nni_aio * aio; - int rv; - ipc_pipe *pipe = NULL; + ipctran_listener *l = arg; - if ((rv = nni_aio_result(ep->aio)) != 0) { - goto done; + nni_aio_stop(l->aio); + if (l->listener != NULL) { + nni_ipc_listener_fini(l->listener); } - NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); - - // Attempt to allocate the parent pipe. If this fails we'll - // drop the connection (ENOMEM probably). - rv = ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); + nni_aio_fini(l->aio); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} -done: - aio = ep->user_aio; - ep->user_aio = NULL; +static int +ipctran_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + ipctran_listener *l; + int rv; + size_t sz; - if ((aio != NULL) && (rv == 0)) { - NNI_ASSERT(pipe != NULL); - nni_aio_set_output(aio, 0, pipe); - nni_aio_finish(aio, 0, 0); - return; + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); } + nni_mtx_init(&l->mtx); + + sz = sizeof(l->sa.s_ipc.sa_path); + l->sa.s_ipc.sa_family = NNG_AF_IPC; - if (pipe != NULL) { - ipc_pipe_fini(pipe); + if (nni_strlcpy(l->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { + ipctran_listener_fini(l); + return (NNG_EADDRINVAL); } - if (aio != NULL) { - NNI_ASSERT(rv != 0); - nni_aio_finish_error(aio, rv); + + if (((rv = nni_ipc_listener_init(&l->listener)) != 0) || + ((rv = nni_aio_init(&l->aio, ipctran_listener_cb, l)) != 0)) { + ipctran_listener_fini(l); + return (rv); } + + l->proto = nni_sock_proto_id(sock); + + *lp = l; + return (0); } static void -ipc_ep_cb(void *arg) +ipctran_listener_close(void *arg) { - ipc_ep *ep = arg; + ipctran_listener *l = arg; - nni_mtx_lock(&ep->mtx); - ipc_ep_finish(ep); - nni_mtx_unlock(&ep->mtx); + nni_aio_close(l->aio); + nni_ipc_listener_close(l->listener); } -static void -ipc_cancel_ep(nni_aio *aio, int rv) +static int +ipctran_listener_bind(void *arg) { - ipc_ep *ep = nni_aio_get_prov_data(aio); + ipctran_listener *l = arg; + int rv; - NNI_ASSERT(rv != 0); - nni_mtx_lock(&ep->mtx); - if (ep->user_aio != aio) { - nni_mtx_unlock(&ep->mtx); - return; - } - ep->user_aio = NULL; - nni_mtx_unlock(&ep->mtx); - - nni_aio_abort(ep->aio, rv); - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&l->mtx); + rv = nni_ipc_listener_listen(l->listener, &l->sa); + nni_mtx_unlock(&l->mtx); + return (rv); } static void -ipc_ep_accept(void *arg, nni_aio *aio) +ipctran_listener_cb(void *arg) { - ipc_ep *ep = arg; - int rv; + ipctran_listener *l = arg; + nni_aio * aio; + int rv; + ipctran_pipe * p = NULL; + nni_ipc_conn * conn; + + nni_mtx_lock(&l->mtx); + rv = nni_aio_result(l->aio); + aio = l->user_aio; + l->user_aio = NULL; + + if (aio == NULL) { + nni_mtx_unlock(&l->mtx); + if (rv == 0) { + conn = nni_aio_get_output(l->aio, 0); + nni_ipc_conn_fini(conn); + } + return; + } - if (nni_aio_begin(aio) != 0) { + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + conn = nni_aio_get_output(l->aio, 0); + NNI_ASSERT(conn != NULL); + + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = ipctran_pipe_init(&p, conn)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_ipc_conn_fini(conn); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; + p->proto = l->proto; + p->rcvmax = l->rcvmax; + p->sa = l->sa; + nni_mtx_unlock(&l->mtx); + + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +ipctran_listener_cancel(nni_aio *aio, int rv) +{ + ipctran_listener *l = nni_aio_get_prov_data(aio); + + NNI_ASSERT(rv != 0); + nni_mtx_lock(&l->mtx); + if (l->user_aio != aio) { + nni_mtx_unlock(&l->mtx); + return; + } + l->user_aio = NULL; + nni_mtx_unlock(&l->mtx); - nni_plat_ipc_ep_accept(ep->iep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_aio_abort(l->aio, rv); + nni_aio_finish_error(aio, rv); } static void -ipc_ep_connect(void *arg, nni_aio *aio) +ipctran_listener_accept(void *arg, nni_aio *aio) { - ipc_ep *ep = arg; - int rv; + ipctran_listener *l = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); + nni_mtx_lock(&l->mtx); + NNI_ASSERT(l->user_aio == NULL); - if ((rv = nni_aio_schedule(aio, ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, ipctran_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, rv); return; } - ep->user_aio = aio; + l->user_aio = aio; - nni_plat_ipc_ep_connect(ep->iep, ep->aio); - nni_mtx_unlock(&ep->mtx); + nni_ipc_listener_accept(l->listener, l->aio); + nni_mtx_unlock(&l->mtx); } static int -ipc_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_recvmaxsz( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - size_t val; - int rv; + ipctran_listener *l = arg; + size_t val; + int rv; if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + l->rcvmax = val; + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_recvmaxsz(const void *data, size_t sz, nni_opt_type t) +ipctran_listener_get_recvmaxsz( + void *arg, void *data, size_t *szp, nni_opt_type t) { - return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t)); + ipctran_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_size(l->rcvmax, data, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int -ipc_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t) +ipctran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, data, szp, t); - nni_mtx_unlock(&ep->mtx); + ipctran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_sockaddr(&l->sa, buf, szp, t); + nni_mtx_unlock(&l->mtx); return (rv); } static int -ipc_ep_get_addr(void *arg, void *data, size_t *szp, nni_opt_type t) +ipctran_check_recvmaxsz(const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - rv = nni_copyout_sockaddr(&ep->sa, data, szp, t); - nni_mtx_unlock(&ep->mtx); - return (rv); + return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t)); } static int -ipc_ep_set_perms(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_perms( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - int val; - int rv; + ipctran_listener *l = arg; + int val; + int rv; // Probably we could further limit this -- most systems don't have // meaningful chmod beyond the lower 9 bits. if ((rv = nni_copyin_int(&val, data, sz, 0, 0x7FFFFFFF, t)) == 0) { - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_set_permissions(ep->iep, val); - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + rv = nni_ipc_listener_set_permissions(l->listener, val); + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_perms(const void *data, size_t sz, nni_opt_type t) +ipctran_check_perms(const void *data, size_t sz, nni_opt_type t) { return (nni_copyin_int(NULL, data, sz, 0, 0x7FFFFFFF, t)); } static int -ipc_ep_set_sec_desc(void *arg, const void *data, size_t sz, nni_opt_type t) +ipctran_listener_set_sec_desc( + void *arg, const void *data, size_t sz, nni_opt_type t) { - ipc_ep *ep = arg; - void * ptr; - int rv; + ipctran_listener *l = arg; + void * ptr; + int rv; if ((rv = nni_copyin_ptr(&ptr, data, sz, t)) == 0) { - nni_mtx_lock(&ep->mtx); - rv = nni_plat_ipc_ep_set_security_descriptor(ep->iep, ptr); - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + rv = + nni_ipc_listener_set_security_descriptor(l->listener, ptr); + nni_mtx_unlock(&l->mtx); } return (rv); } static int -ipc_ep_chk_sec_desc(const void *data, size_t sz, nni_opt_type t) +ipctran_check_sec_desc(const void *data, size_t sz, nni_opt_type t) { return (nni_copyin_ptr(NULL, data, sz, t)); } -static nni_tran_option ipc_pipe_options[] = { +static nni_tran_option ipctran_pipe_options[] = { { .o_name = NNG_OPT_REMADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_pipe_get_addr, + .o_get = ipctran_pipe_get_addr, }, { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_pipe_get_addr, + .o_get = ipctran_pipe_get_addr, }, { .o_name = NNG_OPT_IPC_PEER_UID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_uid, + .o_get = ipctran_pipe_get_peer_uid, }, { .o_name = NNG_OPT_IPC_PEER_GID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_gid, + .o_get = ipctran_pipe_get_peer_gid, }, { .o_name = NNG_OPT_IPC_PEER_PID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_pid, + .o_get = ipctran_pipe_get_peer_pid, }, { .o_name = NNG_OPT_IPC_PEER_ZONEID, .o_type = NNI_TYPE_UINT64, - .o_get = ipc_pipe_get_peer_zoneid, + .o_get = ipctran_pipe_get_peer_zoneid, }, // terminate list { @@ -926,29 +1083,24 @@ static nni_tran_option ipc_pipe_options[] = { }, }; -static nni_tran_pipe_ops ipc_pipe_ops = { - .p_fini = ipc_pipe_fini, - .p_start = ipc_pipe_start, - .p_stop = ipc_pipe_stop, - .p_send = ipc_pipe_send, - .p_recv = ipc_pipe_recv, - .p_close = ipc_pipe_close, - .p_peer = ipc_pipe_peer, - .p_options = ipc_pipe_options, +static nni_tran_pipe_ops ipctran_pipe_ops = { + .p_fini = ipctran_pipe_fini, + .p_start = ipctran_pipe_start, + .p_stop = ipctran_pipe_stop, + .p_send = ipctran_pipe_send, + .p_recv = ipctran_pipe_recv, + .p_close = ipctran_pipe_close, + .p_peer = ipctran_pipe_peer, + .p_options = ipctran_pipe_options, }; -static nni_tran_option ipc_dialer_options[] = { +static nni_tran_option ipctran_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ipc_ep_get_recvmaxsz, - .o_set = ipc_ep_set_recvmaxsz, - .o_chk = ipc_ep_chk_recvmaxsz, - }, - { - .o_name = NNG_OPT_LOCADDR, - .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_ep_get_addr, + .o_get = ipctran_dialer_get_recvmaxsz, + .o_set = ipctran_dialer_set_recvmaxsz, + .o_chk = ipctran_check_recvmaxsz, }, // terminate list { @@ -956,32 +1108,32 @@ static nni_tran_option ipc_dialer_options[] = { }, }; -static nni_tran_option ipc_listener_options[] = { +static nni_tran_option ipctran_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ipc_ep_get_recvmaxsz, - .o_set = ipc_ep_set_recvmaxsz, - .o_chk = ipc_ep_chk_recvmaxsz, + .o_get = ipctran_listener_get_recvmaxsz, + .o_set = ipctran_listener_set_recvmaxsz, + .o_chk = ipctran_check_recvmaxsz, }, { .o_name = NNG_OPT_LOCADDR, .o_type = NNI_TYPE_SOCKADDR, - .o_get = ipc_ep_get_addr, + .o_get = ipctran_listener_get_locaddr, }, { .o_name = NNG_OPT_IPC_SECURITY_DESCRIPTOR, .o_type = NNI_TYPE_POINTER, .o_get = NULL, - .o_set = ipc_ep_set_sec_desc, - .o_chk = ipc_ep_chk_sec_desc, + .o_set = ipctran_listener_set_sec_desc, + .o_chk = ipctran_check_sec_desc, }, { .o_name = NNG_OPT_IPC_PERMISSIONS, .o_type = NNI_TYPE_INT32, .o_get = NULL, - .o_set = ipc_ep_set_perms, - .o_chk = ipc_ep_chk_perms, + .o_set = ipctran_listener_set_perms, + .o_chk = ipctran_check_perms, }, // terminate list { @@ -989,31 +1141,31 @@ static nni_tran_option ipc_listener_options[] = { }, }; -static nni_tran_dialer_ops ipc_dialer_ops = { - .d_init = ipc_dialer_init, - .d_fini = ipc_ep_fini, - .d_connect = ipc_ep_connect, - .d_close = ipc_ep_close, - .d_options = ipc_dialer_options, +static nni_tran_dialer_ops ipctran_dialer_ops = { + .d_init = ipctran_dialer_init, + .d_fini = ipctran_dialer_fini, + .d_connect = ipctran_dialer_connect, + .d_close = ipctran_dialer_close, + .d_options = ipctran_dialer_options, }; -static nni_tran_listener_ops ipc_listener_ops = { - .l_init = ipc_listener_init, - .l_fini = ipc_ep_fini, - .l_bind = ipc_ep_bind, - .l_accept = ipc_ep_accept, - .l_close = ipc_ep_close, - .l_options = ipc_listener_options, +static nni_tran_listener_ops ipctran_listener_ops = { + .l_init = ipctran_listener_init, + .l_fini = ipctran_listener_fini, + .l_bind = ipctran_listener_bind, + .l_accept = ipctran_listener_accept, + .l_close = ipctran_listener_close, + .l_options = ipctran_listener_options, }; static nni_tran ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", - .tran_dialer = &ipc_dialer_ops, - .tran_listener = &ipc_listener_ops, - .tran_pipe = &ipc_pipe_ops, - .tran_init = ipc_tran_init, - .tran_fini = ipc_tran_fini, + .tran_dialer = &ipctran_dialer_ops, + .tran_listener = &ipctran_listener_ops, + .tran_pipe = &ipctran_pipe_ops, + .tran_init = ipctran_init, + .tran_fini = ipctran_fini, }; int diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index c323da29..0d47529e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -129,6 +129,7 @@ tcptran_pipe_fini(void *arg) nni_tcp_conn_fini(p->conn); } nni_msg_free(p->rxmsg); + nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); } @@ -780,7 +781,12 @@ static int tcptran_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tcptran_dialer *d = arg; - return (nni_copyout_size(d->rcvmax, v, szp, t)); + int rv; + + nni_mtx_lock(&d->mtx); + rv = nni_copyout_size(d->rcvmax, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); } static int @@ -922,15 +928,12 @@ tcptran_listener_init(void **lp, nni_url *url, nni_sock *sock) return (rv); } - if ((rv = nni_tcp_listener_init(&l->listener)) != 0) { + if (((rv = nni_tcp_listener_init(&l->listener)) != 0) || + ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0)) { tcptran_listener_fini(l); return (rv); } - if ((rv = nni_aio_init(&l->aio, tcptran_listener_cb, l)) != 0) { - tcptran_listener_fini(l); - return (rv); - } l->proto = nni_sock_proto_id(sock); l->nodelay = true; l->keepalive = false; @@ -955,9 +958,9 @@ tcptran_listener_bind(void *arg) tcptran_listener *l = arg; int rv; - l->bsa = l->sa; nni_mtx_lock(&l->mtx); - rv = nni_tcp_listener_listen(l->listener, &l->bsa); + l->bsa = l->sa; + rv = nni_tcp_listener_listen(l->listener, &l->bsa); nni_mtx_unlock(&l->mtx); return (rv); @@ -1139,7 +1142,24 @@ static int tcptran_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tcptran_listener *l = arg; - return (nni_copyout_size(l->rcvmax, v, szp, t)); + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_size(l->rcvmax, v, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); +} + +static int +tcptran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + tcptran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->mtx); + rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int @@ -1234,6 +1254,11 @@ static nni_tran_option tcptran_listener_options[] = { .o_chk = tcptran_check_recvmaxsz, }, { + .o_name = NNG_OPT_LOCADDR, + .o_type = NNI_TYPE_SOCKADDR, + .o_get = tcptran_listener_get_locaddr, + }, + { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, .o_get = tcptran_listener_get_url, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index b1bdddb8..e6701f5f 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -913,9 +913,9 @@ tlstran_listener_bind(void *arg) tlstran_listener *l = arg; int rv; - l->bsa = l->sa; nni_mtx_lock(&l->ep.mtx); - rv = nni_tcp_listener_listen(l->listener, &l->bsa); + l->bsa = l->sa; + rv = nni_tcp_listener_listen(l->listener, &l->bsa); nni_mtx_unlock(&l->ep.mtx); return (rv); @@ -1122,6 +1122,18 @@ tlstran_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) } static int +tlstran_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + tlstran_listener *l = arg; + int rv; + + nni_mtx_lock(&l->ep.mtx); + rv = nni_copyout_sockaddr(&l->bsa, buf, szp, t); + nni_mtx_unlock(&l->ep.mtx); + return (rv); +} + +static int tlstran_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); @@ -1381,6 +1393,11 @@ static nni_tran_option tlstran_listener_options[] = { .o_get = tlstran_listener_get_url, }, { + .o_name = NNG_OPT_LOCADDR, + .o_type = NNI_TYPE_SOCKADDR, + .o_get = tlstran_listener_get_locaddr, + }, + { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, .o_get = tlstran_ep_get_config, |
