diff options
| author | Garrett D'Amore <garrett@damore.org> | 2025-06-07 12:20:41 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2025-06-08 16:06:15 -0700 |
| commit | 150116df53017c449e58ef7022fc08eea086ae75 (patch) | |
| tree | 3ca18985bc0b8388017093025ce80cbaf9f26b66 | |
| parent | f16a78fdfbd264c998c4abe9ab4398babb4b01a9 (diff) | |
| download | nng-150116df53017c449e58ef7022fc08eea086ae75.tar.gz nng-150116df53017c449e58ef7022fc08eea086ae75.tar.bz2 nng-150116df53017c449e58ef7022fc08eea086ae75.zip | |
UDP: Signficant refactoring and simplification.
This work is inspired by the DTLS work, and harmonizes the UDP implementation
with DTLS somewhat.
This should make it more resilient to failures, although there is no longer any
attempt to guard against sequencing (reorders, dupes) errors. Applications that
need such protection should either add it themselves, or use a transport which
provides that guarantee (such as TCP). Note that with devices and and such in
the way, such guarantees have never been perfect with SP anyway.
The UDP transport header sizes for this are now just 8 bytes (beyond the UDP header
itself.
| -rw-r--r-- | src/sp/transport/udp/udp.c | 517 | ||||
| -rw-r--r-- | src/sp/transport/udp/udp_tran_test.c | 61 |
2 files changed, 288 insertions, 290 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 3722549e..b0945737 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -5,6 +5,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include "core/aio.h" #include "core/defs.h" @@ -22,14 +23,6 @@ typedef struct udp_pipe udp_pipe; typedef struct udp_ep udp_ep; -// These should reallyh be renamed for the project. -#define nni_udp_open nni_plat_udp_open -#define nni_udp_close nni_plat_udp_close -#define nni_udp_send nni_plat_udp_send -#define nni_udp_recv nni_plat_udp_recv -#define nni_udp nni_plat_udp -#define nni_udp_sockname nni_plat_udp_sockname - // OP code, 8 bits enum udp_opcode { OPCODE_DATA = 0, @@ -43,7 +36,6 @@ enum udp_opcode { typedef enum udp_disc_reason { DISC_CLOSED = 0, // normal close DISC_TYPE = 1, // bad SP type - DISC_NOTCONN = 2, // no such connection DISC_REFUSED = 3, // refused by policy DISC_MSGSIZE = 4, // message too large DISC_NEGO = 5, // neogtiation failed @@ -80,61 +72,18 @@ typedef enum udp_disc_reason { // NB: Each of the following messages is exactly 20 bytes in size -typedef struct udp_sp_data { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_length; - uint16_t us_reserved; // depends on message type -} udp_sp_data; - -typedef struct udp_sp_creq { +typedef struct udp_sp_msg { uint8_t us_ver; uint8_t us_op_code; uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_recv_max; // actually max payload size - uint8_t us_reserved; - uint8_t us_refresh; -} udp_sp_creq; - -typedef struct udp_sp_disc { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_reason; // depends on message type - uint16_t us_reserved; -} udp_sp_disc; - -typedef struct udp_sp_mesh { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_reserved1; - uint32_t us_sequence; - uint32_t us_reserved2; -} udp_sp_mesh; - -// ack is the same format as request -typedef struct udp_sp_creq udp_sp_cack; - -typedef union udp_sp_msg { - udp_sp_data data; - udp_sp_creq creq; - udp_sp_cack cack; - udp_sp_disc disc; - udp_sp_mesh mesh; + uint16_t us_params[2]; } udp_sp_msg; +#define us_length us_params[0] // for DATA requests +#define us_recvmax us_params[0] // for CREQ, CACK +#define us_refresh us_params[1] // for CREQ, CACK +#define us_reason us_params[0] // for DISC + // Like a NIC driver, this is a "descriptor" for UDP TX packets. // This allows us to create a circular ring of these to support // queueing for TX gracefully. @@ -153,6 +102,12 @@ typedef struct udp_txring { uint16_t size; } udp_txring; +typedef enum { + PIPE_CONN_INIT, // pipe is created, but not yet matched to a peer + PIPE_CONN_MATCH, // pipe matched to peer, but not added to SP socket + PIPE_CONN_DONE, // pipe is fully owned by the SP socket +} udp_pipe_state; + #define UDP_TXRING_SZ 128 // UDP pipe resend (CREQ) in msec (nng_duration) @@ -162,36 +117,37 @@ typedef struct udp_txring { #define UDP_PIPE_TIMEOUT(p) ((p)->refresh * 5) struct udp_pipe { - udp_ep *ep; - nni_pipe *npipe; - nng_sockaddr peer_addr; - uint16_t peer; - uint16_t proto; - uint32_t self_id; - uint32_t peer_id; - uint32_t self_seq; - uint32_t peer_seq; - uint16_t sndmax; // peer's max recv size - uint16_t rcvmax; // max recv size - bool closed; - bool dialer; - nng_duration refresh; // seconds, for the protocol - nng_time next_wake; - nng_time next_creq; - nng_time expire; - nni_list_node node; - nni_lmq rx_mq; - nni_list rx_aios; + udp_ep *ep; + nni_pipe *npipe; + nng_sockaddr peer_addr; + uint16_t peer; + uint16_t proto; + uint64_t id; + uint32_t self_id; + uint32_t peer_id; + uint16_t sndmax; // peer's max recv size + uint16_t rcvmax; // max recv size + bool closed; + bool dialer; + nng_duration refresh; // seconds, for the protocol + nng_time next_wake; + nng_time next_creq; + nng_time expire; + nni_list_node node; + nni_lmq rx_mq; + nni_list rx_aios; + udp_pipe_state state; }; struct udp_ep { - nni_udp *udp; + nng_udp *udp; nni_mtx mtx; uint16_t proto; + uint16_t peer; uint16_t af; // address family - bool fini; bool started; bool closed; + bool stopped; bool cooldown; nng_url *url; const char *host; // for dialers @@ -223,7 +179,6 @@ struct udp_ep { nni_resolv_item resolv; nni_stat_item st_rcv_max; - nni_stat_item st_rcv_reorder; nni_stat_item st_rcv_toobig; nni_stat_item st_rcv_nomatch; nni_stat_item st_rcv_copy; @@ -240,13 +195,14 @@ static void udp_resolv_cb(void *); static void udp_rx_cb(void *); static void udp_recv_data( - udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa); -static void udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, - uint32_t local_id, uint32_t remote_id, uint32_t seq, - udp_disc_reason reason); + udp_ep *ep, udp_sp_msg *msg, size_t len, const nng_sockaddr *sa); +static void udp_send_disc_full( + udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason); static void udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason); -static void udp_ep_match(udp_ep *ep); +static void udp_ep_match(udp_ep *ep); +static nng_err udp_add_pipe(udp_ep *ep, udp_pipe *p); +static void udp_remove_pipe(udp_pipe *p); static void udp_tran_init(void) @@ -266,6 +222,7 @@ udp_pipe_close(void *arg) nni_aio *aio; nni_mtx_lock(&ep->mtx); + udp_remove_pipe(p); udp_send_disc(ep, p, DISC_CLOSED); while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); @@ -283,11 +240,7 @@ udp_pipe_stop(void *arg) udp_pipe_close(arg); nni_mtx_lock(&ep->mtx); - if (p->self_id != 0) { - nni_id_remove(&p->ep->pipes, p->self_id); - p->self_id = 0; - } - nni_list_node_remove(&p->node); + udp_remove_pipe(p); nni_mtx_unlock(&ep->mtx); } @@ -301,19 +254,21 @@ udp_pipe_init(void *arg, nni_pipe *npipe) return (0); } -static int -udp_pipe_start(udp_pipe *p, udp_ep *ep, nng_sockaddr *sa) +static nng_err +udp_pipe_start(udp_pipe *p, udp_ep *ep, const nng_sockaddr *sa) { nni_time now = nni_clock(); p->ep = ep; p->proto = ep->proto; + p->peer = ep->peer; p->peer_addr = *sa; p->dialer = ep->dialer; p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; p->rcvmax = ep->rcvmax; - p->self_seq = nni_random(); + p->id = nng_sockaddr_hash(sa); p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); - return (nni_id_alloc32(&ep->pipes, &p->self_id, p)); + + return (udp_add_pipe(ep, p)); } static void @@ -331,38 +286,71 @@ udp_pipe_fini(void *arg) NNI_ASSERT(nni_list_empty(&p->rx_aios)); } -// Find the pipe matching the given id (our pipe id, taken from the peer_id -// of the header) and peer's sockaddr. Returns NULL if not found. The -// ep lock must be held. If a pending pipe (not yet connected) is found, then -// it is returned instead. static udp_pipe * -udp_find_pipe(udp_ep *ep, uint32_t self_id, uint32_t peer_id) +udp_find_pipe(udp_ep *ep, const nng_sockaddr *peer_addr) { + uint64_t id = nng_sockaddr_hash(peer_addr); udp_pipe *p; - if (((p = nni_id_get(&ep->pipes, self_id)) != NULL) && (!p->closed)) { - if (p->peer_id == 0 || p->peer_id == peer_id) { + + // we'll keep incrementing id until we conclusively match + // or we get a NULL. This is another level of rehashing, but + // it keeps us from having to look up. + for (;;) { + if ((p = nni_id_get(&ep->pipes, id)) == NULL) { + return (NULL); + } + if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) { return (p); } + id++; + if (id == 0) { + id = 1; + } + } +} + +static void +udp_remove_pipe(udp_pipe *p) +{ + // ep locked + udp_ep *ep = p->ep; + uint64_t id = p->id; + if (id == 0) { + return; + } + p->id = 0; + for (;;) { + udp_pipe *srch; + if ((srch = nni_id_get(&ep->pipes, id)) == NULL) { + break; + } + if (srch == p) { + nni_id_remove(&ep->pipes, id); + break; + } + id++; + if (id == 0) { + id = 1; + } + } + if (p->state < PIPE_CONN_DONE) { + nni_list_node_remove(&p->node); + nni_pipe_rele(p->npipe); } - return (NULL); } -static bool -udp_check_pipe_sequence(udp_pipe *p, uint32_t seq) +static nng_err +udp_add_pipe(udp_ep *ep, udp_pipe *p) { - int32_t delta; - // signed math so we can see how far apart they are - delta = (int32_t) (seq - p->peer_seq); - if (delta < 0) { - // out of order delivery - nni_stat_inc(&p->ep->st_rcv_reorder, 1); - return (false); - } - if (delta > 0) { - nni_stat_inc(&p->ep->st_rcv_reorder, 1); - } - p->peer_seq = seq + 1; // expected next sequence number - return (true); + // Id must be part of the hash + uint64_t id = p->id; + while (nni_id_get(&ep->pipes, id) != NULL) { + id++; + if (id == 0) { + id = 1; + } + } + return (nni_id_set(&ep->pipes, id, p)); } static void @@ -388,6 +376,10 @@ udp_start_rx(udp_ep *ep) { nni_iov iov; + if (ep->closed) { + return; + } + // We use this trick to collect the message header so that we can // do the entire message in a single iov, which avoids the need to // scatter/gather (which can be problematic for platforms that cannot @@ -400,7 +392,7 @@ udp_start_rx(udp_ep *ep) nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); nni_aio_set_iov(&ep->rx_aio, 1, &iov); - nni_udp_recv(ep->udp, &ep->rx_aio); + nng_udp_recv(ep->udp, &ep->rx_aio); } static void @@ -410,7 +402,7 @@ udp_start_tx(udp_ep *ep) udp_txdesc *desc; nng_msg *msg; - if ((!ring->count) || (!ep->started) || ep->tx_busy) { + if ((!ring->count) || (!ep->started) || ep->tx_busy || ep->stopped) { return; } ep->tx_busy = true; @@ -442,7 +434,7 @@ udp_start_tx(udp_ep *ep) nni_aio_set_iov(&ep->tx_aio, niov, iov); // it should *never* take this long, but allow for ARP resolution nni_aio_set_timeout(&ep->tx_aio, NNI_SECOND * 10); - nni_udp_send(ep->udp, &ep->tx_aio); + nng_udp_send(ep->udp, &ep->tx_aio); } static void @@ -465,19 +457,13 @@ udp_queue_tx( desc->header = *msg; #else // Fix the endianness, so other routines don't have to. - // It turns out that the endianness of the fields of CREQ - // is compatible with the fields of every other message type. // We only have to do this for systems that are not known // (at compile time) to be little endian. - desc->header.creq.us_ver = 0x1; - desc->header.creq.us_op_code = msg->creq.us_op_code; - NNI_PUT16LE(&desc->header.creq.us_type, msg->creq.us_type); - NNI_PUT32LE(&desc->header.creq.us_sended_id, msg->creq.us_sender_id); - NNI_PUT32LE(&desc->header.creq.us_peer_id, msg->creq.us_peer_id); - NNI_PUT32LE(&desc->header.creq.us_sequence, msg->creq.us_sequence); - NNI_PUT16LE(&desc->header.creq.us_recv_max, msg->creq.us_recv_max); - desc->header.creq.us_reserved = 0; - desc->header.creq.us_refresh = msg->creq.us_refresh; + desc->header.us_ver = 0x1; + desc->header.us_op_code = msg->us_op_code; + NNI_PUT16LE(&desc->header.us_type, msg->us_type); + NNI_PUT16LE(&desc->header.us_params[0], msg->us_params[0]); + NNI_PUT16LE(&desc->header.us_params[1], msg->us_params[1]); #endif desc->payload = payload; @@ -528,40 +514,34 @@ udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - udp_send_disc_full( - ep, &p->peer_addr, p->self_id, p->peer_id, p->self_seq++, reason); + udp_send_disc_full(ep, &p->peer_addr, reason); + nni_pipe_close(p->npipe); } static void -udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, uint32_t local_id, - uint32_t remote_id, uint32_t seq, udp_disc_reason reason) +udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason) { - udp_sp_disc disc; + udp_sp_msg disc; disc.us_ver = 0x1; disc.us_op_code = OPCODE_DISC; disc.us_type = ep->proto; - disc.us_sender_id = local_id; - disc.us_peer_id = remote_id; - disc.us_sequence = seq; disc.us_reason = (uint16_t) reason; + disc.us_params[1] = 0; udp_queue_tx(ep, sa, (void *) &disc, NULL); } static void udp_send_creq(udp_ep *ep, udp_pipe *p) { - udp_sp_creq creq; - creq.us_ver = 0x1; - creq.us_op_code = OPCODE_CREQ; - creq.us_type = p->proto; - creq.us_sender_id = p->self_id; - creq.us_peer_id = p->peer_id; - creq.us_sequence = p->self_seq++; - creq.us_recv_max = p->rcvmax; - creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; - p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); - p->next_wake = p->next_creq; + udp_sp_msg creq; + creq.us_ver = 0x1; + creq.us_op_code = OPCODE_CREQ; + creq.us_type = p->proto; + creq.us_recvmax = p->rcvmax; + creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); + p->next_wake = p->next_creq; udp_pipe_schedule(p); udp_queue_tx(ep, &p->peer_addr, (void *) &creq, NULL); @@ -570,43 +550,40 @@ udp_send_creq(udp_ep *ep, udp_pipe *p) static void udp_send_cack(udp_ep *ep, udp_pipe *p) { - udp_sp_cack cack; - cack.us_ver = 0x01; - cack.us_op_code = OPCODE_CACK; - cack.us_type = p->proto; - cack.us_sender_id = p->self_id; - cack.us_peer_id = p->peer_id; - cack.us_sequence = p->self_seq++; - cack.us_recv_max = p->rcvmax; - cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + udp_sp_msg cack; + cack.us_ver = 0x01; + cack.us_op_code = OPCODE_CACK; + cack.us_type = p->proto; + cack.us_recvmax = p->rcvmax; + cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; udp_queue_tx(ep, &p->peer_addr, (void *) &cack, NULL); } static void -udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, const nng_sockaddr *sa) +udp_recv_disc(udp_ep *ep, udp_sp_msg *disc, const nng_sockaddr *sa) { udp_pipe *p; nni_aio *aio; - NNI_ARG_UNUSED(sa); + char buf[NNG_MAXADDRSTRLEN]; - p = udp_find_pipe(ep, disc->us_peer_id, disc->us_sender_id); + nng_log_debug("NNG-UDP-DISC", "Received disconnect from %s reason %d", + nng_str_sockaddr(sa, buf, sizeof(buf)), disc->us_reason); + + p = udp_find_pipe(ep, sa); if (p != NULL) { - // For now we aren't validating the sequence numbers. - // This allows for an out of order DISC to cause the - // connection to be dropped, but it should self heal. p->closed = true; while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } + nni_pipe_close(p->npipe); } } // Receive data for the pipe. Returns true if we used // the message, false otherwise. static void -udp_recv_data( - udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa) +udp_recv_data(udp_ep *ep, udp_sp_msg *dreq, size_t len, const nng_sockaddr *sa) { // NB: ep mtx is locked udp_pipe *p; @@ -614,24 +591,8 @@ udp_recv_data( nni_msg *msg; nni_time now; - // send_id is the remote peer's ID - // peer_id is our ID (receiver ID) - // sequence number is our sender's sequence - uint32_t send_id = dreq->us_sender_id; - uint32_t peer_id = dreq->us_peer_id; - - // NB: Peer ID endianness does not matter, as long we use it - // consistently. - if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) { + if ((p = udp_find_pipe(ep, sa)) == NULL) { nni_stat_inc(&ep->st_rcv_nomatch, 1); - udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN); - // Question: how do we store the sockaddr for that? - return; - } - if (p->peer_id == 0) { - // connection isn't formed yet ... send another CREQ - nni_stat_inc(&ep->st_rcv_nomatch, 1); - udp_send_creq(ep, p); return; } @@ -654,11 +615,6 @@ udp_recv_data( nni_msg_chop( ep->rx_payload, nni_msg_len(ep->rx_payload) - dreq->us_length); - if (!udp_check_pipe_sequence(p, dreq->us_sequence)) { - // out of order delivery, drop it - return; - } - // We have a choice to make. Drop this message (easiest), or // drop the oldest. We drop the oldest because generally we // find that applications prefer to have more recent data rather @@ -714,26 +670,26 @@ udp_recv_data( } static void -udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) +udp_recv_creq(udp_ep *ep, udp_sp_msg *creq, nng_sockaddr *sa) { udp_pipe *p; nni_time now; now = nni_clock(); + + if (ep->closed) { + // endpoint is closing down, just drop it without further ado + return; + } + if (ep->dialer) { // dialers do not accept CREQ requests - udp_send_disc_full(ep, sa, creq->us_peer_id, - creq->us_sender_id, 0, DISC_REFUSED); + udp_send_disc_full(ep, sa, DISC_REFUSED); return; } - if ((p = udp_find_pipe(ep, creq->us_peer_id, creq->us_sender_id))) { - if ((p->peer_id == 0) || (p->peer != creq->us_type)) { - // we don't expect this -- a connection request from a - // peer while we have an outstanding request of our - // own. We *could* compare the sockaddrs to see if they - // match and if so then treat this as just a dueling - // connection. but for now we just discard it -- we'll - // wait for the CACK. + if ((p = udp_find_pipe(ep, sa))) { + if (p->peer != creq->us_type) { + udp_send_disc(ep, p, DISC_TYPE); return; } @@ -754,26 +710,18 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) } // new pipe - if (ep->fini || ep->closed) { - // endpoint is closing down, reject it. - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_REFUSED); - return; - } if (creq->us_refresh == 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NEGO); + udp_send_disc_full(ep, sa, DISC_NEGO); return; } if (nni_pipe_alloc_listener((void **) &p, ep->nlistener) != 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + udp_send_disc_full(ep, sa, DISC_NOBUF); return; } - if (udp_pipe_start(p, ep, sa) != 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + + if (udp_pipe_start(p, ep, sa) != NNG_OK) { + udp_send_disc(ep, p, DISC_NOBUF); nni_pipe_close(p->npipe); return; } @@ -782,46 +730,39 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) p->refresh = (creq->us_refresh * NNI_SECOND); } p->peer = creq->us_type; - p->peer_id = creq->us_sender_id; - p->peer_seq = creq->us_sequence + 1; - p->sndmax = creq->us_recv_max; + p->sndmax = creq->us_recvmax; p->next_wake = now + UDP_PIPE_REFRESH(p); udp_pipe_schedule(p); + p->state = PIPE_CONN_MATCH; nni_list_append(&ep->connpipes, p); udp_send_cack(ep, p); udp_ep_match(ep); } static void -udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa) +udp_recv_cack(udp_ep *ep, udp_sp_msg *cack, const nng_sockaddr *sa) { udp_pipe *p; - bool first; nni_time now; - if ((p = udp_find_pipe(ep, cack->us_peer_id, cack->us_sender_id)) && - (!p->closed)) { - if ((p->peer_id != 0) && (p->peer != cack->us_type)) { + if ((p = udp_find_pipe(ep, sa)) && (!p->closed)) { + if (p->peer != cack->us_type) { udp_send_disc(ep, p, DISC_TYPE); return; } - first = (p->peer_id == 0); - // so we know who it is from.. this is a refresh. - p->sndmax = cack->us_recv_max; - p->peer = cack->us_type; - p->peer_id = cack->us_sender_id; + p->sndmax = cack->us_recvmax; + p->peer = cack->us_type; if (cack->us_refresh == 0) { udp_send_disc(ep, p, DISC_NEGO); return; } - if (first) { - p->refresh = ep->refresh; - p->peer_seq = cack->us_sequence + 1; - } + // Always reset this, as dialers may have started with an + // unreasonably low value. + p->refresh = ep->refresh; if ((cack->us_refresh * NNI_SECOND) < p->refresh) { p->refresh = cack->us_refresh * NNI_SECOND; } @@ -830,16 +771,13 @@ udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa) p->expire = now + UDP_PIPE_TIMEOUT(p); udp_pipe_schedule(p); - if (first) { + if (p->state < PIPE_CONN_MATCH) { + p->state = PIPE_CONN_MATCH; nni_list_append(&ep->connpipes, p); udp_ep_match(ep); } return; } - - // a CACK without a corresponding CREQ (or timed out pipe already) - udp_send_disc_full( - ep, sa, cack->us_peer_id, cack->us_sender_id, 0, DISC_NOTCONN); } static void @@ -904,41 +842,36 @@ udp_rx_cb(void *arg) sa = &ep->rx_sa; n = nng_aio_count(aio); - if ((n >= sizeof(*hdr)) && (hdr->data.us_ver == 1)) { + if ((n >= sizeof(*hdr)) && (hdr->us_ver == 1)) { n -= sizeof(*hdr); #ifndef NNG_LITTLE_ENDIAN // Fix the endianness, so other routines don't have to. - // It turns out that the endianness of the fields of CREQ - // is compatible with the fields of every other message type. // We only have to do this for systems that are not known // (at compile time) to be little endian. - hdr->data.us_type = NNI_GET16LE(&hdr->data.us_type); - hdr->data.us_sender_id = NNI_GET32LE(&hdr->data.us_sender_id); - hdr->data.us_peeer_id = NNI_GET32LE(&hdr->data.us_peer_id); - hdr->data.us_sequence = NNI_GET32LE(&hdr->data.us_sequence); - hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length); + hdr->us_type = NNI_GET16LE(&hdr->us_type); + hdr->us_params[0] = NNI_GET16LE(&hdr->us_params[0]); + hdr->us_params[1] = NNI_GET16LE(&hdr->us_params[1]); #endif - switch (hdr->data.us_op_code) { + switch (hdr->us_op_code) { case OPCODE_DATA: - udp_recv_data(ep, &hdr->data, n, sa); + udp_recv_data(ep, hdr, n, sa); break; case OPCODE_CREQ: - udp_recv_creq(ep, &hdr->creq, sa); + udp_recv_creq(ep, hdr, sa); break; case OPCODE_CACK: - udp_recv_cack(ep, &hdr->cack, sa); + udp_recv_cack(ep, hdr, sa); break; case OPCODE_DISC: - udp_recv_disc(ep, &hdr->disc, sa); + udp_recv_disc(ep, hdr, sa); break; case OPCODE_MESH: // TODO: // udp_recv_mesh(ep, &hdr->mesh, sa); // break; default: - udp_send_disc_full( - ep, sa, 0, hdr->data.us_sender_id, 0, DISC_PROTO); + udp_send_disc_full(ep, sa, DISC_PROTO); break; } } @@ -959,11 +892,11 @@ finish: static void udp_pipe_send(void *arg, nni_aio *aio) { - udp_pipe *p = arg; - udp_ep *ep; - udp_sp_data dreq; - nng_msg *msg; - size_t count = 0; + udp_pipe *p = arg; + udp_ep *ep; + udp_sp_msg dreq; + nng_msg *msg; + size_t count = 0; msg = nni_aio_get_msg(aio); ep = p->ep; @@ -976,23 +909,20 @@ udp_pipe_send(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) { nni_mtx_unlock(&ep->mtx); - // rather failing this with an error, we just drop it on the - // floor. this is on the sender, so there isn't a compelling - // need to disconnect the pipe, since it we're not being - // "ill-behaved" to our peer. + // rather failing this with an error, we just drop it on + // the floor. this is on the sender, so there isn't a + // compelling need to disconnect the pipe, since it we're + // not being "ill-behaved" to our peer. nni_aio_finish(aio, 0, count); nni_stat_inc(&ep->st_snd_toobig, 1); nni_msg_free(msg); return; } - dreq.us_ver = 1; - dreq.us_type = ep->proto; - dreq.us_op_code = OPCODE_DATA; - dreq.us_sender_id = p->self_id; - dreq.us_peer_id = p->peer_id; - dreq.us_sequence = p->self_seq++; - dreq.us_length = (uint16_t) count; + dreq.us_ver = 1; + dreq.us_type = ep->proto; + dreq.us_op_code = OPCODE_DATA; + dreq.us_length = (uint16_t) count; // Just queue it, or fail it. udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg); @@ -1107,7 +1037,7 @@ udp_ep_fini(void *arg) nni_aio_fini(&ep->rx_aio); if (ep->udp != NULL) { - nni_udp_close(ep->udp); + nng_udp_close(ep->udp); } for (int i = 0; i < ep->tx_ring.size; i++) { @@ -1122,16 +1052,24 @@ udp_ep_fini(void *arg) static void udp_ep_close(void *arg) { - udp_ep *ep = arg; - nni_aio *aio; + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + uint32_t cursor; + uint64_t key; + + nni_mtx_lock(&ep->mtx); + ep->closed = true; + // leave tx open so we can send disconnects nni_aio_close(&ep->resaio); nni_aio_close(&ep->rx_aio); nni_aio_close(&ep->timeaio); - // leave tx open so we can send disconnects - - nni_mtx_lock(&ep->mtx); + // close all the underlying pipes, so the peer can see it. + while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) { + nni_pipe_close(p->npipe); + } while ((aio = nni_list_first(&ep->connaios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECONNABORTED); @@ -1154,7 +1092,6 @@ udp_ep_stop(void *arg) nni_time linger = nni_clock() + NNI_SECOND / 2; // half second to drain, max nni_mtx_lock(&ep->mtx); - ep->fini = true; while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { nni_mtx_unlock(&ep->mtx); nng_msleep(1); @@ -1165,6 +1102,7 @@ udp_ep_stop(void *arg) "Lingering timed out on endpoint close, peer " "notifications dropped"); } + ep->stopped = true; nni_mtx_unlock(&ep->mtx); // finally close the tx channel @@ -1210,8 +1148,8 @@ udp_timer_cb(void *arg) "Pipe peer %s timed out due to inactivity", nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); - // Possibly alert the dialer, so it can restart a new - // attempt. + // Possibly alert the dialer, so it can restart a + // new attempt. if ((ep->dialer) && (p->peer_id == 0) && (aio = nni_list_first(&ep->connaios))) { nni_aio_list_remove(aio); @@ -1285,6 +1223,7 @@ udp_ep_init( ep->self_sa.s_family = ep->af; ep->proto = nni_sock_proto_id(sock); + ep->peer = nni_sock_peer_id(sock); ep->url = url; ep->refresh = NNG_UDP_REFRESH; // one minute by default ep->rcvmax = NNG_UDP_RECVMAX; @@ -1298,9 +1237,6 @@ udp_ep_init( NNG_STAT_LEVEL, NNG_UNIT_BYTES); NNI_STAT_LOCK(copy_max_info, "copy_max", "threshold to switch to loan-up", NNG_STAT_LEVEL, NNG_UNIT_BYTES); - NNI_STAT_LOCK(rcv_reorder_info, "rcv_reorder", - "messages received out of order", NNG_STAT_COUNTER, - NNG_UNIT_MESSAGES); NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch", "messages without a matching connection", NNG_STAT_COUNTER, NNG_UNIT_MESSAGES); @@ -1330,7 +1266,6 @@ udp_ep_init( nni_stat_init_lock(&ep->st_copy_max, ©_max_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_copy, &rcv_copy_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nocopy, &rcv_nocopy_info, &ep->mtx); - nni_stat_init_lock(&ep->st_rcv_reorder, &rcv_reorder_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx); @@ -1345,7 +1280,6 @@ udp_ep_init( nni_listener_add_stat(l, &ep->st_copy_max); nni_listener_add_stat(l, &ep->st_rcv_copy); nni_listener_add_stat(l, &ep->st_rcv_nocopy); - nni_listener_add_stat(l, &ep->st_rcv_reorder); nni_listener_add_stat(l, &ep->st_rcv_toobig); nni_listener_add_stat(l, &ep->st_rcv_nomatch); nni_listener_add_stat(l, &ep->st_rcv_nobuf); @@ -1358,7 +1292,6 @@ udp_ep_init( nni_dialer_add_stat(d, &ep->st_copy_max); nni_dialer_add_stat(d, &ep->st_rcv_copy); nni_dialer_add_stat(d, &ep->st_rcv_nocopy); - nni_dialer_add_stat(d, &ep->st_rcv_reorder); nni_dialer_add_stat(d, &ep->st_rcv_toobig); nni_dialer_add_stat(d, &ep->st_rcv_nomatch); nni_dialer_add_stat(d, &ep->st_rcv_nobuf); @@ -1478,7 +1411,7 @@ udp_resolv_cb(void *arg) } if (ep->udp == NULL) { - if ((rv = nni_udp_open(&ep->udp, &ep->self_sa)) != 0) { + if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) { nni_aio_list_remove(aio); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); @@ -1487,16 +1420,19 @@ udp_resolv_cb(void *arg) } // places a "hold" on the ep - if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != + NNG_OK) { nni_aio_list_remove(aio); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); + return; } - if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != 0) { + if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != NNG_OK) { nni_aio_list_remove(aio); nni_pipe_close(p->npipe); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); + return; } udp_pipe_schedule(p); @@ -1562,7 +1498,7 @@ udp_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t) nni_mtx_lock(&ep->mtx); if (ep->udp != NULL) { - (void) nni_udp_sockname(ep->udp, &sa); + (void) nng_udp_sockname(ep->udp, &sa); } else { sa = ep->self_sa; } @@ -1597,7 +1533,7 @@ udp_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) nng_sockaddr sa; if (ep->udp != NULL) { - (void) nni_udp_sockname(ep->udp, &sa); + (void) nng_udp_sockname(ep->udp, &sa); } else { sa = ep->self_sa; } @@ -1699,6 +1635,7 @@ udp_ep_match(udp_ep *ep) return; } + p->state = PIPE_CONN_DONE; nni_aio_list_remove(aio); nni_list_remove(&ep->connpipes, p); nni_aio_set_output(aio, 0, p->npipe); @@ -1724,13 +1661,13 @@ udp_ep_bind(void *arg, nng_url *url) return (NNG_EBUSY); } - rv = nni_udp_open(&ep->udp, &ep->self_sa); + rv = nng_udp_open(&ep->udp, &ep->self_sa); if (rv != NNG_OK) { nni_mtx_unlock(&ep->mtx); return (rv); } nng_sockaddr sa; - nni_plat_udp_sockname(ep->udp, &sa); + nng_udp_sockname(ep->udp, &sa); url->u_port = nng_sockaddr_port(&sa); udp_ep_start(ep); nni_mtx_unlock(&ep->mtx); diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c index 0b4b180d..56a25cce 100644 --- a/src/sp/transport/udp/udp_tran_test.c +++ b/src/sp/transport/udp/udp_tran_test.c @@ -496,6 +496,66 @@ test_udp_pipe(void) NUTS_CLOSE(s0); NUTS_CLOSE(s1); } + +void +test_udp_reconnect_dialer(void) +{ + nng_socket s0; + nng_socket s1; + nng_listener l; + nng_dialer d; + char *addr; + nng_msg *msg; + nng_sockaddr sa0; + + NUTS_LOGGING(); + NUTS_ADDR(addr, "udp4"); + + // For this test, using PUB sub is better to avoid fighting the pair + // exclusion. Generally speaking pair is probably a poor fit for UDP + // anyway. + + NUTS_PASS(nng_pub0_open(&s0)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 2000)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa0)); + + NUTS_PASS(nng_sub0_open(&s1)); + NUTS_PASS(nng_sub0_socket_subscribe(s1, "", 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 2000)); + NUTS_PASS(nng_dialer_create(&d, s1, addr)); + NUTS_PASS(nng_dialer_start(d, 0)); + + nng_msleep(500); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + nng_msg_free(msg); + + // now close the dialer + NUTS_PASS(nng_dialer_close(d)); + + // wait a bit before starting a new one -- allow for the old one to + // tear down completely on both ends. + nng_msleep(500); + + // and create and start a new one + NUTS_PASS(nng_dialer_create(&d, s1, addr)); + NUTS_PASS(nng_dialer_start(d, 0)); + nng_msleep(500); + + // show that we still send and receive + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + nng_msg_free(msg); + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + void test_udp_stats(void) { @@ -559,6 +619,7 @@ NUTS_TESTS = { { "udp multi small burst", test_udp_multi_small_burst }, { "udp crush", test_udp_crush }, { "udp pipe", test_udp_pipe }, + { "udp reconnect dialer", test_udp_reconnect_dialer }, { "udp stats", test_udp_stats }, { NULL, NULL }, }; |
