diff options
| -rw-r--r-- | CMakeLists.txt | 40 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 8 | ||||
| -rw-r--r-- | src/core/platform.h | 27 | ||||
| -rw-r--r-- | src/nng.c | 13 | ||||
| -rw-r--r-- | src/nng.h | 43 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 2 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.adoc | 383 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 2677 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | tests/sock.c | 1 | ||||
| -rw-r--r-- | tests/trantest.h | 2 | ||||
| -rw-r--r-- | tests/zt.c | 195 |
12 files changed, 3378 insertions, 17 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index d95206c5..c2815ca3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,6 @@ # +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> # Copyright (c) 2012 Martin Sustrik All rights reserved. # Copyright (c) 2013 GoPivotal, Inc. All rights reserved. # Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved. @@ -91,7 +93,8 @@ option (NNG_TESTS "Build and run tests" ON) option (NNG_TOOLS "Build extra tools" OFF) option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS}) option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF) - +option (NNG_ENABLE_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) +set (NNG_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") # Enable access to private APIs for our own use. add_definitions (-DNNG_PRIVATE) @@ -244,6 +247,39 @@ nng_check_sym (strlcat string.h NNG_HAVE_STRLCAT) nng_check_sym (strlcpy string.h NNG_HAVE_STRLCPY) nng_check_sym (strnlen string.h NNG_HAVE_STRNLEN) +# Search for ZeroTier +# We use the libzerotiercore.a library, which is unfortunately a C++ object +# even though it exposes only public C symbols. It would be extremely +# helpful if libzerotiercore didn't make us carry the whole C++ runtime +# behind us. The user must specify the location of the ZeroTier source +# tree (dev branch for now, and already compiled please) by setting the +# NNG_ZEROTIER_SOURCE macro. +# NB: This needs to be the zerotierone tree, not the libzt library. +# This is because we don't access the API, but instead use the low +# level zerotiercore functionality directly. +# NB: As we wind up linking libzerotiercore.a into the application, +# this means that your application will *also* need to either be licensed +# under the GPLv3, or you will need to have a commercial license from +# ZeroTier permitting its use elsewhere. 
+if (NNG_ENABLE_ZEROTIER) + enable_language(CXX) + find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_ZEROTIER_SOURCE}) + if (NNG_LIBZTCORE) + set(CMAKE_REQUIRED_INCLUDES ${NNG_ZEROTIER_SOURCE}/include) +# set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} c++) +# set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} c++) + message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") + set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) + set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) + set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_ZEROTIER_SOURCE}/include) + nng_check_sym(ZT_Node_join ZeroTierOne.h NNG_HAVE_ZEROTIER) + endif() + if (NOT NNG_HAVE_ZEROTIER) + message (FATAL_ERROR "Cannot find ZeroTier components") + endif() + message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") +endif() + add_subdirectory (src) if (NNG_TESTS) @@ -253,6 +289,7 @@ if (NNG_TESTS) add_subdirectory (perf) endif() + # Build the tools if (NNG_ENABLE_NNGCAT) @@ -270,6 +307,7 @@ if (NNG_ENABLE_DOC) endif () endif () + # Build the documenation if (NNG_ENABLE_DOC) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1d616e95..17b14d2e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -99,6 +99,7 @@ set (NNG_SOURCES transport/ipc/ipc.c transport/tcp/tcp.c + ) if (NNG_PLATFORM_POSIX) @@ -140,7 +141,12 @@ if (NNG_PLATFORM_WINDOWS) ) endif() -include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src) +if (NNG_ENABLE_ZEROTIER) + set (NNG_SOURCES ${NNG_SOURCES} transport/zerotier/zerotier.c) +endif() + +include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src + ${NNG_REQUIRED_INCLUDES}) # Provide same folder structure in IDE as on disk foreach (f ${NNG_SOURCES}) diff --git a/src/core/platform.h b/src/core/platform.h index 02ea9a11..7bfb370a 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -350,6 +350,33 @@ extern void nni_plat_pipe_clear(int); // routine. 
extern void nni_plat_pipe_close(int, int); +// +// File/Store Support +// +// Some transports require a persistent storage for things like +// key material, etc. Generally, these are all going to be relatively +// small objects (such as certificates), so we ony require a synchronous +// implementation from platforms. This Key-Value API is intended to +// to support using the Key's as filenames, and keys will consist of +// only these characters: [0-9a-zA-Z._-]. The directory used should be +// determined by using an environment variable (NNG_STATE_DIR), or +// using some other application-specific method. +// + +// nni_plat_file_put writes the named file, with the provided data, +// and the given size. If the file already exists it is overwritten. +// The permissions on the file should be limited to read and write +// access by the entity running the application only. +extern int nni_plat_file_put(const char *, const void *, int); + +// nni_plat_file_get reads the entire named file, allocating storage +// to receive the data and returning the data and the size in the +// reference arguments. +extern int nni_plat_file_get(const char *, void **, int *); + +// nni_plat_file_delete deletes the named file. +extern int nni_plat_file_delete(const char *); + // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(NNG_PLATFORM_POSIX) @@ -19,6 +19,7 @@ // Pretty much every function calls the nni_platform_init to check against // fork related activity. 
+#include <stdio.h> #include <string.h> void @@ -650,6 +651,7 @@ static const struct { const char * nng_strerror(int num) { + static char unknownerrbuf[32]; for (int i = 0; nni_errors[i].msg != NULL; i++) { if (nni_errors[i].code == num) { return (nni_errors[i].msg); @@ -660,7 +662,16 @@ nng_strerror(int num) return (nni_plat_strerror(num & ~NNG_ESYSERR)); } - return ("Unknown error"); + if (num & NNG_ETRANERR) { + static char tranerrbuf[32]; + (void) snprintf(tranerrbuf, sizeof(tranerrbuf), + "Transport error #%d", num & ~NNG_ETRANERR); + return (tranerrbuf); + } + + (void) snprintf( + unknownerrbuf, sizeof(unknownerrbuf), "Unknown error #%d", num); + return (unknownerrbuf); } int @@ -557,6 +557,23 @@ NNG_DECL void nng_thread_destroy(void *); // Error codes. These may happen to align to errnos used on your platform, // but do not count on this. +// +// NNG_SYSERR is a special code, which allows us to wrap errors from the +// underlying operating system. We generally prefer to map errors to one +// of the above, but if we cannot, then we just encode an error this way. +// The bit is large enough to accommodate all known UNIX and Win32 error +// codes. We try hard to match things semantically to one of our standard +// errors. For example, a connection reset or aborted we treat as a +// closed connection, because that's basically what it means. (The remote +// peer closed the connection.) For certain kinds of resource exhaustion +// we treat it the same as memory. But for files, etc. that's OS-specific, +// and we use the generic below. Some of the above error codes we use +// internally, and the application should never see (e.g. NNG_EINTR). +// +// NNG_ETRANERR is like ESYSERR, but is used to wrap transport specific +// errors, from different transports. It should only be used when none +// of the other options are available. 
+ enum nng_errno_enum { NNG_EINTR = 1, NNG_ENOMEM = 2, @@ -582,21 +599,11 @@ enum nng_errno_enum { NNG_ENOSPC = 22, NNG_EEXIST = 23, NNG_EINTERNAL = 24, + NNG_ETRANSPORT = 25, + NNG_ESYSERR = 0x10000000, + NNG_ETRANERR = 0x20000000, }; -// NNG_SYSERR is a special code, which allows us to wrap errors from the -// underlyuing operating system. We generally prefer to map errors to one -// of the above, but if we cannot, then we just encode an error this way. -// The bit is large enough to accommodate all known UNIX and Win32 error -// codes. We try hard to match things semantically to one of our standard -// errors. For example, a connection reset or aborted we treat as a -// closed connection, because that's basically what it means. (The remote -// peer closed the connection.) For certain kinds of resource exhaustion -// we treat it the same as memory. But for files, etc. that's OS-specific, -// and we use the generic below. Some of the above error codes we use -// internally, and the application should never see (e.g. NNG_EINTR). -#define NNG_ESYSERR (0x10000000) - // Maximum length of a socket address. This includes the terminating NUL. // This limit is built into other implementations, so do not change it. 
#define NNG_MAXADDRLEN (128) @@ -627,9 +634,17 @@ struct nng_sockaddr_in { uint16_t sa_port; uint32_t sa_addr; }; + +struct nng_sockaddr_zt { + uint64_t sa_nwid; + uint64_t sa_nodeid; + uint32_t sa_port; +}; + typedef struct nng_sockaddr_in nng_sockaddr_in; typedef struct nng_sockaddr_in nng_sockaddr_udp; typedef struct nng_sockaddr_in nng_sockaddr_tcp; +typedef struct nng_sockaddr_zt nng_sockaddr_zt; typedef struct nng_sockaddr { union { @@ -637,6 +652,7 @@ typedef struct nng_sockaddr { nng_sockaddr_path s_path; nng_sockaddr_in6 s_in6; nng_sockaddr_in s_in; + nng_sockaddr_zt s_zt; } s_un; } nng_sockaddr; @@ -646,6 +662,7 @@ enum nng_sockaddr_family { NNG_AF_IPC = 2, NNG_AF_INET = 3, NNG_AF_INET6 = 4, + NNG_AF_ZT = 5, // ZeroTier }; #ifdef __cplusplus diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 552730da..05f7bea1 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -210,7 +210,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) } // UDP opens can actually run synchronously. - if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { + if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&udp->udp_mtx); diff --git a/src/transport/zerotier/zerotier.adoc b/src/transport/zerotier/zerotier.adoc new file mode 100644 index 00000000..dc838420 --- /dev/null +++ b/src/transport/zerotier/zerotier.adoc @@ -0,0 +1,383 @@ +ZeroTier Mapping for Scalability Protocols +=========================================== + +sp-zerotier-mapping-06 +~~~~~~~~~~~~~~~~~~~~~~ + +Abstract +-------- + +This document defines the ZeroTier mapping for scalability protocols. + +Status of This Memo +------------------- + +This is the third draft document, and is intended to guide early +development efforts. Nothing here is finalized yet. 
+ +Copyright Notice +---------------- + +Copyright 2017 Garrett D'Amore <garrett@damore.org> + +Copyright 2017 Capitar IT Group BV <info@capitar.com> + +At this point, all rights are reserved. (Note that we do intend to +release this under a liberal reuse license once it stabilizes a bit.) + +Underlying protocol +------------------- + +ZeroTier expresses an 802.3 style layer 2, where frames maybe exchanged as if +they were Ethernet frames. Virtual broadcast domains are created within a +numbered "network", and frames may then be exchanged with any peers on that +network. + +Frames may arrive in any order, or be lost, just a with Ethernet +(best effort delivery), but they are strongly protected by a +cryptographic checksum, so frames that do arrive will be uncorrupted. +Furthermore, ZeroTier guarantees that a given frame will be received +at most once. + +Each application on a ZeroTier network has its own address, called a +ZeroTier ID (`ZTID`), which is globally unique -- this is generated +from a hash of the public key associated with the application. + +A given application may participate in multiple ZeroTier networks. + +Sharing of ZeroTier IDs between applications, as well as use of multiple +ZTIDs within a single application, as well as management of the associated +ZeroTier-specific state is out of scope for this document. + +ZeroTier networks have a standard MTU of 2800 bytes, but over typical +public networks an "optimum" MTU of 1400 bytes is used. +ZeroTier may be configured to have larger MTUs, but typically this involves +extensive reassembly at underlying layers, and implementations *SHOULD* +use the optimum MTU advertised by the ZeroTier implementation. + + +Packet layout +~~~~~~~~~~~~~ + +Each SP message sent over ZeroTier is comprised of one or +more fragments, where each fragment is mapped to a single underlying +ZeroTier L2 frame. We use the EtherType field of 0x0901 to indicate +SP over ZeroTier protocol (number to be registered with IEEE). 
+ +The ZeroTier L2 payload shall be encoded with a header as follows: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | op | flags | version | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | reserved | destination port | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | reserved | source port | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | op-specific payload... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +All numeric fields are in big-endian byte order. Note that ZeroTier +APIs present this as the L2 payload, but ZeroTier itself may prepend +additional data such as the Ethernet type, and source and destination +MAC addresses, as well as ZeroTier specific headers. The details of +such headers are out of scope for this document. + +As above, the start of each frame is just as a normal Ethernet payload. +The Ethernet type (ethertype) we use for these frames is 0x901, with +a VLAN ID of 0. + +The `op` is a field that indicates the type of message being sent. The +following values are defined: `DATA` (0x00), `CONN-REQ` (0x10), +`CONN-ACK` (0x12), `DISC` (0x20), `PING` (0x30), `PONG` (0x32), +and `ERR` (0x40). These are discussed further below. Implementations +*MUST* discard messages where the `op` is not one of these. + +The `flags` field is reserved for future use, and *MUST* be zero. +Implementations *MUST* discard frames for which this is not true. + +The `version` byte MUST be set to 0x1. Implementations *MUST* discard +any messages received for any other version. + +The `source port` and `destination port` are used to construct a logical +conversation. These are 24-bits wide, and are discussed further below. +The `reserved` fields must be set to zero. + +The remainder of frame varies depending on the `op` used. 
+ + +The port fields are used to discriminate different uses, allowing one +application to have multiple connections or sockets open. The +purpose is analogous to TCP port numbers, except that instead of the +operating system performing the discrimination the application or +library code must do so. Note that port numbers are 24-bits. This +was chosen to allow a peer to allocate a unique port number for each +local conversation, allowing up to 16 million concurrent conversations. +This also allows a 40-bit node number to be combined with the 24-bit +port number to create a 64-bit unique address. + +The `type` field is the numeric SP protocol ID, in big-endian form. +When receiving a message for a port, if the SP protocol ID does not +match the SP protocol expected on that port, the implementation *MUST* +discard this message. + +Note that it is not by accident that the payload is 32-bit aligned in +this message format. The payload is actually 64-bit aligned. + + +Note that at this time, broadcast and multicast is not supported by +this mapping. (A future update may resolve this.) + +DATA messages +~~~~~~~~~~~~~ + +`DATA` messages carry SP protocol payload data. They can only be sent +on an established session (see `CONN` messages below), and are never +acknowledged (in this version). The op-specific payload they carry +is formed like this: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | message ID | fragment size | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | fragment number | total fragments | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | user data... + +-+-+-+-+-+-+-+- + +All fragments, except for the last, *MUST* be the same size. 
The fragment +size field carries the size of every fragment, except that the last +fragment may be shorter; however even for the last fragment, the fragment +size *MUST* be the size of the rest of the fragments. This is necessary +to allow a receiver to know the fragment size of the other fragments even +if the final fragment is received before any others. (Typically this may +occur if a message consisting of two fragments arrives with fragments +out of order.) + +The last fragment shall have the fragment number equal to +the total fragments minus one, and the first fragment shall have fragment +number 0. Under typical optimal conditions, with an optimal MTU of 1400 +bytes, the largest message that can be transmitted is approximately 86 MB. +Specifically the limit is (65534 * (1400 - 20)) = 90,436,920 bytes. +(Larger MTUs may be used, if the implementation determines that it is +advantageous to do so. Doing so would necessarily give a larger maximum +message size.) + +However, transmitting such a large message would require sending over +65 thousand fragments, and given the likelihood of fragment loss, and +the lack of acknowledgment, it is likely that the entire message would +be lost. As a result, implementations are encouraged to limit the +amount of data that they send to at most a few megabytes. Implementations +receiving the first fragment can easily calculate the worst case for +the message size (the size of the user payload multiplied by the total +number of fragments), and MAY reply to the sender with an `ERR` message +using the code 0x05, indicating that the message is larger than the +receiver is willing to accept. + +Each fragment for a given message must carry the same `message ID`. +Implementations *MUST* initialize this to a random value when starting +a conversation, and *MUST* increment this each time a new message is sent. +Message IDs of zero are not permitted; implementations *MUST* skip past zero +when incrementing message IDs. 
+ +Implementations may detect the loss of a message by noticing skips in the +message IDs that are received, accounting for the expected skip past zero. + +Note that no field conveys the length of the fragment itself, as +this can be determined from the L2 length -- the user data within +the fragment extends to the end of the L2 payload supplied by ZeroTier. +(And, all fragments other than the final fragment for a message must +therefore have the same length.) + + +CONN-REQ and CONN-ACK messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`CONN-REQ` frames represent a request from an initiator to establish a +session, i.e. a new conversation or connection, and `CONN-ACK` +messages are the normal successful reply from the responder. They both +take the same form, which consists of the usual headers along with the +senders 16-bit (big-endian) SP protocol ID appended: + + 0 1 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SP protocol ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +The connection is initiated by the initiator sending this message, +with its own SP protocol ID, with the `op` set to `CONN-REQ`. +The initiator must choose a `source port` number that is not currently +being used with the remote peer. (Most implementations will choose a +a source port that is not used at all. Source port numbers *SHOULD* +be chosen randomly.) + +The responder will acknowledge this by replying with its SP protocol +ID in the 4-byte payload, using the `CONN-ACK` op. Additionally, +the source port number that the responder replies with *MUST* be the +one the intiator requested. + +(Responders will identify the session using the initiators chosen +`source port`, which the initiator *MUST NOT* concurrently use for any +other sessions.) + +Alternatively, a responder *MAY* reject the connection attempt by +sending a suitably formed ERR message (see below). 
+ +If a sender does not receive a reply, it *SHOULD* retry this message +before giving up and reporting an error to the user. It is recommended +that a configurable number of retries and time interval be used. + +Given modern Internet latencies of generally less than 500 ms, resending +up to 12 `CONN-REQ` requests, once every 5 seconds, before giving up seems +reasonable. (These times are somewhat larger to allow for ZeroTier +path discovery to take place; this results in a timeout of approximately +a minute.) + +The initiator *MUST NOT* send any `DATA` messages for a conversation until +it has received an ACK from the other party, and it *MUST* send all further +messages for the conversation to the port number supplied by the responder. + +If a `CONN-REQ` frame is received by a responder for a conversation that already +exists, the responder MUST reply. Further, the source port it replies with, +and the SP protocol IDs MUST be identical to what it first sent. This +ensures that the `CONN-REQ` request is idempotent. + +DISC messages +~~~~~~~~~~~~~ + +DISC messages are used to request a session be terminated. This +notifies the remote sender that no more data will be sent or +accepted, and the session resources may be released. There is no +payload. There is no acknowledgment. + +PING and PONG messages +~~~~~~~~~~~~~~~~~~~~~~ + +In order to keep session state, implementations will generally store +data for each session. In order to prevent a stale session from +consuming these resources forever, and in order to keep underlying +ZeroTier sessions alive, a `PING` message *MAY* be sent to a peer +with whom a session has been established. This message has no payload. + +If the `PING` is is successful, then the responder *MUST* reply with a `PONG` +message. As with `PING`, the `PONG` message carries no payload. + +There is no response to a `PONG` message. + +In the event of an error, an implementation *MAY*_ reply with an `ERR` +message. 
+ +Implementations *SHOULD NOT* initiate `PING` messages if they have either +received other session messages recently. + +Implementations *SHOULD* use a timeout T1 seconds of be used before +initiating a message the first time, and that in the absence of a +reply, up to N further attempts be made, separated by T2 seconds. If +no reply to the Nth attempt is received after T2 seconds have passed, +then the remote peer should be assumed offline or dead, and the +session closed. + +The values for T1, T2, and N *SHOULD* be configurable, with +recommended default values of 60, 10, and 5. With these values, +sessions that appear dead after 2 minutes will be closed, and their +resources reclaimed. + +ERR messages +~~~~~~~~~~~~ + +`ERR` messages indicate a failure in the session, and abruptly +terminate the session. The payload for these messages consists of a +single byte error code, followed by an ASCII message describing the +error (not terminated by zero). This message *MUST NOT* be more than +128 bytes in length. + +The following error codes are defined: + + * 0x01 No party listening at that address or port. + * 0x02 No such session found. + * 0x03 SP protocol ID invalid. + * 0x04 Generic protocol error. + * 0x05 Message size too big. + * 0xff Other uncategorized error. + +Implementations *MUST* discard any session state upon receiving an ERR +message. These messages are not acknowledged. + +Reassembly Guidelines +~~~~~~~~~~~~~~~~~~~~~ + +Implementations *MUST* accept and reassemble fragmented `DATA` messages. +Implementations *MUST* discard fragmented messages of other types. + +Messages larger than the ZeroTier MTU *MUST* be fragmented. + +Implementations *SHOULD* limit the number of unassembled messages +retained for reassembly, to minimize the likelihood of intentional +abuse. It is suggested that at most 2 unassembled messages be +retained. 
It is further suggested that if 2 or more unfragmented +messages arrive before a message is reassembled, or more than 5 +seconds pass before the reassembly is complete, that the unassembled +fragments be discarded. + + +Ports +~~~~~ + +The port numbers are 24-bit fields, allowing a single ZT ID to +service multiple application layer protocols, which could be treated +as separate end points, or as separate sockets in the application. +The implementation is responsible for discriminating on these and +delivering to the appropriate consumer. + +As with UDP or TCP, it is intended that each party have its own port +number, and that a pair of ports (combined with ZeroTier IDs) be used +to identify a single conversation. + +An SP server should allocate a port for number advertisement. It is +expected clients will generate ephemeral port numbers. + +Implementations are free to choose how to allocate port numbers, but +it is recommended manually configured port numbers are small, with +the high order bit clear, and that numbers > 2^23 (high order bit +set) be used for ephemeral allocations. + +It is recommended that separate short queues (perhaps just one or two +messages long) be kept per local port in implementations, to prevent +head-of-line blocking issues where backpressure on one consumer +(perhaps just a single thread or socket) blocks others. + +URI Format +~~~~~~~~~~ + +The URI scheme used to represent ZeroTier addresses makes use of +ZeroTier IDs, ZeroTier network IDs, and our own 24-bit ports. + +The format shall be `zt://<nwid>/<ztid>:<port>`, where the `<nwid>` +component represents the 64-bit hexadecimal ZeroTier network ID, +the `<ztid>` represents the 40-bit hexadecimal ZeroTier Device ID, +and the `<port>` is the 24-bit port number previously described. + +A responder may elide the `<ztid>/` portion, to just bind to itself, +in which case the format will be `zt://<nwid>/<ztid>:<port>`. 
+ +A port number of 0 may be used when listening to indicate that a random +ephemeral port should be chosen. + +An implementation *MAY* allow the `<ztid>` t0 be replaced with `*` to +indicate that the node's local ZT_ID be used. + +// XXX: the ztid could use DNS names, generating 6PLANE IP addresses, +// and extracting the 10 digit device id from that. Note that there +// is no good way to determine a nwid automatically. The 6PLANE +// address is determined by a non-reversible XOR transform of the +// network id. + +Security Considerations +~~~~~~~~~~~~~~~~~~~~~~~ + +The mapping isn't intended to provide any additional security beyond that +provided by ZeroTier itself. Managing the key materials used by ZeroTier +is implementation-specific, and they must take the appropriate care when +dealing with them. diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c new file mode 100644 index 00000000..40af6e54 --- /dev/null +++ b/src/transport/zerotier/zerotier.c @@ -0,0 +1,2677 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. 
+// + +#ifdef NNG_HAVE_ZEROTIER +#include <ctype.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +#ifndef _WIN32 +#include <unistd.h> +#endif + +#include <ZeroTierOne.h> + +const char *nng_opt_zt_home = "zt:home"; +const char *nng_opt_zt_nwid = "zt:nwid"; +const char *nng_opt_zt_node = "zt:node"; +const char *nng_opt_zt_status = "zt:status"; +const char *nng_opt_zt_network_name = "zt:network-name"; +const char *nng_opt_zt_local_port = "zt:local-port"; +const char *nng_opt_zt_ping_time = "zt:ping-time"; +const char *nng_opt_zt_ping_count = "zt:ping-count"; + +int nng_optid_zt_home = -1; +int nng_optid_zt_nwid = -1; +int nng_optid_zt_node = -1; +int nng_optid_zt_status = -1; +int nng_optid_zt_network_name = -1; +int nng_optid_zt_ping_time = -1; +int nng_optid_zt_ping_count = -1; +int nng_optid_zt_local_port = -1; + +// These values are supplied to help folks checking status. They are the +// return values from nng_optid_zt_status. +int nng_zt_status_configuring = ZT_NETWORK_STATUS_REQUESTING_CONFIGURATION; +int nng_zt_status_ok = ZT_NETWORK_STATUS_OK; +int nng_zt_status_denied = ZT_NETWORK_STATUS_ACCESS_DENIED; +int nng_zt_status_notfound = ZT_NETWORK_STATUS_NOT_FOUND; +int nng_zt_status_error = ZT_NETWORK_STATUS_PORT_ERROR; +int nng_zt_status_obsolete = ZT_NETWORK_STATUS_CLIENT_TOO_OLD; + +// ZeroTier Transport. This sits on the ZeroTier L2 network, which itself +// is implemented on top of UDP. This requires the 3rd party +// libzerotiercore library (which is GPLv3!) and platform specific UDP +// functionality to be built in. Note that care must be taken to link +// dynamically if one wishes to avoid making your entire application GPL3. +// (Alternatively ZeroTier offers commercial licenses which may prevent +// this particular problem.) This implementation does not make use of +// certain advanced capabilities in ZeroTier such as more sophisticated +// route management and TCP fallback. 
You need to have connectivity +// to the Internet to use this. (Or at least to your Planetary root.) +// +// Because ZeroTier takes a while to establish connectivity, it is even +// more important that applicaitons using the ZeroTier transport not +// assume that a connection will be immediately available. It can take +// quite a few seconds for peer-to-peer connectivity to be established. +// +// The ZeroTier transport was funded by Capitar IT Group, BV. +// +// This transport is highly experimental. + +// ZeroTier and UDP are connectionless, but nng is designed around +// connection oriented paradigms. An "unreliable" connection is created +// on top using our own network protocol. The details of this are +// documented in the RFC. + +// Every participant has an "address", which is a 64-bit value constructed +// using the ZT node number in the upper 40-bits, and a 24-bit port number +// in the lower bits. We elect to operate primarily on these addresses, +// but the wire protocol relies on just conveying the 24-bit port along +// with the MAC address (from which the ZT node number can be derived, +// given the network ID.) + +typedef struct zt_pipe zt_pipe; +typedef struct zt_ep zt_ep; +typedef struct zt_node zt_node; +typedef struct zt_frag zt_frag; +typedef struct zt_fraglist zt_fraglist; + +// Port numbers are stored as 24-bit values in network byte order. 
+// ZT_GET24 / ZT_PUT24 read and write a 24-bit big-endian quantity
+// (used for the 24-bit port numbers in the wire headers).
+#define ZT_GET24(ptr, v)                                      \
+	v = (((uint32_t)((uint8_t)(ptr)[0])) << 16) +         \
+	    (((uint32_t)((uint8_t)(ptr)[1])) << 8) +          \
+	    (((uint32_t)(uint8_t)(ptr)[2]))
+
+#define ZT_PUT24(ptr, u)                                      \
+	do {                                                  \
+		(ptr)[0] = (uint8_t)(((uint32_t)(u)) >> 16);  \
+		(ptr)[1] = (uint8_t)(((uint32_t)(u)) >> 8);   \
+		(ptr)[2] = (uint8_t)((uint32_t)(u));          \
+	} while (0)
+
+// Wire constants: the dedicated ethertype used on the virtual network,
+// the protocol version carried in every frame, and the 24-bit port space.
+static const uint16_t zt_ethertype = 0x901;
+static const uint8_t  zt_version   = 0x01;
+static const uint32_t zt_ephemeral = 0x800000u; // start of ephemeral ports
+static const uint32_t zt_max_port  = 0xffffffu; // largest port
+static const uint32_t zt_port_mask = 0xffffffu; // mask of valid ports
+
+// These are compile time tunables for now.
+enum zt_tunables {
+	zt_listenq       = 128,              // backlog queue length
+	zt_listen_expire = 60000000,         // maximum time in backlog (usec)
+	zt_rcv_bufsize   = ZT_MAX_MTU + 128, // max UDP recv
+	zt_conn_attempts = 12,               // connection attempts (default)
+	zt_conn_interval = 5000000,          // between attempts (usec)
+	zt_udp_sendq     = 16,               // outgoing UDP queue length
+	zt_recvq         = 2,                // max pending recv (per pipe)
+	zt_recv_stale    = 1000000,          // frags older than this are stale
+	zt_ping_time     = 60000000,         // if no traffic, ping time (usec)
+	zt_ping_count    = 5,                // max ping attempts before close
+};
+
+// Opcodes carried in the first byte of every frame.
+enum zt_op_codes {
+	zt_op_data     = 0x00, // data, final fragment
+	zt_op_conn_req = 0x10, // connect request
+	zt_op_conn_ack = 0x12, // connect accepted
+	zt_op_disc_req = 0x20, // disconnect request (no ack)
+	zt_op_ping     = 0x30, // ping request
+	zt_op_pong     = 0x32, // ping response
+	zt_op_error    = 0x40, // error response
+};
+
+// Byte offsets of the header fields, and total sizes of each frame
+// type.  Note that the payload-bearing fields (creq/cack/err/data)
+// all begin at offset 0x0C, immediately after the common header.
+enum zt_offsets {
+	zt_offset_op          = 0x00,
+	zt_offset_flags       = 0x01,
+	zt_offset_version     = 0x02, // protocol version number (2 bytes)
+	zt_offset_zero1       = 0x04, // reserved, must be zero (1 byte)
+	zt_offset_dst_port    = 0x05, // destination port (3 bytes)
+	zt_offset_zero2       = 0x08, // reserved, must be zero (1 byte)
+	zt_offset_src_port    = 0x09, // source port number (3 bytes)
+	zt_offset_creq_proto  = 0x0C, // SP protocol number (2 bytes)
+	zt_offset_cack_proto  = 0x0C, // SP protocol number (2 bytes)
+	zt_offset_err_code    = 0x0C, // error code (1 byte)
+	zt_offset_err_msg     = 0x0D, // error message (string)
+	zt_offset_data_id     = 0x0C, // message ID (2 bytes)
+	zt_offset_data_fragsz = 0x0E, // fragment size
+	zt_offset_data_frag   = 0x10, // fragment number, first is 1 (2 bytes)
+	zt_offset_data_nfrag  = 0x12, // total fragments (2 bytes)
+	zt_offset_data_data   = 0x14, // user payload
+	zt_size_headers       = 0x0C, // size of headers
+	zt_size_conn_req      = 0x0E, // size of conn_req (connect request)
+	zt_size_conn_ack      = 0x0E, // size of conn_ack (connect reply)
+	zt_size_disc_req      = 0x0C, // size of disc_req (disconnect)
+	zt_size_ping          = 0x0C, // size of ping request
+	zt_size_pong          = 0x0C, // size of ping reply
+	zt_size_data          = 0x14, // size of data message (w/o payload)
+};
+
+// Error codes carried in zt_op_error frames.
+enum zt_errors {
+	zt_err_refused = 0x01, // Connection refused
+	zt_err_notconn = 0x02, // Connection does not exist
+	zt_err_wrongsp = 0x03, // SP protocol mismatch
+	zt_err_proto   = 0x04, // Other protocol error
+	zt_err_msgsize = 0x05, // Message too large
+	zt_err_unknown = 0x06, // Other errors
+};
+
+// This node structure is wrapped around the ZT_node; this allows us to
+// have multiple endpoints referencing the same ZT_node, but also to
+// support different nodes (identities) based on different homedirs.
+// This means we need to stick these on a global linked list, manage
+// them with a reference count, and uniquely identify them using the
+// homedir.
+struct zt_node {
+	char          zn_path[NNG_MAXADDRLEN]; // ought to be sufficient
+	ZT_Node *     zn_znode;
+	uint64_t      zn_self;     // our own 40-bit ZeroTier node address
+	nni_list_node zn_link;     // linkage on the global zt_nodes list
+	int           zn_closed;
+	nni_plat_udp *zn_udp4;
+	nni_plat_udp *zn_udp6;
+	nni_list      zn_eplist;
+	nni_list      zn_plist;
+	nni_idhash *  zn_ports;
+	nni_idhash *  zn_eps;
+	nni_idhash *  zn_lpipes;
+	nni_idhash *  zn_rpipes;
+	nni_idhash *  zn_peers; // indexed by remote address
+	nni_aio *     zn_rcv4_aio;
+	char *        zn_rcv4_buf;
+	nng_sockaddr  zn_rcv4_addr;
+	nni_aio *     zn_rcv6_aio;
+	char *        zn_rcv6_buf;
+	nng_sockaddr  zn_rcv6_addr;
+	nni_thr       zn_bgthr;   // runs ZeroTier background tasks
+	nni_time      zn_bgtime;  // next scheduled background run (usec)
+	nni_cv        zn_bgcv;
+	nni_cv        zn_snd6_cv;
+};
+
+// The fragment list is used to keep track of incoming received
+// fragments for reassembly into a complete message.
+struct zt_fraglist {
+	nni_time     fl_time;  // time first frag was received
+	uint32_t     fl_msgid; // message id
+	int          fl_ready; // we have all messages
+	unsigned int fl_fragsz;
+	unsigned int fl_nfrags;
+	uint8_t *    fl_missing;   // bitmask of fragments not yet seen
+	size_t       fl_missingsz; // allocated size of fl_missing
+	nni_msg *    fl_msg;
+};
+
+struct zt_pipe {
+	nni_list_node zp_link;
+	const char *  zp_addr;
+	zt_node *     zp_ztn;
+	uint64_t      zp_nwid;       // network id
+	uint64_t      zp_laddr;      // local node address + 24-bit port
+	uint64_t      zp_raddr;      // remote node address + 24-bit port
+	uint16_t      zp_peer;       // peer's SP protocol number
+	uint16_t      zp_proto;      // our SP protocol number
+	uint16_t      zp_next_msgid;
+	size_t        zp_rcvmax;
+	size_t        zp_mtu;
+	int           zp_closed;
+	nni_aio *     zp_user_rxaio;
+	nni_time      zp_last_recv;
+	zt_fraglist   zp_recvq[zt_recvq];
+	int           zp_ping_try;
+	int           zp_ping_count;
+	nni_duration  zp_ping_time;
+	nni_aio *     zp_ping_aio;
+};
+
+// A single queued (not yet accepted) inbound connection request.
+typedef struct zt_creq zt_creq;
+struct zt_creq {
+	uint64_t cr_expire;
+	uint64_t cr_raddr;
+	uint16_t cr_proto;
+};
+
+struct zt_ep {
+	nni_list_node ze_link;
+	char          ze_url[NNG_MAXADDRLEN];
+	char          ze_home[NNG_MAXADDRLEN]; // should be enough
+	zt_node *     ze_ztn;
+	uint64_t      ze_nwid;
+	int           ze_mode;
+	nni_sockaddr  ze_addr;
+	uint64_t      ze_raddr; // remote node address
+	uint64_t      ze_laddr; // local node address
+	uint16_t      ze_proto;
+	size_t        ze_rcvmax;
+	nni_aio *     ze_aio;
+	nni_aio *     ze_creq_aio;
+	int           ze_creq_try;
+	nni_list      ze_aios;
+	int           ze_maxmtu;
+	int           ze_phymtu;
+	int           ze_ping_count;
+	nni_duration  ze_ping_time;
+
+	// Incoming connection requests (server only).  We only
+	// have "accepted" requests -- that is we won't have an
+	// established connection/pipe unless the application calls
+	// accept.  Since the "application" is our library, that should
+	// be pretty much as fast as we can run.
+	zt_creq ze_creqs[zt_listenq];
+	int     ze_creq_head;
+	int     ze_creq_tail;
+};
+
+// Locking strategy.  At present the ZeroTier core is not reentrant or fully
+// threadsafe.  (We expect this will be fixed.)  Furthermore, there are
+// some significant challenges in dealing with locks associated with the
+// callbacks, etc.  So we take a big-hammer approach, and just use a single
+// global lock for everything.  We hold this lock when calling into the
+// ZeroTier framework.  Since ZeroTier has no independent threads, that
+// means that it will always hold this lock in its core, and the lock will
+// also be held automatically in any of our callbacks.  We never hold any
+// other locks across ZeroTier core calls.  We may not acquire the global
+// lock in callbacks (they will already have it held).  Any other locks
+// can be acquired as long as they are not held during calls into ZeroTier.
+//
+// This will have a detrimental impact on performance, but to be completely
+// honest we don't think anyone will be using the ZeroTier transport in
+// performance critical applications; scalability may become a factor for
+// large servers sitting in a ZeroTier hub situation.  (Then again, since
+// only the zerotier processing is single threaded, it may not
+// be that much of a bottleneck -- really depends on how expensive these
+// operations are.  We can use lockstat or other lock-hotness tools to
+// check for this later.)
+
+static nni_mtx  zt_lk;    // the single global lock (see strategy above)
+static nni_list zt_nodes; // all active zt_node instances
+
+static void zt_ep_send_conn_req(zt_ep *);
+static void zt_ep_conn_req_cb(void *);
+static void zt_ep_doaccept(zt_ep *);
+static void zt_pipe_dorecv(zt_pipe *);
+static int  zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t);
+static void zt_pipe_ping_cb(void *);
+static void zt_fraglist_clear(zt_fraglist *);
+static void zt_fraglist_free(zt_fraglist *);
+static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **,
+    uint64_t, uint64_t, unsigned int, unsigned int, const void *,
+    unsigned int);
+
+// zt_now returns the current time in msec, as expected by the ZeroTier
+// core API (nni_clock is usec).
+static uint64_t
+zt_now(void)
+{
+	// We return msec
+	return (nni_clock() / 1000);
+}
+
+// zt_bgthr runs the ZeroTier background tasks whenever the deadline
+// stored in zn_bgtime arrives; it sleeps on zn_bgcv otherwise, and
+// exits when the node is closed.  It holds zt_lk for its entire life,
+// dropping it only inside nni_cv_until.
+static void
+zt_bgthr(void *arg)
+{
+	zt_node *ztn = arg;
+	nni_time now;
+
+	nni_mtx_lock(&zt_lk);
+	for (;;) {
+		now = nni_clock();
+
+		if (ztn->zn_closed) {
+			break;
+		}
+
+		if (now < ztn->zn_bgtime) {
+			nni_cv_until(&ztn->zn_bgcv, ztn->zn_bgtime);
+			continue;
+		}
+
+		now /= 1000; // usec -> msec
+		ZT_Node_processBackgroundTasks(ztn->zn_znode, NULL, now, &now);
+
+		// ZeroTier returned the next deadline (msec) in now.
+		ztn->zn_bgtime = now * 1000; // usec
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// zt_node_resched moves the background thread's deadline to the given
+// msec timestamp and wakes it.  Caller holds zt_lk.
+static void
+zt_node_resched(zt_node *ztn, uint64_t msec)
+{
+	ztn->zn_bgtime = msec * 1000; // convert to usec
+	nni_cv_wake1(&ztn->zn_bgcv);
+}
+
+// zt_node_rcv4_cb is the completion callback for the IPv4 UDP receive.
+// It feeds the received datagram into the ZeroTier core and posts the
+// next receive.
+static void
+zt_node_rcv4_cb(void *arg)
+{
+	zt_node *               ztn = arg;
+	nni_aio *               aio = ztn->zn_rcv4_aio;
+	struct sockaddr_storage sa;
+	struct sockaddr_in *    sin;
+	nng_sockaddr_in *       nsin;
+	uint64_t                now;
+
+	if (nni_aio_result(aio) != 0) {
+		// Outside of memory exhaustion, we can't really think
+		// of any reason for this to legitimately fail.
+		// Arguably we should inject a fallback delay, but for
+		// now we just carry on.
+		// NOTE(review): returning here permanently stops the
+		// IPv4 receive loop for this node -- confirm intended.
+		return;
+	}
+
+	// Convert the neutral nng address back to a BSD sockaddr for
+	// the ZeroTier core.
+	memset(&sa, 0, sizeof(sa));
+	sin                  = (void *) &sa;
+	nsin                 = &ztn->zn_rcv4_addr.s_un.s_in;
+	sin->sin_family      = AF_INET;
+	sin->sin_port        = nsin->sa_port;
+	sin->sin_addr.s_addr = nsin->sa_addr;
+
+	nni_mtx_lock(&zt_lk);
+	now = zt_now();
+
+	// We are not going to perform any validation of the data; we
+	// just pass this straight into the ZeroTier core.
+	// XXX: CHECK THIS, if it fails then we have a fatal error with
+	// the znode, and have to shut everything down.
+	ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
+	    ztn->zn_rcv4_buf, aio->a_count, &now);
+
+	// Schedule background work
+	zt_node_resched(ztn, now);
+
+	// Schedule another receive.
+	if (ztn->zn_udp4 != NULL) {
+		aio->a_niov           = 1;
+		aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf;
+		aio->a_iov[0].iov_len = zt_rcv_bufsize;
+		aio->a_addr           = &ztn->zn_rcv4_addr;
+		aio->a_count          = 0;
+
+		nni_plat_udp_recv(ztn->zn_udp4, aio);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// zt_node_rcv6_cb is the IPv6 counterpart of zt_node_rcv4_cb.
+static void
+zt_node_rcv6_cb(void *arg)
+{
+	zt_node *                ztn = arg;
+	nni_aio *                aio = ztn->zn_rcv6_aio;
+	struct sockaddr_storage  sa;
+	struct sockaddr_in6 *    sin6;
+	struct nng_sockaddr_in6 *nsin6;
+	uint64_t                 now;
+
+	if (nni_aio_result(aio) != 0) {
+		// Outside of memory exhaustion, we can't really think
+		// of any reason for this to legitimately fail.
+		// Arguably we should inject a fallback delay, but for
+		// now we just carry on.
+		return;
+	}
+
+	// Convert the neutral nng address back to a BSD sockaddr.
+	memset(&sa, 0, sizeof(sa));
+	sin6              = (void *) &sa;
+	nsin6             = &ztn->zn_rcv6_addr.s_un.s_in6;
+	sin6->sin6_family = AF_INET6;
+	sin6->sin6_port   = nsin6->sa_port;
+	memcpy(&sin6->sin6_addr, nsin6->sa_addr, 16);
+
+	nni_mtx_lock(&zt_lk);
+	now = zt_now(); // msec
+
+	// We are not going to perform any validation of the data; we
+	// just pass this straight into the ZeroTier core.
+	ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
+	    ztn->zn_rcv6_buf, aio->a_count, &now);
+
+	// Schedule background work
+	zt_node_resched(ztn, now);
+
+	// Schedule another receive.
+	if (ztn->zn_udp6 != NULL) {
+		aio->a_niov           = 1;
+		aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf;
+		aio->a_iov[0].iov_len = zt_rcv_bufsize;
+		aio->a_addr           = &ztn->zn_rcv6_addr;
+		aio->a_count          = 0;
+		nni_plat_udp_recv(ztn->zn_udp6, aio);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// zt_mac_to_node extracts the 40-bit ZeroTier node address from a
+// virtual MAC address.  This is the exact inverse of zt_node_to_mac:
+// the node ID sits in the lower 40 bits, scrambled against the nwid,
+// so the nwid bytes have to be XOR'd back out.
+static uint64_t
+zt_mac_to_node(uint64_t mac, uint64_t nwid)
+{
+	uint64_t node;
+	node = mac & 0xffffffffffull;
+	node ^= ((nwid >> 8) & 0xff) << 32;
+	node ^= ((nwid >> 16) & 0xff) << 24;
+	node ^= ((nwid >> 24) & 0xff) << 16;
+	node ^= ((nwid >> 32) & 0xff) << 8;
+	node ^= (nwid >> 40) & 0xff;
+	return (node);
+}
+
+// zt_node_to_mac derives the virtual MAC address for a node on a given
+// network, matching the scheme used by the ZeroTier core.
+static uint64_t
+zt_node_to_mac(uint64_t node, uint64_t nwid)
+{
+	uint64_t mac;
+	// We use LSB of network ID, and make sure that we clear
+	// multicast and set local administration -- this is the first
+	// octet of the 48 bit mac address.  We also avoid 0x52, which
+	// is known to be used in KVM, libvirt, etc.
+	mac = ((uint8_t)(nwid & 0xfe) | 0x02);
+	if (mac == 0x52) {
+		mac = 0x32;
+	}
+	mac <<= 40;
+	mac |= node;
+	// The rest of the network ID is XOR'd in, in reverse byte
+	// order.
+	mac ^= ((nwid >> 8) & 0xff) << 32;
+	mac ^= ((nwid >> 16) & 0xff) << 24;
+	mac ^= ((nwid >> 24) & 0xff) << 16;
+	mac ^= ((nwid >> 32) & 0xff) << 8;
+	mac ^= (nwid >> 40) & 0xff;
+	return (mac);
+}
+
+// zt_result maps a ZeroTier core result code to the closest nng error;
+// unrecognized codes are offset into the NNG_ETRANERR range.
+static int
+zt_result(enum ZT_ResultCode rv)
+{
+	switch (rv) {
+	case ZT_RESULT_OK:
+		return (0);
+	case ZT_RESULT_OK_IGNORED:
+		return (0);
+	case ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY:
+		return (NNG_ENOMEM);
+	case ZT_RESULT_FATAL_ERROR_DATA_STORE_FAILED:
+		return (NNG_EPERM);
+	case ZT_RESULT_FATAL_ERROR_INTERNAL:
+		return (NNG_EINTERNAL);
+	case ZT_RESULT_ERROR_NETWORK_NOT_FOUND:
+		return (NNG_EADDRINVAL);
+	case ZT_RESULT_ERROR_UNSUPPORTED_OPERATION:
+		return (NNG_ENOTSUP);
+	case ZT_RESULT_ERROR_BAD_PARAMETER:
+		return (NNG_EINVAL);
+	default:
+		return (NNG_ETRANERR + (int) rv);
+	}
+}
+
+// ZeroTier Node API callbacks
+
+// zt_virtual_config is invoked by the ZeroTier core when the virtual
+// network configuration changes (join, update, down, destroy).  We use
+// it to pick up MTU values and to kick pending dialers.  Called with
+// zt_lk already held (see locking strategy).
+static int
+zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
+    void **netptr, enum ZT_VirtualNetworkConfigOperation op,
+    const ZT_VirtualNetworkConfig *config)
+{
+	zt_node *ztn = userptr;
+	zt_ep *  ep;
+
+	NNI_ARG_UNUSED(thr);
+	NNI_ARG_UNUSED(netptr);
+
+	NNI_ASSERT(node == ztn->zn_znode);
+
+	// Maybe we don't have to create taps or anything like that.
+	// We do get our mac and MTUs from this, so there's that.
+	switch (op) {
+	case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP:
+	case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE:
+
+		// We only really care about changes to the MTU.  From
+		// an API perspective the MAC could change, but that
+		// cannot really happen because the node identity and
+		// the nwid are fixed.
+		NNI_LIST_FOREACH (&ztn->zn_eplist, ep) {
+			NNI_ASSERT(nwid == config->nwid);
+			if (ep->ze_nwid != config->nwid) {
+				continue;
+			}
+			ep->ze_maxmtu = config->mtu;
+			ep->ze_phymtu = config->physicalMtu;
+
+			// A dialer with a pending connect aio can now
+			// (re)send its connection request, since the
+			// network is up/configured.
+			if ((ep->ze_mode == NNI_EP_MODE_DIAL) &&
+			    (nni_list_first(&ep->ze_aios) != NULL)) {
+				zt_ep_send_conn_req(ep);
+			}
+			// XXX: schedule creqs if needed!
+		}
+		break;
+	case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
+	case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN:
+	// XXX: tear down endpoints?
+	default:
+		break;
+	}
+	return (0);
+}
+
+// zt_send modifies the start of the supplied buffer to update the
+// message headers with protocol specific details (version, port numbers,
+// etc.) and then sends it over the virtual network.  Caller holds zt_lk.
+static void
+zt_send(zt_node *ztn, uint64_t nwid, uint8_t op, uint64_t raddr,
+    uint64_t laddr, uint8_t *data, size_t len)
+{
+	// Addresses pack <40-bit node><24-bit port>; the node part
+	// becomes a MAC, the port part goes into the header.
+	uint64_t srcmac = zt_node_to_mac(laddr >> 24, nwid);
+	uint64_t dstmac = zt_node_to_mac(raddr >> 24, nwid);
+	uint64_t now    = zt_now();
+
+	NNI_ASSERT(len >= zt_size_headers);
+	data[zt_offset_op]    = op;
+	data[zt_offset_flags] = 0;
+	data[zt_offset_zero1] = 0;
+	data[zt_offset_zero2] = 0;
+	NNI_PUT16(data + zt_offset_version, zt_version);
+	ZT_PUT24(data + zt_offset_dst_port, raddr & zt_port_mask);
+	ZT_PUT24(data + zt_offset_src_port, laddr & zt_port_mask);
+
+	// If we are looping back, bypass ZT.
+	if (srcmac == dstmac) {
+		zt_virtual_recv(ztn->zn_znode, ztn, NULL, nwid, NULL, srcmac,
+		    dstmac, zt_ethertype, 0, data, len);
+		return;
+	}
+
+	(void) ZT_Node_processVirtualNetworkFrame(ztn->zn_znode, NULL, now,
+	    nwid, srcmac, dstmac, zt_ethertype, 0, data, len, &now);
+
+	zt_node_resched(ztn, now);
+}
+
+// zt_send_err sends a zt_op_error frame carrying the given code and a
+// NUL-terminated message.  The message must fit the local buffer (we
+// assert, rather than truncate silently).
+static void
+zt_send_err(zt_node *ztn, uint64_t nwid, uint64_t raddr, uint64_t laddr,
+    uint8_t err, const char *msg)
+{
+	uint8_t data[128];
+
+	NNI_ASSERT((strlen(msg) + zt_offset_err_msg) < sizeof(data));
+
+	data[zt_offset_err_code] = err;
+	nni_strlcpy((char *) data + zt_offset_err_msg, msg,
+	    sizeof(data) - zt_offset_err_msg);
+
+	zt_send(ztn, nwid, zt_op_error, raddr, laddr, data,
+	    strlen(msg) + zt_offset_err_msg);
+}
+
+// Pipe-level convenience wrapper around zt_send_err.
+static void
+zt_pipe_send_err(zt_pipe *p, uint8_t err, const char *msg)
+{
+	zt_send_err(p->zp_ztn, p->zp_nwid, p->zp_raddr, p->zp_laddr, err, msg);
+}
+
+// Send a disconnect request (not acknowledged by the peer).
+static void
+zt_pipe_send_disc_req(zt_pipe *p)
+{
+	uint8_t data[zt_size_disc_req];
+
+	zt_send(p->zp_ztn, p->zp_nwid, zt_op_disc_req, p->zp_raddr,
+	    p->zp_laddr, data, sizeof(data));
+}
+
+// Send a keepalive ping.
+static void
+zt_pipe_send_ping(zt_pipe *p)
+{
+	uint8_t data[zt_size_ping];
+
+	zt_send(p->zp_ztn, p->zp_nwid, zt_op_ping, p->zp_raddr, p->zp_laddr,
+	    data, sizeof(data));
+}
+
+// Send a ping response.
+static void
+zt_pipe_send_pong(zt_pipe *p)
+{
+	uint8_t data[zt_size_ping];
+
+	zt_send(p->zp_ztn, p->zp_nwid, zt_op_pong, p->zp_raddr, p->zp_laddr,
+	    data, sizeof(data));
+}
+
+// Send a connection acknowledgment carrying our SP protocol number.
+static void
+zt_pipe_send_conn_ack(zt_pipe *p)
+{
+	uint8_t data[zt_size_conn_ack];
+
+	NNI_PUT16(data + zt_offset_cack_proto, p->zp_proto);
+	zt_send(p->zp_ztn, p->zp_nwid, zt_op_conn_ack, p->zp_raddr,
+	    p->zp_laddr, data, sizeof(data));
+}
+
+// Send a connection request carrying our SP protocol number.
+static void
+zt_ep_send_conn_req(zt_ep *ep)
+{
+	uint8_t data[zt_size_conn_req];
+
+	NNI_PUT16(data + zt_offset_creq_proto, ep->ze_proto);
+	zt_send(ep->ze_ztn, ep->ze_nwid, zt_op_conn_req, ep->ze_raddr,
+	    ep->ze_laddr, data, sizeof(data));
+}
+
+// zt_ep_recv_conn_ack handles a conn_ack on a dialing endpoint,
+// creating the pipe and completing the user's connect aio.
+static void
+zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr,
const uint8_t *data, size_t len) +{ + zt_node *ztn = ep->ze_ztn; + nni_aio *aio = ep->ze_creq_aio; + zt_pipe *p; + int rv; + + if (ep->ze_mode != NNI_EP_MODE_DIAL) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Inappropriate operation"); + return; + } + + if (len != zt_size_conn_ack) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad message length"); + return; + } + + if (ep->ze_creq_try == 0) { + return; + } + + // Do we already have a matching pipe? If so, we can discard + // the operation. This should not happen, since we normally, + // deregister the endpoint when we create the pipe. + if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) { + return; + } + + if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) { + // We couldn't create the pipe, just drop it. + nni_aio_finish_error(aio, rv); + return; + } + NNI_GET16(data + zt_offset_cack_proto, p->zp_peer); + + // Reset the address of the endpoint, so that the next call to + // ep_connect will bind a new one -- we are using this one for the + // pipe. + nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + ep->ze_laddr = 0; + + nni_aio_finish_pipe(aio, p); +} + +static void +zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) +{ + zt_node *ztn = ep->ze_ztn; + zt_pipe *p; + int i; + + if (ep->ze_mode != NNI_EP_MODE_LISTEN) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Inappropriate operation"); + return; + } + if (len != zt_size_conn_req) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad message length"); + return; + } + + // If we already have created a pipe for this connection + // then just reply the conn ack. + if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) { + zt_pipe_send_conn_ack(p); + return; + } + + // We may already have a connection request queued (if this was + // a resend for example); if that's the case we just ignore + // this one. 
+ for (i = ep->ze_creq_tail; i != ep->ze_creq_head; i++) { + if (ep->ze_creqs[i % zt_listenq].cr_raddr == raddr) { + return; + } + } + // We may already have filled our listenq, in which case we just drop. + if ((ep->ze_creq_tail + zt_listenq) == ep->ze_creq_head) { + // We have taken as many as we can, so just drop it. + return; + } + + // Record the connection request, and then process any + // pending acceptors. + i = ep->ze_creq_head % zt_listenq; + + NNI_GET16(data + zt_offset_creq_proto, ep->ze_creqs[i].cr_proto); + ep->ze_creqs[i].cr_raddr = raddr; + ep->ze_creqs[i].cr_expire = nni_clock() + zt_listen_expire; + ep->ze_creq_head++; + + zt_ep_doaccept(ep); +} + +static void +zt_ep_recv_error(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) +{ + nni_aio *aio; + int code; + + // Most of the time we don't care about errors. The exception here + // is that when we have an outstanding CON_REQ, we would like to + // process that appropriately. + + if (ep->ze_mode != NNI_EP_MODE_DIAL) { + // Drop it. + return; + } + + if (len < zt_offset_err_msg) { + // Malformed error frame. + return; + } + + code = data[zt_offset_err_code]; + switch (code) { + case zt_err_refused: + code = NNG_ECONNREFUSED; + break; + case zt_err_notconn: + code = NNG_ECLOSED; + break; + case zt_err_wrongsp: + code = NNG_EPROTO; + break; + default: + code = NNG_ETRANERR; + break; + } + + if (ep->ze_creq_try > 0) { + ep->ze_creq_try = 0; + nni_aio_finish_error(ep->ze_creq_aio, code); + } +} + +static void +zt_ep_virtual_recv( + zt_ep *ep, uint8_t op, uint64_t raddr, const uint8_t *data, size_t len) +{ + // Only listeners should be receiving. Dialers receive on the pipe, + // rather than the endpoint. The only message that endpoints can + // receive are connection requests. 
+ switch (op) { + case zt_op_conn_req: + zt_ep_recv_conn_req(ep, raddr, data, len); + return; + case zt_op_conn_ack: + zt_ep_recv_conn_ack(ep, raddr, data, len); + return; + case zt_op_error: + zt_ep_recv_error(ep, raddr, data, len); + return; + default: + zt_send_err(ep->ze_ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad operation"); + return; + } +} + +static void +zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg) +{ + nni_aio *aio; + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + nni_aio_finish_error(aio, err); + } + if ((aio = p->zp_ping_aio) != NULL) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } + p->zp_closed = 1; + if (msg != NULL) { + zt_pipe_send_err(p, code, msg); + } +} + +static void +zt_pipe_recv_data(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio * aio; + uint16_t msgid; + uint16_t fragno; + uint16_t nfrags; + uint16_t fragsz; + zt_fraglist *fl; + int i; + int slot; + uint8_t bit; + uint8_t * body; + + if (len < zt_size_data) { + // Runt frame. Drop it and close pipe with a protocol error. + zt_pipe_close_err(p, NNG_EPROTO, zt_err_proto, "Runt frame"); + return; + } + + NNI_GET16(data + zt_offset_data_id, msgid); + NNI_GET16(data + zt_offset_data_fragsz, fragsz); + NNI_GET16(data + zt_offset_data_frag, fragno); + NNI_GET16(data + zt_offset_data_nfrag, nfrags); + len -= zt_offset_data_data; + data += zt_offset_data_data; + + // Check for cases where message size is clearly too large. Note + // that we only can catch the case where a message is larger by + // more than a fragment, since the final fragment may be shorter, + // and we won't know that until we receive it. + if ((nfrags * fragsz) >= (p->zp_rcvmax + fragsz)) { + // Discard, as the forwarder might be on the other side + // of a device. This is gentler than just shutting the pipe + // down. Sending a remote error might be polite, but since + // most peers will close the pipe on such an error, we + // simply silent discard it. 
+ return; + } + + // We run the recv logic once, to clear stale fragment entries. + zt_pipe_dorecv(p); + + // Find a suitable fragment slot. + slot = -1; + for (i = 0; i < zt_recvq; i++) { + fl = &p->zp_recvq[i]; + // This was our message ID, we always use it. + if (msgid == fl->fl_msgid) { + slot = i; + break; + } + + if (slot < 0) { + slot = i; + } else if (fl->fl_time < p->zp_recvq[slot].fl_time) { + // This has an earlier expiration, so lets choose it. + slot = i; + } + } + + NNI_ASSERT(slot >= 0); + + fl = &p->zp_recvq[slot]; + if (fl->fl_msgid != msgid) { + // First fragment we've received for this message (but might + // not be first fragment for message!) + zt_fraglist_clear(fl); + + if (nni_msg_alloc(&fl->fl_msg, nfrags * fragsz) != 0) { + // Out of memory. We don't close the pipe, but + // just fail to receive the message. Bump a stat? + return; + } + + fl->fl_nfrags = nfrags; + fl->fl_fragsz = fragsz; + fl->fl_msgid = msgid; + fl->fl_time = nni_clock(); + + // Set the missing mask. + memset(fl->fl_missing, 0xff, nfrags / 8); + fl->fl_missing[nfrags / 8] |= ((1 << (nfrags % 8)) - 1); + } + if ((nfrags != fl->fl_nfrags) || (fragsz != fl->fl_fragsz) || + (fragno >= nfrags) || (fragsz == 0) || (nfrags == 0) || + ((fragno != (nfrags - 1)) && (len != fragsz))) { + // Protocol error, message parameters changed. + zt_pipe_close_err( + p, NNG_EPROTO, zt_err_proto, "Invalid message parameters"); + zt_fraglist_clear(fl); + return; + } + + bit = (uint8_t)(1 << (fragno % 8)); + if ((fl->fl_missing[fragno / 8] & bit) == 0) { + // We've already got this fragment, ignore it. We don't + // bother to check for changed data. + return; + } + + fl->fl_missing[fragno / 8] &= ~(bit); + body = nni_msg_body(fl->fl_msg); + body += fragno * fragsz; + memcpy(body, data, len); + if (fragno == (nfrags - 1)) { + // Last frag, maybe shorten the message. + nni_msg_chop(fl->fl_msg, (fragsz - len)); + if (nni_msg_len(fl->fl_msg) > p->zp_rcvmax) { + // Strict enforcement of max recv. 
+ zt_fraglist_clear(fl); + // Just discard the message. + return; + } + } + + for (i = 0; i < ((nfrags + 7) / 8); i++) { + if (fl->fl_missing[i]) { + return; + } + } + + // We got all fragments... try to send it up. + fl->fl_ready = 1; + zt_pipe_dorecv(p); +} + +static void +zt_pipe_recv_ping(zt_pipe *p, const uint8_t *data, size_t len) +{ + NNI_ARG_UNUSED(data); + + if (len != zt_size_ping) { + zt_pipe_send_err(p, zt_err_proto, "Incorrect ping size"); + return; + } + zt_pipe_send_pong(p); +} + +static void +zt_pipe_recv_pong(zt_pipe *p, const uint8_t *data, size_t len) +{ + NNI_ARG_UNUSED(data); + + if (len != zt_size_pong) { + zt_pipe_send_err(p, zt_err_proto, "Incorrect pong size"); + } +} + +static void +zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio *aio; + // NB: lock held already. + // Don't bother to check the length, going to disconnect anyway. + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + p->zp_closed = 1; + nni_aio_finish_error(aio, NNG_ECLOSED); + } +} + +static void +zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio *aio; + + // Perhaps we should log an error message, but at the end of + // the day, the details are just not that interesting. + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + p->zp_closed = 1; + nni_aio_finish_error(aio, NNG_ETRANERR); + } +} + +// This function is called when we have determined that a frame has +// arrived for a pipe. The remote and local addresses were both +// matched by the caller. +static void +zt_pipe_virtual_recv(zt_pipe *p, uint8_t op, const uint8_t *data, size_t len) +{ + // We got data, so update our recv time. 
+ p->zp_last_recv = nni_clock(); + p->zp_ping_try = 0; + + switch (op) { + case zt_op_data: + zt_pipe_recv_data(p, data, len); + return; + case zt_op_disc_req: + zt_pipe_recv_disc_req(p, data, len); + return; + case zt_op_ping: + zt_pipe_recv_ping(p, data, len); + return; + case zt_op_pong: + zt_pipe_recv_pong(p, data, len); + return; + case zt_op_error: + zt_pipe_recv_error(p, data, len); + return; + } +} + +// This function is called when a frame arrives on the +// *virtual* network. +static void +zt_virtual_recv(ZT_Node *node, void *userptr, void *thr, uint64_t nwid, + void **netptr, uint64_t srcmac, uint64_t dstmac, unsigned int ethertype, + unsigned int vlanid, const void *payload, unsigned int len) +{ + zt_node * ztn = userptr; + uint8_t op; + const uint8_t *data = payload; + uint16_t proto; + uint16_t version; + uint32_t rport; + uint32_t lport; + zt_ep * ep; + zt_pipe * p; + uint64_t raddr; + uint64_t laddr; + + if ((ethertype != zt_ethertype) || (len < zt_size_headers) || + (data[zt_offset_flags] != 0) || (data[zt_offset_zero1] != 0) || + (data[zt_offset_zero2] != 0)) { + return; + } + NNI_GET16(data + zt_offset_version, version); + if (version != zt_version) { + return; + } + + op = data[zt_offset_op]; + + ZT_GET24(data + zt_offset_dst_port, lport); + ZT_GET24(data + zt_offset_src_port, rport); + + raddr = zt_mac_to_node(srcmac, nwid); + raddr <<= 24; + raddr |= rport; + + laddr = zt_mac_to_node(dstmac, nwid); + laddr <<= 24; + laddr |= lport; + + // NB: We are holding the zt_lock. + + // Look up a pipe, but also we use this chance to check that + // the source address matches what the pipe was established with. + // If the pipe does not match then we nak it. Note that pipes can + // appear on the znode twice (loopback), so we have to be careful + // to check the entire set of parameters, and to check for server + // vs. client pipes separately. + + // If its a local address match on a client pipe, process it. 
+ if ((nni_idhash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) && + (p->zp_nwid == nwid) && (p->zp_raddr == raddr)) { + zt_pipe_virtual_recv(p, op, data, len); + return; + } + + // If its a remote address match on a server pipe, process it. + if ((nni_idhash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) && + (p->zp_nwid == nwid) && (p->zp_laddr == laddr)) { + zt_pipe_virtual_recv(p, op, data, len); + return; + } + + // No pipe, so look for an endpoint. + if ((nni_idhash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) && + (ep->ze_nwid == nwid)) { + // direct this to an endpoint. + zt_ep_virtual_recv(ep, op, raddr, data, len); + return; + } + + // We have a request for which we have no listener, and no + // pipe. For some of these we send back a NAK, but for others + // we just drop the frame. + switch (op) { + case zt_op_conn_req: + // No listener. Connection refused. + zt_send_err(ztn, nwid, raddr, laddr, zt_err_refused, + "Connection refused"); + return; + case zt_op_data: + case zt_op_ping: + case zt_op_conn_ack: + zt_send_err(ztn, nwid, raddr, laddr, zt_err_notconn, + "Connection not found"); + break; + case zt_op_error: + case zt_op_pong: + case zt_op_disc_req: + default: + // Just drop these. + break; + } +} + +static void +zt_event_cb(ZT_Node *node, void *userptr, void *thr, enum ZT_Event event, + const void *payload) +{ + NNI_ARG_UNUSED(node); + NNI_ARG_UNUSED(userptr); + NNI_ARG_UNUSED(thr); + + switch (event) { + case ZT_EVENT_ONLINE: // Connected to the virtual net. + case ZT_EVENT_UP: // Node initialized (may not be connected). + case ZT_EVENT_DOWN: // Teardown of the node. + case ZT_EVENT_OFFLINE: // Removal of the node from the net. + case ZT_EVENT_TRACE: // Local trace events. + // printf("TRACE: %s\n", (const char *) payload); + break; + case ZT_EVENT_REMOTE_TRACE: // Remote trace, not supported. + default: + break; + } +} + +static const char *zt_files[] = { + // clang-format off + NULL, // none, i.e. 
not used at all + "identity.public", + "identity.secret", + "planet", + NULL, // moon, e.g. moons.d/<ID>.moon -- we don't persist it + NULL, // peer, e.g. peers.d/<ID> -- we don't persist this + NULL, // network, e.g. networks.d/<ID>.conf -- we don't persist + // clang-format on +}; + +#ifdef _WIN32 +#define unlink DeleteFile +#define pathsep "\\" +#else +#define pathsep "/" +#endif + +static struct { + size_t len; + void * data; +} zt_ephemeral_state[ZT_STATE_OBJECT_NETWORK_CONFIG]; + +static void +zt_state_put(ZT_Node *node, void *userptr, void *thr, + enum ZT_StateObjectType objtype, const uint64_t objid[2], const void *data, + int len) +{ + FILE * file; + zt_node * ztn = userptr; + char path[NNG_MAXADDRLEN + 1]; + const char *fname; + size_t sz; + + NNI_ARG_UNUSED(objid); // only use global files + + if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) || + ((fname = zt_files[(int) objtype]) == NULL)) { + return; + } + + // If we have no valid path, then we just use ephemeral data. + if (strlen(ztn->zn_path) == 0) { + void * ndata = NULL; + void * odata = zt_ephemeral_state[objtype].data; + size_t olen = zt_ephemeral_state[objtype].len; + if ((len >= 0) && ((ndata = nni_alloc(len)) != NULL)) { + memcpy(ndata, data, len); + } + zt_ephemeral_state[objtype].data = ndata; + zt_ephemeral_state[objtype].len = len; + + if (olen > 0) { + nni_free(odata, olen); + } + return; + } + + sz = sizeof(path); + if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) { + // If the path is too long, we can't cope. We + // just decline to store anything. + return; + } + + // We assume that everyone can do standard C I/O. + // This may be a bad assumption. If that's the case, + // the platform should supply an alternative + // implementation. We are also assuming that we don't + // need to worry about atomic updates. As these items + // (keys, etc.) pretty much don't change, this should + // be fine. 
+ + if (len < 0) { + (void) unlink(path); + return; + } + + if ((file = fopen(path, "wb")) == NULL) { + return; + } + + if (fwrite(data, 1, len, file) != len) { + (void) unlink(path); + } + (void) fclose(file); +} + +static int +zt_state_get(ZT_Node *node, void *userptr, void *thr, + enum ZT_StateObjectType objtype, const uint64_t objid[2], void *data, + unsigned int len) +{ + FILE * file; + zt_node * ztn = userptr; + char path[NNG_MAXADDRLEN + 1]; + const char *fname; + int nread; + size_t sz; + + NNI_ARG_UNUSED(objid); // we only use global files + + if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) || + ((fname = zt_files[(int) objtype]) == NULL)) { + return (-1); + } + + // If no base directory, we are using ephemeral data. + if (strlen(ztn->zn_path) == 0) { + if (zt_ephemeral_state[objtype].data == NULL) { + return (-1); + } + if (zt_ephemeral_state[objtype].len > len) { + return (-1); + } + len = zt_ephemeral_state[objtype].len; + memcpy(data, zt_ephemeral_state[objtype].data, len); + return (len); + } + + sz = sizeof(path); + if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) { + // If the path is too long, we can't cope. + return (-1); + } + + // We assume that everyone can do standard C I/O. + // This may be a bad assumption. If that's the case, + // the platform should supply an alternative + // implementation. We are also assuming that we don't + // need to worry about atomic updates. As these items + // (keys, etc.) pretty much don't change, this should + // be fine. 
+ + if ((file = fopen(path, "rb")) == NULL) { + return (-1); + } + + // seek to end of file + (void) fseek(file, 0, SEEK_END); + if (ftell(file) > len) { + fclose(file); + return (-1); + } + (void) fseek(file, 0, SEEK_SET); + + nread = (int) fread(data, 1, len, file); + (void) fclose(file); + + return (nread); +} + +typedef struct zt_send_hdr { + nni_sockaddr sa; + size_t len; +} zt_send_hdr; + +static void +zt_wire_packet_send_cb(void *arg) +{ + // We don't actually care much about the results, we + // just need to release the resources. + nni_aio * aio = arg; + zt_send_hdr *hdr; + + hdr = nni_aio_get_data(aio); + nni_free(hdr, hdr->len + sizeof(*hdr)); + nni_aio_fini_cb(aio); +} + +// This function is called when ZeroTier desires to send a +// physical frame. The data is a UDP payload, the rest of the +// payload should be set over vanilla UDP. +static int +zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, + const struct sockaddr_storage *remaddr, const void *data, unsigned int len, + unsigned int ttl) +{ + nni_aio * aio; + nni_sockaddr addr; + struct sockaddr_in * sin = (void *) remaddr; + struct sockaddr_in6 *sin6 = (void *) remaddr; + zt_node * ztn = userptr; + nni_plat_udp * udp; + uint16_t port; + char * buf; + zt_send_hdr * hdr; + + NNI_ARG_UNUSED(thr); + NNI_ARG_UNUSED(socket); + NNI_ARG_UNUSED(ttl); + + // Kind of unfortunate, but we have to convert the + // sockaddr to a neutral form, and then back again in + // the platform layer. 
+ switch (sin->sin_family) { + case AF_INET: + addr.s_un.s_in.sa_family = NNG_AF_INET; + addr.s_un.s_in.sa_port = sin->sin_port; + addr.s_un.s_in.sa_addr = sin->sin_addr.s_addr; + udp = ztn->zn_udp4; + port = htons(sin->sin_port); + break; + case AF_INET6: + addr.s_un.s_in6.sa_family = NNG_AF_INET6; + addr.s_un.s_in6.sa_port = sin6->sin6_port; + udp = ztn->zn_udp6; + port = htons(sin6->sin6_port); + memcpy(addr.s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); + break; + default: + // No way to understand the address. + return (-1); + } + + if (nni_aio_init(&aio, zt_wire_packet_send_cb, NULL) != 0) { + // Out of memory + return (-1); + } + if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) { + nni_aio_fini(aio); + return (-1); + } + + hdr = (void *) buf; + buf += sizeof(*hdr); + + memcpy(buf, data, len); + nni_aio_set_data(aio, hdr); + hdr->sa = addr; + hdr->len = len; + + aio->a_addr = &hdr->sa; + aio->a_niov = 1; + aio->a_iov[0].iov_buf = buf; + aio->a_iov[0].iov_len = len; + + // This should be non-blocking/best-effort, so while + // not great that we're holding the lock, also not tragic. + nni_aio_set_synch(aio); + nni_plat_udp_send(udp, aio); + + return (0); +} + +static struct ZT_Node_Callbacks zt_callbacks = { + .version = 0, + .statePutFunction = zt_state_put, + .stateGetFunction = zt_state_get, + .wirePacketSendFunction = zt_wire_packet_send, + .virtualNetworkFrameFunction = zt_virtual_recv, + .virtualNetworkConfigFunction = zt_virtual_config, + .eventCallback = zt_event_cb, + .pathCheckFunction = NULL, + .pathLookupFunction = NULL, +}; + +static void +zt_node_destroy(zt_node *ztn) +{ + nni_aio_stop(ztn->zn_rcv4_aio); + nni_aio_stop(ztn->zn_rcv6_aio); + + // Wait for background thread to exit! 
+ nni_thr_fini(&ztn->zn_bgthr); + + if (ztn->zn_znode != NULL) { + ZT_Node_delete(ztn->zn_znode); + } + + if (ztn->zn_udp4 != NULL) { + nni_plat_udp_close(ztn->zn_udp4); + } + if (ztn->zn_udp6 != NULL) { + nni_plat_udp_close(ztn->zn_udp6); + } + + if (ztn->zn_rcv4_buf != NULL) { + nni_free(ztn->zn_rcv4_buf, zt_rcv_bufsize); + } + if (ztn->zn_rcv6_buf != NULL) { + nni_free(ztn->zn_rcv6_buf, zt_rcv_bufsize); + } + nni_aio_fini(ztn->zn_rcv4_aio); + nni_aio_fini(ztn->zn_rcv6_aio); + nni_idhash_fini(ztn->zn_eps); + nni_idhash_fini(ztn->zn_lpipes); + nni_idhash_fini(ztn->zn_rpipes); + nni_idhash_fini(ztn->zn_peers); + nni_cv_fini(&ztn->zn_bgcv); + NNI_FREE_STRUCT(ztn); +} + +static int +zt_node_create(zt_node **ztnp, const char *path) +{ + zt_node * ztn; + nng_sockaddr sa4; + nng_sockaddr sa6; + int rv; + enum ZT_ResultCode zrv; + + // We want to bind to any address we can (for now). + // Note that at the moment we only support IPv4. Its + // unclear how we are meant to handle underlying IPv6 + // in ZeroTier. Probably we can use IPv6 dual stock + // sockets if they exist, but not all platforms support + // dual-stack. Furhtermore, IPv6 is not available + // everywhere, and the root servers may be IPv4 only. 
+ memset(&sa4, 0, sizeof(sa4)); + sa4.s_un.s_in.sa_family = NNG_AF_INET; + memset(&sa6, 0, sizeof(sa6)); + sa6.s_un.s_in6.sa_family = NNG_AF_INET6; + + if ((ztn = NNI_ALLOC_STRUCT(ztn)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link); + NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link); + nni_cv_init(&ztn->zn_bgcv, &zt_lk); + nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn); + nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn); + + if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) || + ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) { + zt_node_destroy(ztn); + return (NNG_ENOMEM); + } + if (((rv = nni_idhash_init(&ztn->zn_ports)) != 0) || + ((rv = nni_idhash_init(&ztn->zn_eps)) != 0) || + ((rv = nni_idhash_init(&ztn->zn_lpipes)) != 0) || + ((rv = nni_idhash_init(&ztn->zn_rpipes)) != 0) || + ((rv = nni_idhash_init(&ztn->zn_peers)) != 0) || + ((rv = nni_thr_init(&ztn->zn_bgthr, zt_bgthr, ztn)) != 0) || + ((rv = nni_plat_udp_open(&ztn->zn_udp4, &sa4)) != 0) || + ((rv = nni_plat_udp_open(&ztn->zn_udp6, &sa6)) != 0)) { + zt_node_destroy(ztn); + return (rv); + } + + // Setup for dynamic ephemeral port allocations. We + // set the range to allow for ephemeral ports, but not + // higher than the max port, and starting with an + // initial random value. Note that this should give us + // about 8 million possible ephemeral ports. + nni_idhash_set_limits(ztn->zn_ports, zt_ephemeral, zt_max_port, + (nni_random() % (zt_max_port - zt_ephemeral)) + zt_ephemeral); + + nni_strlcpy(ztn->zn_path, path, sizeof(ztn->zn_path)); + zrv = ZT_Node_new(&ztn->zn_znode, ztn, NULL, &zt_callbacks, zt_now()); + if (zrv != ZT_RESULT_OK) { + zt_node_destroy(ztn); + return (zt_result(zrv)); + } + + nni_list_append(&zt_nodes, ztn); + + ztn->zn_self = ZT_Node_address(ztn->zn_znode); + + nni_thr_run(&ztn->zn_bgthr); + + // Schedule an initial background run. 
+ zt_node_resched(ztn, 1); + + // Schedule receive + ztn->zn_rcv4_aio->a_niov = 1; + ztn->zn_rcv4_aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf; + ztn->zn_rcv4_aio->a_iov[0].iov_len = zt_rcv_bufsize; + ztn->zn_rcv4_aio->a_addr = &ztn->zn_rcv4_addr; + ztn->zn_rcv4_aio->a_count = 0; + ztn->zn_rcv6_aio->a_niov = 1; + ztn->zn_rcv6_aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf; + ztn->zn_rcv6_aio->a_iov[0].iov_len = zt_rcv_bufsize; + ztn->zn_rcv6_aio->a_addr = &ztn->zn_rcv6_addr; + ztn->zn_rcv6_aio->a_count = 0; + + nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio); + nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio); + + *ztnp = ztn; + return (0); +} + +static int +zt_node_find(zt_ep *ep) +{ + zt_node * ztn; + int rv; + nng_sockaddr sa; + ZT_VirtualNetworkConfig *cf; + + NNI_LIST_FOREACH (&zt_nodes, ztn) { + if (strcmp(ep->ze_home, ztn->zn_path) == 0) { + goto done; + } + } + + // We didn't find a node, so make one. And try to + // initialize it. + if ((rv = zt_node_create(&ztn, ep->ze_home)) != 0) { + return (rv); + } + +done: + + ep->ze_ztn = ztn; + if (nni_list_node_active(&ep->ze_link)) { + nni_list_node_remove(&ep->ze_link); + } + nni_list_append(&ztn->zn_eplist, ep); + + (void) ZT_Node_join(ztn->zn_znode, ep->ze_nwid, ztn, NULL); + + if ((cf = ZT_Node_networkConfig(ztn->zn_znode, ep->ze_nwid)) != NULL) { + NNI_ASSERT(cf->nwid == ep->ze_nwid); + ep->ze_maxmtu = cf->mtu; + ep->ze_phymtu = cf->physicalMtu; + ZT_Node_freeQueryResult(ztn->zn_znode, cf); + } + + return (0); +} + +static int +zt_chkopt(int opt, const void *dat, size_t sz) +{ + if (opt == nng_optid_recvmaxsz) { + // We cannot deal with message sizes larger + // than 64k. + return (nni_chkopt_size(dat, sz, 0, 0xffffffffU)); + } + if (opt == nng_optid_zt_home) { + size_t l = nni_strnlen(dat, sz); + if ((l >= sz) || (l >= NNG_MAXADDRLEN)) { + return (NNG_EINVAL); + } + // XXX: should we apply additional security + // checks? 
home path is not null terminated + return (0); + } + if (opt == nng_optid_zt_ping_count) { + return (nni_chkopt_int(dat, sz, 0, 1000000)); + } + if (opt == nng_optid_zt_ping_time) { + return (nni_chkopt_usec(dat, sz)); + } + return (NNG_ENOTSUP); +} + +static int +zt_tran_init(void) +{ + int rv; + if (((rv = nni_option_register(nng_opt_zt_home, &nng_optid_zt_home)) != + 0) || + ((rv = nni_option_register(nng_opt_zt_node, &nng_optid_zt_node)) != + 0) || + ((rv = nni_option_register(nng_opt_zt_nwid, &nng_optid_zt_nwid)) != + 0) || + ((rv = nni_option_register( + nng_opt_zt_status, &nng_optid_zt_status)) != 0) || + ((rv = nni_option_register(nng_opt_zt_network_name, + &nng_optid_zt_network_name)) != 0) || + ((rv = nni_option_register( + nng_opt_zt_local_port, &nng_optid_zt_local_port)) != 0) || + ((rv = nni_option_register( + nng_opt_zt_ping_count, &nng_optid_zt_ping_count)) != 0) || + ((rv = nni_option_register( + nng_opt_zt_ping_time, &nng_optid_zt_ping_time)) != 0)) { + return (rv); + } + nni_mtx_init(&zt_lk); + NNI_LIST_INIT(&zt_nodes, zt_node, zn_link); + return (0); +} + +static void +zt_tran_fini(void) +{ + nng_optid_zt_home = -1; + nng_optid_zt_nwid = -1; + nng_optid_zt_node = -1; + nng_optid_zt_ping_count = -1; + nng_optid_zt_ping_time = -1; + zt_node *ztn; + + nni_mtx_lock(&zt_lk); + while ((ztn = nni_list_first(&zt_nodes)) != 0) { + nni_list_remove(&zt_nodes, ztn); + ztn->zn_closed = 1; + nni_cv_wake(&ztn->zn_bgcv); + nni_mtx_unlock(&zt_lk); + + zt_node_destroy(ztn); + + nni_mtx_lock(&zt_lk); + } + nni_mtx_unlock(&zt_lk); + + for (int i = 0; i < ZT_STATE_OBJECT_NETWORK_CONFIG; i++) { + if (zt_ephemeral_state[i].len > 0) { + nni_free(zt_ephemeral_state[i].data, + zt_ephemeral_state[i].len); + } + } + NNI_ASSERT(nni_list_empty(&zt_nodes)); + nni_mtx_fini(&zt_lk); +} + +static void +zt_pipe_close(void *arg) +{ + zt_pipe *p = arg; + nni_aio *aio; + + nni_mtx_lock(&zt_lk); + p->zp_closed = 1; + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + 
nni_aio_finish_error(aio, NNG_ECLOSED); + } + zt_pipe_send_disc_req(p); + nni_mtx_unlock(&zt_lk); +} + +static void +zt_pipe_fini(void *arg) +{ + zt_pipe *p = arg; + zt_node *ztn = p->zp_ztn; + + nni_aio_fini(p->zp_ping_aio); + + // This tosses the connection details and all state. + nni_mtx_lock(&zt_lk); + nni_idhash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask); + nni_idhash_remove(ztn->zn_lpipes, p->zp_laddr); + nni_idhash_remove(ztn->zn_rpipes, p->zp_raddr); + nni_idhash_remove(ztn->zn_peers, p->zp_raddr); + nni_mtx_unlock(&zt_lk); + + for (int i = 0; i < zt_recvq; i++) { + zt_fraglist_free(&p->zp_recvq[i]); + } + + NNI_FREE_STRUCT(p); +} + +static int +zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) +{ + zt_pipe *p; + int rv; + zt_node *ztn = ep->ze_ztn; + int i; + size_t maxfrag; + size_t maxfrags; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + p->zp_ztn = ztn; + p->zp_raddr = raddr; + p->zp_laddr = laddr; + p->zp_proto = ep->ze_proto; + p->zp_nwid = ep->ze_nwid; + p->zp_mtu = ep->ze_phymtu; + p->zp_rcvmax = ep->ze_rcvmax; + p->zp_ping_count = ep->ze_ping_count; + p->zp_ping_time = ep->ze_ping_time; + p->zp_next_msgid = (uint16_t) nni_random(); + p->zp_ping_try = 0; + + if (ep->ze_mode == NNI_EP_MODE_DIAL) { + rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); + } else { + rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p); + } + if ((rv != 0) || + ((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) || + ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { + zt_pipe_fini(p); + } + + // the largest fragment we can accept on this pipe + maxfrag = p->zp_mtu - zt_offset_data_data; + // and the larger fragment count we can accept on this pipe + // (round up) + maxfrags = (p->zp_rcvmax + (maxfrag - 1)) / maxfrag; + + for (i = 0; i < zt_recvq; i++) { + zt_fraglist *fl = &p->zp_recvq[i]; + fl->fl_time = NNI_TIME_ZERO; + fl->fl_msgid = 0; + fl->fl_ready = 0; + fl->fl_missingsz = (maxfrags + 
7) / 8; + fl->fl_missing = nni_alloc(fl->fl_missingsz); + if (fl->fl_missing == NULL) { + zt_pipe_fini(p); + return (NNG_ENOMEM); + } + } + + *pipep = p; + return (0); +} + +static void +zt_pipe_send(void *arg, nni_aio *aio) +{ + // As we are sending UDP, and there is no callback to worry + // about, we just go ahead and send out a stream of messages + // synchronously. + zt_pipe *p = arg; + size_t offset; + uint8_t data[ZT_MAX_MTU]; + uint16_t id; + uint16_t nfrags; + uint16_t fragno; + uint16_t fragsz; + size_t bytes; + nni_msg *m; + + nni_mtx_lock(&zt_lk); + if (nni_aio_start(aio, NULL, p) != 0) { + nni_mtx_unlock(&zt_lk); + return; + } + + if (p->zp_closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&zt_lk); + return; + } + + fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data); + + if ((m = nni_aio_get_msg(aio)) == NULL) { + nni_aio_finish_error(aio, NNG_EINVAL); + nni_mtx_unlock(&zt_lk); + return; + }; + + bytes = nni_msg_header_len(m) + nni_msg_len(m); + if (bytes >= (0xfffe * fragsz)) { + nni_aio_finish_error(aio, NNG_EMSGSIZE); + nni_mtx_unlock(&zt_lk); + return; + } + // above check means nfrags will fit in 16-bits. + nfrags = (uint16_t)((bytes + (fragsz - 1)) / fragsz); + + // get the next message ID, but skip 0 + if ((id = p->zp_next_msgid++) == 0) { + id = p->zp_next_msgid++; + } + + offset = 0; + fragno = 0; + do { + uint8_t *dest = data + zt_offset_data_data; + size_t room = fragsz; + size_t fraglen = 0; + size_t len; + + // Prepend the header first. + if ((len = nni_msg_header_len(m)) > 0) { + if (len > fragsz) { + // This shouldn't happen! SP headers are + // supposed to be quite small. 
+ nni_aio_finish_error(aio, NNG_EMSGSIZE); + nni_mtx_unlock(&zt_lk); + return; + } + memcpy(dest, nni_msg_header(m), len); + dest += len; + room -= len; + offset += len; + fraglen += len; + nni_msg_header_clear(m); + } + + len = nni_msg_len(m); + if (len > room) { + len = room; + } + memcpy(dest, nni_msg_body(m), len); + + nng_msg_trim(m, len); + NNI_PUT16(data + zt_offset_data_id, id); + NNI_PUT16(data + zt_offset_data_fragsz, fragsz); + NNI_PUT16(data + zt_offset_data_frag, fragno); + NNI_PUT16(data + zt_offset_data_nfrag, nfrags); + offset += len; + fraglen += len; + fragno++; + zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr, + p->zp_laddr, data, fraglen + zt_offset_data_data); + } while (nni_msg_len(m) != 0); + + nni_aio_set_msg(aio, NULL); + nni_msg_free(m); + nni_aio_finish(aio, 0, offset); + nni_mtx_unlock(&zt_lk); +} + +static void +zt_pipe_cancel_recv(nni_aio *aio, int rv) +{ + zt_pipe *p = aio->a_prov_data; + nni_mtx_lock(&zt_lk); + if (p->zp_user_rxaio != aio) { + nni_mtx_unlock(&zt_lk); + } + p->zp_user_rxaio = NULL; + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); +} + +static void +zt_fraglist_clear(zt_fraglist *fl) +{ + nni_msg *msg; + + fl->fl_ready = 0; + fl->fl_msgid = 0; + fl->fl_time = NNI_TIME_ZERO; + if ((msg = fl->fl_msg) != NULL) { + fl->fl_msg = NULL; + nni_msg_free(msg); + } + memset(fl->fl_missing, 0, fl->fl_missingsz); +} + +static void +zt_fraglist_free(zt_fraglist *fl) +{ + zt_fraglist_clear(fl); + nni_free(fl->fl_missing, fl->fl_missingsz); + fl->fl_missing = NULL; +} + +static void +zt_pipe_dorecv(zt_pipe *p) +{ + nni_aio *aio = p->zp_user_rxaio; + nni_time now = nni_clock(); + + if (aio == NULL) { + return; + } + + for (int i = 0; i < zt_recvq; i++) { + zt_fraglist *fl = &p->zp_recvq[i]; + nni_msg * msg; + + if (now > (fl->fl_time + zt_recv_stale)) { + // fragment list is stale, clean it. + zt_fraglist_clear(fl); + continue; + } + if (!fl->fl_ready) { + continue; + } + + // Got data. Let's pass it up. 
+ msg = fl->fl_msg; + fl->fl_msg = NULL; + NNI_ASSERT(msg != NULL); + nni_aio_finish_msg(aio, msg); + zt_fraglist_clear(fl); + return; + } +} + +static void +zt_pipe_recv(void *arg, nni_aio *aio) +{ + zt_pipe *p = arg; + + nni_mtx_lock(&zt_lk); + if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) { + nni_mtx_unlock(&zt_lk); + return; + } + if (p->zp_closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } else { + p->zp_user_rxaio = aio; + zt_pipe_dorecv(p); + } + nni_mtx_unlock(&zt_lk); +} + +static uint16_t +zt_pipe_peer(void *arg) +{ + zt_pipe *pipe = arg; + + return (pipe->zp_peer); +} + +static int +zt_getopt_status(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp) +{ + ZT_VirtualNetworkConfig *vcfg; + int status; + + nni_mtx_lock(&zt_lk); + vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid); + if (vcfg == NULL) { + nni_mtx_unlock(&zt_lk); + return (NNG_ECLOSED); + } + status = vcfg->status; + ZT_Node_freeQueryResult(ztn->zn_znode, vcfg); + nni_mtx_unlock(&zt_lk); + + return (nni_getopt_int(status, buf, szp)); +} + +static int +zt_getopt_network_name(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp) +{ + ZT_VirtualNetworkConfig *vcfg; + int rv; + + nni_mtx_lock(&zt_lk); + vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid); + if (vcfg == NULL) { + nni_mtx_unlock(&zt_lk); + return (NNG_ECLOSED); + } + rv = nni_getopt_str(vcfg->name, buf, szp); + ZT_Node_freeQueryResult(ztn->zn_znode, vcfg); + nni_mtx_unlock(&zt_lk); + + return (rv); +} + +static int +zt_pipe_getopt(void *arg, int option, void *buf, size_t *szp) +{ + zt_pipe *p = arg; + int rv; + + if (option == nng_optid_recvmaxsz) { + rv = nni_getopt_size(p->zp_rcvmax, buf, szp); + } else if (option == nng_optid_zt_nwid) { + rv = nni_getopt_u64(p->zp_nwid, buf, szp); + } else if (option == nng_optid_zt_node) { + rv = nni_getopt_u64(p->zp_laddr >> 24, buf, szp); + } else if (option == nng_optid_zt_status) { + rv = zt_getopt_status(p->zp_ztn, p->zp_nwid, buf, szp); + } else if (option == 
nng_optid_zt_network_name) { + rv = zt_getopt_network_name(p->zp_ztn, p->zp_nwid, buf, szp); + } else { + rv = NNG_ENOTSUP; + } + return (rv); +} + +static void +zt_pipe_cancel_ping(nni_aio *aio, int rv) +{ + nni_aio_finish_error(aio, rv); +} + +static void +zt_pipe_ping_cb(void *arg) +{ + zt_pipe *p = arg; + nni_aio *aio = p->zp_ping_aio; + + nni_mtx_lock(&zt_lk); + if (p->zp_closed || aio == NULL || (p->zp_ping_count == 0) || + (p->zp_ping_time == NNI_TIME_NEVER) || + (p->zp_ping_time == NNI_TIME_ZERO)) { + nni_mtx_unlock(&zt_lk); + return; + } + if (nni_aio_result(aio) != NNG_ETIMEDOUT) { + nni_mtx_unlock(&zt_lk); + return; + } + if (p->zp_ping_try < p->zp_ping_count) { + nni_time now = nni_clock(); + nni_aio_set_timeout(aio, now + p->zp_ping_time); + if (now > (p->zp_last_recv + p->zp_ping_time)) { + // We have to send a ping to keep the session up. + if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + p->zp_ping_try++; + zt_pipe_send_ping(p); + } + } else { + // We still need the timer to wake us up in case + // we haven't seen traffic for a while. + nni_aio_start(aio, zt_pipe_cancel_ping, p); + } + } else { + // Close the pipe, but no need to send a reason to the + // peer, it is already AFK. + zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_pipe_start(void *arg, nni_aio *aio) +{ + zt_pipe *p = arg; + + nni_mtx_lock(&zt_lk); + // send a gratuitous ping, and start the ping interval timer. 
+ if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) && + (p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) { + p->zp_ping_try = 0; + nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time); + nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p); + zt_pipe_send_ping(p); + } + nni_aio_finish(aio, 0, 0); + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_fini(void *arg) +{ + zt_ep *ep = arg; + nni_aio_stop(ep->ze_creq_aio); + nni_aio_fini(ep->ze_creq_aio); + NNI_FREE_STRUCT(ep); +} + +static int +zt_parsehex(const char **sp, uint64_t *valp, int wildok) +{ + int n; + const char *s = *sp; + char c; + uint64_t v; + + if (wildok && *s == '*') { + *valp = 0; + s++; + *sp = s; + return (0); + } + + for (v = 0, n = 0; (n < 16) && isxdigit(c = tolower(*s)); n++, s++) { + v *= 16; + if (isdigit(c)) { + v += (c - '0'); + } else { + v += ((c - 'a') + 10); + } + } + + *sp = s; + *valp = v; + return (n ? 0 : NNG_EINVAL); +} + +static int +zt_parsedec(const char **sp, uint64_t *valp) +{ + int n; + const char *s = *sp; + char c; + uint64_t v; + + for (v = 0, n = 0; (n < 20) && isdigit(c = *s); n++, s++) { + v *= 10; + v += (c - '0'); + } + *sp = s; + *valp = v; + return (n ? 0 : NNG_EINVAL); +} + +static int +zt_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + zt_ep * ep; + size_t sz; + uint64_t nwid; + uint64_t node; + uint64_t port; + int n; + int rv; + char c; + const char *u; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + // URL parsing... + // URL is form zt://<nwid>[/<remoteaddr>]:<port> + // The <remoteaddr> part is required for remote dialers, but is + // not used at all for listeners. (We have no notion of binding + // to different node addresses.) 
+ ep->ze_mode = mode; + ep->ze_maxmtu = ZT_MAX_MTU; + ep->ze_phymtu = ZT_MIN_MTU; + ep->ze_aio = NULL; + ep->ze_ping_count = zt_ping_count; + ep->ze_ping_time = zt_ping_time; + ep->ze_proto = nni_sock_proto(sock); + sz = sizeof(ep->ze_url); + + nni_aio_list_init(&ep->ze_aios); + + if ((strncmp(url, "zt://", strlen("zt://")) != 0) || + (nni_strlcpy(ep->ze_url, url, sz) >= sz)) { + zt_ep_fini(ep); + return (NNG_EADDRINVAL); + } + rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); + if (rv != 0) { + zt_ep_fini(ep); + return (rv); + } + + u = url + strlen("zt://"); + // Parse the URL. + + switch (mode) { + case NNI_EP_MODE_DIAL: + // We require zt://<nwid>/<remotenode>:<port> + // The remote node must be a 40 bit address + // (max), and we require a non-zero port to + // connect to. + if ((zt_parsehex(&u, &nwid, 0) != 0) || (*u++ != '/') || + (zt_parsehex(&u, &node, 1) != 0) || + (node > 0xffffffffffull) || (*u++ != ':') || + (zt_parsedec(&u, &port) != 0) || (*u != '\0') || + (port > zt_max_port) || (port == 0)) { + return (NNG_EADDRINVAL); + } + ep->ze_raddr = node; + ep->ze_raddr <<= 24; + ep->ze_raddr |= port; + ep->ze_laddr = 0; + break; + case NNI_EP_MODE_LISTEN: + // Listen mode is just zt://<nwid>:<port>. The + // port may be zero in this case, to indicate + // that the server should allocate an ephemeral + // port. We do allow the same form of URL including + // the node address, but that must be zero, a wild + // card, + // or our own node address. + if (zt_parsehex(&u, &nwid, 0) != 0) { + return (NNG_EADDRINVAL); + } + node = 0; + // Look for optional node address. 
+ if (*u == '/') { + u++; + if (zt_parsehex(&u, &node, 1) != 0) { + return (NNG_EADDRINVAL); + } + } + if ((*u++ != ':') || (zt_parsedec(&u, &port) != 0) || + (*u != '\0') || (port > zt_max_port)) { + return (NNG_EADDRINVAL); + } + ep->ze_laddr = node; + ep->ze_laddr <<= 24; + ep->ze_laddr |= port; + ep->ze_raddr = 0; + break; + default: + NNI_ASSERT(0); + break; + } + + ep->ze_nwid = nwid; + + nni_mtx_lock(&zt_lk); + rv = zt_node_find(ep); + nni_mtx_unlock(&zt_lk); + + if (rv != 0) { + zt_ep_fini(ep); + return (rv); + } + + *epp = ep; + return (0); +} + +static void +zt_ep_close(void *arg) +{ + zt_ep * ep = arg; + zt_node *ztn; + nni_aio *aio; + + nni_aio_cancel(ep->ze_creq_aio, NNG_ECLOSED); + + // Cancel any outstanding user operation(s) - they should have + // been aborted by the above cancellation, but we need to be + // sure, as the cancellation callback may not have run yet. + + nni_mtx_lock(&zt_lk); + while ((aio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + // Endpoint framework guarantees to only call us once, + // and to not call other things while we are closed. + ztn = ep->ze_ztn; + // If we're on the ztn node list, pull us off. + if (ztn != NULL) { + nni_list_node_remove(&ep->ze_link); + nni_idhash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask); + nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + } + + // XXX: clean up the pipe if a dialer + + nni_mtx_unlock(&zt_lk); +} + +static int +zt_ep_bind_locked(zt_ep *ep) +{ + int rv; + uint64_t port; + uint64_t node; + zt_node *ztn; + + // If we haven't already got a ZT node, get one. + if ((ztn = ep->ze_ztn) == NULL) { + if ((rv = zt_node_find(ep)) != 0) { + return (rv); + } + ztn = ep->ze_ztn; + } + + node = ep->ze_laddr >> 24; + if ((node != 0) && (node != ztn->zn_self)) { + // User requested node id, but it doesn't match our + // own. 
+ return (NNG_EADDRINVAL); + } + + if ((ep->ze_laddr & zt_port_mask) == 0) { + // ask for an ephemeral port + if ((rv = nni_idhash_alloc(ztn->zn_ports, &port, ep)) != 0) { + return (rv); + } + NNI_ASSERT(port & zt_ephemeral); + } else { + void *conflict; + // make sure port requested is free. + port = ep->ze_laddr & zt_port_mask; + + if (nni_idhash_find(ztn->zn_ports, port, &conflict) == 0) { + return (NNG_EADDRINUSE); + } + if ((rv = nni_idhash_insert(ztn->zn_ports, port, ep)) != 0) { + return (rv); + } + } + NNI_ASSERT(port <= zt_max_port); + NNI_ASSERT(port > 0); + + ep->ze_laddr = ztn->zn_self; + ep->ze_laddr <<= 24; + ep->ze_laddr |= port; + + if ((rv = nni_idhash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) { + nni_idhash_remove(ztn->zn_ports, port); + return (rv); + } + + return (0); +} + +static int +zt_ep_bind(void *arg) +{ + int rv; + zt_ep *ep = arg; + + nni_mtx_lock(&zt_lk); + rv = zt_ep_bind_locked(ep); + nni_mtx_unlock(&zt_lk); + + return (rv); +} + +static void +zt_ep_cancel(nni_aio *aio, int rv) +{ + zt_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&zt_lk); + if (nni_aio_list_active(aio)) { + if (ep->ze_aio != NULL) { + nni_aio_cancel(ep->ze_aio, rv); + } + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_doaccept(zt_ep *ep) +{ + // Call with ep lock held. + nni_time now; + zt_pipe *p; + int rv; + + now = nni_clock(); + // Consume any timedout connect requests. + while (ep->ze_creq_tail != ep->ze_creq_head) { + zt_creq creq; + nni_aio *aio; + + creq = ep->ze_creqs[ep->ze_creq_tail % zt_listenq]; + // Discard old connection requests. + if (creq.cr_expire < now) { + ep->ze_creq_tail++; + continue; + } + + if ((aio = nni_list_first(&ep->ze_aios)) == NULL) { + // No outstanding accept. We're done. + break; + } + + // We have both conn request, and a place to accept it. + + // Advance the tail. + ep->ze_creq_tail++; + + // We remove this AIO. This keeps it from being canceled. 
+ nni_aio_list_remove(aio); + + rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr); + if (rv != 0) { + zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr, + ep->ze_laddr, zt_err_unknown, + "Failed creating pipe"); + nni_aio_finish_error(aio, rv); + continue; + } + p->zp_peer = creq.cr_proto; + + zt_pipe_send_conn_ack(p); + nni_aio_finish_pipe(aio, p); + } +} + +static void +zt_ep_accept(void *arg, nni_aio *aio) +{ + zt_ep *ep = arg; + + nni_mtx_lock(&zt_lk); + if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { + nni_aio_list_append(&ep->ze_aios, aio); + zt_ep_doaccept(ep); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_conn_req_cancel(nni_aio *aio, int rv) +{ + // We don't have much to do here. The AIO will have been + // canceled as a result of the "parent" AIO canceling. + nni_aio_finish_error(aio, rv); +} + +static void +zt_ep_conn_req_cb(void *arg) +{ + zt_ep * ep = arg; + zt_pipe *p; + nni_aio *aio = ep->ze_creq_aio; + nni_aio *uaio; + int rv; + + NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL); + + nni_mtx_lock(&zt_lk); + rv = nni_aio_result(aio); + switch (rv) { + case 0: + // Already canceled, or already handled? + if (((uaio = nni_list_first(&ep->ze_aios)) == NULL) || + ((p = nni_aio_get_pipe(aio)) == NULL)) { + nni_mtx_unlock(&zt_lk); + return; + } + ep->ze_creq_try = 0; + nni_aio_list_remove(uaio); + nni_aio_finish_pipe(uaio, p); + nni_mtx_unlock(&zt_lk); + return; + + case NNG_ETIMEDOUT: + if (ep->ze_creq_try <= zt_conn_attempts) { + // Timed out, but we can try again. + ep->ze_creq_try++; + nni_aio_set_timeout( + aio, nni_clock() + zt_conn_interval); + nni_aio_start(aio, zt_ep_conn_req_cancel, ep); + zt_ep_send_conn_req(ep); + nni_mtx_unlock(&zt_lk); + return; + } + break; + } + + // These are failure modes. Either we timed out too many + // times, or an error occurred. 
+ + ep->ze_creq_try = 0; + while ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_connect(void *arg, nni_aio *aio) +{ + zt_ep * ep = arg; + + // We bind locally. We'll use the address later when we give + // it to the pipe, but this allows us to receive the initial + // ack back from the server. (This gives us an ephemeral + // address to work with.) + nni_mtx_lock(&zt_lk); + + if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { + nni_time now = nni_clock(); + int rv; + + // Clear the port so we get an ephemeral port. + ep->ze_laddr &= ~((uint64_t) zt_port_mask); + + if ((rv = zt_ep_bind_locked(ep)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&zt_lk); + return; + } + + if ((ep->ze_raddr >> 24) == 0) { + ep->ze_raddr |= (ep->ze_ztn->zn_self << 24); + } + nni_aio_list_append(&ep->ze_aios, aio); + + ep->ze_creq_try = 1; + + nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval); + // This can't fail -- the only way the ze_creq_aio gets + // terminated would have required us to have also + // canceled the user AIO and held the lock. + (void) nni_aio_start( + ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); + + // We send out the first connect message; it we are not + // yet attached to the network the message will be dropped. + zt_ep_send_conn_req(ep); + } + nni_mtx_unlock(&zt_lk); +} + +static int +zt_ep_setopt(void *arg, int opt, const void *data, size_t size) +{ + zt_ep *ep = arg; + int i; + int rv = NNG_ENOTSUP; + + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&zt_lk); + rv = nni_setopt_size( + &ep->ze_rcvmax, data, size, 0, 0xffffffffu); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_home) { + // XXX: check to make sure not started... 
+ i = nni_strnlen((const char *) data, size); + if ((i >= size) || (i >= NNG_MAXADDRLEN)) { + return (NNG_EINVAL); + } + nni_mtx_lock(&zt_lk); + nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home)); + rv = zt_node_find(ep); + if (rv != 0) { + ep->ze_ztn = NULL; + } + nni_mtx_unlock(&zt_lk); + rv = 0; + } else if (opt == nng_optid_zt_ping_count) { + nni_mtx_lock(&zt_lk); + rv = + nni_setopt_int(&ep->ze_ping_count, data, size, 0, 1000000); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_ping_time) { + nni_mtx_lock(&zt_lk); + rv = nni_setopt_usec(&ep->ze_ping_time, data, size); + nni_mtx_unlock(&zt_lk); + } + return (rv); +} + +static int +zt_ep_getopt(void *arg, int opt, void *data, size_t *szp) +{ + zt_ep *ep = arg; + int rv = NNG_ENOTSUP; + + if (opt == nng_optid_recvmaxsz) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_size(ep->ze_rcvmax, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_home) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_str(ep->ze_home, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_node) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_u64(ep->ze_ztn->zn_self, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_nwid) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_u64(ep->ze_nwid, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_ping_count) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_int(ep->ze_ping_count, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_ping_time) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_usec(ep->ze_ping_time, data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_local_port) { + nni_mtx_lock(&zt_lk); + rv = nni_getopt_int( + (int) (ep->ze_laddr & zt_port_mask), data, szp); + nni_mtx_unlock(&zt_lk); + } else if (opt == nng_optid_zt_network_name) { + rv = + zt_getopt_network_name(ep->ze_ztn, ep->ze_nwid, data, szp); + } else if (opt == nng_optid_zt_status) { + rv = zt_getopt_status(ep->ze_ztn, ep->ze_nwid, 
data, szp); + } + return (rv); +} + +static nni_tran_pipe zt_pipe_ops = { + .p_fini = zt_pipe_fini, + .p_start = zt_pipe_start, + .p_send = zt_pipe_send, + .p_recv = zt_pipe_recv, + .p_close = zt_pipe_close, + .p_peer = zt_pipe_peer, + .p_getopt = zt_pipe_getopt, +}; + +static nni_tran_ep zt_ep_ops = { + .ep_init = zt_ep_init, + .ep_fini = zt_ep_fini, + .ep_connect = zt_ep_connect, + .ep_bind = zt_ep_bind, + .ep_accept = zt_ep_accept, + .ep_close = zt_ep_close, + .ep_setopt = zt_ep_setopt, + .ep_getopt = zt_ep_getopt, +}; + +// This is the ZeroTier transport linkage, and should be the +// only global symbol in this entire file. +static struct nni_tran zt_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "zt", + .tran_ep = &zt_ep_ops, + .tran_pipe = &zt_pipe_ops, + .tran_chkopt = zt_chkopt, + .tran_init = zt_tran_init, + .tran_fini = zt_tran_fini, +}; + +int +nng_zt_register(void) +{ + return (nni_tran_register(&zt_tran)); +} + +#endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0c0973b2..fdfd6055 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -128,6 +128,10 @@ add_nng_test(device 5) add_nng_test(errors 2) add_nng_test(pair1 5) +if (NNG_HAVE_ZEROTIER) + add_nng_test(zt 60) +endif() + # compatbility tests add_nng_compat_test(compat_block 5) add_nng_compat_test(compat_bug777 5) diff --git a/tests/sock.c b/tests/sock.c index 21b12792..6c911250 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -182,6 +182,7 @@ TestMain("Socket Operations", { &sz) == 0); So(v == 0); + sz = 0; So(nng_getopt(s1, nng_optid_reconnmaxt, &v, &sz) == 0); So(v == 0); diff --git a/tests/trantest.h b/tests/trantest.h index afb5a881..2e5f859d 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -137,6 +137,8 @@ trantest_send_recv(trantest *tt) So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0); So(d != 0); + nng_usleep(20000); // listener may be behind slightly + send = NULL; So(nng_msg_alloc(&send, 0) == 0); So(send != NULL); diff --git 
a/tests/zt.c b/tests/zt.c new file mode 100644 index 00000000..fe6023a1 --- /dev/null +++ b/tests/zt.c @@ -0,0 +1,195 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "trantest.h" + +extern int nng_zt_register(void); +extern const char *nng_opt_zt_home; +extern int nng_optid_zt_home; +extern int nng_optid_zt_node; +extern int nng_optid_zt_status; +extern int nng_optid_zt_network_name; +extern int nng_zt_status_ok; + +// zerotier tests. + +// This network is an open network setup exclusively for nng testing. +// Do not attach to it in production. +#define NWID "a09acf02337b057b" + +#ifdef _WIN32 + +int +mkdir(const char *path, int mode) +{ + CreateDirectory(path, NULL); +} +#else +#include <sys/stat.h> +#include <unistd.h> +#endif // WIN32 + +TestMain("ZeroTier Transport", { + + char path1[NNG_MAXADDRLEN] = "/tmp/zt_server"; + char path2[NNG_MAXADDRLEN] = "/tmp/zt_client"; + unsigned port; + + port = 5555; + + Convey("We can register the zero tier transport", + { So(nng_zt_register() == 0); }); + + Convey("We can create a zt listener", { + nng_listener l; + nng_socket s; + char addr[NNG_MAXADDRLEN]; + int rv; + + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); + + So(nng_pair_open(&s) == 0); + Reset({ nng_close(s); }); + + So(nng_listener_create(&l, s, addr) == 0); + + Convey("We can lookup zerotier home option id", { + So(nng_optid_zt_home > 0); + So(nng_option_lookup(nng_opt_zt_home) == + nng_optid_zt_home); + }); + + Convey("And it can be started...", { + + mkdir(path1, 0700); + + So(nng_listener_setopt(l, nng_optid_zt_home, path1, + strlen(path1) + 1) == 0); + + 
So(nng_listener_start(l, 0) == 0); + }) + }); + + Convey("We can create a zt dialer", { + nng_dialer d; + nng_socket s; + char addr[NNG_MAXADDRLEN]; + int rv; + // uint64_t node = 0xb000072fa6ull; // my personal host + uint64_t node = 0x2d2f619cccull; // my personal host + + snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u", + (unsigned long long) node, port); + + So(nng_pair_open(&s) == 0); + Reset({ nng_close(s); }); + + So(nng_dialer_create(&d, s, addr) == 0); + + Convey("We can lookup zerotier home option id", { + So(nng_optid_zt_home > 0); + So(nng_option_lookup(nng_opt_zt_home) == + nng_optid_zt_home); + }); + }); + + Convey("We can create an ephemeral listener", { + nng_dialer d; + nng_listener l; + nng_socket s; + char addr[NNG_MAXADDRLEN]; + int rv; + uint64_t node1 = 0; + uint64_t node2 = 0; + + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); + + So(nng_pair_open(&s) == 0); + Reset({ nng_close(s); }); + + So(nng_listener_create(&l, s, addr) == 0); + + So(nng_listener_getopt_usec(l, nng_optid_zt_node, &node1) == + 0); + So(node1 != 0); + + Convey("Network name & status options work", { + char name[NNG_MAXADDRLEN]; + size_t namesz; + int status; + + namesz = sizeof(name); + nng_usleep(10000000); + So(nng_listener_getopt(l, nng_optid_zt_network_name, + name, &namesz) == 0); + So(strcmp(name, "nng_test_open") == 0); + So(nng_listener_getopt_int( + l, nng_optid_zt_status, &status) == 0); + So(status == nng_zt_status_ok); + }); + Convey("Connection refused works", { + snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u", + (unsigned long long) node1, 42u); + So(nng_dialer_create(&d, s, addr) == 0); + So(nng_dialer_getopt_usec( + d, nng_optid_zt_node, &node2) == 0); + So(node2 == node1); + So(nng_dialer_start(d, 0) == NNG_ECONNREFUSED); + }); + }); + + Convey("We can create a zt pair (dialer & listener)", { + nng_dialer d; + nng_listener l; + nng_socket s1; + nng_socket s2; + char addr1[NNG_MAXADDRLEN]; + char addr2[NNG_MAXADDRLEN]; + int rv; + 
uint64_t node; + + port = 9944; + // uint64_t node = 0xb000072fa6ull; // my personal host + + snprintf(addr1, sizeof(addr1), "zt://" NWID ":%u", port); + + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); + Reset({ + nng_close(s1); + // This sleep allows us to ensure disconnect + // messages work. + nng_usleep(1000000); + nng_close(s2); + }); + + So(nng_listener_create(&l, s1, addr1) == 0); + So(nng_listener_setopt( + l, nng_optid_zt_home, path1, strlen(path1) + 1) == 0); + + So(nng_listener_start(l, 0) == 0); + node = 0; + So(nng_listener_getopt_usec(l, nng_optid_zt_node, &node) == 0); + So(node != 0); + + snprintf(addr2, sizeof(addr2), "zt://" NWID "/%llx:%u", + (unsigned long long) node, port); + So(nng_dialer_create(&d, s2, addr2) == 0); + So(nng_dialer_setopt( + d, nng_optid_zt_home, path2, strlen(path2) + 1) == 0); + So(nng_dialer_start(d, 0) == 0); + + }); + + trantest_test_all("zt://" NWID "/*:%u"); + + nng_fini(); +}) |
