| author | Garrett D'Amore <garrett@damore.org> | 2017-07-21 16:11:16 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-26 13:26:18 -0700 |
| commit | 86a96e5bf1b207a8b1aa925e1d9f73ce834505b8 (patch) | |
| tree | 53e7d9042cf8d72c723767cf31ef950594cbf736 /src | |
| parent | 52118e4dcbc105d2b83c774e001926aceb978488 (diff) | |
ZeroTier transport implementation (work funded by Capitar IT Group BV)
The ZeroTier transport is experimental at this point, and not enabled
by default. It does not work with Windows yet (the Windows platform
needs UDP support first.)
Configure with -DNNG_ENABLE_ZEROTIER=yes -DNNG_ZEROTIER_SOURCE=<path>
The <path> must point to a checked-out dev branch of the ZeroTierOne
source tree, built with a libzerotiercore.a in the top directory and a
ZeroTierOne.h header located in include. The build will add -lc++ to
the link, as the ZeroTier core functionality is written in C++ and
needs some runtime support (e.g. new, delete, etc.).
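For example, assuming the ZeroTierOne dev tree has been checked out and built
under ~/src/ZeroTierOne (a hypothetical path), the configure step might look
like:

    cmake -DNNG_ENABLE_ZEROTIER=yes \
          -DNNG_ZEROTIER_SOURCE=$HOME/src/ZeroTierOne ..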
Diffstat (limited to 'src')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/CMakeLists.txt | 8 |
| -rw-r--r-- | src/core/platform.h | 27 |
| -rw-r--r-- | src/nng.c | 13 |
| -rw-r--r-- | src/nng.h | 43 |
| -rw-r--r-- | src/platform/posix/posix_udp.c | 2 |
| -rw-r--r-- | src/transport/zerotier/zerotier.adoc | 383 |
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 2677 |
7 files changed, 3137 insertions, 16 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1d616e95..17b14d2e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -99,6 +99,7 @@ set (NNG_SOURCES transport/ipc/ipc.c transport/tcp/tcp.c + ) if (NNG_PLATFORM_POSIX) @@ -140,7 +141,12 @@ if (NNG_PLATFORM_WINDOWS) ) endif() -include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src) +if (NNG_ENABLE_ZEROTIER) + set (NNG_SOURCES ${NNG_SOURCES} transport/zerotier/zerotier.c) +endif() + +include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src + ${NNG_REQUIRED_INCLUDES}) # Provide same folder structure in IDE as on disk foreach (f ${NNG_SOURCES}) diff --git a/src/core/platform.h b/src/core/platform.h index 02ea9a11..7bfb370a 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -350,6 +350,33 @@ extern void nni_plat_pipe_clear(int); // routine. extern void nni_plat_pipe_close(int, int); +// +// File/Store Support +// +// Some transports require persistent storage for things like +// key material, etc. Generally, these are all going to be relatively +// small objects (such as certificates), so we only require a synchronous +// implementation from platforms. This Key-Value API is intended +// to support using the keys as filenames, and keys will consist of +// only these characters: [0-9a-zA-Z._-]. The directory used should be +// determined by using an environment variable (NNG_STATE_DIR), or +// using some other application-specific method. +// + +// nni_plat_file_put writes the named file, with the provided data, +// and the given size. If the file already exists it is overwritten. +// The permissions on the file should be limited to read and write +// access by the entity running the application only. +extern int nni_plat_file_put(const char *, const void *, int); + +// nni_plat_file_get reads the entire named file, allocating storage +// to receive the data and returning the data and the size in the +// reference arguments. +extern int nni_plat_file_get(const char *, void **, int *); + +// nni_plat_file_delete deletes the named file. +extern int nni_plat_file_delete(const char *); + // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(NNG_PLATFORM_POSIX) diff --git a/src/nng.c b/src/nng.c @@ -19,6 +19,7 @@ // Pretty much every function calls the nni_platform_init to check against // fork related activity. +#include <stdio.h> #include <string.h> void @@ -650,6 +651,7 @@ static const struct { const char * nng_strerror(int num) { + static char unknownerrbuf[32]; for (int i = 0; nni_errors[i].msg != NULL; i++) { if (nni_errors[i].code == num) { return (nni_errors[i].msg); } } @@ -660,7 +662,16 @@ nng_strerror(int num) return (nni_plat_strerror(num & ~NNG_ESYSERR)); } - return ("Unknown error"); + if (num & NNG_ETRANERR) { + static char tranerrbuf[32]; + (void) snprintf(tranerrbuf, sizeof(tranerrbuf), + "Transport error #%d", num & ~NNG_ETRANERR); + return (tranerrbuf); + } + + (void) snprintf( + unknownerrbuf, sizeof(unknownerrbuf), "Unknown error #%d", num); + return (unknownerrbuf); } int diff --git a/src/nng.h b/src/nng.h @@ -557,6 +557,23 @@ NNG_DECL void nng_thread_destroy(void *); // Error codes. These may happen to align to errnos used on your platform, // but do not count on this. +// +// NNG_SYSERR is a special code, which allows us to wrap errors from the +// underlying operating system. We generally prefer to map errors to one +// of the above, but if we cannot, then we just encode an error this way.
+// The bit is large enough to accommodate all known UNIX and Win32 error +// codes. We try hard to match things semantically to one of our standard +// errors. For example, a connection reset or aborted we treat as a +// closed connection, because that's basically what it means. (The remote +// peer closed the connection.) For certain kinds of resource exhaustion +// we treat it the same as memory. But for files, etc. that's OS-specific, +// and we use the generic below. Some of the above error codes we use +// internally, and the application should never see (e.g. NNG_EINTR). +// +// NNG_ETRANERR is like ESYSERR, but is used to wrap transport specific +// errors, from different transports. It should only be used when none +// of the other options are available. + enum nng_errno_enum { NNG_EINTR = 1, NNG_ENOMEM = 2, @@ -582,21 +599,11 @@ enum nng_errno_enum { NNG_ENOSPC = 22, NNG_EEXIST = 23, NNG_EINTERNAL = 24, + NNG_ETRANSPORT = 25, + NNG_ESYSERR = 0x10000000, + NNG_ETRANERR = 0x20000000, }; -// NNG_SYSERR is a special code, which allows us to wrap errors from the -// underlyuing operating system. We generally prefer to map errors to one -// of the above, but if we cannot, then we just encode an error this way. -// The bit is large enough to accommodate all known UNIX and Win32 error -// codes. We try hard to match things semantically to one of our standard -// errors. For example, a connection reset or aborted we treat as a -// closed connection, because that's basically what it means. (The remote -// peer closed the connection.) For certain kinds of resource exhaustion -// we treat it the same as memory. But for files, etc. that's OS-specific, -// and we use the generic below. Some of the above error codes we use -// internally, and the application should never see (e.g. NNG_EINTR). -#define NNG_ESYSERR (0x10000000) - // Maximum length of a socket address. This includes the terminating NUL. // This limit is built into other implementations, so do not change it. #define NNG_MAXADDRLEN (128) @@ -627,9 +634,17 @@ struct nng_sockaddr_in { uint16_t sa_port; uint32_t sa_addr; }; + +struct nng_sockaddr_zt { + uint64_t sa_nwid; + uint64_t sa_nodeid; + uint32_t sa_port; +}; + typedef struct nng_sockaddr_in nng_sockaddr_in; typedef struct nng_sockaddr_in nng_sockaddr_udp; typedef struct nng_sockaddr_in nng_sockaddr_tcp; +typedef struct nng_sockaddr_zt nng_sockaddr_zt; typedef struct nng_sockaddr { union { @@ -637,6 +652,7 @@ typedef struct nng_sockaddr { nng_sockaddr_path s_path; nng_sockaddr_in6 s_in6; nng_sockaddr_in s_in; + nng_sockaddr_zt s_zt; } s_un; } nng_sockaddr; @@ -646,6 +662,7 @@ enum nng_sockaddr_family { NNG_AF_IPC = 2, NNG_AF_INET = 3, NNG_AF_INET6 = 4, + NNG_AF_ZT = 5, // ZeroTier }; #ifdef __cplusplus diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 552730da..05f7bea1 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -210,7 +210,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) } // UDP opens can actually run synchronously. 
- if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { + if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&udp->udp_mtx); diff --git a/src/transport/zerotier/zerotier.adoc b/src/transport/zerotier/zerotier.adoc new file mode 100644 index 00000000..dc838420 --- /dev/null +++ b/src/transport/zerotier/zerotier.adoc @@ -0,0 +1,383 @@ +ZeroTier Mapping for Scalability Protocols +=========================================== + +sp-zerotier-mapping-06 +~~~~~~~~~~~~~~~~~~~~~~ + +Abstract +-------- + +This document defines the ZeroTier mapping for scalability protocols. + +Status of This Memo +------------------- + +This is the third draft document, and is intended to guide early +development efforts. Nothing here is finalized yet. + +Copyright Notice +---------------- + +Copyright 2017 Garrett D'Amore <garrett@damore.org> + +Copyright 2017 Capitar IT Group BV <info@capitar.com> + +At this point, all rights are reserved. (Note that we do intend to +release this under a liberal reuse license once it stabilizes a bit.) + +Underlying protocol +------------------- + +ZeroTier presents an 802.3-style layer 2, where frames may be exchanged as if +they were Ethernet frames. Virtual broadcast domains are created within a +numbered "network", and frames may then be exchanged with any peers on that +network. + +Frames may arrive in any order, or be lost, just as with Ethernet +(best effort delivery), but they are strongly protected by a +cryptographic checksum, so frames that do arrive will be uncorrupted. +Furthermore, ZeroTier guarantees that a given frame will be received +at most once. + +Each application on a ZeroTier network has its own address, called a +ZeroTier ID (`ZTID`), which is globally unique -- this is generated +from a hash of the public key associated with the application. + +A given application may participate in multiple ZeroTier networks. + +Sharing of ZeroTier IDs between applications, as well as use of multiple +ZTIDs within a single application, as well as management of the associated +ZeroTier-specific state, is out of scope for this document. + +ZeroTier networks have a standard MTU of 2800 bytes, but over typical +public networks an "optimum" MTU of 1400 bytes is used. +ZeroTier may be configured to have larger MTUs, but typically this involves +extensive reassembly at underlying layers, and implementations *SHOULD* +use the optimum MTU advertised by the ZeroTier implementation. + + +Packet layout +~~~~~~~~~~~~~ + +Each SP message sent over ZeroTier consists of one or +more fragments, where each fragment is mapped to a single underlying +ZeroTier L2 frame. We use the EtherType field of 0x0901 to indicate +SP over ZeroTier protocol (number to be registered with IEEE). + +The ZeroTier L2 payload shall be encoded with a header as follows: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | op | flags | version | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | reserved | destination port | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | reserved | source port | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | op-specific payload... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +All numeric fields are in big-endian byte order.
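To make the header layout concrete, here is a minimal sketch in C of encoding
the 12-byte common header; the encode_header helper is hypothetical, but the
offsets and byte order follow the diagram above (the implementation in
zerotier.c below does the same job with its ZT_PUT24 and NNI_PUT16 macros):

    #include <stdint.h>

    // Encode the common header: op, zero flags, a 16-bit version of
    // 0x0001, and two 24-bit big-endian ports, each preceded by a
    // reserved zero byte.
    static void
    encode_header(uint8_t buf[12], uint8_t op, uint32_t dst_port,
        uint32_t src_port)
    {
        buf[0]  = op;
        buf[1]  = 0; // flags: must be zero
        buf[2]  = 0; // version 0x0001, big-endian
        buf[3]  = 1;
        buf[4]  = 0; // reserved
        buf[5]  = (uint8_t) (dst_port >> 16); // destination port (24 bits)
        buf[6]  = (uint8_t) (dst_port >> 8);
        buf[7]  = (uint8_t) dst_port;
        buf[8]  = 0; // reserved
        buf[9]  = (uint8_t) (src_port >> 16); // source port (24 bits)
        buf[10] = (uint8_t) (src_port >> 8);
        buf[11] = (uint8_t) src_port;
    }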
Note that ZeroTier +APIs present this as the L2 payload, but ZeroTier itself may prepend +additional data such as the Ethernet type, and source and destination +MAC addresses, as well as ZeroTier-specific headers. The details of +such headers are out of scope for this document. + +As above, the start of each frame is just as a normal Ethernet payload. +The Ethernet type (ethertype) we use for these frames is 0x901, with +a VLAN ID of 0. + +The `op` field indicates the type of message being sent. The +following values are defined: `DATA` (0x00), `CONN-REQ` (0x10), +`CONN-ACK` (0x12), `DISC` (0x20), `PING` (0x30), `PONG` (0x32), +and `ERR` (0x40). These are discussed further below. Implementations +*MUST* discard messages where the `op` is not one of these. + +The `flags` field is reserved for future use, and *MUST* be zero. +Implementations *MUST* discard frames for which this is not true. + +The `version` field MUST be set to 0x1. Implementations *MUST* discard +any messages received for any other version. + +The `source port` and `destination port` are used to construct a logical +conversation. These are 24 bits wide, and are discussed further below. +The `reserved` fields must be set to zero. + +The remainder of the frame varies depending on the `op` used. + + +The port fields are used to discriminate different uses, allowing one +application to have multiple connections or sockets open. The +purpose is analogous to TCP port numbers, except that instead of the +operating system performing the discrimination, the application or +library code must do so. Note that port numbers are 24 bits. This +was chosen to allow a peer to allocate a unique port number for each +local conversation, allowing up to 16 million concurrent conversations. +This also allows a 40-bit node number to be combined with the 24-bit +port number to create a 64-bit unique address. + +The `type` field is the numeric SP protocol ID, in big-endian form. +When receiving a message for a port, if the SP protocol ID does not +match the SP protocol expected on that port, the implementation *MUST* +discard this message. + +Note that it is not by accident that the payload is 32-bit aligned in +this message format. The payload is actually 64-bit aligned. + + +Note that at this time, broadcast and multicast are not supported by +this mapping. (A future update may resolve this.) + +DATA messages +~~~~~~~~~~~~~ + +`DATA` messages carry SP protocol payload data. They can only be sent +on an established session (see `CONN` messages below), and are never +acknowledged (in this version). The op-specific payload they carry +is formed like this: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | message ID | fragment size | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | fragment number | total fragments | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | user data... + +-+-+-+-+-+-+-+- + +All fragments, except for the last, *MUST* be the same size. The fragment +size field carries the size of every fragment, except that the last +fragment may be shorter; however, even for the last fragment, the fragment +size field *MUST* carry the size of the rest of the fragments. This is necessary +to allow a receiver to know the fragment size of the other fragments even +if the final fragment is received before any others.
(Typically this may +occur if a message consisting of two fragments arrives with fragments +out of order.) + +The last fragment shall have the fragment number equal to +the total fragments minus one, and the first fragment shall have fragment +number 0. Under typical conditions, with the optimum MTU of 1400 +bytes, the largest message that can be transmitted is approximately 86 MB. +Specifically, the limit is (65534 * (1400 - 20)) = 90,436,920 bytes. +(Larger MTUs may be used, if the implementation determines that it is +advantageous to do so. Doing so would necessarily give a larger maximum +message size.) + +However, transmitting such a large message would require sending over +65 thousand fragments, and given the likelihood of fragment loss, and +the lack of acknowledgment, it is likely that the entire message would +be lost. As a result, implementations are encouraged to limit the +amount of data that they send to at most a few megabytes. Implementations +receiving the first fragment can easily calculate the worst case for +the message size (the size of the user payload multiplied by the total +number of fragments), and MAY reply to the sender with an `ERR` message +using the code 0x05, indicating that the message is larger than the +receiver is willing to accept. + +Each fragment for a given message must carry the same `message ID`. +Implementations *MUST* initialize this to a random value when starting +a conversation, and *MUST* increment this each time a new message is sent. +Message IDs of zero are not permitted; implementations *MUST* skip past zero +when incrementing message IDs. + +Implementations may detect the loss of a message by noticing skips in the +message IDs that are received, accounting for the expected skip past zero. + +Note that no field conveys the length of the fragment itself, as +this can be determined from the L2 length -- the user data within +the fragment extends to the end of the L2 payload supplied by ZeroTier. +(And, all fragments other than the final fragment for a message must +therefore have the same length.) + + +CONN-REQ and CONN-ACK messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +`CONN-REQ` frames represent a request from an initiator to establish a +session, i.e. a new conversation or connection, and `CONN-ACK` +messages are the normal successful reply from the responder. They both +take the same form, which consists of the usual headers along with the +sender's 16-bit (big-endian) SP protocol ID appended: + + 0 1 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SP protocol ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +The connection is initiated by the initiator sending this message, +with its own SP protocol ID, with the `op` set to `CONN-REQ`. +The initiator must choose a `source port` number that is not currently +being used with the remote peer. (Most implementations will choose +a source port that is not used at all. Source port numbers *SHOULD* +be chosen randomly.) + +The responder will acknowledge this by replying with its SP protocol +ID in the 2-byte payload, using the `CONN-ACK` op. Additionally, +the source port number that the responder replies with *MUST* be the +one the initiator requested. + +(Responders will identify the session using the initiator's chosen +`source port`, which the initiator *MUST NOT* concurrently use for any +other sessions.) + +Alternatively, a responder *MAY* reject the connection attempt by +sending a suitably formed ERR message (see below).
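To illustrate the exchange, a sketch of building the 14-byte CONN-REQ frame
(the size matches zt_size_conn_req in the implementation below); it reuses
the hypothetical encode_header helper from the earlier sketch, and 0x10 is
the CONN-REQ op code:

    #include <stdint.h>

    // Build a CONN-REQ: the common header plus the initiator's 16-bit
    // big-endian SP protocol ID. A CONN-ACK has the same shape, with
    // op 0x12 and the responder's SP protocol ID.
    static void
    build_conn_req(uint8_t buf[14], uint32_t dst_port, uint32_t src_port,
        uint16_t sp_proto)
    {
        encode_header(buf, 0x10, dst_port, src_port);
        buf[12] = (uint8_t) (sp_proto >> 8);
        buf[13] = (uint8_t) sp_proto;
    }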
+ If a sender does not receive a reply, it *SHOULD* retry this message +before giving up and reporting an error to the user. It is recommended +that a configurable number of retries and time interval be used. + +Given modern Internet latencies of generally less than 500 ms, resending +up to 12 `CONN-REQ` requests, once every 5 seconds, before giving up seems +reasonable. (These times are somewhat larger to allow for ZeroTier +path discovery to take place; this results in a timeout of approximately +a minute.) + +The initiator *MUST NOT* send any `DATA` messages for a conversation until +it has received an ACK from the other party, and it *MUST* send all further +messages for the conversation to the port number supplied by the responder. + +If a `CONN-REQ` frame is received by a responder for a conversation that already +exists, the responder MUST reply. Further, the source port it replies with +and the SP protocol ID MUST be identical to what it first sent. This +ensures that the `CONN-REQ` request is idempotent. + +DISC messages +~~~~~~~~~~~~~ + +DISC messages are used to request a session be terminated. This +notifies the remote peer that no more data will be sent or +accepted, and the session resources may be released. There is no +payload. There is no acknowledgment. + +PING and PONG messages +~~~~~~~~~~~~~~~~~~~~~~ + +In order to keep session state, implementations will generally store +data for each session. In order to prevent a stale session from +consuming these resources forever, and in order to keep underlying +ZeroTier sessions alive, a `PING` message *MAY* be sent to a peer +with whom a session has been established. This message has no payload. + +If the `PING` is successful, then the responder *MUST* reply with a `PONG` +message. As with `PING`, the `PONG` message carries no payload. + +There is no response to a `PONG` message. + +In the event of an error, an implementation *MAY* reply with an `ERR` +message. + +Implementations *SHOULD NOT* initiate `PING` messages if they have +received other session messages recently. + +Implementations *SHOULD* wait T1 seconds before +initiating a `PING` the first time, and in the absence of a +reply, make up to N further attempts, separated by T2 seconds. If +no reply to the Nth attempt is received after T2 seconds have passed, +then the remote peer should be assumed offline or dead, and the +session closed. + +The values for T1, T2, and N *SHOULD* be configurable, with +recommended default values of 60, 10, and 5. With these values, +sessions that have appeared dead for about 2 minutes will be closed, and their +resources reclaimed. + +ERR messages +~~~~~~~~~~~~ + +`ERR` messages indicate a failure in the session, and abruptly +terminate the session. The payload for these messages consists of a +single byte error code, followed by an ASCII message describing the +error (not terminated by zero). This message *MUST NOT* be more than +128 bytes in length. + +The following error codes are defined: + + * 0x01 No party listening at that address or port. + * 0x02 No such session found. + * 0x03 SP protocol ID invalid. + * 0x04 Generic protocol error. + * 0x05 Message size too big. + * 0xff Other uncategorized error. + +Implementations *MUST* discard any session state upon receiving an ERR +message. These messages are not acknowledged. + +Reassembly Guidelines +~~~~~~~~~~~~~~~~~~~~~ + +Implementations *MUST* accept and reassemble fragmented `DATA` messages.
+Implementations *MUST* discard fragmented messages of other types. + +Messages larger than the ZeroTier MTU *MUST* be fragmented. + +Implementations *SHOULD* limit the number of unassembled messages +retained for reassembly, to minimize the likelihood of intentional +abuse. It is suggested that at most 2 unassembled messages be +retained. It is further suggested that if 2 or more unfragmented +messages arrive before a message is reassembled, or more than 5 +seconds pass before the reassembly is complete, the unassembled +fragments be discarded. + + +Ports +~~~~~ + +The port numbers are 24-bit fields, allowing a single ZT ID to +service multiple application layer protocols, which could be treated +as separate end points, or as separate sockets in the application. +The implementation is responsible for discriminating on these and +delivering to the appropriate consumer. + +As with UDP or TCP, it is intended that each party have its own port +number, and that a pair of ports (combined with ZeroTier IDs) be used +to identify a single conversation. + +An SP server should allocate a port number for advertisement. It is +expected that clients will generate ephemeral port numbers. + +Implementations are free to choose how to allocate port numbers, but +it is recommended that manually configured port numbers be small, with +the high order bit clear, and that numbers > 2^23 (high order bit +set) be used for ephemeral allocations. + +It is recommended that separate short queues (perhaps just one or two +messages long) be kept per local port in implementations, to prevent +head-of-line blocking issues where backpressure on one consumer +(perhaps just a single thread or socket) blocks others. + +URI Format +~~~~~~~~~~ + +The URI scheme used to represent ZeroTier addresses makes use of +ZeroTier IDs, ZeroTier network IDs, and our own 24-bit ports. + +The format shall be `zt://<nwid>/<ztid>:<port>`, where the `<nwid>` +component represents the 64-bit hexadecimal ZeroTier network ID, +the `<ztid>` represents the 40-bit hexadecimal ZeroTier Device ID, +and the `<port>` is the 24-bit port number previously described. + +A responder may elide the `<ztid>/` portion, to just bind to itself, +in which case the format will be `zt://<nwid>:<port>`. + +A port number of 0 may be used when listening to indicate that a random +ephemeral port should be chosen. + +An implementation *MAY* allow the `<ztid>` to be replaced with `*` to +indicate that the node's local ZTID be used. + +// XXX: the ztid could use DNS names, generating 6PLANE IP addresses, +// and extracting the 10 digit device id from that. Note that there +// is no good way to determine a nwid automatically. The 6PLANE +// address is determined by a non-reversible XOR transform of the +// network id. + +Security Considerations +~~~~~~~~~~~~~~~~~~~~~~~ + +The mapping isn't intended to provide any additional security beyond that +provided by ZeroTier itself. Managing the key materials used by ZeroTier +is implementation-specific, and implementations must take appropriate care when +dealing with them.
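Before turning to the implementation, a small sketch of parsing the URI
format above; parse_zt_url is a hypothetical helper (the transport has its
own URL handling), shown only to make the field widths concrete:

    #include <stdint.h>
    #include <stdio.h>

    // Parse "zt://<nwid>/<ztid>:<port>", where <nwid> is a 64-bit hex
    // network ID, <ztid> a 40-bit hex node ID, and <port> a 24-bit
    // decimal port. Returns 0 on success, -1 on malformed input.
    static int
    parse_zt_url(const char *url, uint64_t *nwid, uint64_t *ztid,
        uint32_t *port)
    {
        unsigned long long nw, zt;
        unsigned int       p;

        if (sscanf(url, "zt://%llx/%llx:%u", &nw, &zt, &p) != 3) {
            return (-1);
        }
        if (((zt >> 40) != 0) || ((p >> 24) != 0)) {
            return (-1); // ztid is 40 bits, port is 24 bits
        }
        *nwid = (uint64_t) nw;
        *ztid = (uint64_t) zt;
        *port = (uint32_t) p;
        return (0);
    }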
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c new file mode 100644 index 00000000..40af6e54 --- /dev/null +++ b/src/transport/zerotier/zerotier.c @@ -0,0 +1,2677 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifdef NNG_HAVE_ZEROTIER +#include <ctype.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +#ifndef _WIN32 +#include <unistd.h> +#endif + +#include <ZeroTierOne.h> + +const char *nng_opt_zt_home = "zt:home"; +const char *nng_opt_zt_nwid = "zt:nwid"; +const char *nng_opt_zt_node = "zt:node"; +const char *nng_opt_zt_status = "zt:status"; +const char *nng_opt_zt_network_name = "zt:network-name"; +const char *nng_opt_zt_local_port = "zt:local-port"; +const char *nng_opt_zt_ping_time = "zt:ping-time"; +const char *nng_opt_zt_ping_count = "zt:ping-count"; + +int nng_optid_zt_home = -1; +int nng_optid_zt_nwid = -1; +int nng_optid_zt_node = -1; +int nng_optid_zt_status = -1; +int nng_optid_zt_network_name = -1; +int nng_optid_zt_ping_time = -1; +int nng_optid_zt_ping_count = -1; +int nng_optid_zt_local_port = -1; + +// These values are supplied to help folks checking status. They are the +// return values from nng_optid_zt_status. +int nng_zt_status_configuring = ZT_NETWORK_STATUS_REQUESTING_CONFIGURATION; +int nng_zt_status_ok = ZT_NETWORK_STATUS_OK; +int nng_zt_status_denied = ZT_NETWORK_STATUS_ACCESS_DENIED; +int nng_zt_status_notfound = ZT_NETWORK_STATUS_NOT_FOUND; +int nng_zt_status_error = ZT_NETWORK_STATUS_PORT_ERROR; +int nng_zt_status_obsolete = ZT_NETWORK_STATUS_CLIENT_TOO_OLD; + +// ZeroTier Transport. This sits on the ZeroTier L2 network, which itself +// is implemented on top of UDP. This requires the 3rd party +// libzerotiercore library (which is GPLv3!) and platform specific UDP +// functionality to be built in. Note that care must be taken to link +// dynamically if you wish to avoid making your entire application GPLv3. +// (Alternatively ZeroTier offers commercial licenses which may prevent +// this particular problem.) This implementation does not make use of +// certain advanced capabilities in ZeroTier such as more sophisticated +// route management and TCP fallback. You need to have connectivity +// to the Internet to use this. (Or at least to your Planetary root.) +// +// Because ZeroTier takes a while to establish connectivity, it is even +// more important that applications using the ZeroTier transport not +// assume that a connection will be immediately available. It can take +// quite a few seconds for peer-to-peer connectivity to be established. +// +// The ZeroTier transport was funded by Capitar IT Group, BV. +// +// This transport is highly experimental. + +// ZeroTier and UDP are connectionless, but nng is designed around +// connection oriented paradigms. An "unreliable" connection is created +// on top using our own network protocol. The details of this are +// documented in the RFC. + +// Every participant has an "address", which is a 64-bit value constructed +// using the ZT node number in the upper 40-bits, and a 24-bit port number +// in the lower bits.
We elect to operate primarily on these addresses, +// but the wire protocol relies on just conveying the 24-bit port along +// with the MAC address (from which the ZT node number can be derived, +// given the network ID.) + +typedef struct zt_pipe zt_pipe; +typedef struct zt_ep zt_ep; +typedef struct zt_node zt_node; +typedef struct zt_frag zt_frag; +typedef struct zt_fraglist zt_fraglist; + +// Port numbers are stored as 24-bit values in network byte order. +#define ZT_GET24(ptr, v) \ v = (((uint32_t)((uint8_t)(ptr)[0])) << 16) + \ (((uint32_t)((uint8_t)(ptr)[1])) << 8) + \ (((uint32_t)(uint8_t)(ptr)[2])) + +#define ZT_PUT24(ptr, u) \ do { \ (ptr)[0] = (uint8_t)(((uint32_t)(u)) >> 16); \ (ptr)[1] = (uint8_t)(((uint32_t)(u)) >> 8); \ (ptr)[2] = (uint8_t)((uint32_t)(u)); \ } while (0) + +static const uint16_t zt_ethertype = 0x901; +static const uint8_t zt_version = 0x01; +static const uint32_t zt_ephemeral = 0x800000u; // start of ephemeral ports +static const uint32_t zt_max_port = 0xffffffu; // largest port +static const uint32_t zt_port_mask = 0xffffffu; // mask of valid ports + +// These are compile time tunables for now. +enum zt_tunables { + zt_listenq = 128, // backlog queue length + zt_listen_expire = 60000000, // maximum time in backlog + zt_rcv_bufsize = ZT_MAX_MTU + 128, // max UDP recv + zt_conn_attempts = 12, // connection attempts (default) + zt_conn_interval = 5000000, // between attempts (usec) + zt_udp_sendq = 16, // outgoing UDP queue length + zt_recvq = 2, // max pending recv (per pipe) + zt_recv_stale = 1000000, // frags older than are stale + zt_ping_time = 60000000, // if no traffic, ping time (usec) + zt_ping_count = 5, // max ping attempts before close +}; + +enum zt_op_codes { + zt_op_data = 0x00, // data, final fragment + zt_op_conn_req = 0x10, // connect request + zt_op_conn_ack = 0x12, // connect accepted + zt_op_disc_req = 0x20, // disconnect request (no ack) + zt_op_ping = 0x30, // ping request + zt_op_pong = 0x32, // ping response + zt_op_error = 0x40, // error response +}; + +enum zt_offsets { + zt_offset_op = 0x00, + zt_offset_flags = 0x01, + zt_offset_version = 0x02, // protocol version number (2 bytes) + zt_offset_zero1 = 0x04, // reserved, must be zero (1 byte) + zt_offset_dst_port = 0x05, // destination port (3 bytes) + zt_offset_zero2 = 0x08, // reserved, must be zero (1 byte) + zt_offset_src_port = 0x09, // source port number (3 bytes) + zt_offset_creq_proto = 0x0C, // SP protocol number (2 bytes) + zt_offset_cack_proto = 0x0C, // SP protocol number (2 bytes) + zt_offset_err_code = 0x0C, // error code (1 byte) + zt_offset_err_msg = 0x0D, // error message (string) + zt_offset_data_id = 0x0C, // message ID (2 bytes) + zt_offset_data_fragsz = 0x0E, // fragment size + zt_offset_data_frag = 0x10, // fragment number, first is 0 (2 bytes) + zt_offset_data_nfrag = 0x12, // total fragments (2 bytes) + zt_offset_data_data = 0x14, // user payload + zt_size_headers = 0x0C, // size of headers + zt_size_conn_req = 0x0E, // size of conn_req (connect request) + zt_size_conn_ack = 0x0E, // size of conn_ack (connect reply) + zt_size_disc_req = 0x0C, // size of disc_req (disconnect) + zt_size_ping = 0x0C, // size of ping request + zt_size_pong = 0x0C, // size of ping reply + zt_size_data = 0x14, // size of data message (w/o payload) +}; + +enum zt_errors { + zt_err_refused = 0x01, // Connection refused + zt_err_notconn = 0x02, // Connection does not exist + zt_err_wrongsp = 0x03, // SP protocol mismatch + zt_err_proto = 0x04, // Other protocol error +
zt_err_msgsize = 0x05, // Message too large + zt_err_unknown = 0x06, // Other errors +}; + +// This node structure is wrapped around the ZT_node; this allows us to +// have multiple endpoints referencing the same ZT_node, but also to +// support different nodes (identities) based on different homedirs. +// This means we need to stick these on a global linked list, manage +// them with a reference count, and uniquely identify them using the +// homedir. +struct zt_node { + char zn_path[NNG_MAXADDRLEN]; // ought to be sufficient + ZT_Node * zn_znode; + uint64_t zn_self; + nni_list_node zn_link; + int zn_closed; + nni_plat_udp *zn_udp4; + nni_plat_udp *zn_udp6; + nni_list zn_eplist; + nni_list zn_plist; + nni_idhash * zn_ports; + nni_idhash * zn_eps; + nni_idhash * zn_lpipes; + nni_idhash * zn_rpipes; + nni_idhash * zn_peers; // indexed by remote address + nni_aio * zn_rcv4_aio; + char * zn_rcv4_buf; + nng_sockaddr zn_rcv4_addr; + nni_aio * zn_rcv6_aio; + char * zn_rcv6_buf; + nng_sockaddr zn_rcv6_addr; + nni_thr zn_bgthr; + nni_time zn_bgtime; + nni_cv zn_bgcv; + nni_cv zn_snd6_cv; +}; + +// The fragment list is used to keep track of incoming received +// fragments for reassembly into a complete message. +struct zt_fraglist { + nni_time fl_time; // time first frag was received + uint32_t fl_msgid; // message id + int fl_ready; // we have all messages + unsigned int fl_fragsz; + unsigned int fl_nfrags; + uint8_t * fl_missing; + size_t fl_missingsz; + nni_msg * fl_msg; +}; + +struct zt_pipe { + nni_list_node zp_link; + const char * zp_addr; + zt_node * zp_ztn; + uint64_t zp_nwid; + uint64_t zp_laddr; + uint64_t zp_raddr; + uint16_t zp_peer; + uint16_t zp_proto; + uint16_t zp_next_msgid; + size_t zp_rcvmax; + size_t zp_mtu; + int zp_closed; + nni_aio * zp_user_rxaio; + nni_time zp_last_recv; + zt_fraglist zp_recvq[zt_recvq]; + int zp_ping_try; + int zp_ping_count; + nni_duration zp_ping_time; + nni_aio * zp_ping_aio; +}; + +typedef struct zt_creq zt_creq; +struct zt_creq { + uint64_t cr_expire; + uint64_t cr_raddr; + uint16_t cr_proto; +}; + +struct zt_ep { + nni_list_node ze_link; + char ze_url[NNG_MAXADDRLEN]; + char ze_home[NNG_MAXADDRLEN]; // should be enough + zt_node * ze_ztn; + uint64_t ze_nwid; + int ze_mode; + nni_sockaddr ze_addr; + uint64_t ze_raddr; // remote node address + uint64_t ze_laddr; // local node address + uint16_t ze_proto; + size_t ze_rcvmax; + nni_aio * ze_aio; + nni_aio * ze_creq_aio; + int ze_creq_try; + nni_list ze_aios; + int ze_maxmtu; + int ze_phymtu; + int ze_ping_count; + nni_duration ze_ping_time; + + // Incoming connection requests (server only). We + // only have "accepted" requests -- that is, we won't have an + // established connection/pipe unless the application calls + // accept. Since the "application" is our library, that should + // be pretty much as fast as we can run. + zt_creq ze_creqs[zt_listenq]; + int ze_creq_head; + int ze_creq_tail; +}; + +// Locking strategy. At present the ZeroTier core is not reentrant or fully +// threadsafe. (We expect this will be fixed.) Furthermore, there are +// some significant challenges in dealing with locks associated with the +// callbacks, etc. So we take a big-hammer approach, and just use a single +// global lock for everything. We hold this lock when calling into the +// ZeroTier framework. Since ZeroTier has no independent threads, that +// means that it will always hold this lock in its core, and the lock will +// also be held automatically in any of our callbacks.
We never hold any +// other locks across ZeroTier core calls. We may not acquire the global +// lock in callbacks (they will already have it held). Any other locks +// can be acquired as long as they are not held during calls into ZeroTier. +// +// This will have a detrimental impact on performance, but to be completely +// honest we don't think anyone will be using the ZeroTier transport in +// performance critical applications; scalability may become a factor for +// large servers sitting in a ZeroTier hub situation. (Then again, since +// only the ZeroTier processing is single-threaded, it may not +// be that much of a bottleneck -- really depends on how expensive these +// operations are. We can use lockstat or other lock-hotness tools to +// check for this later.) + +static nni_mtx zt_lk; +static nni_list zt_nodes; + +static void zt_ep_send_conn_req(zt_ep *); +static void zt_ep_conn_req_cb(void *); +static void zt_ep_doaccept(zt_ep *); +static void zt_pipe_dorecv(zt_pipe *); +static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t); +static void zt_pipe_ping_cb(void *); +static void zt_fraglist_clear(zt_fraglist *); +static void zt_fraglist_free(zt_fraglist *); +static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **, + uint64_t, uint64_t, unsigned int, unsigned int, const void *, + unsigned int); + +static uint64_t +zt_now(void) +{ + // We return msec + return (nni_clock() / 1000); +} + +static void +zt_bgthr(void *arg) +{ + zt_node *ztn = arg; + nni_time now; + + nni_mtx_lock(&zt_lk); + for (;;) { + now = nni_clock(); + + if (ztn->zn_closed) { + break; + } + + if (now < ztn->zn_bgtime) { + nni_cv_until(&ztn->zn_bgcv, ztn->zn_bgtime); + continue; + } + + now /= 1000; // usec -> msec + ZT_Node_processBackgroundTasks(ztn->zn_znode, NULL, now, &now); + + ztn->zn_bgtime = now * 1000; // usec + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_node_resched(zt_node *ztn, uint64_t msec) +{ + ztn->zn_bgtime = msec * 1000; // convert to usec + nni_cv_wake1(&ztn->zn_bgcv); +} + +static void +zt_node_rcv4_cb(void *arg) +{ + zt_node * ztn = arg; + nni_aio * aio = ztn->zn_rcv4_aio; + struct sockaddr_storage sa; + struct sockaddr_in * sin; + nng_sockaddr_in * nsin; + uint64_t now; + + if (nni_aio_result(aio) != 0) { + // Outside of memory exhaustion, we can't really think + // of any reason for this to legitimately fail. + // Arguably we should inject a fallback delay, but for + // now we just carry on. + return; + } + + memset(&sa, 0, sizeof(sa)); + sin = (void *) &sa; + nsin = &ztn->zn_rcv4_addr.s_un.s_in; + sin->sin_family = AF_INET; + sin->sin_port = nsin->sa_port; + sin->sin_addr.s_addr = nsin->sa_addr; + + nni_mtx_lock(&zt_lk); + now = zt_now(); + + // We are not going to perform any validation of the data; we + // just pass this straight into the ZeroTier core. + // XXX: CHECK THIS, if it fails then we have a fatal error with + // the znode, and have to shut everything down. + ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa, + ztn->zn_rcv4_buf, aio->a_count, &now); + + // Schedule background work + zt_node_resched(ztn, now); + + // Schedule another receive.
+ if (ztn->zn_udp4 != NULL) { + aio->a_niov = 1; + aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf; + aio->a_iov[0].iov_len = zt_rcv_bufsize; + aio->a_addr = &ztn->zn_rcv4_addr; + aio->a_count = 0; + + nni_plat_udp_recv(ztn->zn_udp4, aio); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_node_rcv6_cb(void *arg) +{ + zt_node * ztn = arg; + nni_aio * aio = ztn->zn_rcv6_aio; + struct sockaddr_storage sa; + struct sockaddr_in6 * sin6; + struct nng_sockaddr_in6 *nsin6; + uint64_t now; + + if (nni_aio_result(aio) != 0) { + // Outside of memory exhaustion, we can't really think + // of any reason for this to legitimately fail. + // Arguably we should inject a fallback delay, but for + // now we just carry on. + return; + } + + memset(&sa, 0, sizeof(sa)); + sin6 = (void *) &sa; + nsin6 = &ztn->zn_rcv6_addr.s_un.s_in6; + sin6->sin6_family = AF_INET6; + sin6->sin6_port = nsin6->sa_port; + memcpy(&sin6->sin6_addr, nsin6->sa_addr, 16); + + nni_mtx_lock(&zt_lk); + now = zt_now(); // msec + + // We are not going to perform any validation of the data; we + // just pass this straight into the ZeroTier core. + ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa, + ztn->zn_rcv6_buf, aio->a_count, &now); + + // Schedule background work + zt_node_resched(ztn, now); + + // Schedule another receive. + if (ztn->zn_udp6 != NULL) { + aio->a_niov = 1; + aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf; + aio->a_iov[0].iov_len = zt_rcv_bufsize; + aio->a_addr = &ztn->zn_rcv6_addr; + aio->a_count = 0; + nni_plat_udp_recv(ztn->zn_udp6, aio); + } + nni_mtx_unlock(&zt_lk); +} + +static uint64_t +zt_mac_to_node(uint64_t mac, uint64_t nwid) +{ + uint64_t node; + // This extracts a node address from a mac address. The + // network ID is mixed in, and has to be extricated. The + // node ID is located in the lower 40 bits, and scrambled + // against the nwid. + node = mac & 0xffffffffffull; + node ^= ((nwid >> 8) & 0xff) << 32; + node ^= ((nwid >> 16) & 0xff) << 24; + node ^= ((nwid >> 24) & 0xff) << 16; + node ^= ((nwid >> 32) & 0xff) << 8; + node ^= (nwid >> 40) & 0xff; + return (node); +} + +static uint64_t +zt_node_to_mac(uint64_t node, uint64_t nwid) +{ + uint64_t mac; + // We use LSB of network ID, and make sure that we clear + // multicast and set local administration -- this is the first + // octet of the 48 bit mac address. We also avoid 0x52, which + // is known to be used in KVM, libvirt, etc. + mac = ((uint8_t)(nwid & 0xfe) | 0x02); + if (mac == 0x52) { + mac = 0x32; + } + mac <<= 40; + mac |= node; + // The rest of the network ID is XOR'd in, in reverse byte + // order.
+ mac ^= ((nwid >> 8) & 0xff) << 32; + mac ^= ((nwid >> 16) & 0xff) << 24; + mac ^= ((nwid >> 24) & 0xff) << 16; + mac ^= ((nwid >> 32) & 0xff) << 8; + mac ^= (nwid >> 40) & 0xff; + return (mac); +} + +static int +zt_result(enum ZT_ResultCode rv) +{ + switch (rv) { + case ZT_RESULT_OK: + return (0); + case ZT_RESULT_OK_IGNORED: + return (0); + case ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY: + return (NNG_ENOMEM); + case ZT_RESULT_FATAL_ERROR_DATA_STORE_FAILED: + return (NNG_EPERM); + case ZT_RESULT_FATAL_ERROR_INTERNAL: + return (NNG_EINTERNAL); + case ZT_RESULT_ERROR_NETWORK_NOT_FOUND: + return (NNG_EADDRINVAL); + case ZT_RESULT_ERROR_UNSUPPORTED_OPERATION: + return (NNG_ENOTSUP); + case ZT_RESULT_ERROR_BAD_PARAMETER: + return (NNG_EINVAL); + default: + return (NNG_ETRANERR + (int) rv); + } +} + +// ZeroTier Node API callbacks +static int +zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid, + void **netptr, enum ZT_VirtualNetworkConfigOperation op, + const ZT_VirtualNetworkConfig *config) +{ + zt_node *ztn = userptr; + zt_ep * ep; + + NNI_ARG_UNUSED(thr); + NNI_ARG_UNUSED(netptr); + + NNI_ASSERT(node == ztn->zn_znode); + + // Maybe we don't have to create taps or anything like that. + // We do get our mac and MTUs from this, so there's that. + switch (op) { + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE: + + // We only really care about changes to the MTU. From + // an API perspective the MAC could change, but that + // cannot really happen because the node identity and + // the nwid are fixed. + NNI_LIST_FOREACH (&ztn->zn_eplist, ep) { + NNI_ASSERT(nwid == config->nwid); + if (ep->ze_nwid != config->nwid) { + continue; + } + ep->ze_maxmtu = config->mtu; + ep->ze_phymtu = config->physicalMtu; + + if ((ep->ze_mode == NNI_EP_MODE_DIAL) && + (nni_list_first(&ep->ze_aios) != NULL)) { + zt_ep_send_conn_req(ep); + } + // if (ep->ze_mode == NNI_EP + // zt_send_ + // nni_aio_finish(ep->ze_join_aio, 0); + // } + // XXX: schedule creqs if needed! + } + break; + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN: + // XXX: tear down endpoints? + default: + break; + } + return (0); +} + +// zt_send modifies the start of the supplied buffer to update the +// message headers with protocol specific details (version, port numbers, +// etc.) and then sends it over the virtual network. +static void +zt_send(zt_node *ztn, uint64_t nwid, uint8_t op, uint64_t raddr, + uint64_t laddr, uint8_t *data, size_t len) +{ + uint64_t srcmac = zt_node_to_mac(laddr >> 24, nwid); + uint64_t dstmac = zt_node_to_mac(raddr >> 24, nwid); + uint64_t now = zt_now(); + + NNI_ASSERT(len >= zt_size_headers); + data[zt_offset_op] = op; + data[zt_offset_flags] = 0; + data[zt_offset_zero1] = 0; + data[zt_offset_zero2] = 0; + NNI_PUT16(data + zt_offset_version, zt_version); + ZT_PUT24(data + zt_offset_dst_port, raddr & zt_port_mask); + ZT_PUT24(data + zt_offset_src_port, laddr & zt_port_mask); + + // If we are looping back, bypass ZT. 
+ if (srcmac == dstmac) { + zt_virtual_recv(ztn->zn_znode, ztn, NULL, nwid, NULL, srcmac, + dstmac, zt_ethertype, 0, data, len); + return; + } + + (void) ZT_Node_processVirtualNetworkFrame(ztn->zn_znode, NULL, now, + nwid, srcmac, dstmac, zt_ethertype, 0, data, len, &now); + + zt_node_resched(ztn, now); +} + +static void +zt_send_err(zt_node *ztn, uint64_t nwid, uint64_t raddr, uint64_t laddr, + uint8_t err, const char *msg) +{ + uint8_t data[128]; + + NNI_ASSERT((strlen(msg) + zt_offset_err_msg) < sizeof(data)); + + data[zt_offset_err_code] = err; + nni_strlcpy((char *) data + zt_offset_err_msg, msg, + sizeof(data) - zt_offset_err_msg); + + zt_send(ztn, nwid, zt_op_error, raddr, laddr, data, + strlen(msg) + zt_offset_err_msg); +} + +static void +zt_pipe_send_err(zt_pipe *p, uint8_t err, const char *msg) +{ + zt_send_err(p->zp_ztn, p->zp_nwid, p->zp_raddr, p->zp_laddr, err, msg); +} + +static void +zt_pipe_send_disc_req(zt_pipe *p) +{ + uint8_t data[zt_size_disc_req]; + + zt_send(p->zp_ztn, p->zp_nwid, zt_op_disc_req, p->zp_raddr, + p->zp_laddr, data, sizeof(data)); +} + +static void +zt_pipe_send_ping(zt_pipe *p) +{ + uint8_t data[zt_size_ping]; + + zt_send(p->zp_ztn, p->zp_nwid, zt_op_ping, p->zp_raddr, p->zp_laddr, + data, sizeof(data)); +} + +static void +zt_pipe_send_pong(zt_pipe *p) +{ + uint8_t data[zt_size_ping]; + + zt_send(p->zp_ztn, p->zp_nwid, zt_op_pong, p->zp_raddr, p->zp_laddr, + data, sizeof(data)); +} + +static void +zt_pipe_send_conn_ack(zt_pipe *p) +{ + uint8_t data[zt_size_conn_ack]; + + NNI_PUT16(data + zt_offset_cack_proto, p->zp_proto); + zt_send(p->zp_ztn, p->zp_nwid, zt_op_conn_ack, p->zp_raddr, + p->zp_laddr, data, sizeof(data)); +} + +static void +zt_ep_send_conn_req(zt_ep *ep) +{ + uint8_t data[zt_size_conn_req]; + + NNI_PUT16(data + zt_offset_creq_proto, ep->ze_proto); + zt_send(ep->ze_ztn, ep->ze_nwid, zt_op_conn_req, ep->ze_raddr, + ep->ze_laddr, data, sizeof(data)); +} + +static void +zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) +{ + zt_node *ztn = ep->ze_ztn; + nni_aio *aio = ep->ze_creq_aio; + zt_pipe *p; + int rv; + + if (ep->ze_mode != NNI_EP_MODE_DIAL) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Inappropriate operation"); + return; + } + + if (len != zt_size_conn_ack) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad message length"); + return; + } + + if (ep->ze_creq_try == 0) { + return; + } + + // Do we already have a matching pipe? If so, we can discard + // the operation. This should not happen, since we normally + // deregister the endpoint when we create the pipe. + if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) { + return; + } + + if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) { + // We couldn't create the pipe, just drop it. + nni_aio_finish_error(aio, rv); + return; + } + NNI_GET16(data + zt_offset_cack_proto, p->zp_peer); + + // Reset the address of the endpoint, so that the next call to + // ep_connect will bind a new one -- we are using this one for the + // pipe.
+ nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + ep->ze_laddr = 0; + + nni_aio_finish_pipe(aio, p); +} + +static void +zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) +{ + zt_node *ztn = ep->ze_ztn; + zt_pipe *p; + int i; + + if (ep->ze_mode != NNI_EP_MODE_LISTEN) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Inappropriate operation"); + return; + } + if (len != zt_size_conn_req) { + zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad message length"); + return; + } + + // If we already have created a pipe for this connection + // then just reply with the conn ack. + if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) { + zt_pipe_send_conn_ack(p); + return; + } + + // We may already have a connection request queued (if this was + // a resend for example); if that's the case we just ignore + // this one. + for (i = ep->ze_creq_tail; i != ep->ze_creq_head; i++) { + if (ep->ze_creqs[i % zt_listenq].cr_raddr == raddr) { + return; + } + } + // We may already have filled our listenq, in which case we just drop. + if ((ep->ze_creq_tail + zt_listenq) == ep->ze_creq_head) { + // We have taken as many as we can, so just drop it. + return; + } + + // Record the connection request, and then process any + // pending acceptors. + i = ep->ze_creq_head % zt_listenq; + + NNI_GET16(data + zt_offset_creq_proto, ep->ze_creqs[i].cr_proto); + ep->ze_creqs[i].cr_raddr = raddr; + ep->ze_creqs[i].cr_expire = nni_clock() + zt_listen_expire; + ep->ze_creq_head++; + + zt_ep_doaccept(ep); +} + +static void +zt_ep_recv_error(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) +{ + nni_aio *aio; + int code; + + // Most of the time we don't care about errors. The exception here + // is that when we have an outstanding CONN-REQ, we would like to + // process that appropriately. + + if (ep->ze_mode != NNI_EP_MODE_DIAL) { + // Drop it. + return; + } + + if (len < zt_offset_err_msg) { + // Malformed error frame. + return; + } + + code = data[zt_offset_err_code]; + switch (code) { + case zt_err_refused: + code = NNG_ECONNREFUSED; + break; + case zt_err_notconn: + code = NNG_ECLOSED; + break; + case zt_err_wrongsp: + code = NNG_EPROTO; + break; + default: + code = NNG_ETRANERR; + break; + } + + if (ep->ze_creq_try > 0) { + ep->ze_creq_try = 0; + nni_aio_finish_error(ep->ze_creq_aio, code); + } +} + +static void +zt_ep_virtual_recv( + zt_ep *ep, uint8_t op, uint64_t raddr, const uint8_t *data, size_t len) +{ + // Only listeners should be receiving. Dialers receive on the pipe, + // rather than the endpoint. The only messages that endpoints can + // receive are connection requests.
+ switch (op) { + case zt_op_conn_req: + zt_ep_recv_conn_req(ep, raddr, data, len); + return; + case zt_op_conn_ack: + zt_ep_recv_conn_ack(ep, raddr, data, len); + return; + case zt_op_error: + zt_ep_recv_error(ep, raddr, data, len); + return; + default: + zt_send_err(ep->ze_ztn, ep->ze_nwid, raddr, ep->ze_laddr, + zt_err_proto, "Bad operation"); + return; + } +} + +static void +zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg) +{ + nni_aio *aio; + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + nni_aio_finish_error(aio, err); + } + if ((aio = p->zp_ping_aio) != NULL) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } + p->zp_closed = 1; + if (msg != NULL) { + zt_pipe_send_err(p, code, msg); + } +} + +static void +zt_pipe_recv_data(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio * aio; + uint16_t msgid; + uint16_t fragno; + uint16_t nfrags; + uint16_t fragsz; + zt_fraglist *fl; + int i; + int slot; + uint8_t bit; + uint8_t * body; + + if (len < zt_size_data) { + // Runt frame. Drop it and close pipe with a protocol error. + zt_pipe_close_err(p, NNG_EPROTO, zt_err_proto, "Runt frame"); + return; + } + + NNI_GET16(data + zt_offset_data_id, msgid); + NNI_GET16(data + zt_offset_data_fragsz, fragsz); + NNI_GET16(data + zt_offset_data_frag, fragno); + NNI_GET16(data + zt_offset_data_nfrag, nfrags); + len -= zt_offset_data_data; + data += zt_offset_data_data; + + // Check for cases where message size is clearly too large. Note + // that we can only catch the case where a message is larger by + // more than a fragment, since the final fragment may be shorter, + // and we won't know that until we receive it. + if ((nfrags * fragsz) >= (p->zp_rcvmax + fragsz)) { + // Discard, as the forwarder might be on the other side + // of a device. This is gentler than just shutting the pipe + // down. Sending a remote error might be polite, but since + // most peers will close the pipe on such an error, we + // simply silently discard it. + return; + } + + // We run the recv logic once, to clear stale fragment entries. + zt_pipe_dorecv(p); + + // Find a suitable fragment slot. + slot = -1; + for (i = 0; i < zt_recvq; i++) { + fl = &p->zp_recvq[i]; + // This is our message ID; we always use it. + if (msgid == fl->fl_msgid) { + slot = i; + break; + } + + if (slot < 0) { + slot = i; + } else if (fl->fl_time < p->zp_recvq[slot].fl_time) { + // This has an earlier expiration, so let's choose it. + slot = i; + } + } + + NNI_ASSERT(slot >= 0); + + fl = &p->zp_recvq[slot]; + if (fl->fl_msgid != msgid) { + // First fragment we've received for this message (but might + // not be first fragment for message!) + zt_fraglist_clear(fl); + + if (nni_msg_alloc(&fl->fl_msg, nfrags * fragsz) != 0) { + // Out of memory. We don't close the pipe, but + // just fail to receive the message. Bump a stat? + return; + } + + fl->fl_nfrags = nfrags; + fl->fl_fragsz = fragsz; + fl->fl_msgid = msgid; + fl->fl_time = nni_clock(); + + // Set the missing mask. + memset(fl->fl_missing, 0xff, nfrags / 8); + fl->fl_missing[nfrags / 8] |= ((1 << (nfrags % 8)) - 1); + } + if ((nfrags != fl->fl_nfrags) || (fragsz != fl->fl_fragsz) || + (fragno >= nfrags) || (fragsz == 0) || (nfrags == 0) || + ((fragno != (nfrags - 1)) && (len != fragsz))) { + // Protocol error, message parameters changed.
+ zt_pipe_close_err( + p, NNG_EPROTO, zt_err_proto, "Invalid message parameters"); + zt_fraglist_clear(fl); + return; + } + + bit = (uint8_t)(1 << (fragno % 8)); + if ((fl->fl_missing[fragno / 8] & bit) == 0) { + // We've already got this fragment, ignore it. We don't + // bother to check for changed data. + return; + } + + fl->fl_missing[fragno / 8] &= ~(bit); + body = nni_msg_body(fl->fl_msg); + body += fragno * fragsz; + memcpy(body, data, len); + if (fragno == (nfrags - 1)) { + // Last frag, maybe shorten the message. + nni_msg_chop(fl->fl_msg, (fragsz - len)); + if (nni_msg_len(fl->fl_msg) > p->zp_rcvmax) { + // Strict enforcement of max recv. + zt_fraglist_clear(fl); + // Just discard the message. + return; + } + } + + for (i = 0; i < ((nfrags + 7) / 8); i++) { + if (fl->fl_missing[i]) { + return; + } + } + + // We got all fragments... try to send it up. + fl->fl_ready = 1; + zt_pipe_dorecv(p); +} + +static void +zt_pipe_recv_ping(zt_pipe *p, const uint8_t *data, size_t len) +{ + NNI_ARG_UNUSED(data); + + if (len != zt_size_ping) { + zt_pipe_send_err(p, zt_err_proto, "Incorrect ping size"); + return; + } + zt_pipe_send_pong(p); +} + +static void +zt_pipe_recv_pong(zt_pipe *p, const uint8_t *data, size_t len) +{ + NNI_ARG_UNUSED(data); + + if (len != zt_size_pong) { + zt_pipe_send_err(p, zt_err_proto, "Incorrect pong size"); + } +} + +static void +zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio *aio; + // NB: lock held already. + // Don't bother to check the length, going to disconnect anyway. + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + p->zp_closed = 1; + nni_aio_finish_error(aio, NNG_ECLOSED); + } +} + +static void +zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len) +{ + nni_aio *aio; + + // Perhaps we should log an error message, but at the end of + // the day, the details are just not that interesting. + if ((aio = p->zp_user_rxaio) != NULL) { + p->zp_user_rxaio = NULL; + p->zp_closed = 1; + nni_aio_finish_error(aio, NNG_ETRANERR); + } +} + +// This function is called when we have determined that a frame has +// arrived for a pipe. The remote and local addresses were both +// matched by the caller. +static void +zt_pipe_virtual_recv(zt_pipe *p, uint8_t op, const uint8_t *data, size_t len) +{ + // We got data, so update our recv time. + p->zp_last_recv = nni_clock(); + p->zp_ping_try = 0; + + switch (op) { + case zt_op_data: + zt_pipe_recv_data(p, data, len); + return; + case zt_op_disc_req: + zt_pipe_recv_disc_req(p, data, len); + return; + case zt_op_ping: + zt_pipe_recv_ping(p, data, len); + return; + case zt_op_pong: + zt_pipe_recv_pong(p, data, len); + return; + case zt_op_error: + zt_pipe_recv_error(p, data, len); + return; + } +} + +// This function is called when a frame arrives on the +// *virtual* network. 
+static void +zt_virtual_recv(ZT_Node *node, void *userptr, void *thr, uint64_t nwid, + void **netptr, uint64_t srcmac, uint64_t dstmac, unsigned int ethertype, + unsigned int vlanid, const void *payload, unsigned int len) +{ + zt_node * ztn = userptr; + uint8_t op; + const uint8_t *data = payload; + uint16_t proto; + uint16_t version; + uint32_t rport; + uint32_t lport; + zt_ep * ep; + zt_pipe * p; + uint64_t raddr; + uint64_t laddr; + + if ((ethertype != zt_ethertype) || (len < zt_size_headers) || + (data[zt_offset_flags] != 0) || (data[zt_offset_zero1] != 0) || + (data[zt_offset_zero2] != 0)) { + return; + } + NNI_GET16(data + zt_offset_version, version); + if (version != zt_version) { + return; + } + + op = data[zt_offset_op]; + + ZT_GET24(data + zt_offset_dst_port, lport); + ZT_GET24(data + zt_offset_src_port, rport); + + raddr = zt_mac_to_node(srcmac, nwid); + raddr <<= 24; + raddr |= rport; + + laddr = zt_mac_to_node(dstmac, nwid); + laddr <<= 24; + laddr |= lport; + + // NB: We are holding the zt_lock. + + // Look up a pipe, but we also use this chance to check that + // the source address matches what the pipe was established with. + // If the pipe does not match then we nak it. Note that pipes can + // appear on the znode twice (loopback), so we have to be careful + // to check the entire set of parameters, and to check for server + // vs. client pipes separately. + + // If it's a local address match on a client pipe, process it. + if ((nni_idhash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) && + (p->zp_nwid == nwid) && (p->zp_raddr == raddr)) { + zt_pipe_virtual_recv(p, op, data, len); + return; + } + + // If it's a remote address match on a server pipe, process it. + if ((nni_idhash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) && + (p->zp_nwid == nwid) && (p->zp_laddr == laddr)) { + zt_pipe_virtual_recv(p, op, data, len); + return; + } + + // No pipe, so look for an endpoint. + if ((nni_idhash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) && + (ep->ze_nwid == nwid)) { + // direct this to an endpoint. + zt_ep_virtual_recv(ep, op, raddr, data, len); + return; + } + + // We have a request for which we have no listener, and no + // pipe. For some of these we send back a NAK, but for others + // we just drop the frame. + switch (op) { + case zt_op_conn_req: + // No listener. Connection refused. + zt_send_err(ztn, nwid, raddr, laddr, zt_err_refused, + "Connection refused"); + return; + case zt_op_data: + case zt_op_ping: + case zt_op_conn_ack: + zt_send_err(ztn, nwid, raddr, laddr, zt_err_notconn, + "Connection not found"); + break; + case zt_op_error: + case zt_op_pong: + case zt_op_disc_req: + default: + // Just drop these. + break; + } +} + +static void +zt_event_cb(ZT_Node *node, void *userptr, void *thr, enum ZT_Event event, + const void *payload) +{ + NNI_ARG_UNUSED(node); + NNI_ARG_UNUSED(userptr); + NNI_ARG_UNUSED(thr); + + switch (event) { + case ZT_EVENT_ONLINE: // Connected to the virtual net. + case ZT_EVENT_UP: // Node initialized (may not be connected). + case ZT_EVENT_DOWN: // Teardown of the node. + case ZT_EVENT_OFFLINE: // Removal of the node from the net. + case ZT_EVENT_TRACE: // Local trace events. + // printf("TRACE: %s\n", (const char *) payload); + break; + case ZT_EVENT_REMOTE_TRACE: // Remote trace, not supported. + default: + break; + } +} + +static const char *zt_files[] = { + // clang-format off + NULL, // none, i.e. not used at all + "identity.public", + "identity.secret", + "planet", + NULL, // moon, e.g.
+          //  moons.d/<ID>.moon -- we don't persist it
+    NULL, // peer, e.g. peers.d/<ID> -- we don't persist this
+    NULL, // network, e.g. networks.d/<ID>.conf -- we don't persist
+    // clang-format on
+};
+
+#ifdef _WIN32
+#define unlink DeleteFile
+#define pathsep "\\"
+#else
+#define pathsep "/"
+#endif
+
+static struct {
+    size_t len;
+    void * data;
+} zt_ephemeral_state[ZT_STATE_OBJECT_NETWORK_CONFIG];
+
+static void
+zt_state_put(ZT_Node *node, void *userptr, void *thr,
+    enum ZT_StateObjectType objtype, const uint64_t objid[2], const void *data,
+    int len)
+{
+    FILE *      file;
+    zt_node *   ztn = userptr;
+    char        path[NNG_MAXADDRLEN + 1];
+    const char *fname;
+    size_t      sz;
+
+    NNI_ARG_UNUSED(objid); // only use global files
+
+    if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
+        ((fname = zt_files[(int) objtype]) == NULL)) {
+        return;
+    }
+
+    // If we have no valid path, then we just use ephemeral data.
+    if (strlen(ztn->zn_path) == 0) {
+        void * ndata = NULL;
+        void * odata = zt_ephemeral_state[objtype].data;
+        size_t olen  = zt_ephemeral_state[objtype].len;
+        if ((len >= 0) && ((ndata = nni_alloc(len)) != NULL)) {
+            memcpy(ndata, data, len);
+        }
+        zt_ephemeral_state[objtype].data = ndata;
+        // Record a zero length on delete (or allocation failure), so
+        // that a stale size is never used for later reads or frees.
+        zt_ephemeral_state[objtype].len =
+            (ndata == NULL) ? 0 : (size_t) len;
+
+        if (olen > 0) {
+            nni_free(odata, olen);
+        }
+        return;
+    }
+
+    sz = sizeof(path);
+    if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) {
+        // If the path is too long, we can't cope.  We
+        // just decline to store anything.
+        return;
+    }
+
+    // We assume that everyone can do standard C I/O.
+    // This may be a bad assumption.  If that's the case,
+    // the platform should supply an alternative
+    // implementation.  We are also assuming that we don't
+    // need to worry about atomic updates.  As these items
+    // (keys, etc.) pretty much don't change, this should
+    // be fine.
+
+    if (len < 0) {
+        (void) unlink(path);
+        return;
+    }
+
+    if ((file = fopen(path, "wb")) == NULL) {
+        return;
+    }
+
+    if (fwrite(data, 1, len, file) != (size_t) len) {
+        // Close before unlinking (deleting an open file can fail on
+        // some platforms), and don't leave a truncated file behind.
+        (void) fclose(file);
+        (void) unlink(path);
+        return;
+    }
+    (void) fclose(file);
+}
+
+static int
+zt_state_get(ZT_Node *node, void *userptr, void *thr,
+    enum ZT_StateObjectType objtype, const uint64_t objid[2], void *data,
+    unsigned int len)
+{
+    FILE *      file;
+    zt_node *   ztn = userptr;
+    char        path[NNG_MAXADDRLEN + 1];
+    const char *fname;
+    int         nread;
+    size_t      sz;
+
+    NNI_ARG_UNUSED(objid); // we only use global files
+
+    if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
+        ((fname = zt_files[(int) objtype]) == NULL)) {
+        return (-1);
+    }
+
+    // If no base directory, we are using ephemeral data.
+    if (strlen(ztn->zn_path) == 0) {
+        if (zt_ephemeral_state[objtype].data == NULL) {
+            return (-1);
+        }
+        if (zt_ephemeral_state[objtype].len > len) {
+            return (-1);
+        }
+        len = zt_ephemeral_state[objtype].len;
+        memcpy(data, zt_ephemeral_state[objtype].data, len);
+        return (len);
+    }
+
+    sz = sizeof(path);
+    if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) {
+        // If the path is too long, we can't cope.
+        return (-1);
+    }
+
+    // We assume that everyone can do standard C I/O.
+    // This may be a bad assumption.  If that's the case,
+    // the platform should supply an alternative
+    // implementation.  We are also assuming that we don't
+    // need to worry about atomic updates.  As these items
+    // (keys, etc.) pretty much don't change, this should
+    // be fine.
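+    // [Editor's note -- sketch only, not part of the original patch: if
+    // atomic updates ever became necessary, zt_state_put could use the
+    // usual write-then-rename idiom, since rename(2) replaces the target
+    // atomically on POSIX:
+    //
+    //     snprintf(tmp, sizeof(tmp), "%s.tmp", path);
+    //     /* fwrite() the data to tmp, then fclose() */
+    //     rename(tmp, path);
+    // ]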
+ + if ((file = fopen(path, "rb")) == NULL) { + return (-1); + } + + // seek to end of file + (void) fseek(file, 0, SEEK_END); + if (ftell(file) > len) { + fclose(file); + return (-1); + } + (void) fseek(file, 0, SEEK_SET); + + nread = (int) fread(data, 1, len, file); + (void) fclose(file); + + return (nread); +} + +typedef struct zt_send_hdr { + nni_sockaddr sa; + size_t len; +} zt_send_hdr; + +static void +zt_wire_packet_send_cb(void *arg) +{ + // We don't actually care much about the results, we + // just need to release the resources. + nni_aio * aio = arg; + zt_send_hdr *hdr; + + hdr = nni_aio_get_data(aio); + nni_free(hdr, hdr->len + sizeof(*hdr)); + nni_aio_fini_cb(aio); +} + +// This function is called when ZeroTier desires to send a +// physical frame. The data is a UDP payload, the rest of the +// payload should be set over vanilla UDP. +static int +zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, + const struct sockaddr_storage *remaddr, const void *data, unsigned int len, + unsigned int ttl) +{ + nni_aio * aio; + nni_sockaddr addr; + struct sockaddr_in * sin = (void *) remaddr; + struct sockaddr_in6 *sin6 = (void *) remaddr; + zt_node * ztn = userptr; + nni_plat_udp * udp; + uint16_t port; + char * buf; + zt_send_hdr * hdr; + + NNI_ARG_UNUSED(thr); + NNI_ARG_UNUSED(socket); + NNI_ARG_UNUSED(ttl); + + // Kind of unfortunate, but we have to convert the + // sockaddr to a neutral form, and then back again in + // the platform layer. + switch (sin->sin_family) { + case AF_INET: + addr.s_un.s_in.sa_family = NNG_AF_INET; + addr.s_un.s_in.sa_port = sin->sin_port; + addr.s_un.s_in.sa_addr = sin->sin_addr.s_addr; + udp = ztn->zn_udp4; + port = htons(sin->sin_port); + break; + case AF_INET6: + addr.s_un.s_in6.sa_family = NNG_AF_INET6; + addr.s_un.s_in6.sa_port = sin6->sin6_port; + udp = ztn->zn_udp6; + port = htons(sin6->sin6_port); + memcpy(addr.s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); + break; + default: + // No way to understand the address. + return (-1); + } + + if (nni_aio_init(&aio, zt_wire_packet_send_cb, NULL) != 0) { + // Out of memory + return (-1); + } + if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) { + nni_aio_fini(aio); + return (-1); + } + + hdr = (void *) buf; + buf += sizeof(*hdr); + + memcpy(buf, data, len); + nni_aio_set_data(aio, hdr); + hdr->sa = addr; + hdr->len = len; + + aio->a_addr = &hdr->sa; + aio->a_niov = 1; + aio->a_iov[0].iov_buf = buf; + aio->a_iov[0].iov_len = len; + + // This should be non-blocking/best-effort, so while + // not great that we're holding the lock, also not tragic. + nni_aio_set_synch(aio); + nni_plat_udp_send(udp, aio); + + return (0); +} + +static struct ZT_Node_Callbacks zt_callbacks = { + .version = 0, + .statePutFunction = zt_state_put, + .stateGetFunction = zt_state_get, + .wirePacketSendFunction = zt_wire_packet_send, + .virtualNetworkFrameFunction = zt_virtual_recv, + .virtualNetworkConfigFunction = zt_virtual_config, + .eventCallback = zt_event_cb, + .pathCheckFunction = NULL, + .pathLookupFunction = NULL, +}; + +static void +zt_node_destroy(zt_node *ztn) +{ + nni_aio_stop(ztn->zn_rcv4_aio); + nni_aio_stop(ztn->zn_rcv6_aio); + + // Wait for background thread to exit! 
+    nni_thr_fini(&ztn->zn_bgthr);
+
+    if (ztn->zn_znode != NULL) {
+        ZT_Node_delete(ztn->zn_znode);
+    }
+
+    if (ztn->zn_udp4 != NULL) {
+        nni_plat_udp_close(ztn->zn_udp4);
+    }
+    if (ztn->zn_udp6 != NULL) {
+        nni_plat_udp_close(ztn->zn_udp6);
+    }
+
+    if (ztn->zn_rcv4_buf != NULL) {
+        nni_free(ztn->zn_rcv4_buf, zt_rcv_bufsize);
+    }
+    if (ztn->zn_rcv6_buf != NULL) {
+        nni_free(ztn->zn_rcv6_buf, zt_rcv_bufsize);
+    }
+    nni_aio_fini(ztn->zn_rcv4_aio);
+    nni_aio_fini(ztn->zn_rcv6_aio);
+    nni_idhash_fini(ztn->zn_ports); // also tear down the port hash
+    nni_idhash_fini(ztn->zn_eps);
+    nni_idhash_fini(ztn->zn_lpipes);
+    nni_idhash_fini(ztn->zn_rpipes);
+    nni_idhash_fini(ztn->zn_peers);
+    nni_cv_fini(&ztn->zn_bgcv);
+    NNI_FREE_STRUCT(ztn);
+}
+
+static int
+zt_node_create(zt_node **ztnp, const char *path)
+{
+    zt_node *          ztn;
+    nng_sockaddr       sa4;
+    nng_sockaddr       sa6;
+    int                rv;
+    enum ZT_ResultCode zrv;
+
+    // We want to bind to any address we can (for now), so we bind
+    // the IPv4 and IPv6 wildcard addresses.  It's unclear how we
+    // are meant to handle underlying IPv6 in ZeroTier.  Probably
+    // we can use IPv6 dual-stack sockets if they exist, but not
+    // all platforms support dual-stack.  Furthermore, IPv6 is not
+    // available everywhere, and the root servers may be IPv4 only.
+    memset(&sa4, 0, sizeof(sa4));
+    sa4.s_un.s_in.sa_family = NNG_AF_INET;
+    memset(&sa6, 0, sizeof(sa6));
+    sa6.s_un.s_in6.sa_family = NNG_AF_INET6;
+
+    if ((ztn = NNI_ALLOC_STRUCT(ztn)) == NULL) {
+        return (NNG_ENOMEM);
+    }
+    NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link);
+    NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link);
+    nni_cv_init(&ztn->zn_bgcv, &zt_lk);
+    nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn);
+    nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn);
+
+    if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) ||
+        ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) {
+        zt_node_destroy(ztn);
+        return (NNG_ENOMEM);
+    }
+    if (((rv = nni_idhash_init(&ztn->zn_ports)) != 0) ||
+        ((rv = nni_idhash_init(&ztn->zn_eps)) != 0) ||
+        ((rv = nni_idhash_init(&ztn->zn_lpipes)) != 0) ||
+        ((rv = nni_idhash_init(&ztn->zn_rpipes)) != 0) ||
+        ((rv = nni_idhash_init(&ztn->zn_peers)) != 0) ||
+        ((rv = nni_thr_init(&ztn->zn_bgthr, zt_bgthr, ztn)) != 0) ||
+        ((rv = nni_plat_udp_open(&ztn->zn_udp4, &sa4)) != 0) ||
+        ((rv = nni_plat_udp_open(&ztn->zn_udp6, &sa6)) != 0)) {
+        zt_node_destroy(ztn);
+        return (rv);
+    }
+
+    // Setup for dynamic ephemeral port allocations.  We
+    // set the range to allow for ephemeral ports, but not
+    // higher than the max port, and starting with an
+    // initial random value.  Note that this should give us
+    // about 8 million possible ephemeral ports.
+    nni_idhash_set_limits(ztn->zn_ports, zt_ephemeral, zt_max_port,
+        (nni_random() % (zt_max_port - zt_ephemeral)) + zt_ephemeral);
+
+    nni_strlcpy(ztn->zn_path, path, sizeof(ztn->zn_path));
+    zrv = ZT_Node_new(&ztn->zn_znode, ztn, NULL, &zt_callbacks, zt_now());
+    if (zrv != ZT_RESULT_OK) {
+        zt_node_destroy(ztn);
+        return (zt_result(zrv));
+    }
+
+    nni_list_append(&zt_nodes, ztn);
+
+    ztn->zn_self = ZT_Node_address(ztn->zn_znode);
+
+    nni_thr_run(&ztn->zn_bgthr);
+
+    // Schedule an initial background run.
+ zt_node_resched(ztn, 1); + + // Schedule receive + ztn->zn_rcv4_aio->a_niov = 1; + ztn->zn_rcv4_aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf; + ztn->zn_rcv4_aio->a_iov[0].iov_len = zt_rcv_bufsize; + ztn->zn_rcv4_aio->a_addr = &ztn->zn_rcv4_addr; + ztn->zn_rcv4_aio->a_count = 0; + ztn->zn_rcv6_aio->a_niov = 1; + ztn->zn_rcv6_aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf; + ztn->zn_rcv6_aio->a_iov[0].iov_len = zt_rcv_bufsize; + ztn->zn_rcv6_aio->a_addr = &ztn->zn_rcv6_addr; + ztn->zn_rcv6_aio->a_count = 0; + + nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio); + nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio); + + *ztnp = ztn; + return (0); +} + +static int +zt_node_find(zt_ep *ep) +{ + zt_node * ztn; + int rv; + nng_sockaddr sa; + ZT_VirtualNetworkConfig *cf; + + NNI_LIST_FOREACH (&zt_nodes, ztn) { + if (strcmp(ep->ze_home, ztn->zn_path) == 0) { + goto done; + } + } + + // We didn't find a node, so make one. And try to + // initialize it. + if ((rv = zt_node_create(&ztn, ep->ze_home)) != 0) { + return (rv); + } + +done: + + ep->ze_ztn = ztn; + if (nni_list_node_active(&ep->ze_link)) { + nni_list_node_remove(&ep->ze_link); + } + nni_list_append(&ztn->zn_eplist, ep); + + (void) ZT_Node_join(ztn->zn_znode, ep->ze_nwid, ztn, NULL); + + if ((cf = ZT_Node_networkConfig(ztn->zn_znode, ep->ze_nwid)) != NULL) { + NNI_ASSERT(cf->nwid == ep->ze_nwid); + ep->ze_maxmtu = cf->mtu; + ep->ze_phymtu = cf->physicalMtu; + ZT_Node_freeQueryResult(ztn->zn_znode, cf); + } + + return (0); +} + +static int +zt_chkopt(int opt, const void *dat, size_t sz) +{ + if (opt == nng_optid_recvmaxsz) { + // We cannot deal with message sizes larger + // than 64k. + return (nni_chkopt_size(dat, sz, 0, 0xffffffffU)); + } + if (opt == nng_optid_zt_home) { + size_t l = nni_strnlen(dat, sz); + if ((l >= sz) || (l >= NNG_MAXADDRLEN)) { + return (NNG_EINVAL); + } + // XXX: should we apply additional security + // checks? 
+        // (A home path that is not null terminated within the buffer
+        // was already rejected by the length check above.)
+        return (0);
+    }
+    if (opt == nng_optid_zt_ping_count) {
+        return (nni_chkopt_int(dat, sz, 0, 1000000));
+    }
+    if (opt == nng_optid_zt_ping_time) {
+        return (nni_chkopt_usec(dat, sz));
+    }
+    return (NNG_ENOTSUP);
+}
+
+static int
+zt_tran_init(void)
+{
+    int rv;
+    if (((rv = nni_option_register(nng_opt_zt_home, &nng_optid_zt_home)) !=
+            0) ||
+        ((rv = nni_option_register(nng_opt_zt_node, &nng_optid_zt_node)) !=
+            0) ||
+        ((rv = nni_option_register(nng_opt_zt_nwid, &nng_optid_zt_nwid)) !=
+            0) ||
+        ((rv = nni_option_register(
+              nng_opt_zt_status, &nng_optid_zt_status)) != 0) ||
+        ((rv = nni_option_register(nng_opt_zt_network_name,
+              &nng_optid_zt_network_name)) != 0) ||
+        ((rv = nni_option_register(
+              nng_opt_zt_local_port, &nng_optid_zt_local_port)) != 0) ||
+        ((rv = nni_option_register(
+              nng_opt_zt_ping_count, &nng_optid_zt_ping_count)) != 0) ||
+        ((rv = nni_option_register(
+              nng_opt_zt_ping_time, &nng_optid_zt_ping_time)) != 0)) {
+        return (rv);
+    }
+    nni_mtx_init(&zt_lk);
+    NNI_LIST_INIT(&zt_nodes, zt_node, zn_link);
+    return (0);
+}
+
+static void
+zt_tran_fini(void)
+{
+    zt_node *ztn;
+
+    // Invalidate every option id that zt_tran_init registered.
+    nng_optid_zt_home         = -1;
+    nng_optid_zt_nwid         = -1;
+    nng_optid_zt_node         = -1;
+    nng_optid_zt_status       = -1;
+    nng_optid_zt_network_name = -1;
+    nng_optid_zt_local_port   = -1;
+    nng_optid_zt_ping_count   = -1;
+    nng_optid_zt_ping_time    = -1;
+
+    nni_mtx_lock(&zt_lk);
+    while ((ztn = nni_list_first(&zt_nodes)) != NULL) {
+        nni_list_remove(&zt_nodes, ztn);
+        ztn->zn_closed = 1;
+        nni_cv_wake(&ztn->zn_bgcv);
+        nni_mtx_unlock(&zt_lk);
+
+        zt_node_destroy(ztn);
+
+        nni_mtx_lock(&zt_lk);
+    }
+    nni_mtx_unlock(&zt_lk);
+
+    for (int i = 0; i < ZT_STATE_OBJECT_NETWORK_CONFIG; i++) {
+        if (zt_ephemeral_state[i].len > 0) {
+            nni_free(zt_ephemeral_state[i].data,
+                zt_ephemeral_state[i].len);
+        }
+    }
+    NNI_ASSERT(nni_list_empty(&zt_nodes));
+    nni_mtx_fini(&zt_lk);
+}
+
+static void
+zt_pipe_close(void *arg)
+{
+    zt_pipe *p = arg;
+    nni_aio *aio;
+
+    nni_mtx_lock(&zt_lk);
+    p->zp_closed = 1;
+    if ((aio = p->zp_user_rxaio) != NULL) {
+        p->zp_user_rxaio = NULL;
+        nni_aio_finish_error(aio, NNG_ECLOSED);
+    }
+    zt_pipe_send_disc_req(p);
+    nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_pipe_fini(void *arg)
+{
+    zt_pipe *p   = arg;
+    zt_node *ztn = p->zp_ztn;
+
+    nni_aio_fini(p->zp_ping_aio);
+
+    // This tosses the connection details and all state.
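+    // [Editor's note] The low 24 bits of zp_laddr (zt_port_mask) are the
+    // local port, which also keys the zn_ports hash, so all of this
+    // pipe's hash entries are removed under the one lock below.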
+    nni_mtx_lock(&zt_lk);
+    nni_idhash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask);
+    nni_idhash_remove(ztn->zn_lpipes, p->zp_laddr);
+    nni_idhash_remove(ztn->zn_rpipes, p->zp_raddr);
+    nni_idhash_remove(ztn->zn_peers, p->zp_raddr);
+    nni_mtx_unlock(&zt_lk);
+
+    for (int i = 0; i < zt_recvq; i++) {
+        zt_fraglist_free(&p->zp_recvq[i]);
+    }
+
+    NNI_FREE_STRUCT(p);
+}
+
+static int
+zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
+{
+    zt_pipe *p;
+    int      rv;
+    zt_node *ztn = ep->ze_ztn;
+    int      i;
+    size_t   maxfrag;
+    size_t   maxfrags;
+
+    if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+        return (NNG_ENOMEM);
+    }
+    p->zp_ztn        = ztn;
+    p->zp_raddr      = raddr;
+    p->zp_laddr      = laddr;
+    p->zp_proto      = ep->ze_proto;
+    p->zp_nwid       = ep->ze_nwid;
+    p->zp_mtu        = ep->ze_phymtu;
+    p->zp_rcvmax     = ep->ze_rcvmax;
+    p->zp_ping_count = ep->ze_ping_count;
+    p->zp_ping_time  = ep->ze_ping_time;
+    p->zp_next_msgid = (uint16_t) nni_random();
+    p->zp_ping_try   = 0;
+
+    if (ep->ze_mode == NNI_EP_MODE_DIAL) {
+        rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
+    } else {
+        rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p);
+    }
+    if ((rv != 0) ||
+        ((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
+        ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
+        zt_pipe_fini(p);
+        // Propagate the failure rather than using the dead pipe.
+        return (rv);
+    }
+
+    // The largest fragment we can accept on this pipe...
+    maxfrag = p->zp_mtu - zt_offset_data_data;
+    // ... and the largest fragment count we can accept on this
+    // pipe (rounded up).
+    maxfrags = (p->zp_rcvmax + (maxfrag - 1)) / maxfrag;
+
+    for (i = 0; i < zt_recvq; i++) {
+        zt_fraglist *fl  = &p->zp_recvq[i];
+        fl->fl_time      = NNI_TIME_ZERO;
+        fl->fl_msgid     = 0;
+        fl->fl_ready     = 0;
+        fl->fl_missingsz = (maxfrags + 7) / 8;
+        fl->fl_missing   = nni_alloc(fl->fl_missingsz);
+        if (fl->fl_missing == NULL) {
+            zt_pipe_fini(p);
+            return (NNG_ENOMEM);
+        }
+    }
+
+    *pipep = p;
+    return (0);
+}
+
+static void
+zt_pipe_send(void *arg, nni_aio *aio)
+{
+    // As we are sending UDP, and there is no callback to worry
+    // about, we just go ahead and send out a stream of messages
+    // synchronously.
+    zt_pipe *p = arg;
+    size_t   offset;
+    uint8_t  data[ZT_MAX_MTU];
+    uint16_t id;
+    uint16_t nfrags;
+    uint16_t fragno;
+    uint16_t fragsz;
+    size_t   bytes;
+    nni_msg *m;
+
+    nni_mtx_lock(&zt_lk);
+    if (nni_aio_start(aio, NULL, p) != 0) {
+        nni_mtx_unlock(&zt_lk);
+        return;
+    }
+
+    if (p->zp_closed) {
+        nni_aio_finish_error(aio, NNG_ECLOSED);
+        nni_mtx_unlock(&zt_lk);
+        return;
+    }
+
+    fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
+
+    if ((m = nni_aio_get_msg(aio)) == NULL) {
+        nni_aio_finish_error(aio, NNG_EINVAL);
+        nni_mtx_unlock(&zt_lk);
+        return;
+    }
+
+    bytes = nni_msg_header_len(m) + nni_msg_len(m);
+    if (bytes >= (0xfffe * fragsz)) {
+        nni_aio_finish_error(aio, NNG_EMSGSIZE);
+        nni_mtx_unlock(&zt_lk);
+        return;
+    }
+    // The above check means nfrags will fit in 16 bits.
+    nfrags = (uint16_t)((bytes + (fragsz - 1)) / fragsz);
+
+    // Get the next message ID, but skip 0.
+    if ((id = p->zp_next_msgid++) == 0) {
+        id = p->zp_next_msgid++;
+    }
+
+    offset = 0;
+    fragno = 0;
+    do {
+        uint8_t *dest    = data + zt_offset_data_data;
+        size_t   room    = fragsz;
+        size_t   fraglen = 0;
+        size_t   len;
+
+        // Prepend the header first.
+        if ((len = nni_msg_header_len(m)) > 0) {
+            if (len > fragsz) {
+                // This shouldn't happen!  SP headers are
+                // supposed to be quite small.
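+                // [Editor's note] The header is copied whole into the
+                // current fragment and never split, so a header larger
+                // than fragsz cannot be represented by this framing;
+                // hence the EMSGSIZE below.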
+                nni_aio_finish_error(aio, NNG_EMSGSIZE);
+                nni_mtx_unlock(&zt_lk);
+                return;
+            }
+            memcpy(dest, nni_msg_header(m), len);
+            dest += len;
+            room -= len;
+            offset += len;
+            fraglen += len;
+            nni_msg_header_clear(m);
+        }
+
+        len = nni_msg_len(m);
+        if (len > room) {
+            len = room;
+        }
+        memcpy(dest, nni_msg_body(m), len);
+
+        nni_msg_trim(m, len);
+        NNI_PUT16(data + zt_offset_data_id, id);
+        NNI_PUT16(data + zt_offset_data_fragsz, fragsz);
+        NNI_PUT16(data + zt_offset_data_frag, fragno);
+        NNI_PUT16(data + zt_offset_data_nfrag, nfrags);
+        offset += len;
+        fraglen += len;
+        fragno++;
+        zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
+            p->zp_laddr, data, fraglen + zt_offset_data_data);
+    } while (nni_msg_len(m) != 0);
+
+    nni_aio_set_msg(aio, NULL);
+    nni_msg_free(m);
+    nni_aio_finish(aio, 0, offset);
+    nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_pipe_cancel_recv(nni_aio *aio, int rv)
+{
+    zt_pipe *p = aio->a_prov_data;
+    nni_mtx_lock(&zt_lk);
+    if (p->zp_user_rxaio != aio) {
+        // Already completed or replaced; nothing to cancel.
+        nni_mtx_unlock(&zt_lk);
+        return;
+    }
+    p->zp_user_rxaio = NULL;
+    nni_mtx_unlock(&zt_lk);
+    nni_aio_finish_error(aio, rv);
+}
+
+static void
+zt_fraglist_clear(zt_fraglist *fl)
+{
+    nni_msg *msg;
+
+    fl->fl_ready = 0;
+    fl->fl_msgid = 0;
+    fl->fl_time  = NNI_TIME_ZERO;
+    if ((msg = fl->fl_msg) != NULL) {
+        fl->fl_msg = NULL;
+        nni_msg_free(msg);
+    }
+    memset(fl->fl_missing, 0, fl->fl_missingsz);
+}
+
+static void
+zt_fraglist_free(zt_fraglist *fl)
+{
+    zt_fraglist_clear(fl);
+    nni_free(fl->fl_missing, fl->fl_missingsz);
+    fl->fl_missing = NULL;
+}
+
+static void
+zt_pipe_dorecv(zt_pipe *p)
+{
+    nni_aio *aio = p->zp_user_rxaio;
+    nni_time now = nni_clock();
+
+    if (aio == NULL) {
+        return;
+    }
+
+    for (int i = 0; i < zt_recvq; i++) {
+        zt_fraglist *fl = &p->zp_recvq[i];
+        nni_msg *    msg;
+
+        if (now > (fl->fl_time + zt_recv_stale)) {
+            // Fragment list is stale; clean it.
+            zt_fraglist_clear(fl);
+            continue;
+        }
+        if (!fl->fl_ready) {
+            continue;
+        }
+
+        // Got data.  Let's pass it up.
+ msg = fl->fl_msg; + fl->fl_msg = NULL; + NNI_ASSERT(msg != NULL); + nni_aio_finish_msg(aio, msg); + zt_fraglist_clear(fl); + return; + } +} + +static void +zt_pipe_recv(void *arg, nni_aio *aio) +{ + zt_pipe *p = arg; + + nni_mtx_lock(&zt_lk); + if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) { + nni_mtx_unlock(&zt_lk); + return; + } + if (p->zp_closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } else { + p->zp_user_rxaio = aio; + zt_pipe_dorecv(p); + } + nni_mtx_unlock(&zt_lk); +} + +static uint16_t +zt_pipe_peer(void *arg) +{ + zt_pipe *pipe = arg; + + return (pipe->zp_peer); +} + +static int +zt_getopt_status(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp) +{ + ZT_VirtualNetworkConfig *vcfg; + int status; + + nni_mtx_lock(&zt_lk); + vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid); + if (vcfg == NULL) { + nni_mtx_unlock(&zt_lk); + return (NNG_ECLOSED); + } + status = vcfg->status; + ZT_Node_freeQueryResult(ztn->zn_znode, vcfg); + nni_mtx_unlock(&zt_lk); + + return (nni_getopt_int(status, buf, szp)); +} + +static int +zt_getopt_network_name(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp) +{ + ZT_VirtualNetworkConfig *vcfg; + int rv; + + nni_mtx_lock(&zt_lk); + vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid); + if (vcfg == NULL) { + nni_mtx_unlock(&zt_lk); + return (NNG_ECLOSED); + } + rv = nni_getopt_str(vcfg->name, buf, szp); + ZT_Node_freeQueryResult(ztn->zn_znode, vcfg); + nni_mtx_unlock(&zt_lk); + + return (rv); +} + +static int +zt_pipe_getopt(void *arg, int option, void *buf, size_t *szp) +{ + zt_pipe *p = arg; + int rv; + + if (option == nng_optid_recvmaxsz) { + rv = nni_getopt_size(p->zp_rcvmax, buf, szp); + } else if (option == nng_optid_zt_nwid) { + rv = nni_getopt_u64(p->zp_nwid, buf, szp); + } else if (option == nng_optid_zt_node) { + rv = nni_getopt_u64(p->zp_laddr >> 24, buf, szp); + } else if (option == nng_optid_zt_status) { + rv = zt_getopt_status(p->zp_ztn, p->zp_nwid, buf, szp); + } else if (option == nng_optid_zt_network_name) { + rv = zt_getopt_network_name(p->zp_ztn, p->zp_nwid, buf, szp); + } else { + rv = NNG_ENOTSUP; + } + return (rv); +} + +static void +zt_pipe_cancel_ping(nni_aio *aio, int rv) +{ + nni_aio_finish_error(aio, rv); +} + +static void +zt_pipe_ping_cb(void *arg) +{ + zt_pipe *p = arg; + nni_aio *aio = p->zp_ping_aio; + + nni_mtx_lock(&zt_lk); + if (p->zp_closed || aio == NULL || (p->zp_ping_count == 0) || + (p->zp_ping_time == NNI_TIME_NEVER) || + (p->zp_ping_time == NNI_TIME_ZERO)) { + nni_mtx_unlock(&zt_lk); + return; + } + if (nni_aio_result(aio) != NNG_ETIMEDOUT) { + nni_mtx_unlock(&zt_lk); + return; + } + if (p->zp_ping_try < p->zp_ping_count) { + nni_time now = nni_clock(); + nni_aio_set_timeout(aio, now + p->zp_ping_time); + if (now > (p->zp_last_recv + p->zp_ping_time)) { + // We have to send a ping to keep the session up. + if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + p->zp_ping_try++; + zt_pipe_send_ping(p); + } + } else { + // We still need the timer to wake us up in case + // we haven't seen traffic for a while. + nni_aio_start(aio, zt_pipe_cancel_ping, p); + } + } else { + // Close the pipe, but no need to send a reason to the + // peer, it is already AFK. + zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_pipe_start(void *arg, nni_aio *aio) +{ + zt_pipe *p = arg; + + nni_mtx_lock(&zt_lk); + // send a gratuitous ping, and start the ping interval timer. 
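+    // [Editor's note] Keepalive policy, per zt_pipe_ping_cb above: the
+    // timer fires every zp_ping_time; if nothing has been received for
+    // that long, a ping goes out and zp_ping_try is bumped, and after
+    // zp_ping_count unanswered pings the pipe is closed.  E.g., with
+    // hypothetical settings ping_time = 10 s and ping_count = 5, a
+    // silent peer is declared dead after roughly a minute.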
+ if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) && + (p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) { + p->zp_ping_try = 0; + nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time); + nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p); + zt_pipe_send_ping(p); + } + nni_aio_finish(aio, 0, 0); + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_fini(void *arg) +{ + zt_ep *ep = arg; + nni_aio_stop(ep->ze_creq_aio); + nni_aio_fini(ep->ze_creq_aio); + NNI_FREE_STRUCT(ep); +} + +static int +zt_parsehex(const char **sp, uint64_t *valp, int wildok) +{ + int n; + const char *s = *sp; + char c; + uint64_t v; + + if (wildok && *s == '*') { + *valp = 0; + s++; + *sp = s; + return (0); + } + + for (v = 0, n = 0; (n < 16) && isxdigit(c = tolower(*s)); n++, s++) { + v *= 16; + if (isdigit(c)) { + v += (c - '0'); + } else { + v += ((c - 'a') + 10); + } + } + + *sp = s; + *valp = v; + return (n ? 0 : NNG_EINVAL); +} + +static int +zt_parsedec(const char **sp, uint64_t *valp) +{ + int n; + const char *s = *sp; + char c; + uint64_t v; + + for (v = 0, n = 0; (n < 20) && isdigit(c = *s); n++, s++) { + v *= 10; + v += (c - '0'); + } + *sp = s; + *valp = v; + return (n ? 0 : NNG_EINVAL); +} + +static int +zt_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + zt_ep * ep; + size_t sz; + uint64_t nwid; + uint64_t node; + uint64_t port; + int n; + int rv; + char c; + const char *u; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + // URL parsing... + // URL is form zt://<nwid>[/<remoteaddr>]:<port> + // The <remoteaddr> part is required for remote dialers, but is + // not used at all for listeners. (We have no notion of binding + // to different node addresses.) + ep->ze_mode = mode; + ep->ze_maxmtu = ZT_MAX_MTU; + ep->ze_phymtu = ZT_MIN_MTU; + ep->ze_aio = NULL; + ep->ze_ping_count = zt_ping_count; + ep->ze_ping_time = zt_ping_time; + ep->ze_proto = nni_sock_proto(sock); + sz = sizeof(ep->ze_url); + + nni_aio_list_init(&ep->ze_aios); + + if ((strncmp(url, "zt://", strlen("zt://")) != 0) || + (nni_strlcpy(ep->ze_url, url, sz) >= sz)) { + zt_ep_fini(ep); + return (NNG_EADDRINVAL); + } + rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); + if (rv != 0) { + zt_ep_fini(ep); + return (rv); + } + + u = url + strlen("zt://"); + // Parse the URL. + + switch (mode) { + case NNI_EP_MODE_DIAL: + // We require zt://<nwid>/<remotenode>:<port> + // The remote node must be a 40 bit address + // (max), and we require a non-zero port to + // connect to. + if ((zt_parsehex(&u, &nwid, 0) != 0) || (*u++ != '/') || + (zt_parsehex(&u, &node, 1) != 0) || + (node > 0xffffffffffull) || (*u++ != ':') || + (zt_parsedec(&u, &port) != 0) || (*u != '\0') || + (port > zt_max_port) || (port == 0)) { + return (NNG_EADDRINVAL); + } + ep->ze_raddr = node; + ep->ze_raddr <<= 24; + ep->ze_raddr |= port; + ep->ze_laddr = 0; + break; + case NNI_EP_MODE_LISTEN: + // Listen mode is just zt://<nwid>:<port>. The + // port may be zero in this case, to indicate + // that the server should allocate an ephemeral + // port. We do allow the same form of URL including + // the node address, but that must be zero, a wild + // card, + // or our own node address. + if (zt_parsehex(&u, &nwid, 0) != 0) { + return (NNG_EADDRINVAL); + } + node = 0; + // Look for optional node address. 
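+        // [Editor's note] Hypothetical examples of accepted listen
+        // URLs: "zt://<nwid>:0" (ephemeral port), "zt://<nwid>/*:4545"
+        // (wildcard node), or the same with our own 40-bit node
+        // address written out.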
+ if (*u == '/') { + u++; + if (zt_parsehex(&u, &node, 1) != 0) { + return (NNG_EADDRINVAL); + } + } + if ((*u++ != ':') || (zt_parsedec(&u, &port) != 0) || + (*u != '\0') || (port > zt_max_port)) { + return (NNG_EADDRINVAL); + } + ep->ze_laddr = node; + ep->ze_laddr <<= 24; + ep->ze_laddr |= port; + ep->ze_raddr = 0; + break; + default: + NNI_ASSERT(0); + break; + } + + ep->ze_nwid = nwid; + + nni_mtx_lock(&zt_lk); + rv = zt_node_find(ep); + nni_mtx_unlock(&zt_lk); + + if (rv != 0) { + zt_ep_fini(ep); + return (rv); + } + + *epp = ep; + return (0); +} + +static void +zt_ep_close(void *arg) +{ + zt_ep * ep = arg; + zt_node *ztn; + nni_aio *aio; + + nni_aio_cancel(ep->ze_creq_aio, NNG_ECLOSED); + + // Cancel any outstanding user operation(s) - they should have + // been aborted by the above cancellation, but we need to be + // sure, as the cancellation callback may not have run yet. + + nni_mtx_lock(&zt_lk); + while ((aio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + // Endpoint framework guarantees to only call us once, + // and to not call other things while we are closed. + ztn = ep->ze_ztn; + // If we're on the ztn node list, pull us off. + if (ztn != NULL) { + nni_list_node_remove(&ep->ze_link); + nni_idhash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask); + nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + } + + // XXX: clean up the pipe if a dialer + + nni_mtx_unlock(&zt_lk); +} + +static int +zt_ep_bind_locked(zt_ep *ep) +{ + int rv; + uint64_t port; + uint64_t node; + zt_node *ztn; + + // If we haven't already got a ZT node, get one. + if ((ztn = ep->ze_ztn) == NULL) { + if ((rv = zt_node_find(ep)) != 0) { + return (rv); + } + ztn = ep->ze_ztn; + } + + node = ep->ze_laddr >> 24; + if ((node != 0) && (node != ztn->zn_self)) { + // User requested node id, but it doesn't match our + // own. + return (NNG_EADDRINVAL); + } + + if ((ep->ze_laddr & zt_port_mask) == 0) { + // ask for an ephemeral port + if ((rv = nni_idhash_alloc(ztn->zn_ports, &port, ep)) != 0) { + return (rv); + } + NNI_ASSERT(port & zt_ephemeral); + } else { + void *conflict; + // make sure port requested is free. + port = ep->ze_laddr & zt_port_mask; + + if (nni_idhash_find(ztn->zn_ports, port, &conflict) == 0) { + return (NNG_EADDRINUSE); + } + if ((rv = nni_idhash_insert(ztn->zn_ports, port, ep)) != 0) { + return (rv); + } + } + NNI_ASSERT(port <= zt_max_port); + NNI_ASSERT(port > 0); + + ep->ze_laddr = ztn->zn_self; + ep->ze_laddr <<= 24; + ep->ze_laddr |= port; + + if ((rv = nni_idhash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) { + nni_idhash_remove(ztn->zn_ports, port); + return (rv); + } + + return (0); +} + +static int +zt_ep_bind(void *arg) +{ + int rv; + zt_ep *ep = arg; + + nni_mtx_lock(&zt_lk); + rv = zt_ep_bind_locked(ep); + nni_mtx_unlock(&zt_lk); + + return (rv); +} + +static void +zt_ep_cancel(nni_aio *aio, int rv) +{ + zt_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&zt_lk); + if (nni_aio_list_active(aio)) { + if (ep->ze_aio != NULL) { + nni_aio_cancel(ep->ze_aio, rv); + } + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&zt_lk); +} + +static void +zt_ep_doaccept(zt_ep *ep) +{ + // Call with ep lock held. + nni_time now; + zt_pipe *p; + int rv; + + now = nni_clock(); + // Consume any timedout connect requests. + while (ep->ze_creq_tail != ep->ze_creq_head) { + zt_creq creq; + nni_aio *aio; + + creq = ep->ze_creqs[ep->ze_creq_tail % zt_listenq]; + // Discard old connection requests. 
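+        // [Editor's note] ze_creq_head and ze_creq_tail are free-running
+        // counters; a request's slot is its counter value modulo
+        // zt_listenq, so at most zt_listenq connection requests can be
+        // pending at once.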
+        if (creq.cr_expire < now) {
+            ep->ze_creq_tail++;
+            continue;
+        }
+
+        if ((aio = nni_list_first(&ep->ze_aios)) == NULL) {
+            // No outstanding accept.  We're done.
+            break;
+        }
+
+        // We have both a connection request and a place to accept it.
+
+        // Advance the tail.
+        ep->ze_creq_tail++;
+
+        // We remove this AIO.  This keeps it from being canceled.
+        nni_aio_list_remove(aio);
+
+        rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr);
+        if (rv != 0) {
+            zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
+                ep->ze_laddr, zt_err_unknown,
+                "Failed creating pipe");
+            nni_aio_finish_error(aio, rv);
+            continue;
+        }
+        p->zp_peer = creq.cr_proto;
+
+        zt_pipe_send_conn_ack(p);
+        nni_aio_finish_pipe(aio, p);
+    }
+}
+
+static void
+zt_ep_accept(void *arg, nni_aio *aio)
+{
+    zt_ep *ep = arg;
+
+    nni_mtx_lock(&zt_lk);
+    if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
+        nni_aio_list_append(&ep->ze_aios, aio);
+        zt_ep_doaccept(ep);
+    }
+    nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_ep_conn_req_cancel(nni_aio *aio, int rv)
+{
+    // We don't have much to do here.  The AIO will have been
+    // canceled as a result of the "parent" AIO canceling.
+    nni_aio_finish_error(aio, rv);
+}
+
+static void
+zt_ep_conn_req_cb(void *arg)
+{
+    zt_ep *  ep = arg;
+    zt_pipe *p;
+    nni_aio *aio = ep->ze_creq_aio;
+    nni_aio *uaio;
+    int      rv;
+
+    NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL);
+
+    nni_mtx_lock(&zt_lk);
+    rv = nni_aio_result(aio);
+    switch (rv) {
+    case 0:
+        // Already canceled, or already handled?
+        if (((uaio = nni_list_first(&ep->ze_aios)) == NULL) ||
+            ((p = nni_aio_get_pipe(aio)) == NULL)) {
+            nni_mtx_unlock(&zt_lk);
+            return;
+        }
+        ep->ze_creq_try = 0;
+        nni_aio_list_remove(uaio);
+        nni_aio_finish_pipe(uaio, p);
+        nni_mtx_unlock(&zt_lk);
+        return;
+
+    case NNG_ETIMEDOUT:
+        if (ep->ze_creq_try <= zt_conn_attempts) {
+            // Timed out, but we can try again.
+            ep->ze_creq_try++;
+            nni_aio_set_timeout(
+                aio, nni_clock() + zt_conn_interval);
+            nni_aio_start(aio, zt_ep_conn_req_cancel, ep);
+            zt_ep_send_conn_req(ep);
+            nni_mtx_unlock(&zt_lk);
+            return;
+        }
+        break;
+    }
+
+    // These are failure modes.  Either we timed out too many
+    // times, or an error occurred.
+
+    ep->ze_creq_try = 0;
+    while ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
+        nni_aio_list_remove(uaio);
+        nni_aio_finish_error(uaio, rv);
+    }
+    nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_ep_connect(void *arg, nni_aio *aio)
+{
+    zt_ep *ep = arg;
+
+    // We bind locally.  We'll use the address later when we give
+    // it to the pipe, but this allows us to receive the initial
+    // ack back from the server.  (This gives us an ephemeral
+    // address to work with.)
+    nni_mtx_lock(&zt_lk);
+
+    if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
+        nni_time now = nni_clock();
+        int      rv;
+
+        // Clear the port so we get an ephemeral port.
+        ep->ze_laddr &= ~((uint64_t) zt_port_mask);
+
+        if ((rv = zt_ep_bind_locked(ep)) != 0) {
+            nni_aio_finish_error(aio, rv);
+            nni_mtx_unlock(&zt_lk);
+            return;
+        }
+
+        if ((ep->ze_raddr >> 24) == 0) {
+            ep->ze_raddr |= (ep->ze_ztn->zn_self << 24);
+        }
+        nni_aio_list_append(&ep->ze_aios, aio);
+
+        ep->ze_creq_try = 1;
+
+        nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval);
+        // This can't fail -- the only way the ze_creq_aio gets
+        // terminated would have required us to have also
+        // canceled the user AIO and held the lock.
+        (void) nni_aio_start(
+            ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+
+        // We send out the first connect message; if we are not yet
+        // attached to the network the message will be dropped.
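+        // [Editor's note] That is fine: the timeout path in
+        // zt_ep_conn_req_cb above re-sends the request every
+        // zt_conn_interval until zt_conn_attempts is exhausted.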
+        zt_ep_send_conn_req(ep);
+    }
+    nni_mtx_unlock(&zt_lk);
+}
+
+static int
+zt_ep_setopt(void *arg, int opt, const void *data, size_t size)
+{
+    zt_ep *ep = arg;
+    int    i;
+    int    rv = NNG_ENOTSUP;
+
+    if (opt == nng_optid_recvmaxsz) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_setopt_size(
+            &ep->ze_rcvmax, data, size, 0, 0xffffffffu);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_home) {
+        // XXX: check to make sure not started...
+        i = nni_strnlen((const char *) data, size);
+        if ((i >= size) || (i >= NNG_MAXADDRLEN)) {
+            return (NNG_EINVAL);
+        }
+        nni_mtx_lock(&zt_lk);
+        nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home));
+        // Propagate a failure to locate (or create) the node,
+        // rather than silently reporting success.
+        if ((rv = zt_node_find(ep)) != 0) {
+            ep->ze_ztn = NULL;
+        }
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_ping_count) {
+        nni_mtx_lock(&zt_lk);
+        rv =
+            nni_setopt_int(&ep->ze_ping_count, data, size, 0, 1000000);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_ping_time) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_setopt_usec(&ep->ze_ping_time, data, size);
+        nni_mtx_unlock(&zt_lk);
+    }
+    return (rv);
+}
+
+static int
+zt_ep_getopt(void *arg, int opt, void *data, size_t *szp)
+{
+    zt_ep *ep = arg;
+    int    rv = NNG_ENOTSUP;
+
+    if (opt == nng_optid_recvmaxsz) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_size(ep->ze_rcvmax, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_home) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_str(ep->ze_home, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_node) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_u64(ep->ze_ztn->zn_self, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_nwid) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_u64(ep->ze_nwid, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_ping_count) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_int(ep->ze_ping_count, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_ping_time) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_usec(ep->ze_ping_time, data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_local_port) {
+        nni_mtx_lock(&zt_lk);
+        rv = nni_getopt_int(
+            (int) (ep->ze_laddr & zt_port_mask), data, szp);
+        nni_mtx_unlock(&zt_lk);
+    } else if (opt == nng_optid_zt_network_name) {
+        rv =
+            zt_getopt_network_name(ep->ze_ztn, ep->ze_nwid, data, szp);
+    } else if (opt == nng_optid_zt_status) {
+        rv = zt_getopt_status(ep->ze_ztn, ep->ze_nwid, data, szp);
+    }
+    return (rv);
+}
+
+static nni_tran_pipe zt_pipe_ops = {
+    .p_fini   = zt_pipe_fini,
+    .p_start  = zt_pipe_start,
+    .p_send   = zt_pipe_send,
+    .p_recv   = zt_pipe_recv,
+    .p_close  = zt_pipe_close,
+    .p_peer   = zt_pipe_peer,
+    .p_getopt = zt_pipe_getopt,
+};
+
+static nni_tran_ep zt_ep_ops = {
+    .ep_init    = zt_ep_init,
+    .ep_fini    = zt_ep_fini,
+    .ep_connect = zt_ep_connect,
+    .ep_bind    = zt_ep_bind,
+    .ep_accept  = zt_ep_accept,
+    .ep_close   = zt_ep_close,
+    .ep_setopt  = zt_ep_setopt,
+    .ep_getopt  = zt_ep_getopt,
+};
+
+// This is the ZeroTier transport linkage, and should be the
+// only global symbol in this entire file.
+static struct nni_tran zt_tran = {
+    .tran_version = NNI_TRANSPORT_VERSION,
+    .tran_scheme  = "zt",
+    .tran_ep      = &zt_ep_ops,
+    .tran_pipe    = &zt_pipe_ops,
+    .tran_chkopt  = zt_chkopt,
+    .tran_init    = zt_tran_init,
+    .tran_fini    = zt_tran_fini,
+};
+
+int
+nng_zt_register(void)
+{
+    return (nni_tran_register(&zt_tran));
+}
+
+#endif
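+
+// [Editor's note -- illustrative usage sketch, not part of the original
+// patch.  The "zt" scheme, the URL shapes, and nng_zt_register() come
+// from this file; the protocol-open and dial calls are schematic, so
+// consult the nng.h of this revision for the exact signatures.]
+//
+//     nng_socket s;
+//
+//     nng_zt_register();      // make the zt:// scheme available
+//     /* open s with whatever SP protocol is desired, e.g. PAIR */
+//     nng_dial(s, "zt://<nwid>/<node>:<port>", ...);
+//
+// A dialer needs the remote 40-bit node address and a nonzero port in
+// the URL; a listener uses zt://<nwid>:<port>, with port 0 requesting
+// an ephemeral port.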
