From 382b4cff3abd5ccb282ba420ef1f7c7d171ec91a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 4 Jan 2020 10:24:05 -0800 Subject: fixes #1105 pollable can be inlined, and use atomics This also introduces an nni_atomic_cas64 to help with lock-free designs. Some mechanical renaming was done in some of the protocols for spelling. --- src/core/platform.h | 15 +++++--- src/core/pollable.c | 103 +++++++++++++++++++++++++++++++--------------------- src/core/pollable.h | 16 ++++++-- 3 files changed, 84 insertions(+), 50 deletions(-) (limited to 'src/core') diff --git a/src/core/platform.h b/src/core/platform.h index 355ef7eb..53d25137 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2018 Devolutions // @@ -189,13 +189,18 @@ extern uint64_t nni_atomic_swap64(nni_atomic_u64 *, uint64_t); extern uint64_t nni_atomic_dec64_nv(nni_atomic_u64 *); extern void nni_atomic_inc64(nni_atomic_u64 *); +// nni_atomic_cas64 is a compare and swap. The second argument is the +// value to compare against, and the third is the new value. Returns +// true if the value was set. +extern bool nni_atomic_cas64(nni_atomic_u64 *, uint64_t, uint64_t); + // // Clock Support // // nn_plat_clock returns a number of milliseconds since some arbitrary time // in the past. The values returned by nni_clock must use the same base -// as the times used in nni_plat_cond_waituntil. The nni_plat_clock() must +// as the times used in nni_plat_cond_until. The nni_plat_clock() must // return values > 0, and must return values smaller than 2^63. (We could // relax this last constraint, but there is no reason to, and leaves us the // option of using negative values for other purposes in the future.) @@ -213,9 +218,9 @@ uint32_t nni_random(void); // nni_plat_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, -// and threadsafe, and will be called before any other API calls, and +// and thread-safe, and will be called before any other API calls, and // may be called at any point thereafter. It is permitted to return -// an error if some critical failure inializing the platform occurs, +// an error if some critical failure initializing the platform occurs, // but once this succeeds, all future calls must succeed as well, unless // nni_plat_fini has been called. // @@ -274,7 +279,7 @@ extern int nni_tcp_dialer_getopt( extern int nni_tcp_listener_init(nni_tcp_listener **); // nni_tcp_listener_fini frees the listener and all associated resources. -// It implictly closes the listener as well. +// It implicitly closes the listener as well. extern void nni_tcp_listener_fini(nni_tcp_listener *); // nni_tcp_listener_close closes the listener. This will unbind diff --git a/src/core/pollable.c b/src/core/pollable.c index a121ba3f..fb6af0f5 100644 --- a/src/core/pollable.c +++ b/src/core/pollable.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -10,13 +10,33 @@ #include "core/nng_impl.h" -struct nni_pollable { - int p_rfd; - int p_wfd; - nni_mtx p_lock; - bool p_raised; - bool p_open; -}; +// We pack the wfd and rfd into a uint64_t so that we can update the pair +// atomically and use nni_atomic_cas64, to be lock free. +#define WFD(fds) ((int) ((fds) &0xffffffffu)) +#define RFD(fds) ((int) (((fds) >> 32u) & 0xffffffffu)) +#define FD_JOIN(wfd, rfd) ((uint64_t)(wfd) + ((uint64_t)(rfd) << 32u)) + +void +nni_pollable_init(nni_pollable *p) +{ + nni_atomic_init_bool(&p->p_raised); + nni_atomic_set64(&p->p_fds, (uint64_t) -1); +} + +void +nni_pollable_fini(nni_pollable *p) +{ + uint64_t fds; + + fds = nni_atomic_get64(&p->p_fds); + if (fds != (uint64_t) -1) { + int rfd, wfd; + // Read in the high order, write in the low order. + rfd = RFD(fds); + wfd = WFD(fds); + nni_plat_pipe_close(rfd, wfd); + } +} int nni_pollable_alloc(nni_pollable **pp) @@ -25,9 +45,7 @@ nni_pollable_alloc(nni_pollable **pp) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - p->p_open = false; - p->p_raised = false; - nni_mtx_init(&p->p_lock); + nni_pollable_init(p); *pp = p; return (0); } @@ -38,10 +56,7 @@ nni_pollable_free(nni_pollable *p) if (p == NULL) { return; } - if (p->p_open) { - nni_plat_pipe_close(p->p_rfd, p->p_wfd); - } - nni_mtx_fini(&p->p_lock); + nni_pollable_fini(p); NNI_FREE_STRUCT(p); } @@ -51,16 +66,12 @@ nni_pollable_raise(nni_pollable *p) if (p == NULL) { return; } - nni_mtx_lock(&p->p_lock); - if (!p->p_raised) { - p->p_raised = true; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_raise(p->p_wfd); - return; + if (!nni_atomic_swap_bool(&p->p_raised, true)) { + uint64_t fds; + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + nni_plat_pipe_raise(WFD(fds)); } } - nni_mtx_unlock(&p->p_lock); } void @@ -69,16 +80,12 @@ nni_pollable_clear(nni_pollable *p) if (p == NULL) { return; } - nni_mtx_lock(&p->p_lock); - if (p->p_raised) { - p->p_raised = false; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_clear(p->p_rfd); - return; + if (nni_atomic_swap_bool(&p->p_raised, false)) { + uint64_t fds; + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + nni_plat_pipe_clear(RFD(fds)); } } - nni_mtx_unlock(&p->p_lock); } int @@ -87,19 +94,31 @@ nni_pollable_getfd(nni_pollable *p, int *fdp) if (p == NULL) { return (NNG_EINVAL); } - nni_mtx_lock(&p->p_lock); - if (!p->p_open) { - int rv; - if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) { - nni_mtx_unlock(&p->p_lock); + + for (;;) { + int rfd; + int wfd; + int rv; + uint64_t fds; + + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + *fdp = RFD(fds); + return (0); + } + if ((rv = nni_plat_pipe_open(&wfd, &rfd)) != 0) { return (rv); } - p->p_open = true; - if (p->p_raised) { - nni_plat_pipe_raise(p->p_wfd); + fds = FD_JOIN(wfd, rfd); + + if (nni_atomic_cas64(&p->p_fds, (uint64_t) -1, fds)) { + if (nni_atomic_get_bool(&p->p_raised)) { + nni_plat_pipe_raise(wfd); + } + *fdp = rfd; + return (0); } + + // Someone beat us. Close ours, and try again. + nni_plat_pipe_close(wfd, rfd); } - nni_mtx_unlock(&p->p_lock); - *fdp = p->p_rfd; - return (0); } diff --git a/src/core/pollable.h b/src/core/pollable.h index 50ec9bf6..a71a9693 100644 --- a/src/core/pollable.h +++ b/src/core/pollable.h @@ -1,5 +1,5 @@ // -// Copyright 2017 Garrett D'Amore +// Copyright 2020 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -13,8 +13,6 @@ #include "core/defs.h" #include "core/list.h" -// For the sake of simplicity, we just maintain a single global timer thread. - typedef struct nni_pollable nni_pollable; extern int nni_pollable_alloc(nni_pollable **); @@ -23,4 +21,16 @@ extern void nni_pollable_raise(nni_pollable *); extern void nni_pollable_clear(nni_pollable *); extern int nni_pollable_getfd(nni_pollable *, int *); +// nni_pollable implementation details are private. Only here for inlining. +// We have joined to the write and read file descriptors into a a single +// atomic 64 so we can update them together (and we can use cas to be sure +// that such updates are always safe.) +struct nni_pollable { + nni_atomic_u64 p_fds; + nni_atomic_bool p_raised; +}; + +extern void nni_pollable_init(nni_pollable *); +extern void nni_pollable_fini(nni_pollable *); + #endif // CORE_POLLABLE_H -- cgit v1.2.3-70-g09d2