aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/transport.c6
-rw-r--r--src/sp/transport.h7
-rw-r--r--src/sp/transport/CMakeLists.txt3
-rw-r--r--src/sp/transport/dtls/CMakeLists.txt17
-rw-r--r--src/sp/transport/dtls/dtls.c1802
-rw-r--r--src/sp/transport/dtls/dtls_tran_test.c345
-rw-r--r--src/sp/transport/inproc/CMakeLists.txt10
-rw-r--r--src/sp/transport/inproc/inproc.c8
-rw-r--r--src/sp/transport/ipc/CMakeLists.txt10
-rw-r--r--src/sp/transport/ipc/ipc.c10
-rw-r--r--src/sp/transport/socket/CMakeLists.txt10
-rw-r--r--src/sp/transport/socket/sockfd.c8
-rw-r--r--src/sp/transport/tcp/CMakeLists.txt10
-rw-r--r--src/sp/transport/tcp/tcp.c8
-rw-r--r--src/sp/transport/tls/CMakeLists.txt10
-rw-r--r--src/sp/transport/tls/tls.c8
-rw-r--r--src/sp/transport/tls/tls_tran_test.c2
-rw-r--r--src/sp/transport/udp/CMakeLists.txt10
-rw-r--r--src/sp/transport/udp/udp.c8
-rw-r--r--src/sp/transport/ws/CMakeLists.txt15
-rw-r--r--src/sp/transport/ws/websocket.c8
21 files changed, 2275 insertions, 40 deletions
diff --git a/src/sp/transport.c b/src/sp/transport.c
index e1c2737e..1f2e0021 100644
--- a/src/sp/transport.c
+++ b/src/sp/transport.c
@@ -105,6 +105,9 @@ extern void nni_sp_wss_register(void);
#ifdef NNG_TRANSPORT_FDC
extern void nni_sp_sfd_register(void);
#endif
+#ifdef NNG_TRANSPORT_DTLS
+extern void nni_sp_dtls_register(void);
+#endif
void
nni_sp_tran_sys_init(void)
@@ -133,6 +136,9 @@ nni_sp_tran_sys_init(void)
#ifdef NNG_TRANSPORT_FDC
nni_sp_sfd_register();
#endif
+#ifdef NNG_TRANSPORT_DTLS
+ nni_sp_dtls_register();
+#endif
}
// nni_sp_tran_sys_fini finalizes the entire transport system, including all
diff --git a/src/sp/transport.h b/src/sp/transport.h
index 9d67b7c2..6aa2086b 100644
--- a/src/sp/transport.h
+++ b/src/sp/transport.h
@@ -139,7 +139,12 @@ struct nni_sp_pipe_ops {
// p_init initializes the pipe data structures. The main
// purpose of this is so that the pipe will see the upper
// layer nni_pipe and get a chance to register stats and such.
- size_t p_size;
+ // size_t p_size;
+
+ // p_size returns the size of the transport data needed for a pipe.
+ // This allows for dynamic registration of context size to allow for
+ // different tunings or different runtimes.
+ size_t (*p_size)(void);
// p_init initializes the transport's pipe data structure.
// The pipe MUST be left in a state that p_fini can be safely
diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt
index d0875e57..5b1c9b17 100644
--- a/src/sp/transport/CMakeLists.txt
+++ b/src/sp/transport/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staystail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staystail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -15,5 +15,6 @@ add_subdirectory(inproc)
add_subdirectory(ipc)
add_subdirectory(tcp)
add_subdirectory(tls)
+add_subdirectory(dtls)
add_subdirectory(udp)
add_subdirectory(ws)
diff --git a/src/sp/transport/dtls/CMakeLists.txt b/src/sp/transport/dtls/CMakeLists.txt
new file mode 100644
index 00000000..e1472345
--- /dev/null
+++ b/src/sp/transport/dtls/CMakeLists.txt
@@ -0,0 +1,17 @@
+#
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# DTLS transport
+nng_directory(dtls)
+
+if (NNG_TRANSPORT_DTLS)
+ nng_sources(dtls.c)
+ nng_defines(NNG_TRANSPORT_DTLS)
+ nng_test(dtls_tran_test)
+endif()
diff --git a/src/sp/transport/dtls/dtls.c b/src/sp/transport/dtls/dtls.c
new file mode 100644
index 00000000..02961a56
--- /dev/null
+++ b/src/sp/transport/dtls/dtls.c
@@ -0,0 +1,1802 @@
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/aio.h"
+#include "core/defs.h"
+#include "core/idhash.h"
+#include "core/message.h"
+#include "core/nng_impl.h"
+#include "core/options.h"
+#include "core/pipe.h"
+#include "core/platform.h"
+#include "core/socket.h"
+#include "core/stats.h"
+#include "nng/nng.h"
+#include "supplemental/tls/tls_common.h"
+
+#include <string.h>
+
+// Experimental DTLS transport. Unicast only.
+//
+typedef struct dtls_pipe dtls_pipe;
+typedef struct dtls_ep dtls_ep;
+typedef struct dtls_conn dtls_conn;
+
+const uint8_t PROTO_VERSION = 1;
+
+// OP code, 8 bits
+enum dtls_opcode {
+ OPCODE_DATA = 0,
+ OPCODE_CREQ = 1,
+ OPCODE_CACK = 2,
+ OPCODE_DISC = 3,
+};
+
+// Disconnect reason, must be 16 bits
+typedef enum dtls_disc_reason {
+ DISC_CLOSED = 0, // normal close
+ DISC_TYPE = 1, // bad SP type
+ DISC_NOTCONN = 2, // no such connection
+ DISC_REFUSED = 3, // refused by policy
+ DISC_MSGSIZE = 4, // message too large
+ DISC_NEGO = 5, // neogtiation failed
+ DISC_INACTIVE = 6, // closed due to inactivity
+ DISC_PROTO = 7, // other protocol error
+ DISC_NOBUF = 8, // resources exhausted
+} dtls_disc_reason;
+
+#ifndef NNG_DTLS_TXQUEUE_LEN
+#define NNG_DTLS_TXQUEUE_LEN 32
+#endif
+
+#ifndef NNG_DTLS_RXQUEUE_LEN
+#define NNG_DTLS_RXQUEUE_LEN 16
+#endif
+
+// The maximum TLS record size
+#define DTLS_MAX_RECORD 16384
+
+// For DTLS we use a maximum record size of 16384,
+// but we reserve some space for headers. DTLS needs
+// 13 bytes, and the transport layer needs 8 bytes.
+// To leave some room for the future, we just trim to 64 bytes.
+#ifndef NNG_DTLS_RECVMAX
+#define NNG_DTLS_RECVMAX (DTLS_MAX_RECORD - 64)
+#endif
+
+#ifndef NNG_DTLS_REFRESH
+#define NNG_DTLS_REFRESH (5 * NNI_SECOND)
+#endif
+
+#ifndef NNG_DTLS_CONNRETRY
+#define NNG_DTLS_CONNRETRY (NNI_SECOND / 5)
+#endif
+
+// 64-bit protocol header
+typedef struct dtls_sp_hdr {
+ uint8_t us_ver;
+ uint8_t us_op_code;
+ uint16_t us_type;
+ uint16_t us_params[2];
+} dtls_sp_hdr;
+
+// DTLS pipe resend (CREQ) in msec (nng_duration)
+#define DTLS_PIPE_REFRESH(p) ((p)->refresh)
+
+// DTLS pipe timeout in msec (nng_duration)
+#define DTLS_PIPE_TIMEOUT(p) ((p)->refresh * 5)
+
+struct dtls_pipe {
+ dtls_ep *ep;
+ nni_pipe *npipe;
+ nng_sockaddr peer_addr;
+ uint64_t id; // hash of peer address
+ uint16_t peer;
+ uint16_t proto;
+ bool matched; // true if have matched and given this to SP
+ bool closed; // true if we are closed (no more send or recv!)
+ bool dialer; // true if we are dialer
+ nng_duration refresh; // seconds, for the protocol
+ nng_time next_wake;
+ nng_time expire; // inactivity expiration time
+ nng_time next_refresh;
+ nni_list_node node;
+ nni_lmq rx_mq;
+
+ // Upper layer queues. These are between the PIPE and SP.
+ bool send_busy; // true if send is in process
+ uint16_t send_max; // peer's max recv size
+ nni_list send_aios;
+ uint8_t *send_buf;
+ size_t send_bufsz;
+ nng_aio send_tls_aio;
+
+ bool recv_busy;
+ bool recv_rdy; // receive is done and data in recvbuf
+ uint16_t recv_max; // max recv size
+ nni_list recv_aios;
+ uint8_t *recv_buf;
+ size_t recv_bufsz;
+ nng_aio recv_tls_aio;
+
+ // Lower layer queues. These are between the
+
+ uint8_t send_op; // usually OPCODE_DATA
+ uint8_t last_op; // last op code we sent
+ uint16_t reason; // only for disconnect
+
+ nni_mtx lower_mtx; // protects the lower rx_q, etc.
+
+ // This is the lower level RX buffer, which contains only
+ // received ciphertext (content before passed to TLS layer for
+ // decrypt). The actual pointers may change, as we "swap"
+ // buffers between the endpoint and the pipe to avoid copying.
+ nni_list rx_q; // lower aio from the TLS layer
+
+ nni_tls_conn tls;
+};
+
+struct dtls_ep {
+ nng_udp *udp;
+ nni_mtx mtx;
+ uint16_t proto;
+ uint16_t peer;
+ uint16_t af; // address family
+ bool fini;
+ bool started;
+ bool closed;
+ nng_url *url;
+ const char *host; // for dialers
+ nni_aio *useraio;
+ nni_aio *connaio;
+ nni_aio timeaio;
+ nni_aio resaio;
+ bool dialer;
+ nni_listener *nlistener;
+ nni_dialer *ndialer;
+ nni_msg *rx_payload; // current receive message
+ nng_sockaddr rx_sa; // addr for last message
+ nni_aio tx_aio; // aio for TX handling
+ nni_aio rx_aio; // aio for RX handling
+ nni_id_map pipes; // pipes (indexed by id)
+ nni_sockaddr self_sa; // our address
+ nni_sockaddr peer_sa; // peer address, only for dialer;
+ nni_list connaios; // aios from accept waiting for a client peer
+ nni_list connpipes; // pipes waiting to be connected
+ nng_duration refresh; // refresh interval for connections in seconds
+ uint16_t rcvmax; // max payload, trimmed to uint16_t
+ nni_resolv_item resolv;
+
+ nng_tls_config *tlscfg;
+
+ size_t rx_size; // size of the rx buffer
+ void *rx_buf;
+
+ nni_stat_item st_rcv_max;
+ nni_stat_item st_rcv_reorder;
+ nni_stat_item st_rcv_toobig;
+ nni_stat_item st_rcv_nomatch;
+ nni_stat_item st_rcv_copy;
+ nni_stat_item st_rcv_nocopy;
+ nni_stat_item st_rcv_nobuf;
+ nni_stat_item st_snd_toobig;
+ nni_stat_item st_snd_nobuf;
+ nni_stat_item st_peer_inactive;
+ nni_stat_item st_copy_max;
+};
+
+static void dtls_ep_start(dtls_ep *);
+static void dtls_resolv_cb(void *);
+static void dtls_rx_cb(void *);
+
+static void dtls_ep_match(dtls_ep *ep);
+static void dtls_remove_pipe(dtls_pipe *p);
+
+// BIO send/recv functions for use by the common TLS layer.
+
+static void
+dtls_bio_cancel(nng_aio *aio, void *arg, nng_err rv)
+{
+ dtls_pipe *p = arg;
+ nni_mtx_lock(&p->lower_mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ }
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_recv_done(dtls_pipe *p)
+{
+ nng_aio *aio;
+ uint8_t *ptr;
+ size_t resid;
+ nni_msg *msg;
+
+ while ((!nni_lmq_empty(&p->rx_mq)) &&
+ ((aio = nni_list_first(&p->rx_q)) != NULL)) {
+
+ nni_aio_list_remove(aio);
+ nni_lmq_get(&p->rx_mq, &msg);
+
+ // assumption we only have a body, because we don't bother to
+ // fill in the header for raw UDP.
+
+ resid = nni_msg_len(msg);
+ ptr = nni_msg_body(msg);
+
+ for (unsigned i = 0; i < aio->a_nio && resid > 0; i++) {
+ size_t num = resid > aio->a_iov[i].iov_len
+ ? aio->a_iov[i].iov_len
+ : resid;
+ memcpy(aio->a_iov[i].iov_buf, ptr, num);
+ ptr += num;
+ resid -= num;
+ }
+ nni_aio_finish(aio, NNG_OK, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
+}
+
+static void
+dtls_bio_recv(void *arg, nng_aio *aio)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->lower_mtx);
+ if (!nni_aio_start(aio, dtls_bio_cancel, p)) {
+ nni_mtx_unlock(&p->lower_mtx);
+ return;
+ }
+
+ nni_aio_list_append(&p->rx_q, aio);
+ dtls_bio_recv_done(p);
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_send(void *arg, nng_aio *aio)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->lower_mtx);
+ if (!p->closed) {
+ nni_aio_set_input(aio, 0, &p->peer_addr);
+ nng_udp_send(p->ep->udp, aio);
+ }
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_free(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+dtls_bio_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+dtls_bio_stop(void *arg)
+{
+ dtls_pipe *p = arg;
+ nni_aio_stop(&p->recv_tls_aio);
+ nni_aio_stop(&p->send_tls_aio);
+}
+
+static nni_tls_bio_ops dtls_bio_ops = {
+ .bio_send = dtls_bio_send,
+ .bio_recv = dtls_bio_recv,
+ .bio_close = dtls_bio_close,
+ .bio_stop = dtls_bio_stop,
+ .bio_free = dtls_bio_free,
+};
+
+static void
+dtls_tran_init(void)
+{
+}
+
+static void
+dtls_tran_fini(void)
+{
+}
+
+//
+// Upper layer functions - moving data between TLS and SP.
+// TLS acts as kind of a stream for us, so we only see the
+// data that is meant for us, but we will send and receive
+// control messages that are not just data payloads.
+//
+
+static void dtls_pipe_send_cancel(nng_aio *, void *, nng_err);
+static void dtls_pipe_send_tls(dtls_pipe *);
+static void dtls_pipe_send_tls_cb(void *arg);
+
+static void
+dtls_pipe_send(void *arg, nni_aio *aio)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep;
+ nng_msg *msg;
+ size_t count = 0;
+ size_t sndmax;
+
+ msg = nni_aio_get_msg(aio);
+ ep = p->ep;
+
+ if (msg != NULL) {
+ count = nni_msg_len(msg) + nni_msg_header_len(msg);
+ }
+
+ nni_mtx_lock(&ep->mtx);
+ sndmax = p->send_max;
+ if (!nni_aio_start(aio, dtls_pipe_send_cancel, p)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_aio_reset(aio);
+ if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > sndmax) {
+ // rather failing this with an error, we just drop it
+ // on the floor. this is on the sender, so there isn't
+ // a compelling need to disconnect the pipe, since it
+ // we're not being "ill-behaved" to our peer.
+ nni_stat_inc(&ep->st_snd_toobig, 1);
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish(aio, 0, count);
+ nni_msg_free(msg);
+ return;
+ }
+
+ nni_aio_list_append(&p->send_aios, aio);
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_send_cancel(nng_aio *aio, void *arg, nng_err err)
+{
+ dtls_pipe *p = arg;
+ nni_mtx_lock(&p->ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, err);
+ }
+ nni_mtx_unlock(&p->ep->mtx);
+}
+
+// Lower layer send/recv functions, used by the pipe layer.
+
+static void
+dtls_pipe_send_tls(dtls_pipe *p)
+{
+ nni_aio *aio;
+ nng_msg *msg;
+ uint8_t opcode;
+ nng_iov iov;
+ dtls_sp_hdr *hdr = (void *) p->send_buf;
+
+ if (p->send_busy || p->closed) {
+ return;
+ }
+
+ opcode = p->send_op;
+ // reset the last op
+ p->send_op = OPCODE_DATA;
+
+ hdr->us_ver = PROTO_VERSION;
+ hdr->us_op_code = opcode;
+ NNI_PUT16LE(&hdr->us_type, p->proto);
+ hdr->us_params[0] = 0;
+ hdr->us_params[1] = 0;
+
+ iov.iov_buf = hdr;
+ iov.iov_len = sizeof(*hdr);
+
+ switch (opcode) {
+ case OPCODE_DATA:
+ for (;;) {
+ if ((aio = nni_list_first(&p->send_aios)) == NULL) {
+ // no work for us!
+ return;
+ }
+ nni_aio_list_remove(aio);
+ msg = nni_aio_get_msg(aio);
+ if (nni_msg_header_len(msg) + nni_msg_len(msg) +
+ sizeof(*hdr) >
+ p->send_bufsz) {
+ nng_msg_free(msg);
+ nni_aio_finish_error(aio, NNG_EMSGSIZE);
+ continue;
+ }
+ break; // for loop
+ }
+
+ size_t len = nni_msg_header_len(msg);
+ uint8_t *data = (void *) (hdr + 1);
+ memcpy(data, nni_msg_header(msg), len);
+ data += len;
+ memcpy(data, nni_msg_body(msg), nni_msg_len(msg));
+ len += nni_msg_len(msg);
+
+ NNI_PUT16LE(&hdr->us_params[0], (uint16_t) len);
+ iov.iov_len += len;
+
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ break;
+
+ case OPCODE_CREQ:
+ case OPCODE_CACK:
+ NNI_PUT16LE(&hdr->us_params[0], p->recv_max);
+ NNI_PUT16LE(&hdr->us_params[1], p->refresh);
+ break;
+
+ case OPCODE_DISC:
+ NNI_PUT16LE(&hdr->us_params[0], p->reason);
+ p->closed = true;
+ break;
+ default:
+ NNI_ASSERT(false); // this should never happen!
+ // fall back to sending a disconnect
+ hdr->us_op_code = OPCODE_DISC;
+ NNI_PUT16LE(&hdr->us_params[0], DISC_PROTO);
+ }
+
+ p->last_op = opcode;
+ p->send_busy = true;
+ nni_aio_set_iov(&p->send_tls_aio, 1, &iov);
+ nni_tls_send(&p->tls, &p->send_tls_aio);
+}
+
+static void
+dtls_pipe_send_tls_cb(void *arg)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->ep->mtx);
+
+ p->send_busy = false;
+ if (nni_aio_result(&p->send_tls_aio) != NNG_OK ||
+ p->last_op == OPCODE_DISC) {
+ nni_pipe_close(p->npipe);
+ if (p->matched == 0) {
+ dtls_remove_pipe(p);
+ }
+ nni_mtx_unlock(&p->ep->mtx);
+ return;
+ }
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&p->ep->mtx);
+}
+
+// RECV SIDE
+
+static void dtls_pipe_recv_cancel(nni_aio *, void *, nng_err);
+static void dtls_pipe_recv_tls(dtls_pipe *);
+
+static void
+dtls_pipe_recv(void *arg, nni_aio *aio)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ nni_aio_reset(aio);
+ nni_mtx_lock(&ep->mtx);
+ if (p->closed) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if (!nni_aio_start(aio, dtls_pipe_recv_cancel, p)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&p->recv_aios, aio);
+ dtls_pipe_recv_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_recv_cancel(nni_aio *aio, void *arg, nng_err rv)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ nni_mtx_lock(&ep->mtx);
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+dtls_pipe_recv_tls_start(dtls_pipe *p)
+{
+ nng_iov iov;
+ if (p->recv_busy || p->closed) {
+ return;
+ }
+ p->recv_busy = true;
+ iov.iov_buf = p->recv_buf;
+ iov.iov_len = p->recv_bufsz;
+
+ nni_aio_set_iov(&p->recv_tls_aio, 1, &iov);
+ nni_tls_recv(&p->tls, &p->recv_tls_aio);
+}
+
+static void
+dtls_pipe_recv_tls(dtls_pipe *p)
+{
+ nng_aio *aio = nni_list_first(&p->recv_aios);
+ size_t len;
+ nng_msg *msg;
+ int rv;
+
+ if (aio == NULL) {
+ return;
+ }
+ if (!p->recv_rdy) {
+ dtls_pipe_recv_tls_start(p);
+ return;
+ }
+
+ p->recv_rdy = false;
+
+ nni_aio_list_remove(aio);
+ len = nng_aio_count(&p->recv_tls_aio);
+ NNI_ASSERT(len >= sizeof(dtls_sp_hdr));
+ len -= sizeof(dtls_sp_hdr);
+
+ if ((rv = nni_msg_alloc(&msg, len)) != NNG_OK) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ memcpy(nng_msg_body(msg), p->recv_buf + sizeof(dtls_sp_hdr), len);
+ nni_aio_finish_msg(aio, msg);
+}
+
+static void
+dtls_pipe_recv_tls_cb(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ dtls_sp_hdr *hdr = (void *) p->recv_buf;
+ nng_aio *aio = &p->recv_tls_aio;
+ uint16_t proto;
+ uint16_t refresh;
+ uint16_t rcvmax;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ p->recv_busy = false;
+
+ if ((rv = nni_aio_result(aio)) != NNG_OK) {
+
+ // If we didn't connect yet, issue an error so the peer can see
+ // a connection failure (e.g. if we failed the TLS handshake.)
+ if (p->dialer && !p->matched) {
+ nni_aio *caio;
+ if ((caio = nni_list_first(&ep->connaios)) != NULL) {
+ nni_aio_list_remove(caio);
+ nni_aio_finish_error(caio, rv);
+ }
+ }
+
+ // Bump a bad receive stat (e.g. someone may have sent us
+ // garbage.) We do not acknowledge or handle garbage frames
+ // sent to an open session.
+ nni_pipe_close(p->npipe);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ // We had a "good" receive (TLS passed at least) from the peer.
+
+ if (nni_aio_count(aio) < sizeof(*hdr)) {
+ // Runt frame.
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_PROTO;
+ goto bad;
+ }
+
+ if (nni_aio_count(aio) > sizeof(*hdr) + p->recv_max) {
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_MSGSIZE;
+ goto bad;
+ }
+
+ if (hdr->us_ver != PROTO_VERSION) {
+ // Bad protocol version
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_PROTO;
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_type, proto);
+ if (proto != p->peer) {
+ // Bad SP protocol type
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_TYPE;
+ goto bad;
+ }
+
+ p->expire = nni_clock() + DTLS_PIPE_TIMEOUT(p);
+
+ if (!p->matched) {
+ p->matched = true;
+ nni_list_append(&p->ep->connpipes, p);
+ dtls_ep_match(p->ep);
+ }
+
+ switch (hdr->us_op_code) {
+ case OPCODE_CREQ:
+ if (p->dialer) {
+ // dialers don't accept requests
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_params[0], rcvmax);
+ NNI_GET16LE(&hdr->us_params[1], refresh);
+ if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) {
+ p->refresh = refresh * NNI_SECOND;
+ }
+ if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) {
+ p->send_max = rcvmax;
+ }
+ // schedule the CACK reply
+ p->send_op = OPCODE_CACK;
+ break;
+
+ case OPCODE_CACK:
+ if (!p->dialer) {
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_params[0], rcvmax);
+ NNI_GET16LE(&hdr->us_params[0], refresh);
+
+ if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) {
+ p->refresh = refresh * NNI_SECOND;
+ }
+ if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) {
+ p->send_max = rcvmax;
+ }
+ break;
+
+ case OPCODE_DISC:
+ p->closed = true;
+ nni_mtx_unlock(&ep->mtx);
+ nni_pipe_close(p->npipe);
+ return;
+
+ case OPCODE_DATA:
+ p->recv_rdy = true;
+ dtls_pipe_recv_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+bad:
+ if (p->send_op != OPCODE_DATA) {
+ dtls_pipe_send_tls(p);
+ }
+ dtls_pipe_recv_tls_start(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_close(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nni_aio *aio;
+
+ nni_mtx_lock(&ep->mtx);
+ while ((aio = nni_list_first(&p->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&p->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if (!p->matched) {
+ dtls_remove_pipe(p);
+ } else {
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_CLOSED;
+ dtls_pipe_send_tls(p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err dtls_add_pipe(dtls_ep *ep, dtls_pipe *p);
+
+static void
+dtls_pipe_stop(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ dtls_pipe_close(arg);
+
+ nni_mtx_lock(&ep->mtx);
+ dtls_remove_pipe(p);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+dtls_pipe_alloc(dtls_ep *ep, dtls_pipe **pp, const nng_sockaddr *sa)
+{
+ dtls_pipe *p;
+ nng_err rv;
+
+ if (ep->dialer) {
+ rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer);
+ } else {
+ rv = nni_pipe_alloc_listener((void *) &p, ep->nlistener);
+ }
+ if (rv != NNG_OK) {
+ nng_log_err("NNG-DTLS-PIPE-ALLOC-FAIL",
+ "Failed allocating pipe for DTLS: %s", nng_strerror(rv));
+ return (rv);
+ }
+ p->dialer = ep->dialer;
+ p->ep = ep;
+ p->proto = ep->proto;
+ p->peer = ep->peer;
+ p->peer_addr = *sa;
+ p->id = nng_sockaddr_hash(sa);
+ p->refresh = ep->refresh;
+ p->send_max = NNG_DTLS_RECVMAX;
+ p->recv_max = ep->rcvmax;
+ *pp = p;
+
+ if (((rv = dtls_add_pipe(ep, p)) != NNG_OK) ||
+ ((rv = nni_tls_init(&p->tls, ep->tlscfg)) != NNG_OK) ||
+ ((rv = nni_tls_start(&p->tls, &dtls_bio_ops, p, sa)) != NNG_OK)) {
+ nni_pipe_close(p->npipe);
+ nng_log_err("NNG-DTLS-PIPE-ADD-FAIL",
+ "Failed adding pipe for DTLS: %s", nng_strerror(rv));
+ return (rv);
+ }
+
+ // We need to start a receiver on the pipe.
+ dtls_pipe_recv_tls_start(p);
+
+ // Also start TLS up and running.
+ switch (nni_tls_run(&p->tls)) {
+ case NNG_OK:
+ case NNG_EAGAIN:
+ break;
+ default:
+ nni_pipe_close(p->npipe);
+ break;
+ }
+
+ // wake the timer so it knows to resubmit
+ nni_aio_abort(&ep->timeaio, NNG_ETIMEDOUT);
+
+ return (NNG_OK);
+}
+
+static size_t
+dtls_pipe_size(void)
+{
+ return (NNI_ALIGN_UP(sizeof(dtls_pipe)) +
+ NNI_ALIGN_UP(nni_tls_engine_conn_size()));
+}
+
+static int
+dtls_pipe_init(void *arg, nni_pipe *npipe)
+{
+ dtls_pipe *p = arg;
+ p->npipe = npipe;
+
+ size_t bufsz = DTLS_MAX_RECORD; // TODO: Make this a tunable.
+
+ if ((p->recv_buf = nni_alloc(bufsz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((p->send_buf = nni_alloc(bufsz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ p->recv_bufsz = bufsz;
+ p->send_bufsz = bufsz;
+ nni_mtx_init(&p->lower_mtx);
+ nni_aio_init(&p->recv_tls_aio, dtls_pipe_recv_tls_cb, p);
+ nni_aio_init(&p->send_tls_aio, dtls_pipe_send_tls_cb, p);
+ nni_aio_list_init(&p->rx_q);
+ nni_aio_list_init(&p->recv_aios);
+ nni_aio_list_init(&p->send_aios);
+ nni_lmq_init(&p->rx_mq, NNG_DTLS_RXQUEUE_LEN);
+
+ return (0);
+}
+
+static void
+dtls_pipe_fini(void *arg)
+{
+ dtls_pipe *p = arg;
+ nng_msg *m;
+
+ nni_tls_fini(&p->tls);
+ nni_aio_fini(&p->recv_tls_aio);
+ nni_aio_fini(&p->send_tls_aio);
+ if (p->recv_buf != NULL) {
+ nni_free(p->recv_buf, p->recv_bufsz);
+ }
+ if (p->send_buf != NULL) {
+ nni_free(p->send_buf, p->send_bufsz);
+ }
+ nni_mtx_lock(&p->lower_mtx);
+ while (!nni_lmq_empty(&p->rx_mq)) {
+ nni_lmq_get(&p->rx_mq, &m);
+ nni_msg_free(m);
+ }
+ nni_mtx_unlock(&p->lower_mtx);
+ nni_mtx_fini(&p->lower_mtx);
+ nni_lmq_fini(&p->rx_mq);
+ NNI_ASSERT(nni_list_empty(&p->recv_aios));
+ NNI_ASSERT(nni_list_empty(&p->send_aios));
+}
+
+static dtls_pipe *
+dtls_find_pipe(dtls_ep *ep, const nng_sockaddr *peer_addr)
+{
+ uint64_t id = nng_sockaddr_hash(peer_addr);
+ dtls_pipe *p;
+
+ // we'll keep incrementing id until we conclusively match
+ // or we get a NULL. This is another level of rehashing, but
+ // it keeps us from having to look up.
+ for (;;) {
+ if ((p = nni_id_get(&ep->pipes, id)) == NULL) {
+ return (NULL);
+ }
+ if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) {
+ return (p);
+ }
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+}
+
+static void
+dtls_remove_pipe(dtls_pipe *p)
+{
+ // ep locked
+ dtls_ep *ep = p->ep;
+ uint64_t id = p->id;
+ bool matched = p->matched;
+ if (id == 0) {
+ return;
+ }
+ p->id = 0;
+ for (;;) {
+ dtls_pipe *srch;
+ if ((srch = nni_id_get(&ep->pipes, id)) == NULL) {
+ break;
+ }
+ if (srch == p) {
+ nni_id_remove(&ep->pipes, id);
+ break;
+ }
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ if (!matched) {
+ nni_pipe_rele(p->npipe);
+ }
+}
+
+static nng_err
+dtls_add_pipe(dtls_ep *ep, dtls_pipe *p)
+{
+ // Id must be part of the hash
+ uint64_t id = p->id;
+ while (nni_id_get(&ep->pipes, id) != NULL) {
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ return (nni_id_set(&ep->pipes, id, p));
+}
+
+static void
+dtls_start_rx(dtls_ep *ep)
+{
+ nni_iov iov;
+
+ iov.iov_buf = ep->rx_buf;
+ iov.iov_len = ep->rx_size;
+
+ nni_aio_reset(&ep->rx_aio);
+ nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa);
+ nni_aio_set_iov(&ep->rx_aio, 1, &iov);
+ nng_udp_recv(ep->udp, &ep->rx_aio);
+}
+
+static void
+dtls_rx_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ nni_aio *aio = &ep->rx_aio;
+ int rv;
+ nni_msg *msg;
+
+ nni_mtx_lock(&ep->mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // something bad happened on RX... which is unexpected.
+ // sleep a little bit and hope for recovery.
+ switch (nni_aio_result(aio)) {
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ case NNG_ESTOPPED:
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ case NNG_ETIMEDOUT:
+ case NNG_EAGAIN:
+ case NNG_EINTR:
+ default:
+ goto fail;
+ }
+ }
+
+ // If this came from another host, and we are a dialer, we discard.
+ // Dialers only talk to the party they explicitly dialed.
+ if (ep->dialer && !nng_sockaddr_equal(&ep->rx_sa, &ep->peer_sa)) {
+ goto fail;
+ }
+
+ if ((p = dtls_find_pipe(ep, &ep->rx_sa)) == NULL) {
+ if (dtls_pipe_alloc(ep, &p, &ep->rx_sa) != NNG_OK) {
+ goto fail;
+ }
+ }
+ if (p->closed) {
+ goto fail;
+ }
+ NNI_ASSERT(p != NULL);
+
+ if (nni_msg_alloc(&msg, nni_aio_count(aio)) != NNG_OK) {
+ // TODO BUMP A NO RECV ALLOC STAT
+ goto fail;
+ }
+ memcpy(nni_msg_body(msg), ep->rx_buf, nni_aio_count(aio));
+ dtls_start_rx(ep);
+ nni_mtx_unlock(&ep->mtx);
+
+ nni_mtx_lock(&p->lower_mtx);
+
+ if (nni_lmq_put(&p->rx_mq, msg) != NNG_OK) {
+ // TODO: BUMP TXQ FULL STAT
+ nng_msg_free(msg);
+ }
+ dtls_bio_recv_done(p);
+ nni_mtx_unlock(&p->lower_mtx);
+
+ // Run the TLS state machine.
+ switch (nni_tls_run(&p->tls)) {
+ case NNG_OK:
+ case NNG_EAGAIN:
+ break;
+ default:
+ nni_pipe_close(p->npipe);
+ }
+ return;
+
+fail:
+ // start another receive
+ dtls_start_rx(ep);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static uint16_t
+dtls_pipe_peer(void *arg)
+{
+ dtls_pipe *p = arg;
+
+ return (p->peer);
+}
+
+static nng_err
+dtls_pipe_get_recvmax(void *arg, void *v, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nng_err rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(p->recv_max, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nng_err rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_sockaddr(&p->peer_addr, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nni_option dtls_pipe_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_get = dtls_pipe_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = dtls_pipe_get_remaddr,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static nng_err
+dtls_pipe_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ int rv;
+
+ rv = nni_getopt(dtls_pipe_options, name, p, buf, szp, t);
+ return (rv);
+}
+
+static void
+dtls_ep_fini(void *arg)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_fini(&ep->timeaio);
+ nni_aio_fini(&ep->resaio);
+ nni_aio_fini(&ep->tx_aio);
+ nni_aio_fini(&ep->rx_aio);
+
+ if (ep->udp != NULL) {
+ nng_udp_close(ep->udp);
+ }
+ if (ep->rx_size != 0) {
+ nni_free(ep->rx_buf, ep->rx_size);
+ }
+
+ nni_msg_free(ep->rx_payload); // safe even if msg is null
+ nni_id_map_fini(&ep->pipes);
+ nni_mtx_fini(&ep->mtx);
+}
+
+static void
+dtls_ep_close(void *arg)
+{
+ dtls_ep *ep = arg;
+ nni_aio *aio;
+ dtls_pipe *p;
+ uint64_t key;
+ uint32_t cursor;
+
+ nni_aio_close(&ep->resaio);
+ nni_aio_close(&ep->rx_aio);
+ nni_aio_close(&ep->timeaio);
+
+ // leave tx open so we can send disconnects
+
+ nni_mtx_lock(&ep->mtx);
+ ep->closed = true;
+ while ((aio = nni_list_first(&ep->connaios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECONNABORTED);
+ }
+ cursor = 0;
+ key = 0;
+ while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) {
+ nni_pipe_close(p->npipe);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_ep_stop(void *arg)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_stop(&ep->resaio);
+ nni_aio_stop(&ep->rx_aio);
+ nni_aio_stop(&ep->timeaio);
+
+ nni_mtx_lock(&ep->mtx);
+ ep->fini = true;
+ nni_mtx_unlock(&ep->mtx);
+}
+
+// timer handler - sends out additional creqs as needed,
+// reaps stale connections, and handles linger.
+static void
+dtls_timer_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_aio_result(&ep->timeaio);
+ switch (rv) {
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ case NNG_ESTOPPED:
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ default:
+ if (ep->closed) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ break;
+ }
+
+ uint32_t cursor = 0;
+ nni_time now = nni_clock();
+ nng_duration refresh = NNG_DURATION_INFINITE;
+
+ while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {
+
+ if (p->closed) {
+ continue;
+ }
+ NNI_ASSERT(p->refresh > 0);
+ if (p->expire > 0 && now > p->expire) {
+ char buf[128];
+ nng_log_info("NNG-DTLS-INACTIVE",
+ "Pipe peer %s timed out due to inactivity",
+ nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf)));
+
+ nni_stat_inc(&ep->st_peer_inactive, 1);
+ nni_pipe_close(p->npipe);
+ continue;
+ }
+
+ if (p->dialer && now > p->next_refresh) {
+ p->send_op = OPCODE_CREQ;
+ p->next_refresh = p->expire + p->refresh;
+ dtls_pipe_send_tls(p);
+ }
+ if (refresh == NNG_DURATION_INFINITE && p->refresh > 0) {
+ refresh = p->refresh;
+ } else if ((p->refresh > 0) && (p->refresh < refresh)) {
+ refresh = p->refresh;
+ }
+ }
+ nni_sleep_aio(refresh, &ep->timeaio);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err
+dtls_ep_init(
+ dtls_ep *ep, nng_url *url, nni_sock *sock, nni_dialer *d, nni_listener *l)
+{
+ nni_mtx_init(&ep->mtx);
+ nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true);
+ NNI_LIST_INIT(&ep->connpipes, dtls_pipe, node);
+ nni_aio_list_init(&ep->connaios);
+
+ nni_aio_init(&ep->rx_aio, dtls_rx_cb, ep);
+ nni_aio_init(&ep->timeaio, dtls_timer_cb, ep);
+ nni_aio_init(&ep->resaio, dtls_resolv_cb, ep);
+
+ if (strcmp(url->u_scheme, "dtls") == 0) {
+ ep->af = NNG_AF_UNSPEC;
+ } else if (strcmp(url->u_scheme, "dtls4") == 0) {
+ ep->af = NNG_AF_INET;
+ } else if (strcmp(url->u_scheme, "dtls6") == 0) {
+ ep->af = NNG_AF_INET6;
+ } else {
+ return (NNG_EADDRINVAL);
+ }
+
+ ep->self_sa.s_family = ep->af;
+ ep->proto = nni_sock_proto_id(sock);
+ ep->peer = nni_sock_peer_id(sock);
+ ep->url = url;
+ ep->refresh = NNG_DTLS_REFRESH; // one minute by default
+ ep->rcvmax = NNG_DTLS_RECVMAX;
+
+ // receive buffer plus some extra for UDP and TLS headers
+ if ((ep->rx_buf = nni_alloc(DTLS_MAX_RECORD)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ep->rx_size = DTLS_MAX_RECORD;
+
+ NNI_STAT_LOCK(rcv_max_info, "rcv_max", "maximum receive size",
+ NNG_STAT_LEVEL, NNG_UNIT_BYTES);
+ NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch",
+ "messages without a matching connection", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(rcv_toobig_info, "rcv_toobig",
+ "received messages rejected because too big", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(rcv_nobuf_info, "rcv_nobuf",
+ "received messages dropped no buffer", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(snd_toobig_info, "snd_toobig",
+ "sent messages rejected because too big", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(snd_nobuf_info, "snd_nobuf",
+ "sent messages dropped no buffer", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(peer_inactive_info, "peer_inactive",
+ "connections closed due to inactive peer", NNG_STAT_COUNTER,
+ NNG_UNIT_EVENTS);
+
+ nni_stat_init_lock(&ep->st_rcv_max, &rcv_max_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_snd_toobig, &snd_toobig_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_snd_nobuf, &snd_nobuf_info, &ep->mtx);
+ nni_stat_init_lock(
+ &ep->st_peer_inactive, &peer_inactive_info, &ep->mtx);
+
+ if (l) {
+ NNI_ASSERT(d == NULL);
+ nni_listener_add_stat(l, &ep->st_rcv_max);
+
+ nni_listener_add_stat(l, &ep->st_rcv_toobig);
+ nni_listener_add_stat(l, &ep->st_rcv_nomatch);
+ nni_listener_add_stat(l, &ep->st_rcv_nobuf);
+ nni_listener_add_stat(l, &ep->st_snd_toobig);
+ nni_listener_add_stat(l, &ep->st_snd_nobuf);
+ }
+ if (d) {
+ NNI_ASSERT(l == NULL);
+ nni_dialer_add_stat(d, &ep->st_rcv_max);
+ nni_dialer_add_stat(d, &ep->st_rcv_toobig);
+ nni_dialer_add_stat(d, &ep->st_rcv_nomatch);
+ nni_dialer_add_stat(d, &ep->st_rcv_nobuf);
+ nni_dialer_add_stat(d, &ep->st_snd_toobig);
+ nni_dialer_add_stat(d, &ep->st_snd_nobuf);
+ }
+
+ // schedule our timer callback - forever for now
+ // adjusted automatically as we add pipes or other
+ // actions which require earlier wakeup.
+ nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio);
+ // nni_sleep_aio(100, &ep->timeaio);
+
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_check_url(nng_url *url, bool listen)
+{
+ // Check for invalid URL components.
+ if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
+ (url->u_query != NULL)) {
+ return (NNG_EADDRINVAL);
+ }
+ if (!listen) {
+ if ((strlen(url->u_hostname) == 0) || (url->u_port == 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ }
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_dialer_init(void *arg, nng_url *url, nni_dialer *ndialer)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+ nni_sock *sock = nni_dialer_sock(ndialer);
+
+ if ((rv = dtls_check_url(url, false)) != NNG_OK) {
+ return (rv);
+ }
+
+ ep->ndialer = ndialer;
+ if ((rv = dtls_ep_init(ep, url, sock, ndialer, NULL)) != NNG_OK) {
+ return (rv);
+ }
+
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_listener_init(void *arg, nng_url *url, nni_listener *nlistener)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+ nni_sock *sock = nni_listener_sock(nlistener);
+
+ ep->nlistener = nlistener;
+ if ((rv = dtls_ep_init(ep, url, sock, NULL, nlistener)) != NNG_OK) {
+ return (rv);
+ }
+ // Check for invalid URL components.
+ if (((rv = dtls_check_url(url, true)) != NNG_OK) ||
+ ((rv = nni_url_to_address(&ep->self_sa, url)) != NNG_OK)) {
+ return (rv);
+ }
+
+ return (NNG_OK);
+}
+
+static void
+dtls_ep_cancel(nni_aio *aio, void *arg, nng_err rv)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_aio_abort(&ep->resaio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_resolv_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ nni_aio *aio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = nni_list_first(&ep->connaios)) == NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->closed) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if ((rv = nni_aio_result(&ep->resaio)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ nng_log_warn("NNG-RESOLV", "Failed resolving IP address: %s",
+ nng_strerror(rv));
+ return;
+ }
+
+ // Choose the right port to bind to. The family must match.
+ if (ep->self_sa.s_family == NNG_AF_UNSPEC) {
+ ep->self_sa.s_family = ep->peer_sa.s_family;
+ }
+
+ if (ep->udp == NULL) {
+ if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ }
+
+ if ((rv = dtls_pipe_alloc(ep, &p, &ep->peer_sa)) != NNG_OK) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ dtls_ep_start(ep);
+
+ // Send out the connection request. We don't complete
+ // the user aio until we confirm a connection, so that
+ // we can supply details like maximum receive message size
+ // and the protocol the peer is using.
+ p->send_op = OPCODE_CREQ;
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_ep_connect(void *arg, nni_aio *aio)
+{
+ dtls_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ if (!nni_aio_start(aio, dtls_ep_cancel, ep)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->started) {
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ NNI_ASSERT(nni_list_empty(&ep->connaios));
+ ep->dialer = true;
+
+ nni_list_append(&ep->connaios, aio);
+
+ // lookup the IP address
+
+ memset(&ep->resolv, 0, sizeof(ep->resolv));
+ ep->resolv.ri_family = ep->af;
+ ep->resolv.ri_host = ep->url->u_hostname;
+ ep->resolv.ri_port = ep->url->u_port;
+ ep->resolv.ri_passive = false;
+ ep->resolv.ri_sa = &ep->peer_sa;
+ nni_aio_set_timeout(&ep->resaio, NNI_SECOND * 5);
+ nni_resolv(&ep->resolv, &ep->resaio);
+
+ // wake up for retries
+ nni_aio_abort(&ep->timeaio, NNG_EINTR);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err
+dtls_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+ nng_sockaddr sa;
+ int port;
+ uint8_t *paddr;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->udp != NULL) {
+ (void) nng_udp_sockname(ep->udp, &sa);
+ } else {
+ sa = ep->self_sa;
+ }
+ switch (sa.s_family) {
+ case NNG_AF_INET:
+ paddr = (void *) &sa.s_in.sa_port;
+ break;
+
+ case NNG_AF_INET6:
+ paddr = (void *) &sa.s_in6.sa_port;
+ break;
+
+ default:
+ paddr = NULL;
+ break;
+ }
+ nni_mtx_unlock(&ep->mtx);
+
+ if (paddr == NULL) {
+ return (NNG_ESTATE);
+ }
+
+ NNI_GET16(paddr, port);
+ return (nni_copyout_int(port, buf, szp, t));
+}
+
+static nng_err
+dtls_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_sockaddr sa;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->udp != NULL) {
+ (void) nng_udp_sockname(ep->udp, &sa);
+ } else {
+ sa = ep->self_sa;
+ }
+ nni_mtx_unlock(&ep->mtx);
+
+ return (nni_copyout_sockaddr(&sa, v, szp, t));
+}
+
+static nng_err
+dtls_ep_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ if (!ep->dialer) {
+ return (NNG_ENOTSUP);
+ }
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_sockaddr(&ep->peer_sa, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ size_t val;
+ nng_err rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNG_DTLS_RECVMAX, t)) == 0) {
+ if ((val == 0) || (val > NNG_DTLS_RECVMAX)) {
+ val = NNG_DTLS_RECVMAX;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+ ep->rcvmax = (uint16_t) val;
+ nni_stat_set_value(&ep->st_rcv_max, val);
+ nni_mtx_unlock(&ep->mtx);
+ }
+ return (rv);
+}
+
+static nng_err
+dtls_ep_set_tls(void *arg, nng_tls_config *cfg)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+ ep->tlscfg = cfg;
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_ep_get_tls(void *arg, nng_tls_config **cfgp)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ *cfgp = ep->tlscfg;
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_OK);
+}
+
+// this just looks for pipes waiting for an aio, and aios waiting for
+// a connection, and matches them together.
+static void
+dtls_ep_match(dtls_ep *ep)
+{
+ nng_aio *aio = nni_list_first(&ep->connaios);
+ dtls_pipe *p = nni_list_first(&ep->connpipes);
+
+ if ((aio == NULL) || (p == NULL)) {
+ return;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_list_remove(&ep->connpipes, p);
+ nni_aio_set_output(aio, 0, p->npipe);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+dtls_ep_start(dtls_ep *ep)
+{
+ ep->started = true;
+ dtls_start_rx(ep);
+}
+
+static nng_err
+dtls_ep_bind(void *arg, nng_url *url)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+
+ rv = nng_udp_open(&ep->udp, &ep->self_sa);
+ if (rv != NNG_OK) {
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+ }
+ nng_sockaddr sa;
+ nng_udp_sockname(ep->udp, &sa);
+ url->u_port = nng_sockaddr_port(&sa);
+ dtls_ep_start(ep);
+ nni_mtx_unlock(&ep->mtx);
+
+ return (rv);
+}
+
+static void
+dtls_ep_accept(void *arg, nni_aio *aio)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_reset(aio);
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (!nni_aio_start(aio, dtls_ep_cancel, ep)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_append(&ep->connaios, aio);
+ dtls_ep_match(ep);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nni_sp_pipe_ops dtls_pipe_ops = {
+ .p_size = dtls_pipe_size,
+ .p_init = dtls_pipe_init,
+ .p_fini = dtls_pipe_fini,
+ .p_stop = dtls_pipe_stop,
+ .p_send = dtls_pipe_send,
+ .p_recv = dtls_pipe_recv,
+ .p_close = dtls_pipe_close,
+ .p_peer = dtls_pipe_peer,
+ .p_getopt = dtls_pipe_getopt,
+};
+
+static const nni_option dtls_ep_opts[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_get = dtls_ep_get_recvmaxsz,
+ .o_set = dtls_ep_set_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_get = dtls_ep_get_locaddr,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = dtls_ep_get_remaddr,
+ },
+ {
+ .o_name = NNG_OPT_TCP_BOUND_PORT,
+ .o_get = dtls_ep_get_port,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nng_err
+dtls_dialer_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t));
+}
+
+static nng_err
+dtls_dialer_setopt(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t));
+}
+
+static nng_err
+dtls_listener_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t));
+}
+
+static nng_err
+dtls_listener_setopt(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t));
+}
+
+static nni_sp_dialer_ops dtls_dialer_ops = {
+ .d_size = sizeof(dtls_ep),
+ .d_init = dtls_dialer_init,
+ .d_fini = dtls_ep_fini,
+ .d_connect = dtls_ep_connect,
+ .d_close = dtls_ep_close,
+ .d_stop = dtls_ep_stop,
+ .d_set_tls = dtls_ep_set_tls,
+ .d_get_tls = dtls_ep_get_tls,
+ .d_getopt = dtls_dialer_getopt,
+ .d_setopt = dtls_dialer_setopt,
+};
+
+static nni_sp_listener_ops dtls_listener_ops = {
+ .l_size = sizeof(dtls_ep),
+ .l_init = dtls_listener_init,
+ .l_fini = dtls_ep_fini,
+ .l_bind = dtls_ep_bind,
+ .l_accept = dtls_ep_accept,
+ .l_close = dtls_ep_close,
+ .l_stop = dtls_ep_stop,
+ .l_set_tls = dtls_ep_set_tls,
+ .l_get_tls = dtls_ep_get_tls,
+ .l_getopt = dtls_listener_getopt,
+ .l_setopt = dtls_listener_setopt,
+};
+
+static nni_sp_tran dtls_tran = {
+ .tran_scheme = "dtls",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+
+static nni_sp_tran dtls4_tran = {
+ .tran_scheme = "dtls4",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+
+#ifdef NNG_ENABLE_IPV6
+static nni_sp_tran dtls6_tran = {
+ .tran_scheme = "dtls6",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+#endif
+
+void
+nni_sp_dtls_register(void)
+{
+ nni_sp_tran_register(&dtls_tran);
+ nni_sp_tran_register(&dtls4_tran);
+#ifdef NNG_ENABLE_IPV6
+ nni_sp_tran_register(&dtls6_tran);
+#endif
+}
diff --git a/src/sp/transport/dtls/dtls_tran_test.c b/src/sp/transport/dtls/dtls_tran_test.c
new file mode 100644
index 00000000..1dcfeb49
--- /dev/null
+++ b/src/sp/transport/dtls/dtls_tran_test.c
@@ -0,0 +1,345 @@
+//
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2018 Cody Piersall <cody.piersall@gmail.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "nng/nng.h"
+#include <nuts.h>
+
+// TLS tests.
+
+static nng_tls_config *
+tls_server_config(void)
+{
+ nng_tls_config *c;
+ NUTS_PASS(nng_tls_config_alloc(&c, NNG_TLS_MODE_SERVER));
+ NUTS_PASS(nng_tls_config_own_cert(
+ c, nuts_server_crt, nuts_server_key, NULL));
+ return (c);
+}
+
+static nng_tls_config *
+tls_server_config_ecdsa(void)
+{
+ nng_tls_config *c;
+ NUTS_PASS(nng_tls_config_alloc(&c, NNG_TLS_MODE_SERVER));
+ NUTS_PASS(nng_tls_config_own_cert(
+ c, nuts_ecdsa_server_crt, nuts_ecdsa_server_key, NULL));
+ return (c);
+}
+
+static nng_tls_config *
+tls_config_psk(nng_tls_mode mode, const char *name, uint8_t *key, size_t len)
+{
+ nng_tls_config *c;
+ NUTS_PASS(nng_tls_config_alloc(&c, mode));
+ NUTS_PASS(nng_tls_config_psk(c, name, key, len));
+ return (c);
+}
+
+static nng_tls_config *
+tls_client_config(void)
+{
+ nng_tls_config *c;
+ NUTS_PASS(nng_tls_config_alloc(&c, NNG_TLS_MODE_CLIENT));
+ NUTS_PASS(nng_tls_config_own_cert(
+ c, nuts_client_crt, nuts_client_key, NULL));
+ NUTS_PASS(nng_tls_config_ca_chain(c, nuts_server_crt, NULL));
+ NUTS_PASS(nng_tls_config_server_name(c, "localhost"));
+ return (c);
+}
+
+static nng_tls_config *
+tls_client_config_ecdsa(void)
+{
+ nng_tls_config *c;
+ NUTS_PASS(nng_tls_config_alloc(&c, NNG_TLS_MODE_CLIENT));
+ NUTS_PASS(nng_tls_config_own_cert(
+ c, nuts_ecdsa_client_crt, nuts_ecdsa_client_key, NULL));
+ NUTS_PASS(nng_tls_config_ca_chain(c, nuts_ecdsa_server_crt, NULL));
+ NUTS_PASS(nng_tls_config_server_name(c, "localhost"));
+ return (c);
+}
+
+void
+test_dtls_port_zero_bind(void)
+{
+ nng_socket s1;
+ nng_socket s2;
+ nng_tls_config *c1, *c2;
+ nng_sockaddr sa;
+ nng_listener l;
+ nng_dialer d;
+ const nng_url *url;
+
+ NUTS_ENABLE_LOG(NNG_LOG_DEBUG);
+ c1 = tls_server_config();
+ c2 = tls_client_config();
+ NUTS_OPEN(s1);
+ NUTS_OPEN(s2);
+ NUTS_PASS(nng_listener_create(&l, s1, "dtls://127.0.0.1:0"));
+ NUTS_PASS(nng_listener_set_tls(l, c1));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_url(l, &url));
+ NUTS_MATCH(nng_url_scheme(url), "dtls");
+ NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa));
+ NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET);
+ NUTS_TRUE(sa.s_in.sa_port != 0);
+ NUTS_TRUE(sa.s_in.sa_addr = nuts_be32(0x7f000001));
+ NUTS_PASS(nng_dialer_create_url(&d, s2, url));
+ NUTS_PASS(nng_dialer_set_tls(d, c2));
+ // NUTS_PASS(nng_dialer_start(d, NNG_FLAG_NONBLOCK));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ nng_msleep(1000);
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s1);
+ nng_tls_config_free(c1);
+ nng_tls_config_free(c2);
+}
+
+void
+test_dtls_bad_cert_mutual(void)
+{
+ nng_socket s1;
+ nng_socket s2;
+ nng_tls_config *c1, *c2;
+ nng_sockaddr sa;
+ nng_listener l;
+ nng_dialer d;
+ const nng_url *url;
+
+ c1 = tls_server_config();
+ c2 = tls_client_config();
+
+ NUTS_ENABLE_LOG(NNG_LOG_DEBUG);
+ NUTS_OPEN(s1);
+ NUTS_OPEN(s2);
+ NUTS_PASS(nng_tls_config_auth_mode(c1, NNG_TLS_AUTH_MODE_REQUIRED));
+ // a valid cert, but not the one that signed the config!
+ NUTS_PASS(nng_tls_config_ca_chain(c1, nuts_ecdsa_server_crt, NULL));
+ NUTS_PASS(nng_listener_create(&l, s1, "dtls://127.0.0.1:0"));
+ NUTS_PASS(nng_listener_set_tls(l, c1));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_url(l, &url));
+ NUTS_MATCH(nng_url_scheme(url), "dtls");
+ NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa));
+ NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET);
+ NUTS_TRUE(sa.s_in.sa_port != 0);
+ NUTS_TRUE(sa.s_in.sa_addr = nuts_be32(0x7f000001));
+ NUTS_PASS(nng_dialer_create_url(&d, s2, url));
+ NUTS_PASS(nng_dialer_set_tls(d, c2));
+ // With DTLS we are not guaranteed to get the connection failure.
+ nng_dialer_start(d, NNG_FLAG_NONBLOCK);
+ nng_msleep(500);
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s1);
+ nng_tls_config_free(c1);
+ nng_tls_config_free(c2);
+}
+
+void
+test_dtls_cert_mutual(void)
+{
+ nng_socket s1;
+ nng_socket s2;
+ nng_tls_config *c1, *c2;
+ nng_sockaddr sa;
+ nng_listener l;
+ nng_dialer d;
+ const nng_url *url;
+
+ c1 = tls_server_config_ecdsa();
+ c2 = tls_client_config_ecdsa();
+
+ NUTS_ENABLE_LOG(NNG_LOG_DEBUG);
+ NUTS_OPEN(s1);
+ NUTS_OPEN(s2);
+ NUTS_PASS(nng_tls_config_auth_mode(c1, NNG_TLS_AUTH_MODE_REQUIRED));
+ NUTS_PASS(nng_tls_config_ca_chain(c1, nuts_ecdsa_server_crt, NULL));
+ NUTS_PASS(nng_tls_config_ca_chain(c2, nuts_ecdsa_server_crt, NULL));
+ NUTS_PASS(nng_listener_create(&l, s1, "dtls://127.0.0.1:0"));
+ NUTS_PASS(nng_listener_set_tls(l, c1));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_url(l, &url));
+ NUTS_MATCH(nng_url_scheme(url), "dtls");
+ NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa));
+ NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET);
+ NUTS_TRUE(sa.s_in.sa_port != 0);
+ NUTS_TRUE(sa.s_in.sa_addr = nuts_be32(0x7f000001));
+ NUTS_PASS(nng_dialer_create_url(&d, s2, url));
+ NUTS_PASS(nng_dialer_set_tls(d, c2));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ nng_msleep(50);
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s1);
+ nng_tls_config_free(c1);
+ nng_tls_config_free(c2);
+}
+
+void
+test_dtls_malformed_address(void)
+{
+ nng_socket s1;
+
+ NUTS_OPEN(s1);
+ NUTS_FAIL(nng_dial(s1, "dtls://127.0.0.1", NULL, 0), NNG_EADDRINVAL);
+ NUTS_FAIL(
+ nng_dial(s1, "dtls://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL);
+ NUTS_FAIL(
+ nng_dial(s1, "dtls://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL);
+ NUTS_FAIL(
+ nng_listen(s1, "dtls://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL);
+ NUTS_FAIL(
+ nng_listen(s1, "dtls://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL);
+ NUTS_CLOSE(s1);
+}
+
+// DTLS does not support TCP_NODELAY because it's based on UDP.
+void
+test_dtls_no_delay_option(void)
+{
+ nng_socket s;
+ nng_dialer d;
+ nng_listener l;
+ bool v;
+ char *addr;
+ nng_tls_config *dc, *lc;
+
+ NUTS_ADDR(addr, "dtls");
+ dc = tls_client_config();
+ lc = tls_server_config();
+
+ NUTS_OPEN(s);
+ NUTS_PASS(nng_dialer_create(&d, s, addr));
+ NUTS_PASS(nng_dialer_set_tls(d, dc));
+ NUTS_FAIL(
+ nng_dialer_get_bool(d, NNG_OPT_TCP_NODELAY, &v), NNG_ENOTSUP);
+ NUTS_FAIL(nng_dialer_set_bool(d, NNG_OPT_TCP_NODELAY, v), NNG_ENOTSUP);
+
+ NUTS_PASS(nng_listener_create(&l, s, addr));
+ NUTS_PASS(nng_listener_set_tls(l, lc));
+ NUTS_FAIL(
+ nng_listener_get_bool(l, NNG_OPT_TCP_NODELAY, &v), NNG_ENOTSUP);
+ NUTS_FAIL(
+ nng_listener_set_bool(l, NNG_OPT_TCP_NODELAY, v), NNG_ENOTSUP);
+
+ NUTS_PASS(nng_dialer_close(d));
+ NUTS_PASS(nng_listener_close(l));
+
+ NUTS_CLOSE(s);
+ nng_tls_config_free(lc);
+ nng_tls_config_free(dc);
+}
+
+void
+test_dtls_recv_max(void)
+{
+ char msg[256];
+ char buf[256];
+ nng_socket s0;
+ nng_socket s1;
+ nng_tls_config *c0, *c1;
+ nng_listener l;
+ nng_dialer d;
+ size_t sz;
+ char *addr;
+ const nng_url *url;
+
+ NUTS_ADDR_ZERO(addr, "dtls");
+
+ c0 = tls_server_config();
+ c1 = tls_client_config();
+ NUTS_OPEN(s0);
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_set_tls(l, c0));
+ NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz));
+ NUTS_TRUE(sz == 200);
+ NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_url(l, &url));
+
+ NUTS_OPEN(s1);
+ NUTS_PASS(nng_dialer_create_url(&d, s1, url));
+ NUTS_PASS(nng_dialer_set_tls(d, c1));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ NUTS_PASS(nng_send(s1, msg, 95, 0));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_recv(s0, buf, &sz, 0));
+ NUTS_TRUE(sz == 95);
+ NUTS_PASS(nng_send(s1, msg, 150, 0));
+ NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT);
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+ nng_tls_config_free(c0);
+ nng_tls_config_free(c1);
+}
+
+void
+test_dtls_psk(void)
+{
+#ifdef NNG_SUPP_TLS_PSK
+ char msg[256];
+ char buf[256];
+ nng_socket s0;
+ nng_socket s1;
+ nng_tls_config *c0, *c1;
+ nng_listener l;
+ nng_dialer d;
+ size_t sz;
+ char *addr;
+ uint8_t key[32];
+ const nng_url *url;
+
+ for (unsigned i = 0; i < sizeof(key); i++) {
+ key[i] = rand() % 0xff;
+ }
+
+ NUTS_ADDR_ZERO(addr, "dtls");
+ NUTS_ENABLE_LOG(NNG_LOG_DEBUG);
+
+ c0 = tls_config_psk(NNG_TLS_MODE_SERVER, "identity", key, sizeof key);
+ c1 = tls_config_psk(NNG_TLS_MODE_CLIENT, "identity", key, sizeof key);
+ NUTS_OPEN(s0);
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_set_tls(l, c0));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_url(l, &url));
+
+ NUTS_OPEN(s1);
+ NUTS_PASS(nng_dialer_create_url(&d, s1, url));
+ NUTS_PASS(nng_dialer_set_tls(d, c1));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ NUTS_SLEEP(1000); // make sure connection has time to form!
+ NUTS_PASS(nng_send(s1, msg, 95, 0));
+ NUTS_PASS(nng_recv(s0, buf, &sz, 0));
+ NUTS_TRUE(sz == 95);
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+ nng_tls_config_free(c0);
+ nng_tls_config_free(c1);
+#else
+ NUTS_SKIP("no PSK support");
+#endif
+}
+
+NUTS_TESTS = {
+
+ { "dtls port zero bind", test_dtls_port_zero_bind },
+ { "dtls malformed address", test_dtls_malformed_address },
+ { "dtls no delay option", test_dtls_no_delay_option },
+ { "dtls recv max", test_dtls_recv_max },
+ { "dtls pre-shared key", test_dtls_psk },
+ { "dtls bad cert mutual", test_dtls_bad_cert_mutual },
+ { "dtls cert mutual", test_dtls_cert_mutual },
+ { NULL, NULL },
+};
diff --git a/src/sp/transport/inproc/CMakeLists.txt b/src/sp/transport/inproc/CMakeLists.txt
index 2132e8d7..42f4d824 100644
--- a/src/sp/transport/inproc/CMakeLists.txt
+++ b/src/sp/transport/inproc/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -11,6 +11,8 @@
# inproc protocol
nng_directory(inproc)
-nng_sources_if(NNG_TRANSPORT_INPROC inproc.c)
-nng_defines_if(NNG_TRANSPORT_INPROC NNG_TRANSPORT_INPROC)
-nng_test_if(NNG_TRANSPORT_INPROC inproc_test)
+if (NNG_TRANSPORT_INPROC)
+ nng_sources(inproc.c)
+ nng_defines(NNG_TRANSPORT_INPROC)
+ nng_test(inproc_test)
+endif()
diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c
index 24c0f1ad..22fe619b 100644
--- a/src/sp/transport/inproc/inproc.c
+++ b/src/sp/transport/inproc/inproc.c
@@ -584,8 +584,14 @@ inproc_pipe_getopt(
return (nni_getopt(inproc_pipe_options, name, arg, v, szp, t));
}
+static size_t
+inproc_pipe_size(void)
+{
+ return (sizeof(inproc_pipe));
+}
+
static nni_sp_pipe_ops inproc_pipe_ops = {
- .p_size = sizeof(inproc_pipe),
+ .p_size = inproc_pipe_size,
.p_init = inproc_pipe_init,
.p_fini = inproc_pipe_fini,
.p_send = inproc_pipe_send,
diff --git a/src/sp/transport/ipc/CMakeLists.txt b/src/sp/transport/ipc/CMakeLists.txt
index 7353c4f3..8cd78941 100644
--- a/src/sp/transport/ipc/CMakeLists.txt
+++ b/src/sp/transport/ipc/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -11,6 +11,8 @@
# ipc protocol
nng_directory(ipc)
-nng_sources_if(NNG_TRANSPORT_IPC ipc.c)
-nng_defines_if(NNG_TRANSPORT_IPC NNG_TRANSPORT_IPC)
-nng_test_if(NNG_TRANSPORT_IPC ipc_test)
+if (NNG_TRANSPORT_IPC)
+ nng_sources(ipc.c)
+ nng_defines(NNG_TRANSPORT_IPC)
+ nng_test(ipc_test)
+endif()
diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c
index 67038e2e..6bf4445b 100644
--- a/src/sp/transport/ipc/ipc.c
+++ b/src/sp/transport/ipc/ipc.c
@@ -1,5 +1,5 @@
//
-// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -956,8 +956,14 @@ ipc_pipe_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
return (nni_stream_get(p->conn, name, buf, szp, t));
}
+static size_t
+ipc_pipe_size(void)
+{
+ return (sizeof(ipc_pipe));
+}
+
static nni_sp_pipe_ops ipc_tran_pipe_ops = {
- .p_size = sizeof(ipc_pipe),
+ .p_size = ipc_pipe_size,
.p_init = ipc_pipe_init,
.p_fini = ipc_pipe_fini,
.p_stop = ipc_pipe_stop,
diff --git a/src/sp/transport/socket/CMakeLists.txt b/src/sp/transport/socket/CMakeLists.txt
index d79b261e..4c7e8b58 100644
--- a/src/sp/transport/socket/CMakeLists.txt
+++ b/src/sp/transport/socket/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -10,6 +10,8 @@
# File Descriptor (or Handle) based connections
nng_directory(socket)
-nng_sources_if(NNG_TRANSPORT_FDC sockfd.c)
-nng_defines_if(NNG_TRANSPORT_FDC NNG_TRANSPORT_FDC)
-nng_test(sockfd_test) \ No newline at end of file
+if (NNG_TRANSPORT_FDC)
+ nng_sources(sockfd.c)
+ nng_defines(NNG_TRANSPORT_FDC)
+ nng_test(sockfd_test)
+endif()
diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c
index 37debc85..57693088 100644
--- a/src/sp/transport/socket/sockfd.c
+++ b/src/sp/transport/socket/sockfd.c
@@ -807,8 +807,14 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
+static size_t
+sfd_tran_pipe_size(void)
+{
+ return (sizeof(sfd_tran_pipe));
+}
+
static nni_sp_pipe_ops sfd_tran_pipe_ops = {
- .p_size = sizeof(sfd_tran_pipe),
+ .p_size = sfd_tran_pipe_size,
.p_init = sfd_tran_pipe_init,
.p_fini = sfd_tran_pipe_fini,
.p_stop = sfd_tran_pipe_stop,
diff --git a/src/sp/transport/tcp/CMakeLists.txt b/src/sp/transport/tcp/CMakeLists.txt
index fea821c2..e611a502 100644
--- a/src/sp/transport/tcp/CMakeLists.txt
+++ b/src/sp/transport/tcp/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -11,6 +11,8 @@
# TCP protocol
nng_directory(tcp)
-nng_sources_if(NNG_TRANSPORT_TCP tcp.c)
-nng_defines_if(NNG_TRANSPORT_TCP NNG_TRANSPORT_TCP)
-nng_test(tcp_test)
+if (NNG_TRANSPORT_TCP)
+ nng_sources(tcp.c)
+ nng_defines(NNG_TRANSPORT_TCP)
+ nng_test(tcp_test)
+endif()
diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c
index d77e7b2f..096d2e24 100644
--- a/src/sp/transport/tcp/tcp.c
+++ b/src/sp/transport/tcp/tcp.c
@@ -964,8 +964,14 @@ tcptran_ep_accept(void *arg, nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
+static size_t
+tcptran_pipe_size(void)
+{
+ return (sizeof(tcptran_pipe));
+}
+
static nni_sp_pipe_ops tcptran_pipe_ops = {
- .p_size = sizeof(tcptran_pipe),
+ .p_size = tcptran_pipe_size,
.p_init = tcptran_pipe_init,
.p_fini = tcptran_pipe_fini,
.p_stop = tcptran_pipe_stop,
diff --git a/src/sp/transport/tls/CMakeLists.txt b/src/sp/transport/tls/CMakeLists.txt
index f55340a9..0ba9a235 100644
--- a/src/sp/transport/tls/CMakeLists.txt
+++ b/src/sp/transport/tls/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -11,6 +11,8 @@
# TLS transport
nng_directory(tls)
-nng_sources_if(NNG_TRANSPORT_TLS tls.c)
-nng_defines_if(NNG_TRANSPORT_TLS NNG_TRANSPORT_TLS)
-nng_test_if(NNG_ENABLE_TLS tls_tran_test)
+if (NNG_TRANSPORT_TLS)
+ nng_sources(tls.c)
+ nng_defines(NNG_TRANSPORT_TLS)
+ nng_test(tls_tran_test)
+endif()
diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c
index 5c567692..fd983c67 100644
--- a/src/sp/transport/tls/tls.c
+++ b/src/sp/transport/tls/tls.c
@@ -952,8 +952,14 @@ tlstran_pipe_getopt(
return (rv);
}
+static size_t
+tlstran_pipe_size(void)
+{
+ return (sizeof(tlstran_pipe)); // TODO add engine data size
+}
+
static nni_sp_pipe_ops tlstran_pipe_ops = {
- .p_size = sizeof(tlstran_pipe),
+ .p_size = tlstran_pipe_size,
.p_init = tlstran_pipe_init,
.p_fini = tlstran_pipe_fini,
.p_stop = tlstran_pipe_stop,
diff --git a/src/sp/transport/tls/tls_tran_test.c b/src/sp/transport/tls/tls_tran_test.c
index 5b38d733..3c43b36e 100644
--- a/src/sp/transport/tls/tls_tran_test.c
+++ b/src/sp/transport/tls/tls_tran_test.c
@@ -35,6 +35,7 @@ tls_server_config_ecdsa(void)
return (c);
}
+#ifdef NNG_SUPP_TLS_PSK
static nng_tls_config *
tls_config_psk(nng_tls_mode mode, const char *name, uint8_t *key, size_t len)
{
@@ -43,6 +44,7 @@ tls_config_psk(nng_tls_mode mode, const char *name, uint8_t *key, size_t len)
NUTS_PASS(nng_tls_config_psk(c, name, key, len));
return (c);
}
+#endif
static nng_tls_config *
tls_client_config(void)
diff --git a/src/sp/transport/udp/CMakeLists.txt b/src/sp/transport/udp/CMakeLists.txt
index b08cd861..391888fe 100644
--- a/src/sp/transport/udp/CMakeLists.txt
+++ b/src/sp/transport/udp/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -10,6 +10,8 @@
# UDP transport
nng_directory(udp)
-nng_sources_if(NNG_TRANSPORT_UDP udp.c)
-nng_defines_if(NNG_TRANSPORT_UDP NNG_TRANSPORT_UDP)
-nng_test_if(NNG_TRANSPORT_UDP udp_tran_test)
+if (NNG_TRANSPORT_UDP)
+ nng_sources(udp.c)
+ nng_defines(NNG_TRANSPORT_UDP)
+ nng_test(udp_tran_test)
+endif()
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 0aa46767..e19a5684 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -1756,8 +1756,14 @@ udp_ep_accept(void *arg, nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
+static size_t
+udp_pipe_size(void)
+{
+ return (sizeof(udp_pipe));
+}
+
static nni_sp_pipe_ops udp_pipe_ops = {
- .p_size = sizeof(udp_pipe),
+ .p_size = udp_pipe_size,
.p_init = udp_pipe_init,
.p_fini = udp_pipe_fini,
.p_stop = udp_pipe_stop,
diff --git a/src/sp/transport/ws/CMakeLists.txt b/src/sp/transport/ws/CMakeLists.txt
index 437d0919..2b477c27 100644
--- a/src/sp/transport/ws/CMakeLists.txt
+++ b/src/sp/transport/ws/CMakeLists.txt
@@ -12,11 +12,14 @@
nng_directory(ws)
if (NNG_TRANSPORT_WS OR NNG_TRANSPORT_WSS)
- set(WS_ON ON)
+ nng_sources(websocket.c)
endif()
-nng_defines_if(NNG_TRANSPORT_WS NNG_TRANSPORT_WS)
-nng_defines_if(NNG_TRANSPORT_WSS NNG_TRANSPORT_WSS)
-nng_sources_if(WS_ON websocket.c)
-nng_test_if(WS_ON ws_test)
-nng_test_if(NNG_TRANSPORT_WSS wss_test)
+if (NNG_TRANSPORT_WS)
+ nng_defines(NNG_TRANSPORT_WS)
+ nng_test(ws_test)
+endif()
+if (NNG_TRANSPORT_WSS)
+ nng_defines(NNG_TRANSPORT_WSS)
+ nng_test(wss_test)
+endif()
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index b6045306..515f7b65 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -328,8 +328,14 @@ wstran_pipe_getopt(
return (rv);
}
+static size_t
+wstran_pipe_size(void)
+{
+ return (sizeof(ws_pipe));
+}
+
static nni_sp_pipe_ops ws_pipe_ops = {
- .p_size = sizeof(ws_pipe),
+ .p_size = wstran_pipe_size,
.p_init = wstran_pipe_init,
.p_fini = wstran_pipe_fini,
.p_stop = wstran_pipe_stop,