author     Garrett D'Amore <garrett@damore.org>    2025-04-27 18:40:40 -0700
committer  Garrett D'Amore <garrett@damore.org>    2025-06-01 22:49:00 -0700
commit     8bcb82d245a5fce1bd519e2f99250dedf11e763d (patch)
tree       4d663bedbb043b9d599f061d7f2b5f9509c8f390 /src/sp/transport/dtls/dtls.c
parent     08400bd437149c4fb31af9b2abece2ae44041283 (diff)
Introduce DTLS transport for NNG.
This introduces a new experimental transport for DTLS, which provides encryption over UDP. It has a simpler protocol than the current UDP SP protocol (but we intend to fix that by making the UDP transport simpler in a follow-up!) There are a few other fixes in the TLS layer itself, and in the build, that were needed to accomplish this work. There was also an endianness bug in the UDP protocol handling, which is fixed here.
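
To give a sense of how an application would use the new transport, here is a minimal dialing sketch against the public API. It is illustrative only and not part of this commit: the URL and port are made-up examples, ca_pem is a placeholder for a PEM-encoded CA chain, and nng_dialer_set_tls() is assumed to be available (older releases configure TLS through the NNG_OPT_TLS_CONFIG option instead).

#include <nng/nng.h>
// Older releases may also need <nng/supplemental/tls/tls.h> for the TLS declarations.

static int
dial_dtls_example(nng_socket s, const char *ca_pem)
{
    nng_dialer      d;
    nng_tls_config *cfg;
    int             rv;

    // The scheme selects this transport; dtls4:// or dtls6:// pin the address family.
    if ((rv = nng_dialer_create(&d, s, "dtls://server.example.com:5684")) != 0) {
        return (rv);
    }
    if ((rv = nng_tls_config_alloc(&cfg, NNG_TLS_MODE_CLIENT)) != 0) {
        nng_dialer_close(d);
        return (rv);
    }
    if (((rv = nng_tls_config_ca_chain(cfg, ca_pem, NULL)) != 0) ||
        ((rv = nng_dialer_set_tls(d, cfg)) != 0) ||
        ((rv = nng_dialer_start(d, 0)) != 0)) {
        nng_tls_config_free(cfg);
        nng_dialer_close(d);
        return (rv);
    }
    // TLS configurations are reference counted, so we can drop our reference
    // once the dialer holds its own.
    nng_tls_config_free(cfg);
    return (0);
}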
Diffstat (limited to 'src/sp/transport/dtls/dtls.c')
-rw-r--r--  src/sp/transport/dtls/dtls.c  1802
1 file changed, 1802 insertions, 0 deletions
diff --git a/src/sp/transport/dtls/dtls.c b/src/sp/transport/dtls/dtls.c
new file mode 100644
index 00000000..02961a56
--- /dev/null
+++ b/src/sp/transport/dtls/dtls.c
@@ -0,0 +1,1802 @@
+// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/aio.h"
+#include "core/defs.h"
+#include "core/idhash.h"
+#include "core/message.h"
+#include "core/nng_impl.h"
+#include "core/options.h"
+#include "core/pipe.h"
+#include "core/platform.h"
+#include "core/socket.h"
+#include "core/stats.h"
+#include "nng/nng.h"
+#include "supplemental/tls/tls_common.h"
+
+#include <string.h>
+
+// Experimental DTLS transport. Unicast only.
+//
+typedef struct dtls_pipe dtls_pipe;
+typedef struct dtls_ep dtls_ep;
+typedef struct dtls_conn dtls_conn;
+
+const uint8_t PROTO_VERSION = 1;
+
+// OP code, 8 bits
+enum dtls_opcode {
+ OPCODE_DATA = 0,
+ OPCODE_CREQ = 1,
+ OPCODE_CACK = 2,
+ OPCODE_DISC = 3,
+};
+
+// Disconnect reason, must be 16 bits
+typedef enum dtls_disc_reason {
+ DISC_CLOSED = 0, // normal close
+ DISC_TYPE = 1, // bad SP type
+ DISC_NOTCONN = 2, // no such connection
+ DISC_REFUSED = 3, // refused by policy
+ DISC_MSGSIZE = 4, // message too large
+ DISC_NEGO = 5, // negotiation failed
+ DISC_INACTIVE = 6, // closed due to inactivity
+ DISC_PROTO = 7, // other protocol error
+ DISC_NOBUF = 8, // resources exhausted
+} dtls_disc_reason;
+
+#ifndef NNG_DTLS_TXQUEUE_LEN
+#define NNG_DTLS_TXQUEUE_LEN 32
+#endif
+
+#ifndef NNG_DTLS_RXQUEUE_LEN
+#define NNG_DTLS_RXQUEUE_LEN 16
+#endif
+
+// The maximum TLS record size
+#define DTLS_MAX_RECORD 16384
+
+// For DTLS we use a maximum record size of 16384,
+// but we reserve some space for headers. DTLS needs
+// 13 bytes, and the transport layer needs 8 bytes.
+// To leave some room for the future, we just trim off 64 bytes.
+#ifndef NNG_DTLS_RECVMAX
+#define NNG_DTLS_RECVMAX (DTLS_MAX_RECORD - 64)
+#endif
+
+#ifndef NNG_DTLS_REFRESH
+#define NNG_DTLS_REFRESH (5 * NNI_SECOND)
+#endif
+
+#ifndef NNG_DTLS_CONNRETRY
+#define NNG_DTLS_CONNRETRY (NNI_SECOND / 5)
+#endif
+
+// 64-bit protocol header
+typedef struct dtls_sp_hdr {
+ uint8_t us_ver;
+ uint8_t us_op_code;
+ uint16_t us_type;
+ uint16_t us_params[2];
+} dtls_sp_hdr;
+
+// DTLS pipe resend (CREQ) in msec (nng_duration)
+#define DTLS_PIPE_REFRESH(p) ((p)->refresh)
+
+// DTLS pipe timeout in msec (nng_duration)
+#define DTLS_PIPE_TIMEOUT(p) ((p)->refresh * 5)
+
+struct dtls_pipe {
+ dtls_ep *ep;
+ nni_pipe *npipe;
+ nng_sockaddr peer_addr;
+ uint64_t id; // hash of peer address
+ uint16_t peer;
+ uint16_t proto;
+ bool matched; // true if have matched and given this to SP
+ bool closed; // true if we are closed (no more send or recv!)
+ bool dialer; // true if we are dialer
+ nng_duration refresh; // seconds, for the protocol
+ nng_time next_wake;
+ nng_time expire; // inactivity expiration time
+ nng_time next_refresh;
+ nni_list_node node;
+ nni_lmq rx_mq;
+
+ // Upper layer queues. These are between the PIPE and SP.
+ bool send_busy; // true if send is in process
+ uint16_t send_max; // peer's max recv size
+ nni_list send_aios;
+ uint8_t *send_buf;
+ size_t send_bufsz;
+ nng_aio send_tls_aio;
+
+ bool recv_busy;
+ bool recv_rdy; // receive is done and data in recvbuf
+ uint16_t recv_max; // max recv size
+ nni_list recv_aios;
+ uint8_t *recv_buf;
+ size_t recv_bufsz;
+ nng_aio recv_tls_aio;
+
+ // Lower layer queues. These are between the TLS engine and the UDP endpoint.
+
+ uint8_t send_op; // usually OPCODE_DATA
+ uint8_t last_op; // last op code we sent
+ uint16_t reason; // only for disconnect
+
+ nni_mtx lower_mtx; // protects the lower rx_q, etc.
+
+ // This is the lower level RX buffer, which contains only
+ // received ciphertext (content before passed to TLS layer for
+ // decrypt). The actual pointers may change, as we "swap"
+ // buffers between the endpoint and the pipe to avoid copying.
+ nni_list rx_q; // lower aio from the TLS layer
+
+ nni_tls_conn tls;
+};
+
+struct dtls_ep {
+ nng_udp *udp;
+ nni_mtx mtx;
+ uint16_t proto;
+ uint16_t peer;
+ uint16_t af; // address family
+ bool fini;
+ bool started;
+ bool closed;
+ nng_url *url;
+ const char *host; // for dialers
+ nni_aio *useraio;
+ nni_aio *connaio;
+ nni_aio timeaio;
+ nni_aio resaio;
+ bool dialer;
+ nni_listener *nlistener;
+ nni_dialer *ndialer;
+ nni_msg *rx_payload; // current receive message
+ nng_sockaddr rx_sa; // addr for last message
+ nni_aio tx_aio; // aio for TX handling
+ nni_aio rx_aio; // aio for RX handling
+ nni_id_map pipes; // pipes (indexed by id)
+ nni_sockaddr self_sa; // our address
+ nni_sockaddr peer_sa; // peer address, only for dialer
+ nni_list connaios; // aios from accept waiting for a client peer
+ nni_list connpipes; // pipes waiting to be connected
+ nng_duration refresh; // refresh interval for connections in seconds
+ uint16_t rcvmax; // max payload, trimmed to uint16_t
+ nni_resolv_item resolv;
+
+ nng_tls_config *tlscfg;
+
+ size_t rx_size; // size of the rx buffer
+ void *rx_buf;
+
+ nni_stat_item st_rcv_max;
+ nni_stat_item st_rcv_reorder;
+ nni_stat_item st_rcv_toobig;
+ nni_stat_item st_rcv_nomatch;
+ nni_stat_item st_rcv_copy;
+ nni_stat_item st_rcv_nocopy;
+ nni_stat_item st_rcv_nobuf;
+ nni_stat_item st_snd_toobig;
+ nni_stat_item st_snd_nobuf;
+ nni_stat_item st_peer_inactive;
+ nni_stat_item st_copy_max;
+};
+
+static void dtls_ep_start(dtls_ep *);
+static void dtls_resolv_cb(void *);
+static void dtls_rx_cb(void *);
+
+static void dtls_ep_match(dtls_ep *ep);
+static void dtls_remove_pipe(dtls_pipe *p);
+
+// BIO send/recv functions for use by the common TLS layer.
+
+static void
+dtls_bio_cancel(nng_aio *aio, void *arg, nng_err rv)
+{
+ dtls_pipe *p = arg;
+ nni_mtx_lock(&p->lower_mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ }
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_recv_done(dtls_pipe *p)
+{
+ nng_aio *aio;
+ uint8_t *ptr;
+ size_t resid;
+ nni_msg *msg;
+
+ while ((!nni_lmq_empty(&p->rx_mq)) &&
+ ((aio = nni_list_first(&p->rx_q)) != NULL)) {
+
+ nni_aio_list_remove(aio);
+ nni_lmq_get(&p->rx_mq, &msg);
+
+ // We assume there is only a body, because we don't bother to
+ // fill in the header for raw UDP.
+
+ resid = nni_msg_len(msg);
+ ptr = nni_msg_body(msg);
+
+ for (unsigned i = 0; i < aio->a_nio && resid > 0; i++) {
+ size_t num = resid > aio->a_iov[i].iov_len
+ ? aio->a_iov[i].iov_len
+ : resid;
+ memcpy(aio->a_iov[i].iov_buf, ptr, num);
+ ptr += num;
+ resid -= num;
+ }
+ nni_aio_finish(aio, NNG_OK, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
+}
+
+static void
+dtls_bio_recv(void *arg, nng_aio *aio)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->lower_mtx);
+ if (!nni_aio_start(aio, dtls_bio_cancel, p)) {
+ nni_mtx_unlock(&p->lower_mtx);
+ return;
+ }
+
+ nni_aio_list_append(&p->rx_q, aio);
+ dtls_bio_recv_done(p);
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_send(void *arg, nng_aio *aio)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->lower_mtx);
+ if (!p->closed) {
+ nni_aio_set_input(aio, 0, &p->peer_addr);
+ nng_udp_send(p->ep->udp, aio);
+ }
+ nni_mtx_unlock(&p->lower_mtx);
+}
+
+static void
+dtls_bio_free(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+dtls_bio_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+dtls_bio_stop(void *arg)
+{
+ dtls_pipe *p = arg;
+ nni_aio_stop(&p->recv_tls_aio);
+ nni_aio_stop(&p->send_tls_aio);
+}
+
+static nni_tls_bio_ops dtls_bio_ops = {
+ .bio_send = dtls_bio_send,
+ .bio_recv = dtls_bio_recv,
+ .bio_close = dtls_bio_close,
+ .bio_stop = dtls_bio_stop,
+ .bio_free = dtls_bio_free,
+};
+
+static void
+dtls_tran_init(void)
+{
+}
+
+static void
+dtls_tran_fini(void)
+{
+}
+
+//
+// Upper layer functions - moving data between TLS and SP.
+// TLS acts as kind of a stream for us, so we only see the
+// data that is meant for us, but we will send and receive
+// control messages that are not just data payloads.
+//
+
+static void dtls_pipe_send_cancel(nng_aio *, void *, nng_err);
+static void dtls_pipe_send_tls(dtls_pipe *);
+static void dtls_pipe_send_tls_cb(void *arg);
+
+static void
+dtls_pipe_send(void *arg, nni_aio *aio)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep;
+ nng_msg *msg;
+ size_t count = 0;
+ size_t sndmax;
+
+ msg = nni_aio_get_msg(aio);
+ ep = p->ep;
+
+ if (msg != NULL) {
+ count = nni_msg_len(msg) + nni_msg_header_len(msg);
+ }
+
+ nni_mtx_lock(&ep->mtx);
+ sndmax = p->send_max;
+ if (!nni_aio_start(aio, dtls_pipe_send_cancel, p)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_aio_reset(aio);
+ if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > sndmax) {
+ // rather than failing this with an error, we just drop it
+ // on the floor. this is on the sender, so there isn't
+ // a compelling need to disconnect the pipe, since
+ // we're not being "ill-behaved" to our peer.
+ nni_stat_inc(&ep->st_snd_toobig, 1);
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish(aio, 0, count);
+ nni_msg_free(msg);
+ return;
+ }
+
+ nni_aio_list_append(&p->send_aios, aio);
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_send_cancel(nng_aio *aio, void *arg, nng_err err)
+{
+ dtls_pipe *p = arg;
+ nni_mtx_lock(&p->ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, err);
+ }
+ nni_mtx_unlock(&p->ep->mtx);
+}
+
+// Lower layer send/recv functions, used by the pipe layer.
+
+static void
+dtls_pipe_send_tls(dtls_pipe *p)
+{
+ nni_aio *aio;
+ nng_msg *msg;
+ uint8_t opcode;
+ nng_iov iov;
+ dtls_sp_hdr *hdr = (void *) p->send_buf;
+
+ if (p->send_busy || p->closed) {
+ return;
+ }
+
+ opcode = p->send_op;
+ // reset the last op
+ p->send_op = OPCODE_DATA;
+
+ hdr->us_ver = PROTO_VERSION;
+ hdr->us_op_code = opcode;
+ NNI_PUT16LE(&hdr->us_type, p->proto);
+ hdr->us_params[0] = 0;
+ hdr->us_params[1] = 0;
+
+ iov.iov_buf = hdr;
+ iov.iov_len = sizeof(*hdr);
+
+ switch (opcode) {
+ case OPCODE_DATA:
+ for (;;) {
+ if ((aio = nni_list_first(&p->send_aios)) == NULL) {
+ // no work for us!
+ return;
+ }
+ nni_aio_list_remove(aio);
+ msg = nni_aio_get_msg(aio);
+ if (nni_msg_header_len(msg) + nni_msg_len(msg) +
+ sizeof(*hdr) >
+ p->send_bufsz) {
+ nng_msg_free(msg);
+ nni_aio_finish_error(aio, NNG_EMSGSIZE);
+ continue;
+ }
+ break; // for loop
+ }
+
+ size_t len = nni_msg_header_len(msg);
+ uint8_t *data = (void *) (hdr + 1);
+ memcpy(data, nni_msg_header(msg), len);
+ data += len;
+ memcpy(data, nni_msg_body(msg), nni_msg_len(msg));
+ len += nni_msg_len(msg);
+
+ NNI_PUT16LE(&hdr->us_params[0], (uint16_t) len);
+ iov.iov_len += len;
+
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ break;
+
+ case OPCODE_CREQ:
+ case OPCODE_CACK:
+ NNI_PUT16LE(&hdr->us_params[0], p->recv_max);
+ NNI_PUT16LE(&hdr->us_params[1], p->refresh);
+ break;
+
+ case OPCODE_DISC:
+ NNI_PUT16LE(&hdr->us_params[0], p->reason);
+ p->closed = true;
+ break;
+ default:
+ NNI_ASSERT(false); // this should never happen!
+ // fall back to sending a disconnect
+ hdr->us_op_code = OPCODE_DISC;
+ NNI_PUT16LE(&hdr->us_params[0], DISC_PROTO);
+ }
+
+ p->last_op = opcode;
+ p->send_busy = true;
+ nni_aio_set_iov(&p->send_tls_aio, 1, &iov);
+ nni_tls_send(&p->tls, &p->send_tls_aio);
+}
+
+static void
+dtls_pipe_send_tls_cb(void *arg)
+{
+ dtls_pipe *p = arg;
+
+ nni_mtx_lock(&p->ep->mtx);
+
+ p->send_busy = false;
+ if (nni_aio_result(&p->send_tls_aio) != NNG_OK ||
+ p->last_op == OPCODE_DISC) {
+ nni_pipe_close(p->npipe);
+ if (!p->matched) {
+ dtls_remove_pipe(p);
+ }
+ nni_mtx_unlock(&p->ep->mtx);
+ return;
+ }
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&p->ep->mtx);
+}
+
+// RECV SIDE
+
+static void dtls_pipe_recv_cancel(nni_aio *, void *, nng_err);
+static void dtls_pipe_recv_tls(dtls_pipe *);
+
+static void
+dtls_pipe_recv(void *arg, nni_aio *aio)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ nni_aio_reset(aio);
+ nni_mtx_lock(&ep->mtx);
+ if (p->closed) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if (!nni_aio_start(aio, dtls_pipe_recv_cancel, p)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&p->recv_aios, aio);
+ dtls_pipe_recv_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_recv_cancel(nni_aio *aio, void *arg, nng_err rv)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ nni_mtx_lock(&ep->mtx);
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+dtls_pipe_recv_tls_start(dtls_pipe *p)
+{
+ nng_iov iov;
+ if (p->recv_busy || p->closed) {
+ return;
+ }
+ p->recv_busy = true;
+ iov.iov_buf = p->recv_buf;
+ iov.iov_len = p->recv_bufsz;
+
+ nni_aio_set_iov(&p->recv_tls_aio, 1, &iov);
+ nni_tls_recv(&p->tls, &p->recv_tls_aio);
+}
+
+static void
+dtls_pipe_recv_tls(dtls_pipe *p)
+{
+ nng_aio *aio = nni_list_first(&p->recv_aios);
+ size_t len;
+ nng_msg *msg;
+ int rv;
+
+ if (aio == NULL) {
+ return;
+ }
+ if (!p->recv_rdy) {
+ dtls_pipe_recv_tls_start(p);
+ return;
+ }
+
+ p->recv_rdy = false;
+
+ nni_aio_list_remove(aio);
+ len = nng_aio_count(&p->recv_tls_aio);
+ NNI_ASSERT(len >= sizeof(dtls_sp_hdr));
+ len -= sizeof(dtls_sp_hdr);
+
+ if ((rv = nni_msg_alloc(&msg, len)) != NNG_OK) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ memcpy(nng_msg_body(msg), p->recv_buf + sizeof(dtls_sp_hdr), len);
+ nni_aio_finish_msg(aio, msg);
+}
+
+static void
+dtls_pipe_recv_tls_cb(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ dtls_sp_hdr *hdr = (void *) p->recv_buf;
+ nng_aio *aio = &p->recv_tls_aio;
+ uint16_t proto;
+ uint16_t refresh;
+ uint16_t rcvmax;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ p->recv_busy = false;
+
+ if ((rv = nni_aio_result(aio)) != NNG_OK) {
+
+ // If we didn't connect yet, issue an error so the dialing user can
+ // see a connection failure (e.g. if we failed the TLS handshake).
+ if (p->dialer && !p->matched) {
+ nni_aio *caio;
+ if ((caio = nni_list_first(&ep->connaios)) != NULL) {
+ nni_aio_list_remove(caio);
+ nni_aio_finish_error(caio, rv);
+ }
+ }
+
+ // Bump a bad receive stat (e.g. someone may have sent us
+ // garbage.) We do not acknowledge or handle garbage frames
+ // sent to an open session.
+ nni_pipe_close(p->npipe);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ // We had a "good" receive (TLS passed at least) from the peer.
+
+ if (nni_aio_count(aio) < sizeof(*hdr)) {
+ // Runt frame.
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_PROTO;
+ goto bad;
+ }
+
+ if (nni_aio_count(aio) > sizeof(*hdr) + p->recv_max) {
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_MSGSIZE;
+ goto bad;
+ }
+
+ if (hdr->us_ver != PROTO_VERSION) {
+ // Bad protocol version
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_PROTO;
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_type, proto);
+ if (proto != p->peer) {
+ // Bad SP protocol type
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_TYPE;
+ goto bad;
+ }
+
+ p->expire = nni_clock() + DTLS_PIPE_TIMEOUT(p);
+
+ if (!p->matched) {
+ p->matched = true;
+ nni_list_append(&p->ep->connpipes, p);
+ dtls_ep_match(p->ep);
+ }
+
+ switch (hdr->us_op_code) {
+ case OPCODE_CREQ:
+ if (p->dialer) {
+ // dialers don't accept requests
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_params[0], rcvmax);
+ NNI_GET16LE(&hdr->us_params[1], refresh);
+ if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) {
+ p->refresh = refresh * NNI_SECOND;
+ }
+ if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) {
+ p->send_max = rcvmax;
+ }
+ // schedule the CACK reply
+ p->send_op = OPCODE_CACK;
+ break;
+
+ case OPCODE_CACK:
+ if (!p->dialer) {
+ goto bad;
+ }
+ NNI_GET16LE(&hdr->us_params[0], rcvmax);
+ NNI_GET16LE(&hdr->us_params[1], refresh);
+
+ if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) {
+ p->refresh = refresh * NNI_SECOND;
+ }
+ if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) {
+ p->send_max = rcvmax;
+ }
+ break;
+
+ case OPCODE_DISC:
+ p->closed = true;
+ nni_mtx_unlock(&ep->mtx);
+ nni_pipe_close(p->npipe);
+ return;
+
+ case OPCODE_DATA:
+ p->recv_rdy = true;
+ dtls_pipe_recv_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+bad:
+ if (p->send_op != OPCODE_DATA) {
+ dtls_pipe_send_tls(p);
+ }
+ dtls_pipe_recv_tls_start(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_pipe_close(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nni_aio *aio;
+
+ nni_mtx_lock(&ep->mtx);
+ while ((aio = nni_list_first(&p->recv_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&p->send_aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if (!p->matched) {
+ dtls_remove_pipe(p);
+ } else {
+ p->send_op = OPCODE_DISC;
+ p->reason = DISC_CLOSED;
+ dtls_pipe_send_tls(p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err dtls_add_pipe(dtls_ep *ep, dtls_pipe *p);
+
+static void
+dtls_pipe_stop(void *arg)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+
+ dtls_pipe_close(arg);
+
+ nni_mtx_lock(&ep->mtx);
+ dtls_remove_pipe(p);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+dtls_pipe_alloc(dtls_ep *ep, dtls_pipe **pp, const nng_sockaddr *sa)
+{
+ dtls_pipe *p;
+ nng_err rv;
+
+ if (ep->dialer) {
+ rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer);
+ } else {
+ rv = nni_pipe_alloc_listener((void *) &p, ep->nlistener);
+ }
+ if (rv != NNG_OK) {
+ nng_log_err("NNG-DTLS-PIPE-ALLOC-FAIL",
+ "Failed allocating pipe for DTLS: %s", nng_strerror(rv));
+ return (rv);
+ }
+ p->dialer = ep->dialer;
+ p->ep = ep;
+ p->proto = ep->proto;
+ p->peer = ep->peer;
+ p->peer_addr = *sa;
+ p->id = nng_sockaddr_hash(sa);
+ p->refresh = ep->refresh;
+ p->send_max = NNG_DTLS_RECVMAX;
+ p->recv_max = ep->rcvmax;
+ *pp = p;
+
+ if (((rv = dtls_add_pipe(ep, p)) != NNG_OK) ||
+ ((rv = nni_tls_init(&p->tls, ep->tlscfg)) != NNG_OK) ||
+ ((rv = nni_tls_start(&p->tls, &dtls_bio_ops, p, sa)) != NNG_OK)) {
+ nni_pipe_close(p->npipe);
+ nng_log_err("NNG-DTLS-PIPE-ADD-FAIL",
+ "Failed adding pipe for DTLS: %s", nng_strerror(rv));
+ return (rv);
+ }
+
+ // We need to start a receiver on the pipe.
+ dtls_pipe_recv_tls_start(p);
+
+ // Also start TLS up and running.
+ switch (nni_tls_run(&p->tls)) {
+ case NNG_OK:
+ case NNG_EAGAIN:
+ break;
+ default:
+ nni_pipe_close(p->npipe);
+ break;
+ }
+
+ // wake the timer so it knows to resubmit
+ nni_aio_abort(&ep->timeaio, NNG_ETIMEDOUT);
+
+ return (NNG_OK);
+}
+
+static size_t
+dtls_pipe_size(void)
+{
+ return (NNI_ALIGN_UP(sizeof(dtls_pipe)) +
+ NNI_ALIGN_UP(nni_tls_engine_conn_size()));
+}
+
+static int
+dtls_pipe_init(void *arg, nni_pipe *npipe)
+{
+ dtls_pipe *p = arg;
+ p->npipe = npipe;
+
+ size_t bufsz = DTLS_MAX_RECORD; // TODO: Make this a tunable.
+
+ if ((p->recv_buf = nni_alloc(bufsz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((p->send_buf = nni_alloc(bufsz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ p->recv_bufsz = bufsz;
+ p->send_bufsz = bufsz;
+ nni_mtx_init(&p->lower_mtx);
+ nni_aio_init(&p->recv_tls_aio, dtls_pipe_recv_tls_cb, p);
+ nni_aio_init(&p->send_tls_aio, dtls_pipe_send_tls_cb, p);
+ nni_aio_list_init(&p->rx_q);
+ nni_aio_list_init(&p->recv_aios);
+ nni_aio_list_init(&p->send_aios);
+ nni_lmq_init(&p->rx_mq, NNG_DTLS_RXQUEUE_LEN);
+
+ return (0);
+}
+
+static void
+dtls_pipe_fini(void *arg)
+{
+ dtls_pipe *p = arg;
+ nng_msg *m;
+
+ nni_tls_fini(&p->tls);
+ nni_aio_fini(&p->recv_tls_aio);
+ nni_aio_fini(&p->send_tls_aio);
+ if (p->recv_buf != NULL) {
+ nni_free(p->recv_buf, p->recv_bufsz);
+ }
+ if (p->send_buf != NULL) {
+ nni_free(p->send_buf, p->send_bufsz);
+ }
+ nni_mtx_lock(&p->lower_mtx);
+ while (!nni_lmq_empty(&p->rx_mq)) {
+ nni_lmq_get(&p->rx_mq, &m);
+ nni_msg_free(m);
+ }
+ nni_mtx_unlock(&p->lower_mtx);
+ nni_mtx_fini(&p->lower_mtx);
+ nni_lmq_fini(&p->rx_mq);
+ NNI_ASSERT(nni_list_empty(&p->recv_aios));
+ NNI_ASSERT(nni_list_empty(&p->send_aios));
+}
+
+static dtls_pipe *
+dtls_find_pipe(dtls_ep *ep, const nng_sockaddr *peer_addr)
+{
+ uint64_t id = nng_sockaddr_hash(peer_addr);
+ dtls_pipe *p;
+
+ // we'll keep incrementing id until we conclusively match
+ // or we get a NULL. This is another level of rehashing, but
+ // it keeps us from having to look up.
+ for (;;) {
+ if ((p = nni_id_get(&ep->pipes, id)) == NULL) {
+ return (NULL);
+ }
+ if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) {
+ return (p);
+ }
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+}
+
+static void
+dtls_remove_pipe(dtls_pipe *p)
+{
+ // ep locked
+ dtls_ep *ep = p->ep;
+ uint64_t id = p->id;
+ bool matched = p->matched;
+ if (id == 0) {
+ return;
+ }
+ p->id = 0;
+ for (;;) {
+ dtls_pipe *srch;
+ if ((srch = nni_id_get(&ep->pipes, id)) == NULL) {
+ break;
+ }
+ if (srch == p) {
+ nni_id_remove(&ep->pipes, id);
+ break;
+ }
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ if (!matched) {
+ nni_pipe_rele(p->npipe);
+ }
+}
+
+static nng_err
+dtls_add_pipe(dtls_ep *ep, dtls_pipe *p)
+{
+ // Id must be part of the hash
+ uint64_t id = p->id;
+ while (nni_id_get(&ep->pipes, id) != NULL) {
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ return (nni_id_set(&ep->pipes, id, p));
+}
+
+static void
+dtls_start_rx(dtls_ep *ep)
+{
+ nni_iov iov;
+
+ iov.iov_buf = ep->rx_buf;
+ iov.iov_len = ep->rx_size;
+
+ nni_aio_reset(&ep->rx_aio);
+ nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa);
+ nni_aio_set_iov(&ep->rx_aio, 1, &iov);
+ nng_udp_recv(ep->udp, &ep->rx_aio);
+}
+
+static void
+dtls_rx_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ nni_aio *aio = &ep->rx_aio;
+ int rv;
+ nni_msg *msg;
+
+ nni_mtx_lock(&ep->mtx);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // something bad happened on RX... which is unexpected.
+ // sleep a little bit and hope for recovery.
+ switch (nni_aio_result(aio)) {
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ case NNG_ESTOPPED:
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ case NNG_ETIMEDOUT:
+ case NNG_EAGAIN:
+ case NNG_EINTR:
+ default:
+ goto fail;
+ }
+ }
+
+ // If this came from another host, and we are a dialer, we discard.
+ // Dialers only talk to the party they explicitly dialed.
+ if (ep->dialer && !nng_sockaddr_equal(&ep->rx_sa, &ep->peer_sa)) {
+ goto fail;
+ }
+
+ if ((p = dtls_find_pipe(ep, &ep->rx_sa)) == NULL) {
+ if (dtls_pipe_alloc(ep, &p, &ep->rx_sa) != NNG_OK) {
+ goto fail;
+ }
+ }
+ if (p->closed) {
+ goto fail;
+ }
+ NNI_ASSERT(p != NULL);
+
+ if (nni_msg_alloc(&msg, nni_aio_count(aio)) != NNG_OK) {
+ // TODO BUMP A NO RECV ALLOC STAT
+ goto fail;
+ }
+ memcpy(nni_msg_body(msg), ep->rx_buf, nni_aio_count(aio));
+ dtls_start_rx(ep);
+ nni_mtx_unlock(&ep->mtx);
+
+ nni_mtx_lock(&p->lower_mtx);
+
+ if (nni_lmq_put(&p->rx_mq, msg) != NNG_OK) {
+ // TODO: BUMP TXQ FULL STAT
+ nng_msg_free(msg);
+ }
+ dtls_bio_recv_done(p);
+ nni_mtx_unlock(&p->lower_mtx);
+
+ // Run the TLS state machine.
+ switch (nni_tls_run(&p->tls)) {
+ case NNG_OK:
+ case NNG_EAGAIN:
+ break;
+ default:
+ nni_pipe_close(p->npipe);
+ }
+ return;
+
+fail:
+ // start another receive
+ dtls_start_rx(ep);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static uint16_t
+dtls_pipe_peer(void *arg)
+{
+ dtls_pipe *p = arg;
+
+ return (p->peer);
+}
+
+static nng_err
+dtls_pipe_get_recvmax(void *arg, void *v, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nng_err rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(p->recv_max, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ dtls_ep *ep = p->ep;
+ nng_err rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_sockaddr(&p->peer_addr, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nni_option dtls_pipe_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_get = dtls_pipe_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = dtls_pipe_get_remaddr,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static nng_err
+dtls_pipe_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_pipe *p = arg;
+ int rv;
+
+ rv = nni_getopt(dtls_pipe_options, name, p, buf, szp, t);
+ return (rv);
+}
+
+static void
+dtls_ep_fini(void *arg)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_fini(&ep->timeaio);
+ nni_aio_fini(&ep->resaio);
+ nni_aio_fini(&ep->tx_aio);
+ nni_aio_fini(&ep->rx_aio);
+
+ if (ep->udp != NULL) {
+ nng_udp_close(ep->udp);
+ }
+ if (ep->rx_size != 0) {
+ nni_free(ep->rx_buf, ep->rx_size);
+ }
+
+ nni_msg_free(ep->rx_payload); // safe even if msg is null
+ nni_id_map_fini(&ep->pipes);
+ nni_mtx_fini(&ep->mtx);
+}
+
+static void
+dtls_ep_close(void *arg)
+{
+ dtls_ep *ep = arg;
+ nni_aio *aio;
+ dtls_pipe *p;
+ uint64_t key;
+ uint32_t cursor;
+
+ nni_aio_close(&ep->resaio);
+ nni_aio_close(&ep->rx_aio);
+ nni_aio_close(&ep->timeaio);
+
+ // leave tx open so we can send disconnects
+
+ nni_mtx_lock(&ep->mtx);
+ ep->closed = true;
+ while ((aio = nni_list_first(&ep->connaios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECONNABORTED);
+ }
+ cursor = 0;
+ key = 0;
+ while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) {
+ nni_pipe_close(p->npipe);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_ep_stop(void *arg)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_stop(&ep->resaio);
+ nni_aio_stop(&ep->rx_aio);
+ nni_aio_stop(&ep->timeaio);
+
+ nni_mtx_lock(&ep->mtx);
+ ep->fini = true;
+ nni_mtx_unlock(&ep->mtx);
+}
+
+// timer handler - sends out additional creqs as needed,
+// reaps stale connections, and handles linger.
+static void
+dtls_timer_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_aio_result(&ep->timeaio);
+ switch (rv) {
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ case NNG_ESTOPPED:
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ default:
+ if (ep->closed) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ break;
+ }
+
+ uint32_t cursor = 0;
+ nni_time now = nni_clock();
+ nng_duration refresh = NNG_DURATION_INFINITE;
+
+ while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) {
+
+ if (p->closed) {
+ continue;
+ }
+ NNI_ASSERT(p->refresh > 0);
+ if (p->expire > 0 && now > p->expire) {
+ char buf[128];
+ nng_log_info("NNG-DTLS-INACTIVE",
+ "Pipe peer %s timed out due to inactivity",
+ nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf)));
+
+ nni_stat_inc(&ep->st_peer_inactive, 1);
+ nni_pipe_close(p->npipe);
+ continue;
+ }
+
+ if (p->dialer && now > p->next_refresh) {
+ p->send_op = OPCODE_CREQ;
+ p->next_refresh = p->expire + p->refresh;
+ dtls_pipe_send_tls(p);
+ }
+ if (refresh == NNG_DURATION_INFINITE && p->refresh > 0) {
+ refresh = p->refresh;
+ } else if ((p->refresh > 0) && (p->refresh < refresh)) {
+ refresh = p->refresh;
+ }
+ }
+ nni_sleep_aio(refresh, &ep->timeaio);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err
+dtls_ep_init(
+ dtls_ep *ep, nng_url *url, nni_sock *sock, nni_dialer *d, nni_listener *l)
+{
+ nni_mtx_init(&ep->mtx);
+ nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true);
+ NNI_LIST_INIT(&ep->connpipes, dtls_pipe, node);
+ nni_aio_list_init(&ep->connaios);
+
+ nni_aio_init(&ep->rx_aio, dtls_rx_cb, ep);
+ nni_aio_init(&ep->timeaio, dtls_timer_cb, ep);
+ nni_aio_init(&ep->resaio, dtls_resolv_cb, ep);
+
+ if (strcmp(url->u_scheme, "dtls") == 0) {
+ ep->af = NNG_AF_UNSPEC;
+ } else if (strcmp(url->u_scheme, "dtls4") == 0) {
+ ep->af = NNG_AF_INET;
+ } else if (strcmp(url->u_scheme, "dtls6") == 0) {
+ ep->af = NNG_AF_INET6;
+ } else {
+ return (NNG_EADDRINVAL);
+ }
+
+ ep->self_sa.s_family = ep->af;
+ ep->proto = nni_sock_proto_id(sock);
+ ep->peer = nni_sock_peer_id(sock);
+ ep->url = url;
+ ep->refresh = NNG_DTLS_REFRESH; // five seconds by default
+ ep->rcvmax = NNG_DTLS_RECVMAX;
+
+ // receive buffer plus some extra for UDP and TLS headers
+ if ((ep->rx_buf = nni_alloc(DTLS_MAX_RECORD)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ep->rx_size = DTLS_MAX_RECORD;
+
+ NNI_STAT_LOCK(rcv_max_info, "rcv_max", "maximum receive size",
+ NNG_STAT_LEVEL, NNG_UNIT_BYTES);
+ NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch",
+ "messages without a matching connection", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(rcv_toobig_info, "rcv_toobig",
+ "received messages rejected because too big", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(rcv_nobuf_info, "rcv_nobuf",
+ "received messages dropped no buffer", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(snd_toobig_info, "snd_toobig",
+ "sent messages rejected because too big", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(snd_nobuf_info, "snd_nobuf",
+ "sent messages dropped no buffer", NNG_STAT_COUNTER,
+ NNG_UNIT_MESSAGES);
+ NNI_STAT_LOCK(peer_inactive_info, "peer_inactive",
+ "connections closed due to inactive peer", NNG_STAT_COUNTER,
+ NNG_UNIT_EVENTS);
+
+ nni_stat_init_lock(&ep->st_rcv_max, &rcv_max_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_snd_toobig, &snd_toobig_info, &ep->mtx);
+ nni_stat_init_lock(&ep->st_snd_nobuf, &snd_nobuf_info, &ep->mtx);
+ nni_stat_init_lock(
+ &ep->st_peer_inactive, &peer_inactive_info, &ep->mtx);
+
+ if (l) {
+ NNI_ASSERT(d == NULL);
+ nni_listener_add_stat(l, &ep->st_rcv_max);
+
+ nni_listener_add_stat(l, &ep->st_rcv_toobig);
+ nni_listener_add_stat(l, &ep->st_rcv_nomatch);
+ nni_listener_add_stat(l, &ep->st_rcv_nobuf);
+ nni_listener_add_stat(l, &ep->st_snd_toobig);
+ nni_listener_add_stat(l, &ep->st_snd_nobuf);
+ }
+ if (d) {
+ NNI_ASSERT(l == NULL);
+ nni_dialer_add_stat(d, &ep->st_rcv_max);
+ nni_dialer_add_stat(d, &ep->st_rcv_toobig);
+ nni_dialer_add_stat(d, &ep->st_rcv_nomatch);
+ nni_dialer_add_stat(d, &ep->st_rcv_nobuf);
+ nni_dialer_add_stat(d, &ep->st_snd_toobig);
+ nni_dialer_add_stat(d, &ep->st_snd_nobuf);
+ }
+
+ // schedule our timer callback - forever for now
+ // adjusted automatically as we add pipes or other
+ // actions which require earlier wakeup.
+ nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio);
+ // nni_sleep_aio(100, &ep->timeaio);
+
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_check_url(nng_url *url, bool listen)
+{
+ // Check for invalid URL components.
+ if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
+ (url->u_query != NULL)) {
+ return (NNG_EADDRINVAL);
+ }
+ if (!listen) {
+ if ((strlen(url->u_hostname) == 0) || (url->u_port == 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ }
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_dialer_init(void *arg, nng_url *url, nni_dialer *ndialer)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+ nni_sock *sock = nni_dialer_sock(ndialer);
+
+ if ((rv = dtls_check_url(url, false)) != NNG_OK) {
+ return (rv);
+ }
+
+ ep->ndialer = ndialer;
+ if ((rv = dtls_ep_init(ep, url, sock, ndialer, NULL)) != NNG_OK) {
+ return (rv);
+ }
+
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_listener_init(void *arg, nng_url *url, nni_listener *nlistener)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+ nni_sock *sock = nni_listener_sock(nlistener);
+
+ ep->nlistener = nlistener;
+ if ((rv = dtls_ep_init(ep, url, sock, NULL, nlistener)) != NNG_OK) {
+ return (rv);
+ }
+ // Check for invalid URL components.
+ if (((rv = dtls_check_url(url, true)) != NNG_OK) ||
+ ((rv = nni_url_to_address(&ep->self_sa, url)) != NNG_OK)) {
+ return (rv);
+ }
+
+ return (NNG_OK);
+}
+
+static void
+dtls_ep_cancel(nni_aio *aio, void *arg, nng_err rv)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_aio_abort(&ep->resaio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_resolv_cb(void *arg)
+{
+ dtls_ep *ep = arg;
+ dtls_pipe *p;
+ nni_aio *aio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = nni_list_first(&ep->connaios)) == NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->closed) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if ((rv = nni_aio_result(&ep->resaio)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ nng_log_warn("NNG-RESOLV", "Failed resolving IP address: %s",
+ nng_strerror(rv));
+ return;
+ }
+
+ // Choose the right port to bind to. The family must match.
+ if (ep->self_sa.s_family == NNG_AF_UNSPEC) {
+ ep->self_sa.s_family = ep->peer_sa.s_family;
+ }
+
+ if (ep->udp == NULL) {
+ if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ }
+
+ if ((rv = dtls_pipe_alloc(ep, &p, &ep->peer_sa)) != NNG_OK) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ dtls_ep_start(ep);
+
+ // Send out the connection request. We don't complete
+ // the user aio until we confirm a connection, so that
+ // we can supply details like maximum receive message size
+ // and the protocol the peer is using.
+ p->send_op = OPCODE_CREQ;
+ dtls_pipe_send_tls(p);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+dtls_ep_connect(void *arg, nni_aio *aio)
+{
+ dtls_ep *ep = arg;
+
+ nni_mtx_lock(&ep->mtx);
+ if (!nni_aio_start(aio, dtls_ep_cancel, ep)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (ep->started) {
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ NNI_ASSERT(nni_list_empty(&ep->connaios));
+ ep->dialer = true;
+
+ nni_list_append(&ep->connaios, aio);
+
+ // lookup the IP address
+
+ memset(&ep->resolv, 0, sizeof(ep->resolv));
+ ep->resolv.ri_family = ep->af;
+ ep->resolv.ri_host = ep->url->u_hostname;
+ ep->resolv.ri_port = ep->url->u_port;
+ ep->resolv.ri_passive = false;
+ ep->resolv.ri_sa = &ep->peer_sa;
+ nni_aio_set_timeout(&ep->resaio, NNI_SECOND * 5);
+ nni_resolv(&ep->resolv, &ep->resaio);
+
+ // wake up for retries
+ nni_aio_abort(&ep->timeaio, NNG_EINTR);
+
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nng_err
+dtls_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+ nng_sockaddr sa;
+ int port;
+ uint8_t *paddr;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->udp != NULL) {
+ (void) nng_udp_sockname(ep->udp, &sa);
+ } else {
+ sa = ep->self_sa;
+ }
+ switch (sa.s_family) {
+ case NNG_AF_INET:
+ paddr = (void *) &sa.s_in.sa_port;
+ break;
+
+ case NNG_AF_INET6:
+ paddr = (void *) &sa.s_in6.sa_port;
+ break;
+
+ default:
+ paddr = NULL;
+ break;
+ }
+ nni_mtx_unlock(&ep->mtx);
+
+ if (paddr == NULL) {
+ return (NNG_ESTATE);
+ }
+
+ NNI_GET16(paddr, port);
+ return (nni_copyout_int(port, buf, szp, t));
+}
+
+static nng_err
+dtls_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_sockaddr sa;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->udp != NULL) {
+ (void) nng_udp_sockname(ep->udp, &sa);
+ } else {
+ sa = ep->self_sa;
+ }
+ nni_mtx_unlock(&ep->mtx);
+
+ return (nni_copyout_sockaddr(&sa, v, szp, t));
+}
+
+static nng_err
+dtls_ep_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ if (!ep->dialer) {
+ return (NNG_ENOTSUP);
+ }
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_sockaddr(&ep->peer_sa, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+}
+
+static nng_err
+dtls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ dtls_ep *ep = arg;
+ size_t val;
+ nng_err rv;
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNG_DTLS_RECVMAX, t)) == 0) {
+ if ((val == 0) || (val > NNG_DTLS_RECVMAX)) {
+ val = NNG_DTLS_RECVMAX;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+ ep->rcvmax = (uint16_t) val;
+ nni_stat_set_value(&ep->st_rcv_max, val);
+ nni_mtx_unlock(&ep->mtx);
+ }
+ return (rv);
+}
+
+static nng_err
+dtls_ep_set_tls(void *arg, nng_tls_config *cfg)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+ ep->tlscfg = cfg;
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_OK);
+}
+
+static nng_err
+dtls_ep_get_tls(void *arg, nng_tls_config **cfgp)
+{
+ dtls_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ *cfgp = ep->tlscfg;
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_OK);
+}
+
+// this just looks for pipes waiting for an aio, and aios waiting for
+// a connection, and matches them together.
+static void
+dtls_ep_match(dtls_ep *ep)
+{
+ nng_aio *aio = nni_list_first(&ep->connaios);
+ dtls_pipe *p = nni_list_first(&ep->connpipes);
+
+ if ((aio == NULL) || (p == NULL)) {
+ return;
+ }
+
+ nni_aio_list_remove(aio);
+ nni_list_remove(&ep->connpipes, p);
+ nni_aio_set_output(aio, 0, p->npipe);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+dtls_ep_start(dtls_ep *ep)
+{
+ ep->started = true;
+ dtls_start_rx(ep);
+}
+
+static nng_err
+dtls_ep_bind(void *arg, nng_url *url)
+{
+ dtls_ep *ep = arg;
+ nng_err rv;
+
+ nni_mtx_lock(&ep->mtx);
+ if (ep->started) {
+ nni_mtx_unlock(&ep->mtx);
+ return (NNG_EBUSY);
+ }
+
+ rv = nng_udp_open(&ep->udp, &ep->self_sa);
+ if (rv != NNG_OK) {
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
+ }
+ nng_sockaddr sa;
+ nng_udp_sockname(ep->udp, &sa);
+ url->u_port = nng_sockaddr_port(&sa);
+ dtls_ep_start(ep);
+ nni_mtx_unlock(&ep->mtx);
+
+ return (rv);
+}
+
+static void
+dtls_ep_accept(void *arg, nni_aio *aio)
+{
+ dtls_ep *ep = arg;
+
+ nni_aio_reset(aio);
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if (!nni_aio_start(aio, dtls_ep_cancel, ep)) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_append(&ep->connaios, aio);
+ dtls_ep_match(ep);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static nni_sp_pipe_ops dtls_pipe_ops = {
+ .p_size = dtls_pipe_size,
+ .p_init = dtls_pipe_init,
+ .p_fini = dtls_pipe_fini,
+ .p_stop = dtls_pipe_stop,
+ .p_send = dtls_pipe_send,
+ .p_recv = dtls_pipe_recv,
+ .p_close = dtls_pipe_close,
+ .p_peer = dtls_pipe_peer,
+ .p_getopt = dtls_pipe_getopt,
+};
+
+static const nni_option dtls_ep_opts[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_get = dtls_ep_get_recvmaxsz,
+ .o_set = dtls_ep_set_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_get = dtls_ep_get_locaddr,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ .o_get = dtls_ep_get_remaddr,
+ },
+ {
+ .o_name = NNG_OPT_TCP_BOUND_PORT,
+ .o_get = dtls_ep_get_port,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nng_err
+dtls_dialer_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t));
+}
+
+static nng_err
+dtls_dialer_setopt(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t));
+}
+
+static nng_err
+dtls_listener_getopt(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t));
+}
+
+static nng_err
+dtls_listener_setopt(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ dtls_ep *ep = arg;
+
+ return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t));
+}
+
+static nni_sp_dialer_ops dtls_dialer_ops = {
+ .d_size = sizeof(dtls_ep),
+ .d_init = dtls_dialer_init,
+ .d_fini = dtls_ep_fini,
+ .d_connect = dtls_ep_connect,
+ .d_close = dtls_ep_close,
+ .d_stop = dtls_ep_stop,
+ .d_set_tls = dtls_ep_set_tls,
+ .d_get_tls = dtls_ep_get_tls,
+ .d_getopt = dtls_dialer_getopt,
+ .d_setopt = dtls_dialer_setopt,
+};
+
+static nni_sp_listener_ops dtls_listener_ops = {
+ .l_size = sizeof(dtls_ep),
+ .l_init = dtls_listener_init,
+ .l_fini = dtls_ep_fini,
+ .l_bind = dtls_ep_bind,
+ .l_accept = dtls_ep_accept,
+ .l_close = dtls_ep_close,
+ .l_stop = dtls_ep_stop,
+ .l_set_tls = dtls_ep_set_tls,
+ .l_get_tls = dtls_ep_get_tls,
+ .l_getopt = dtls_listener_getopt,
+ .l_setopt = dtls_listener_setopt,
+};
+
+static nni_sp_tran dtls_tran = {
+ .tran_scheme = "dtls",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+
+static nni_sp_tran dtls4_tran = {
+ .tran_scheme = "dtls4",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+
+#ifdef NNG_ENABLE_IPV6
+static nni_sp_tran dtls6_tran = {
+ .tran_scheme = "dtls6",
+ .tran_dialer = &dtls_dialer_ops,
+ .tran_listener = &dtls_listener_ops,
+ .tran_pipe = &dtls_pipe_ops,
+ .tran_init = dtls_tran_init,
+ .tran_fini = dtls_tran_fini,
+};
+#endif
+
+void
+nni_sp_dtls_register(void)
+{
+ nni_sp_tran_register(&dtls_tran);
+ nni_sp_tran_register(&dtls4_tran);
+#ifdef NNG_ENABLE_IPV6
+ nni_sp_tran_register(&dtls6_tran);
+#endif
+}