summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-09-06 13:08:44 -0700
committerGarrett D'Amore <garrett@damore.org>2021-09-06 13:11:57 -0700
commit515cdff883f44fbd192f7d957f47d6e2e1385dd0 (patch)
tree7fe06f7e71ed2b7bbaf87c9a917437d219c2182f
parent468714a51bbc9fc6acf03479b8825ad25a2ffeb0 (diff)
downloadnng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.tar.gz
nng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.tar.bz2
nng-515cdff883f44fbd192f7d957f47d6e2e1385dd0.zip
Eliminate the pipe mutex and use atomic for pipe closed.
This eliminates several mutex operations done each time a pipe is created or destroyed. For large scale systems this should reduce overall pressure on the memory subsystem, and scale better as many threads are coming and going. This also reduces the overall size of nni_pipe -- on Linux by 36 bytes typically.
-rw-r--r--src/core/pipe.c27
-rw-r--r--src/core/socket.c13
-rw-r--r--src/core/sockimpl.h4
3 files changed, 18 insertions, 26 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 47a78f35..a61ca74d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -88,7 +88,6 @@ pipe_destroy(void *arg)
p->p_tran_ops.p_fini(p->p_tran_data);
}
nni_cv_fini(&p->p_cv);
- nni_mtx_fini(&p->p_mtx);
nni_free(p, p->p_size);
}
@@ -146,14 +145,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
- nni_mtx_lock(&p->p_mtx);
- if (p->p_closed) {
- // We already did a close.
- nni_mtx_unlock(&p->p_mtx);
- return;
+ if (nni_atomic_swap_bool(&p->p_closed, true)) {
+ return; // We already did a close.
}
- p->p_closed = true;
- nni_mtx_unlock(&p->p_mtx);
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_close(p->p_proto_data);
@@ -167,6 +161,12 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&pipe_reap_list, p);
}
+bool
+nni_pipe_is_closed(nni_pipe *p)
+{
+ return (nni_atomic_get_bool(&p->p_closed));
+}
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -245,9 +245,9 @@ pipe_stats_init(nni_pipe *p)
static int
pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata)
{
- nni_pipe * p;
+ nni_pipe *p;
int rv;
- void * sdata = nni_sock_proto_data(sock);
+ void *sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
size_t sz;
@@ -265,15 +265,14 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata)
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
p->p_sock = sock;
- p->p_closed = false;
p->p_cbs = false;
p->p_ref = 0;
+ nni_atomic_init_bool(&p->p_closed);
nni_atomic_flag_reset(&p->p_stop);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &pipes_lk);
nni_mtx_lock(&pipes_lk);
@@ -302,7 +301,7 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
{
int rv;
nni_sp_tran *tran = d->d_tran;
- nni_pipe * p;
+ nni_pipe *p;
if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
return (rv);
@@ -326,7 +325,7 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
{
int rv;
nni_sp_tran *tran = l->l_tran;
- nni_pipe * p;
+ nni_pipe *p;
if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
return (rv);
diff --git a/src/core/socket.c b/src/core/socket.c
index e376b773..24403ac7 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1489,9 +1489,7 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
- nni_mtx_lock(&s->s_mx);
- if (p->p_closed) {
- nni_mtx_unlock(&s->s_mx);
+ if (nni_pipe_is_closed(p)) {
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_reject, 1);
nni_stat_inc(&s->st_rejects, 1);
@@ -1499,8 +1497,8 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_pipe_rele(p);
return;
}
+
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_reject, 1);
nni_stat_inc(&s->st_rejects, 1);
@@ -1509,7 +1507,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_pipe_rele(p);
return;
}
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_register(&p->st_root);
#endif
@@ -1604,9 +1601,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
- nni_mtx_lock(&s->s_mx);
- if (p->p_closed) {
- nni_mtx_unlock(&s->s_mx);
+ if (nni_pipe_is_closed(p)) {
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_reject, 1);
nni_stat_inc(&s->st_rejects, 1);
@@ -1615,7 +1610,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_reject, 1);
nni_stat_inc(&s->st_rejects, 1);
@@ -1624,7 +1618,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe_rele(p);
return;
}
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_register(&p->st_root);
#endif
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 850a4d80..83cbb877 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -104,11 +104,10 @@ struct nni_pipe {
nni_sock *p_sock;
nni_dialer *p_dialer;
nni_listener *p_listener;
- bool p_closed;
+ nni_atomic_bool p_closed;
nni_atomic_flag p_stop;
bool p_cbs;
int p_ref;
- nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_node p_reap;
@@ -141,6 +140,7 @@ extern void nni_listener_destroy(nni_listener *);
extern void nni_listener_stop(nni_listener *);
extern void nni_pipe_remove(nni_pipe *);
+extern bool nni_pipe_is_closed(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
extern int nni_pipe_create_dialer(nni_pipe **, nni_dialer *, void *);
extern int nni_pipe_create_listener(nni_pipe **, nni_listener *, void *);