diff options
Diffstat (limited to 'src/sp/transport')
| -rw-r--r-- | src/sp/transport/udp/udp.c | 517 | ||||
| -rw-r--r-- | src/sp/transport/udp/udp_tran_test.c | 61 |
2 files changed, 288 insertions, 290 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 3722549e..b0945737 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -5,6 +5,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include "core/aio.h" #include "core/defs.h" @@ -22,14 +23,6 @@ typedef struct udp_pipe udp_pipe; typedef struct udp_ep udp_ep; -// These should reallyh be renamed for the project. -#define nni_udp_open nni_plat_udp_open -#define nni_udp_close nni_plat_udp_close -#define nni_udp_send nni_plat_udp_send -#define nni_udp_recv nni_plat_udp_recv -#define nni_udp nni_plat_udp -#define nni_udp_sockname nni_plat_udp_sockname - // OP code, 8 bits enum udp_opcode { OPCODE_DATA = 0, @@ -43,7 +36,6 @@ enum udp_opcode { typedef enum udp_disc_reason { DISC_CLOSED = 0, // normal close DISC_TYPE = 1, // bad SP type - DISC_NOTCONN = 2, // no such connection DISC_REFUSED = 3, // refused by policy DISC_MSGSIZE = 4, // message too large DISC_NEGO = 5, // neogtiation failed @@ -80,61 +72,18 @@ typedef enum udp_disc_reason { // NB: Each of the following messages is exactly 20 bytes in size -typedef struct udp_sp_data { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_length; - uint16_t us_reserved; // depends on message type -} udp_sp_data; - -typedef struct udp_sp_creq { +typedef struct udp_sp_msg { uint8_t us_ver; uint8_t us_op_code; uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_recv_max; // actually max payload size - uint8_t us_reserved; - uint8_t us_refresh; -} udp_sp_creq; - -typedef struct udp_sp_disc { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_peer_id; - uint32_t us_sequence; - uint16_t us_reason; // depends on message type - uint16_t us_reserved; -} udp_sp_disc; - -typedef struct udp_sp_mesh { - uint8_t us_ver; - uint8_t us_op_code; - uint16_t us_type; - uint32_t us_sender_id; - uint32_t us_reserved1; - uint32_t us_sequence; - uint32_t us_reserved2; -} udp_sp_mesh; - -// ack is the same format as request -typedef struct udp_sp_creq udp_sp_cack; - -typedef union udp_sp_msg { - udp_sp_data data; - udp_sp_creq creq; - udp_sp_cack cack; - udp_sp_disc disc; - udp_sp_mesh mesh; + uint16_t us_params[2]; } udp_sp_msg; +#define us_length us_params[0] // for DATA requests +#define us_recvmax us_params[0] // for CREQ, CACK +#define us_refresh us_params[1] // for CREQ, CACK +#define us_reason us_params[0] // for DISC + // Like a NIC driver, this is a "descriptor" for UDP TX packets. // This allows us to create a circular ring of these to support // queueing for TX gracefully. @@ -153,6 +102,12 @@ typedef struct udp_txring { uint16_t size; } udp_txring; +typedef enum { + PIPE_CONN_INIT, // pipe is created, but not yet matched to a peer + PIPE_CONN_MATCH, // pipe matched to peer, but not added to SP socket + PIPE_CONN_DONE, // pipe is fully owned by the SP socket +} udp_pipe_state; + #define UDP_TXRING_SZ 128 // UDP pipe resend (CREQ) in msec (nng_duration) @@ -162,36 +117,37 @@ typedef struct udp_txring { #define UDP_PIPE_TIMEOUT(p) ((p)->refresh * 5) struct udp_pipe { - udp_ep *ep; - nni_pipe *npipe; - nng_sockaddr peer_addr; - uint16_t peer; - uint16_t proto; - uint32_t self_id; - uint32_t peer_id; - uint32_t self_seq; - uint32_t peer_seq; - uint16_t sndmax; // peer's max recv size - uint16_t rcvmax; // max recv size - bool closed; - bool dialer; - nng_duration refresh; // seconds, for the protocol - nng_time next_wake; - nng_time next_creq; - nng_time expire; - nni_list_node node; - nni_lmq rx_mq; - nni_list rx_aios; + udp_ep *ep; + nni_pipe *npipe; + nng_sockaddr peer_addr; + uint16_t peer; + uint16_t proto; + uint64_t id; + uint32_t self_id; + uint32_t peer_id; + uint16_t sndmax; // peer's max recv size + uint16_t rcvmax; // max recv size + bool closed; + bool dialer; + nng_duration refresh; // seconds, for the protocol + nng_time next_wake; + nng_time next_creq; + nng_time expire; + nni_list_node node; + nni_lmq rx_mq; + nni_list rx_aios; + udp_pipe_state state; }; struct udp_ep { - nni_udp *udp; + nng_udp *udp; nni_mtx mtx; uint16_t proto; + uint16_t peer; uint16_t af; // address family - bool fini; bool started; bool closed; + bool stopped; bool cooldown; nng_url *url; const char *host; // for dialers @@ -223,7 +179,6 @@ struct udp_ep { nni_resolv_item resolv; nni_stat_item st_rcv_max; - nni_stat_item st_rcv_reorder; nni_stat_item st_rcv_toobig; nni_stat_item st_rcv_nomatch; nni_stat_item st_rcv_copy; @@ -240,13 +195,14 @@ static void udp_resolv_cb(void *); static void udp_rx_cb(void *); static void udp_recv_data( - udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa); -static void udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, - uint32_t local_id, uint32_t remote_id, uint32_t seq, - udp_disc_reason reason); + udp_ep *ep, udp_sp_msg *msg, size_t len, const nng_sockaddr *sa); +static void udp_send_disc_full( + udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason); static void udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason); -static void udp_ep_match(udp_ep *ep); +static void udp_ep_match(udp_ep *ep); +static nng_err udp_add_pipe(udp_ep *ep, udp_pipe *p); +static void udp_remove_pipe(udp_pipe *p); static void udp_tran_init(void) @@ -266,6 +222,7 @@ udp_pipe_close(void *arg) nni_aio *aio; nni_mtx_lock(&ep->mtx); + udp_remove_pipe(p); udp_send_disc(ep, p, DISC_CLOSED); while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); @@ -283,11 +240,7 @@ udp_pipe_stop(void *arg) udp_pipe_close(arg); nni_mtx_lock(&ep->mtx); - if (p->self_id != 0) { - nni_id_remove(&p->ep->pipes, p->self_id); - p->self_id = 0; - } - nni_list_node_remove(&p->node); + udp_remove_pipe(p); nni_mtx_unlock(&ep->mtx); } @@ -301,19 +254,21 @@ udp_pipe_init(void *arg, nni_pipe *npipe) return (0); } -static int -udp_pipe_start(udp_pipe *p, udp_ep *ep, nng_sockaddr *sa) +static nng_err +udp_pipe_start(udp_pipe *p, udp_ep *ep, const nng_sockaddr *sa) { nni_time now = nni_clock(); p->ep = ep; p->proto = ep->proto; + p->peer = ep->peer; p->peer_addr = *sa; p->dialer = ep->dialer; p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; p->rcvmax = ep->rcvmax; - p->self_seq = nni_random(); + p->id = nng_sockaddr_hash(sa); p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); - return (nni_id_alloc32(&ep->pipes, &p->self_id, p)); + + return (udp_add_pipe(ep, p)); } static void @@ -331,38 +286,71 @@ udp_pipe_fini(void *arg) NNI_ASSERT(nni_list_empty(&p->rx_aios)); } -// Find the pipe matching the given id (our pipe id, taken from the peer_id -// of the header) and peer's sockaddr. Returns NULL if not found. The -// ep lock must be held. If a pending pipe (not yet connected) is found, then -// it is returned instead. static udp_pipe * -udp_find_pipe(udp_ep *ep, uint32_t self_id, uint32_t peer_id) +udp_find_pipe(udp_ep *ep, const nng_sockaddr *peer_addr) { + uint64_t id = nng_sockaddr_hash(peer_addr); udp_pipe *p; - if (((p = nni_id_get(&ep->pipes, self_id)) != NULL) && (!p->closed)) { - if (p->peer_id == 0 || p->peer_id == peer_id) { + + // we'll keep incrementing id until we conclusively match + // or we get a NULL. This is another level of rehashing, but + // it keeps us from having to look up. + for (;;) { + if ((p = nni_id_get(&ep->pipes, id)) == NULL) { + return (NULL); + } + if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) { return (p); } + id++; + if (id == 0) { + id = 1; + } + } +} + +static void +udp_remove_pipe(udp_pipe *p) +{ + // ep locked + udp_ep *ep = p->ep; + uint64_t id = p->id; + if (id == 0) { + return; + } + p->id = 0; + for (;;) { + udp_pipe *srch; + if ((srch = nni_id_get(&ep->pipes, id)) == NULL) { + break; + } + if (srch == p) { + nni_id_remove(&ep->pipes, id); + break; + } + id++; + if (id == 0) { + id = 1; + } + } + if (p->state < PIPE_CONN_DONE) { + nni_list_node_remove(&p->node); + nni_pipe_rele(p->npipe); } - return (NULL); } -static bool -udp_check_pipe_sequence(udp_pipe *p, uint32_t seq) +static nng_err +udp_add_pipe(udp_ep *ep, udp_pipe *p) { - int32_t delta; - // signed math so we can see how far apart they are - delta = (int32_t) (seq - p->peer_seq); - if (delta < 0) { - // out of order delivery - nni_stat_inc(&p->ep->st_rcv_reorder, 1); - return (false); - } - if (delta > 0) { - nni_stat_inc(&p->ep->st_rcv_reorder, 1); - } - p->peer_seq = seq + 1; // expected next sequence number - return (true); + // Id must be part of the hash + uint64_t id = p->id; + while (nni_id_get(&ep->pipes, id) != NULL) { + id++; + if (id == 0) { + id = 1; + } + } + return (nni_id_set(&ep->pipes, id, p)); } static void @@ -388,6 +376,10 @@ udp_start_rx(udp_ep *ep) { nni_iov iov; + if (ep->closed) { + return; + } + // We use this trick to collect the message header so that we can // do the entire message in a single iov, which avoids the need to // scatter/gather (which can be problematic for platforms that cannot @@ -400,7 +392,7 @@ udp_start_rx(udp_ep *ep) nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); nni_aio_set_iov(&ep->rx_aio, 1, &iov); - nni_udp_recv(ep->udp, &ep->rx_aio); + nng_udp_recv(ep->udp, &ep->rx_aio); } static void @@ -410,7 +402,7 @@ udp_start_tx(udp_ep *ep) udp_txdesc *desc; nng_msg *msg; - if ((!ring->count) || (!ep->started) || ep->tx_busy) { + if ((!ring->count) || (!ep->started) || ep->tx_busy || ep->stopped) { return; } ep->tx_busy = true; @@ -442,7 +434,7 @@ udp_start_tx(udp_ep *ep) nni_aio_set_iov(&ep->tx_aio, niov, iov); // it should *never* take this long, but allow for ARP resolution nni_aio_set_timeout(&ep->tx_aio, NNI_SECOND * 10); - nni_udp_send(ep->udp, &ep->tx_aio); + nng_udp_send(ep->udp, &ep->tx_aio); } static void @@ -465,19 +457,13 @@ udp_queue_tx( desc->header = *msg; #else // Fix the endianness, so other routines don't have to. - // It turns out that the endianness of the fields of CREQ - // is compatible with the fields of every other message type. // We only have to do this for systems that are not known // (at compile time) to be little endian. - desc->header.creq.us_ver = 0x1; - desc->header.creq.us_op_code = msg->creq.us_op_code; - NNI_PUT16LE(&desc->header.creq.us_type, msg->creq.us_type); - NNI_PUT32LE(&desc->header.creq.us_sended_id, msg->creq.us_sender_id); - NNI_PUT32LE(&desc->header.creq.us_peer_id, msg->creq.us_peer_id); - NNI_PUT32LE(&desc->header.creq.us_sequence, msg->creq.us_sequence); - NNI_PUT16LE(&desc->header.creq.us_recv_max, msg->creq.us_recv_max); - desc->header.creq.us_reserved = 0; - desc->header.creq.us_refresh = msg->creq.us_refresh; + desc->header.us_ver = 0x1; + desc->header.us_op_code = msg->us_op_code; + NNI_PUT16LE(&desc->header.us_type, msg->us_type); + NNI_PUT16LE(&desc->header.us_params[0], msg->us_params[0]); + NNI_PUT16LE(&desc->header.us_params[1], msg->us_params[1]); #endif desc->payload = payload; @@ -528,40 +514,34 @@ udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - udp_send_disc_full( - ep, &p->peer_addr, p->self_id, p->peer_id, p->self_seq++, reason); + udp_send_disc_full(ep, &p->peer_addr, reason); + nni_pipe_close(p->npipe); } static void -udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, uint32_t local_id, - uint32_t remote_id, uint32_t seq, udp_disc_reason reason) +udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason) { - udp_sp_disc disc; + udp_sp_msg disc; disc.us_ver = 0x1; disc.us_op_code = OPCODE_DISC; disc.us_type = ep->proto; - disc.us_sender_id = local_id; - disc.us_peer_id = remote_id; - disc.us_sequence = seq; disc.us_reason = (uint16_t) reason; + disc.us_params[1] = 0; udp_queue_tx(ep, sa, (void *) &disc, NULL); } static void udp_send_creq(udp_ep *ep, udp_pipe *p) { - udp_sp_creq creq; - creq.us_ver = 0x1; - creq.us_op_code = OPCODE_CREQ; - creq.us_type = p->proto; - creq.us_sender_id = p->self_id; - creq.us_peer_id = p->peer_id; - creq.us_sequence = p->self_seq++; - creq.us_recv_max = p->rcvmax; - creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; - p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); - p->next_wake = p->next_creq; + udp_sp_msg creq; + creq.us_ver = 0x1; + creq.us_op_code = OPCODE_CREQ; + creq.us_type = p->proto; + creq.us_recvmax = p->rcvmax; + creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); + p->next_wake = p->next_creq; udp_pipe_schedule(p); udp_queue_tx(ep, &p->peer_addr, (void *) &creq, NULL); @@ -570,43 +550,40 @@ udp_send_creq(udp_ep *ep, udp_pipe *p) static void udp_send_cack(udp_ep *ep, udp_pipe *p) { - udp_sp_cack cack; - cack.us_ver = 0x01; - cack.us_op_code = OPCODE_CACK; - cack.us_type = p->proto; - cack.us_sender_id = p->self_id; - cack.us_peer_id = p->peer_id; - cack.us_sequence = p->self_seq++; - cack.us_recv_max = p->rcvmax; - cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + udp_sp_msg cack; + cack.us_ver = 0x01; + cack.us_op_code = OPCODE_CACK; + cack.us_type = p->proto; + cack.us_recvmax = p->rcvmax; + cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; udp_queue_tx(ep, &p->peer_addr, (void *) &cack, NULL); } static void -udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, const nng_sockaddr *sa) +udp_recv_disc(udp_ep *ep, udp_sp_msg *disc, const nng_sockaddr *sa) { udp_pipe *p; nni_aio *aio; - NNI_ARG_UNUSED(sa); + char buf[NNG_MAXADDRSTRLEN]; - p = udp_find_pipe(ep, disc->us_peer_id, disc->us_sender_id); + nng_log_debug("NNG-UDP-DISC", "Received disconnect from %s reason %d", + nng_str_sockaddr(sa, buf, sizeof(buf)), disc->us_reason); + + p = udp_find_pipe(ep, sa); if (p != NULL) { - // For now we aren't validating the sequence numbers. - // This allows for an out of order DISC to cause the - // connection to be dropped, but it should self heal. p->closed = true; while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } + nni_pipe_close(p->npipe); } } // Receive data for the pipe. Returns true if we used // the message, false otherwise. static void -udp_recv_data( - udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa) +udp_recv_data(udp_ep *ep, udp_sp_msg *dreq, size_t len, const nng_sockaddr *sa) { // NB: ep mtx is locked udp_pipe *p; @@ -614,24 +591,8 @@ udp_recv_data( nni_msg *msg; nni_time now; - // send_id is the remote peer's ID - // peer_id is our ID (receiver ID) - // sequence number is our sender's sequence - uint32_t send_id = dreq->us_sender_id; - uint32_t peer_id = dreq->us_peer_id; - - // NB: Peer ID endianness does not matter, as long we use it - // consistently. - if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) { + if ((p = udp_find_pipe(ep, sa)) == NULL) { nni_stat_inc(&ep->st_rcv_nomatch, 1); - udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN); - // Question: how do we store the sockaddr for that? - return; - } - if (p->peer_id == 0) { - // connection isn't formed yet ... send another CREQ - nni_stat_inc(&ep->st_rcv_nomatch, 1); - udp_send_creq(ep, p); return; } @@ -654,11 +615,6 @@ udp_recv_data( nni_msg_chop( ep->rx_payload, nni_msg_len(ep->rx_payload) - dreq->us_length); - if (!udp_check_pipe_sequence(p, dreq->us_sequence)) { - // out of order delivery, drop it - return; - } - // We have a choice to make. Drop this message (easiest), or // drop the oldest. We drop the oldest because generally we // find that applications prefer to have more recent data rather @@ -714,26 +670,26 @@ udp_recv_data( } static void -udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) +udp_recv_creq(udp_ep *ep, udp_sp_msg *creq, nng_sockaddr *sa) { udp_pipe *p; nni_time now; now = nni_clock(); + + if (ep->closed) { + // endpoint is closing down, just drop it without further ado + return; + } + if (ep->dialer) { // dialers do not accept CREQ requests - udp_send_disc_full(ep, sa, creq->us_peer_id, - creq->us_sender_id, 0, DISC_REFUSED); + udp_send_disc_full(ep, sa, DISC_REFUSED); return; } - if ((p = udp_find_pipe(ep, creq->us_peer_id, creq->us_sender_id))) { - if ((p->peer_id == 0) || (p->peer != creq->us_type)) { - // we don't expect this -- a connection request from a - // peer while we have an outstanding request of our - // own. We *could* compare the sockaddrs to see if they - // match and if so then treat this as just a dueling - // connection. but for now we just discard it -- we'll - // wait for the CACK. + if ((p = udp_find_pipe(ep, sa))) { + if (p->peer != creq->us_type) { + udp_send_disc(ep, p, DISC_TYPE); return; } @@ -754,26 +710,18 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) } // new pipe - if (ep->fini || ep->closed) { - // endpoint is closing down, reject it. - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_REFUSED); - return; - } if (creq->us_refresh == 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NEGO); + udp_send_disc_full(ep, sa, DISC_NEGO); return; } if (nni_pipe_alloc_listener((void **) &p, ep->nlistener) != 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + udp_send_disc_full(ep, sa, DISC_NOBUF); return; } - if (udp_pipe_start(p, ep, sa) != 0) { - udp_send_disc_full( - ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + + if (udp_pipe_start(p, ep, sa) != NNG_OK) { + udp_send_disc(ep, p, DISC_NOBUF); nni_pipe_close(p->npipe); return; } @@ -782,46 +730,39 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) p->refresh = (creq->us_refresh * NNI_SECOND); } p->peer = creq->us_type; - p->peer_id = creq->us_sender_id; - p->peer_seq = creq->us_sequence + 1; - p->sndmax = creq->us_recv_max; + p->sndmax = creq->us_recvmax; p->next_wake = now + UDP_PIPE_REFRESH(p); udp_pipe_schedule(p); + p->state = PIPE_CONN_MATCH; nni_list_append(&ep->connpipes, p); udp_send_cack(ep, p); udp_ep_match(ep); } static void -udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa) +udp_recv_cack(udp_ep *ep, udp_sp_msg *cack, const nng_sockaddr *sa) { udp_pipe *p; - bool first; nni_time now; - if ((p = udp_find_pipe(ep, cack->us_peer_id, cack->us_sender_id)) && - (!p->closed)) { - if ((p->peer_id != 0) && (p->peer != cack->us_type)) { + if ((p = udp_find_pipe(ep, sa)) && (!p->closed)) { + if (p->peer != cack->us_type) { udp_send_disc(ep, p, DISC_TYPE); return; } - first = (p->peer_id == 0); - // so we know who it is from.. this is a refresh. - p->sndmax = cack->us_recv_max; - p->peer = cack->us_type; - p->peer_id = cack->us_sender_id; + p->sndmax = cack->us_recvmax; + p->peer = cack->us_type; if (cack->us_refresh == 0) { udp_send_disc(ep, p, DISC_NEGO); return; } - if (first) { - p->refresh = ep->refresh; - p->peer_seq = cack->us_sequence + 1; - } + // Always reset this, as dialers may have started with an + // unreasonably low value. + p->refresh = ep->refresh; if ((cack->us_refresh * NNI_SECOND) < p->refresh) { p->refresh = cack->us_refresh * NNI_SECOND; } @@ -830,16 +771,13 @@ udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa) p->expire = now + UDP_PIPE_TIMEOUT(p); udp_pipe_schedule(p); - if (first) { + if (p->state < PIPE_CONN_MATCH) { + p->state = PIPE_CONN_MATCH; nni_list_append(&ep->connpipes, p); udp_ep_match(ep); } return; } - - // a CACK without a corresponding CREQ (or timed out pipe already) - udp_send_disc_full( - ep, sa, cack->us_peer_id, cack->us_sender_id, 0, DISC_NOTCONN); } static void @@ -904,41 +842,36 @@ udp_rx_cb(void *arg) sa = &ep->rx_sa; n = nng_aio_count(aio); - if ((n >= sizeof(*hdr)) && (hdr->data.us_ver == 1)) { + if ((n >= sizeof(*hdr)) && (hdr->us_ver == 1)) { n -= sizeof(*hdr); #ifndef NNG_LITTLE_ENDIAN // Fix the endianness, so other routines don't have to. - // It turns out that the endianness of the fields of CREQ - // is compatible with the fields of every other message type. // We only have to do this for systems that are not known // (at compile time) to be little endian. - hdr->data.us_type = NNI_GET16LE(&hdr->data.us_type); - hdr->data.us_sender_id = NNI_GET32LE(&hdr->data.us_sender_id); - hdr->data.us_peeer_id = NNI_GET32LE(&hdr->data.us_peer_id); - hdr->data.us_sequence = NNI_GET32LE(&hdr->data.us_sequence); - hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length); + hdr->us_type = NNI_GET16LE(&hdr->us_type); + hdr->us_params[0] = NNI_GET16LE(&hdr->us_params[0]); + hdr->us_params[1] = NNI_GET16LE(&hdr->us_params[1]); #endif - switch (hdr->data.us_op_code) { + switch (hdr->us_op_code) { case OPCODE_DATA: - udp_recv_data(ep, &hdr->data, n, sa); + udp_recv_data(ep, hdr, n, sa); break; case OPCODE_CREQ: - udp_recv_creq(ep, &hdr->creq, sa); + udp_recv_creq(ep, hdr, sa); break; case OPCODE_CACK: - udp_recv_cack(ep, &hdr->cack, sa); + udp_recv_cack(ep, hdr, sa); break; case OPCODE_DISC: - udp_recv_disc(ep, &hdr->disc, sa); + udp_recv_disc(ep, hdr, sa); break; case OPCODE_MESH: // TODO: // udp_recv_mesh(ep, &hdr->mesh, sa); // break; default: - udp_send_disc_full( - ep, sa, 0, hdr->data.us_sender_id, 0, DISC_PROTO); + udp_send_disc_full(ep, sa, DISC_PROTO); break; } } @@ -959,11 +892,11 @@ finish: static void udp_pipe_send(void *arg, nni_aio *aio) { - udp_pipe *p = arg; - udp_ep *ep; - udp_sp_data dreq; - nng_msg *msg; - size_t count = 0; + udp_pipe *p = arg; + udp_ep *ep; + udp_sp_msg dreq; + nng_msg *msg; + size_t count = 0; msg = nni_aio_get_msg(aio); ep = p->ep; @@ -976,23 +909,20 @@ udp_pipe_send(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) { nni_mtx_unlock(&ep->mtx); - // rather failing this with an error, we just drop it on the - // floor. this is on the sender, so there isn't a compelling - // need to disconnect the pipe, since it we're not being - // "ill-behaved" to our peer. + // rather failing this with an error, we just drop it on + // the floor. this is on the sender, so there isn't a + // compelling need to disconnect the pipe, since it we're + // not being "ill-behaved" to our peer. nni_aio_finish(aio, 0, count); nni_stat_inc(&ep->st_snd_toobig, 1); nni_msg_free(msg); return; } - dreq.us_ver = 1; - dreq.us_type = ep->proto; - dreq.us_op_code = OPCODE_DATA; - dreq.us_sender_id = p->self_id; - dreq.us_peer_id = p->peer_id; - dreq.us_sequence = p->self_seq++; - dreq.us_length = (uint16_t) count; + dreq.us_ver = 1; + dreq.us_type = ep->proto; + dreq.us_op_code = OPCODE_DATA; + dreq.us_length = (uint16_t) count; // Just queue it, or fail it. udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg); @@ -1107,7 +1037,7 @@ udp_ep_fini(void *arg) nni_aio_fini(&ep->rx_aio); if (ep->udp != NULL) { - nni_udp_close(ep->udp); + nng_udp_close(ep->udp); } for (int i = 0; i < ep->tx_ring.size; i++) { @@ -1122,16 +1052,24 @@ udp_ep_fini(void *arg) static void udp_ep_close(void *arg) { - udp_ep *ep = arg; - nni_aio *aio; + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + uint32_t cursor; + uint64_t key; + + nni_mtx_lock(&ep->mtx); + ep->closed = true; + // leave tx open so we can send disconnects nni_aio_close(&ep->resaio); nni_aio_close(&ep->rx_aio); nni_aio_close(&ep->timeaio); - // leave tx open so we can send disconnects - - nni_mtx_lock(&ep->mtx); + // close all the underlying pipes, so the peer can see it. + while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) { + nni_pipe_close(p->npipe); + } while ((aio = nni_list_first(&ep->connaios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECONNABORTED); @@ -1154,7 +1092,6 @@ udp_ep_stop(void *arg) nni_time linger = nni_clock() + NNI_SECOND / 2; // half second to drain, max nni_mtx_lock(&ep->mtx); - ep->fini = true; while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { nni_mtx_unlock(&ep->mtx); nng_msleep(1); @@ -1165,6 +1102,7 @@ udp_ep_stop(void *arg) "Lingering timed out on endpoint close, peer " "notifications dropped"); } + ep->stopped = true; nni_mtx_unlock(&ep->mtx); // finally close the tx channel @@ -1210,8 +1148,8 @@ udp_timer_cb(void *arg) "Pipe peer %s timed out due to inactivity", nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); - // Possibly alert the dialer, so it can restart a new - // attempt. + // Possibly alert the dialer, so it can restart a + // new attempt. if ((ep->dialer) && (p->peer_id == 0) && (aio = nni_list_first(&ep->connaios))) { nni_aio_list_remove(aio); @@ -1285,6 +1223,7 @@ udp_ep_init( ep->self_sa.s_family = ep->af; ep->proto = nni_sock_proto_id(sock); + ep->peer = nni_sock_peer_id(sock); ep->url = url; ep->refresh = NNG_UDP_REFRESH; // one minute by default ep->rcvmax = NNG_UDP_RECVMAX; @@ -1298,9 +1237,6 @@ udp_ep_init( NNG_STAT_LEVEL, NNG_UNIT_BYTES); NNI_STAT_LOCK(copy_max_info, "copy_max", "threshold to switch to loan-up", NNG_STAT_LEVEL, NNG_UNIT_BYTES); - NNI_STAT_LOCK(rcv_reorder_info, "rcv_reorder", - "messages received out of order", NNG_STAT_COUNTER, - NNG_UNIT_MESSAGES); NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch", "messages without a matching connection", NNG_STAT_COUNTER, NNG_UNIT_MESSAGES); @@ -1330,7 +1266,6 @@ udp_ep_init( nni_stat_init_lock(&ep->st_copy_max, ©_max_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_copy, &rcv_copy_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nocopy, &rcv_nocopy_info, &ep->mtx); - nni_stat_init_lock(&ep->st_rcv_reorder, &rcv_reorder_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx); nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx); @@ -1345,7 +1280,6 @@ udp_ep_init( nni_listener_add_stat(l, &ep->st_copy_max); nni_listener_add_stat(l, &ep->st_rcv_copy); nni_listener_add_stat(l, &ep->st_rcv_nocopy); - nni_listener_add_stat(l, &ep->st_rcv_reorder); nni_listener_add_stat(l, &ep->st_rcv_toobig); nni_listener_add_stat(l, &ep->st_rcv_nomatch); nni_listener_add_stat(l, &ep->st_rcv_nobuf); @@ -1358,7 +1292,6 @@ udp_ep_init( nni_dialer_add_stat(d, &ep->st_copy_max); nni_dialer_add_stat(d, &ep->st_rcv_copy); nni_dialer_add_stat(d, &ep->st_rcv_nocopy); - nni_dialer_add_stat(d, &ep->st_rcv_reorder); nni_dialer_add_stat(d, &ep->st_rcv_toobig); nni_dialer_add_stat(d, &ep->st_rcv_nomatch); nni_dialer_add_stat(d, &ep->st_rcv_nobuf); @@ -1478,7 +1411,7 @@ udp_resolv_cb(void *arg) } if (ep->udp == NULL) { - if ((rv = nni_udp_open(&ep->udp, &ep->self_sa)) != 0) { + if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) { nni_aio_list_remove(aio); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); @@ -1487,16 +1420,19 @@ udp_resolv_cb(void *arg) } // places a "hold" on the ep - if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != + NNG_OK) { nni_aio_list_remove(aio); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); + return; } - if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != 0) { + if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != NNG_OK) { nni_aio_list_remove(aio); nni_pipe_close(p->npipe); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); + return; } udp_pipe_schedule(p); @@ -1562,7 +1498,7 @@ udp_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t) nni_mtx_lock(&ep->mtx); if (ep->udp != NULL) { - (void) nni_udp_sockname(ep->udp, &sa); + (void) nng_udp_sockname(ep->udp, &sa); } else { sa = ep->self_sa; } @@ -1597,7 +1533,7 @@ udp_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) nng_sockaddr sa; if (ep->udp != NULL) { - (void) nni_udp_sockname(ep->udp, &sa); + (void) nng_udp_sockname(ep->udp, &sa); } else { sa = ep->self_sa; } @@ -1699,6 +1635,7 @@ udp_ep_match(udp_ep *ep) return; } + p->state = PIPE_CONN_DONE; nni_aio_list_remove(aio); nni_list_remove(&ep->connpipes, p); nni_aio_set_output(aio, 0, p->npipe); @@ -1724,13 +1661,13 @@ udp_ep_bind(void *arg, nng_url *url) return (NNG_EBUSY); } - rv = nni_udp_open(&ep->udp, &ep->self_sa); + rv = nng_udp_open(&ep->udp, &ep->self_sa); if (rv != NNG_OK) { nni_mtx_unlock(&ep->mtx); return (rv); } nng_sockaddr sa; - nni_plat_udp_sockname(ep->udp, &sa); + nng_udp_sockname(ep->udp, &sa); url->u_port = nng_sockaddr_port(&sa); udp_ep_start(ep); nni_mtx_unlock(&ep->mtx); diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c index 0b4b180d..56a25cce 100644 --- a/src/sp/transport/udp/udp_tran_test.c +++ b/src/sp/transport/udp/udp_tran_test.c @@ -496,6 +496,66 @@ test_udp_pipe(void) NUTS_CLOSE(s0); NUTS_CLOSE(s1); } + +void +test_udp_reconnect_dialer(void) +{ + nng_socket s0; + nng_socket s1; + nng_listener l; + nng_dialer d; + char *addr; + nng_msg *msg; + nng_sockaddr sa0; + + NUTS_LOGGING(); + NUTS_ADDR(addr, "udp4"); + + // For this test, using PUB sub is better to avoid fighting the pair + // exclusion. Generally speaking pair is probably a poor fit for UDP + // anyway. + + NUTS_PASS(nng_pub0_open(&s0)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 2000)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa0)); + + NUTS_PASS(nng_sub0_open(&s1)); + NUTS_PASS(nng_sub0_socket_subscribe(s1, "", 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 2000)); + NUTS_PASS(nng_dialer_create(&d, s1, addr)); + NUTS_PASS(nng_dialer_start(d, 0)); + + nng_msleep(500); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + nng_msg_free(msg); + + // now close the dialer + NUTS_PASS(nng_dialer_close(d)); + + // wait a bit before starting a new one -- allow for the old one to + // tear down completely on both ends. + nng_msleep(500); + + // and create and start a new one + NUTS_PASS(nng_dialer_create(&d, s1, addr)); + NUTS_PASS(nng_dialer_start(d, 0)); + nng_msleep(500); + + // show that we still send and receive + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + nng_msg_free(msg); + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + void test_udp_stats(void) { @@ -559,6 +619,7 @@ NUTS_TESTS = { { "udp multi small burst", test_udp_multi_small_burst }, { "udp crush", test_udp_crush }, { "udp pipe", test_udp_pipe }, + { "udp reconnect dialer", test_udp_reconnect_dialer }, { "udp stats", test_udp_stats }, { NULL, NULL }, }; |
