aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-08 09:50:34 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-08 14:02:42 -0800
commitaa2639b41978890ca165633c8efd56ba59e3cdbc (patch)
tree098d82f0a9724b7a05a5c71332213466bc05482b
parentd6ab6bca7a538c1a320ce00ab845e98c16649c94 (diff)
downloadnng-aa2639b41978890ca165633c8efd56ba59e3cdbc.tar.gz
nng-aa2639b41978890ca165633c8efd56ba59e3cdbc.tar.bz2
nng-aa2639b41978890ca165633c8efd56ba59e3cdbc.zip
socket transport: convert to using inline pipe and endpoint allocations
This eliminates the need for separate reap operations, and it also eliminates a few failure modes, further simplifying the code. This is the first transport to get this treatment. The others will follow.
-rw-r--r--src/sp/transport/socket/sockfd.c174
1 files changed, 47 insertions, 127 deletions
diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c
index 6fbda8fe..3db76215 100644
--- a/src/sp/transport/socket/sockfd.c
+++ b/src/sp/transport/socket/sockfd.c
@@ -25,29 +25,27 @@ typedef struct sfd_tran_ep sfd_tran_ep;
// sfd_tran_pipe wraps an open file descriptor
struct sfd_tran_pipe {
- nng_stream *conn;
- nni_pipe *npipe;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- bool closed;
- nni_list_node node;
- sfd_tran_ep *ep;
- nni_atomic_flag reaped;
- nni_reap_node reap;
- uint8_t txlen[sizeof(uint64_t)];
- uint8_t rxlen[sizeof(uint64_t)];
- size_t gottxhead;
- size_t gotrxhead;
- size_t wanttxhead;
- size_t wantrxhead;
- nni_list recvq;
- nni_list sendq;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negoaio;
- nni_msg *rxmsg;
- nni_mtx mtx;
+ nng_stream *conn;
+ nni_pipe *npipe;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
+ bool closed;
+ nni_list_node node;
+ sfd_tran_ep *ep;
+ uint8_t txlen[sizeof(uint64_t)];
+ uint8_t rxlen[sizeof(uint64_t)];
+ size_t gottxhead;
+ size_t gotrxhead;
+ size_t wanttxhead;
+ size_t wantrxhead;
+ nni_list recvq;
+ nni_list sendq;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_aio negoaio;
+ nni_msg *rxmsg;
+ nni_mtx mtx;
};
struct sfd_tran_ep {
@@ -58,13 +56,11 @@ struct sfd_tran_ep {
bool started;
bool closed;
nng_sockaddr src;
- int refcnt; // active pipes
nni_aio *useraio;
nni_aio connaio;
- nni_list busypipes; // busy pipes -- ones passed to socket
nni_list waitpipes; // pipes waiting to match to socket
nni_list negopipes; // pipes busy negotiating
- nni_reap_node reap;
+ nni_listener *nlistener;
nng_stream_listener *listener;
#ifdef NNG_ENABLE_STATS
@@ -78,17 +74,6 @@ static void sfd_tran_pipe_send_cb(void *);
static void sfd_tran_pipe_recv_cb(void *);
static void sfd_tran_pipe_nego_cb(void *);
static void sfd_tran_ep_fini(void *);
-static void sfd_tran_pipe_fini(void *);
-
-static nni_reap_list sfd_tran_ep_reap_list = {
- .rl_offset = offsetof(sfd_tran_ep, reap),
- .rl_func = sfd_tran_ep_fini,
-};
-
-static nni_reap_list sfd_tran_pipe_reap_list = {
- .rl_offset = offsetof(sfd_tran_pipe, reap),
- .rl_func = sfd_tran_pipe_fini,
-};
static void
sfd_tran_init(void)
@@ -119,11 +104,15 @@ sfd_tran_pipe_close(void *arg)
static void
sfd_tran_pipe_stop(void *arg)
{
- sfd_tran_pipe *p = arg;
+ sfd_tran_pipe *p = arg;
+ sfd_tran_ep *ep = p->ep;
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
nni_aio_stop(&p->negoaio);
+ nni_mtx_lock(&ep->mtx);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&ep->mtx);
}
static int
@@ -131,6 +120,12 @@ sfd_tran_pipe_init(void *arg, nni_pipe *npipe)
{
sfd_tran_pipe *p = arg;
p->npipe = npipe;
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p);
+ nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p);
+ nni_aio_list_init(&p->recvq);
+ nni_aio_list_init(&p->sendq);
return (0);
}
@@ -139,18 +134,6 @@ static void
sfd_tran_pipe_fini(void *arg)
{
sfd_tran_pipe *p = arg;
- sfd_tran_ep *ep;
-
- sfd_tran_pipe_stop(p);
- if ((ep = p->ep) != NULL) {
- nni_mtx_lock(&ep->mtx);
- nni_list_node_remove(&p->node);
- ep->refcnt--;
- if (ep->fini && (ep->refcnt == 0)) {
- nni_reap(&sfd_tran_ep_reap_list, ep);
- }
- nni_mtx_unlock(&ep->mtx);
- }
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);
@@ -158,39 +141,6 @@ sfd_tran_pipe_fini(void *arg)
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
- NNI_FREE_STRUCT(p);
-}
-
-static void
-sfd_tran_pipe_reap(sfd_tran_pipe *p)
-{
- if (!nni_atomic_flag_test_and_set(&p->reaped)) {
- if (p->conn != NULL) {
- nng_stream_close(p->conn);
- }
- nni_reap(&sfd_tran_pipe_reap_list, p);
- }
-}
-
-static int
-sfd_tran_pipe_alloc(sfd_tran_pipe **pipep)
-{
- sfd_tran_pipe *p;
-
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p);
- nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p);
- nni_aio_list_init(&p->recvq);
- nni_aio_list_init(&p->sendq);
- nni_atomic_flag_reset(&p->reaped);
-
- *pipep = p;
-
- return (0);
}
static void
@@ -204,10 +154,9 @@ sfd_tran_ep_match(sfd_tran_ep *ep)
return;
}
nni_list_remove(&ep->waitpipes, p);
- nni_list_append(&ep->busypipes, p);
ep->useraio = NULL;
p->rcvmax = ep->rcvmax;
- nni_aio_set_output(aio, 0, p);
+ nni_aio_set_output(aio, 0, p->npipe);
nni_aio_finish(aio, 0, 0);
}
@@ -288,7 +237,8 @@ error:
nni_aio_finish_error(uaio, rv);
}
nni_mtx_unlock(&ep->mtx);
- sfd_tran_pipe_reap(p);
+ nni_pipe_close(p->npipe);
+ nni_pipe_rele(p->npipe);
}
static void
@@ -620,8 +570,6 @@ sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep)
{
nni_iov iov;
- ep->refcnt++;
-
p->conn = conn;
p->ep = ep;
p->proto = ep->proto;
@@ -659,18 +607,12 @@ sfd_tran_ep_fini(void *arg)
sfd_tran_ep *ep = arg;
nni_mtx_lock(&ep->mtx);
- ep->fini = true;
- if (ep->refcnt != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
nni_mtx_unlock(&ep->mtx);
nni_aio_stop(&ep->connaio);
nng_stream_listener_free(ep->listener);
nni_aio_fini(&ep->connaio);
nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
}
static void
@@ -691,9 +633,6 @@ sfd_tran_ep_close(void *arg)
NNI_LIST_FOREACH (&ep->waitpipes, p) {
sfd_tran_pipe_close(p);
}
- NNI_LIST_FOREACH (&ep->busypipes, p) {
- sfd_tran_pipe_close(p);
- }
if (ep->useraio != NULL) {
nni_aio_finish_error(ep->useraio, NNG_ECLOSED);
ep->useraio = NULL;
@@ -718,15 +657,13 @@ sfd_tran_accept_cb(void *arg)
}
conn = nni_aio_get_output(aio, 0);
- if ((rv = sfd_tran_pipe_alloc(&p)) != 0) {
+ if (ep->closed) {
nng_stream_free(conn);
+ rv = NNG_ECLOSED;
goto error;
}
-
- if (ep->closed) {
- sfd_tran_pipe_fini(p);
+ if ((rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener)) != 0) {
nng_stream_free(conn);
- rv = NNG_ECLOSED;
goto error;
}
sfd_tran_pipe_start(p, conn, ep);
@@ -748,18 +685,17 @@ error:
}
static int
-sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock)
+sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
{
- sfd_tran_ep *ep;
- NNI_ARG_UNUSED(url);
+ sfd_tran_ep *ep = (void *) lp;
+ int rv;
+ nni_sock *sock = nni_listener_sock(nlistener);
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
+ ep->nlistener = nlistener;
nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node);
NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node);
NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node);
+ nni_aio_init(&ep->connaio, sfd_tran_accept_cb, ep);
ep->proto = nni_sock_proto_id(sock);
@@ -774,17 +710,6 @@ sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock)
nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
#endif
- *epp = ep;
- return (0);
-}
-
-static int
-sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
-{
- sfd_tran_ep *ep;
- int rv;
- nni_sock *sock = nni_listener_sock(nlistener);
-
// Check for invalid URL components -- we only accept a bare scheme
if ((url->u_hostname != NULL) || (strlen(url->u_path) != 0) ||
(url->u_fragment != NULL) || (url->u_userinfo != NULL) ||
@@ -792,12 +717,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
return (NNG_EADDRINVAL);
}
- if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) {
- return (rv);
- }
-
- nni_aio_init(&ep->connaio, sfd_tran_accept_cb, ep);
-
if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
sfd_tran_ep_fini(ep);
return (rv);
@@ -807,7 +726,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
nni_listener_add_stat(nlistener, &ep->st_rcv_max);
#endif
- *lp = ep;
return (0);
}
@@ -896,6 +814,7 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio)
}
static nni_sp_pipe_ops sfd_tran_pipe_ops = {
+ .p_size = sizeof(sfd_tran_pipe),
.p_init = sfd_tran_pipe_init,
.p_fini = sfd_tran_pipe_fini,
.p_stop = sfd_tran_pipe_stop,
@@ -947,6 +866,7 @@ sfd_tran_listener_setopt(
}
static nni_sp_listener_ops sfd_tran_listener_ops = {
+ .l_size = sizeof(sfd_tran_ep),
.l_init = sfd_tran_listener_init,
.l_fini = sfd_tran_ep_fini,
.l_bind = sfd_tran_ep_bind,