diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-08 09:50:34 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-08 14:02:42 -0800 |
| commit | aa2639b41978890ca165633c8efd56ba59e3cdbc (patch) | |
| tree | 098d82f0a9724b7a05a5c71332213466bc05482b /src/sp/transport | |
| parent | d6ab6bca7a538c1a320ce00ab845e98c16649c94 (diff) | |
| download | nng-aa2639b41978890ca165633c8efd56ba59e3cdbc.tar.gz nng-aa2639b41978890ca165633c8efd56ba59e3cdbc.tar.bz2 nng-aa2639b41978890ca165633c8efd56ba59e3cdbc.zip | |
socket transport: convert to using inline pipe and endpoint allocations
This eliminates the need for separate reap operations, and it
also eliminates a few failure modes, further simplifying the code.
This is the first transport to get this treatment. The others will follow.
Diffstat (limited to 'src/sp/transport')
| -rw-r--r-- | src/sp/transport/socket/sockfd.c | 174 |
1 files changed, 47 insertions, 127 deletions
diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c index 6fbda8fe..3db76215 100644 --- a/src/sp/transport/socket/sockfd.c +++ b/src/sp/transport/socket/sockfd.c @@ -25,29 +25,27 @@ typedef struct sfd_tran_ep sfd_tran_ep; // sfd_tran_pipe wraps an open file descriptor struct sfd_tran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - sfd_tran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio txaio; - nni_aio rxaio; - nni_aio negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct sfd_tran_ep { @@ -58,13 +56,11 @@ struct sfd_tran_ep { bool started; bool closed; nng_sockaddr src; - int refcnt; // active pipes nni_aio *useraio; nni_aio connaio; - nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; + nni_listener *nlistener; nng_stream_listener *listener; #ifdef NNG_ENABLE_STATS @@ -78,17 +74,6 @@ static void sfd_tran_pipe_send_cb(void *); static void sfd_tran_pipe_recv_cb(void *); static void sfd_tran_pipe_nego_cb(void *); static void sfd_tran_ep_fini(void *); -static void sfd_tran_pipe_fini(void *); - -static nni_reap_list sfd_tran_ep_reap_list = { - .rl_offset = offsetof(sfd_tran_ep, reap), - .rl_func = sfd_tran_ep_fini, -}; - -static nni_reap_list sfd_tran_pipe_reap_list = { - .rl_offset = offsetof(sfd_tran_pipe, reap), - .rl_func = sfd_tran_pipe_fini, -}; static void sfd_tran_init(void) @@ -119,11 +104,15 @@ sfd_tran_pipe_close(void *arg) static void sfd_tran_pipe_stop(void *arg) { - sfd_tran_pipe *p = arg; + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep = p->ep; nni_aio_stop(&p->rxaio); nni_aio_stop(&p->txaio); nni_aio_stop(&p->negoaio); + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); } static int @@ -131,6 +120,12 @@ sfd_tran_pipe_init(void *arg, nni_pipe *npipe) { sfd_tran_pipe *p = arg; p->npipe = npipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); return (0); } @@ -139,18 +134,6 @@ static void sfd_tran_pipe_fini(void *arg) { sfd_tran_pipe *p = arg; - sfd_tran_ep *ep; - - sfd_tran_pipe_stop(p); - if ((ep = p->ep) != NULL) { - nni_mtx_lock(&ep->mtx); - nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&sfd_tran_ep_reap_list, ep); - } - nni_mtx_unlock(&ep->mtx); - } nni_aio_fini(&p->rxaio); nni_aio_fini(&p->txaio); @@ -158,39 +141,6 @@ sfd_tran_pipe_fini(void *arg) nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -sfd_tran_pipe_reap(sfd_tran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&sfd_tran_pipe_reap_list, p); - } -} - -static int -sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) -{ - sfd_tran_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); - nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); - nni_aio_list_init(&p->recvq); - nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); - - *pipep = p; - - return (0); } static void @@ -204,10 +154,9 @@ sfd_tran_ep_match(sfd_tran_ep *ep) return; } nni_list_remove(&ep->waitpipes, p); - nni_list_append(&ep->busypipes, p); ep->useraio = NULL; p->rcvmax = ep->rcvmax; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -288,7 +237,8 @@ error: nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); - sfd_tran_pipe_reap(p); + nni_pipe_close(p->npipe); + nni_pipe_rele(p->npipe); } static void @@ -620,8 +570,6 @@ sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -659,18 +607,12 @@ sfd_tran_ep_fini(void *arg) sfd_tran_ep *ep = arg; nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } nni_mtx_unlock(&ep->mtx); nni_aio_stop(&ep->connaio); nng_stream_listener_free(ep->listener); nni_aio_fini(&ep->connaio); nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); } static void @@ -691,9 +633,6 @@ sfd_tran_ep_close(void *arg) NNI_LIST_FOREACH (&ep->waitpipes, p) { sfd_tran_pipe_close(p); } - NNI_LIST_FOREACH (&ep->busypipes, p) { - sfd_tran_pipe_close(p); - } if (ep->useraio != NULL) { nni_aio_finish_error(ep->useraio, NNG_ECLOSED); ep->useraio = NULL; @@ -718,15 +657,13 @@ sfd_tran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + if (ep->closed) { nng_stream_free(conn); + rv = NNG_ECLOSED; goto error; } - - if (ep->closed) { - sfd_tran_pipe_fini(p); + if ((rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener)) != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; goto error; } sfd_tran_pipe_start(p, conn, ep); @@ -748,18 +685,17 @@ error: } static int -sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { - sfd_tran_ep *ep; - NNI_ARG_UNUSED(url); + sfd_tran_ep *ep = (void *) lp; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } + ep->nlistener = nlistener; nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); + nni_aio_init(&ep->connaio, sfd_tran_accept_cb, ep); ep->proto = nni_sock_proto_id(sock); @@ -774,17 +710,6 @@ sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) nni_stat_init(&ep->st_rcv_max, &rcv_max_info); #endif - *epp = ep; - return (0); -} - -static int -sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) -{ - sfd_tran_ep *ep; - int rv; - nni_sock *sock = nni_listener_sock(nlistener); - // Check for invalid URL components -- we only accept a bare scheme if ((url->u_hostname != NULL) || (strlen(url->u_path) != 0) || (url->u_fragment != NULL) || (url->u_userinfo != NULL) || @@ -792,12 +717,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } - if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { - return (rv); - } - - nni_aio_init(&ep->connaio, sfd_tran_accept_cb, ep); - if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { sfd_tran_ep_fini(ep); return (rv); @@ -807,7 +726,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) nni_listener_add_stat(nlistener, &ep->st_rcv_max); #endif - *lp = ep; return (0); } @@ -896,6 +814,7 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_size = sizeof(sfd_tran_pipe), .p_init = sfd_tran_pipe_init, .p_fini = sfd_tran_pipe_fini, .p_stop = sfd_tran_pipe_stop, @@ -947,6 +866,7 @@ sfd_tran_listener_setopt( } static nni_sp_listener_ops sfd_tran_listener_ops = { + .l_size = sizeof(sfd_tran_ep), .l_init = sfd_tran_listener_init, .l_fini = sfd_tran_ep_fini, .l_bind = sfd_tran_ep_bind, |
