aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-08 12:30:42 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-08 14:02:42 -0800
commitb6549f7ead48cabd65cb372ca3cf9ead9f5e4a1c (patch)
treeddbadc766d7347969b5b9e04c0ae8488a8efb079 /src/sp
parentaa2639b41978890ca165633c8efd56ba59e3cdbc (diff)
downloadnng-b6549f7ead48cabd65cb372ca3cf9ead9f5e4a1c.tar.gz
nng-b6549f7ead48cabd65cb372ca3cf9ead9f5e4a1c.tar.bz2
nng-b6549f7ead48cabd65cb372ca3cf9ead9f5e4a1c.zip
udp transport: convert to using inline pipes and endpoints
This allows us to eliminate some extra reference counting and reaping related complexity.
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/transport/udp/udp.c381
1 files changed, 153 insertions, 228 deletions
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 9a00974a..59b5e8a6 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -16,8 +16,6 @@
#include "core/platform.h"
#include "nng/nng.h"
-#include <stdbool.h>
-#include <stdlib.h>
#include <string.h>
// Experimental UDP transport. Unicast only.
@@ -187,39 +185,40 @@ struct udp_pipe {
};
struct udp_ep {
- nni_udp *udp;
- nni_mtx mtx;
- uint16_t proto;
- uint16_t af; // address family
- bool fini;
- bool started;
- bool closed;
- bool cooldown;
- nng_url *url;
- const char *host; // for dialers
- int refcnt; // active pipes
- nni_aio *useraio;
- nni_aio *connaio;
- nni_aio timeaio;
- nni_aio resaio;
- bool dialer;
- bool tx_busy; // true if tx pending
- nni_msg *rx_payload; // current receive message
- nng_sockaddr rx_sa; // addr for last message
- nni_aio tx_aio; // aio for TX handling
- nni_aio rx_aio; // aio for RX handling
- nni_id_map pipes; // pipes (indexed by id)
- nni_sockaddr self_sa; // our address
- nni_sockaddr peer_sa; // peer address, only for dialer;
- nni_sockaddr mesh_sa; // mesh source address (ours)
- nni_list connaios; // aios from accept waiting for a client peer
- nni_list connpipes; // pipes waiting to be connected
- nng_duration refresh; // refresh interval for connections in seconds
- udp_sp_msg rx_msg; // contains the received message header
- uint16_t rcvmax; // max payload, trimmed to uint16_t
- uint16_t copymax;
- udp_txring tx_ring;
- nni_time next_wake;
+ nni_udp *udp;
+ nni_mtx mtx;
+ uint16_t proto;
+ uint16_t af; // address family
+ bool fini;
+ bool started;
+ bool closed;
+ bool cooldown;
+ nng_url *url;
+ const char *host; // for dialers
+ nni_aio *useraio;
+ nni_aio *connaio;
+ nni_aio timeaio;
+ nni_aio resaio;
+ bool dialer;
+ bool tx_busy; // true if tx pending
+ nni_listener *nlistener;
+ nni_dialer *ndialer;
+ nni_msg *rx_payload; // current receive message
+ nng_sockaddr rx_sa; // addr for last message
+ nni_aio tx_aio; // aio for TX handling
+ nni_aio rx_aio; // aio for RX handling
+ nni_id_map pipes; // pipes (indexed by id)
+ nni_sockaddr self_sa; // our address
+ nni_sockaddr peer_sa; // peer address, only for dialer;
+ nni_sockaddr mesh_sa; // mesh source address (ours)
+ nni_list connaios; // aios from accept waiting for a client peer
+ nni_list connpipes; // pipes waiting to be connected
+ nng_duration refresh; // refresh interval for connections in seconds
+ udp_sp_msg rx_msg; // contains the received message header
+ uint16_t rcvmax; // max payload, trimmed to uint16_t
+ uint16_t copymax;
+ udp_txring tx_ring;
+ nni_time next_wake;
nni_aio_completions complq;
nni_stat_item st_rcv_max;
@@ -235,45 +234,10 @@ struct udp_ep {
nni_stat_item st_copy_max;
};
-static void udp_ep_hold(udp_ep *ep);
-static void udp_ep_rele(udp_ep *ep);
-static void udp_ep_fini(void *);
static void udp_ep_start(udp_ep *);
-static void udp_pipe_fini(void *);
static void udp_resolv_cb(void *);
static void udp_rx_cb(void *);
-static int
-udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa)
-{
- udp_pipe *p;
- int rv;
- nni_time now;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_id_alloc32(&ep->pipes, &p->self_id, p)) != 0) {
- NNI_FREE_STRUCT(p);
- return (rv);
- }
- udp_ep_hold(ep);
- now = nni_clock();
- nni_aio_list_init(&p->rx_aios);
- nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
- p->ep = ep;
- p->dialer = ep->dialer;
- p->self_seq = nni_random();
- p->peer_id = peer_id;
- p->proto = ep->proto;
- p->peer_addr = *sa;
- p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh;
- p->next_wake = now + UDP_PIPE_REFRESH(p);
- p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p));
- p->rcvmax = ep->rcvmax;
- *pp = p;
- return (0);
-}
-
static void udp_recv_data(
udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa);
static void udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id,
@@ -311,7 +275,18 @@ udp_pipe_close(void *arg)
static void
udp_pipe_stop(void *arg)
{
+ udp_pipe *p = arg;
+ udp_ep *ep = p->ep;
+
udp_pipe_close(arg);
+
+ nni_mtx_lock(&ep->mtx);
+ if (p->self_id != 0) {
+ nni_id_remove(&p->ep->pipes, p->self_id);
+ p->self_id = 0;
+ }
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&ep->mtx);
}
static int
@@ -319,20 +294,31 @@ udp_pipe_init(void *arg, nni_pipe *npipe)
{
udp_pipe *p = arg;
p->npipe = npipe;
-
+ nni_aio_list_init(&p->rx_aios);
+ nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN);
return (0);
}
-static void
-udp_pipe_destroy(udp_pipe *p)
+static int
+udp_pipe_start(udp_pipe *p, udp_ep *ep, nng_sockaddr *sa)
{
- nng_msg *m;
+ nni_time now = nni_clock();
+ p->ep = ep;
+ p->proto = ep->proto;
+ p->peer_addr = *sa;
+ p->dialer = ep->dialer;
+ p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh;
+ p->rcvmax = ep->rcvmax;
+ p->self_seq = nni_random();
+ p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p));
+ return (nni_id_alloc32(&ep->pipes, &p->self_id, p));
+}
- if (p->self_id != 0) {
- nni_id_remove(&p->ep->pipes, p->self_id);
- p->self_id = 0;
- }
- nni_list_node_remove(&p->node);
+static void
+udp_pipe_fini(void *arg)
+{
+ udp_pipe *p = arg;
+ nng_msg *m;
// call with ep->mtx lock held
while (!nni_lmq_empty(&p->rx_mq)) {
@@ -341,19 +327,6 @@ udp_pipe_destroy(udp_pipe *p)
}
nni_lmq_fini(&p->rx_mq);
NNI_ASSERT(nni_list_empty(&p->rx_aios));
-
- NNI_FREE_STRUCT(p);
-}
-
-static void
-udp_pipe_fini(void *arg)
-{
- udp_pipe *p = arg;
- udp_ep *ep = p->ep;
-
- nni_mtx_lock(&ep->mtx);
- udp_pipe_destroy(p);
- udp_ep_rele(ep); // releases lock
}
// Find the pipe matching the given id (our pipe id, taken from the peer_id
@@ -784,12 +757,18 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa)
return;
}
- if (udp_pipe_alloc(&p, ep, creq->us_peer_id, sa) != 0) {
+ if (nni_pipe_alloc_listener((void **) &p, ep->nlistener) != 0) {
+ udp_send_disc_full(
+ ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF);
+ return;
+ }
+ if (udp_pipe_start(p, ep, sa) != 0) {
udp_send_disc_full(
ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF);
+ nni_pipe_close(p->npipe);
return;
}
- p->refresh = ep->refresh;
+
if ((creq->us_refresh * NNI_SECOND) < p->refresh) {
p->refresh = (creq->us_refresh * NNI_SECOND);
}
@@ -798,7 +777,6 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa)
p->peer_seq = creq->us_sequence + 1;
p->sndmax = creq->us_recv_max;
p->next_wake = now + UDP_PIPE_REFRESH(p);
- p->expire = now + UDP_PIPE_TIMEOUT(p);
udp_pipe_schedule(p);
nni_list_append(&ep->connpipes, p);
@@ -1119,33 +1097,29 @@ udp_pipe_getopt(
return (rv);
}
-// udp_ep_hold simply bumps the reference count.
-// This needs to be done with the lock for the EP held.
static void
-udp_ep_hold(udp_ep *ep)
+udp_ep_fini(void *arg)
{
- ep->refcnt++;
-}
+ udp_ep *ep = arg;
-// udp_ep_rele drops the reference count on the endpoint.
-// If the endpoint drops to zero, the EP is freed. It also
-// unlocks the mutex, which must be held when calling this.
-static void
-udp_ep_rele(udp_ep *ep)
-{
- nni_aio *aio;
- NNI_ASSERT(ep->refcnt > 0);
- ep->refcnt--;
- if (!ep->fini || ep->refcnt > 0) {
+ // We optionally linger a little bit (up to a half second)
+ // so that the disconnect messages can get pushed out. On
+ // most systems this should only take a single millisecond.
+ nni_time linger =
+ nni_clock() + NNI_SECOND / 2; // half second to drain, max
+ nni_mtx_lock(&ep->mtx);
+ ep->fini = true;
+ while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) {
nni_mtx_unlock(&ep->mtx);
- return;
+ nng_msleep(1);
+ nni_mtx_lock(&ep->mtx);
}
- while ((aio = nni_list_first(&ep->connaios)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ if (ep->tx_ring.count > 0) {
+ nng_log_warn("NNG-UDP-LINGER",
+ "Lingering timed out on endpoint close, peer "
+ "notifications dropped");
}
nni_mtx_unlock(&ep->mtx);
-
nni_aio_close(&ep->timeaio);
nni_aio_close(&ep->resaio);
nni_aio_close(&ep->tx_aio);
@@ -1165,41 +1139,13 @@ udp_ep_rele(udp_ep *ep)
nni_msg_free(ep->rx_payload); // safe even if msg is null
nni_id_map_fini(&ep->pipes);
NNI_FREE_STRUCTS(ep->tx_ring.descs, ep->tx_ring.size);
- NNI_FREE_STRUCT(ep);
-}
-
-static void
-udp_ep_fini(void *arg)
-{
- udp_ep *ep = arg;
-
- // We optionally linger a little bit (up to a half second)
- // so that the disconnect messages can get pushed out. On
- // most systems this should only take a single millisecond.
- nni_time linger =
- nni_clock() + NNI_SECOND / 2; // half second to drain, max
- nni_mtx_lock(&ep->mtx);
- ep->fini = true;
- while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) {
- NNI_ASSERT(ep->refcnt > 0);
- nni_mtx_unlock(&ep->mtx);
- nng_msleep(1);
- nni_mtx_lock(&ep->mtx);
- }
- if (ep->tx_ring.count > 0) {
- nng_log_warn("NNG-UDP-LINGER",
- "Lingering timed out on endpoint close, peer "
- "notifications dropped");
- }
- udp_ep_rele(ep); // drops the lock
}
static void
udp_ep_close(void *arg)
{
- udp_ep *ep = arg;
- udp_pipe *p;
- nni_aio *aio;
+ udp_ep *ep = arg;
+ nni_aio *aio;
nni_mtx_lock(&ep->mtx);
while ((aio = nni_list_first(&ep->connaios)) != NULL) {
@@ -1207,25 +1153,6 @@ udp_ep_close(void *arg)
nni_aio_finish_error(aio, NNG_ECONNABORTED);
}
- // close all pipes
- uint32_t cursor = 0;
- uint64_t id;
-
- // first we grab the connpipes that are not closed upstream
- while ((p = nni_list_first(&ep->connpipes)) != NULL) {
- udp_pipe_destroy(p);
- ep->refcnt--;
- }
- while (nni_id_visit(&ep->pipes, &id, (void **) &p, &cursor)) {
- if (p->peer_id != 0) {
- udp_send_disc(ep, p, DISC_CLOSED);
- }
- p->closed = true;
- while ((aio = nni_list_first(&p->rx_aios)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- }
nni_aio_close(&ep->resaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -1271,8 +1198,7 @@ udp_timer_cb(void *arg)
// If we're still on the connect list, then we need
// take responsibility for cleaning this up.
if (nni_list_node_active(&p->node)) {
- udp_pipe_destroy(p);
- ep->refcnt--;
+ nni_pipe_close(p->npipe);
continue;
}
@@ -1300,15 +1226,21 @@ udp_timer_cb(void *arg)
}
static int
-udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
- nni_listener *listener)
+udp_ep_init(
+ udp_ep *ep, nng_url *url, nni_sock *sock, nni_dialer *d, nni_listener *l)
{
- udp_ep *ep;
- int rv;
+ int rv;
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
+ nni_mtx_init(&ep->mtx);
+ nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true);
+ NNI_LIST_INIT(&ep->connpipes, udp_pipe, node);
+ nni_aio_list_init(&ep->connaios);
+
+ nni_aio_init(&ep->rx_aio, udp_rx_cb, ep);
+ nni_aio_init(&ep->tx_aio, udp_tx_cb, ep);
+ nni_aio_init(&ep->timeaio, udp_timer_cb, ep);
+ nni_aio_init(&ep->resaio, udp_resolv_cb, ep);
+ nni_aio_completions_init(&ep->complq);
ep->tx_ring.descs =
NNI_ALLOC_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN);
@@ -1325,7 +1257,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
} else if (strcmp(url->u_scheme, "udp6") == 0) {
ep->af = NNG_AF_INET6;
} else {
- NNI_FREE_STRUCT(ep);
return (NNG_EADDRINVAL);
}
@@ -1335,25 +1266,12 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
ep->refresh = NNG_UDP_REFRESH; // one minute by default
ep->rcvmax = NNG_UDP_RECVMAX;
ep->copymax = NNG_UDP_COPYMAX;
- ep->refcnt = 1;
if ((rv = nni_msg_alloc(&ep->rx_payload,
ep->rcvmax + sizeof(ep->rx_msg)) != 0)) {
NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN);
- NNI_FREE_STRUCT(ep);
return (rv);
}
- nni_mtx_init(&ep->mtx);
- nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true);
- NNI_LIST_INIT(&ep->connpipes, udp_pipe, node);
- nni_aio_list_init(&ep->connaios);
-
- nni_aio_init(&ep->rx_aio, udp_rx_cb, ep);
- nni_aio_init(&ep->tx_aio, udp_tx_cb, ep);
- nni_aio_init(&ep->timeaio, udp_timer_cb, ep);
- nni_aio_init(&ep->resaio, udp_resolv_cb, ep);
- nni_aio_completions_init(&ep->complq);
-
NNI_STAT_LOCK(rcv_max_info, "rcv_max", "maximum receive size",
NNG_STAT_LEVEL, NNG_UNIT_BYTES);
NNI_STAT_LOCK(copy_max_info, "copy_max",
@@ -1399,28 +1317,31 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
nni_stat_init_lock(
&ep->st_peer_inactive, &peer_inactive_info, &ep->mtx);
- if (listener) {
- nni_listener_add_stat(listener, &ep->st_rcv_max);
- nni_listener_add_stat(listener, &ep->st_copy_max);
- nni_listener_add_stat(listener, &ep->st_rcv_copy);
- nni_listener_add_stat(listener, &ep->st_rcv_nocopy);
- nni_listener_add_stat(listener, &ep->st_rcv_reorder);
- nni_listener_add_stat(listener, &ep->st_rcv_toobig);
- nni_listener_add_stat(listener, &ep->st_rcv_nomatch);
- nni_listener_add_stat(listener, &ep->st_rcv_nobuf);
- nni_listener_add_stat(listener, &ep->st_snd_toobig);
- nni_listener_add_stat(listener, &ep->st_snd_nobuf);
- } else {
- nni_dialer_add_stat(dialer, &ep->st_rcv_max);
- nni_dialer_add_stat(dialer, &ep->st_copy_max);
- nni_dialer_add_stat(dialer, &ep->st_rcv_copy);
- nni_dialer_add_stat(dialer, &ep->st_rcv_nocopy);
- nni_dialer_add_stat(dialer, &ep->st_rcv_reorder);
- nni_dialer_add_stat(dialer, &ep->st_rcv_toobig);
- nni_dialer_add_stat(dialer, &ep->st_rcv_nomatch);
- nni_dialer_add_stat(dialer, &ep->st_rcv_nobuf);
- nni_dialer_add_stat(dialer, &ep->st_snd_toobig);
- nni_dialer_add_stat(dialer, &ep->st_snd_nobuf);
+ if (l) {
+ NNI_ASSERT(d == NULL);
+ nni_listener_add_stat(l, &ep->st_rcv_max);
+ nni_listener_add_stat(l, &ep->st_copy_max);
+ nni_listener_add_stat(l, &ep->st_rcv_copy);
+ nni_listener_add_stat(l, &ep->st_rcv_nocopy);
+ nni_listener_add_stat(l, &ep->st_rcv_reorder);
+ nni_listener_add_stat(l, &ep->st_rcv_toobig);
+ nni_listener_add_stat(l, &ep->st_rcv_nomatch);
+ nni_listener_add_stat(l, &ep->st_rcv_nobuf);
+ nni_listener_add_stat(l, &ep->st_snd_toobig);
+ nni_listener_add_stat(l, &ep->st_snd_nobuf);
+ }
+ if (d) {
+ NNI_ASSERT(l == NULL);
+ nni_dialer_add_stat(d, &ep->st_rcv_max);
+ nni_dialer_add_stat(d, &ep->st_copy_max);
+ nni_dialer_add_stat(d, &ep->st_rcv_copy);
+ nni_dialer_add_stat(d, &ep->st_rcv_nocopy);
+ nni_dialer_add_stat(d, &ep->st_rcv_reorder);
+ nni_dialer_add_stat(d, &ep->st_rcv_toobig);
+ nni_dialer_add_stat(d, &ep->st_rcv_nomatch);
+ nni_dialer_add_stat(d, &ep->st_rcv_nobuf);
+ nni_dialer_add_stat(d, &ep->st_snd_toobig);
+ nni_dialer_add_stat(d, &ep->st_snd_nobuf);
}
// schedule our timer callback - forever for now
@@ -1428,7 +1349,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer,
// actions which require earlier wakeup.
nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio);
- *epp = ep;
return (0);
}
@@ -1454,42 +1374,39 @@ udp_check_url(nng_url *url, bool listen)
static int
udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
- udp_ep *ep;
+ udp_ep *ep = (void *) dp;
int rv;
nni_sock *sock = nni_dialer_sock(ndialer);
- if ((rv = udp_check_url(url, false)) != 0) {
+ ep->ndialer = ndialer;
+ if ((rv = udp_ep_init(ep, url, sock, ndialer, NULL)) != 0) {
return (rv);
}
- if ((rv = udp_ep_init(&ep, url, sock, ndialer, NULL)) != 0) {
+ if ((rv = udp_check_url(url, false)) != 0) {
return (rv);
}
- *dp = ep;
return (0);
}
static int
udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
{
- udp_ep *ep;
- int rv;
- nni_sock *sock = nni_listener_sock(nlistener);
- nng_sockaddr sa;
+ udp_ep *ep = (void *) lp;
+ int rv;
+ nni_sock *sock = nni_listener_sock(nlistener);
- // Check for invalid URL components.
- if (((rv = udp_check_url(url, true)) != 0) ||
- ((rv = nni_url_to_address(&sa, url)) != 0)) {
+ ep->nlistener = nlistener;
+ if ((rv = udp_ep_init(ep, url, sock, NULL, nlistener)) != 0) {
return (rv);
}
-
- if ((rv = udp_ep_init(&ep, url, sock, NULL, nlistener)) != 0) {
+ // Check for invalid URL components.
+ if (((rv = udp_check_url(url, true)) != 0) ||
+ ((rv = nni_url_to_address(&ep->self_sa, url)) != 0)) {
return (rv);
}
- ep->self_sa = sa;
- *lp = ep;
return (0);
}
@@ -1548,11 +1465,16 @@ udp_resolv_cb(void *arg)
}
// places a "hold" on the ep
- if ((rv = udp_pipe_alloc(&p, ep, 0, &ep->peer_sa)) != 0) {
+ if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) {
nni_aio_list_remove(aio);
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
- return;
+ }
+ if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_pipe_close(p->npipe);
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
}
udp_pipe_schedule(p);
@@ -1758,7 +1680,7 @@ udp_ep_match(udp_ep *ep)
nni_aio_list_remove(aio);
nni_list_remove(&ep->connpipes, p);
- nni_aio_set_output(aio, 0, p);
+ nni_aio_set_output(aio, 0, p->npipe);
nni_aio_finish(aio, 0, 0);
}
@@ -1821,6 +1743,7 @@ udp_ep_accept(void *arg, nni_aio *aio)
}
static nni_sp_pipe_ops udp_pipe_ops = {
+ .p_size = sizeof(udp_pipe),
.p_init = udp_pipe_init,
.p_fini = udp_pipe_fini,
.p_stop = udp_pipe_stop,
@@ -1897,6 +1820,7 @@ udp_listener_setopt(
}
static nni_sp_dialer_ops udp_dialer_ops = {
+ .d_size = sizeof(udp_ep),
.d_init = udp_dialer_init,
.d_fini = udp_ep_fini,
.d_connect = udp_ep_connect,
@@ -1906,6 +1830,7 @@ static nni_sp_dialer_ops udp_dialer_ops = {
};
static nni_sp_listener_ops udp_listener_ops = {
+ .l_size = sizeof(udp_ep),
.l_init = udp_listener_init,
.l_fini = udp_ep_fini,
.l_bind = udp_ep_bind,