From d6ab6bca7a538c1a320ce00ab845e98c16649c94 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Dec 2024 22:28:15 -0800 Subject: pipes and endpoints: support for inline allocations of transport data This is a new transport API, which should make it easier for transports to rely upon lifetime guarantees made by the common SP framework, thus eliminating the need for transport specific reference counters, reap lists, and similar. The transport declares the size of the object in the ops vector (for pipe, dialer, or listener), and the framework supplies one allocated using the associated allocator. For now these add the pipe object to the socket and endpoint using linked linked lists. The plan is to transition those to reference counts which should be lighter weight and free form locking issues. The pipe teardown has been moved more fully to the reaper, to avoid some of the deadlocks that can occur as nni_pipe_close can be called from almost any context. For now the old API is retained as well, but the intention is to convert all the transports and then remove it. --- src/core/socket.c | 144 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 91 insertions(+), 53 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index fe1b62d2..ec2691a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock) nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } + + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&sock->s_mx); // Close the upper queues immediately. @@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock_lk); nni_mtx_lock(&sock->s_mx); - - // At this point, we've done everything we politely can to - // give the protocol a chance to flush its write side. Now - // it is time to be a little more insistent. - - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // We have to wait for pipes to be removed. while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } - - sock->s_sock_ops.sock_close(sock->s_data); - - nni_cv_wake(&sock->s_cv); - NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); - nni_mtx_unlock(&sock->s_mx); + sock->s_sock_ops.sock_close(sock->s_data); + // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +static void +dialer_start_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - - if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - - nni_list_append(&d->d_pipes, p); - nni_list_append(&s->s_pipes, p); d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + #ifdef NNG_ENABLE_STATS nni_stat_inc(&s->st_pipes, 1); nni_stat_inc(&d->st_pipes, 1); @@ -1341,6 +1325,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); } +void +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +{ + nni_sock *s = d->d_sock; + nni_pipe *p; + + if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { + return; + } + + nni_mtx_lock(&s->s_mx); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + dialer_start_pipe(d, p); +} + void nni_dialer_shutdown(nni_dialer *d) { @@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d) nni_reap(&dialer_reap_list, d); } -void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +static void +listener_start_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; - - nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - nni_list_append(&l->l_pipes, p); - nni_list_append(&s->s_pipes, p); - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_pipes, 1); nni_stat_inc(&s->st_pipes, 1); @@ -1454,9 +1446,24 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) "Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p), nni_sock_id(s), nni_pipe_peer_addr(p, addr)); } + + // the socket now "owns" the pipe, and a pipe close should immediately + // start the process of teardown. nni_pipe_rele(p); } +void +nni_listener_add_pipe(nni_listener *l, void *tpipe) +{ + nni_pipe *p; + + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + return; + } + + listener_start_pipe(l, p); +} + void nni_listener_shutdown(nni_listener *l) { @@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l) nni_reap(&listener_reap_list, l); } +// nni_pipe_add just registers the pipe with the socket and endpoint +// so they won't be deallocated while the pipe still exists. +void +nni_pipe_add(nni_pipe *p) +{ + nni_sock *s = p->p_sock; + nni_dialer *d = p->p_dialer; + nni_listener *l = p->p_listener; + + nni_mtx_lock(&s->s_mx); + nni_list_append(&s->s_pipes, p); + if (d != NULL) { + NNI_ASSERT(l == NULL); + nni_list_append(&d->d_pipes, p); + } + if (l != NULL) { + NNI_ASSERT(d == NULL); + nni_list_append(&l->l_pipes, p); + } + nni_mtx_unlock(&s->s_mx); +} + +// nni_pipe_start attempts to start the pipe, adding it to the socket and +// endpoints and calling callbacks, etc. The pipe should already have finished +// any negotiation needed at the transport layer. +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_listener) { + NNI_ASSERT(p->p_dialer == NULL); + listener_start_pipe(p->p_listener, p); + } + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + dialer_start_pipe(p->p_dialer, p); + } +} + void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { @@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) void *arg; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (!p->p_cbs) { - if (ev == NNG_PIPE_EV_ADD_PRE) { - // First event, after this we want all other events. - p->p_cbs = true; - } else { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } + if (ev == NNG_PIPE_EV_ADD_PRE) { + p->p_cbs = true; + } else if (!p->p_cbs) { + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + return; } cb = s->s_pipe_cbs[ev].cb_fn; arg = s->s_pipe_cbs[ev].cb_arg; @@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p) nni_mtx_lock(&s->s_mx); #ifdef NNG_ENABLE_STATS - if (nni_list_node_active(&p->p_sock_node)) { - nni_stat_dec(&s->st_pipes, 1); - } + nni_stat_dec(&s->st_pipes, 1); if (p->p_listener != NULL) { nni_stat_dec(&p->p_listener->st_pipes, 1); } @@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p) #endif nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); - p->p_listener = NULL; - p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; dialer_timer_start_locked(d); // Kick the timer to redial. -- cgit v1.2.3-70-g09d2