diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-09-06 13:08:44 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-09-06 13:11:57 -0700 |
| commit | 515cdff883f44fbd192f7d957f47d6e2e1385dd0 (patch) | |
| tree | 7fe06f7e71ed2b7bbaf87c9a917437d219c2182f | |
| parent | 468714a51bbc9fc6acf03479b8825ad25a2ffeb0 (diff) | |
| download | nng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.tar.gz nng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.tar.bz2 nng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.zip | |
Eliminate the pipe mutex and use atomic for pipe closed.
This eliminates several mutex operations done each time a pipe
is created or destroyed. For large scale systems this should
reduce overall pressure on the memory subsystem, and scale better
as many threads are coming and going.
This also reduces the overall size of nni_pipe -- on Linux by
36 bytes typically.
| -rw-r--r-- | src/core/pipe.c | 27 | ||||
| -rw-r--r-- | src/core/socket.c | 13 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 4 |
3 files changed, 18 insertions, 26 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 47a78f35..a61ca74d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -88,7 +88,6 @@ pipe_destroy(void *arg) p->p_tran_ops.p_fini(p->p_tran_data); } nni_cv_fini(&p->p_cv); - nni_mtx_fini(&p->p_mtx); nni_free(p, p->p_size); } @@ -146,14 +145,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - nni_mtx_lock(&p->p_mtx); - if (p->p_closed) { - // We already did a close. - nni_mtx_unlock(&p->p_mtx); - return; + if (nni_atomic_swap_bool(&p->p_closed, true)) { + return; // We already did a close. } - p->p_closed = true; - nni_mtx_unlock(&p->p_mtx); if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_close(p->p_proto_data); @@ -167,6 +161,12 @@ nni_pipe_close(nni_pipe *p) nni_reap(&pipe_reap_list, p); } +bool +nni_pipe_is_closed(nni_pipe *p) +{ + return (nni_atomic_get_bool(&p->p_closed)); +} + uint16_t nni_pipe_peer(nni_pipe *p) { @@ -245,9 +245,9 @@ pipe_stats_init(nni_pipe *p) static int pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata) { - nni_pipe * p; + nni_pipe *p; int rv; - void * sdata = nni_sock_proto_data(sock); + void *sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); size_t sz; @@ -265,15 +265,14 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata) p->p_tran_data = tdata; p->p_proto_ops = *pops; p->p_sock = sock; - p->p_closed = false; p->p_cbs = false; p->p_ref = 0; + nni_atomic_init_bool(&p->p_closed); nni_atomic_flag_reset(&p->p_stop); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &pipes_lk); nni_mtx_lock(&pipes_lk); @@ -302,7 +301,7 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata) { int rv; nni_sp_tran *tran = d->d_tran; - nni_pipe * p; + nni_pipe *p; if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) { return (rv); @@ -326,7 +325,7 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata) { int rv; nni_sp_tran *tran = l->l_tran; - nni_pipe * p; + nni_pipe *p; if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) { return (rv); diff --git a/src/core/socket.c b/src/core/socket.c index e376b773..24403ac7 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1489,9 +1489,7 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - nni_mtx_lock(&s->s_mx); - if (p->p_closed) { - nni_mtx_unlock(&s->s_mx); + if (nni_pipe_is_closed(p)) { #ifdef NNG_ENABLE_STATS nni_stat_inc(&d->st_reject, 1); nni_stat_inc(&s->st_rejects, 1); @@ -1499,8 +1497,8 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); return; } + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&d->st_reject, 1); nni_stat_inc(&s->st_rejects, 1); @@ -1509,7 +1507,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); return; } - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_register(&p->st_root); #endif @@ -1604,9 +1601,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - nni_mtx_lock(&s->s_mx); - if (p->p_closed) { - nni_mtx_unlock(&s->s_mx); + if (nni_pipe_is_closed(p)) { #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_reject, 1); nni_stat_inc(&s->st_rejects, 1); @@ -1615,7 +1610,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) return; } if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_reject, 1); nni_stat_inc(&s->st_rejects, 1); @@ -1624,7 +1618,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe_rele(p); return; } - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_register(&p->st_root); #endif diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 850a4d80..83cbb877 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -104,11 +104,10 @@ struct nni_pipe { nni_sock *p_sock; nni_dialer *p_dialer; nni_listener *p_listener; - bool p_closed; + nni_atomic_bool p_closed; nni_atomic_flag p_stop; bool p_cbs; int p_ref; - nni_mtx p_mtx; nni_cv p_cv; nni_reap_node p_reap; @@ -141,6 +140,7 @@ extern void nni_listener_destroy(nni_listener *); extern void nni_listener_stop(nni_listener *); extern void nni_pipe_remove(nni_pipe *); +extern bool nni_pipe_is_closed(nni_pipe *); extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); extern int nni_pipe_create_dialer(nni_pipe **, nni_dialer *, void *); extern int nni_pipe_create_listener(nni_pipe **, nni_listener *, void *); |
