aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/sp/transport/udp/udp.c517
-rw-r--r--src/sp/transport/udp/udp_tran_test.c61
2 files changed, 288 insertions, 290 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 3722549e..b0945737 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -5,6 +5,7 @@
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include "core/aio.h"
#include "core/defs.h"
@@ -22,14 +23,6 @@
typedef struct udp_pipe udp_pipe;
typedef struct udp_ep udp_ep;
-// These should reallyh be renamed for the project.
-#define nni_udp_open nni_plat_udp_open
-#define nni_udp_close nni_plat_udp_close
-#define nni_udp_send nni_plat_udp_send
-#define nni_udp_recv nni_plat_udp_recv
-#define nni_udp nni_plat_udp
-#define nni_udp_sockname nni_plat_udp_sockname
-
// OP code, 8 bits
enum udp_opcode {
OPCODE_DATA = 0,
@@ -43,7 +36,6 @@ enum udp_opcode {
typedef enum udp_disc_reason {
DISC_CLOSED = 0, // normal close
DISC_TYPE = 1, // bad SP type
- DISC_NOTCONN = 2, // no such connection
DISC_REFUSED = 3, // refused by policy
DISC_MSGSIZE = 4, // message too large
DISC_NEGO = 5, // neogtiation failed
@@ -80,61 +72,18 @@ typedef enum udp_disc_reason {
// NB: Each of the following messages is exactly 20 bytes in size
-typedef struct udp_sp_data {
- uint8_t us_ver;
- uint8_t us_op_code;
- uint16_t us_type;
- uint32_t us_sender_id;
- uint32_t us_peer_id;
- uint32_t us_sequence;
- uint16_t us_length;
- uint16_t us_reserved; // depends on message type
-} udp_sp_data;
-
-typedef struct udp_sp_creq {
+typedef struct udp_sp_msg {
uint8_t us_ver;
uint8_t us_op_code;
uint16_t us_type;
- uint32_t us_sender_id;
- uint32_t us_peer_id;
- uint32_t us_sequence;
- uint16_t us_recv_max; // actually max payload size
- uint8_t us_reserved;
- uint8_t us_refresh;
-} udp_sp_creq;
-
-typedef struct udp_sp_disc {
- uint8_t us_ver;
- uint8_t us_op_code;
- uint16_t us_type;
- uint32_t us_sender_id;
- uint32_t us_peer_id;
- uint32_t us_sequence;
- uint16_t us_reason; // depends on message type
- uint16_t us_reserved;
-} udp_sp_disc;
-
-typedef struct udp_sp_mesh {
- uint8_t us_ver;
- uint8_t us_op_code;
- uint16_t us_type;
- uint32_t us_sender_id;
- uint32_t us_reserved1;
- uint32_t us_sequence;
- uint32_t us_reserved2;
-} udp_sp_mesh;
-
-// ack is the same format as request
-typedef struct udp_sp_creq udp_sp_cack;
-
-typedef union udp_sp_msg {
- udp_sp_data data;
- udp_sp_creq creq;
- udp_sp_cack cack;
- udp_sp_disc disc;
- udp_sp_mesh mesh;
+ uint16_t us_params[2];
} udp_sp_msg;
+#define us_length us_params[0] // for DATA requests
+#define us_recvmax us_params[0] // for CREQ, CACK
+#define us_refresh us_params[1] // for CREQ, CACK
+#define us_reason us_params[0] // for DISC
+
// Like a NIC driver, this is a "descriptor" for UDP TX packets.
// This allows us to create a circular ring of these to support
// queueing for TX gracefully.
@@ -153,6 +102,12 @@ typedef struct udp_txring {
uint16_t size;
} udp_txring;
+typedef enum {
+ PIPE_CONN_INIT, // pipe is created, but not yet matched to a peer
+ PIPE_CONN_MATCH, // pipe matched to peer, but not added to SP socket
+ PIPE_CONN_DONE, // pipe is fully owned by the SP socket
+} udp_pipe_state;
+
#define UDP_TXRING_SZ 128
// UDP pipe resend (CREQ) in msec (nng_duration)
@@ -162,36 +117,37 @@ typedef struct udp_txring {
#define UDP_PIPE_TIMEOUT(p) ((p)->refresh * 5)
struct udp_pipe {
- udp_ep *ep;
- nni_pipe *npipe;
- nng_sockaddr peer_addr;
- uint16_t peer;
- uint16_t proto;
- uint32_t self_id;
- uint32_t peer_id;
- uint32_t self_seq;
- uint32_t peer_seq;
- uint16_t sndmax; // peer's max recv size
- uint16_t rcvmax; // max recv size
- bool closed;
- bool dialer;
- nng_duration refresh; // seconds, for the protocol
- nng_time next_wake;
- nng_time next_creq;
- nng_time expire;
- nni_list_node node;
- nni_lmq rx_mq;
- nni_list rx_aios;
+ udp_ep *ep;
+ nni_pipe *npipe;
+ nng_sockaddr peer_addr;
+ uint16_t peer;
+ uint16_t proto;
+ uint64_t id;
+ uint32_t self_id;
+ uint32_t peer_id;
+ uint16_t sndmax; // peer's max recv size
+ uint16_t rcvmax; // max recv size
+ bool closed;
+ bool dialer;
+ nng_duration refresh; // seconds, for the protocol
+ nng_time next_wake;
+ nng_time next_creq;
+ nng_time expire;
+ nni_list_node node;
+ nni_lmq rx_mq;
+ nni_list rx_aios;
+ udp_pipe_state state;
};
struct udp_ep {
- nni_udp *udp;
+ nng_udp *udp;
nni_mtx mtx;
uint16_t proto;
+ uint16_t peer;
uint16_t af; // address family
- bool fini;
bool started;
bool closed;
+ bool stopped;
bool cooldown;
nng_url *url;
const char *host; // for dialers
@@ -223,7 +179,6 @@ struct udp_ep {
nni_resolv_item resolv;
nni_stat_item st_rcv_max;
- nni_stat_item st_rcv_reorder;
nni_stat_item st_rcv_toobig;
nni_stat_item st_rcv_nomatch;
nni_stat_item st_rcv_copy;
@@ -240,13 +195,14 @@ static void udp_resolv_cb(void *);
static void udp_rx_cb(void *);
static void udp_recv_data(
- udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa);
-static void udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa,
- uint32_t local_id, uint32_t remote_id, uint32_t seq,
- udp_disc_reason reason);
+ udp_ep *ep, udp_sp_msg *msg, size_t len, const nng_sockaddr *sa);
+static void udp_send_disc_full(
+ udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason);
static void udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason);
-static void udp_ep_match(udp_ep *ep);
+static void udp_ep_match(udp_ep *ep);
+static nng_err udp_add_pipe(udp_ep *ep, udp_pipe *p);
+static void udp_remove_pipe(udp_pipe *p);
static void
udp_tran_init(void)
@@ -266,6 +222,7 @@ udp_pipe_close(void *arg)
nni_aio *aio;
nni_mtx_lock(&ep->mtx);
+ udp_remove_pipe(p);
udp_send_disc(ep, p, DISC_CLOSED);
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
@@ -283,11 +240,7 @@ udp_pipe_stop(void *arg)
udp_pipe_close(arg);
nni_mtx_lock(&ep->mtx);
- if (p->self_id != 0) {
- nni_id_remove(&p->ep->pipes, p->self_id);
- p->self_id = 0;
- }
- nni_list_node_remove(&p->node);
+ udp_remove_pipe(p);
nni_mtx_unlock(&ep->mtx);
}
@@ -301,19 +254,21 @@ udp_pipe_init(void *arg, nni_pipe *npipe)
return (0);
}
-static int
-udp_pipe_start(udp_pipe *p, udp_ep *ep, nng_sockaddr *sa)
+static nng_err
+udp_pipe_start(udp_pipe *p, udp_ep *ep, const nng_sockaddr *sa)
{
nni_time now = nni_clock();
p->ep = ep;
p->proto = ep->proto;
+ p->peer = ep->peer;
p->peer_addr = *sa;
p->dialer = ep->dialer;
p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh;
p->rcvmax = ep->rcvmax;
- p->self_seq = nni_random();
+ p->id = nng_sockaddr_hash(sa);
p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p));
- return (nni_id_alloc32(&ep->pipes, &p->self_id, p));
+
+ return (udp_add_pipe(ep, p));
}
static void
@@ -331,38 +286,71 @@ udp_pipe_fini(void *arg)
NNI_ASSERT(nni_list_empty(&p->rx_aios));
}
-// Find the pipe matching the given id (our pipe id, taken from the peer_id
-// of the header) and peer's sockaddr. Returns NULL if not found. The
-// ep lock must be held. If a pending pipe (not yet connected) is found, then
-// it is returned instead.
static udp_pipe *
-udp_find_pipe(udp_ep *ep, uint32_t self_id, uint32_t peer_id)
+udp_find_pipe(udp_ep *ep, const nng_sockaddr *peer_addr)
{
+ uint64_t id = nng_sockaddr_hash(peer_addr);
udp_pipe *p;
- if (((p = nni_id_get(&ep->pipes, self_id)) != NULL) && (!p->closed)) {
- if (p->peer_id == 0 || p->peer_id == peer_id) {
+
+ // we'll keep incrementing id until we conclusively match
+ // or we get a NULL. This is another level of rehashing, but
+ // it keeps us from having to look up.
+ for (;;) {
+ if ((p = nni_id_get(&ep->pipes, id)) == NULL) {
+ return (NULL);
+ }
+ if (nng_sockaddr_equal(&p->peer_addr, peer_addr)) {
return (p);
}
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+}
+
+static void
+udp_remove_pipe(udp_pipe *p)
+{
+ // ep locked
+ udp_ep *ep = p->ep;
+ uint64_t id = p->id;
+ if (id == 0) {
+ return;
+ }
+ p->id = 0;
+ for (;;) {
+ udp_pipe *srch;
+ if ((srch = nni_id_get(&ep->pipes, id)) == NULL) {
+ break;
+ }
+ if (srch == p) {
+ nni_id_remove(&ep->pipes, id);
+ break;
+ }
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ if (p->state < PIPE_CONN_DONE) {
+ nni_list_node_remove(&p->node);
+ nni_pipe_rele(p->npipe);
}
- return (NULL);
}
-static bool
-udp_check_pipe_sequence(udp_pipe *p, uint32_t seq)
+static nng_err
+udp_add_pipe(udp_ep *ep, udp_pipe *p)
{
- int32_t delta;
- // signed math so we can see how far apart they are
- delta = (int32_t) (seq - p->peer_seq);
- if (delta < 0) {
- // out of order delivery
- nni_stat_inc(&p->ep->st_rcv_reorder, 1);
- return (false);
- }
- if (delta > 0) {
- nni_stat_inc(&p->ep->st_rcv_reorder, 1);
- }
- p->peer_seq = seq + 1; // expected next sequence number
- return (true);
+ // Id must be part of the hash
+ uint64_t id = p->id;
+ while (nni_id_get(&ep->pipes, id) != NULL) {
+ id++;
+ if (id == 0) {
+ id = 1;
+ }
+ }
+ return (nni_id_set(&ep->pipes, id, p));
}
static void
@@ -388,6 +376,10 @@ udp_start_rx(udp_ep *ep)
{
nni_iov iov;
+ if (ep->closed) {
+ return;
+ }
+
// We use this trick to collect the message header so that we can
// do the entire message in a single iov, which avoids the need to
// scatter/gather (which can be problematic for platforms that cannot
@@ -400,7 +392,7 @@ udp_start_rx(udp_ep *ep)
nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa);
nni_aio_set_iov(&ep->rx_aio, 1, &iov);
- nni_udp_recv(ep->udp, &ep->rx_aio);
+ nng_udp_recv(ep->udp, &ep->rx_aio);
}
static void
@@ -410,7 +402,7 @@ udp_start_tx(udp_ep *ep)
udp_txdesc *desc;
nng_msg *msg;
- if ((!ring->count) || (!ep->started) || ep->tx_busy) {
+ if ((!ring->count) || (!ep->started) || ep->tx_busy || ep->stopped) {
return;
}
ep->tx_busy = true;
@@ -442,7 +434,7 @@ udp_start_tx(udp_ep *ep)
nni_aio_set_iov(&ep->tx_aio, niov, iov);
// it should *never* take this long, but allow for ARP resolution
nni_aio_set_timeout(&ep->tx_aio, NNI_SECOND * 10);
- nni_udp_send(ep->udp, &ep->tx_aio);
+ nng_udp_send(ep->udp, &ep->tx_aio);
}
static void
@@ -465,19 +457,13 @@ udp_queue_tx(
desc->header = *msg;
#else
// Fix the endianness, so other routines don't have to.
- // It turns out that the endianness of the fields of CREQ
- // is compatible with the fields of every other message type.
// We only have to do this for systems that are not known
// (at compile time) to be little endian.
- desc->header.creq.us_ver = 0x1;
- desc->header.creq.us_op_code = msg->creq.us_op_code;
- NNI_PUT16LE(&desc->header.creq.us_type, msg->creq.us_type);
- NNI_PUT32LE(&desc->header.creq.us_sended_id, msg->creq.us_sender_id);
- NNI_PUT32LE(&desc->header.creq.us_peer_id, msg->creq.us_peer_id);
- NNI_PUT32LE(&desc->header.creq.us_sequence, msg->creq.us_sequence);
- NNI_PUT16LE(&desc->header.creq.us_recv_max, msg->creq.us_recv_max);
- desc->header.creq.us_reserved = 0;
- desc->header.creq.us_refresh = msg->creq.us_refresh;
+ desc->header.us_ver = 0x1;
+ desc->header.us_op_code = msg->us_op_code;
+ NNI_PUT16LE(&desc->header.us_type, msg->us_type);
+ NNI_PUT16LE(&desc->header.us_params[0], msg->us_params[0]);
+ NNI_PUT16LE(&desc->header.us_params[1], msg->us_params[1]);
#endif
desc->payload = payload;
@@ -528,40 +514,34 @@ udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- udp_send_disc_full(
- ep, &p->peer_addr, p->self_id, p->peer_id, p->self_seq++, reason);
+ udp_send_disc_full(ep, &p->peer_addr, reason);
+ nni_pipe_close(p->npipe);
}
static void
-udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, uint32_t local_id,
- uint32_t remote_id, uint32_t seq, udp_disc_reason reason)
+udp_send_disc_full(udp_ep *ep, const nng_sockaddr *sa, udp_disc_reason reason)
{
- udp_sp_disc disc;
+ udp_sp_msg disc;
disc.us_ver = 0x1;
disc.us_op_code = OPCODE_DISC;
disc.us_type = ep->proto;
- disc.us_sender_id = local_id;
- disc.us_peer_id = remote_id;
- disc.us_sequence = seq;
disc.us_reason = (uint16_t) reason;
+ disc.us_params[1] = 0;
udp_queue_tx(ep, sa, (void *) &disc, NULL);
}
static void
udp_send_creq(udp_ep *ep, udp_pipe *p)
{
- udp_sp_creq creq;
- creq.us_ver = 0x1;
- creq.us_op_code = OPCODE_CREQ;
- creq.us_type = p->proto;
- creq.us_sender_id = p->self_id;
- creq.us_peer_id = p->peer_id;
- creq.us_sequence = p->self_seq++;
- creq.us_recv_max = p->rcvmax;
- creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND;
- p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p);
- p->next_wake = p->next_creq;
+ udp_sp_msg creq;
+ creq.us_ver = 0x1;
+ creq.us_op_code = OPCODE_CREQ;
+ creq.us_type = p->proto;
+ creq.us_recvmax = p->rcvmax;
+ creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND;
+ p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p);
+ p->next_wake = p->next_creq;
udp_pipe_schedule(p);
udp_queue_tx(ep, &p->peer_addr, (void *) &creq, NULL);
@@ -570,43 +550,40 @@ udp_send_creq(udp_ep *ep, udp_pipe *p)
static void
udp_send_cack(udp_ep *ep, udp_pipe *p)
{
- udp_sp_cack cack;
- cack.us_ver = 0x01;
- cack.us_op_code = OPCODE_CACK;
- cack.us_type = p->proto;
- cack.us_sender_id = p->self_id;
- cack.us_peer_id = p->peer_id;
- cack.us_sequence = p->self_seq++;
- cack.us_recv_max = p->rcvmax;
- cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND;
+ udp_sp_msg cack;
+ cack.us_ver = 0x01;
+ cack.us_op_code = OPCODE_CACK;
+ cack.us_type = p->proto;
+ cack.us_recvmax = p->rcvmax;
+ cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND;
udp_queue_tx(ep, &p->peer_addr, (void *) &cack, NULL);
}
static void
-udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, const nng_sockaddr *sa)
+udp_recv_disc(udp_ep *ep, udp_sp_msg *disc, const nng_sockaddr *sa)
{
udp_pipe *p;
nni_aio *aio;
- NNI_ARG_UNUSED(sa);
+ char buf[NNG_MAXADDRSTRLEN];
- p = udp_find_pipe(ep, disc->us_peer_id, disc->us_sender_id);
+ nng_log_debug("NNG-UDP-DISC", "Received disconnect from %s reason %d",
+ nng_str_sockaddr(sa, buf, sizeof(buf)), disc->us_reason);
+
+ p = udp_find_pipe(ep, sa);
if (p != NULL) {
- // For now we aren't validating the sequence numbers.
- // This allows for an out of order DISC to cause the
- // connection to be dropped, but it should self heal.
p->closed = true;
while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
+ nni_pipe_close(p->npipe);
}
}
// Receive data for the pipe. Returns true if we used
// the message, false otherwise.
static void
-udp_recv_data(
- udp_ep *ep, udp_sp_data *dreq, size_t len, const nng_sockaddr *sa)
+udp_recv_data(udp_ep *ep, udp_sp_msg *dreq, size_t len, const nng_sockaddr *sa)
{
// NB: ep mtx is locked
udp_pipe *p;
@@ -614,24 +591,8 @@ udp_recv_data(
nni_msg *msg;
nni_time now;
- // send_id is the remote peer's ID
- // peer_id is our ID (receiver ID)
- // sequence number is our sender's sequence
- uint32_t send_id = dreq->us_sender_id;
- uint32_t peer_id = dreq->us_peer_id;
-
- // NB: Peer ID endianness does not matter, as long we use it
- // consistently.
- if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) {
+ if ((p = udp_find_pipe(ep, sa)) == NULL) {
nni_stat_inc(&ep->st_rcv_nomatch, 1);
- udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN);
- // Question: how do we store the sockaddr for that?
- return;
- }
- if (p->peer_id == 0) {
- // connection isn't formed yet ... send another CREQ
- nni_stat_inc(&ep->st_rcv_nomatch, 1);
- udp_send_creq(ep, p);
return;
}
@@ -654,11 +615,6 @@ udp_recv_data(
nni_msg_chop(
ep->rx_payload, nni_msg_len(ep->rx_payload) - dreq->us_length);
- if (!udp_check_pipe_sequence(p, dreq->us_sequence)) {
- // out of order delivery, drop it
- return;
- }
-
// We have a choice to make. Drop this message (easiest), or
// drop the oldest. We drop the oldest because generally we
// find that applications prefer to have more recent data rather
@@ -714,26 +670,26 @@ udp_recv_data(
}
static void
-udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa)
+udp_recv_creq(udp_ep *ep, udp_sp_msg *creq, nng_sockaddr *sa)
{
udp_pipe *p;
nni_time now;
now = nni_clock();
+
+ if (ep->closed) {
+ // endpoint is closing down, just drop it without further ado
+ return;
+ }
+
if (ep->dialer) {
// dialers do not accept CREQ requests
- udp_send_disc_full(ep, sa, creq->us_peer_id,
- creq->us_sender_id, 0, DISC_REFUSED);
+ udp_send_disc_full(ep, sa, DISC_REFUSED);
return;
}
- if ((p = udp_find_pipe(ep, creq->us_peer_id, creq->us_sender_id))) {
- if ((p->peer_id == 0) || (p->peer != creq->us_type)) {
- // we don't expect this -- a connection request from a
- // peer while we have an outstanding request of our
- // own. We *could* compare the sockaddrs to see if they
- // match and if so then treat this as just a dueling
- // connection. but for now we just discard it -- we'll
- // wait for the CACK.
+ if ((p = udp_find_pipe(ep, sa))) {
+ if (p->peer != creq->us_type) {
+ udp_send_disc(ep, p, DISC_TYPE);
return;
}
@@ -754,26 +710,18 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa)
}
// new pipe
- if (ep->fini || ep->closed) {
- // endpoint is closing down, reject it.
- udp_send_disc_full(
- ep, sa, 0, creq->us_sender_id, 0, DISC_REFUSED);
- return;
- }
if (creq->us_refresh == 0) {
- udp_send_disc_full(
- ep, sa, 0, creq->us_sender_id, 0, DISC_NEGO);
+ udp_send_disc_full(ep, sa, DISC_NEGO);
return;
}
if (nni_pipe_alloc_listener((void **) &p, ep->nlistener) != 0) {
- udp_send_disc_full(
- ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF);
+ udp_send_disc_full(ep, sa, DISC_NOBUF);
return;
}
- if (udp_pipe_start(p, ep, sa) != 0) {
- udp_send_disc_full(
- ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF);
+
+ if (udp_pipe_start(p, ep, sa) != NNG_OK) {
+ udp_send_disc(ep, p, DISC_NOBUF);
nni_pipe_close(p->npipe);
return;
}
@@ -782,46 +730,39 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa)
p->refresh = (creq->us_refresh * NNI_SECOND);
}
p->peer = creq->us_type;
- p->peer_id = creq->us_sender_id;
- p->peer_seq = creq->us_sequence + 1;
- p->sndmax = creq->us_recv_max;
+ p->sndmax = creq->us_recvmax;
p->next_wake = now + UDP_PIPE_REFRESH(p);
udp_pipe_schedule(p);
+ p->state = PIPE_CONN_MATCH;
nni_list_append(&ep->connpipes, p);
udp_send_cack(ep, p);
udp_ep_match(ep);
}
static void
-udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa)
+udp_recv_cack(udp_ep *ep, udp_sp_msg *cack, const nng_sockaddr *sa)
{
udp_pipe *p;
- bool first;
nni_time now;
- if ((p = udp_find_pipe(ep, cack->us_peer_id, cack->us_sender_id)) &&
- (!p->closed)) {
- if ((p->peer_id != 0) && (p->peer != cack->us_type)) {
+ if ((p = udp_find_pipe(ep, sa)) && (!p->closed)) {
+ if (p->peer != cack->us_type) {
udp_send_disc(ep, p, DISC_TYPE);
return;
}
- first = (p->peer_id == 0);
-
// so we know who it is from.. this is a refresh.
- p->sndmax = cack->us_recv_max;
- p->peer = cack->us_type;
- p->peer_id = cack->us_sender_id;
+ p->sndmax = cack->us_recvmax;
+ p->peer = cack->us_type;
if (cack->us_refresh == 0) {
udp_send_disc(ep, p, DISC_NEGO);
return;
}
- if (first) {
- p->refresh = ep->refresh;
- p->peer_seq = cack->us_sequence + 1;
- }
+ // Always reset this, as dialers may have started with an
+ // unreasonably low value.
+ p->refresh = ep->refresh;
if ((cack->us_refresh * NNI_SECOND) < p->refresh) {
p->refresh = cack->us_refresh * NNI_SECOND;
}
@@ -830,16 +771,13 @@ udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, const nng_sockaddr *sa)
p->expire = now + UDP_PIPE_TIMEOUT(p);
udp_pipe_schedule(p);
- if (first) {
+ if (p->state < PIPE_CONN_MATCH) {
+ p->state = PIPE_CONN_MATCH;
nni_list_append(&ep->connpipes, p);
udp_ep_match(ep);
}
return;
}
-
- // a CACK without a corresponding CREQ (or timed out pipe already)
- udp_send_disc_full(
- ep, sa, cack->us_peer_id, cack->us_sender_id, 0, DISC_NOTCONN);
}
static void
@@ -904,41 +842,36 @@ udp_rx_cb(void *arg)
sa = &ep->rx_sa;
n = nng_aio_count(aio);
- if ((n >= sizeof(*hdr)) && (hdr->data.us_ver == 1)) {
+ if ((n >= sizeof(*hdr)) && (hdr->us_ver == 1)) {
n -= sizeof(*hdr);
#ifndef NNG_LITTLE_ENDIAN
// Fix the endianness, so other routines don't have to.
- // It turns out that the endianness of the fields of CREQ
- // is compatible with the fields of every other message type.
// We only have to do this for systems that are not known
// (at compile time) to be little endian.
- hdr->data.us_type = NNI_GET16LE(&hdr->data.us_type);
- hdr->data.us_sender_id = NNI_GET32LE(&hdr->data.us_sender_id);
- hdr->data.us_peeer_id = NNI_GET32LE(&hdr->data.us_peer_id);
- hdr->data.us_sequence = NNI_GET32LE(&hdr->data.us_sequence);
- hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length);
+ hdr->us_type = NNI_GET16LE(&hdr->us_type);
+ hdr->us_params[0] = NNI_GET16LE(&hdr->us_params[0]);
+ hdr->us_params[1] = NNI_GET16LE(&hdr->us_params[1]);
#endif
- switch (hdr->data.us_op_code) {
+ switch (hdr->us_op_code) {
case OPCODE_DATA:
- udp_recv_data(ep, &hdr->data, n, sa);
+ udp_recv_data(ep, hdr, n, sa);
break;
case OPCODE_CREQ:
- udp_recv_creq(ep, &hdr->creq, sa);
+ udp_recv_creq(ep, hdr, sa);
break;
case OPCODE_CACK:
- udp_recv_cack(ep, &hdr->cack, sa);
+ udp_recv_cack(ep, hdr, sa);
break;
case OPCODE_DISC:
- udp_recv_disc(ep, &hdr->disc, sa);
+ udp_recv_disc(ep, hdr, sa);
break;
case OPCODE_MESH: // TODO:
// udp_recv_mesh(ep, &hdr->mesh, sa);
// break;
default:
- udp_send_disc_full(
- ep, sa, 0, hdr->data.us_sender_id, 0, DISC_PROTO);
+ udp_send_disc_full(ep, sa, DISC_PROTO);
break;
}
}
@@ -959,11 +892,11 @@ finish:
static void
udp_pipe_send(void *arg, nni_aio *aio)
{
- udp_pipe *p = arg;
- udp_ep *ep;
- udp_sp_data dreq;
- nng_msg *msg;
- size_t count = 0;
+ udp_pipe *p = arg;
+ udp_ep *ep;
+ udp_sp_msg dreq;
+ nng_msg *msg;
+ size_t count = 0;
msg = nni_aio_get_msg(aio);
ep = p->ep;
@@ -976,23 +909,20 @@ udp_pipe_send(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) {
nni_mtx_unlock(&ep->mtx);
- // rather failing this with an error, we just drop it on the
- // floor. this is on the sender, so there isn't a compelling
- // need to disconnect the pipe, since it we're not being
- // "ill-behaved" to our peer.
+ // rather failing this with an error, we just drop it on
+ // the floor. this is on the sender, so there isn't a
+ // compelling need to disconnect the pipe, since it we're
+ // not being "ill-behaved" to our peer.
nni_aio_finish(aio, 0, count);
nni_stat_inc(&ep->st_snd_toobig, 1);
nni_msg_free(msg);
return;
}
- dreq.us_ver = 1;
- dreq.us_type = ep->proto;
- dreq.us_op_code = OPCODE_DATA;
- dreq.us_sender_id = p->self_id;
- dreq.us_peer_id = p->peer_id;
- dreq.us_sequence = p->self_seq++;
- dreq.us_length = (uint16_t) count;
+ dreq.us_ver = 1;
+ dreq.us_type = ep->proto;
+ dreq.us_op_code = OPCODE_DATA;
+ dreq.us_length = (uint16_t) count;
// Just queue it, or fail it.
udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg);
@@ -1107,7 +1037,7 @@ udp_ep_fini(void *arg)
nni_aio_fini(&ep->rx_aio);
if (ep->udp != NULL) {
- nni_udp_close(ep->udp);
+ nng_udp_close(ep->udp);
}
for (int i = 0; i < ep->tx_ring.size; i++) {
@@ -1122,16 +1052,24 @@ udp_ep_fini(void *arg)
static void
udp_ep_close(void *arg)
{
- udp_ep *ep = arg;
- nni_aio *aio;
+ udp_ep *ep = arg;
+ udp_pipe *p;
+ nni_aio *aio;
+ uint32_t cursor;
+ uint64_t key;
+
+ nni_mtx_lock(&ep->mtx);
+ ep->closed = true;
+ // leave tx open so we can send disconnects
nni_aio_close(&ep->resaio);
nni_aio_close(&ep->rx_aio);
nni_aio_close(&ep->timeaio);
- // leave tx open so we can send disconnects
-
- nni_mtx_lock(&ep->mtx);
+ // close all the underlying pipes, so the peer can see it.
+ while (nni_id_visit(&ep->pipes, &key, (void **) &p, &cursor)) {
+ nni_pipe_close(p->npipe);
+ }
while ((aio = nni_list_first(&ep->connaios)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECONNABORTED);
@@ -1154,7 +1092,6 @@ udp_ep_stop(void *arg)
nni_time linger =
nni_clock() + NNI_SECOND / 2; // half second to drain, max
nni_mtx_lock(&ep->mtx);
- ep->fini = true;
while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) {
nni_mtx_unlock(&ep->mtx);
nng_msleep(1);
@@ -1165,6 +1102,7 @@ udp_ep_stop(void *arg)
"Lingering timed out on endpoint close, peer "
"notifications dropped");
}
+ ep->stopped = true;
nni_mtx_unlock(&ep->mtx);
// finally close the tx channel
@@ -1210,8 +1148,8 @@ udp_timer_cb(void *arg)
"Pipe peer %s timed out due to inactivity",
nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf)));
- // Possibly alert the dialer, so it can restart a new
- // attempt.
+ // Possibly alert the dialer, so it can restart a
+ // new attempt.
if ((ep->dialer) && (p->peer_id == 0) &&
(aio = nni_list_first(&ep->connaios))) {
nni_aio_list_remove(aio);
@@ -1285,6 +1223,7 @@ udp_ep_init(
ep->self_sa.s_family = ep->af;
ep->proto = nni_sock_proto_id(sock);
+ ep->peer = nni_sock_peer_id(sock);
ep->url = url;
ep->refresh = NNG_UDP_REFRESH; // one minute by default
ep->rcvmax = NNG_UDP_RECVMAX;
@@ -1298,9 +1237,6 @@ udp_ep_init(
NNG_STAT_LEVEL, NNG_UNIT_BYTES);
NNI_STAT_LOCK(copy_max_info, "copy_max",
"threshold to switch to loan-up", NNG_STAT_LEVEL, NNG_UNIT_BYTES);
- NNI_STAT_LOCK(rcv_reorder_info, "rcv_reorder",
- "messages received out of order", NNG_STAT_COUNTER,
- NNG_UNIT_MESSAGES);
NNI_STAT_LOCK(rcv_nomatch_info, "rcv_nomatch",
"messages without a matching connection", NNG_STAT_COUNTER,
NNG_UNIT_MESSAGES);
@@ -1330,7 +1266,6 @@ udp_ep_init(
nni_stat_init_lock(&ep->st_copy_max, &copy_max_info, &ep->mtx);
nni_stat_init_lock(&ep->st_rcv_copy, &rcv_copy_info, &ep->mtx);
nni_stat_init_lock(&ep->st_rcv_nocopy, &rcv_nocopy_info, &ep->mtx);
- nni_stat_init_lock(&ep->st_rcv_reorder, &rcv_reorder_info, &ep->mtx);
nni_stat_init_lock(&ep->st_rcv_toobig, &rcv_toobig_info, &ep->mtx);
nni_stat_init_lock(&ep->st_rcv_nomatch, &rcv_nomatch_info, &ep->mtx);
nni_stat_init_lock(&ep->st_rcv_nobuf, &rcv_nobuf_info, &ep->mtx);
@@ -1345,7 +1280,6 @@ udp_ep_init(
nni_listener_add_stat(l, &ep->st_copy_max);
nni_listener_add_stat(l, &ep->st_rcv_copy);
nni_listener_add_stat(l, &ep->st_rcv_nocopy);
- nni_listener_add_stat(l, &ep->st_rcv_reorder);
nni_listener_add_stat(l, &ep->st_rcv_toobig);
nni_listener_add_stat(l, &ep->st_rcv_nomatch);
nni_listener_add_stat(l, &ep->st_rcv_nobuf);
@@ -1358,7 +1292,6 @@ udp_ep_init(
nni_dialer_add_stat(d, &ep->st_copy_max);
nni_dialer_add_stat(d, &ep->st_rcv_copy);
nni_dialer_add_stat(d, &ep->st_rcv_nocopy);
- nni_dialer_add_stat(d, &ep->st_rcv_reorder);
nni_dialer_add_stat(d, &ep->st_rcv_toobig);
nni_dialer_add_stat(d, &ep->st_rcv_nomatch);
nni_dialer_add_stat(d, &ep->st_rcv_nobuf);
@@ -1478,7 +1411,7 @@ udp_resolv_cb(void *arg)
}
if (ep->udp == NULL) {
- if ((rv = nni_udp_open(&ep->udp, &ep->self_sa)) != 0) {
+ if ((rv = nng_udp_open(&ep->udp, &ep->self_sa)) != NNG_OK) {
nni_aio_list_remove(aio);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
@@ -1487,16 +1420,19 @@ udp_resolv_cb(void *arg)
}
// places a "hold" on the ep
- if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) {
+ if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) !=
+ NNG_OK) {
nni_aio_list_remove(aio);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
+ return;
}
- if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != 0) {
+ if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != NNG_OK) {
nni_aio_list_remove(aio);
nni_pipe_close(p->npipe);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
+ return;
}
udp_pipe_schedule(p);
@@ -1562,7 +1498,7 @@ udp_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t)
nni_mtx_lock(&ep->mtx);
if (ep->udp != NULL) {
- (void) nni_udp_sockname(ep->udp, &sa);
+ (void) nng_udp_sockname(ep->udp, &sa);
} else {
sa = ep->self_sa;
}
@@ -1597,7 +1533,7 @@ udp_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t)
nng_sockaddr sa;
if (ep->udp != NULL) {
- (void) nni_udp_sockname(ep->udp, &sa);
+ (void) nng_udp_sockname(ep->udp, &sa);
} else {
sa = ep->self_sa;
}
@@ -1699,6 +1635,7 @@ udp_ep_match(udp_ep *ep)
return;
}
+ p->state = PIPE_CONN_DONE;
nni_aio_list_remove(aio);
nni_list_remove(&ep->connpipes, p);
nni_aio_set_output(aio, 0, p->npipe);
@@ -1724,13 +1661,13 @@ udp_ep_bind(void *arg, nng_url *url)
return (NNG_EBUSY);
}
- rv = nni_udp_open(&ep->udp, &ep->self_sa);
+ rv = nng_udp_open(&ep->udp, &ep->self_sa);
if (rv != NNG_OK) {
nni_mtx_unlock(&ep->mtx);
return (rv);
}
nng_sockaddr sa;
- nni_plat_udp_sockname(ep->udp, &sa);
+ nng_udp_sockname(ep->udp, &sa);
url->u_port = nng_sockaddr_port(&sa);
udp_ep_start(ep);
nni_mtx_unlock(&ep->mtx);
diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c
index 0b4b180d..56a25cce 100644
--- a/src/sp/transport/udp/udp_tran_test.c
+++ b/src/sp/transport/udp/udp_tran_test.c
@@ -496,6 +496,66 @@ test_udp_pipe(void)
NUTS_CLOSE(s0);
NUTS_CLOSE(s1);
}
+
+void
+test_udp_reconnect_dialer(void)
+{
+ nng_socket s0;
+ nng_socket s1;
+ nng_listener l;
+ nng_dialer d;
+ char *addr;
+ nng_msg *msg;
+ nng_sockaddr sa0;
+
+ NUTS_LOGGING();
+ NUTS_ADDR(addr, "udp4");
+
+ // For this test, using PUB sub is better to avoid fighting the pair
+ // exclusion. Generally speaking pair is probably a poor fit for UDP
+ // anyway.
+
+ NUTS_PASS(nng_pub0_open(&s0));
+ NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 2000));
+ NUTS_PASS(nng_listener_create(&l, s0, addr));
+ NUTS_PASS(nng_listener_start(l, 0));
+ NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa0));
+
+ NUTS_PASS(nng_sub0_open(&s1));
+ NUTS_PASS(nng_sub0_socket_subscribe(s1, "", 0));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 2000));
+ NUTS_PASS(nng_dialer_create(&d, s1, addr));
+ NUTS_PASS(nng_dialer_start(d, 0));
+
+ nng_msleep(500);
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_sendmsg(s0, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ nng_msg_free(msg);
+
+ // now close the dialer
+ NUTS_PASS(nng_dialer_close(d));
+
+ // wait a bit before starting a new one -- allow for the old one to
+ // tear down completely on both ends.
+ nng_msleep(500);
+
+ // and create and start a new one
+ NUTS_PASS(nng_dialer_create(&d, s1, addr));
+ NUTS_PASS(nng_dialer_start(d, 0));
+ nng_msleep(500);
+
+ // show that we still send and receive
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_sendmsg(s0, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ nng_msg_free(msg);
+
+ NUTS_CLOSE(s0);
+ NUTS_CLOSE(s1);
+}
+
void
test_udp_stats(void)
{
@@ -559,6 +619,7 @@ NUTS_TESTS = {
{ "udp multi small burst", test_udp_multi_small_burst },
{ "udp crush", test_udp_crush },
{ "udp pipe", test_udp_pipe },
+ { "udp reconnect dialer", test_udp_reconnect_dialer },
{ "udp stats", test_udp_stats },
{ NULL, NULL },
};