diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-07 22:28:15 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-08 14:02:42 -0800 |
| commit | d6ab6bca7a538c1a320ce00ab845e98c16649c94 (patch) | |
| tree | d4a8a4f8ac49eee54c2d9bbdf8f9da6c28362311 /src/core/socket.c | |
| parent | 5223a142e38a320ce53097cea450d8ba7f175193 (diff) | |
| download | nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.gz nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.tar.bz2 nng-d6ab6bca7a538c1a320ce00ab845e98c16649c94.zip | |
pipes and endpoints: support for inline allocations of transport data
This is a new transport API, which should make it easier for transports
to rely upon lifetime guarantees made by the common SP framework, thus
eliminating the need for transport specific reference counters, reap
lists, and similar.
The transport declares the size of the object in the ops vector (for
pipe, dialer, or listener), and the framework supplies one allocated
using the associated allocator.
For now these add the pipe object to the socket and endpoint using
linked linked lists. The plan is to transition those to reference
counts which should be lighter weight and free form locking issues.
The pipe teardown has been moved more fully to the reaper, to avoid
some of the deadlocks that can occur as nni_pipe_close can be called
from almost any context.
For now the old API is retained as well, but the intention is to convert
all the transports and then remove it.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 144 |
1 files changed, 91 insertions, 53 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index fe1b62d2..ec2691a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock) nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } + + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&sock->s_mx); // Close the upper queues immediately. @@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock_lk); nni_mtx_lock(&sock->s_mx); - - // At this point, we've done everything we politely can to - // give the protocol a chance to flush its write side. Now - // it is time to be a little more insistent. - - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // We have to wait for pipes to be removed. while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } - - sock->s_sock_ops.sock_close(sock->s_data); - - nni_cv_wake(&sock->s_cv); - NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); - nni_mtx_unlock(&sock->s_mx); + sock->s_sock_ops.sock_close(sock->s_data); + // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +static void +dialer_start_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - - if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - - nni_list_append(&d->d_pipes, p); - nni_list_append(&s->s_pipes, p); d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + #ifdef NNG_ENABLE_STATS nni_stat_inc(&s->st_pipes, 1); nni_stat_inc(&d->st_pipes, 1); @@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) } void +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +{ + nni_sock *s = d->d_sock; + nni_pipe *p; + + if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { + return; + } + + nni_mtx_lock(&s->s_mx); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + dialer_start_pipe(d, p); +} + +void nni_dialer_shutdown(nni_dialer *d) { nni_sock *s = d->d_sock; @@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d) nni_reap(&dialer_reap_list, d); } -void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +static void +listener_start_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; - - nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - nni_list_append(&l->l_pipes, p); - nni_list_append(&s->s_pipes, p); - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_pipes, 1); nni_stat_inc(&s->st_pipes, 1); @@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) "Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p), nni_sock_id(s), nni_pipe_peer_addr(p, addr)); } + + // the socket now "owns" the pipe, and a pipe close should immediately + // start the process of teardown. nni_pipe_rele(p); } void +nni_listener_add_pipe(nni_listener *l, void *tpipe) +{ + nni_pipe *p; + + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + return; + } + + listener_start_pipe(l, p); +} + +void nni_listener_shutdown(nni_listener *l) { nni_sock *s = l->l_sock; @@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l) nni_reap(&listener_reap_list, l); } +// nni_pipe_add just registers the pipe with the socket and endpoint +// so they won't be deallocated while the pipe still exists. +void +nni_pipe_add(nni_pipe *p) +{ + nni_sock *s = p->p_sock; + nni_dialer *d = p->p_dialer; + nni_listener *l = p->p_listener; + + nni_mtx_lock(&s->s_mx); + nni_list_append(&s->s_pipes, p); + if (d != NULL) { + NNI_ASSERT(l == NULL); + nni_list_append(&d->d_pipes, p); + } + if (l != NULL) { + NNI_ASSERT(d == NULL); + nni_list_append(&l->l_pipes, p); + } + nni_mtx_unlock(&s->s_mx); +} + +// nni_pipe_start attempts to start the pipe, adding it to the socket and +// endpoints and calling callbacks, etc. The pipe should already have finished +// any negotiation needed at the transport layer. +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_listener) { + NNI_ASSERT(p->p_dialer == NULL); + listener_start_pipe(p->p_listener, p); + } + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + dialer_start_pipe(p->p_dialer, p); + } +} + void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { @@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) void *arg; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (!p->p_cbs) { - if (ev == NNG_PIPE_EV_ADD_PRE) { - // First event, after this we want all other events. - p->p_cbs = true; - } else { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } + if (ev == NNG_PIPE_EV_ADD_PRE) { + p->p_cbs = true; + } else if (!p->p_cbs) { + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + return; } cb = s->s_pipe_cbs[ev].cb_fn; arg = s->s_pipe_cbs[ev].cb_arg; @@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p) nni_mtx_lock(&s->s_mx); #ifdef NNG_ENABLE_STATS - if (nni_list_node_active(&p->p_sock_node)) { - nni_stat_dec(&s->st_pipes, 1); - } + nni_stat_dec(&s->st_pipes, 1); if (p->p_listener != NULL) { nni_stat_dec(&p->p_listener->st_pipes, 1); } @@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p) #endif nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); - p->p_listener = NULL; - p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; dialer_timer_start_locked(d); // Kick the timer to redial. |
