diff options
Diffstat (limited to 'src/sp/transport/dtls/dtls.c')
| -rw-r--r-- | src/sp/transport/dtls/dtls.c | 1802 |
1 files changed, 1802 insertions, 0 deletions
diff --git a/src/sp/transport/dtls/dtls.c b/src/sp/transport/dtls/dtls.c new file mode 100644 index 00000000..02961a56 --- /dev/null +++ b/src/sp/transport/dtls/dtls.c @@ -0,0 +1,1802 @@ +// Copyright 2025 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/aio.h" +#include "core/defs.h" +#include "core/idhash.h" +#include "core/message.h" +#include "core/nng_impl.h" +#include "core/options.h" +#include "core/pipe.h" +#include "core/platform.h" +#include "core/socket.h" +#include "core/stats.h" +#include "nng/nng.h" +#include "supplemental/tls/tls_common.h" + +#include <string.h> + +// Experimental DTLS transport. Unicast only. +// +typedef struct dtls_pipe dtls_pipe; +typedef struct dtls_ep dtls_ep; +typedef struct dtls_conn dtls_conn; + +const uint8_t PROTO_VERSION = 1; + +// OP code, 8 bits +enum dtls_opcode { + OPCODE_DATA = 0, + OPCODE_CREQ = 1, + OPCODE_CACK = 2, + OPCODE_DISC = 3, +}; + +// Disconnect reason, must be 16 bits +typedef enum dtls_disc_reason { + DISC_CLOSED = 0, // normal close + DISC_TYPE = 1, // bad SP type + DISC_NOTCONN = 2, // no such connection + DISC_REFUSED = 3, // refused by policy + DISC_MSGSIZE = 4, // message too large + DISC_NEGO = 5, // neogtiation failed + DISC_INACTIVE = 6, // closed due to inactivity + DISC_PROTO = 7, // other protocol error + DISC_NOBUF = 8, // resources exhausted +} dtls_disc_reason; + +#ifndef NNG_DTLS_TXQUEUE_LEN +#define NNG_DTLS_TXQUEUE_LEN 32 +#endif + +#ifndef NNG_DTLS_RXQUEUE_LEN +#define NNG_DTLS_RXQUEUE_LEN 16 +#endif + +// The maximum TLS record size +#define DTLS_MAX_RECORD 16384 + +// For DTLS we use a maximum record size of 16384, +// but we reserve some space for headers. DTLS needs +// 13 bytes, and the transport layer needs 8 bytes. +// To leave some room for the future, we just trim to 64 bytes. +#ifndef NNG_DTLS_RECVMAX +#define NNG_DTLS_RECVMAX (DTLS_MAX_RECORD - 64) +#endif + +#ifndef NNG_DTLS_REFRESH +#define NNG_DTLS_REFRESH (5 * NNI_SECOND) +#endif + +#ifndef NNG_DTLS_CONNRETRY +#define NNG_DTLS_CONNRETRY (NNI_SECOND / 5) +#endif + +// 64-bit protocol header +typedef struct dtls_sp_hdr { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint16_t us_params[2]; +} dtls_sp_hdr; + +// DTLS pipe resend (CREQ) in msec (nng_duration) +#define DTLS_PIPE_REFRESH(p) ((p)->refresh) + +// DTLS pipe timeout in msec (nng_duration) +#define DTLS_PIPE_TIMEOUT(p) ((p)->refresh * 5) + +struct dtls_pipe { + dtls_ep *ep; + nni_pipe *npipe; + nng_sockaddr peer_addr; + uint64_t id; // hash of peer address + uint16_t peer; + uint16_t proto; + bool matched; // true if have matched and given this to SP + bool closed; // true if we are closed (no more send or recv!) + bool dialer; // true if we are dialer + nng_duration refresh; // seconds, for the protocol + nng_time next_wake; + nng_time expire; // inactivity expiration time + nng_time next_refresh; + nni_list_node node; + nni_lmq rx_mq; + + // Upper layer queues. These are between the PIPE and SP. + bool send_busy; // true if send is in process + uint16_t send_max; // peer's max recv size + nni_list send_aios; + uint8_t *send_buf; + size_t send_bufsz; + nng_aio send_tls_aio; + + bool recv_busy; + bool recv_rdy; // receive is done and data in recvbuf + uint16_t recv_max; // max recv size + nni_list recv_aios; + uint8_t *recv_buf; + size_t recv_bufsz; + nng_aio recv_tls_aio; + + // Lower layer queues. These are between the + + uint8_t send_op; // usually OPCODE_DATA + uint8_t last_op; // last op code we sent + uint16_t reason; // only for disconnect + + nni_mtx lower_mtx; // protects the lower rx_q, etc. + + // This is the lower level RX buffer, which contains only + // received ciphertext (content before passed to TLS layer for + // decrypt). The actual pointers may change, as we "swap" + // buffers between the endpoint and the pipe to avoid copying. + nni_list rx_q; // lower aio from the TLS layer + + nni_tls_conn tls; +}; + +struct dtls_ep { + nng_udp *udp; + nni_mtx mtx; + uint16_t proto; + uint16_t peer; + uint16_t af; // address family + bool fini; + bool started; + bool closed; + nng_url *url; + const char *host; // for dialers + nni_aio *useraio; + nni_aio *connaio; + nni_aio timeaio; + nni_aio resaio; + bool dialer; + nni_listener *nlistener; + nni_dialer *ndialer; + nni_msg *rx_payload; // current receive message + nng_sockaddr rx_sa; // addr for last message + nni_aio tx_aio; // aio for TX handling + nni_aio rx_aio; // aio for RX handling + nni_id_map pipes; // pipes (indexed by id) + nni_sockaddr self_sa; // our address + nni_sockaddr peer_sa; // peer address, only for dialer; + nni_list connaios; // aios from accept waiting for a client peer + nni_list connpipes; // pipes waiting to be connected + nng_duration refresh; // refresh interval for connections in seconds + uint16_t rcvmax; // max payload, trimmed to uint16_t + nni_resolv_item resolv; + + nng_tls_config *tlscfg; + + size_t rx_size; // size of the rx buffer + void *rx_buf; + + nni_stat_item st_rcv_max; + nni_stat_item st_rcv_reorder; + nni_stat_item st_rcv_toobig; + nni_stat_item st_rcv_nomatch; + nni_stat_item st_rcv_copy; + nni_stat_item st_rcv_nocopy; + nni_stat_item st_rcv_nobuf; + nni_stat_item st_snd_toobig; + nni_stat_item st_snd_nobuf; + nni_stat_item st_peer_inactive; + nni_stat_item st_copy_max; +}; + +static void dtls_ep_start(dtls_ep *); +static void dtls_resolv_cb(void *); +static void dtls_rx_cb(void *); + +static void dtls_ep_match(dtls_ep *ep); +static void dtls_remove_pipe(dtls_pipe *p); + +// BIO send/recv functions for use by the common TLS layer. + +static void +dtls_bio_cancel(nng_aio *aio, void *arg, nng_err rv) +{ + dtls_pipe *p = arg; + nni_mtx_lock(&p->lower_mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + } + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&p->lower_mtx); +} + +static void +dtls_bio_recv_done(dtls_pipe *p) +{ + nng_aio *aio; + uint8_t *ptr; + size_t resid; + nni_msg *msg; + + while ((!nni_lmq_empty(&p->rx_mq)) && + ((aio = nni_list_first(&p->rx_q)) != NULL)) { + + nni_aio_list_remove(aio); + nni_lmq_get(&p->rx_mq, &msg); + + // assumption we only have a body, because we don't bother to + // fill in the header for raw UDP. + + resid = nni_msg_len(msg); + ptr = nni_msg_body(msg); + + for (unsigned i = 0; i < aio->a_nio && resid > 0; i++) { + size_t num = resid > aio->a_iov[i].iov_len + ? aio->a_iov[i].iov_len + : resid; + memcpy(aio->a_iov[i].iov_buf, ptr, num); + ptr += num; + resid -= num; + } + nni_aio_finish(aio, NNG_OK, nni_msg_len(msg)); + nni_msg_free(msg); + } +} + +static void +dtls_bio_recv(void *arg, nng_aio *aio) +{ + dtls_pipe *p = arg; + + nni_mtx_lock(&p->lower_mtx); + if (!nni_aio_start(aio, dtls_bio_cancel, p)) { + nni_mtx_unlock(&p->lower_mtx); + return; + } + + nni_aio_list_append(&p->rx_q, aio); + dtls_bio_recv_done(p); + nni_mtx_unlock(&p->lower_mtx); +} + +static void +dtls_bio_send(void *arg, nng_aio *aio) +{ + dtls_pipe *p = arg; + + nni_mtx_lock(&p->lower_mtx); + if (!p->closed) { + nni_aio_set_input(aio, 0, &p->peer_addr); + nng_udp_send(p->ep->udp, aio); + } + nni_mtx_unlock(&p->lower_mtx); +} + +static void +dtls_bio_free(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +dtls_bio_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +dtls_bio_stop(void *arg) +{ + dtls_pipe *p = arg; + nni_aio_stop(&p->recv_tls_aio); + nni_aio_stop(&p->send_tls_aio); +} + +static nni_tls_bio_ops dtls_bio_ops = { + .bio_send = dtls_bio_send, + .bio_recv = dtls_bio_recv, + .bio_close = dtls_bio_close, + .bio_stop = dtls_bio_stop, + .bio_free = dtls_bio_free, +}; + +static void +dtls_tran_init(void) +{ +} + +static void +dtls_tran_fini(void) +{ +} + +// +// Upper layer functions - moving data between TLS and SP. +// TLS acts as kind of a stream for us, so we only see the +// data that is meant for us, but we will send and receive +// control messages that are not just data payloads. +// + +static void dtls_pipe_send_cancel(nng_aio *, void *, nng_err); +static void dtls_pipe_send_tls(dtls_pipe *); +static void dtls_pipe_send_tls_cb(void *arg); + +static void +dtls_pipe_send(void *arg, nni_aio *aio) +{ + dtls_pipe *p = arg; + dtls_ep *ep; + nng_msg *msg; + size_t count = 0; + size_t sndmax; + + msg = nni_aio_get_msg(aio); + ep = p->ep; + + if (msg != NULL) { + count = nni_msg_len(msg) + nni_msg_header_len(msg); + } + + nni_mtx_lock(&ep->mtx); + sndmax = p->send_max; + if (!nni_aio_start(aio, dtls_pipe_send_cancel, p)) { + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_aio_reset(aio); + if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > sndmax) { + // rather failing this with an error, we just drop it + // on the floor. this is on the sender, so there isn't + // a compelling need to disconnect the pipe, since it + // we're not being "ill-behaved" to our peer. + nni_stat_inc(&ep->st_snd_toobig, 1); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish(aio, 0, count); + nni_msg_free(msg); + return; + } + + nni_aio_list_append(&p->send_aios, aio); + dtls_pipe_send_tls(p); + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_pipe_send_cancel(nng_aio *aio, void *arg, nng_err err) +{ + dtls_pipe *p = arg; + nni_mtx_lock(&p->ep->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + nni_mtx_unlock(&p->ep->mtx); +} + +// Lower layer send/recv functions, used by the pipe layer. + +static void +dtls_pipe_send_tls(dtls_pipe *p) +{ + nni_aio *aio; + nng_msg *msg; + uint8_t opcode; + nng_iov iov; + dtls_sp_hdr *hdr = (void *) p->send_buf; + + if (p->send_busy || p->closed) { + return; + } + + opcode = p->send_op; + // reset the last op + p->send_op = OPCODE_DATA; + + hdr->us_ver = PROTO_VERSION; + hdr->us_op_code = opcode; + NNI_PUT16LE(&hdr->us_type, p->proto); + hdr->us_params[0] = 0; + hdr->us_params[1] = 0; + + iov.iov_buf = hdr; + iov.iov_len = sizeof(*hdr); + + switch (opcode) { + case OPCODE_DATA: + for (;;) { + if ((aio = nni_list_first(&p->send_aios)) == NULL) { + // no work for us! + return; + } + nni_aio_list_remove(aio); + msg = nni_aio_get_msg(aio); + if (nni_msg_header_len(msg) + nni_msg_len(msg) + + sizeof(*hdr) > + p->send_bufsz) { + nng_msg_free(msg); + nni_aio_finish_error(aio, NNG_EMSGSIZE); + continue; + } + break; // for loop + } + + size_t len = nni_msg_header_len(msg); + uint8_t *data = (void *) (hdr + 1); + memcpy(data, nni_msg_header(msg), len); + data += len; + memcpy(data, nni_msg_body(msg), nni_msg_len(msg)); + len += nni_msg_len(msg); + + NNI_PUT16LE(&hdr->us_params[0], (uint16_t) len); + iov.iov_len += len; + + nni_msg_free(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + break; + + case OPCODE_CREQ: + case OPCODE_CACK: + NNI_PUT16LE(&hdr->us_params[0], p->recv_max); + NNI_PUT16LE(&hdr->us_params[1], p->refresh); + break; + + case OPCODE_DISC: + NNI_PUT16LE(&hdr->us_params[0], p->reason); + p->closed = true; + break; + default: + NNI_ASSERT(false); // this should never happen! + // fall back to sending a disconnect + hdr->us_op_code = OPCODE_DISC; + NNI_PUT16LE(&hdr->us_params[0], DISC_PROTO); + } + + p->last_op = opcode; + p->send_busy = true; + nni_aio_set_iov(&p->send_tls_aio, 1, &iov); + nni_tls_send(&p->tls, &p->send_tls_aio); +} + +static void +dtls_pipe_send_tls_cb(void *arg) +{ + dtls_pipe *p = arg; + + nni_mtx_lock(&p->ep->mtx); + + p->send_busy = false; + if (nni_aio_result(&p->send_tls_aio) != NNG_OK || + p->last_op == OPCODE_DISC) { + nni_pipe_close(p->npipe); + if (p->matched == 0) { + dtls_remove_pipe(p); + } + nni_mtx_unlock(&p->ep->mtx); + return; + } + dtls_pipe_send_tls(p); + nni_mtx_unlock(&p->ep->mtx); +} + +// RECV SIDE + +static void dtls_pipe_recv_cancel(nni_aio *, void *, nng_err); +static void dtls_pipe_recv_tls(dtls_pipe *); + +static void +dtls_pipe_recv(void *arg, nni_aio *aio) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + + nni_aio_reset(aio); + nni_mtx_lock(&ep->mtx); + if (p->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (!nni_aio_start(aio, dtls_pipe_recv_cancel, p)) { + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_list_append(&p->recv_aios, aio); + dtls_pipe_recv_tls(p); + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_pipe_recv_cancel(nni_aio *aio, void *arg, nng_err rv) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + + nni_mtx_lock(&ep->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +dtls_pipe_recv_tls_start(dtls_pipe *p) +{ + nng_iov iov; + if (p->recv_busy || p->closed) { + return; + } + p->recv_busy = true; + iov.iov_buf = p->recv_buf; + iov.iov_len = p->recv_bufsz; + + nni_aio_set_iov(&p->recv_tls_aio, 1, &iov); + nni_tls_recv(&p->tls, &p->recv_tls_aio); +} + +static void +dtls_pipe_recv_tls(dtls_pipe *p) +{ + nng_aio *aio = nni_list_first(&p->recv_aios); + size_t len; + nng_msg *msg; + int rv; + + if (aio == NULL) { + return; + } + if (!p->recv_rdy) { + dtls_pipe_recv_tls_start(p); + return; + } + + p->recv_rdy = false; + + nni_aio_list_remove(aio); + len = nng_aio_count(&p->recv_tls_aio); + NNI_ASSERT(len >= sizeof(dtls_sp_hdr)); + len -= sizeof(dtls_sp_hdr); + + if ((rv = nni_msg_alloc(&msg, len)) != NNG_OK) { + nni_aio_finish_error(aio, rv); + return; + } + + memcpy(nng_msg_body(msg), p->recv_buf + sizeof(dtls_sp_hdr), len); + nni_aio_finish_msg(aio, msg); +} + +static void +dtls_pipe_recv_tls_cb(void *arg) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + dtls_sp_hdr *hdr = (void *) p->recv_buf; + nng_aio *aio = &p->recv_tls_aio; + uint16_t proto; + uint16_t refresh; + uint16_t rcvmax; + nng_err rv; + + nni_mtx_lock(&ep->mtx); + p->recv_busy = false; + + if ((rv = nni_aio_result(aio)) != NNG_OK) { + + // If we didn't connect yet, issue an error so the peer can see + // a connection failure (e.g. if we failed the TLS handshake.) + if (p->dialer && !p->matched) { + nni_aio *caio; + if ((caio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(caio); + nni_aio_finish_error(caio, rv); + } + } + + // Bump a bad receive stat (e.g. someone may have sent us + // garbage.) We do not acknowledge or handle garbage frames + // sent to an open session. + nni_pipe_close(p->npipe); + nni_mtx_unlock(&ep->mtx); + return; + } + + // We had a "good" receive (TLS passed at least) from the peer. + + if (nni_aio_count(aio) < sizeof(*hdr)) { + // Runt frame. + p->send_op = OPCODE_DISC; + p->reason = DISC_PROTO; + goto bad; + } + + if (nni_aio_count(aio) > sizeof(*hdr) + p->recv_max) { + p->send_op = OPCODE_DISC; + p->reason = DISC_MSGSIZE; + goto bad; + } + + if (hdr->us_ver != PROTO_VERSION) { + // Bad protocol version + p->send_op = OPCODE_DISC; + p->reason = DISC_PROTO; + goto bad; + } + NNI_GET16LE(&hdr->us_type, proto); + if (proto != p->peer) { + // Bad SP protocol type + p->send_op = OPCODE_DISC; + p->reason = DISC_TYPE; + goto bad; + } + + p->expire = nni_clock() + DTLS_PIPE_TIMEOUT(p); + + if (!p->matched) { + p->matched = true; + nni_list_append(&p->ep->connpipes, p); + dtls_ep_match(p->ep); + } + + switch (hdr->us_op_code) { + case OPCODE_CREQ: + if (p->dialer) { + // dialers don't accept requests + goto bad; + } + NNI_GET16LE(&hdr->us_params[0], rcvmax); + NNI_GET16LE(&hdr->us_params[1], refresh); + if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) { + p->refresh = refresh * NNI_SECOND; + } + if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) { + p->send_max = rcvmax; + } + // schedule the CACK reply + p->send_op = OPCODE_CACK; + break; + + case OPCODE_CACK: + if (!p->dialer) { + goto bad; + } + NNI_GET16LE(&hdr->us_params[0], rcvmax); + NNI_GET16LE(&hdr->us_params[0], refresh); + + if ((refresh > 0) && ((refresh * NNI_SECOND) < p->refresh)) { + p->refresh = refresh * NNI_SECOND; + } + if ((rcvmax > 0) && (rcvmax < NNG_DTLS_RECVMAX)) { + p->send_max = rcvmax; + } + break; + + case OPCODE_DISC: + p->closed = true; + nni_mtx_unlock(&ep->mtx); + nni_pipe_close(p->npipe); + return; + + case OPCODE_DATA: + p->recv_rdy = true; + dtls_pipe_recv_tls(p); + nni_mtx_unlock(&ep->mtx); + return; + } +bad: + if (p->send_op != OPCODE_DATA) { + dtls_pipe_send_tls(p); + } + dtls_pipe_recv_tls_start(p); + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_pipe_close(void *arg) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + nni_aio *aio; + + nni_mtx_lock(&ep->mtx); + while ((aio = nni_list_first(&p->recv_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&p->send_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if (!p->matched) { + dtls_remove_pipe(p); + } else { + p->send_op = OPCODE_DISC; + p->reason = DISC_CLOSED; + dtls_pipe_send_tls(p); + } + nni_mtx_unlock(&ep->mtx); +} + +static nng_err dtls_add_pipe(dtls_ep *ep, dtls_pipe *p); + +static void +dtls_pipe_stop(void *arg) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + + dtls_pipe_close(arg); + + nni_mtx_lock(&ep->mtx); + dtls_remove_pipe(p); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); +} + +static int +dtls_pipe_alloc(dtls_ep *ep, dtls_pipe **pp, const nng_sockaddr *sa) +{ + dtls_pipe *p; + nng_err rv; + + if (ep->dialer) { + rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer); + } else { + rv = nni_pipe_alloc_listener((void *) &p, ep->nlistener); + } + if (rv != NNG_OK) { + nng_log_err("NNG-DTLS-PIPE-ALLOC-FAIL", + "Failed allocating pipe for DTLS: %s", nng_strerror(rv)); + return (rv); + } + p->dialer = ep->dialer; + p->ep = ep; + p->proto = ep->proto; + p->peer = ep->peer; + p->peer_addr = *sa; + p->id = nng_sockaddr_hash(sa); + p->refresh = ep->refresh; + p->send_max = NNG_DTLS_RECVMAX; + p->recv_max = ep->rcvmax; + *pp = p; + + if (((rv = dtls_add_pipe(ep, p)) != NNG_OK) || + ((rv = nni_tls_init(&p->tls, ep->tlscfg)) != NNG_OK) || + ((rv = nni_tls_start(&p->tls, &dtls_bio_ops, p, sa)) != NNG_OK)) { + nni_pipe_close(p->npipe); + nng_log_err("NNG-DTLS-PIPE-ADD-FAIL", + "Failed adding pipe for DTLS: %s", nng_strerror(rv)); + return (rv); + } + + // We need to start a receiver on the pipe. + dtls_pipe_recv_tls_start(p); + + // Also start TLS up and running. + switch (nni_tls_run(&p->tls)) { + case NNG_OK: + case NNG_EAGAIN: + break; + default: + nni_pipe_close(p->npipe); + break; + } + + // wake the timer so it knows to resubmit + nni_aio_abort(&ep->timeaio, NNG_ETIMEDOUT); + + return (NNG_OK); +} + +static size_t +dtls_pipe_size(void) +{ + return (NNI_ALIGN_UP(sizeof(dtls_pipe)) + + NNI_ALIGN_UP(nni_tls_engine_conn_size())); +} + +static int +dtls_pipe_init(void *arg, nni_pipe *npipe) +{ + dtls_pipe *p = arg; + p->npipe = npipe; + + size_t bufsz = DTLS_MAX_RECORD; // TODO: Make this a tunable. + + if ((p->recv_buf = nni_alloc(bufsz)) == NULL) { + return (NNG_ENOMEM); + } + if ((p->send_buf = nni_alloc(bufsz)) == NULL) { + return (NNG_ENOMEM); + } + p->recv_bufsz = bufsz; + p->send_bufsz = bufsz; + nni_mtx_init(&p->lower_mtx); + nni_aio_init(&p->recv_tls_aio, dtls_pipe_recv_tls_cb, p); + nni_aio_init(&p->send_tls_aio, dtls_pipe_send_tls_cb, p); + nni_aio_list_init(&p->rx_q); + nni_aio_list_init(&p->recv_aios); + nni_aio_list_init(&p->send_aios); + nni_lmq_init(&p->rx_mq, NNG_DTLS_RXQUEUE_LEN); + + return (0); +} + +static void +dtls_pipe_fini(void *arg) +{ + dtls_pipe *p = arg; + nng_msg *m; + + nni_tls_fini(&p->tls); + nni_aio_fini(&p->recv_tls_aio); + nni_aio_fini(&p->send_tls_aio); + if (p->recv_buf != NULL) { + nni_free(p->recv_buf, p->recv_bufsz); + } + if (p->send_buf != NULL) { + nni_free(p->send_buf, p->send_bufsz); + } + nni_mtx_lock(&p->lower_mtx); + while (!nni_lmq_empty(&p->rx_mq)) { + nni_lmq_get(&p->rx_mq, &m); + nni_msg_free(m); + } + nni_mtx_unlock(&p->lower_mtx); + nni_mtx_fini(&p->lower_mtx); + nni_lmq_fini(&p->rx_mq); + NNI_ASSERT(nni_list_empty(&p->recv_aios)); + NNI_ASSERT(nni_list_empty(&p->send_aios)); +} + +static dtls_pipe * +dtls_find_pipe(dtls_ep *ep, const nng_sockaddr *peer_addr) +{ + uint64_t id = nng_sockaddr_hash(peer_addr); + dtls_pipe *p; + + // we'll keep incrementing id until we conclusively match + // or we get a NULL. This is another level of rehashing, but + // it keeps us from having to look up. + for (;;) { + if ((p = nni_id_get(&ep->pipes, id)) == NULL) { + return (NULL); + } + if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) { + return (p); + } + id++; + if (id == 0) { + id = 1; + } + } +} + +static void +dtls_remove_pipe(dtls_pipe *p) +{ + // ep locked + dtls_ep *ep = p->ep; + uint64_t id = p->id; + bool matched = p->matched; + if (id == 0) { + return; + } + p->id = 0; + for (;;) { + dtls_pipe *srch; + if ((srch = nni_id_get(&ep->pipes, id)) == NULL) { + break; + } + if (srch == p) { + nni_id_remove(&ep->pipes, id); + break; + } + id++; + if (id == 0) { + id = 1; + } + } + if (!matched) { + nni_pipe_rele(p->npipe); + } +} + +static nng_err +dtls_add_pipe(dtls_ep *ep, dtls_pipe *p) +{ + // Id must be part of the hash + uint64_t id = p->id; + while (nni_id_get(&ep->pipes, id) != NULL) { + id++; + if (id == 0) { + id = 1; + } + } + return (nni_id_set(&ep->pipes, id, p)); +} + +static void +dtls_start_rx(dtls_ep *ep) +{ + nni_iov iov; + + iov.iov_buf = ep->rx_buf; + iov.iov_len = ep->rx_size; + + nni_aio_reset(&ep->rx_aio); + nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); + nni_aio_set_iov(&ep->rx_aio, 1, &iov); + nng_udp_recv(ep->udp, &ep->rx_aio); +} + +static void +dtls_rx_cb(void *arg) +{ + dtls_ep *ep = arg; + dtls_pipe *p; + nni_aio *aio = &ep->rx_aio; + int rv; + nni_msg *msg; + + nni_mtx_lock(&ep->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + // something bad happened on RX... which is unexpected. + // sleep a little bit and hope for recovery. + switch (nni_aio_result(aio)) { + case NNG_ECLOSED: + case NNG_ECANCELED: + case NNG_ESTOPPED: + nni_mtx_unlock(&ep->mtx); + return; + case NNG_ETIMEDOUT: + case NNG_EAGAIN: + case NNG_EINTR: + default: + goto fail; + } + } + + // If this came from another host, and we are a dialer, we discard. + // Dialers only talk to the party they explicitly dialed. + if (ep->dialer && !nng_sockaddr_equal(&ep->rx_sa, &ep->peer_sa)) { + goto fail; + } + + if ((p = dtls_find_pipe(ep, &ep->rx_sa)) == NULL) { + if (dtls_pipe_alloc(ep, &p, &ep->rx_sa) != NNG_OK) { + goto fail; + } + } + if (p->closed) { + goto fail; + } + NNI_ASSERT(p != NULL); + + if (nni_msg_alloc(&msg, nni_aio_count(aio)) != NNG_OK) { + // TODO BUMP A NO RECV ALLOC STAT + goto fail; + } + memcpy(nni_msg_body(msg), ep->rx_buf, nni_aio_count(aio)); + dtls_start_rx(ep); + nni_mtx_unlock(&ep->mtx); + + nni_mtx_lock(&p->lower_mtx); + + if (nni_lmq_put(&p->rx_mq, msg) != NNG_OK) { + // TODO: BUMP TXQ FULL STAT + nng_msg_free(msg); + } + dtls_bio_recv_done(p); + nni_mtx_unlock(&p->lower_mtx); + + // Run the TLS state machine. + switch (nni_tls_run(&p->tls)) { + case NNG_OK: + case NNG_EAGAIN: + break; + default: + nni_pipe_close(p->npipe); + } + return; + +fail: + // start another receive + dtls_start_rx(ep); + + nni_mtx_unlock(&ep->mtx); +} + +static uint16_t +dtls_pipe_peer(void *arg) +{ + dtls_pipe *p = arg; + + return (p->peer); +} + +static nng_err +dtls_pipe_get_recvmax(void *arg, void *v, size_t *szp, nni_type t) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + nng_err rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(p->recv_max, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nng_err +dtls_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_type t) +{ + dtls_pipe *p = arg; + dtls_ep *ep = p->ep; + nng_err rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_sockaddr(&p->peer_addr, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nni_option dtls_pipe_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = dtls_pipe_get_recvmax, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = dtls_pipe_get_remaddr, + }, + { + .o_name = NULL, + }, +}; + +static nng_err +dtls_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + dtls_pipe *p = arg; + int rv; + + rv = nni_getopt(dtls_pipe_options, name, p, buf, szp, t); + return (rv); +} + +static void +dtls_ep_fini(void *arg) +{ + dtls_ep *ep = arg; + + nni_aio_fini(&ep->timeaio); + nni_aio_fini(&ep->resaio); + nni_aio_fini(&ep->tx_aio); + nni_aio_fini(&ep->rx_aio); + + if (ep->udp != NULL) { + nng_udp_close(ep->udp); + } + if (ep->rx_size != 0) { + nni_free(ep->rx_buf, ep->rx_size); + } + + nni_msg_free(ep->rx_payload); // safe even if msg is null + nni_id_map_fini(&ep->pipes); + nni_mtx_fini(&ep->mtx); +} + +static void +dtls_ep_close(void *arg) +{ + dtls_ep *ep = arg; + nni_aio *aio; + dtls_pipe *p; + uint64_t key; + uint32_t cursor; + + nni_aio_close(&ep->resaio); + nni_aio_close(&ep->rx_aio); + nni_aio_close(&ep->timeaio); + + // leave tx open so we can send disconnects + + nni_mtx_lock(&ep->mtx); + ep->closed = true; + while ((aio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECONNABORTED); + } + cursor = 0; + key = 0; + while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) { + nni_pipe_close(p->npipe); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_ep_stop(void *arg) +{ + dtls_ep *ep = arg; + + nni_aio_stop(&ep->resaio); + nni_aio_stop(&ep->rx_aio); + nni_aio_stop(&ep->timeaio); + + nni_mtx_lock(&ep->mtx); + ep->fini = true; + nni_mtx_unlock(&ep->mtx); +} + +// timer handler - sends out additional creqs as needed, +// reaps stale connections, and handles linger. +static void +dtls_timer_cb(void *arg) +{ + dtls_ep *ep = arg; + dtls_pipe *p; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_aio_result(&ep->timeaio); + switch (rv) { + case NNG_ECLOSED: + case NNG_ECANCELED: + case NNG_ESTOPPED: + nni_mtx_unlock(&ep->mtx); + return; + default: + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + return; + } + break; + } + + uint32_t cursor = 0; + nni_time now = nni_clock(); + nng_duration refresh = NNG_DURATION_INFINITE; + + while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { + + if (p->closed) { + continue; + } + NNI_ASSERT(p->refresh > 0); + if (p->expire > 0 && now > p->expire) { + char buf[128]; + nng_log_info("NNG-DTLS-INACTIVE", + "Pipe peer %s timed out due to inactivity", + nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); + + nni_stat_inc(&ep->st_peer_inactive, 1); + nni_pipe_close(p->npipe); + continue; + } + + if (p->dialer && now > p->next_refresh) { + p->send_op = OPCODE_CREQ; + p->next_refresh = p->expire + p->refresh; + dtls_pipe_send_tls(p); + } + if (refresh == NNG_DURATION_INFINITE && p->refresh > 0) { + refresh = p->refresh; + } else if ((p->refresh > 0) && (p->refresh < refresh)) { + refresh = p->refresh; + } + } + nni_sleep_aio(refresh, &ep->timeaio); + + nni_mtx_unlock(&ep->mtx); +} + +static nng_err +dtls_ep_init( + dtls_ep *ep, nng_url *url, nni_sock *sock, nni_dialer *d, nni_listener *l) +{ + nni_mtx_init(&ep->mtx); + nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true); + NNI_LIST_INIT(&ep->connpipes, dtls_pipe, node); + nni_aio_list_init(&ep->connaios); + + nni_aio_init(&ep->rx_aio, dtls_rx_cb, ep); + nni_aio_init(&ep->timeaio, dtls_timer_cb, ep); + nni_aio_init(&ep->resaio, dtls_resolv_cb, ep); + + if (strcmp(url->u_scheme, "dtls") == 0) { + ep->af = NNG_AF_UNSPEC; + } else if (strcmp(url->u_scheme, "dtls4") == 0) { + ep->af = NNG_AF_INET; + } else if (strcmp(url->u_scheme, "dtls6") == 0) { + ep->af = NNG_AF_INET6; + } else { + return (NNG_EADDRINVAL); + } + + ep->self_sa.s_family = ep->af; + ep->proto = nni_sock_proto_id(sock); + ep->peer = nni_sock_peer_id(sock); + ep->url = url; + ep->refresh = NNG_DTLS_REFRESH; // one minute by default + ep->rcvmax = NNG_DTLS_RECVMAX; + + // receive buffer plus some extra for UDP and TLS headers + if ((ep->rx_buf = nni_alloc(DTLS_MAX_RECORD)) == NULL) { + return (NNG_ENOMEM); + } + ep->rx_size = DTLS_MAX_RECORD; + + NNI_STAT_LOCK(rcv_max_info, "rcv_max", "maximum receive size", + NNG_STAT_LEVEL, NNG_UNIT_BYTES); + NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch", + "messages without a matching connection", NNG_STAT_COUNTER, + NNG_UNIT_MESSAGES); + NNI_STAT_LOCK(rcv_toobig_info, "rcv_toobig", + "received messages rejected because too big", NNG_STAT_COUNTER, + NNG_UNIT_MESSAGES); + NNI_STAT_LOCK(rcv_nobuf_info, "rcv_nobuf", + "received messages dropped no buffer", NNG_STAT_COUNTER, + NNG_UNIT_MESSAGES); + NNI_STAT_LOCK(snd_toobig_info, "snd_toobig", + "sent messages rejected because too big", NNG_STAT_COUNTER, + NNG_UNIT_MESSAGES); + NNI_STAT_LOCK(snd_nobuf_info, "snd_nobuf", + "sent messages dropped no buffer", NNG_STAT_COUNTER, + NNG_UNIT_MESSAGES); + NNI_STAT_LOCK(peer_inactive_info, "peer_inactive", + "connections closed due to inactive peer", NNG_STAT_COUNTER, + NNG_UNIT_EVENTS); + + nni_stat_init_lock(&ep->st_rcv_max, &rcv_max_info, &ep->mtx); + nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx); + nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx); + nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx); + nni_stat_init_lock(&ep->st_snd_toobig, &snd_toobig_info, &ep->mtx); + nni_stat_init_lock(&ep->st_snd_nobuf, &snd_nobuf_info, &ep->mtx); + nni_stat_init_lock( + &ep->st_peer_inactive, &peer_inactive_info, &ep->mtx); + + if (l) { + NNI_ASSERT(d == NULL); + nni_listener_add_stat(l, &ep->st_rcv_max); + + nni_listener_add_stat(l, &ep->st_rcv_toobig); + nni_listener_add_stat(l, &ep->st_rcv_nomatch); + nni_listener_add_stat(l, &ep->st_rcv_nobuf); + nni_listener_add_stat(l, &ep->st_snd_toobig); + nni_listener_add_stat(l, &ep->st_snd_nobuf); + } + if (d) { + NNI_ASSERT(l == NULL); + nni_dialer_add_stat(d, &ep->st_rcv_max); + nni_dialer_add_stat(d, &ep->st_rcv_toobig); + nni_dialer_add_stat(d, &ep->st_rcv_nomatch); + nni_dialer_add_stat(d, &ep->st_rcv_nobuf); + nni_dialer_add_stat(d, &ep->st_snd_toobig); + nni_dialer_add_stat(d, &ep->st_snd_nobuf); + } + + // schedule our timer callback - forever for now + // adjusted automatically as we add pipes or other + // actions which require earlier wakeup. + nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio); + // nni_sleep_aio(100, &ep->timeaio); + + return (NNG_OK); +} + +static nng_err +dtls_check_url(nng_url *url, bool listen) +{ + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + if (!listen) { + if ((strlen(url->u_hostname) == 0) || (url->u_port == 0)) { + return (NNG_EADDRINVAL); + } + } + return (NNG_OK); +} + +static nng_err +dtls_dialer_init(void *arg, nng_url *url, nni_dialer *ndialer) +{ + dtls_ep *ep = arg; + nng_err rv; + nni_sock *sock = nni_dialer_sock(ndialer); + + if ((rv = dtls_check_url(url, false)) != NNG_OK) { + return (rv); + } + + ep->ndialer = ndialer; + if ((rv = dtls_ep_init(ep, url, sock, ndialer, NULL)) != NNG_OK) { + return (rv); + } + + return (NNG_OK); +} + +static nng_err +dtls_listener_init(void *arg, nng_url *url, nni_listener *nlistener) +{ + dtls_ep *ep = arg; + nng_err rv; + nni_sock *sock = nni_listener_sock(nlistener); + + ep->nlistener = nlistener; + if ((rv = dtls_ep_init(ep, url, sock, NULL, nlistener)) != NNG_OK) { + return (rv); + } + // Check for invalid URL components. + if (((rv = dtls_check_url(url, true)) != NNG_OK) || + ((rv = nni_url_to_address(&ep->self_sa, url)) != NNG_OK)) { + return (rv); + } + + return (NNG_OK); +} + +static void +dtls_ep_cancel(nni_aio *aio, void *arg, nng_err rv) +{ + dtls_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_aio_abort(&ep->resaio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_resolv_cb(void *arg) +{ + dtls_ep *ep = arg; + dtls_pipe *p; + nni_aio *aio; + int rv; + + nni_mtx_lock(&ep->mtx); + if ((aio = nni_list_first(&ep->connaios)) == NULL) { + nni_mtx_unlock(&ep->mtx); + return; + } + if (ep->closed) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&ep->mtx); + return; + } + if ((rv = nni_aio_result(&ep->resaio)) != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&ep->mtx); + nng_log_warn("NNG-RESOLV", "Failed resolving IP address: %s", + nng_strerror(rv)); + return; + } + + // Choose the right port to bind to. The family must match. + if (ep->self_sa.s_family == NNG_AF_UNSPEC) { + ep->self_sa.s_family = ep->peer_sa.s_family; + } + + if (ep->udp == NULL) { + if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&ep->mtx); + return; + } + } + + if ((rv = dtls_pipe_alloc(ep, &p, &ep->peer_sa)) != NNG_OK) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&ep->mtx); + return; + } + dtls_ep_start(ep); + + // Send out the connection request. We don't complete + // the user aio until we confirm a connection, so that + // we can supply details like maximum receive message size + // and the protocol the peer is using. + p->send_op = OPCODE_CREQ; + dtls_pipe_send_tls(p); + nni_mtx_unlock(&ep->mtx); +} + +static void +dtls_ep_connect(void *arg, nni_aio *aio) +{ + dtls_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + if (!nni_aio_start(aio, dtls_ep_cancel, ep)) { + nni_mtx_unlock(&ep->mtx); + return; + } + if (ep->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&ep->mtx); + return; + } + if (ep->started) { + nni_aio_finish_error(aio, NNG_EBUSY); + nni_mtx_unlock(&ep->mtx); + return; + } + NNI_ASSERT(nni_list_empty(&ep->connaios)); + ep->dialer = true; + + nni_list_append(&ep->connaios, aio); + + // lookup the IP address + + memset(&ep->resolv, 0, sizeof(ep->resolv)); + ep->resolv.ri_family = ep->af; + ep->resolv.ri_host = ep->url->u_hostname; + ep->resolv.ri_port = ep->url->u_port; + ep->resolv.ri_passive = false; + ep->resolv.ri_sa = &ep->peer_sa; + nni_aio_set_timeout(&ep->resaio, NNI_SECOND * 5); + nni_resolv(&ep->resolv, &ep->resaio); + + // wake up for retries + nni_aio_abort(&ep->timeaio, NNG_EINTR); + + nni_mtx_unlock(&ep->mtx); +} + +static nng_err +dtls_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t) +{ + dtls_ep *ep = arg; + nng_sockaddr sa; + int port; + uint8_t *paddr; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nng_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + switch (sa.s_family) { + case NNG_AF_INET: + paddr = (void *) &sa.s_in.sa_port; + break; + + case NNG_AF_INET6: + paddr = (void *) &sa.s_in6.sa_port; + break; + + default: + paddr = NULL; + break; + } + nni_mtx_unlock(&ep->mtx); + + if (paddr == NULL) { + return (NNG_ESTATE); + } + + NNI_GET16(paddr, port); + return (nni_copyout_int(port, buf, szp, t)); +} + +static nng_err +dtls_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + dtls_ep *ep = arg; + nng_sockaddr sa; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nng_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + nni_mtx_unlock(&ep->mtx); + + return (nni_copyout_sockaddr(&sa, v, szp, t)); +} + +static nng_err +dtls_ep_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + dtls_ep *ep = arg; + nng_err rv; + + if (!ep->dialer) { + return (NNG_ENOTSUP); + } + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_sockaddr(&ep->peer_sa, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nng_err +dtls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + dtls_ep *ep = arg; + nng_err rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nng_err +dtls_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + dtls_ep *ep = arg; + size_t val; + nng_err rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNG_DTLS_RECVMAX, t)) == 0) { + if ((val == 0) || (val > NNG_DTLS_RECVMAX)) { + val = NNG_DTLS_RECVMAX; + } + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + ep->rcvmax = (uint16_t) val; + nni_stat_set_value(&ep->st_rcv_max, val); + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static nng_err +dtls_ep_set_tls(void *arg, nng_tls_config *cfg) +{ + dtls_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + ep->tlscfg = cfg; + nni_mtx_unlock(&ep->mtx); + return (NNG_OK); +} + +static nng_err +dtls_ep_get_tls(void *arg, nng_tls_config **cfgp) +{ + dtls_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + *cfgp = ep->tlscfg; + nni_mtx_unlock(&ep->mtx); + return (NNG_OK); +} + +// this just looks for pipes waiting for an aio, and aios waiting for +// a connection, and matches them together. +static void +dtls_ep_match(dtls_ep *ep) +{ + nng_aio *aio = nni_list_first(&ep->connaios); + dtls_pipe *p = nni_list_first(&ep->connpipes); + + if ((aio == NULL) || (p == NULL)) { + return; + } + + nni_aio_list_remove(aio); + nni_list_remove(&ep->connpipes, p); + nni_aio_set_output(aio, 0, p->npipe); + nni_aio_finish(aio, 0, 0); +} + +static void +dtls_ep_start(dtls_ep *ep) +{ + ep->started = true; + dtls_start_rx(ep); +} + +static nng_err +dtls_ep_bind(void *arg, nng_url *url) +{ + dtls_ep *ep = arg; + nng_err rv; + + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + + rv = nng_udp_open(&ep->udp, &ep->self_sa); + if (rv != NNG_OK) { + nni_mtx_unlock(&ep->mtx); + return (rv); + } + nng_sockaddr sa; + nng_udp_sockname(ep->udp, &sa); + url->u_port = nng_sockaddr_port(&sa); + dtls_ep_start(ep); + nni_mtx_unlock(&ep->mtx); + + return (rv); +} + +static void +dtls_ep_accept(void *arg, nni_aio *aio) +{ + dtls_ep *ep = arg; + + nni_aio_reset(aio); + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&ep->mtx); + return; + } + if (!nni_aio_start(aio, dtls_ep_cancel, ep)) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_aio_list_append(&ep->connaios, aio); + dtls_ep_match(ep); + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops dtls_pipe_ops = { + .p_size = dtls_pipe_size, + .p_init = dtls_pipe_init, + .p_fini = dtls_pipe_fini, + .p_stop = dtls_pipe_stop, + .p_send = dtls_pipe_send, + .p_recv = dtls_pipe_recv, + .p_close = dtls_pipe_close, + .p_peer = dtls_pipe_peer, + .p_getopt = dtls_pipe_getopt, +}; + +static const nni_option dtls_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = dtls_ep_get_recvmaxsz, + .o_set = dtls_ep_set_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = dtls_ep_get_locaddr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = dtls_ep_get_remaddr, + }, + { + .o_name = NNG_OPT_TCP_BOUND_PORT, + .o_get = dtls_ep_get_port, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nng_err +dtls_dialer_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + dtls_ep *ep = arg; + + return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t)); +} + +static nng_err +dtls_dialer_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + dtls_ep *ep = arg; + + return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t)); +} + +static nng_err +dtls_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + dtls_ep *ep = arg; + + return (nni_getopt(dtls_ep_opts, name, ep, buf, szp, t)); +} + +static nng_err +dtls_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + dtls_ep *ep = arg; + + return (nni_setopt(dtls_ep_opts, name, ep, buf, sz, t)); +} + +static nni_sp_dialer_ops dtls_dialer_ops = { + .d_size = sizeof(dtls_ep), + .d_init = dtls_dialer_init, + .d_fini = dtls_ep_fini, + .d_connect = dtls_ep_connect, + .d_close = dtls_ep_close, + .d_stop = dtls_ep_stop, + .d_set_tls = dtls_ep_set_tls, + .d_get_tls = dtls_ep_get_tls, + .d_getopt = dtls_dialer_getopt, + .d_setopt = dtls_dialer_setopt, +}; + +static nni_sp_listener_ops dtls_listener_ops = { + .l_size = sizeof(dtls_ep), + .l_init = dtls_listener_init, + .l_fini = dtls_ep_fini, + .l_bind = dtls_ep_bind, + .l_accept = dtls_ep_accept, + .l_close = dtls_ep_close, + .l_stop = dtls_ep_stop, + .l_set_tls = dtls_ep_set_tls, + .l_get_tls = dtls_ep_get_tls, + .l_getopt = dtls_listener_getopt, + .l_setopt = dtls_listener_setopt, +}; + +static nni_sp_tran dtls_tran = { + .tran_scheme = "dtls", + .tran_dialer = &dtls_dialer_ops, + .tran_listener = &dtls_listener_ops, + .tran_pipe = &dtls_pipe_ops, + .tran_init = dtls_tran_init, + .tran_fini = dtls_tran_fini, +}; + +static nni_sp_tran dtls4_tran = { + .tran_scheme = "dtls4", + .tran_dialer = &dtls_dialer_ops, + .tran_listener = &dtls_listener_ops, + .tran_pipe = &dtls_pipe_ops, + .tran_init = dtls_tran_init, + .tran_fini = dtls_tran_fini, +}; + +#ifdef NNG_ENABLE_IPV6 +static nni_sp_tran dtls6_tran = { + .tran_scheme = "dtls6", + .tran_dialer = &dtls_dialer_ops, + .tran_listener = &dtls_listener_ops, + .tran_pipe = &dtls_pipe_ops, + .tran_init = dtls_tran_init, + .tran_fini = dtls_tran_fini, +}; +#endif + +void +nni_sp_dtls_register(void) +{ + nni_sp_tran_register(&dtls_tran); + nni_sp_tran_register(&dtls4_tran); +#ifdef NNG_ENABLE_IPV6 + nni_sp_tran_register(&dtls6_tran); +#endif +} |
