aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt40
-rw-r--r--src/CMakeLists.txt8
-rw-r--r--src/core/platform.h27
-rw-r--r--src/nng.c13
-rw-r--r--src/nng.h43
-rw-r--r--src/platform/posix/posix_udp.c2
-rw-r--r--src/transport/zerotier/zerotier.adoc383
-rw-r--r--src/transport/zerotier/zerotier.c2677
-rw-r--r--tests/CMakeLists.txt4
-rw-r--r--tests/sock.c1
-rw-r--r--tests/trantest.h2
-rw-r--r--tests/zt.c195
12 files changed, 3378 insertions, 17 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d95206c5..c2815ca3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,4 +1,6 @@
#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
# Copyright (c) 2012 Martin Sustrik All rights reserved.
# Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
# Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved.
@@ -91,7 +93,8 @@ option (NNG_TESTS "Build and run tests" ON)
option (NNG_TOOLS "Build extra tools" OFF)
option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS})
option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF)
-
+option (NNG_ENABLE_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF)
+set (NNG_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.")
# Enable access to private APIs for our own use.
add_definitions (-DNNG_PRIVATE)
@@ -244,6 +247,39 @@ nng_check_sym (strlcat string.h NNG_HAVE_STRLCAT)
nng_check_sym (strlcpy string.h NNG_HAVE_STRLCPY)
nng_check_sym (strnlen string.h NNG_HAVE_STRNLEN)
+# Search for ZeroTier
+# We use the libzerotiercore.a library, which is unfortunately a C++ object
+# even though it exposes only public C symbols. It would be extremely
+# helpful if libzerotiercore didn't make us carry the whole C++ runtime
+# behind us. The user must specify the location of the ZeroTier source
+# tree (dev branch for now, and already compiled please) by setting the
+# NNG_ZEROTIER_SOURCE macro.
+# NB: This needs to be the zerotierone tree, not the libzt library.
+# This is because we don't access the API, but instead use the low
+# level zerotiercore functionality directly.
+# NB: As we wind up linking libzerotiercore.a into the application,
+# this means that your application will *also* need to either be licensed
+# under the GPLv3, or you will need to have a commercial license from
+# ZeroTier permitting its use elsewhere.
+if (NNG_ENABLE_ZEROTIER)
+ enable_language(CXX)
+ find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_ZEROTIER_SOURCE})
+ if (NNG_LIBZTCORE)
+ set(CMAKE_REQUIRED_INCLUDES ${NNG_ZEROTIER_SOURCE}/include)
+# set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} c++)
+# set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} c++)
+ message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}")
+ set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
+ set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
+ set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_ZEROTIER_SOURCE}/include)
+ nng_check_sym(ZT_Node_join ZeroTierOne.h NNG_HAVE_ZEROTIER)
+ endif()
+ if (NOT NNG_HAVE_ZEROTIER)
+ message (FATAL_ERROR "Cannot find ZeroTier components")
+ endif()
+ message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}")
+endif()
+
add_subdirectory (src)
if (NNG_TESTS)
@@ -253,6 +289,7 @@ if (NNG_TESTS)
add_subdirectory (perf)
endif()
+
# Build the tools
if (NNG_ENABLE_NNGCAT)
@@ -270,6 +307,7 @@ if (NNG_ENABLE_DOC)
endif ()
endif ()
+
# Build the documentation
if (NNG_ENABLE_DOC)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1d616e95..17b14d2e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -99,6 +99,7 @@ set (NNG_SOURCES
transport/ipc/ipc.c
transport/tcp/tcp.c
+
)
if (NNG_PLATFORM_POSIX)
@@ -140,7 +141,12 @@ if (NNG_PLATFORM_WINDOWS)
)
endif()
-include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src)
+if (NNG_ENABLE_ZEROTIER)
+ set (NNG_SOURCES ${NNG_SOURCES} transport/zerotier/zerotier.c)
+endif()
+
+include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src
+ ${NNG_REQUIRED_INCLUDES})
# Provide same folder structure in IDE as on disk
foreach (f ${NNG_SOURCES})
diff --git a/src/core/platform.h b/src/core/platform.h
index 02ea9a11..7bfb370a 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -350,6 +350,33 @@ extern void nni_plat_pipe_clear(int);
// routine.
extern void nni_plat_pipe_close(int, int);
+//
+// File/Store Support
+//
+// Some transports require a persistent storage for things like
+// key material, etc. Generally, these are all going to be relatively
+// small objects (such as certificates), so we only require a synchronous
+// implementation from platforms. This Key-Value API is intended
+// to support using the keys as filenames, and keys will consist of
+// only these characters: [0-9a-zA-Z._-]. The directory used should be
+// determined by using an environment variable (NNG_STATE_DIR), or
+// using some other application-specific method.
+//
+
+// nni_plat_file_put writes the named file, with the provided data,
+// and the given size. If the file already exists it is overwritten.
+// The permissions on the file should be limited to read and write
+// access by the entity running the application only.
+extern int nni_plat_file_put(const char *, const void *, int);
+
+// nni_plat_file_get reads the entire named file, allocating storage
+// to receive the data and returning the data and the size in the
+// reference arguments.
+extern int nni_plat_file_get(const char *, void **, int *);
+
+// nni_plat_file_delete deletes the named file.
+extern int nni_plat_file_delete(const char *);
+
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(NNG_PLATFORM_POSIX)
diff --git a/src/nng.c b/src/nng.c
index 32004895..ef821a50 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -19,6 +19,7 @@
// Pretty much every function calls the nni_platform_init to check against
// fork related activity.
+#include <stdio.h>
#include <string.h>
void
@@ -650,6 +651,7 @@ static const struct {
const char *
nng_strerror(int num)
{
+ static char unknownerrbuf[32];
for (int i = 0; nni_errors[i].msg != NULL; i++) {
if (nni_errors[i].code == num) {
return (nni_errors[i].msg);
@@ -660,7 +662,16 @@ nng_strerror(int num)
return (nni_plat_strerror(num & ~NNG_ESYSERR));
}
- return ("Unknown error");
+ if (num & NNG_ETRANERR) {
+ static char tranerrbuf[32];
+ (void) snprintf(tranerrbuf, sizeof(tranerrbuf),
+ "Transport error #%d", num & ~NNG_ETRANERR);
+ return (tranerrbuf);
+ }
+
+ (void) snprintf(
+ unknownerrbuf, sizeof(unknownerrbuf), "Unknown error #%d", num);
+ return (unknownerrbuf);
}
int
diff --git a/src/nng.h b/src/nng.h
index 492ba091..ef8cdddd 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -557,6 +557,23 @@ NNG_DECL void nng_thread_destroy(void *);
// Error codes. These may happen to align to errnos used on your platform,
// but do not count on this.
+//
+// NNG_SYSERR is a special code, which allows us to wrap errors from the
+// underlying operating system. We generally prefer to map errors to one
+// of the above, but if we cannot, then we just encode an error this way.
+// The bit is large enough to accommodate all known UNIX and Win32 error
+// codes. We try hard to match things semantically to one of our standard
+// errors. For example, a connection reset or aborted we treat as a
+// closed connection, because that's basically what it means. (The remote
+// peer closed the connection.) For certain kinds of resource exhaustion
+// we treat it the same as memory. But for files, etc. that's OS-specific,
+// and we use the generic below. Some of the above error codes we use
+// internally, and the application should never see (e.g. NNG_EINTR).
+//
+// NNG_ETRANERR is like ESYSERR, but is used to wrap transport specific
+// errors, from different transports. It should only be used when none
+// of the other options are available.
+
enum nng_errno_enum {
NNG_EINTR = 1,
NNG_ENOMEM = 2,
@@ -582,21 +599,11 @@ enum nng_errno_enum {
NNG_ENOSPC = 22,
NNG_EEXIST = 23,
NNG_EINTERNAL = 24,
+ NNG_ETRANSPORT = 25,
+ NNG_ESYSERR = 0x10000000,
+ NNG_ETRANERR = 0x20000000,
};
-// NNG_SYSERR is a special code, which allows us to wrap errors from the
-// underlyuing operating system. We generally prefer to map errors to one
-// of the above, but if we cannot, then we just encode an error this way.
-// The bit is large enough to accommodate all known UNIX and Win32 error
-// codes. We try hard to match things semantically to one of our standard
-// errors. For example, a connection reset or aborted we treat as a
-// closed connection, because that's basically what it means. (The remote
-// peer closed the connection.) For certain kinds of resource exhaustion
-// we treat it the same as memory. But for files, etc. that's OS-specific,
-// and we use the generic below. Some of the above error codes we use
-// internally, and the application should never see (e.g. NNG_EINTR).
-#define NNG_ESYSERR (0x10000000)
-
// Maximum length of a socket address. This includes the terminating NUL.
// This limit is built into other implementations, so do not change it.
#define NNG_MAXADDRLEN (128)
@@ -627,9 +634,17 @@ struct nng_sockaddr_in {
uint16_t sa_port;
uint32_t sa_addr;
};
+
+struct nng_sockaddr_zt {
+ uint64_t sa_nwid;
+ uint64_t sa_nodeid;
+ uint32_t sa_port;
+};
+
typedef struct nng_sockaddr_in nng_sockaddr_in;
typedef struct nng_sockaddr_in nng_sockaddr_udp;
typedef struct nng_sockaddr_in nng_sockaddr_tcp;
+typedef struct nng_sockaddr_zt nng_sockaddr_zt;
typedef struct nng_sockaddr {
union {
@@ -637,6 +652,7 @@ typedef struct nng_sockaddr {
nng_sockaddr_path s_path;
nng_sockaddr_in6 s_in6;
nng_sockaddr_in s_in;
+ nng_sockaddr_zt s_zt;
} s_un;
} nng_sockaddr;
@@ -646,6 +662,7 @@ enum nng_sockaddr_family {
NNG_AF_IPC = 2,
NNG_AF_INET = 3,
NNG_AF_INET6 = 4,
+ NNG_AF_ZT = 5, // ZeroTier
};
#ifdef __cplusplus
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 552730da..05f7bea1 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -210,7 +210,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
}
// UDP opens can actually run synchronously.
- if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) {
+ if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&udp->udp_mtx);
diff --git a/src/transport/zerotier/zerotier.adoc b/src/transport/zerotier/zerotier.adoc
new file mode 100644
index 00000000..dc838420
--- /dev/null
+++ b/src/transport/zerotier/zerotier.adoc
@@ -0,0 +1,383 @@
+ZeroTier Mapping for Scalability Protocols
+===========================================
+
+sp-zerotier-mapping-06
+~~~~~~~~~~~~~~~~~~~~~~
+
+Abstract
+--------
+
+This document defines the ZeroTier mapping for scalability protocols.
+
+Status of This Memo
+-------------------
+
+This is the third draft document, and is intended to guide early
+development efforts. Nothing here is finalized yet.
+
+Copyright Notice
+----------------
+
+Copyright 2017 Garrett D'Amore <garrett@damore.org> +
+Copyright 2017 Capitar IT Group BV <info@capitar.com>
+
+At this point, all rights are reserved. (Note that we do intend to
+release this under a liberal reuse license once it stabilizes a bit.)
+
+Underlying protocol
+-------------------
+
+ZeroTier expresses an 802.3 style layer 2, where frames may be exchanged as if
+they were Ethernet frames. Virtual broadcast domains are created within a
+numbered "network", and frames may then be exchanged with any peers on that
+network.
+
+Frames may arrive in any order, or be lost, just as with Ethernet
+(best effort delivery), but they are strongly protected by a
+cryptographic checksum, so frames that do arrive will be uncorrupted.
+Furthermore, ZeroTier guarantees that a given frame will be received
+at most once.
+
+Each application on a ZeroTier network has its own address, called a
+ZeroTier ID (`ZTID`), which is globally unique -- this is generated
+from a hash of the public key associated with the application.
+
+A given application may participate in multiple ZeroTier networks.
+
+Sharing of ZeroTier IDs between applications, as well as use of multiple
+ZTIDs within a single application, as well as management of the associated
+ZeroTier-specific state is out of scope for this document.
+
+ZeroTier networks have a standard MTU of 2800 bytes, but over typical
+public networks an "optimum" MTU of 1400 bytes is used.
+ZeroTier may be configured to have larger MTUs, but typically this involves
+extensive reassembly at underlying layers, and implementations *SHOULD*
+use the optimum MTU advertised by the ZeroTier implementation.
+
+
+Packet layout
+~~~~~~~~~~~~~
+
+Each SP message sent over ZeroTier is comprised of one or
+more fragments, where each fragment is mapped to a single underlying
+ZeroTier L2 frame. We use the EtherType field of 0x0901 to indicate
+SP over ZeroTier protocol (number to be registered with IEEE).
+
+The ZeroTier L2 payload shall be encoded with a header as follows:
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | op | flags | version |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | reserved | destination port |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | reserved | source port |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | op-specific payload...
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+All numeric fields are in big-endian byte order. Note that ZeroTier
+APIs present this as the L2 payload, but ZeroTier itself may prepend
+additional data such as the Ethernet type, and source and destination
+MAC addresses, as well as ZeroTier specific headers. The details of
+such headers are out of scope for this document.
+
+As above, the start of each frame is just as a normal Ethernet payload.
+The Ethernet type (ethertype) we use for these frames is 0x901, with
+a VLAN ID of 0.
+
+The `op` is a field that indicates the type of message being sent. The
+following values are defined: `DATA` (0x00), `CONN-REQ` (0x10),
+`CONN-ACK` (0x12), `DISC` (0x20), `PING` (0x30), `PONG` (0x32),
+and `ERR` (0x40). These are discussed further below. Implementations
+*MUST* discard messages where the `op` is not one of these.
+
+The `flags` field is reserved for future use, and *MUST* be zero.
+Implementations *MUST* discard frames for which this is not true.
+
+The `version` byte MUST be set to 0x1. Implementations *MUST* discard
+any messages received for any other version.
+
+The `source port` and `destination port` are used to construct a logical
+conversation. These are 24-bits wide, and are discussed further below.
+The `reserved` fields must be set to zero.
+
+The remainder of the frame varies depending on the `op` used.
+
+
+The port fields are used to discriminate different uses, allowing one
+application to have multiple connections or sockets open. The
+purpose is analogous to TCP port numbers, except that instead of the
+operating system performing the discrimination the application or
+library code must do so. Note that port numbers are 24-bits. This
+was chosen to allow a peer to allocate a unique port number for each
+local conversation, allowing up to 16 million concurrent conversations.
+This also allows a 40-bit node number to be combined with the 24-bit
+port number to create a 64-bit unique address.
+
+The `type` field is the numeric SP protocol ID, in big-endian form.
+When receiving a message for a port, if the SP protocol ID does not
+match the SP protocol expected on that port, the implementation *MUST*
+discard this message.
+
+Note that it is not by accident that the payload is 32-bit aligned in
+this message format. The payload is actually 64-bit aligned.
+
+
+Note that at this time, broadcast and multicast is not supported by
+this mapping. (A future update may resolve this.)
+
+DATA messages
+~~~~~~~~~~~~~
+
+`DATA` messages carry SP protocol payload data. They can only be sent
+on an established session (see `CONN` messages below), and are never
+acknowledged (in this version). The op-specific payload they carry
+is formed like this:
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | message ID | fragment size |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | fragment number | total fragments |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | user data...
+ +-+-+-+-+-+-+-+-
+
+All fragments, except for the last, *MUST* be the same size. The fragment
+size field carries the size of every fragment, except that the last
+fragment may be shorter; however even for the last fragment, the fragment
+size *MUST* be the size of the rest of the fragments. This is necessary
+to allow a receiver to know the fragment size of the other fragments even
+if the final fragment is received before any others. (Typically this may
+occur if a message consisting of two fragments arrives with fragments
+out of order.)
+
+The last fragment shall have the fragment number equal to
+the total fragments minus one, and the first fragment shall have fragment
+number 0. Under typical optimal conditions, with an optimal MTU of 1400
+bytes, the largest message that can be transmitted is approximately 86 MB.
+Specifically the limit is (65534 * (1400 - 20)) = 90,436,920 bytes.
+(Larger MTUs may be used, if the implementation determines that it is
+advantageous to do so. Doing so would necessarily give a larger maximum
+message size.)
+
+However, transmitting such a large message would require sending over
+65 thousand fragments, and given the likelihood of fragment loss, and
+the lack of acknowledgment, it is likely that the entire message would
+be lost. As a result, implementations are encouraged to limit the
+amount of data that they send to at most a few megabytes. Implementations
+receiving the first fragment can easily calculate the worst case for
+the message size (the size of the user payload multiplied by the total
+number of fragments), and MAY reply to the sender with an `ERR` message
+using the code 0x05, indicating that the message is larger than the
+receiver is willing to accept.
+
+Each fragment for a given message must carry the same `message ID`.
+Implementations *MUST* initialize this to a random value when starting
+a conversation, and *MUST* increment this each time a new message is sent.
+Message IDs of zero are not permitted; implementations *MUST* skip past zero
+when incrementing message IDs.
+
+Implementations may detect the loss of a message by noticing skips in the
+message IDs that are received, accounting for the expected skip past zero.
+
+Note that no field conveys the length of the fragment itself, as
+this can be determined from the L2 length -- the user data within
+the fragment extends to the end of the L2 payload supplied by ZeroTier.
+(And, all fragments other than the final fragment for a message must
+therefore have the same length.)
+
+
+CONN-REQ and CONN-ACK messages
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`CONN-REQ` frames represent a request from an initiator to establish a
+session, i.e. a new conversation or connection, and `CONN-ACK`
+messages are the normal successful reply from the responder. They both
+take the same form, which consists of the usual headers along with the
+sender's 16-bit (big-endian) SP protocol ID appended:
+
+ 0 1
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | SP protocol ID |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+The connection is initiated by the initiator sending this message,
+with its own SP protocol ID, with the `op` set to `CONN-REQ`.
+The initiator must choose a `source port` number that is not currently
+being used with the remote peer. (Most implementations will choose a
+source port that is not used at all. Source port numbers *SHOULD*
+be chosen randomly.)
+
+The responder will acknowledge this by replying with its SP protocol
+ID in the 2-byte payload, using the `CONN-ACK` op. Additionally,
+the source port number that the responder replies with *MUST* be the
+one the initiator requested.
+
+(Responders will identify the session using the initiator's chosen
+`source port`, which the initiator *MUST NOT* concurrently use for any
+other sessions.)
+
+Alternatively, a responder *MAY* reject the connection attempt by
+sending a suitably formed ERR message (see below).
+
+If a sender does not receive a reply, it *SHOULD* retry this message
+before giving up and reporting an error to the user. It is recommended
+that a configurable number of retries and time interval be used.
+
+Given modern Internet latencies of generally less than 500 ms, resending
+up to 12 `CONN-REQ` requests, once every 5 seconds, before giving up seems
+reasonable. (These times are somewhat larger to allow for ZeroTier
+path discovery to take place; this results in a timeout of approximately
+a minute.)
+
+The initiator *MUST NOT* send any `DATA` messages for a conversation until
+it has received an ACK from the other party, and it *MUST* send all further
+messages for the conversation to the port number supplied by the responder.
+
+If a `CONN-REQ` frame is received by a responder for a conversation that already
+exists, the responder MUST reply. Further, the source port it replies with,
+and the SP protocol IDs MUST be identical to what it first sent. This
+ensures that the `CONN-REQ` request is idempotent.
+
+DISC messages
+~~~~~~~~~~~~~
+
+DISC messages are used to request a session be terminated. This
+notifies the remote sender that no more data will be sent or
+accepted, and the session resources may be released. There is no
+payload. There is no acknowledgment.
+
+PING and PONG messages
+~~~~~~~~~~~~~~~~~~~~~~
+
+In order to keep session state, implementations will generally store
+data for each session. In order to prevent a stale session from
+consuming these resources forever, and in order to keep underlying
+ZeroTier sessions alive, a `PING` message *MAY* be sent to a peer
+with whom a session has been established. This message has no payload.
+
+If the `PING` is successful, then the responder *MUST* reply with a `PONG`
+message. As with `PING`, the `PONG` message carries no payload.
+
+There is no response to a `PONG` message.
+
+In the event of an error, an implementation *MAY* reply with an `ERR`
+message.
+
+Implementations *SHOULD NOT* initiate `PING` messages if they have
+received other session messages recently.
+
+Implementations *SHOULD* use a timeout of T1 seconds before
+initiating a message the first time, and that in the absence of a
+reply, up to N further attempts be made, separated by T2 seconds. If
+no reply to the Nth attempt is received after T2 seconds have passed,
+then the remote peer should be assumed offline or dead, and the
+session closed.
+
+The values for T1, T2, and N *SHOULD* be configurable, with
+recommended default values of 60, 10, and 5. With these values,
+sessions that appear dead after 2 minutes will be closed, and their
+resources reclaimed.
+
+ERR messages
+~~~~~~~~~~~~
+
+`ERR` messages indicate a failure in the session, and abruptly
+terminate the session. The payload for these messages consists of a
+single byte error code, followed by an ASCII message describing the
+error (not terminated by zero). This message *MUST NOT* be more than
+128 bytes in length.
+
+The following error codes are defined:
+
+ * 0x01 No party listening at that address or port.
+ * 0x02 No such session found.
+ * 0x03 SP protocol ID invalid.
+ * 0x04 Generic protocol error.
+ * 0x05 Message size too big.
+ * 0xff Other uncategorized error.
+
+Implementations *MUST* discard any session state upon receiving an ERR
+message. These messages are not acknowledged.
+
+Reassembly Guidelines
+~~~~~~~~~~~~~~~~~~~~~
+
+Implementations *MUST* accept and reassemble fragmented `DATA` messages.
+Implementations *MUST* discard fragmented messages of other types.
+
+Messages larger than the ZeroTier MTU *MUST* be fragmented.
+
+Implementations *SHOULD* limit the number of unassembled messages
+retained for reassembly, to minimize the likelihood of intentional
+abuse. It is suggested that at most 2 unassembled messages be
+retained. It is further suggested that if 2 or more unfragmented
+messages arrive before a message is reassembled, or more than 5
+seconds pass before the reassembly is complete, that the unassembled
+fragments be discarded.
+
+
+Ports
+~~~~~
+
+The port numbers are 24-bit fields, allowing a single ZT ID to
+service multiple application layer protocols, which could be treated
+as separate end points, or as separate sockets in the application.
+The implementation is responsible for discriminating on these and
+delivering to the appropriate consumer.
+
+As with UDP or TCP, it is intended that each party have its own port
+number, and that a pair of ports (combined with ZeroTier IDs) be used
+to identify a single conversation.
+
+An SP server should allocate a port for number advertisement. It is
+expected clients will generate ephemeral port numbers.
+
+Implementations are free to choose how to allocate port numbers, but
+it is recommended manually configured port numbers are small, with
+the high order bit clear, and that numbers > 2^23 (high order bit
+set) be used for ephemeral allocations.
+
+It is recommended that separate short queues (perhaps just one or two
+messages long) be kept per local port in implementations, to prevent
+head-of-line blocking issues where backpressure on one consumer
+(perhaps just a single thread or socket) blocks others.
+
+URI Format
+~~~~~~~~~~
+
+The URI scheme used to represent ZeroTier addresses makes use of
+ZeroTier IDs, ZeroTier network IDs, and our own 24-bit ports.
+
+The format shall be `zt://<nwid>/<ztid>:<port>`, where the `<nwid>`
+component represents the 64-bit hexadecimal ZeroTier network ID,
+the `<ztid>` represents the 40-bit hexadecimal ZeroTier Device ID,
+and the `<port>` is the 24-bit port number previously described.
+
+A responder may elide the `<ztid>/` portion, to just bind to itself,
+in which case the format will be `zt://<nwid>:<port>`.
+
+A port number of 0 may be used when listening to indicate that a random
+ephemeral port should be chosen.
+
+An implementation *MAY* allow the `<ztid>` to be replaced with `*` to
+indicate that the node's local ZT_ID be used.
+
+// XXX: the ztid could use DNS names, generating 6PLANE IP addresses,
+// and extracting the 10 digit device id from that. Note that there
+// is no good way to determine a nwid automatically. The 6PLANE
+// address is determined by a non-reversible XOR transform of the
+// network id.
+
+Security Considerations
+~~~~~~~~~~~~~~~~~~~~~~~
+
+The mapping isn't intended to provide any additional security beyond that
+provided by ZeroTier itself. Managing the key materials used by ZeroTier
+is implementation-specific, and they must take the appropriate care when
+dealing with them.
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
new file mode 100644
index 00000000..40af6e54
--- /dev/null
+++ b/src/transport/zerotier/zerotier.c
@@ -0,0 +1,2677 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifdef NNG_HAVE_ZEROTIER
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+#include <ZeroTierOne.h>
+
+const char *nng_opt_zt_home = "zt:home";
+const char *nng_opt_zt_nwid = "zt:nwid";
+const char *nng_opt_zt_node = "zt:node";
+const char *nng_opt_zt_status = "zt:status";
+const char *nng_opt_zt_network_name = "zt:network-name";
+const char *nng_opt_zt_local_port = "zt:local-port";
+const char *nng_opt_zt_ping_time = "zt:ping-time";
+const char *nng_opt_zt_ping_count = "zt:ping-count";
+
+int nng_optid_zt_home = -1;
+int nng_optid_zt_nwid = -1;
+int nng_optid_zt_node = -1;
+int nng_optid_zt_status = -1;
+int nng_optid_zt_network_name = -1;
+int nng_optid_zt_ping_time = -1;
+int nng_optid_zt_ping_count = -1;
+int nng_optid_zt_local_port = -1;
+
+// These values are supplied to help folks checking status. They are the
+// return values from nng_optid_zt_status.
+int nng_zt_status_configuring = ZT_NETWORK_STATUS_REQUESTING_CONFIGURATION;
+int nng_zt_status_ok = ZT_NETWORK_STATUS_OK;
+int nng_zt_status_denied = ZT_NETWORK_STATUS_ACCESS_DENIED;
+int nng_zt_status_notfound = ZT_NETWORK_STATUS_NOT_FOUND;
+int nng_zt_status_error = ZT_NETWORK_STATUS_PORT_ERROR;
+int nng_zt_status_obsolete = ZT_NETWORK_STATUS_CLIENT_TOO_OLD;
+
+// ZeroTier Transport. This sits on the ZeroTier L2 network, which itself
+// is implemented on top of UDP. This requires the 3rd party
+// libzerotiercore library (which is GPLv3!) and platform specific UDP
+// functionality to be built in. Note that care must be taken to link
+// dynamically if one wishes to avoid making your entire application GPL3.
+// (Alternatively ZeroTier offers commercial licenses which may prevent
+// this particular problem.) This implementation does not make use of
+// certain advanced capabilities in ZeroTier such as more sophisticated
+// route management and TCP fallback. You need to have connectivity
+// to the Internet to use this. (Or at least to your Planetary root.)
+//
+// Because ZeroTier takes a while to establish connectivity, it is even
+// more important that applications using the ZeroTier transport not
+// assume that a connection will be immediately available. It can take
+// quite a few seconds for peer-to-peer connectivity to be established.
+//
+// The ZeroTier transport was funded by Capitar IT Group, BV.
+//
+// This transport is highly experimental.
+
+// ZeroTier and UDP are connectionless, but nng is designed around
+// connection oriented paradigms. An "unreliable" connection is created
+// on top using our own network protocol. The details of this are
+// documented in the RFC.
+
+// Every participant has an "address", which is a 64-bit value constructed
+// using the ZT node number in the upper 40-bits, and a 24-bit port number
+// in the lower bits. We elect to operate primarily on these addresses,
+// but the wire protocol relies on just conveying the 24-bit port along
+// with the MAC address (from which the ZT node number can be derived,
+// given the network ID.)
+
+typedef struct zt_pipe zt_pipe;
+typedef struct zt_ep zt_ep;
+typedef struct zt_node zt_node;
+typedef struct zt_frag zt_frag;
+typedef struct zt_fraglist zt_fraglist;
+
+// Port numbers are stored as 24-bit values in network byte order.
+#define ZT_GET24(ptr, v) \
+ v = (((uint32_t)((uint8_t)(ptr)[0])) << 16) + \
+ (((uint32_t)((uint8_t)(ptr)[1])) << 8) + \
+ (((uint32_t)(uint8_t)(ptr)[2]))
+
+#define ZT_PUT24(ptr, u) \
+ do { \
+ (ptr)[0] = (uint8_t)(((uint32_t)(u)) >> 16); \
+ (ptr)[1] = (uint8_t)(((uint32_t)(u)) >> 8); \
+ (ptr)[2] = (uint8_t)((uint32_t)(u)); \
+ } while (0)
+
+static const uint16_t zt_ethertype = 0x901;
+static const uint8_t zt_version = 0x01;
+static const uint32_t zt_ephemeral = 0x800000u; // start of ephemeral ports
+static const uint32_t zt_max_port = 0xffffffu; // largest port
+static const uint32_t zt_port_mask = 0xffffffu; // mask of valid ports
+
+// These are compile time tunables for now.
+enum zt_tunables {
+ zt_listenq = 128, // backlog queue length
+ zt_listen_expire = 60000000, // maximum time in backlog
+ zt_rcv_bufsize = ZT_MAX_MTU + 128, // max UDP recv
+ zt_conn_attempts = 12, // connection attempts (default)
+ zt_conn_interval = 5000000, // between attempts (usec)
+ zt_udp_sendq = 16, // outgoing UDP queue length
+ zt_recvq = 2, // max pending recv (per pipe)
+ zt_recv_stale = 1000000, // frags older than are stale
+ zt_ping_time = 60000000, // if no traffic, ping time (usec)
+ zt_ping_count = 5, // max ping attempts before close
+};
+
+enum zt_op_codes {
+ zt_op_data = 0x00, // data, final fragment
+ zt_op_conn_req = 0x10, // connect request
+ zt_op_conn_ack = 0x12, // connect accepted
+ zt_op_disc_req = 0x20, // disconnect request (no ack)
+ zt_op_ping = 0x30, // ping request
+ zt_op_pong = 0x32, // ping response
+ zt_op_error = 0x40, // error response
+};
+
+enum zt_offsets {
+ zt_offset_op = 0x00,
+ zt_offset_flags = 0x01,
+ zt_offset_version = 0x02, // protocol version number (2 bytes)
+ zt_offset_zero1 = 0x04, // reserved, must be zero (1 byte)
+ zt_offset_dst_port = 0x05, // destination port (3 bytes)
+ zt_offset_zero2 = 0x08, // reserved, must be zero (1 byte)
+ zt_offset_src_port = 0x09, // source port number (3 bytes)
+ zt_offset_creq_proto = 0x0C, // SP protocol number (2 bytes)
+ zt_offset_cack_proto = 0x0C, // SP protocol number (2 bytes)
+ zt_offset_err_code = 0x0C, // error code (1 byte)
+ zt_offset_err_msg = 0x0D, // error message (string)
+ zt_offset_data_id = 0x0C, // message ID (2 bytes)
+ zt_offset_data_fragsz = 0x0E, // fragment size
+ zt_offset_data_frag = 0x10, // fragment number, first is 1 (2 bytes)
+ zt_offset_data_nfrag = 0x12, // total fragments (2 bytes)
+ zt_offset_data_data = 0x14, // user payload
+ zt_size_headers = 0x0C, // size of headers
+ zt_size_conn_req = 0x0E, // size of conn_req (connect request)
+ zt_size_conn_ack = 0x0E, // size of conn_ack (connect reply)
+ zt_size_disc_req = 0x0C, // size of disc_req (disconnect)
+ zt_size_ping = 0x0C, // size of ping request
+ zt_size_pong = 0x0C, // size of ping reply
+ zt_size_data = 0x14, // size of data message (w/o payload)
+};
+
+enum zt_errors {
+ zt_err_refused = 0x01, // Connection refused
+	zt_err_notconn = 0x02, // Connection does not exist
+ zt_err_wrongsp = 0x03, // SP protocol mismatch
+	zt_err_proto = 0x04, // Other protocol error
+	zt_err_msgsize = 0x05, // Message too large
+ zt_err_unknown = 0x06, // Other errors
+};
+
+// This node structure is wrapped around the ZT_node; this allows us to
+// have multiple endpoints referencing the same ZT_node, but also to
+// support different nodes (identities) based on different homedirs.
+// This means we need to stick these on a global linked list, manage
+// them with a reference count, and uniquely identify them using the
+// homedir.
+struct zt_node {
+ char zn_path[NNG_MAXADDRLEN]; // ought to be sufficient
+ ZT_Node * zn_znode;
+ uint64_t zn_self;
+ nni_list_node zn_link;
+ int zn_closed;
+ nni_plat_udp *zn_udp4;
+ nni_plat_udp *zn_udp6;
+ nni_list zn_eplist;
+ nni_list zn_plist;
+ nni_idhash * zn_ports;
+ nni_idhash * zn_eps;
+ nni_idhash * zn_lpipes;
+ nni_idhash * zn_rpipes;
+ nni_idhash * zn_peers; // indexed by remote address
+ nni_aio * zn_rcv4_aio;
+ char * zn_rcv4_buf;
+ nng_sockaddr zn_rcv4_addr;
+ nni_aio * zn_rcv6_aio;
+ char * zn_rcv6_buf;
+ nng_sockaddr zn_rcv6_addr;
+ nni_thr zn_bgthr;
+ nni_time zn_bgtime;
+ nni_cv zn_bgcv;
+ nni_cv zn_snd6_cv;
+};
+
+// The fragment list is used to keep track of incoming received
+// fragments for reassembly into a complete message.
+struct zt_fraglist {
+ nni_time fl_time; // time first frag was received
+ uint32_t fl_msgid; // message id
+ int fl_ready; // we have all messages
+ unsigned int fl_fragsz;
+ unsigned int fl_nfrags;
+ uint8_t * fl_missing;
+ size_t fl_missingsz;
+ nni_msg * fl_msg;
+};
+
+struct zt_pipe {
+ nni_list_node zp_link;
+ const char * zp_addr;
+ zt_node * zp_ztn;
+ uint64_t zp_nwid;
+ uint64_t zp_laddr;
+ uint64_t zp_raddr;
+ uint16_t zp_peer;
+ uint16_t zp_proto;
+ uint16_t zp_next_msgid;
+ size_t zp_rcvmax;
+ size_t zp_mtu;
+ int zp_closed;
+ nni_aio * zp_user_rxaio;
+ nni_time zp_last_recv;
+ zt_fraglist zp_recvq[zt_recvq];
+ int zp_ping_try;
+ int zp_ping_count;
+ nni_duration zp_ping_time;
+ nni_aio * zp_ping_aio;
+};
+
+typedef struct zt_creq zt_creq;
+struct zt_creq {
+ uint64_t cr_expire;
+ uint64_t cr_raddr;
+ uint16_t cr_proto;
+};
+
+struct zt_ep {
+ nni_list_node ze_link;
+ char ze_url[NNG_MAXADDRLEN];
+ char ze_home[NNG_MAXADDRLEN]; // should be enough
+ zt_node * ze_ztn;
+ uint64_t ze_nwid;
+ int ze_mode;
+ nni_sockaddr ze_addr;
+ uint64_t ze_raddr; // remote node address
+ uint64_t ze_laddr; // local node address
+ uint16_t ze_proto;
+ size_t ze_rcvmax;
+ nni_aio * ze_aio;
+ nni_aio * ze_creq_aio;
+ int ze_creq_try;
+ nni_list ze_aios;
+ int ze_maxmtu;
+ int ze_phymtu;
+ int ze_ping_count;
+ nni_duration ze_ping_time;
+
+ // Incoming connection requests (server only). We only
+ // only have "accepted" requests -- that is we won't have an
+ // established connection/pipe unless the application calls
+ // accept. Since the "application" is our library, that should
+ // be pretty much as fast we can run.
+ zt_creq ze_creqs[zt_listenq];
+ int ze_creq_head;
+ int ze_creq_tail;
+};
+
+// Locking strategy. At present the ZeroTier core is not reentrant or fully
+// threadsafe. (We expect this will be fixed.) Furthermore, there are
+// some significant challenges in dealing with locks associated with the
+// callbacks, etc. So we take a big-hammer approach, and just use a single
+// global lock for everything. We hold this lock when calling into the
+// ZeroTier framework. Since ZeroTier has no independent threads, that
+// means that it will always hold this lock in its core, and the lock will
+// also be held automatically in any of our callbacks. We never hold any
+// other locks across ZeroTier core calls. We may not acquire the global
+// lock in callbacks (they will already have it held). Any other locks
+// can be acquired as long as they are not held during calls into ZeroTier.
+//
+// This will have a detrimental impact on performance, but to be completely
+// honest we don't think anyone will be using the ZeroTier transport in
+// performance critical applications; scalability may become a factor for
+// large servers sitting in a ZeroTier hub situation. (Then again, since
+// only the zerotier processing is single threaded, it may not
+// be that much of a bottleneck -- really depends on how expensive these
+// operations are. We can use lockstat or other lock-hotness tools to
+// check for this later.)
+
+static nni_mtx zt_lk;
+static nni_list zt_nodes;
+
+static void zt_ep_send_conn_req(zt_ep *);
+static void zt_ep_conn_req_cb(void *);
+static void zt_ep_doaccept(zt_ep *);
+static void zt_pipe_dorecv(zt_pipe *);
+static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t);
+static void zt_pipe_ping_cb(void *);
+static void zt_fraglist_clear(zt_fraglist *);
+static void zt_fraglist_free(zt_fraglist *);
+static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **,
+ uint64_t, uint64_t, unsigned int, unsigned int, const void *,
+ unsigned int);
+
+static uint64_t
+zt_now(void)
+{
+ // We return msec
+ return (nni_clock() / 1000);
+}
+
+static void
+zt_bgthr(void *arg)
+{
+ zt_node *ztn = arg;
+ nni_time now;
+
+ nni_mtx_lock(&zt_lk);
+ for (;;) {
+ now = nni_clock();
+
+ if (ztn->zn_closed) {
+ break;
+ }
+
+ if (now < ztn->zn_bgtime) {
+ nni_cv_until(&ztn->zn_bgcv, ztn->zn_bgtime);
+ continue;
+ }
+
+ now /= 1000; // usec -> msec
+ ZT_Node_processBackgroundTasks(ztn->zn_znode, NULL, now, &now);
+
+ ztn->zn_bgtime = now * 1000; // usec
+ }
+ nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_node_resched(zt_node *ztn, uint64_t msec)
+{
+ ztn->zn_bgtime = msec * 1000; // convert to usec
+ nni_cv_wake1(&ztn->zn_bgcv);
+}
+
+static void
+zt_node_rcv4_cb(void *arg)
+{
+ zt_node * ztn = arg;
+ nni_aio * aio = ztn->zn_rcv4_aio;
+ struct sockaddr_storage sa;
+ struct sockaddr_in * sin;
+ nng_sockaddr_in * nsin;
+ uint64_t now;
+
+ if (nni_aio_result(aio) != 0) {
+ // Outside of memory exhaustion, we can't really think
+ // of any reason for this to legitimately fail.
+ // Arguably we should inject a fallback delay, but for
+ // now we just carry on.
+ return;
+ }
+
+ memset(&sa, 0, sizeof(sa));
+ sin = (void *) &sa;
+ nsin = &ztn->zn_rcv4_addr.s_un.s_in;
+ sin->sin_family = AF_INET;
+ sin->sin_port = nsin->sa_port;
+ sin->sin_addr.s_addr = nsin->sa_addr;
+
+ nni_mtx_lock(&zt_lk);
+ now = zt_now();
+
+ // We are not going to perform any validation of the data; we
+ // just pass this straight into the ZeroTier core.
+ // XXX: CHECK THIS, if it fails then we have a fatal error with
+ // the znode, and have to shut everything down.
+ ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
+ ztn->zn_rcv4_buf, aio->a_count, &now);
+
+ // Schedule background work
+ zt_node_resched(ztn, now);
+
+ // Schedule another receive.
+ if (ztn->zn_udp4 != NULL) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf;
+ aio->a_iov[0].iov_len = zt_rcv_bufsize;
+ aio->a_addr = &ztn->zn_rcv4_addr;
+ aio->a_count = 0;
+
+ nni_plat_udp_recv(ztn->zn_udp4, aio);
+ }
+ nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_node_rcv6_cb(void *arg)
+{
+ zt_node * ztn = arg;
+ nni_aio * aio = ztn->zn_rcv6_aio;
+ struct sockaddr_storage sa;
+ struct sockaddr_in6 * sin6;
+ struct nng_sockaddr_in6 *nsin6;
+ uint64_t now;
+
+ if (nni_aio_result(aio) != 0) {
+ // Outside of memory exhaustion, we can't really think
+ // of any reason for this to legitimately fail.
+ // Arguably we should inject a fallback delay, but for
+ // now we just carry on.
+ return;
+ }
+
+ memset(&sa, 0, sizeof(sa));
+ sin6 = (void *) &sa;
+ nsin6 = &ztn->zn_rcv6_addr.s_un.s_in6;
+ sin6->sin6_family = AF_INET6;
+ sin6->sin6_port = nsin6->sa_port;
+ memcpy(&sin6->sin6_addr, nsin6->sa_addr, 16);
+
+ nni_mtx_lock(&zt_lk);
+ now = zt_now(); // msec
+
+ // We are not going to perform any validation of the data; we
+ // just pass this straight into the ZeroTier core.
+ ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
+ ztn->zn_rcv6_buf, aio->a_count, &now);
+
+ // Schedule background work
+ zt_node_resched(ztn, now);
+
+ // Schedule another receive.
+ if (ztn->zn_udp6 != NULL) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf;
+ aio->a_iov[0].iov_len = zt_rcv_bufsize;
+ aio->a_addr = &ztn->zn_rcv6_addr;
+ aio->a_count = 0;
+ nni_plat_udp_recv(ztn->zn_udp6, aio);
+ }
+ nni_mtx_unlock(&zt_lk);
+}
+
+static uint64_t
+zt_mac_to_node(uint64_t mac, uint64_t nwid)
+{
+ uint64_t node;
+	// This extracts a node address from a mac address. The
+	// network ID is mixed in, and has to be extricated. The
+	// node ID is located in the lower 40 bits, and is scrambled
+	// against the nwid.
+ node = mac & 0xffffffffffull;
+ node ^= ((nwid >> 8) & 0xff) << 32;
+ node ^= ((nwid >> 16) & 0xff) << 24;
+ node ^= ((nwid >> 24) & 0xff) << 16;
+ node ^= ((nwid >> 32) & 0xff) << 8;
+ node ^= (nwid >> 40) & 0xff;
+ return (node);
+}
+
+static uint64_t
+zt_node_to_mac(uint64_t node, uint64_t nwid)
+{
+ uint64_t mac;
+ // We use LSB of network ID, and make sure that we clear
+ // multicast and set local administration -- this is the first
+ // octet of the 48 bit mac address. We also avoid 0x52, which
+ // is known to be used in KVM, libvirt, etc.
+ mac = ((uint8_t)(nwid & 0xfe) | 0x02);
+ if (mac == 0x52) {
+ mac = 0x32;
+ }
+ mac <<= 40;
+ mac |= node;
+ // The rest of the network ID is XOR'd in, in reverse byte
+ // order.
+ mac ^= ((nwid >> 8) & 0xff) << 32;
+ mac ^= ((nwid >> 16) & 0xff) << 24;
+ mac ^= ((nwid >> 24) & 0xff) << 16;
+ mac ^= ((nwid >> 32) & 0xff) << 8;
+ mac ^= (nwid >> 40) & 0xff;
+ return (mac);
+}
+
+static int
+zt_result(enum ZT_ResultCode rv)
+{
+ switch (rv) {
+ case ZT_RESULT_OK:
+ return (0);
+ case ZT_RESULT_OK_IGNORED:
+ return (0);
+ case ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY:
+ return (NNG_ENOMEM);
+ case ZT_RESULT_FATAL_ERROR_DATA_STORE_FAILED:
+ return (NNG_EPERM);
+ case ZT_RESULT_FATAL_ERROR_INTERNAL:
+ return (NNG_EINTERNAL);
+ case ZT_RESULT_ERROR_NETWORK_NOT_FOUND:
+ return (NNG_EADDRINVAL);
+ case ZT_RESULT_ERROR_UNSUPPORTED_OPERATION:
+ return (NNG_ENOTSUP);
+ case ZT_RESULT_ERROR_BAD_PARAMETER:
+ return (NNG_EINVAL);
+ default:
+ return (NNG_ETRANERR + (int) rv);
+ }
+}
+
+// ZeroTier Node API callbacks
+static int
+zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
+ void **netptr, enum ZT_VirtualNetworkConfigOperation op,
+ const ZT_VirtualNetworkConfig *config)
+{
+ zt_node *ztn = userptr;
+ zt_ep * ep;
+
+ NNI_ARG_UNUSED(thr);
+ NNI_ARG_UNUSED(netptr);
+
+ NNI_ASSERT(node == ztn->zn_znode);
+
+ // Maybe we don't have to create taps or anything like that.
+ // We do get our mac and MTUs from this, so there's that.
+ switch (op) {
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE:
+
+ // We only really care about changes to the MTU. From
+ // an API perspective the MAC could change, but that
+ // cannot really happen because the node identity and
+ // the nwid are fixed.
+ NNI_LIST_FOREACH (&ztn->zn_eplist, ep) {
+ NNI_ASSERT(nwid == config->nwid);
+ if (ep->ze_nwid != config->nwid) {
+ continue;
+ }
+ ep->ze_maxmtu = config->mtu;
+ ep->ze_phymtu = config->physicalMtu;
+
+ if ((ep->ze_mode == NNI_EP_MODE_DIAL) &&
+ (nni_list_first(&ep->ze_aios) != NULL)) {
+ zt_ep_send_conn_req(ep);
+ }
+ // if (ep->ze_mode == NNI_EP
+ // zt_send_
+ // nni_aio_finish(ep->ze_join_aio, 0);
+ // }
+ // XXX: schedule creqs if needed!
+ }
+ break;
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN:
+ // XXX: tear down endpoints?
+ default:
+ break;
+ }
+ return (0);
+}
+
+// zt_send modifies the start of the supplied buffer to update the
+// message headers with protocol specific details (version, port numbers,
+// etc.) and then sends it over the virtual network.
+static void
+zt_send(zt_node *ztn, uint64_t nwid, uint8_t op, uint64_t raddr,
+ uint64_t laddr, uint8_t *data, size_t len)
+{
+ uint64_t srcmac = zt_node_to_mac(laddr >> 24, nwid);
+ uint64_t dstmac = zt_node_to_mac(raddr >> 24, nwid);
+ uint64_t now = zt_now();
+
+ NNI_ASSERT(len >= zt_size_headers);
+ data[zt_offset_op] = op;
+ data[zt_offset_flags] = 0;
+ data[zt_offset_zero1] = 0;
+ data[zt_offset_zero2] = 0;
+ NNI_PUT16(data + zt_offset_version, zt_version);
+ ZT_PUT24(data + zt_offset_dst_port, raddr & zt_port_mask);
+ ZT_PUT24(data + zt_offset_src_port, laddr & zt_port_mask);
+
+ // If we are looping back, bypass ZT.
+ if (srcmac == dstmac) {
+ zt_virtual_recv(ztn->zn_znode, ztn, NULL, nwid, NULL, srcmac,
+ dstmac, zt_ethertype, 0, data, len);
+ return;
+ }
+
+ (void) ZT_Node_processVirtualNetworkFrame(ztn->zn_znode, NULL, now,
+ nwid, srcmac, dstmac, zt_ethertype, 0, data, len, &now);
+
+ zt_node_resched(ztn, now);
+}
+
+static void
+zt_send_err(zt_node *ztn, uint64_t nwid, uint64_t raddr, uint64_t laddr,
+ uint8_t err, const char *msg)
+{
+ uint8_t data[128];
+
+ NNI_ASSERT((strlen(msg) + zt_offset_err_msg) < sizeof(data));
+
+ data[zt_offset_err_code] = err;
+ nni_strlcpy((char *) data + zt_offset_err_msg, msg,
+ sizeof(data) - zt_offset_err_msg);
+
+ zt_send(ztn, nwid, zt_op_error, raddr, laddr, data,
+ strlen(msg) + zt_offset_err_msg);
+}
+
+static void
+zt_pipe_send_err(zt_pipe *p, uint8_t err, const char *msg)
+{
+ zt_send_err(p->zp_ztn, p->zp_nwid, p->zp_raddr, p->zp_laddr, err, msg);
+}
+
+static void
+zt_pipe_send_disc_req(zt_pipe *p)
+{
+ uint8_t data[zt_size_disc_req];
+
+ zt_send(p->zp_ztn, p->zp_nwid, zt_op_disc_req, p->zp_raddr,
+ p->zp_laddr, data, sizeof(data));
+}
+
+static void
+zt_pipe_send_ping(zt_pipe *p)
+{
+ uint8_t data[zt_size_ping];
+
+ zt_send(p->zp_ztn, p->zp_nwid, zt_op_ping, p->zp_raddr, p->zp_laddr,
+ data, sizeof(data));
+}
+
+static void
+zt_pipe_send_pong(zt_pipe *p)
+{
+ uint8_t data[zt_size_ping];
+
+ zt_send(p->zp_ztn, p->zp_nwid, zt_op_pong, p->zp_raddr, p->zp_laddr,
+ data, sizeof(data));
+}
+
+static void
+zt_pipe_send_conn_ack(zt_pipe *p)
+{
+ uint8_t data[zt_size_conn_ack];
+
+ NNI_PUT16(data + zt_offset_cack_proto, p->zp_proto);
+ zt_send(p->zp_ztn, p->zp_nwid, zt_op_conn_ack, p->zp_raddr,
+ p->zp_laddr, data, sizeof(data));
+}
+
+static void
+zt_ep_send_conn_req(zt_ep *ep)
+{
+ uint8_t data[zt_size_conn_req];
+
+ NNI_PUT16(data + zt_offset_creq_proto, ep->ze_proto);
+ zt_send(ep->ze_ztn, ep->ze_nwid, zt_op_conn_req, ep->ze_raddr,
+ ep->ze_laddr, data, sizeof(data));
+}
+
+static void
+zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
+{
+ zt_node *ztn = ep->ze_ztn;
+ nni_aio *aio = ep->ze_creq_aio;
+ zt_pipe *p;
+ int rv;
+
+ if (ep->ze_mode != NNI_EP_MODE_DIAL) {
+ zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
+ zt_err_proto, "Inappropriate operation");
+ return;
+ }
+
+ if (len != zt_size_conn_ack) {
+ zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
+ zt_err_proto, "Bad message length");
+ return;
+ }
+
+ if (ep->ze_creq_try == 0) {
+ return;
+ }
+
+	// Do we already have a matching pipe? If so, we can discard
+	// the operation. This should not happen, since we normally
+	// deregister the endpoint when we create the pipe.
+ if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) {
+ return;
+ }
+
+ if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) {
+ // We couldn't create the pipe, just drop it.
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ NNI_GET16(data + zt_offset_cack_proto, p->zp_peer);
+
+ // Reset the address of the endpoint, so that the next call to
+ // ep_connect will bind a new one -- we are using this one for the
+ // pipe.
+ nni_idhash_remove(ztn->zn_eps, ep->ze_laddr);
+ ep->ze_laddr = 0;
+
+ nni_aio_finish_pipe(aio, p);
+}
+
+static void
+zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
+{
+ zt_node *ztn = ep->ze_ztn;
+ zt_pipe *p;
+ int i;
+
+ if (ep->ze_mode != NNI_EP_MODE_LISTEN) {
+ zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
+ zt_err_proto, "Inappropriate operation");
+ return;
+ }
+ if (len != zt_size_conn_req) {
+ zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
+ zt_err_proto, "Bad message length");
+ return;
+ }
+
+ // If we already have created a pipe for this connection
+ // then just reply the conn ack.
+ if ((nni_idhash_find(ztn->zn_peers, raddr, (void **) &p)) == 0) {
+ zt_pipe_send_conn_ack(p);
+ return;
+ }
+
+ // We may already have a connection request queued (if this was
+ // a resend for example); if that's the case we just ignore
+ // this one.
+ for (i = ep->ze_creq_tail; i != ep->ze_creq_head; i++) {
+ if (ep->ze_creqs[i % zt_listenq].cr_raddr == raddr) {
+ return;
+ }
+ }
+ // We may already have filled our listenq, in which case we just drop.
+ if ((ep->ze_creq_tail + zt_listenq) == ep->ze_creq_head) {
+ // We have taken as many as we can, so just drop it.
+ return;
+ }
+
+ // Record the connection request, and then process any
+ // pending acceptors.
+ i = ep->ze_creq_head % zt_listenq;
+
+ NNI_GET16(data + zt_offset_creq_proto, ep->ze_creqs[i].cr_proto);
+ ep->ze_creqs[i].cr_raddr = raddr;
+ ep->ze_creqs[i].cr_expire = nni_clock() + zt_listen_expire;
+ ep->ze_creq_head++;
+
+ zt_ep_doaccept(ep);
+}
+
+static void
+zt_ep_recv_error(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
+{
+ nni_aio *aio;
+ int code;
+
+ // Most of the time we don't care about errors. The exception here
+ // is that when we have an outstanding CON_REQ, we would like to
+ // process that appropriately.
+
+ if (ep->ze_mode != NNI_EP_MODE_DIAL) {
+ // Drop it.
+ return;
+ }
+
+ if (len < zt_offset_err_msg) {
+ // Malformed error frame.
+ return;
+ }
+
+ code = data[zt_offset_err_code];
+ switch (code) {
+ case zt_err_refused:
+ code = NNG_ECONNREFUSED;
+ break;
+ case zt_err_notconn:
+ code = NNG_ECLOSED;
+ break;
+ case zt_err_wrongsp:
+ code = NNG_EPROTO;
+ break;
+ default:
+ code = NNG_ETRANERR;
+ break;
+ }
+
+ if (ep->ze_creq_try > 0) {
+ ep->ze_creq_try = 0;
+ nni_aio_finish_error(ep->ze_creq_aio, code);
+ }
+}
+
+static void
+zt_ep_virtual_recv(
+ zt_ep *ep, uint8_t op, uint64_t raddr, const uint8_t *data, size_t len)
+{
+ // Only listeners should be receiving. Dialers receive on the pipe,
+ // rather than the endpoint. The only message that endpoints can
+ // receive are connection requests.
+ switch (op) {
+ case zt_op_conn_req:
+ zt_ep_recv_conn_req(ep, raddr, data, len);
+ return;
+ case zt_op_conn_ack:
+ zt_ep_recv_conn_ack(ep, raddr, data, len);
+ return;
+ case zt_op_error:
+ zt_ep_recv_error(ep, raddr, data, len);
+ return;
+ default:
+ zt_send_err(ep->ze_ztn, ep->ze_nwid, raddr, ep->ze_laddr,
+ zt_err_proto, "Bad operation");
+ return;
+ }
+}
+
+static void
+zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg)
+{
+ nni_aio *aio;
+ if ((aio = p->zp_user_rxaio) != NULL) {
+ p->zp_user_rxaio = NULL;
+ nni_aio_finish_error(aio, err);
+ }
+ if ((aio = p->zp_ping_aio) != NULL) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ p->zp_closed = 1;
+ if (msg != NULL) {
+ zt_pipe_send_err(p, code, msg);
+ }
+}
+
+static void
+zt_pipe_recv_data(zt_pipe *p, const uint8_t *data, size_t len)
+{
+ nni_aio * aio;
+ uint16_t msgid;
+ uint16_t fragno;
+ uint16_t nfrags;
+ uint16_t fragsz;
+ zt_fraglist *fl;
+ int i;
+ int slot;
+ uint8_t bit;
+ uint8_t * body;
+
+ if (len < zt_size_data) {
+ // Runt frame. Drop it and close pipe with a protocol error.
+ zt_pipe_close_err(p, NNG_EPROTO, zt_err_proto, "Runt frame");
+ return;
+ }
+
+ NNI_GET16(data + zt_offset_data_id, msgid);
+ NNI_GET16(data + zt_offset_data_fragsz, fragsz);
+ NNI_GET16(data + zt_offset_data_frag, fragno);
+ NNI_GET16(data + zt_offset_data_nfrag, nfrags);
+ len -= zt_offset_data_data;
+ data += zt_offset_data_data;
+
+ // Check for cases where message size is clearly too large. Note
+ // that we only can catch the case where a message is larger by
+ // more than a fragment, since the final fragment may be shorter,
+ // and we won't know that until we receive it.
+ if ((nfrags * fragsz) >= (p->zp_rcvmax + fragsz)) {
+ // Discard, as the forwarder might be on the other side
+ // of a device. This is gentler than just shutting the pipe
+ // down. Sending a remote error might be polite, but since
+ // most peers will close the pipe on such an error, we
+		// simply silently discard it.
+ return;
+ }
+
+ // We run the recv logic once, to clear stale fragment entries.
+ zt_pipe_dorecv(p);
+
+ // Find a suitable fragment slot.
+ slot = -1;
+ for (i = 0; i < zt_recvq; i++) {
+ fl = &p->zp_recvq[i];
+ // This was our message ID, we always use it.
+ if (msgid == fl->fl_msgid) {
+ slot = i;
+ break;
+ }
+
+ if (slot < 0) {
+ slot = i;
+ } else if (fl->fl_time < p->zp_recvq[slot].fl_time) {
+ // This has an earlier expiration, so lets choose it.
+ slot = i;
+ }
+ }
+
+ NNI_ASSERT(slot >= 0);
+
+ fl = &p->zp_recvq[slot];
+ if (fl->fl_msgid != msgid) {
+ // First fragment we've received for this message (but might
+ // not be first fragment for message!)
+ zt_fraglist_clear(fl);
+
+ if (nni_msg_alloc(&fl->fl_msg, nfrags * fragsz) != 0) {
+ // Out of memory. We don't close the pipe, but
+ // just fail to receive the message. Bump a stat?
+ return;
+ }
+
+ fl->fl_nfrags = nfrags;
+ fl->fl_fragsz = fragsz;
+ fl->fl_msgid = msgid;
+ fl->fl_time = nni_clock();
+
+ // Set the missing mask.
+ memset(fl->fl_missing, 0xff, nfrags / 8);
+ fl->fl_missing[nfrags / 8] |= ((1 << (nfrags % 8)) - 1);
+ }
+ if ((nfrags != fl->fl_nfrags) || (fragsz != fl->fl_fragsz) ||
+ (fragno >= nfrags) || (fragsz == 0) || (nfrags == 0) ||
+ ((fragno != (nfrags - 1)) && (len != fragsz))) {
+ // Protocol error, message parameters changed.
+ zt_pipe_close_err(
+ p, NNG_EPROTO, zt_err_proto, "Invalid message parameters");
+ zt_fraglist_clear(fl);
+ return;
+ }
+
+ bit = (uint8_t)(1 << (fragno % 8));
+ if ((fl->fl_missing[fragno / 8] & bit) == 0) {
+ // We've already got this fragment, ignore it. We don't
+ // bother to check for changed data.
+ return;
+ }
+
+ fl->fl_missing[fragno / 8] &= ~(bit);
+ body = nni_msg_body(fl->fl_msg);
+ body += fragno * fragsz;
+ memcpy(body, data, len);
+ if (fragno == (nfrags - 1)) {
+ // Last frag, maybe shorten the message.
+ nni_msg_chop(fl->fl_msg, (fragsz - len));
+ if (nni_msg_len(fl->fl_msg) > p->zp_rcvmax) {
+ // Strict enforcement of max recv.
+ zt_fraglist_clear(fl);
+ // Just discard the message.
+ return;
+ }
+ }
+
+ for (i = 0; i < ((nfrags + 7) / 8); i++) {
+ if (fl->fl_missing[i]) {
+ return;
+ }
+ }
+
+ // We got all fragments... try to send it up.
+ fl->fl_ready = 1;
+ zt_pipe_dorecv(p);
+}
+
+static void
+zt_pipe_recv_ping(zt_pipe *p, const uint8_t *data, size_t len)
+{
+ NNI_ARG_UNUSED(data);
+
+ if (len != zt_size_ping) {
+ zt_pipe_send_err(p, zt_err_proto, "Incorrect ping size");
+ return;
+ }
+ zt_pipe_send_pong(p);
+}
+
+static void
+zt_pipe_recv_pong(zt_pipe *p, const uint8_t *data, size_t len)
+{
+ NNI_ARG_UNUSED(data);
+
+ if (len != zt_size_pong) {
+ zt_pipe_send_err(p, zt_err_proto, "Incorrect pong size");
+ }
+}
+
+static void
+zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len)
+{
+ nni_aio *aio;
+ // NB: lock held already.
+ // Don't bother to check the length, going to disconnect anyway.
+ if ((aio = p->zp_user_rxaio) != NULL) {
+ p->zp_user_rxaio = NULL;
+ p->zp_closed = 1;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+}
+
+static void
+zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len)
+{
+ nni_aio *aio;
+
+ // Perhaps we should log an error message, but at the end of
+ // the day, the details are just not that interesting.
+ if ((aio = p->zp_user_rxaio) != NULL) {
+ p->zp_user_rxaio = NULL;
+ p->zp_closed = 1;
+ nni_aio_finish_error(aio, NNG_ETRANERR);
+ }
+}
+
+// This function is called when we have determined that a frame has
+// arrived for a pipe. The remote and local addresses were both
+// matched by the caller.
+static void
+zt_pipe_virtual_recv(zt_pipe *p, uint8_t op, const uint8_t *data, size_t len)
+{
+ // We got data, so update our recv time.
+ p->zp_last_recv = nni_clock();
+ p->zp_ping_try = 0;
+
+ switch (op) {
+ case zt_op_data:
+ zt_pipe_recv_data(p, data, len);
+ return;
+ case zt_op_disc_req:
+ zt_pipe_recv_disc_req(p, data, len);
+ return;
+ case zt_op_ping:
+ zt_pipe_recv_ping(p, data, len);
+ return;
+ case zt_op_pong:
+ zt_pipe_recv_pong(p, data, len);
+ return;
+ case zt_op_error:
+ zt_pipe_recv_error(p, data, len);
+ return;
+ }
+}
+
+// This function is called when a frame arrives on the
+// *virtual* network.
+// It validates the fixed protocol header, derives the 64-bit
+// remote/local "addresses" (40-bit node id << 24 | 24-bit port) from
+// the Ethernet MACs and the embedded port fields, then routes the
+// frame to a matching pipe, a listening endpoint, or (for some
+// opcodes) answers with an error frame.  Runs with zt_lk held.
+static void
+zt_virtual_recv(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
+    void **netptr, uint64_t srcmac, uint64_t dstmac, unsigned int ethertype,
+    unsigned int vlanid, const void *payload, unsigned int len)
+{
+	zt_node *      ztn = userptr;
+	uint8_t        op;
+	const uint8_t *data = payload;
+	uint16_t       proto;
+	uint16_t       version;
+	uint32_t       rport;
+	uint32_t       lport;
+	zt_ep *        ep;
+	zt_pipe *      p;
+	uint64_t       raddr;
+	uint64_t       laddr;
+
+	// Discard anything that is not our ethertype, is too short to
+	// carry our header, or has non-zero reserved fields.
+	if ((ethertype != zt_ethertype) || (len < zt_size_headers) ||
+	    (data[zt_offset_flags] != 0) || (data[zt_offset_zero1] != 0) ||
+	    (data[zt_offset_zero2] != 0)) {
+		return;
+	}
+	NNI_GET16(data + zt_offset_version, version);
+	if (version != zt_version) {
+		return;
+	}
+
+	op = data[zt_offset_op];
+
+	ZT_GET24(data + zt_offset_dst_port, lport);
+	ZT_GET24(data + zt_offset_src_port, rport);
+
+	raddr = zt_mac_to_node(srcmac, nwid);
+	raddr <<= 24;
+	raddr |= rport;
+
+	laddr = zt_mac_to_node(dstmac, nwid);
+	laddr <<= 24;
+	laddr |= lport;
+
+	// NB: We are holding the zt_lock.
+
+	// Look up a pipe, but also we use this chance to check that
+	// the source address matches what the pipe was established with.
+	// If the pipe does not match then we nak it.  Note that pipes can
+	// appear on the znode twice (loopback), so we have to be careful
+	// to check the entire set of parameters, and to check for server
+	// vs. client pipes separately.
+
+	// If its a local address match on a client pipe, process it.
+	if ((nni_idhash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) &&
+	    (p->zp_nwid == nwid) && (p->zp_raddr == raddr)) {
+		zt_pipe_virtual_recv(p, op, data, len);
+		return;
+	}
+
+	// If its a remote address match on a server pipe, process it.
+	if ((nni_idhash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) &&
+	    (p->zp_nwid == nwid) && (p->zp_laddr == laddr)) {
+		zt_pipe_virtual_recv(p, op, data, len);
+		return;
+	}
+
+	// No pipe, so look for an endpoint.
+	if ((nni_idhash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) &&
+	    (ep->ze_nwid == nwid)) {
+		// direct this to an endpoint.
+		zt_ep_virtual_recv(ep, op, raddr, data, len);
+		return;
+	}
+
+	// We have a request for which we have no listener, and no
+	// pipe.  For some of these we send back a NAK, but for others
+	// we just drop the frame.
+	switch (op) {
+	case zt_op_conn_req:
+		// No listener.  Connection refused.
+		zt_send_err(ztn, nwid, raddr, laddr, zt_err_refused,
+		    "Connection refused");
+		return;
+	case zt_op_data:
+	case zt_op_ping:
+	case zt_op_conn_ack:
+		zt_send_err(ztn, nwid, raddr, laddr, zt_err_notconn,
+		    "Connection not found");
+		break;
+	case zt_op_error:
+	case zt_op_pong:
+	case zt_op_disc_req:
+	default:
+		// Just drop these.
+		break;
+	}
+}
+
+// ZeroTier core event callback.  We do not currently act on any node
+// lifecycle or trace events; this exists only to satisfy the
+// ZT_Node_Callbacks contract.  (A trace printf is left here,
+// commented out, as a debugging aid.)
+static void
+zt_event_cb(ZT_Node *node, void *userptr, void *thr, enum ZT_Event event,
+    const void *payload)
+{
+	NNI_ARG_UNUSED(node);
+	NNI_ARG_UNUSED(userptr);
+	NNI_ARG_UNUSED(thr);
+
+	switch (event) {
+	case ZT_EVENT_ONLINE:       // Connected to the virtual net.
+	case ZT_EVENT_UP:           // Node initialized (may not be connected).
+	case ZT_EVENT_DOWN:         // Teardown of the node.
+	case ZT_EVENT_OFFLINE:      // Removal of the node from the net.
+	case ZT_EVENT_TRACE:        // Local trace events.
+		// printf("TRACE: %s\n", (const char *) payload);
+		break;
+	case ZT_EVENT_REMOTE_TRACE: // Remote trace, not supported.
+	default:
+		break;
+	}
+}
+
+// Maps each ZT_StateObjectType to the file name used to persist that
+// object under the node home directory.  A NULL entry means the
+// object type is never persisted.  Order must track the enum values
+// in the ZeroTier core API.
+static const char *zt_files[] = {
+	// clang-format off
+	NULL, // none, i.e. not used at all
+	"identity.public",
+	"identity.secret",
+	"planet",
+	NULL, // moon, e.g. moons.d/<ID>.moon -- we don't persist it
+	NULL, // peer, e.g. peers.d/<ID> -- we don't persist this
+	NULL, // network, e.g. networks.d/<ID>.conf -- we don't persist
+	// clang-format on
+};
+
+// Portability shims for file removal and path separators.
+#ifdef _WIN32
+#define unlink DeleteFile
+#define pathsep "\\"
+#else
+#define pathsep "/"
+#endif
+
+// In-memory fallback store for ZeroTier state objects, used when the
+// endpoint has no home directory configured.  Indexed by
+// ZT_StateObjectType.  NOTE(review): the array has only
+// ZT_STATE_OBJECT_NETWORK_CONFIG entries, so that enum value itself
+// must never be used as an index; today that is guarded only by the
+// corresponding zt_files[] entry being NULL -- confirm if zt_files
+// is ever extended.
+static struct {
+	size_t len;
+	void * data;
+} zt_ephemeral_state[ZT_STATE_OBJECT_NETWORK_CONFIG];
+
+// ZeroTier callback: persist (or delete, when len < 0) a state
+// object.  Objects we do not persist are ignored.  When the node has
+// no home directory, the object is kept in zt_ephemeral_state
+// instead of on disk.  Failures are silent by design; ZeroTier
+// treats state storage as best-effort.
+static void
+zt_state_put(ZT_Node *node, void *userptr, void *thr,
+    enum ZT_StateObjectType objtype, const uint64_t objid[2], const void *data,
+    int len)
+{
+	FILE *      file;
+	zt_node *   ztn = userptr;
+	char        path[NNG_MAXADDRLEN + 1];
+	const char *fname;
+	size_t      sz;
+
+	NNI_ARG_UNUSED(objid); // only use global files
+
+	if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
+	    ((fname = zt_files[(int) objtype]) == NULL)) {
+		return;
+	}
+
+	// If we have no valid path, then we just use ephemeral data.
+	if (strlen(ztn->zn_path) == 0) {
+		void * odata = zt_ephemeral_state[objtype].data;
+		size_t olen  = zt_ephemeral_state[objtype].len;
+		void * ndata;
+
+		if ((len >= 0) && ((ndata = nni_alloc(len)) != NULL)) {
+			memcpy(ndata, data, len);
+			zt_ephemeral_state[objtype].data = ndata;
+			zt_ephemeral_state[objtype].len  = (size_t) len;
+		} else {
+			// Deletion request (len < 0), or allocation
+			// failure.  BUG FIX: previously the (possibly
+			// negative) len was stored with a NULL pointer,
+			// which zt_tran_fini would later hand to nni_free.
+			zt_ephemeral_state[objtype].data = NULL;
+			zt_ephemeral_state[objtype].len  = 0;
+		}
+
+		if (olen > 0) {
+			nni_free(odata, olen);
+		}
+		return;
+	}
+
+	sz = sizeof(path);
+	if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) {
+		// If the path is too long, we can't cope.  We
+		// just decline to store anything.
+		return;
+	}
+
+	// We assume that everyone can do standard C I/O.
+	// This may be a bad assumption.  If that's the case,
+	// the platform should supply an alternative
+	// implementation.  We are also assuming that we don't
+	// need to worry about atomic updates.  As these items
+	// (keys, etc.) pretty much don't change, this should
+	// be fine.
+
+	if (len < 0) {
+		(void) unlink(path);
+		return;
+	}
+
+	if ((file = fopen(path, "wb")) == NULL) {
+		return;
+	}
+
+	// Remove a partial file rather than leaving a truncated object.
+	if (fwrite(data, 1, (size_t) len, file) != (size_t) len) {
+		(void) unlink(path);
+	}
+	(void) fclose(file);
+}
+
+// ZeroTier callback: load a previously stored state object into the
+// caller's buffer.  Returns the number of bytes read, or -1 if the
+// object is absent, unpersisted, or larger than the buffer.  Reads
+// from zt_ephemeral_state when no home directory is configured.
+static int
+zt_state_get(ZT_Node *node, void *userptr, void *thr,
+    enum ZT_StateObjectType objtype, const uint64_t objid[2], void *data,
+    unsigned int len)
+{
+	FILE *      file;
+	zt_node *   ztn = userptr;
+	char        path[NNG_MAXADDRLEN + 1];
+	const char *fname;
+	int         nread;
+	size_t      sz;
+
+	NNI_ARG_UNUSED(objid); // we only use global files
+
+	if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
+	    ((fname = zt_files[(int) objtype]) == NULL)) {
+		return (-1);
+	}
+
+	// If no base directory, we are using ephemeral data.
+	if (strlen(ztn->zn_path) == 0) {
+		if (zt_ephemeral_state[objtype].data == NULL) {
+			return (-1);
+		}
+		if (zt_ephemeral_state[objtype].len > len) {
+			// Object would not fit in the caller's buffer.
+			return (-1);
+		}
+		len = zt_ephemeral_state[objtype].len;
+		memcpy(data, zt_ephemeral_state[objtype].data, len);
+		return (len);
+	}
+
+	sz = sizeof(path);
+	if (snprintf(path, sz, "%s%s%s", ztn->zn_path, pathsep, fname) >= sz) {
+		// If the path is too long, we can't cope.
+		return (-1);
+	}
+
+	// We assume that everyone can do standard C I/O.
+	// This may be a bad assumption.  If that's the case,
+	// the platform should supply an alternative
+	// implementation.  We are also assuming that we don't
+	// need to worry about atomic updates.  As these items
+	// (keys, etc.) pretty much don't change, this should
+	// be fine.
+
+	if ((file = fopen(path, "rb")) == NULL) {
+		return (-1);
+	}
+
+	// seek to end of file
+	// (Used only to measure the file; fail if it exceeds the buffer.)
+	(void) fseek(file, 0, SEEK_END);
+	if (ftell(file) > len) {
+		fclose(file);
+		return (-1);
+	}
+	(void) fseek(file, 0, SEEK_SET);
+
+	nread = (int) fread(data, 1, len, file);
+	(void) fclose(file);
+
+	return (nread);
+}
+
+// Bookkeeping prepended to each outgoing UDP payload copy.  The send
+// completion callback uses it to locate and free the whole buffer
+// (header + payload) as one allocation.
+typedef struct zt_send_hdr {
+	nni_sockaddr sa;
+	size_t len;
+} zt_send_hdr;
+
+// Completion callback for zt_wire_packet_send.  The send is
+// fire-and-forget; we only release the combined header+payload
+// buffer and the aio itself.
+static void
+zt_wire_packet_send_cb(void *arg)
+{
+	// We don't actually care much about the results, we
+	// just need to release the resources.
+	nni_aio *    aio = arg;
+	zt_send_hdr *hdr;
+
+	hdr = nni_aio_get_data(aio);
+	// hdr sits at the head of the allocation; payload follows it.
+	nni_free(hdr, hdr->len + sizeof(*hdr));
+	nni_aio_fini_cb(aio);
+}
+
+// This function is called when ZeroTier desires to send a
+// physical frame.  The data is a UDP payload, the rest of the
+// payload should be sent over vanilla UDP.  The payload is copied and
+// handed to the platform UDP layer asynchronously; errors simply
+// drop the frame (wire sends are best-effort).
+static int
+zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
+    const struct sockaddr_storage *remaddr, const void *data, unsigned int len,
+    unsigned int ttl)
+{
+	nni_aio *            aio;
+	nni_sockaddr         addr;
+	struct sockaddr_in * sin  = (void *) remaddr;
+	struct sockaddr_in6 *sin6 = (void *) remaddr;
+	zt_node *            ztn  = userptr;
+	nni_plat_udp *       udp;
+	char *               buf;
+	zt_send_hdr *        hdr;
+
+	NNI_ARG_UNUSED(thr);
+	NNI_ARG_UNUSED(socket);
+	NNI_ARG_UNUSED(ttl);
+
+	// Kind of unfortunate, but we have to convert the
+	// sockaddr to a neutral form, and then back again in
+	// the platform layer.  Ports stay in network byte order
+	// throughout.  (An unused local that misapplied htons to the
+	// already network-order port has been removed.)
+	switch (sin->sin_family) {
+	case AF_INET:
+		addr.s_un.s_in.sa_family = NNG_AF_INET;
+		addr.s_un.s_in.sa_port   = sin->sin_port;
+		addr.s_un.s_in.sa_addr   = sin->sin_addr.s_addr;
+		udp                      = ztn->zn_udp4;
+		break;
+	case AF_INET6:
+		addr.s_un.s_in6.sa_family = NNG_AF_INET6;
+		addr.s_un.s_in6.sa_port   = sin6->sin6_port;
+		udp                       = ztn->zn_udp6;
+		memcpy(addr.s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
+		break;
+	default:
+		// No way to understand the address.
+		return (-1);
+	}
+
+	if (nni_aio_init(&aio, zt_wire_packet_send_cb, NULL) != 0) {
+		// Out of memory
+		return (-1);
+	}
+	// One allocation holds both our bookkeeping header and the
+	// payload copy; the completion callback frees it as a unit.
+	if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) {
+		nni_aio_fini(aio);
+		return (-1);
+	}
+
+	hdr = (void *) buf;
+	buf += sizeof(*hdr);
+
+	memcpy(buf, data, len);
+	nni_aio_set_data(aio, hdr);
+	hdr->sa  = addr;
+	hdr->len = len;
+
+	aio->a_addr           = &hdr->sa;
+	aio->a_niov           = 1;
+	aio->a_iov[0].iov_buf = buf;
+	aio->a_iov[0].iov_len = len;
+
+	// This should be non-blocking/best-effort, so while
+	// not great that we're holding the lock, also not tragic.
+	nni_aio_set_synch(aio);
+	nni_plat_udp_send(udp, aio);
+
+	return (0);
+}
+
+// Callback vector handed to ZT_Node_new.  Path check/lookup are left
+// NULL (defaults); all other hooks are implemented above.
+static struct ZT_Node_Callbacks zt_callbacks = {
+	.version                      = 0,
+	.statePutFunction             = zt_state_put,
+	.stateGetFunction             = zt_state_get,
+	.wirePacketSendFunction       = zt_wire_packet_send,
+	.virtualNetworkFrameFunction  = zt_virtual_recv,
+	.virtualNetworkConfigFunction = zt_virtual_config,
+	.eventCallback                = zt_event_cb,
+	.pathCheckFunction            = NULL,
+	.pathLookupFunction           = NULL,
+};
+
+// Tear down a zt_node and everything it owns.  Callers must ensure
+// no endpoints or pipes still reference it.  Ordering matters: stop
+// the receive aios and join the background thread before destroying
+// the ZT node, the UDP sockets, and the lookup tables they use.
+static void
+zt_node_destroy(zt_node *ztn)
+{
+	nni_aio_stop(ztn->zn_rcv4_aio);
+	nni_aio_stop(ztn->zn_rcv6_aio);
+
+	// Wait for background thread to exit!
+	nni_thr_fini(&ztn->zn_bgthr);
+
+	if (ztn->zn_znode != NULL) {
+		ZT_Node_delete(ztn->zn_znode);
+	}
+
+	if (ztn->zn_udp4 != NULL) {
+		nni_plat_udp_close(ztn->zn_udp4);
+	}
+	if (ztn->zn_udp6 != NULL) {
+		nni_plat_udp_close(ztn->zn_udp6);
+	}
+
+	if (ztn->zn_rcv4_buf != NULL) {
+		nni_free(ztn->zn_rcv4_buf, zt_rcv_bufsize);
+	}
+	if (ztn->zn_rcv6_buf != NULL) {
+		nni_free(ztn->zn_rcv6_buf, zt_rcv_bufsize);
+	}
+	nni_aio_fini(ztn->zn_rcv4_aio);
+	nni_aio_fini(ztn->zn_rcv6_aio);
+	// BUG FIX: zn_ports was initialized in zt_node_create but never
+	// finalized here, leaking the hash on node teardown.
+	nni_idhash_fini(ztn->zn_ports);
+	nni_idhash_fini(ztn->zn_eps);
+	nni_idhash_fini(ztn->zn_lpipes);
+	nni_idhash_fini(ztn->zn_rpipes);
+	nni_idhash_fini(ztn->zn_peers);
+	nni_cv_fini(&ztn->zn_bgcv);
+	NNI_FREE_STRUCT(ztn);
+}
+
+// Create and start a zt_node: the shared, per-home-directory ZeroTier
+// node instance.  Opens IPv4 and IPv6 UDP sockets bound to wildcard
+// addresses, creates lookup tables, starts the background thread,
+// and posts the initial UDP receives.  On failure all partially
+// created state is torn down via zt_node_destroy.  Called with zt_lk
+// held (the new node is appended to the global zt_nodes list).
+static int
+zt_node_create(zt_node **ztnp, const char *path)
+{
+	zt_node *          ztn;
+	nng_sockaddr       sa4;
+	nng_sockaddr       sa6;
+	int                rv;
+	enum ZT_ResultCode zrv;
+
+	// We want to bind to any address we can (for now).
+	// Note that at the moment we only support IPv4.  Its
+	// unclear how we are meant to handle underlying IPv6
+	// in ZeroTier.  Probably we can use IPv6 dual stock
+	// sockets if they exist, but not all platforms support
+	// dual-stack.  Furhtermore, IPv6 is not available
+	// everywhere, and the root servers may be IPv4 only.
+	memset(&sa4, 0, sizeof(sa4));
+	sa4.s_un.s_in.sa_family = NNG_AF_INET;
+	memset(&sa6, 0, sizeof(sa6));
+	sa6.s_un.s_in6.sa_family = NNG_AF_INET6;
+
+	if ((ztn = NNI_ALLOC_STRUCT(ztn)) == NULL) {
+		return (NNG_ENOMEM);
+	}
+	NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link);
+	NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link);
+	nni_cv_init(&ztn->zn_bgcv, &zt_lk);
+	nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn);
+	nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn);
+
+	if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) ||
+	    ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) {
+		zt_node_destroy(ztn);
+		return (NNG_ENOMEM);
+	}
+	if (((rv = nni_idhash_init(&ztn->zn_ports)) != 0) ||
+	    ((rv = nni_idhash_init(&ztn->zn_eps)) != 0) ||
+	    ((rv = nni_idhash_init(&ztn->zn_lpipes)) != 0) ||
+	    ((rv = nni_idhash_init(&ztn->zn_rpipes)) != 0) ||
+	    ((rv = nni_idhash_init(&ztn->zn_peers)) != 0) ||
+	    ((rv = nni_thr_init(&ztn->zn_bgthr, zt_bgthr, ztn)) != 0) ||
+	    ((rv = nni_plat_udp_open(&ztn->zn_udp4, &sa4)) != 0) ||
+	    ((rv = nni_plat_udp_open(&ztn->zn_udp6, &sa6)) != 0)) {
+		zt_node_destroy(ztn);
+		return (rv);
+	}
+
+	// Setup for dynamic ephemeral port allocations.  We
+	// set the range to allow for ephemeral ports, but not
+	// higher than the max port, and starting with an
+	// initial random value.  Note that this should give us
+	// about 8 million possible ephemeral ports.
+	nni_idhash_set_limits(ztn->zn_ports, zt_ephemeral, zt_max_port,
+	    (nni_random() % (zt_max_port - zt_ephemeral)) + zt_ephemeral);
+
+	nni_strlcpy(ztn->zn_path, path, sizeof(ztn->zn_path));
+	zrv = ZT_Node_new(&ztn->zn_znode, ztn, NULL, &zt_callbacks, zt_now());
+	if (zrv != ZT_RESULT_OK) {
+		zt_node_destroy(ztn);
+		return (zt_result(zrv));
+	}
+
+	nni_list_append(&zt_nodes, ztn);
+
+	// Our own 40-bit ZeroTier node address.
+	ztn->zn_self = ZT_Node_address(ztn->zn_znode);
+
+	nni_thr_run(&ztn->zn_bgthr);
+
+	// Schedule an initial background run.
+	zt_node_resched(ztn, 1);
+
+	// Schedule receive
+	ztn->zn_rcv4_aio->a_niov           = 1;
+	ztn->zn_rcv4_aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf;
+	ztn->zn_rcv4_aio->a_iov[0].iov_len = zt_rcv_bufsize;
+	ztn->zn_rcv4_aio->a_addr           = &ztn->zn_rcv4_addr;
+	ztn->zn_rcv4_aio->a_count          = 0;
+	ztn->zn_rcv6_aio->a_niov           = 1;
+	ztn->zn_rcv6_aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf;
+	ztn->zn_rcv6_aio->a_iov[0].iov_len = zt_rcv_bufsize;
+	ztn->zn_rcv6_aio->a_addr           = &ztn->zn_rcv6_addr;
+	ztn->zn_rcv6_aio->a_count          = 0;
+
+	nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio);
+	nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio);
+
+	*ztnp = ztn;
+	return (0);
+}
+
+// Find (or lazily create) the zt_node matching the endpoint's home
+// directory, attach the endpoint to it, join the network, and pick
+// up the current MTU settings if a network config is already
+// available.  Called with zt_lk held.
+static int
+zt_node_find(zt_ep *ep)
+{
+	zt_node *                ztn;
+	int                      rv;
+	nng_sockaddr             sa;
+	ZT_VirtualNetworkConfig *cf;
+
+	NNI_LIST_FOREACH (&zt_nodes, ztn) {
+		if (strcmp(ep->ze_home, ztn->zn_path) == 0) {
+			goto done;
+		}
+	}
+
+	// We didn't find a node, so make one.  And try to
+	// initialize it.
+	if ((rv = zt_node_create(&ztn, ep->ze_home)) != 0) {
+		return (rv);
+	}
+
+done:
+
+	ep->ze_ztn = ztn;
+	// Move the endpoint onto this node's list (it may have been on
+	// another node's list previously).
+	if (nni_list_node_active(&ep->ze_link)) {
+		nni_list_node_remove(&ep->ze_link);
+	}
+	nni_list_append(&ztn->zn_eplist, ep);
+
+	(void) ZT_Node_join(ztn->zn_znode, ep->ze_nwid, ztn, NULL);
+
+	// If the network config is already known, capture the MTUs now;
+	// otherwise they arrive later via zt_virtual_config.
+	if ((cf = ZT_Node_networkConfig(ztn->zn_znode, ep->ze_nwid)) != NULL) {
+		NNI_ASSERT(cf->nwid == ep->ze_nwid);
+		ep->ze_maxmtu = cf->mtu;
+		ep->ze_phymtu = cf->physicalMtu;
+		ZT_Node_freeQueryResult(ztn->zn_znode, cf);
+	}
+
+	return (0);
+}
+
+// Validate a transport option value before it is applied.  Returns 0
+// when the value is acceptable, NNG_EINVAL for a malformed value, and
+// NNG_ENOTSUP for options this transport does not recognize.
+static int
+zt_chkopt(int opt, const void *dat, size_t sz)
+{
+	if (opt == nng_optid_recvmaxsz) {
+		// We cannot deal with message sizes larger
+		// than 64k.
+		return (nni_chkopt_size(dat, sz, 0, 0xffffffffU));
+	}
+	if (opt == nng_optid_zt_home) {
+		// Home must be a NUL-terminated string that fits in our
+		// fixed path buffer.
+		size_t l = nni_strnlen(dat, sz);
+		if ((l >= sz) || (l >= NNG_MAXADDRLEN)) {
+			return (NNG_EINVAL);
+		}
+		// XXX: should we apply additional security
+		// checks?  home path is not null terminated
+		return (0);
+	}
+	if (opt == nng_optid_zt_ping_count) {
+		return (nni_chkopt_int(dat, sz, 0, 1000000));
+	}
+	if (opt == nng_optid_zt_ping_time) {
+		return (nni_chkopt_usec(dat, sz));
+	}
+	return (NNG_ENOTSUP);
+}
+
+// One-time transport initialization: register all ZeroTier-specific
+// option names and set up the global lock and node list.  Returns the
+// first registration error, if any.
+static int
+zt_tran_init(void)
+{
+	int rv;
+	if (((rv = nni_option_register(nng_opt_zt_home, &nng_optid_zt_home)) !=
+	        0) ||
+	    ((rv = nni_option_register(nng_opt_zt_node, &nng_optid_zt_node)) !=
+	        0) ||
+	    ((rv = nni_option_register(nng_opt_zt_nwid, &nng_optid_zt_nwid)) !=
+	        0) ||
+	    ((rv = nni_option_register(
+	          nng_opt_zt_status, &nng_optid_zt_status)) != 0) ||
+	    ((rv = nni_option_register(nng_opt_zt_network_name,
+	          &nng_optid_zt_network_name)) != 0) ||
+	    ((rv = nni_option_register(
+	          nng_opt_zt_local_port, &nng_optid_zt_local_port)) != 0) ||
+	    ((rv = nni_option_register(
+	          nng_opt_zt_ping_count, &nng_optid_zt_ping_count)) != 0) ||
+	    ((rv = nni_option_register(
+	          nng_opt_zt_ping_time, &nng_optid_zt_ping_time)) != 0)) {
+		return (rv);
+	}
+	nni_mtx_init(&zt_lk);
+	NNI_LIST_INIT(&zt_nodes, zt_node, zn_link);
+	return (0);
+}
+
+// Transport teardown: destroy every remaining zt_node (dropping the
+// global lock around each destroy, since destroy joins the node's
+// background thread), release any ephemeral state, and invalidate
+// option ids.
+static void
+zt_tran_fini(void)
+{
+	nng_optid_zt_home = -1;
+	nng_optid_zt_nwid = -1;
+	nng_optid_zt_node = -1;
+	nng_optid_zt_ping_count = -1;
+	nng_optid_zt_ping_time = -1;
+	zt_node *ztn;
+
+	nni_mtx_lock(&zt_lk);
+	while ((ztn = nni_list_first(&zt_nodes)) != 0) {
+		nni_list_remove(&zt_nodes, ztn);
+		// Mark closed and wake the background thread so it can
+		// observe the flag; destroy joins the thread, so we must
+		// not hold zt_lk across it.
+		ztn->zn_closed = 1;
+		nni_cv_wake(&ztn->zn_bgcv);
+		nni_mtx_unlock(&zt_lk);
+
+		zt_node_destroy(ztn);
+
+		nni_mtx_lock(&zt_lk);
+	}
+	nni_mtx_unlock(&zt_lk);
+
+	for (int i = 0; i < ZT_STATE_OBJECT_NETWORK_CONFIG; i++) {
+		if (zt_ephemeral_state[i].len > 0) {
+			nni_free(zt_ephemeral_state[i].data,
+			    zt_ephemeral_state[i].len);
+		}
+	}
+	NNI_ASSERT(nni_list_empty(&zt_nodes));
+	nni_mtx_fini(&zt_lk);
+}
+
+// Close a pipe at the user's request: abort any pending receive with
+// NNG_ECLOSED and notify the peer with a disconnect request.
+static void
+zt_pipe_close(void *arg)
+{
+	zt_pipe *p = arg;
+	nni_aio *aio;
+
+	nni_mtx_lock(&zt_lk);
+	p->zp_closed = 1;
+	if ((aio = p->zp_user_rxaio) != NULL) {
+		p->zp_user_rxaio = NULL;
+		nni_aio_finish_error(aio, NNG_ECLOSED);
+	}
+	zt_pipe_send_disc_req(p);
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Release all pipe resources.  The pipe must already be closed (no
+// active user aios).  Removes the pipe from every node-level table it
+// may be registered in and frees the fragment reassembly queues.
+static void
+zt_pipe_fini(void *arg)
+{
+	zt_pipe *p = arg;
+	zt_node *ztn = p->zp_ztn;
+
+	nni_aio_fini(p->zp_ping_aio);
+
+	// This tosses the connection details and all state.
+	nni_mtx_lock(&zt_lk);
+	nni_idhash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask);
+	nni_idhash_remove(ztn->zn_lpipes, p->zp_laddr);
+	nni_idhash_remove(ztn->zn_rpipes, p->zp_raddr);
+	nni_idhash_remove(ztn->zn_peers, p->zp_raddr);
+	nni_mtx_unlock(&zt_lk);
+
+	for (int i = 0; i < zt_recvq; i++) {
+		zt_fraglist_free(&p->zp_recvq[i]);
+	}
+
+	NNI_FREE_STRUCT(p);
+}
+
+// Allocate and register a pipe for a given remote/local address pair,
+// inheriting configuration from the creating endpoint.  Dialers are
+// registered by local address, listeners by remote address; both are
+// also indexed by peer.  On any failure the pipe is fully released
+// and an error returned.
+static int
+zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
+{
+	zt_pipe *p;
+	int      rv;
+	zt_node *ztn = ep->ze_ztn;
+	int      i;
+	size_t   maxfrag;
+	size_t   maxfrags;
+
+	if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+		return (NNG_ENOMEM);
+	}
+	p->zp_ztn        = ztn;
+	p->zp_raddr      = raddr;
+	p->zp_laddr      = laddr;
+	p->zp_proto      = ep->ze_proto;
+	p->zp_nwid       = ep->ze_nwid;
+	p->zp_mtu        = ep->ze_phymtu;
+	p->zp_rcvmax     = ep->ze_rcvmax;
+	p->zp_ping_count = ep->ze_ping_count;
+	p->zp_ping_time  = ep->ze_ping_time;
+	p->zp_next_msgid = (uint16_t) nni_random();
+	p->zp_ping_try   = 0;
+
+	if (ep->ze_mode == NNI_EP_MODE_DIAL) {
+		rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
+	} else {
+		rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p);
+	}
+	if ((rv != 0) ||
+	    ((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
+	    ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
+		zt_pipe_fini(p);
+		// BUG FIX: previously execution fell through here and
+		// continued to use the freed pipe (use-after-free).
+		return (rv);
+	}
+
+	// the largest fragment we can accept on this pipe
+	maxfrag = p->zp_mtu - zt_offset_data_data;
+	// and the largest fragment count we can accept on this pipe
+	// (round up)
+	maxfrags = (p->zp_rcvmax + (maxfrag - 1)) / maxfrag;
+
+	for (i = 0; i < zt_recvq; i++) {
+		zt_fraglist *fl  = &p->zp_recvq[i];
+		fl->fl_time      = NNI_TIME_ZERO;
+		fl->fl_msgid     = 0;
+		fl->fl_ready     = 0;
+		// One bit per expected fragment.
+		fl->fl_missingsz = (maxfrags + 7) / 8;
+		fl->fl_missing   = nni_alloc(fl->fl_missingsz);
+		if (fl->fl_missing == NULL) {
+			zt_pipe_fini(p);
+			return (NNG_ENOMEM);
+		}
+	}
+
+	*pipep = p;
+	return (0);
+}
+
+// Send a message over the pipe, fragmenting it to the physical MTU.
+// The SP header (if any) is prepended to the first fragment.  All
+// fragments share a non-zero 16-bit message id so the receiver can
+// reassemble them.  Sends are synchronous best-effort; the aio
+// completes once all fragments have been handed to zt_send.
+static void
+zt_pipe_send(void *arg, nni_aio *aio)
+{
+	// As we are sending UDP, and there is no callback to worry
+	// about, we just go ahead and send out a stream of messages
+	// synchronously.
+	zt_pipe *p = arg;
+	size_t   offset;
+	uint8_t  data[ZT_MAX_MTU];
+	uint16_t id;
+	uint16_t nfrags;
+	uint16_t fragno;
+	uint16_t fragsz;
+	size_t   bytes;
+	nni_msg *m;
+
+	nni_mtx_lock(&zt_lk);
+	if (nni_aio_start(aio, NULL, p) != 0) {
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+
+	if (p->zp_closed) {
+		nni_aio_finish_error(aio, NNG_ECLOSED);
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+
+	// Payload capacity of a single fragment at the current MTU.
+	fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
+
+	if ((m = nni_aio_get_msg(aio)) == NULL) {
+		nni_aio_finish_error(aio, NNG_EINVAL);
+		nni_mtx_unlock(&zt_lk);
+		return;
+	};
+
+	bytes = nni_msg_header_len(m) + nni_msg_len(m);
+	if (bytes >= (0xfffe * fragsz)) {
+		nni_aio_finish_error(aio, NNG_EMSGSIZE);
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+	// above check means nfrags will fit in 16-bits.
+	nfrags = (uint16_t)((bytes + (fragsz - 1)) / fragsz);
+
+	// get the next message ID, but skip 0
+	if ((id = p->zp_next_msgid++) == 0) {
+		id = p->zp_next_msgid++;
+	}
+
+	offset = 0;
+	fragno = 0;
+	do {
+		uint8_t *dest    = data + zt_offset_data_data;
+		size_t   room    = fragsz;
+		size_t   fraglen = 0;
+		size_t   len;
+
+		// Prepend the header first.
+		if ((len = nni_msg_header_len(m)) > 0) {
+			if (len > fragsz) {
+				// This shouldn't happen!  SP headers are
+				// supposed to be quite small.
+				nni_aio_finish_error(aio, NNG_EMSGSIZE);
+				nni_mtx_unlock(&zt_lk);
+				return;
+			}
+			memcpy(dest, nni_msg_header(m), len);
+			dest += len;
+			room -= len;
+			offset += len;
+			fraglen += len;
+			// Clear so only the first fragment carries it.
+			nni_msg_header_clear(m);
+		}
+
+		len = nni_msg_len(m);
+		if (len > room) {
+			len = room;
+		}
+		memcpy(dest, nni_msg_body(m), len);
+
+		nng_msg_trim(m, len);
+		NNI_PUT16(data + zt_offset_data_id, id);
+		NNI_PUT16(data + zt_offset_data_fragsz, fragsz);
+		NNI_PUT16(data + zt_offset_data_frag, fragno);
+		NNI_PUT16(data + zt_offset_data_nfrag, nfrags);
+		offset += len;
+		fraglen += len;
+		fragno++;
+		zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
+		    p->zp_laddr, data, fraglen + zt_offset_data_data);
+	} while (nni_msg_len(m) != 0);
+
+	nni_aio_set_msg(aio, NULL);
+	nni_msg_free(m);
+	nni_aio_finish(aio, 0, offset);
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Cancellation handler for a pending receive.  If the aio is still
+// the registered receiver, detach it and complete it with rv.
+static void
+zt_pipe_cancel_recv(nni_aio *aio, int rv)
+{
+	zt_pipe *p = aio->a_prov_data;
+	nni_mtx_lock(&zt_lk);
+	if (p->zp_user_rxaio != aio) {
+		// Already completed or replaced; nothing to do.
+		// BUG FIX: a missing return here previously caused a
+		// double unlock and clobbered another aio's slot.
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+	p->zp_user_rxaio = NULL;
+	nni_mtx_unlock(&zt_lk);
+	nni_aio_finish_error(aio, rv);
+}
+
+// Reset a fragment-reassembly slot to its empty state, releasing any
+// partially assembled message and zeroing the missing-fragment bits.
+static void
+zt_fraglist_clear(zt_fraglist *fl)
+{
+	nni_msg *partial = fl->fl_msg;
+
+	fl->fl_msg   = NULL;
+	fl->fl_ready = 0;
+	fl->fl_msgid = 0;
+	fl->fl_time  = NNI_TIME_ZERO;
+	if (partial != NULL) {
+		nni_msg_free(partial);
+	}
+	memset(fl->fl_missing, 0, fl->fl_missingsz);
+}
+
+// Fully tear down a fragment-reassembly slot: clear it, then release
+// the missing-fragment bitmask itself.
+static void
+zt_fraglist_free(zt_fraglist *fl)
+{
+	void * bits = fl->fl_missing;
+	size_t bsz  = fl->fl_missingsz;
+
+	zt_fraglist_clear(fl);
+	fl->fl_missing = NULL;
+	nni_free(bits, bsz);
+}
+
+// If a pending user receive exists and a fully reassembled message is
+// waiting on any queue slot, hand the message to the aio.  Stale
+// partial reassemblies are discarded along the way.  Called with
+// zt_lk held.
+// NOTE(review): zp_user_rxaio is not cleared here after the aio is
+// completed -- confirm the surrounding code treats a finished aio in
+// that slot correctly (cf. zt_pipe_close / zt_pipe_cancel_recv,
+// which do clear it).
+static void
+zt_pipe_dorecv(zt_pipe *p)
+{
+	nni_aio *aio = p->zp_user_rxaio;
+	nni_time now = nni_clock();
+
+	if (aio == NULL) {
+		return;
+	}
+
+	for (int i = 0; i < zt_recvq; i++) {
+		zt_fraglist *fl = &p->zp_recvq[i];
+		nni_msg *    msg;
+
+		if (now > (fl->fl_time + zt_recv_stale)) {
+			// fragment list is stale, clean it.
+			zt_fraglist_clear(fl);
+			continue;
+		}
+		if (!fl->fl_ready) {
+			continue;
+		}
+
+		// Got data.  Let's pass it up.
+		msg = fl->fl_msg;
+		fl->fl_msg = NULL;
+		NNI_ASSERT(msg != NULL);
+		nni_aio_finish_msg(aio, msg);
+		zt_fraglist_clear(fl);
+		return;
+	}
+}
+
+// Post a user receive on the pipe.  The aio is parked as the single
+// pending receiver; if a message is already reassembled it completes
+// immediately via zt_pipe_dorecv.
+static void
+zt_pipe_recv(void *arg, nni_aio *aio)
+{
+	zt_pipe *p = arg;
+
+	nni_mtx_lock(&zt_lk);
+	if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) {
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+	if (p->zp_closed) {
+		nni_aio_finish_error(aio, NNG_ECLOSED);
+	} else {
+		p->zp_user_rxaio = aio;
+		zt_pipe_dorecv(p);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Report the SP protocol number of the remote peer, as recorded when
+// the pipe was established.
+static uint16_t
+zt_pipe_peer(void *arg)
+{
+	zt_pipe *p = arg;
+
+	return (p->zp_peer);
+}
+
+// Option getter: current ZeroTier network status for nwid.  Returns
+// NNG_ECLOSED when no network configuration is available (e.g. the
+// network has not been joined or configured yet).
+static int
+zt_getopt_status(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp)
+{
+	ZT_VirtualNetworkConfig *vcfg;
+	int status;
+
+	nni_mtx_lock(&zt_lk);
+	vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid);
+	if (vcfg == NULL) {
+		nni_mtx_unlock(&zt_lk);
+		return (NNG_ECLOSED);
+	}
+	status = vcfg->status;
+	// The query result is allocated by the core and must be freed.
+	ZT_Node_freeQueryResult(ztn->zn_znode, vcfg);
+	nni_mtx_unlock(&zt_lk);
+
+	return (nni_getopt_int(status, buf, szp));
+}
+
+// Option getter: human-readable name of the ZeroTier network nwid.
+// Returns NNG_ECLOSED when no network configuration is available.
+static int
+zt_getopt_network_name(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp)
+{
+	ZT_VirtualNetworkConfig *vcfg;
+	int                      rv = NNG_ECLOSED;
+
+	nni_mtx_lock(&zt_lk);
+	if ((vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid)) != NULL) {
+		rv = nni_getopt_str(vcfg->name, buf, szp);
+		// Query results are core-allocated; release before unlock.
+		ZT_Node_freeQueryResult(ztn->zn_znode, vcfg);
+	}
+	nni_mtx_unlock(&zt_lk);
+
+	return (rv);
+}
+
+// Pipe option getter.  Supports the receive size cap plus the
+// ZeroTier identity/status options; anything else is NNG_ENOTSUP.
+static int
+zt_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
+{
+	zt_pipe *p = arg;
+
+	if (option == nng_optid_recvmaxsz) {
+		return (nni_getopt_size(p->zp_rcvmax, buf, szp));
+	}
+	if (option == nng_optid_zt_nwid) {
+		return (nni_getopt_u64(p->zp_nwid, buf, szp));
+	}
+	if (option == nng_optid_zt_node) {
+		// Our node id occupies the upper 40 bits of the local
+		// address; the low 24 bits are the port.
+		return (nni_getopt_u64(p->zp_laddr >> 24, buf, szp));
+	}
+	if (option == nng_optid_zt_status) {
+		return (zt_getopt_status(p->zp_ztn, p->zp_nwid, buf, szp));
+	}
+	if (option == nng_optid_zt_network_name) {
+		return (
+		    zt_getopt_network_name(p->zp_ztn, p->zp_nwid, buf, szp));
+	}
+	return (NNG_ENOTSUP);
+}
+
+// Cancellation handler for the ping timer aio.  There is no pipe
+// state to unwind; simply complete the aio with the given error.
+static void
+zt_pipe_cancel_ping(nni_aio *aio, int rv)
+{
+	nni_aio_finish_error(aio, rv);
+}
+
+// Ping timer callback.  Runs each time the ping aio times out: if we
+// have seen no traffic for a ping interval, send a ping (counting
+// attempts); once zp_ping_count unanswered attempts accumulate, the
+// peer is presumed gone and the pipe is closed.  zp_ping_try is reset
+// whenever any frame arrives (see zt_pipe_virtual_recv).
+static void
+zt_pipe_ping_cb(void *arg)
+{
+	zt_pipe *p = arg;
+	nni_aio *aio = p->zp_ping_aio;
+
+	nni_mtx_lock(&zt_lk);
+	// Pings disabled, or the pipe is already closed.
+	if (p->zp_closed || aio == NULL || (p->zp_ping_count == 0) ||
+	    (p->zp_ping_time == NNI_TIME_NEVER) ||
+	    (p->zp_ping_time == NNI_TIME_ZERO)) {
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+	// Only the timeout path drives the keepalive; cancellation or
+	// other completions stop the cycle.
+	if (nni_aio_result(aio) != NNG_ETIMEDOUT) {
+		nni_mtx_unlock(&zt_lk);
+		return;
+	}
+	if (p->zp_ping_try < p->zp_ping_count) {
+		nni_time now = nni_clock();
+		nni_aio_set_timeout(aio, now + p->zp_ping_time);
+		if (now > (p->zp_last_recv + p->zp_ping_time)) {
+			// We have to send a ping to keep the session up.
+			if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) {
+				p->zp_ping_try++;
+				zt_pipe_send_ping(p);
+			}
+		} else {
+			// We still need the timer to wake us up in case
+			// we haven't seen traffic for a while.
+			nni_aio_start(aio, zt_pipe_cancel_ping, p);
+		}
+	} else {
+		// Close the pipe, but no need to send a reason to the
+		// peer, it is already AFK.
+		zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Start a newly established pipe: send a gratuitous ping and arm the
+// keepalive timer, then complete the start aio immediately.
+static void
+zt_pipe_start(void *arg, nni_aio *aio)
+{
+	zt_pipe *p = arg;
+
+	nni_mtx_lock(&zt_lk);
+	// send a gratuitous ping, and start the ping interval timer.
+	if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) &&
+	    (p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) {
+		p->zp_ping_try = 0;
+		// BUG FIX: the timeout must be armed on the ping aio, not
+		// on the caller's start aio; zt_pipe_ping_cb only acts
+		// when zp_ping_aio completes with NNG_ETIMEDOUT.
+		nni_aio_set_timeout(
+		    p->zp_ping_aio, nni_clock() + p->zp_ping_time);
+		nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p);
+		zt_pipe_send_ping(p);
+	}
+	nni_aio_finish(aio, 0, 0);
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Release an endpoint: stop and finalize the connect-request aio,
+// then free the structure.  Safe to call on a partially initialized
+// endpoint (a NULL creq aio is tolerated by the aio framework).
+static void
+zt_ep_fini(void *arg)
+{
+	zt_ep *ep = arg;
+	nni_aio_stop(ep->ze_creq_aio);
+	nni_aio_fini(ep->ze_creq_aio);
+	NNI_FREE_STRUCT(ep);
+}
+
+// Parse up to 16 hex digits at *sp into *valp, advancing *sp past the
+// consumed characters.  When wildok is set, a single '*' parses as 0
+// (wildcard).  Returns NNG_EINVAL if no digit was consumed.
+static int
+zt_parsehex(const char **sp, uint64_t *valp, int wildok)
+{
+	int n;
+	const char *s = *sp;
+	char c;
+	uint64_t v;
+
+	if (wildok && *s == '*') {
+		*valp = 0;
+		s++;
+		*sp = s;
+		return (0);
+	}
+
+	// Accumulate base-16; the 16-digit cap prevents uint64 overflow.
+	for (v = 0, n = 0; (n < 16) && isxdigit(c = tolower(*s)); n++, s++) {
+		v *= 16;
+		if (isdigit(c)) {
+			v += (c - '0');
+		} else {
+			v += ((c - 'a') + 10);
+		}
+	}
+
+	*sp = s;
+	*valp = v;
+	return (n ? 0 : NNG_EINVAL);
+}
+
+// Parse a run of decimal digits at *sp into *valp, advancing *sp past
+// the consumed characters.  At most 20 digits are examined (enough
+// for any uint64_t).  Returns NNG_EINVAL if no digit was consumed.
+static int
+zt_parsedec(const char **sp, uint64_t *valp)
+{
+	const char *cp  = *sp;
+	uint64_t    val = 0;
+	int         ndig;
+
+	for (ndig = 0; (ndig < 20) && isdigit(*cp); ndig++, cp++) {
+		val = (val * 10) + (uint64_t)(*cp - '0');
+	}
+	*sp   = cp;
+	*valp = val;
+	return (ndig ? 0 : NNG_EINVAL);
+}
+
+// Parse a zt:// URL and initialize an endpoint.
+//
+// URL forms:
+//   dial:    zt://<nwid>/<remotenode>:<port>   (port must be non-zero)
+//   listen:  zt://<nwid>[/<node>]:<port>       (port 0 => ephemeral)
+//
+// On success the endpoint is attached to a (possibly shared) zt_node.
+// BUG FIX: all URL-parse failure paths now release the endpoint via
+// zt_ep_fini; previously they returned NNG_EADDRINVAL and leaked the
+// endpoint and its connect-request aio.  Unused locals removed.
+static int
+zt_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+	zt_ep *     ep;
+	size_t      sz;
+	uint64_t    nwid;
+	uint64_t    node;
+	uint64_t    port;
+	int         rv;
+	const char *u;
+
+	if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+		return (NNG_ENOMEM);
+	}
+
+	// URL parsing...
+	// URL is form zt://<nwid>[/<remoteaddr>]:<port>
+	// The <remoteaddr> part is required for remote dialers, but is
+	// not used at all for listeners.  (We have no notion of binding
+	// to different node addresses.)
+	ep->ze_mode       = mode;
+	ep->ze_maxmtu     = ZT_MAX_MTU;
+	ep->ze_phymtu     = ZT_MIN_MTU;
+	ep->ze_aio        = NULL;
+	ep->ze_ping_count = zt_ping_count;
+	ep->ze_ping_time  = zt_ping_time;
+	ep->ze_proto      = nni_sock_proto(sock);
+	sz                = sizeof(ep->ze_url);
+
+	nni_aio_list_init(&ep->ze_aios);
+
+	if ((strncmp(url, "zt://", strlen("zt://")) != 0) ||
+	    (nni_strlcpy(ep->ze_url, url, sz) >= sz)) {
+		zt_ep_fini(ep);
+		return (NNG_EADDRINVAL);
+	}
+	rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep);
+	if (rv != 0) {
+		zt_ep_fini(ep);
+		return (rv);
+	}
+
+	u = url + strlen("zt://");
+	// Parse the URL.
+
+	switch (mode) {
+	case NNI_EP_MODE_DIAL:
+		// We require zt://<nwid>/<remotenode>:<port>
+		// The remote node must be a 40 bit address
+		// (max), and we require a non-zero port to
+		// connect to.
+		if ((zt_parsehex(&u, &nwid, 0) != 0) || (*u++ != '/') ||
+		    (zt_parsehex(&u, &node, 1) != 0) ||
+		    (node > 0xffffffffffull) || (*u++ != ':') ||
+		    (zt_parsedec(&u, &port) != 0) || (*u != '\0') ||
+		    (port > zt_max_port) || (port == 0)) {
+			zt_ep_fini(ep);
+			return (NNG_EADDRINVAL);
+		}
+		ep->ze_raddr = node;
+		ep->ze_raddr <<= 24;
+		ep->ze_raddr |= port;
+		ep->ze_laddr = 0;
+		break;
+	case NNI_EP_MODE_LISTEN:
+		// Listen mode is just zt://<nwid>:<port>.  The
+		// port may be zero in this case, to indicate
+		// that the server should allocate an ephemeral
+		// port.  We do allow the same form of URL including
+		// the node address, but that must be zero, a wild
+		// card, or our own node address.
+		if (zt_parsehex(&u, &nwid, 0) != 0) {
+			zt_ep_fini(ep);
+			return (NNG_EADDRINVAL);
+		}
+		node = 0;
+		// Look for optional node address.
+		if (*u == '/') {
+			u++;
+			if (zt_parsehex(&u, &node, 1) != 0) {
+				zt_ep_fini(ep);
+				return (NNG_EADDRINVAL);
+			}
+		}
+		if ((*u++ != ':') || (zt_parsedec(&u, &port) != 0) ||
+		    (*u != '\0') || (port > zt_max_port)) {
+			zt_ep_fini(ep);
+			return (NNG_EADDRINVAL);
+		}
+		ep->ze_laddr = node;
+		ep->ze_laddr <<= 24;
+		ep->ze_laddr |= port;
+		ep->ze_raddr = 0;
+		break;
+	default:
+		NNI_ASSERT(0);
+		break;
+	}
+
+	ep->ze_nwid = nwid;
+
+	nni_mtx_lock(&zt_lk);
+	rv = zt_node_find(ep);
+	nni_mtx_unlock(&zt_lk);
+
+	if (rv != 0) {
+		zt_ep_fini(ep);
+		return (rv);
+	}
+
+	*epp = ep;
+	return (0);
+}
+
+// Close an endpoint: cancel the in-flight connect request, abort any
+// queued user accept/connect aios with NNG_ECLOSED, and unregister
+// the endpoint from its node's tables.
+static void
+zt_ep_close(void *arg)
+{
+	zt_ep *  ep = arg;
+	zt_node *ztn;
+	nni_aio *aio;
+
+	// Must be done before taking zt_lk: aio cancellation runs its
+	// own synchronization.
+	nni_aio_cancel(ep->ze_creq_aio, NNG_ECLOSED);
+
+	// Cancel any outstanding user operation(s) - they should have
+	// been aborted by the above cancellation, but we need to be
+	// sure, as the cancellation callback may not have run yet.
+
+	nni_mtx_lock(&zt_lk);
+	while ((aio = nni_list_first(&ep->ze_aios)) != NULL) {
+		nni_aio_list_remove(aio);
+		nni_aio_finish_error(aio, NNG_ECLOSED);
+	}
+
+	// Endpoint framework guarantees to only call us once,
+	// and to not call other things while we are closed.
+	ztn = ep->ze_ztn;
+	// If we're on the ztn node list, pull us off.
+	if (ztn != NULL) {
+		nni_list_node_remove(&ep->ze_link);
+		nni_idhash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask);
+		nni_idhash_remove(ztn->zn_eps, ep->ze_laddr);
+	}
+
+	// XXX: clean up the pipe if a dialer
+
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Bind the endpoint to its local address, allocating an ephemeral
+// port if none was requested.  Validates that any user-specified
+// node id matches our own node.  Called with zt_lk held.
+static int
+zt_ep_bind_locked(zt_ep *ep)
+{
+	int      rv;
+	uint64_t port;
+	uint64_t node;
+	zt_node *ztn;
+
+	// If we haven't already got a ZT node, get one.
+	if ((ztn = ep->ze_ztn) == NULL) {
+		if ((rv = zt_node_find(ep)) != 0) {
+			return (rv);
+		}
+		ztn = ep->ze_ztn;
+	}
+
+	node = ep->ze_laddr >> 24;
+	if ((node != 0) && (node != ztn->zn_self)) {
+		// User requested node id, but it doesn't match our
+		// own.
+		return (NNG_EADDRINVAL);
+	}
+
+	if ((ep->ze_laddr & zt_port_mask) == 0) {
+		// ask for an ephemeral port
+		if ((rv = nni_idhash_alloc(ztn->zn_ports, &port, ep)) != 0) {
+			return (rv);
+		}
+		// Ephemeral ports always have the high bit set (see the
+		// limits configured in zt_node_create).
+		NNI_ASSERT(port & zt_ephemeral);
+	} else {
+		void *conflict;
+		// make sure port requested is free.
+		port = ep->ze_laddr & zt_port_mask;
+
+		if (nni_idhash_find(ztn->zn_ports, port, &conflict) == 0) {
+			return (NNG_EADDRINUSE);
+		}
+		if ((rv = nni_idhash_insert(ztn->zn_ports, port, ep)) != 0) {
+			return (rv);
+		}
+	}
+	NNI_ASSERT(port <= zt_max_port);
+	NNI_ASSERT(port > 0);
+
+	// Full local address: our node id in the upper 40 bits, the
+	// port in the low 24.
+	ep->ze_laddr = ztn->zn_self;
+	ep->ze_laddr <<= 24;
+	ep->ze_laddr |= port;
+
+	if ((rv = nni_idhash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) {
+		// Roll back the port reservation on failure.
+		nni_idhash_remove(ztn->zn_ports, port);
+		return (rv);
+	}
+
+	return (0);
+}
+
+// Bind entry point: serialize against all other ZeroTier state
+// changes and delegate to zt_ep_bind_locked.
+static int
+zt_ep_bind(void *arg)
+{
+	zt_ep *ep = arg;
+	int    rv;
+
+	nni_mtx_lock(&zt_lk);
+	rv = zt_ep_bind_locked(ep);
+	nni_mtx_unlock(&zt_lk);
+	return (rv);
+}
+
+// Cancellation handler for queued user accept/connect aios.  If the
+// aio is still queued, also cancel the endpoint's internal aio, then
+// dequeue and fail the user aio.
+static void
+zt_ep_cancel(nni_aio *aio, int rv)
+{
+	zt_ep *ep = aio->a_prov_data;
+
+	nni_mtx_lock(&zt_lk);
+	if (nni_aio_list_active(aio)) {
+		if (ep->ze_aio != NULL) {
+			nni_aio_cancel(ep->ze_aio, rv);
+		}
+		nni_aio_list_remove(aio);
+		nni_aio_finish_error(aio, rv);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+// Match queued connection requests against pending user accepts.
+// Expired requests are discarded; each surviving request is paired
+// with the oldest waiting accept aio, creating a pipe and sending
+// the connection ack.  Called with zt_lk held.
+static void
+zt_ep_doaccept(zt_ep *ep)
+{
+	// Call with ep lock held.
+	nni_time now;
+	zt_pipe *p;
+	int      rv;
+
+	now = nni_clock();
+	// Consume any timedout connect requests.
+	while (ep->ze_creq_tail != ep->ze_creq_head) {
+		zt_creq  creq;
+		nni_aio *aio;
+
+		// The listen queue is a fixed ring buffer indexed
+		// modulo zt_listenq.
+		creq = ep->ze_creqs[ep->ze_creq_tail % zt_listenq];
+		// Discard old connection requests.
+		if (creq.cr_expire < now) {
+			ep->ze_creq_tail++;
+			continue;
+		}
+
+		if ((aio = nni_list_first(&ep->ze_aios)) == NULL) {
+			// No outstanding accept.  We're done.
+			break;
+		}
+
+		// We have both conn request, and a place to accept it.
+
+		// Advance the tail.
+		ep->ze_creq_tail++;
+
+		// We remove this AIO.  This keeps it from being canceled.
+		nni_aio_list_remove(aio);
+
+		rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr);
+		if (rv != 0) {
+			// Tell the would-be peer we failed, fail the user
+			// accept, and keep servicing remaining requests.
+			zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
+			    ep->ze_laddr, zt_err_unknown,
+			    "Failed creating pipe");
+			nni_aio_finish_error(aio, rv);
+			continue;
+		}
+		p->zp_peer = creq.cr_proto;
+
+		zt_pipe_send_conn_ack(p);
+		nni_aio_finish_pipe(aio, p);
+	}
+}
+
+static void
+zt_ep_accept(void *arg, nni_aio *aio)
+{
+	// Queue an accept request, then service the listen queue at
+	// once in case a connection request is already waiting.
+	zt_ep *ep = arg;
+
+	nni_mtx_lock(&zt_lk);
+	if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
+		nni_aio_list_append(&ep->ze_aios, aio);
+		zt_ep_doaccept(ep);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_ep_conn_req_cancel(nni_aio *aio, int rv)
+{
+	// Cancellation callback for the endpoint's internal
+	// connect-request aio (ze_creq_aio).
+	// We don't have much to do here.  The AIO will have been
+	// canceled as a result of the "parent" AIO canceling.
+	nni_aio_finish_error(aio, rv);
+}
+
+static void
+zt_ep_conn_req_cb(void *arg)
+{
+	// Completion callback for the dialer's connect-request aio:
+	// on success, hands the new pipe to a waiting user aio; on
+	// timeout, retries up to zt_conn_attempts; otherwise fails all
+	// waiting user aios.
+	zt_ep *  ep = arg;
+	zt_pipe *p;
+	nni_aio *aio = ep->ze_creq_aio;
+	nni_aio *uaio;
+	int      rv;
+
+	NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL);
+
+	nni_mtx_lock(&zt_lk);
+	rv = nni_aio_result(aio);
+	switch (rv) {
+	case 0:
+		// Already canceled, or already handled?
+		// NOTE(review): if a pipe was created but no user aio is
+		// waiting, the pipe is not torn down here -- confirm it
+		// is reaped elsewhere.
+		if (((uaio = nni_list_first(&ep->ze_aios)) == NULL) ||
+		    ((p = nni_aio_get_pipe(aio)) == NULL)) {
+			nni_mtx_unlock(&zt_lk);
+			return;
+		}
+		ep->ze_creq_try = 0; // reset retry counter for next connect
+		nni_aio_list_remove(uaio);
+		nni_aio_finish_pipe(uaio, p);
+		nni_mtx_unlock(&zt_lk);
+		return;
+
+	case NNG_ETIMEDOUT:
+		if (ep->ze_creq_try <= zt_conn_attempts) {
+			// Timed out, but we can try again.
+			ep->ze_creq_try++;
+			nni_aio_set_timeout(
+			    aio, nni_clock() + zt_conn_interval);
+			nni_aio_start(aio, zt_ep_conn_req_cancel, ep);
+			zt_ep_send_conn_req(ep); // re-send the connect message
+			nni_mtx_unlock(&zt_lk);
+			return;
+		}
+		break;
+	}
+
+	// These are failure modes.  Either we timed out too many
+	// times, or an error occurred.
+
+	ep->ze_creq_try = 0;
+	while ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
+		nni_aio_list_remove(uaio);
+		nni_aio_finish_error(uaio, rv);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+static void
+zt_ep_connect(void *arg, nni_aio *aio)
+{
+	zt_ep *ep = arg;
+
+	// We bind locally.  We'll use the address later when we give
+	// it to the pipe, but this allows us to receive the initial
+	// ack back from the server.  (This gives us an ephemeral
+	// address to work with.)
+	nni_mtx_lock(&zt_lk);
+
+	if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
+		nni_time now = nni_clock();
+		int      rv;
+
+		// Clear the port so we get an ephemeral port.
+		ep->ze_laddr &= ~((uint64_t) zt_port_mask);
+
+		if ((rv = zt_ep_bind_locked(ep)) != 0) {
+			nni_aio_finish_error(aio, rv);
+			nni_mtx_unlock(&zt_lk);
+			return;
+		}
+
+		// Default the remote node id to our own when the URL
+		// did not supply one.
+		if ((ep->ze_raddr >> 24) == 0) {
+			ep->ze_raddr |= (ep->ze_ztn->zn_self << 24);
+		}
+		nni_aio_list_append(&ep->ze_aios, aio);
+
+		ep->ze_creq_try = 1; // first attempt
+
+		nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval);
+		// This can't fail -- the only way the ze_creq_aio gets
+		// terminated would have required us to have also
+		// canceled the user AIO and held the lock.
+		(void) nni_aio_start(
+		    ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+
+		// We send out the first connect message; if we are not
+		// yet attached to the network the message will be dropped.
+		zt_ep_send_conn_req(ep);
+	}
+	nni_mtx_unlock(&zt_lk);
+}
+
+static int
+zt_ep_setopt(void *arg, int opt, const void *data, size_t size)
+{
+	// Set a transport-level option on the endpoint.  Returns
+	// NNG_ENOTSUP for options this transport does not recognize.
+	zt_ep *ep = arg;
+	int    i;
+	int    rv = NNG_ENOTSUP;
+
+	if (opt == nng_optid_recvmaxsz) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_setopt_size(
+		    &ep->ze_rcvmax, data, size, 0, 0xffffffffu);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_home) {
+		// XXX: check to make sure not started...
+		i = nni_strnlen((const char *) data, size);
+		if ((i >= size) || (i >= NNG_MAXADDRLEN)) {
+			return (NNG_EINVAL); // unterminated or too long
+		}
+		nni_mtx_lock(&zt_lk);
+		nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home));
+		rv = zt_node_find(ep);
+		if (rv != 0) {
+			ep->ze_ztn = NULL;
+		}
+		nni_mtx_unlock(&zt_lk);
+		// NOTE(review): the zt_node_find() failure above is
+		// discarded here, so setting the home always "succeeds"
+		// -- confirm this is intentional (lookup is retried at
+		// bind/connect time).
+		rv = 0;
+	} else if (opt == nng_optid_zt_ping_count) {
+		nni_mtx_lock(&zt_lk);
+		rv =
+		    nni_setopt_int(&ep->ze_ping_count, data, size, 0, 1000000);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_ping_time) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_setopt_usec(&ep->ze_ping_time, data, size);
+		nni_mtx_unlock(&zt_lk);
+	}
+	return (rv);
+}
+
+static int
+zt_ep_getopt(void *arg, int opt, void *data, size_t *szp)
+{
+	// Retrieve a transport-level option from the endpoint.  Returns
+	// NNG_ENOTSUP for options this transport does not recognize.
+	zt_ep *ep = arg;
+	int    rv = NNG_ENOTSUP;
+
+	if (opt == nng_optid_recvmaxsz) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_size(ep->ze_rcvmax, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_home) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_str(ep->ze_home, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_node) {
+		nni_mtx_lock(&zt_lk);
+		// NOTE(review): ze_ztn may be NULL here (zt_ep_setopt
+		// clears it when zt_node_find fails) -- possible NULL
+		// dereference; confirm callers can't reach this state.
+		rv = nni_getopt_u64(ep->ze_ztn->zn_self, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_nwid) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_u64(ep->ze_nwid, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_ping_count) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_int(ep->ze_ping_count, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_ping_time) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_usec(ep->ze_ping_time, data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_local_port) {
+		nni_mtx_lock(&zt_lk);
+		rv = nni_getopt_int(
+		    (int) (ep->ze_laddr & zt_port_mask), data, szp);
+		nni_mtx_unlock(&zt_lk);
+	} else if (opt == nng_optid_zt_network_name) {
+		rv =
+		    zt_getopt_network_name(ep->ze_ztn, ep->ze_nwid, data, szp);
+	} else if (opt == nng_optid_zt_status) {
+		rv = zt_getopt_status(ep->ze_ztn, ep->ze_nwid, data, szp);
+	}
+	return (rv);
+}
+
+// Pipe operations vector handed to the NNG core for "zt" pipes.
+static nni_tran_pipe zt_pipe_ops = {
+	.p_fini   = zt_pipe_fini,
+	.p_start  = zt_pipe_start,
+	.p_send   = zt_pipe_send,
+	.p_recv   = zt_pipe_recv,
+	.p_close  = zt_pipe_close,
+	.p_peer   = zt_pipe_peer,
+	.p_getopt = zt_pipe_getopt,
+};
+
+// Endpoint operations vector handed to the NNG core for "zt" endpoints.
+static nni_tran_ep zt_ep_ops = {
+	.ep_init    = zt_ep_init,
+	.ep_fini    = zt_ep_fini,
+	.ep_connect = zt_ep_connect,
+	.ep_bind    = zt_ep_bind,
+	.ep_accept  = zt_ep_accept,
+	.ep_close   = zt_ep_close,
+	.ep_setopt  = zt_ep_setopt,
+	.ep_getopt  = zt_ep_getopt,
+};
+
+// This is the ZeroTier transport linkage, and should be the
+// only global symbol in this entire file.  It ties the "zt" URL
+// scheme to the endpoint and pipe vectors above.
+static struct nni_tran zt_tran = {
+	.tran_version = NNI_TRANSPORT_VERSION,
+	.tran_scheme  = "zt",
+	.tran_ep      = &zt_ep_ops,
+	.tran_pipe    = &zt_pipe_ops,
+	.tran_chkopt  = zt_chkopt,
+	.tran_init    = zt_tran_init,
+	.tran_fini    = zt_tran_fini,
+};
+
+// Public entry point: registers the ZeroTier transport with the
+// NNG core so "zt://" addresses become usable.
+int
+nng_zt_register(void)
+{
+	return (nni_tran_register(&zt_tran));
+}
+
+#endif
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 0c0973b2..fdfd6055 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -128,6 +128,10 @@ add_nng_test(device 5)
add_nng_test(errors 2)
add_nng_test(pair1 5)
+if (NNG_HAVE_ZEROTIER)
+ add_nng_test(zt 60)
+endif()
+
# compatbility tests
add_nng_compat_test(compat_block 5)
add_nng_compat_test(compat_bug777 5)
diff --git a/tests/sock.c b/tests/sock.c
index 21b12792..6c911250 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -182,6 +182,7 @@ TestMain("Socket Operations", {
&sz) == 0);
So(v == 0);
+ sz = 0;
So(nng_getopt(s1, nng_optid_reconnmaxt, &v,
&sz) == 0);
So(v == 0);
diff --git a/tests/trantest.h b/tests/trantest.h
index afb5a881..2e5f859d 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -137,6 +137,8 @@ trantest_send_recv(trantest *tt)
So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0);
So(d != 0);
+ nng_usleep(20000); // listener may be behind slightly
+
send = NULL;
So(nng_msg_alloc(&send, 0) == 0);
So(send != NULL);
diff --git a/tests/zt.c b/tests/zt.c
new file mode 100644
index 00000000..fe6023a1
--- /dev/null
+++ b/tests/zt.c
@@ -0,0 +1,195 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "trantest.h"
+
+extern int nng_zt_register(void);
+extern const char *nng_opt_zt_home;
+extern int nng_optid_zt_home;
+extern int nng_optid_zt_node;
+extern int nng_optid_zt_status;
+extern int nng_optid_zt_network_name;
+extern int nng_zt_status_ok;
+
+// zerotier tests.
+
+// This network is an open network setup exclusively for nng testing.
+// Do not attach to it in production.
+#define NWID "a09acf02337b057b"
+
+#ifdef _WIN32
+
+int
+mkdir(const char *path, int mode) // Windows shim emulating POSIX mkdir(2); mode is ignored.
+{
+	return (CreateDirectory(path, NULL) ? 0 : -1); // was missing a return value (UB)
+}
+#else
+#include <sys/stat.h>
+#include <unistd.h>
+#endif // WIN32
+
+TestMain("ZeroTier Transport", {
+
+	char     path1[NNG_MAXADDRLEN] = "/tmp/zt_server";
+	char     path2[NNG_MAXADDRLEN] = "/tmp/zt_client";
+	unsigned port;
+
+	port = 5555; // fixed port shared by the listener & dialer tests below
+
+	Convey("We can register the zero tier transport",
+	    { So(nng_zt_register() == 0); });
+
+	Convey("We can create a zt listener", {
+		nng_listener l;
+		nng_socket   s;
+		char         addr[NNG_MAXADDRLEN];
+		int          rv; // NOTE(review): unused
+
+		snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port);
+
+		So(nng_pair_open(&s) == 0);
+		Reset({ nng_close(s); });
+
+		So(nng_listener_create(&l, s, addr) == 0);
+
+		Convey("We can lookup zerotier home option id", {
+			So(nng_optid_zt_home > 0);
+			So(nng_option_lookup(nng_opt_zt_home) ==
+			    nng_optid_zt_home);
+		});
+
+		Convey("And it can be started...", {
+
+			mkdir(path1, 0700); // each ZT node needs its own home dir
+
+			So(nng_listener_setopt(l, nng_optid_zt_home, path1,
+			       strlen(path1) + 1) == 0);
+
+			So(nng_listener_start(l, 0) == 0);
+		})
+	});
+
+	Convey("We can create a zt dialer", {
+		nng_dialer d;
+		nng_socket s;
+		char       addr[NNG_MAXADDRLEN];
+		int        rv; // NOTE(review): unused
+		// uint64_t node = 0xb000072fa6ull; // my personal host
+		uint64_t node = 0x2d2f619cccull; // NOTE(review): hard-coded developer node; test depends on it being online
+
+		snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u",
+		    (unsigned long long) node, port);
+
+		So(nng_pair_open(&s) == 0);
+		Reset({ nng_close(s); });
+
+		So(nng_dialer_create(&d, s, addr) == 0);
+
+		Convey("We can lookup zerotier home option id", {
+			So(nng_optid_zt_home > 0);
+			So(nng_option_lookup(nng_opt_zt_home) ==
+			    nng_optid_zt_home);
+		});
+	});
+
+	Convey("We can create an ephemeral listener", {
+		nng_dialer   d;
+		nng_listener l;
+		nng_socket   s;
+		char         addr[NNG_MAXADDRLEN];
+		int          rv; // NOTE(review): unused
+		uint64_t     node1 = 0;
+		uint64_t     node2 = 0;
+
+		snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port);
+
+		So(nng_pair_open(&s) == 0);
+		Reset({ nng_close(s); });
+
+		So(nng_listener_create(&l, s, addr) == 0);
+
+		So(nng_listener_getopt_usec(l, nng_optid_zt_node, &node1) ==
+		    0);
+		So(node1 != 0); // node id known even before start
+
+		Convey("Network name & status options work", {
+			char   name[NNG_MAXADDRLEN];
+			size_t namesz;
+			int    status;
+
+			namesz = sizeof(name);
+			nng_usleep(10000000); // allow up to 10 s for the network join
+			So(nng_listener_getopt(l, nng_optid_zt_network_name,
+			       name, &namesz) == 0);
+			So(strcmp(name, "nng_test_open") == 0);
+			So(nng_listener_getopt_int(
+			       l, nng_optid_zt_status, &status) == 0);
+			So(status == nng_zt_status_ok);
+		});
+		Convey("Connection refused works", {
+			snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u",
+			    (unsigned long long) node1, 42u); // port 42: nothing listens there
+			So(nng_dialer_create(&d, s, addr) == 0);
+			So(nng_dialer_getopt_usec(
+			       d, nng_optid_zt_node, &node2) == 0);
+			So(node2 == node1); // dialing back to ourselves
+			So(nng_dialer_start(d, 0) == NNG_ECONNREFUSED);
+		});
+	});
+
+	Convey("We can create a zt pair (dialer & listener)", {
+		nng_dialer   d;
+		nng_listener l;
+		nng_socket   s1;
+		nng_socket   s2;
+		char         addr1[NNG_MAXADDRLEN];
+		char         addr2[NNG_MAXADDRLEN];
+		int          rv; // NOTE(review): unused
+		uint64_t     node;
+
+		port = 9944; // fresh port for this pair
+		// uint64_t node = 0xb000072fa6ull; // my personal host
+
+		snprintf(addr1, sizeof(addr1), "zt://" NWID ":%u", port);
+
+		So(nng_pair_open(&s1) == 0);
+		So(nng_pair_open(&s2) == 0);
+		Reset({
+			nng_close(s1);
+			// This sleep allows us to ensure disconnect
+			// messages work.
+			nng_usleep(1000000);
+			nng_close(s2);
+		});
+
+		So(nng_listener_create(&l, s1, addr1) == 0);
+		So(nng_listener_setopt(
+		       l, nng_optid_zt_home, path1, strlen(path1) + 1) == 0);
+
+		So(nng_listener_start(l, 0) == 0);
+		node = 0;
+		So(nng_listener_getopt_usec(l, nng_optid_zt_node, &node) == 0);
+		So(node != 0);
+
+		snprintf(addr2, sizeof(addr2), "zt://" NWID "/%llx:%u",
+		    (unsigned long long) node, port); // dial the listener's own node
+		So(nng_dialer_create(&d, s2, addr2) == 0);
+		So(nng_dialer_setopt(
+		       d, nng_optid_zt_home, path2, strlen(path2) + 1) == 0);
+		So(nng_dialer_start(d, 0) == 0);
+
+	});
+
+	trantest_test_all("zt://" NWID "/*:%u"); // generic transport conformance suite
+
+	nng_fini();
+})