diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-08-15 14:09:17 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-08-16 23:07:35 -0700 |
| commit | 4f5e11c391c4a8f1b2731aee5ad47bc0c925042a (patch) | |
| tree | 640aef66eb7e0030a2833bc9bba3246edb29d074 | |
| parent | 750662d4aab305d8a3d48bfa6edfc4dac4018881 (diff) | |
| download | nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.gz nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.bz2 nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.zip | |
fixes #1289 zerotier should have it's own copy of the id hashing code
fixes #1288 id allocation can overallocate
fixes #1126 consider removing lock from idhash
This substantially refactors the id hash code, giving a cleaner API,
and eliminating a extra locking as well as some wasteful allocations.
The ZeroTier code has it's own copy, that is 64-bit friendly, as the
rest of the consumers need only a simpler 32-bit API.
| -rw-r--r-- | include/nng/nng.h | 6 | ||||
| -rw-r--r-- | src/core/dialer.c | 78 | ||||
| -rw-r--r-- | src/core/dialer.h | 4 | ||||
| -rw-r--r-- | src/core/idhash.c | 341 | ||||
| -rw-r--r-- | src/core/idhash.h | 40 | ||||
| -rw-r--r-- | src/core/listener.c | 83 | ||||
| -rw-r--r-- | src/core/pipe.c | 63 | ||||
| -rw-r--r-- | src/core/socket.c | 90 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 8 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 15 | ||||
| -rw-r--r-- | src/protocol/pair1/pair1_poly.c | 14 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 20 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 24 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 20 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 16 | ||||
| -rw-r--r-- | src/protocol/survey0/respond_test.c | 3 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 75 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond.c | 124 | ||||
| -rw-r--r-- | src/transport/zerotier/CMakeLists.txt | 8 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 62 | ||||
| -rw-r--r-- | src/transport/zerotier/zthash.c | 302 | ||||
| -rw-r--r-- | src/transport/zerotier/zthash.h | 43 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | tests/id.c | 275 | ||||
| -rw-r--r-- | tests/idhash.c | 259 |
25 files changed, 1084 insertions, 891 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h index 6f5fdd45..3e70bc8f 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -761,10 +761,8 @@ NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe); NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // Flags. -enum nng_flag_enum { - NNG_FLAG_ALLOC = 1, // Recv to allocate receive buffer. - NNG_FLAG_NONBLOCK = 2 // Non-blocking operations. -}; +#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer +#define NNG_FLAG_NONBLOCK 2u // Non-blocking operations // Options. #define NNG_OPT_SOCKNAME "socket-name" diff --git a/src/core/dialer.c b/src/core/dialer.c index fe9cb92f..80b93dc0 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -20,21 +20,16 @@ static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); static void dialer_timer_cb(void *); -static nni_idhash *dialers; -static nni_mtx dialers_lk; +static nni_id_map dialers; +static nni_mtx dialers_lk; #define BUMP_STAT(x) nni_stat_inc_atomic(x, 1) int nni_dialer_sys_init(void) { - int rv; - - if ((rv = nni_idhash_init(&dialers)) != 0) { - return (rv); - } + nni_id_map_init(&dialers, 1, 0x7fffffff, false); nni_mtx_init(&dialers_lk); - nni_idhash_set_limits(dialers, 1, 0x7fffffff, 1); return (0); } @@ -44,8 +39,7 @@ nni_dialer_sys_fini(void) { nni_reap_drain(); nni_mtx_fini(&dialers_lk); - nni_idhash_fini(dialers); - dialers = NULL; + nni_id_map_fini(&dialers); } uint32_t @@ -57,11 +51,11 @@ nni_dialer_id(nni_dialer *d) void nni_dialer_destroy(nni_dialer *d) { - nni_aio_stop(d->d_con_aio); - nni_aio_stop(d->d_tmo_aio); + nni_aio_stop(&d->d_con_aio); + nni_aio_stop(&d->d_tmo_aio); - nni_aio_free(d->d_con_aio); - nni_aio_free(d->d_tmo_aio); + nni_aio_fini(&d->d_con_aio); + nni_aio_fini(&d->d_tmo_aio); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); @@ -112,7 +106,7 @@ dialer_stats_init(nni_dialer *d) nni_stat_init_atomic(&st->s_etimedout, "timedout", "timed out"); nni_stat_add(root, &st->s_etimedout); - nni_stat_init_atomic(&st->s_eproto, "protoerr", "protcol errors"); + nni_stat_init_atomic(&st->s_eproto, "protoerr", "protocol errors"); nni_stat_add(root, &st->s_eproto); nni_stat_init_atomic(&st->s_eauth, "autherr", "auth errors"); @@ -204,11 +198,18 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_mtx_init(&d->d_mtx); dialer_stats_init(d); - if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) || - ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || - ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || - ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || + nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); + nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); + + nni_mtx_lock(&dialers_lk); + rv = nni_id_alloc(&dialers, &d->d_id, d); + nni_mtx_unlock(&dialers_lk); + + if ((rv != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { + nni_mtx_lock(&dialers_lk); + nni_id_remove(&dialers, d->d_id); + nni_mtx_unlock(&dialers_lk); nni_dialer_destroy(d); return (rv); } @@ -232,16 +233,12 @@ nni_dialer_find(nni_dialer **dp, uint32_t id) } nni_mtx_lock(&dialers_lk); - if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) { - if (d->d_closed) { - rv = NNG_ECLOSED; - } else { - d->d_refcnt++; - *dp = d; - } + if ((d = nni_id_get(&dialers, id)) != NULL) { + d->d_refcnt++; + *dp = d; } nni_mtx_unlock(&dialers_lk); - return (rv); + return (d == NULL ? NNG_ENOENT : 0); } int @@ -280,14 +277,9 @@ nni_dialer_close_rele(nni_dialer *d) return; } d->d_closed = true; + nni_id_remove(&dialers, d->d_id); nni_mtx_unlock(&dialers_lk); - // Remove us from the table so we cannot be found. - // This is done fairly early in the teardown process. - // If we're here, either the socket or the listener has been - // closed at the user request, so there would be a race anyway. - nni_idhash_remove(dialers, d->d_id); - nni_dialer_rele(d); } @@ -301,14 +293,9 @@ nni_dialer_close(nni_dialer *d) return; } d->d_closed = true; + nni_id_remove(&dialers, d->d_id); nni_mtx_unlock(&dialers_lk); - // Remove us from the table so we cannot be found. - // This is done fairly early in the teardown process. - // If we're here, either the socket or the listener has been - // closed at the user request, so there would be a race anyway. - nni_idhash_remove(dialers, d->d_id); - nni_dialer_shutdown(d); nni_dialer_rele(d); @@ -317,10 +304,9 @@ nni_dialer_close(nni_dialer *d) static void dialer_timer_cb(void *arg) { - nni_dialer *d = arg; - nni_aio * aio = d->d_tmo_aio; + nni_dialer *d = arg; - if (nni_aio_result(aio) == 0) { + if (nni_aio_result(&d->d_tmo_aio) == 0) { dialer_connect_start(d); } } @@ -329,7 +315,7 @@ static void dialer_connect_cb(void *arg) { nni_dialer *d = arg; - nni_aio * aio = d->d_con_aio; + nni_aio * aio = &d->d_con_aio; nni_aio * user_aio; int rv; @@ -366,13 +352,11 @@ dialer_connect_cb(void *arg) static void dialer_connect_start(nni_dialer *d) { - nni_aio *aio = d->d_con_aio; - - d->d_ops.d_connect(d->d_data, aio); + d->d_ops.d_connect(d->d_data, &d->d_con_aio); } int -nni_dialer_start(nni_dialer *d, int flags) +nni_dialer_start(nni_dialer *d, unsigned flags) { int rv = 0; nni_aio *aio; diff --git a/src/core/dialer.h b/src/core/dialer.h index 200ad01d..df923209 100644 --- a/src/core/dialer.h +++ b/src/core/dialer.h @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -20,7 +20,7 @@ extern void nni_dialer_rele(nni_dialer *); extern uint32_t nni_dialer_id(nni_dialer *); extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); extern void nni_dialer_close(nni_dialer *); -extern int nni_dialer_start(nni_dialer *, int); +extern int nni_dialer_start(nni_dialer *, unsigned); extern nni_sock *nni_dialer_sock(nni_dialer *); extern int nni_dialer_setopt( diff --git a/src/core/idhash.c b/src/core/idhash.c index 80613ce0..0edf16dc 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -12,109 +12,77 @@ #include <string.h> -struct nni_idhash_entry { - uint64_t ihe_key; - void * ihe_val; - uint32_t ihe_skips; +struct nni_id_entry { + uint32_t key; + uint32_t skips; + void * val; }; -struct nni_idhash { - size_t ih_cap; - size_t ih_count; - size_t ih_load; - size_t ih_minload; // considers placeholders - size_t ih_maxload; - uint64_t ih_minval; - uint64_t ih_maxval; - uint64_t ih_dynval; - nni_idhash_entry *ih_entries; - nni_mtx ih_mtx; -}; - -int -nni_idhash_init(nni_idhash **hp) -{ - nni_idhash *h; - - if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&h->ih_mtx); - h->ih_entries = NULL; - h->ih_count = 0; - h->ih_load = 0; - h->ih_cap = 0; - h->ih_maxload = 0; - h->ih_minload = 0; // never shrink below this - h->ih_minval = 0; - h->ih_maxval = 0xffffffff; - h->ih_dynval = 0; - *hp = h; - return (0); -} - void -nni_idhash_fini(nni_idhash *h) +nni_id_map_init(nni_id_map *m, uint32_t lo, uint32_t hi, bool randomize) { - if (h != NULL) { - if (h->ih_entries != NULL) { - NNI_FREE_STRUCTS(h->ih_entries, h->ih_cap); - h->ih_entries = NULL; - h->ih_cap = h->ih_count = 0; - h->ih_load = h->ih_minload = h->ih_maxload = 0; - } - nni_mtx_fini(&h->ih_mtx); - NNI_FREE_STRUCT(h); + if (lo == 0) { + lo = 1; + } + if (hi == 0) { + hi = 0xffffffffu; + } + NNI_ASSERT(lo != 0); + NNI_ASSERT(hi > lo); + m->id_entries = NULL; + m->id_count = 0; + m->id_load = 0; + m->id_cap = 0; + m->id_max_load = 0; + m->id_min_load = 0; // never shrink below this + m->id_min_val = lo; + m->id_max_val = hi; + if (randomize) { + // NB: The range is inclusive. + m->id_dyn_val = nni_random() % ((hi - lo) + 1) + lo; + } else { + m->id_dyn_val = lo; } } void -nni_idhash_set_limits( - nni_idhash *h, uint64_t minval, uint64_t maxval, uint64_t start) +nni_id_map_fini(nni_id_map *m) { - if (start < minval) { - start = minval; - } - if (start > maxval) { - start = maxval; + if (m->id_entries != NULL) { + NNI_FREE_STRUCTS(m->id_entries, m->id_cap); + m->id_entries = NULL; + m->id_cap = m->id_count = 0; + m->id_load = m->id_min_load = m->id_max_load = 0; } - - nni_mtx_lock(&h->ih_mtx); - h->ih_minval = minval; - h->ih_maxval = maxval; - h->ih_dynval = start; - NNI_ASSERT(minval < maxval); - NNI_ASSERT(start >= minval); - NNI_ASSERT(start <= maxval); - nni_mtx_unlock(&h->ih_mtx); } // Inspired by Python dict implementation. This probe will visit every -// cell. We always hash consecutively assigned IDs. -#define NNI_IDHASH_NEXTPROBE(h, j) ((((j) *5) + 1) & (h->ih_cap - 1)) -#define NNI_IDHASH_INDEX(h, j) ((j) & (h->ih_cap - 1)) +// cell. We always hash consecutively assigned IDs. This requires that +// the capacity is always a power of two. +#define ID_NEXT(m, j) ((((j) *5) + 1) & (m->id_cap - 1)) +#define ID_INDEX(m, j) ((j) & (m->id_cap - 1)) static size_t -nni_hash_find_index(nni_idhash *h, uint64_t id) +id_find(nni_id_map *m, uint32_t id) { size_t index; size_t start; - if (h->ih_count == 0) { + if (m->id_count == 0) { return ((size_t) -1); } - index = NNI_IDHASH_INDEX(h, id); + index = ID_INDEX(m, id); start = index; for (;;) { // The value of ihe_key is only valid if ihe_val is not NULL. - if ((h->ih_entries[index].ihe_key == id) && - (h->ih_entries[index].ihe_val != NULL)) { + if ((m->id_entries[index].key == id) && + (m->id_entries[index].val != NULL)) { return (index); } - if (h->ih_entries[index].ihe_skips == 0) { + if (m->id_entries[index].skips == 0) { return ((size_t) -1); } - index = NNI_IDHASH_NEXTPROBE(h, index); + index = ID_NEXT(m, index); if (index == start) { break; @@ -124,249 +92,196 @@ nni_hash_find_index(nni_idhash *h, uint64_t id) return ((size_t) -1); } -static int -nni_hash_find(nni_idhash *h, uint64_t id, void **valp) +void * +nni_id_get(nni_id_map *m, uint32_t id) { size_t index; - if ((index = nni_hash_find_index(h, id)) == (size_t) -1) { - return (NNG_ENOENT); + if ((index = id_find(m, id)) == (size_t) -1) { + return (NULL); } - *valp = h->ih_entries[index].ihe_val; - return (0); -} - -int -nni_idhash_find(nni_idhash *h, uint64_t id, void **valp) -{ - int rv; - - nni_mtx_lock(&h->ih_mtx); - rv = nni_hash_find(h, id, valp); - nni_mtx_unlock(&h->ih_mtx); - return (rv); + return (m->id_entries[index].val); } static int -nni_hash_resize(nni_idhash *h) +id_resize(nni_id_map *m) { - size_t newsize; - size_t oldsize; - nni_idhash_entry *newents; - nni_idhash_entry *oldents; - uint32_t i; + size_t new_cap; + size_t old_cap; + nni_id_entry *new_entries; + nni_id_entry *old_entries; + uint32_t i; - if ((h->ih_load < h->ih_maxload) && (h->ih_load >= h->ih_minload)) { + if ((m->id_load < m->id_max_load) && (m->id_load >= m->id_min_load)) { // No resize needed. return (0); } - oldsize = h->ih_cap; - - newsize = 8; - while (newsize < (h->ih_count * 2)) { - newsize *= 2; + old_cap = m->id_cap; + new_cap = 8; + while (new_cap < (m->id_count * 2)) { + new_cap *= 2; } - if (newsize == oldsize) { + if (new_cap == old_cap) { // Same size. return (0); } - oldents = h->ih_entries; - newents = NNI_ALLOC_STRUCTS(newents, newsize); - if (newents == NULL) { + old_entries = m->id_entries; + new_entries = NNI_ALLOC_STRUCTS(new_entries, new_cap); + if (new_entries == NULL) { return (NNG_ENOMEM); } - h->ih_entries = newents; - h->ih_cap = newsize; - if (newsize > 8) { - h->ih_minload = newsize / 8; - h->ih_maxload = newsize * 2 / 3; + m->id_entries = new_entries; + m->id_cap = new_cap; + m->id_load = 0; + if (new_cap > 8) { + m->id_min_load = new_cap / 8; + m->id_max_load = new_cap * 2 / 3; } else { - h->ih_minload = 0; - h->ih_maxload = 5; + m->id_min_load = 0; + m->id_max_load = 5; } - for (i = 0; i < oldsize; i++) { + for (i = 0; i < old_cap; i++) { size_t index; - if (oldents[i].ihe_val == NULL) { + if (old_entries[i].val == NULL) { continue; } - index = oldents[i].ihe_key & (newsize - 1); + index = old_entries[i].key & (new_cap - 1); for (;;) { // Increment the load unconditionally. It counts // once for every item stored, plus once for each // hashing operation we use to store the item (i.e. // one for the item, plus once for each rehash.) - h->ih_load++; - if (newents[index].ihe_val == NULL) { + m->id_load++; + if (new_entries[index].val == NULL) { // As we are hitting this entry for the first // time, it won't have any skips. - NNI_ASSERT(newents[index].ihe_skips == 0); - newents[index].ihe_val = oldents[i].ihe_val; - newents[index].ihe_key = oldents[i].ihe_key; + NNI_ASSERT(new_entries[index].skips == 0); + new_entries[index].val = old_entries[i].val; + new_entries[index].key = old_entries[i].key; break; } - newents[index].ihe_skips++; - index = NNI_IDHASH_NEXTPROBE(h, index); + new_entries[index].skips++; + index = ID_NEXT(m, index); } } - if (oldsize != 0) { - NNI_FREE_STRUCTS(oldents, oldsize); + if (old_cap != 0) { + NNI_FREE_STRUCTS(old_entries, old_cap); } return (0); } int -nni_idhash_remove(nni_idhash *h, uint64_t id) +nni_id_remove(nni_id_map *m, uint32_t id) { - size_t index; - size_t srch; - nni_idhash_entry *ent; + size_t index; + size_t probe; - nni_mtx_lock(&h->ih_mtx); - if ((index = nni_hash_find_index(h, id)) == (size_t) -1) { - nni_mtx_unlock(&h->ih_mtx); + if ((index = id_find(m, id)) == (size_t) -1) { return (NNG_ENOENT); } // Now we have found the index where the object exists. We are going // to restart the search, until the index matches, to decrement the // skips counter. - srch = (int) NNI_IDHASH_INDEX(h, id); + probe = ID_INDEX(m, id); for (;;) { + nni_id_entry *entry; + // The load was increased once each hashing operation we used // to place the the item. Decrement it accordingly. - h->ih_load--; - ent = &h->ih_entries[srch]; - if (srch == index) { - ent->ihe_val = NULL; - ent->ihe_key = (uint64_t) -1; // garbage key + m->id_load--; + entry = &m->id_entries[probe]; + if (probe == index) { + entry->val = NULL; + entry->key = 0; // invalid key break; } - NNI_ASSERT(ent->ihe_skips > 0); - ent->ihe_skips--; - srch = NNI_IDHASH_NEXTPROBE(h, srch); + NNI_ASSERT(entry->skips > 0); + entry->skips--; + probe = ID_NEXT(m, probe); } - h->ih_count--; + m->id_count--; // Shrink -- but it's ok if we can't. - (void) nni_hash_resize(h); - - nni_mtx_unlock(&h->ih_mtx); + (void) id_resize(m); return (0); } -static int -nni_hash_insert(nni_idhash *h, uint64_t id, void *val) +int +nni_id_set(nni_id_map *m, uint32_t id, void *val) { - size_t index; - nni_idhash_entry *ent; + size_t index; + nni_id_entry *ent; // Try to resize -- if we don't need to, this will be a no-op. - if (nni_hash_resize(h) != 0) { + if (id_resize(m) != 0) { return (NNG_ENOMEM); } // If it already exists, just overwrite the old value. - if ((index = nni_hash_find_index(h, id)) != (size_t) -1) { - ent = &h->ih_entries[index]; - ent->ihe_val = val; + if ((index = id_find(m, id)) != (size_t) -1) { + ent = &m->id_entries[index]; + ent->val = val; return (0); } - index = NNI_IDHASH_INDEX(h, id); + index = ID_INDEX(m, id); for (;;) { - ent = &h->ih_entries[index]; + ent = &m->id_entries[index]; // Increment the load count. We do this each time time we - // rehash. This may overcount items that collide on the + // rehash. This may over-count items that collide on the // same rehashing, but this should just cause a table to // grow sooner, which is probably a good thing. - h->ih_load++; - if (ent->ihe_val == NULL) { - h->ih_count++; - ent->ihe_key = id; - ent->ihe_val = val; + m->id_load++; + if (ent->val == NULL) { + m->id_count++; + ent->key = id; + ent->val = val; return (0); } // Record the skip count. This being non-zero informs // that a rehash will be necessary. Without this we // would need to scan the entire hash for the match. - ent->ihe_skips++; - index = NNI_IDHASH_NEXTPROBE(h, index); + ent->skips++; + index = ID_NEXT(m, index); } } int -nni_idhash_insert(nni_idhash *h, uint64_t id, void *val) -{ - int rv; - - nni_mtx_lock(&h->ih_mtx); - rv = nni_hash_insert(h, id, val); - nni_mtx_unlock(&h->ih_mtx); - return (rv); -} - -int -nni_idhash_alloc(nni_idhash *h, uint64_t *idp, void *val) +nni_id_alloc(nni_id_map *m, uint32_t *idp, void *val) { uint64_t id; int rv; - nni_mtx_lock(&h->ih_mtx); NNI_ASSERT(val != NULL); - if (h->ih_count > (h->ih_maxval - h->ih_minval)) { + // range is inclusive, so > to get +1 effect. + if (m->id_count > (m->id_max_val - m->id_min_val)) { // Really more like ENOSPC.. the table is filled to max. - nni_mtx_unlock(&h->ih_mtx); - return (NNG_ENOMEM); } for (;;) { - id = h->ih_dynval; - h->ih_dynval++; - if (h->ih_dynval > h->ih_maxval) { - h->ih_dynval = h->ih_minval; + id = m->id_dyn_val; + m->id_dyn_val++; + if (m->id_dyn_val > m->id_max_val) { + m->id_dyn_val = m->id_min_val; } - if (nni_hash_find_index(h, id) == (size_t) -1) { + if (id_find(m, id) == (size_t) -1) { break; } } - rv = nni_hash_insert(h, id, val); + rv = nni_id_set(m, id, val); if (rv == 0) { *idp = id; } - nni_mtx_unlock(&h->ih_mtx); - return (rv); } - -int -nni_idhash_alloc32(nni_idhash *h, uint32_t *idp, void *val) -{ - uint64_t id64; - int rv; - - if ((rv = nni_idhash_alloc(h, &id64, val)) == 0) { - if (id64 > 0xffffffffull) { - nni_idhash_remove(h, id64); - rv = NNG_EINVAL; - } else { - *idp = (uint32_t) id64; - } - } - return (rv); -} - -size_t -nni_idhash_count(nni_idhash *h) -{ - return (h->ih_count); -} diff --git a/src/core/idhash.h b/src/core/idhash.h index 5b9ed593..0c9043ce 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -23,20 +23,28 @@ // use table sizes that are powers of two. Note that hash items // must be non-NULL. The table is protected by an internal lock. -typedef struct nni_idhash nni_idhash; -typedef struct nni_idhash_entry nni_idhash_entry; - -extern int nni_idhash_init(nni_idhash **); -extern void nni_idhash_fini(nni_idhash *); -extern void nni_idhash_set_limits(nni_idhash *, uint64_t, uint64_t, uint64_t); -extern int nni_idhash_find(nni_idhash *, uint64_t, void **); -extern int nni_idhash_remove(nni_idhash *, uint64_t); -extern int nni_idhash_insert(nni_idhash *, uint64_t, void *); -extern int nni_idhash_alloc(nni_idhash *, uint64_t *, void *); - -// 32-bit version of idhash -- limits must have been set accordingly. -extern int nni_idhash_alloc32(nni_idhash *, uint32_t *, void *); - -extern size_t nni_idhash_count(nni_idhash *); +typedef struct nni_id_map nni_id_map; +typedef struct nni_id_entry nni_id_entry; + +// NB: These details are entirely private to the hash implementation. +// They are provided here to facilitate inlining in structures. +struct nni_id_map { + size_t id_cap; + size_t id_count; + size_t id_load; + size_t id_min_load; // considers placeholders + size_t id_max_load; + uint32_t id_min_val; + uint32_t id_max_val; + uint32_t id_dyn_val; + nni_id_entry *id_entries; +}; + +extern void nni_id_map_init(nni_id_map *, uint32_t, uint32_t, bool); +extern void nni_id_map_fini(nni_id_map *); +extern void *nni_id_get(nni_id_map *, uint32_t); +extern int nni_id_set(nni_id_map *, uint32_t, void *); +extern int nni_id_alloc(nni_id_map *, uint32_t *, void *); +extern int nni_id_remove(nni_id_map *, uint32_t); #endif // CORE_IDHASH_H diff --git a/src/core/listener.c b/src/core/listener.c index 1fde6e36..56850649 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -21,21 +21,16 @@ static void listener_accept_start(nni_listener *); static void listener_accept_cb(void *); static void listener_timer_cb(void *); -static nni_idhash *listeners; -static nni_mtx listeners_lk; +static nni_id_map listeners; +static nni_mtx listeners_lk; #define BUMP_STAT(x) nni_stat_inc_atomic(x, 1) int nni_listener_sys_init(void) { - int rv; - - if ((rv = nni_idhash_init(&listeners)) != 0) { - return (rv); - } + nni_id_map_init(&listeners, 1, 0x7fffffff, false); nni_mtx_init(&listeners_lk); - nni_idhash_set_limits(listeners, 1, 0x7fffffff, 1); return (0); } @@ -45,8 +40,7 @@ nni_listener_sys_fini(void) { nni_reap_drain(); nni_mtx_fini(&listeners_lk); - nni_idhash_fini(listeners); - listeners = NULL; + nni_id_map_fini(&listeners); } uint32_t @@ -58,11 +52,11 @@ nni_listener_id(nni_listener *l) void nni_listener_destroy(nni_listener *l) { - nni_aio_stop(l->l_acc_aio); - nni_aio_stop(l->l_tmo_aio); + nni_aio_stop(&l->l_acc_aio); + nni_aio_stop(&l->l_tmo_aio); - nni_aio_free(l->l_acc_aio); - nni_aio_free(l->l_tmo_aio); + nni_aio_fini(&l->l_acc_aio); + nni_aio_fini(&l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); @@ -109,7 +103,7 @@ listener_stats_init(nni_listener *l) nni_stat_init_atomic(&st->s_etimedout, "timedout", "timed out"); nni_stat_add(root, &st->s_etimedout); - nni_stat_init_atomic(&st->s_eproto, "protoerr", "protcol errors"); + nni_stat_init_atomic(&st->s_eproto, "protoerr", "protocol errors"); nni_stat_add(root, &st->s_eproto); nni_stat_init_atomic(&st->s_eauth, "autherr", "auth errors"); @@ -196,11 +190,18 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); listener_stats_init(l); - if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) || - ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || - ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || - ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || + nni_aio_init(&l->l_acc_aio, listener_accept_cb, l); + nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l); + + nni_mtx_lock(&listeners_lk); + rv = nni_id_alloc(&listeners, &l->l_id, l); + nni_mtx_unlock(&listeners_lk); + + if ((rv != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { + nni_mtx_lock(&listeners_lk); + nni_id_remove(&listeners, l->l_id); + nni_mtx_unlock(&listeners_lk); nni_listener_destroy(l); return (rv); } @@ -226,16 +227,12 @@ nni_listener_find(nni_listener **lp, uint32_t id) } nni_mtx_lock(&listeners_lk); - if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) { - if (l->l_closed) { - rv = NNG_ECLOSED; - } else { - l->l_refcnt++; - *lp = l; - } + if ((l = nni_id_get(&listeners, id)) != NULL) { + l->l_refcnt++; + *lp = l; } nni_mtx_unlock(&listeners_lk); - return (rv); + return (l == NULL ? NNG_ENOENT : 0); } int @@ -274,14 +271,9 @@ nni_listener_close(nni_listener *l) return; } l->l_closed = true; + nni_id_remove(&listeners, l->l_id); nni_mtx_unlock(&listeners_lk); - // Remove us from the table so we cannot be found. - // This is done fairly early in the teardown process. - // If we're here, either the socket or the listener has been - // closed at the user request, so there would be a race anyway. - nni_idhash_remove(listeners, l->l_id); - nni_listener_shutdown(l); nni_listener_rele(l); // This will trigger a reap if id count is zero. @@ -298,23 +290,18 @@ nni_listener_close_rele(nni_listener *l) return; } l->l_closed = true; + nni_id_remove(&listeners, l->l_id); nni_mtx_unlock(&listeners_lk); - // Remove us from the table so we cannot be found. - // This is done fairly early in the teardown process. - // If we're here, either the socket or the listener has been - // closed at the user request, so there would be a race anyway. - nni_idhash_remove(listeners, l->l_id); nni_listener_rele(l); // This will trigger a reap if id count is zero. } static void listener_timer_cb(void *arg) { - nni_listener *l = arg; - nni_aio * aio = l->l_tmo_aio; + nni_listener *l = arg; - if (nni_aio_result(aio) == 0) { + if (nni_aio_result(&l->l_tmo_aio) == 0) { listener_accept_start(l); } } @@ -323,8 +310,8 @@ static void listener_accept_cb(void *arg) { nni_listener *l = arg; - nni_aio * aio = l->l_acc_aio; - int rv; + nni_aio * aio = &l->l_acc_aio; + int rv; switch ((rv = nni_aio_result(aio))) { case 0: @@ -350,7 +337,7 @@ listener_accept_cb(void *arg) // by not thrashing we give the system a chance to // recover. 100 ms is enough to cool down. nni_listener_bump_error(l, rv); - nni_sleep_aio(100, l->l_tmo_aio); + nni_sleep_aio(100, &l->l_tmo_aio); break; } } @@ -358,16 +345,14 @@ listener_accept_cb(void *arg) static void listener_accept_start(nni_listener *l) { - nni_aio *aio = l->l_acc_aio; - // Call with the listener lock held. - l->l_ops.l_accept(l->l_data, aio); + l->l_ops.l_accept(l->l_data, &l->l_acc_aio); } int nni_listener_start(nni_listener *l, int flags) { - int rv = 0; + int rv; NNI_ARG_UNUSED(flags); if (nni_atomic_flag_test_and_set(&l->l_started)) { diff --git a/src/core/pipe.c b/src/core/pipe.c index 4076b62c..b93d9f64 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -13,34 +13,23 @@ #include "sockimpl.h" #include <stdio.h> -#include <string.h> // This file contains functions relating to pipes. // // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. -static nni_idhash *nni_pipes; -static nni_mtx nni_pipe_lk; +static nni_id_map pipes; +static nni_mtx pipes_lk; int nni_pipe_sys_init(void) { - int rv; - - nni_mtx_init(&nni_pipe_lk); - - if ((rv = nni_idhash_init(&nni_pipes)) != 0) { - return (rv); - } + nni_mtx_init(&pipes_lk); - // Note that pipes have their own namespace. ID hash will - // guarantee the that the first value is reasonable (non-zero), - // if we supply an out of range value (0). (Consequently the - // value "1" has a bias -- its roughly twice as likely to be - // chosen as any other value. This does not mater.) - nni_idhash_set_limits( - nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffffu); + // Pipe IDs needs to have high order bit clear, and we want + // them to start at a random value. + nni_id_map_init(&pipes, 1, 0x7fffffff, true); return (0); } @@ -49,11 +38,8 @@ void nni_pipe_sys_fini(void) { nni_reap_drain(); - nni_mtx_fini(&nni_pipe_lk); - if (nni_pipes != NULL) { - nni_idhash_fini(nni_pipes); - nni_pipes = NULL; - } + nni_mtx_fini(&pipes_lk); + nni_id_map_fini(&pipes); } static void @@ -67,15 +53,15 @@ pipe_destroy(nni_pipe *p) // Make sure any unlocked holders are done with this. // This happens during initialization for example. - nni_mtx_lock(&nni_pipe_lk); + nni_mtx_lock(&pipes_lk); if (p->p_id != 0) { - nni_idhash_remove(nni_pipes, p->p_id); + nni_id_remove(&pipes, p->p_id); } // This wait guarantees that all callers are done with us. while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_unlock(&pipes_lk); if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); @@ -101,31 +87,30 @@ pipe_destroy(nni_pipe *p) int nni_pipe_find(nni_pipe **pp, uint32_t id) { - int rv; nni_pipe *p; - nni_mtx_lock(&nni_pipe_lk); // We don't care if the pipe is "closed". End users only have // access to the pipe in order to obtain properties (which may // be retried during the post-close notification callback) or to // close the pipe. - if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) { + nni_mtx_lock(&pipes_lk); + if ((p = nni_id_get(&pipes, id)) != NULL) { p->p_refcnt++; *pp = p; } - nni_mtx_unlock(&nni_pipe_lk); - return (rv); + nni_mtx_unlock(&pipes_lk); + return (p == NULL ? NNG_ENOENT : 0); } void nni_pipe_rele(nni_pipe *p) { - nni_mtx_lock(&nni_pipe_lk); + nni_mtx_lock(&pipes_lk); p->p_refcnt--; if (p->p_refcnt == 0) { nni_cv_wake(&p->p_cv); } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_unlock(&pipes_lk); } // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. @@ -188,9 +173,9 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); nni_pipe_stats * st; - size_t sz; + size_t sz; - sz = NNI_ALIGN_UP(sizeof (*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; if ((p = nni_zalloc(sz)) == NULL) { // In this case we just toss the pipe... @@ -198,7 +183,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) return (NNG_ENOMEM); } - p->p_size = sz; + p->p_size = sz; p->p_proto_data = p + 1; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; @@ -214,13 +199,13 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) NNI_LIST_NODE_INIT(&p->p_ep_node); nni_mtx_init(&p->p_mtx); - nni_cv_init(&p->p_cv, &nni_pipe_lk); + nni_cv_init(&p->p_cv, &pipes_lk); - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc32(nni_pipes, &p->p_id, p)) == 0) { + nni_mtx_lock(&pipes_lk); + if ((rv = nni_id_alloc(&pipes, &p->p_id, p)) == 0) { p->p_refcnt = 1; } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_unlock(&pipes_lk); snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id); diff --git a/src/core/socket.c b/src/core/socket.c index 6e3f14d0..66130d4a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -16,10 +16,11 @@ // Socket implementation. -static nni_list sock_list; -static nni_idhash *sock_hash; -static nni_mtx sock_lk; -static nni_idhash *ctx_hash; +static nni_list sock_list; +static nni_id_map sock_ids; +static nni_mtx sock_lk; +static nni_id_map ctx_ids; +static bool inited; struct nni_ctx { nni_list_node c_node; @@ -117,7 +118,7 @@ static void listener_shutdown_locked(nni_listener *); #define SOCK(s) ((nni_sock *) (s)) static int -sock_get_fd(void *s, int flag, int *fdp) +sock_get_fd(void *s, unsigned flag, int *fdp) { int rv; nni_pollable *p; @@ -374,19 +375,17 @@ nni_sock_find(nni_sock **sockp, uint32_t id) return (rv); } nni_mtx_lock(&sock_lk); - if ((rv = nni_idhash_find(sock_hash, id, (void **) &s)) == 0) { + if ((s = nni_id_get(&sock_ids, id)) != NULL) { if (s->s_closed) { rv = NNG_ECLOSED; } else { s->s_refcnt++; *sockp = s; } - } - nni_mtx_unlock(&sock_lk); - - if (rv == NNG_ENOENT) { + } else { rv = NNG_ECLOSED; } + nni_mtx_unlock(&sock_lk); return (rv); } @@ -582,33 +581,22 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) int nni_sock_sys_init(void) { - int rv; - NNI_LIST_INIT(&sock_list, nni_sock, s_node); nni_mtx_init(&sock_lk); - if (((rv = nni_idhash_init(&sock_hash)) != 0) || - ((rv = nni_idhash_init(&ctx_hash)) != 0)) { - nni_sock_sys_fini(); - return (rv); - } - nni_idhash_set_limits(sock_hash, 1, 0x7fffffff, 1); - nni_idhash_set_limits(ctx_hash, 1, 0x7fffffff, 1); + nni_id_map_init(&sock_ids, 1, 0x7fffffff, false); + nni_id_map_init(&ctx_ids, 1, 0x7fffffff, false); + inited = true; return (0); } void nni_sock_sys_fini(void) { - if (sock_hash != NULL) { - nni_idhash_fini(sock_hash); - sock_hash = NULL; - } - if (ctx_hash != NULL) { - nni_idhash_fini(ctx_hash); - ctx_hash = NULL; - } + nni_id_map_fini(&sock_ids); + nni_id_map_fini(&ctx_ids); nni_mtx_fini(&sock_lk); + inited = false; } int @@ -628,7 +616,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) } nni_mtx_lock(&sock_lk); - if (nni_idhash_alloc32(sock_hash, &s->s_id, s) != 0) { + if (nni_id_alloc(&sock_ids, &s->s_id, s) != 0) { sock_destroy(s); } else { nni_list_append(&sock_list, s); @@ -691,7 +679,7 @@ nni_sock_shutdown(nni_sock *sock) ctx->c_closed = true; if (ctx->c_refcnt == 0) { // No open operations. So close it. - nni_idhash_remove(ctx_hash, ctx->c_id); + nni_id_remove(&ctx_ids, ctx->c_id); nni_list_remove(&sock->s_ctxs, ctx); nni_ctx_destroy(ctx); } @@ -790,7 +778,7 @@ nni_sock_close(nni_sock *s) return; } s->s_closed = true; - nni_idhash_remove(sock_hash, s->s_id); + nni_id_remove(&sock_ids, s->s_id); // We might have been removed from the list already, e.g. by // nni_sock_closeall. This is idempotent. @@ -820,7 +808,7 @@ nni_sock_closeall(void) { nni_sock *s; - if (sock_hash == NULL) { + if (!inited) { return; } for (;;) { @@ -1152,7 +1140,7 @@ nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) } int -nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) +nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing) { int rv; nni_ctx *ctx; @@ -1161,7 +1149,7 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) return (rv); } nni_mtx_lock(&sock_lk); - if ((rv = nni_idhash_find(ctx_hash, id, (void **) &ctx)) == 0) { + if ((ctx = nni_id_get(&ctx_ids, id)) != NULL) { // We refuse a reference if either the socket is // closed, or the context is closed. (If the socket // is closed, and we are only getting the reference so @@ -1172,14 +1160,12 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) rv = NNG_ECLOSED; } else { ctx->c_refcnt++; - *ctxp = ctx; + *cp = ctx; } - } - nni_mtx_unlock(&sock_lk); - - if (rv == NNG_ENOENT) { + } else { rv = NNG_ECLOSED; } + nni_mtx_unlock(&sock_lk); return (rv); } @@ -1211,7 +1197,7 @@ nni_ctx_rele(nni_ctx *ctx) // Remove us from the hash, so we can't be found any more. // This allows our ID to be reused later, although the system // tries to avoid ID reuse. - nni_idhash_remove(ctx_hash, ctx->c_id); + nni_id_remove(&ctx_ids, ctx->c_id); nni_list_remove(&sock->s_ctxs, ctx); if (sock->s_closed || sock->s_ctxwait) { nni_cv_wake(&sock->s_close_cv); @@ -1236,8 +1222,8 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) if ((ctx = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } - ctx->c_size = sz; - ctx->c_data = ctx + 1; + ctx->c_size = sz; + ctx->c_data = ctx + 1; ctx->c_closed = false; ctx->c_refcnt = 1; // Caller implicitly gets a reference. ctx->c_sock = sock; @@ -1251,14 +1237,14 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) nni_free(ctx, ctx->c_size); return (NNG_ECLOSED); } - if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) { + if ((rv = nni_id_alloc(&ctx_ids, &ctx->c_id, ctx)) != 0) { nni_mtx_unlock(&sock_lk); nni_free(ctx, ctx->c_size); return (rv); } if ((rv = sock->s_ctx_ops.ctx_init(ctx->c_data, sock->s_data)) != 0) { - nni_idhash_remove(ctx_hash, ctx->c_id); + nni_id_remove(&ctx_ids, ctx->c_id); nni_mtx_unlock(&sock_lk); nni_free(ctx, ctx->c_size); return (rv); @@ -1396,7 +1382,7 @@ dialer_timer_start_locked(nni_dialer *d) // This algorithm may lead to slight biases because we don't // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - nni_sleep_aio(back_off ? nni_random() % back_off : 0, d->d_tmo_aio); + nni_sleep_aio(back_off ? nni_random() % back_off : 0, &d->d_tmo_aio); } void @@ -1460,8 +1446,8 @@ dialer_shutdown_impl(nni_dialer *d) nni_pipe *p; // Abort any remaining in-flight operations. - nni_aio_close(d->d_con_aio); - nni_aio_close(d->d_tmo_aio); + nni_aio_close(&d->d_con_aio); + nni_aio_close(&d->d_tmo_aio); // Stop the underlying transport. d->d_ops.d_close(d->d_data); @@ -1494,8 +1480,8 @@ nni_dialer_reap(nni_dialer *d) { nni_sock *s = d->d_sock; - nni_aio_stop(d->d_tmo_aio); - nni_aio_stop(d->d_con_aio); + nni_aio_stop(&d->d_tmo_aio); + nni_aio_stop(&d->d_con_aio); nni_stat_unregister(&d->d_stats.s_root); @@ -1571,8 +1557,8 @@ listener_shutdown_impl(nni_listener *l) nni_pipe *p; // Abort any remaining in-flight accepts. - nni_aio_close(l->l_acc_aio); - nni_aio_close(l->l_tmo_aio); + nni_aio_close(&l->l_acc_aio); + nni_aio_close(&l->l_tmo_aio); // Stop the underlying transport. l->l_ops.l_close(l->l_data); @@ -1606,8 +1592,8 @@ nni_listener_reap(nni_listener *l) { nni_sock *s = l->l_sock; - nni_aio_stop(l->l_tmo_aio); - nni_aio_stop(l->l_acc_aio); + nni_aio_stop(&l->l_tmo_aio); + nni_aio_stop(&l->l_acc_aio); nni_stat_unregister(&l->l_stats.s_root); diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 16596c63..732e2285 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -50,8 +50,8 @@ struct nni_dialer { nni_mtx d_mtx; nni_list d_pipes; nni_aio * d_user_aio; - nni_aio * d_con_aio; - nni_aio * d_tmo_aio; // backoff timer + nni_aio d_con_aio; + nni_aio d_tmo_aio; // backoff timer nni_duration d_maxrtime; // maximum time for reconnect nni_duration d_currtime; // current time for reconnect nni_duration d_inirtime; // initial time for reconnect @@ -91,8 +91,8 @@ struct nni_listener { bool l_closing; // close started (shutdown) nni_atomic_flag l_started; nni_list l_pipes; - nni_aio * l_acc_aio; - nni_aio * l_tmo_aio; + nni_aio l_acc_aio; + nni_aio l_tmo_aio; nni_reap_item l_reap; nni_listener_stats l_stats; }; diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 00959a4c..0492119a 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -34,7 +34,7 @@ struct pair1_sock { bool raw; nni_atomic_int ttl; nni_mtx mtx; - nni_idhash * pipes; + nni_id_map pipes; nni_list plist; bool started; nni_stat_item stat_poly; @@ -66,7 +66,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -75,9 +75,7 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) { pair1_sock *s = arg; - if (nni_idhash_init(&s->pipes) != 0) { - return (NNG_ENOMEM); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. @@ -199,12 +197,12 @@ pair1_pipe_start(void *arg) } id = nni_pipe_id(p->pipe); - if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { + if ((rv = nni_id_set(&s->pipes, id, p)) != 0) { nni_mtx_unlock(&s->mtx); return (rv); } if (!nni_list_empty(&s->plist)) { - nni_idhash_remove(s->pipes, id); + nni_id_remove(&s->pipes, id); nni_mtx_unlock(&s->mtx); BUMP_STAT(&s->stat_reject_already); return (NNG_EBUSY); @@ -234,7 +232,7 @@ pair1_pipe_close(void *arg) nni_aio_close(&p->aio_get); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); } @@ -400,7 +398,6 @@ pair1_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } - #ifdef NNG_TEST_LIB static int pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t) diff --git a/src/protocol/pair1/pair1_poly.c b/src/protocol/pair1/pair1_poly.c index 950c60f7..fc1bbf6a 100644 --- a/src/protocol/pair1/pair1_poly.c +++ b/src/protocol/pair1/pair1_poly.c @@ -40,7 +40,7 @@ struct pair1poly_sock { nni_sock * sock; nni_atomic_int ttl; nni_mtx mtx; - nni_idhash * pipes; + nni_id_map pipes; nni_list plist; bool started; nni_aio aio_get; @@ -72,7 +72,7 @@ pair1poly_sock_fini(void *arg) pair1poly_sock *s = arg; nni_aio_fini(&s->aio_get); - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -81,9 +81,7 @@ pair1poly_sock_init(void *arg, nni_sock *sock) { pair1poly_sock *s = arg; - if (nni_idhash_init(&s->pipes) != 0) { - return (NNG_ENOMEM); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->plist, pair1poly_pipe, node); // Raw mode uses this. @@ -196,7 +194,7 @@ pair1poly_pipe_start(void *arg) } id = nni_pipe_id(p->pipe); - if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { + if ((rv = nni_id_set(&s->pipes, id, p)) != 0) { nni_mtx_unlock(&s->mtx); return (rv); } @@ -231,7 +229,7 @@ pair1poly_pipe_close(void *arg) nni_aio_close(&p->aio_get); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); @@ -311,7 +309,7 @@ pair1poly_sock_get_cb(void *arg) (!nni_list_empty(&s->plist))) { p = nni_list_first(&s->plist); } else { - nni_idhash_find(s->pipes, id, (void **) &p); + p = nni_id_get(&s->pipes, id); } // Try a non-blocking send. If this fails we just discard the diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index e750ef56..6f859ee6 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -41,7 +41,7 @@ struct rep0_ctx { struct rep0_sock { nni_mtx lk; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; nni_list recvpipes; // list of pipes with data to receive nni_list recvq; rep0_ctx ctx; @@ -177,7 +177,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) { + if ((p = nni_id_get(&s->pipes, p_id)) == NULL) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer // lost interest in our reply. @@ -210,7 +210,7 @@ rep0_sock_fini(void *arg) { rep0_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); rep0_ctx_fini(&s->ctx); nni_pollable_fini(&s->writable); nni_pollable_fini(&s->readable); @@ -221,16 +221,11 @@ static int rep0_sock_init(void *arg, nni_sock *sock) { rep0_sock *s = arg; - int rv; NNI_ARG_UNUSED(sock); nni_mtx_init(&s->lk); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - rep0_sock_fini(s); - return (rv); - } - + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode); nni_atomic_init(&s->ttl); @@ -312,7 +307,10 @@ rep0_pipe_start(void *arg) return (NNG_EPROTO); } - if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + nni_mtx_lock(&s->lk); + rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p); + nni_mtx_unlock(&s->lk); + if (rv != 0) { return (rv); } // By definition, we have not received a request yet on this pipe, @@ -355,7 +353,7 @@ rep0_pipe_close(void *arg) // accept a message and discard it.) nni_pollable_raise(&s->writable); } - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 0112f835..c63359d5 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -55,7 +55,7 @@ struct req0_sock { nni_list stop_pipes; nni_list contexts; nni_list send_queue; // contexts waiting to send. - nni_idhash * requests; // contexts by request ID + nni_id_map requests; // contexts by request ID nni_pollable readable; nni_pollable writable; nni_mtx mtx; @@ -80,19 +80,13 @@ static int req0_sock_init(void *arg, nni_sock *sock) { req0_sock *s = arg; - int rv; NNI_ARG_UNUSED(sock); - if ((rv = nni_idhash_init(&s->requests)) != 0) { - return (rv); - } - // Request IDs are 32 bits, with the high order bit set. // We start at a random point, to minimize likelihood of // accidental collision across restarts. - nni_idhash_set_limits( - s->requests, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + nni_id_map_init(&s->requests, 0x80000000u, 0xffffffffu, true); nni_mtx_init(&s->mtx); @@ -145,7 +139,7 @@ req0_sock_fini(void *arg) req0_ctx_fini(&s->master); nni_pollable_fini(&s->readable); nni_pollable_fini(&s->writable); - nni_idhash_fini(s->requests); + nni_id_map_fini(&s->requests); nni_mtx_fini(&s->mtx); } @@ -316,7 +310,7 @@ req0_recv_cb(void *arg) nni_pipe_recv(p->pipe, &p->aio_recv); // Look for a context to receive it. - if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) || + if (((ctx = nni_id_get(&s->requests, id)) == NULL) || (ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) { nni_mtx_unlock(&s->mtx); // No waiting context, we have not sent the request out to @@ -328,7 +322,7 @@ req0_recv_cb(void *arg) // We have our match, so we can remove this. nni_list_node_remove(&ctx->send_node); - nni_idhash_remove(s->requests, id); + nni_id_remove(&s->requests, id); ctx->request_id = 0; if (ctx->req_msg != NULL) { nni_msg_free(ctx->req_msg); @@ -512,7 +506,7 @@ req0_ctx_reset(req0_ctx *ctx) nni_list_node_remove(&ctx->pipe_node); nni_list_node_remove(&ctx->send_node); if (ctx->request_id != 0) { - nni_idhash_remove(s->requests, ctx->request_id); + nni_id_remove(&s->requests, ctx->request_id); ctx->request_id = 0; } if (ctx->req_msg != NULL) { @@ -631,7 +625,6 @@ req0_ctx_send(void *arg, nni_aio *aio) req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nng_msg * msg = nni_aio_get_msg(aio); - uint64_t id; int rv; if (nni_aio_begin(aio) != 0) { @@ -662,12 +655,11 @@ req0_ctx_send(void *arg, nni_aio *aio) req0_ctx_reset(ctx); // Insert us on the per ID hash list, so that receives can find us. - if ((rv = nni_idhash_alloc(s->requests, &id, ctx)) != 0) { + if ((rv = nni_id_alloc(&s->requests, &ctx->request_id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->request_id = (uint32_t) id; nni_msg_header_clear(msg); nni_msg_header_append_u32(msg, ctx->request_id); @@ -675,7 +667,7 @@ req0_ctx_send(void *arg, nni_aio *aio) // schedule), then fail it. Should be NNG_ETIMEDOUT. rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) { - nni_idhash_remove(s->requests, id); + nni_id_remove(&s->requests, ctx->request_id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 0bce27ba..9737c600 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -33,7 +33,7 @@ struct xrep0_sock { nni_msgq * urq; nni_mtx lk; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; nni_aio aio_getq; }; @@ -54,7 +54,7 @@ xrep0_sock_fini(void *arg) xrep0_sock *s = arg; nni_aio_fini(&s->aio_getq); - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->lk); } @@ -62,7 +62,6 @@ static int xrep0_sock_init(void *arg, nni_sock *sock) { xrep0_sock *s = arg; - int rv; nni_mtx_init(&s->lk); nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s); @@ -71,11 +70,7 @@ xrep0_sock_init(void *arg, nni_sock *sock) s->uwq = nni_sock_sendq(sock); s->urq = nni_sock_recvq(sock); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - xrep0_sock_fini(s); - return (rv); - } - + nni_id_map_init(&s->pipes, 0, 0, false); return (0); } @@ -164,7 +159,10 @@ xrep0_pipe_start(void *arg) return (NNG_EPROTO); } - if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + nni_mtx_lock(&s->lk); + rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p); + nni_mtx_unlock(&s->lk); + if (rv != 0) { return (rv); } @@ -186,7 +184,7 @@ xrep0_pipe_close(void *arg) nni_msgq_close(p->sendq); nni_mtx_lock(&s->lk); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); } @@ -227,7 +225,7 @@ xrep0_sock_getq_cb(void *arg) // (non-blocking) if we can. If we can't for any reason, then we // free the message. nni_mtx_lock(&s->lk); - if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + if (((p = nni_id_get(&s->pipes, id)) == NULL) || (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index b414c189..7583c4d8 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -50,7 +50,7 @@ struct resp0_ctx { struct resp0_sock { nni_mtx mtx; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; resp0_ctx ctx; nni_list recvpipes; nni_list recvq; @@ -181,7 +181,7 @@ resp0_ctx_send(void *arg, nni_aio *aio) return; } - if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) { + if ((p = nni_id_get(&s->pipes, pid)) == NULL) { // Surveyor has left the building. Just discard the reply. nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, NULL); @@ -213,7 +213,7 @@ resp0_sock_fini(void *arg) { resp0_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); resp0_ctx_fini(&s->ctx); nni_pollable_fini(&s->writable); nni_pollable_fini(&s->readable); @@ -224,15 +224,11 @@ static int resp0_sock_init(void *arg, nni_sock *nsock) { resp0_sock *s = arg; - int rv; NNI_ARG_UNUSED(nsock); nni_mtx_init(&s->mtx); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - resp0_sock_fini(s); - return (rv); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); @@ -316,7 +312,7 @@ resp0_pipe_start(void *arg) } nni_mtx_lock(&s->mtx); - rv = nni_idhash_insert(s->pipes, p->id, p); + rv = nni_id_set(&s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); if (rv != 0) { return (rv); @@ -354,7 +350,7 @@ resp0_pipe_close(void *arg) // which we will happily discard. nni_pollable_raise(&s->writable); } - nni_idhash_remove(s->pipes, p->id); + nni_id_remove(&s->pipes, p->id); nni_mtx_unlock(&s->mtx); } diff --git a/src/protocol/survey0/respond_test.c b/src/protocol/survey0/respond_test.c index 2801222c..efda181b 100644 --- a/src/protocol/survey0/respond_test.c +++ b/src/protocol/survey0/respond_test.c @@ -13,9 +13,6 @@ #include <nng/protocol/survey0/respond.h> #include <nng/protocol/survey0/survey.h> -#include <nng/protocol/reqrep0/rep.h> -#include <nng/protocol/reqrep0/req.h> - #include <acutest.h> #include <testutil.h> diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index e4cdca2c..f2cc8aa8 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -29,7 +29,7 @@ static void surv0_ctx_timeout(void *); struct surv0_ctx { surv0_sock * sock; - uint64_t survey_id; // survey id + uint32_t survey_id; // survey id nni_timer_node timer; nni_time expire; nni_lmq recv_lmq; @@ -45,7 +45,7 @@ struct surv0_sock { nni_list pipes; nni_mtx mtx; surv0_ctx ctx; - nni_idhash * surveys; + nni_id_map surveys; nni_pollable writable; nni_pollable readable; nni_atomic_int send_buf; @@ -57,8 +57,8 @@ struct surv0_pipe { surv0_sock * sock; nni_lmq send_queue; nni_list_node node; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; bool busy; bool closed; }; @@ -75,7 +75,7 @@ surv0_ctx_abort(surv0_ctx *ctx, int err) } nni_lmq_flush(&ctx->recv_lmq); if (ctx->survey_id != 0) { - nni_idhash_remove(sock->surveys, ctx->survey_id); + nni_id_remove(&sock->surveys, ctx->survey_id); ctx->survey_id = 0; } if (ctx == &sock->ctx) { @@ -148,7 +148,7 @@ surv0_ctx_cancel(nni_aio *aio, void *arg, int rv) nni_aio_finish_error(aio, rv); } if (ctx->survey_id != 0) { - nni_idhash_remove(sock->surveys, ctx->survey_id); + nni_id_remove(&sock->surveys, ctx->survey_id); ctx->survey_id = 0; } nni_mtx_unlock(&sock->mtx); @@ -237,8 +237,7 @@ surv0_ctx_send(void *arg, nni_aio *aio) nni_timer_cancel(&ctx->timer); // Allocate the new ID. - if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survey_id, ctx)) != - 0) { + if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { nni_mtx_unlock(&sock->mtx); nni_aio_finish_error(aio, rv); return; @@ -256,8 +255,8 @@ surv0_ctx_send(void *arg, nni_aio *aio) if (!pipe->busy) { pipe->busy = true; nni_msg_clone(msg); - nni_aio_set_msg(pipe->aio_send, msg); - nni_pipe_send(pipe->pipe, pipe->aio_send); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); } else if (!nni_lmq_full(&pipe->send_queue)) { nni_msg_clone(msg); nni_lmq_putq(&pipe->send_queue, msg); @@ -279,7 +278,7 @@ surv0_sock_fini(void *arg) surv0_sock *sock = arg; surv0_ctx_fini(&sock->ctx); - nni_idhash_fini(sock->surveys); + nni_id_map_fini(&sock->surveys); nni_pollable_fini(&sock->writable); nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->mtx); @@ -307,17 +306,15 @@ surv0_sock_init(void *arg, nni_sock *s) nni_atomic_init(&sock->send_buf); nni_atomic_set(&sock->send_buf, 8); - if (((rv = nni_idhash_init(&sock->surveys)) != 0) || - ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) { - surv0_sock_fini(sock); - return (rv); - } - // Survey IDs are 32 bits, with the high order bit set. // We start at a random point, to minimize likelihood of // accidental collision across restarts. - nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu, - nni_random() | 0x80000000u); + nni_id_map_init(&sock->surveys, 0x80000000u, 0xffffffffu, true); + + if ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0) { + surv0_sock_fini(sock); + return (rv); + } sock->ttl = 8; @@ -343,8 +340,8 @@ surv0_pipe_stop(void *arg) { surv0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -352,8 +349,8 @@ surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_lmq_fini(&p->send_queue); } @@ -366,13 +363,13 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) int len; len = nni_atomic_get(&sock->send_buf); + nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p); // This depth could be tunable. The deeper the queue, the more // concurrent surveys that can be delivered (multiple contexts). // Note that surveys can be *outstanding*, but not yet put on the wire. - if (((rv = nni_lmq_init(&p->send_queue, len)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { + if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) { surv0_pipe_fini(p); return (rv); } @@ -396,7 +393,7 @@ surv0_pipe_start(void *arg) nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -406,8 +403,8 @@ surv0_pipe_close(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->mtx); p->closed = true; @@ -425,9 +422,9 @@ surv0_pipe_send_cb(void *arg) surv0_sock *sock = p->sock; nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -438,8 +435,8 @@ surv0_pipe_send_cb(void *arg) return; } if (nni_lmq_getq(&p->send_queue, &msg) == 0) { - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } else { p->busy = false; } @@ -456,13 +453,13 @@ surv0_pipe_recv_cb(void *arg) uint32_t id; nni_aio * aio; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // We yank 4 bytes of body, and move them to the header. @@ -478,7 +475,7 @@ surv0_pipe_recv_cb(void *arg) nni_mtx_lock(&sock->mtx); // Best effort at delivery. Discard if no context or context is // unable to receive it. - if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) || + if (((ctx = nni_id_get(&sock->surveys, id)) == NULL) || (nni_lmq_full(&ctx->recv_lmq))) { nni_msg_free(msg); } else if ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { @@ -492,7 +489,7 @@ surv0_pipe_recv_cb(void *arg) } nni_mtx_unlock(&sock->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 25aacc2c..b2f203c3 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -40,8 +40,8 @@ struct xresp0_sock { nni_msgq * urq; nni_msgq * uwq; nni_atomic_int ttl; - nni_idhash * pipes; - nni_aio * aio_getq; + nni_id_map pipes; + nni_aio aio_getq; nni_mtx mtx; }; @@ -51,10 +51,10 @@ struct xresp0_pipe { xresp0_sock *psock; uint32_t id; nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; }; static void @@ -62,8 +62,8 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_free(s->aio_getq); - nni_idhash_fini(s->pipes); + nni_aio_fini(&s->aio_getq); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -71,17 +71,12 @@ static int xresp0_sock_init(void *arg, nni_sock *nsock) { xresp0_sock *s = arg; - int rv; nni_mtx_init(&s->mtx); nni_atomic_init(&s->ttl); nni_atomic_set(&s->ttl, 8); // Per RFC - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != - 0)) { - xresp0_sock_fini(s); - return (rv); - } + nni_id_map_init(&s->pipes, 0, 0, false); + nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s); s->urq = nni_sock_recvq(nsock); s->uwq = nni_sock_sendq(nsock); @@ -94,7 +89,7 @@ xresp0_sock_open(void *arg) { xresp0_sock *s = arg; - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); } static void @@ -102,7 +97,7 @@ xresp0_sock_close(void *arg) { xresp0_sock *s = arg; - nni_aio_close(s->aio_getq); + nni_aio_close(&s->aio_getq); } static void @@ -110,10 +105,10 @@ xresp0_pipe_stop(void *arg) { xresp0_pipe *p = arg; - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_putq); + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -121,10 +116,10 @@ xresp0_pipe_fini(void *arg) { xresp0_pipe *p = arg; - nni_aio_free(p->aio_putq); - nni_aio_free(p->aio_getq); - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_putq); + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_msgq_fini(p->sendq); } @@ -134,11 +129,12 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) xresp0_pipe *p = arg; int rv; - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) { + nni_aio_init(&p->aio_putq, xresp0_putq_cb, p); + nni_aio_init(&p->aio_recv, xresp0_recv_cb, p); + nni_aio_init(&p->aio_getq, xresp0_getq_cb, p); + nni_aio_init(&p->aio_send, xresp0_send_cb, p); + + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { xresp0_pipe_fini(p); return (rv); } @@ -162,14 +158,14 @@ xresp0_pipe_start(void *arg) p->id = nni_pipe_id(p->npipe); nni_mtx_lock(&s->mtx); - rv = nni_idhash_insert(s->pipes, p->id, p); + rv = nni_id_set(&s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); if (rv != 0) { return (rv); } - nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(p->sendq, &p->aio_getq); return (rv); } @@ -180,15 +176,15 @@ xresp0_pipe_close(void *arg) xresp0_pipe *p = arg; xresp0_sock *s = p->psock; - nni_aio_close(p->aio_putq); - nni_aio_close(p->aio_getq); - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_putq); + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_msgq_close(p->sendq); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, p->id); + nni_id_remove(&s->pipes, p->id); nni_mtx_unlock(&s->mtx); } @@ -205,17 +201,17 @@ xresp0_sock_getq_cb(void *arg) uint32_t id; xresp0_pipe *p; - if (nni_aio_result(s->aio_getq) != 0) { + if (nni_aio_result(&s->aio_getq) != 0) { return; } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); // We yank the outgoing pipe id from the header if (nni_msg_header_len(msg) < 4) { nni_msg_free(msg); // We can't really close down the socket, so just keep going. - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); return; } id = nni_msg_header_trim_u32(msg); @@ -224,12 +220,12 @@ xresp0_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + if (((p = nni_id_get(&s->pipes, id)) == NULL) || (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); } void @@ -237,15 +233,15 @@ xresp0_getq_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_getq) != 0) { + if (nni_aio_result(&p->aio_getq) != 0) { nni_pipe_close(p->npipe); return; } - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); - nni_pipe_send(p->npipe, p->aio_send); + nni_pipe_send(p->npipe, &p->aio_send); } void @@ -253,14 +249,14 @@ xresp0_send_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->npipe); return; } - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_msgq_aio_get(p->sendq, &p->aio_getq); } static void @@ -273,14 +269,14 @@ xresp0_recv_cb(void *arg) int hops; int ttl; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); return; } ttl = nni_atomic_get(&s->ttl); - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); // Store the pipe id in the header, first thing. @@ -314,13 +310,13 @@ xresp0_recv_cb(void *arg) } // Now send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(urq, &p->aio_putq); return; drop: nni_msg_free(msg); - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); } static void @@ -328,22 +324,22 @@ xresp0_putq_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); nni_pipe_close(p->npipe); return; } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); } static int xresp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { xresp0_sock *s = arg; - int ttl; - int rv; + int ttl; + int rv; if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { nni_atomic_set(&s->ttl, ttl); } diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt index 5eca54c3..bc8673c5 100644 --- a/src/transport/zerotier/CMakeLists.txt +++ b/src/transport/zerotier/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> # Copyright 2018 Capitar IT Group BV <info@capitar.com> # # This software is supplied under the terms of the MIT License, a @@ -26,7 +26,7 @@ if (NNG_TRANSPORT_ZEROTIER) message(WARNING " ************************************************************ - Linking against zerotiercore changes license terms (GPLv3). + Linking against zerotiercore changes license terms. Consult a lawyer and the license files for details. ************************************************************") @@ -35,7 +35,9 @@ if (NNG_TRANSPORT_ZEROTIER) set(_LIBS zerotiercore::zerotiercore) set(_DEFS -DNNG_TRANSPORT_ZEROTIER) - set(_SRCS transport/zerotier/zerotier.c ${PROJECT_SOURCE_DIR}/include/nng/transport/zerotier/zerotier.h) + set(_SRCS transport/zerotier/zerotier.c + transport/zerotier/zthash.c + ${PROJECT_SOURCE_DIR}/include/nng/transport/zerotier/zerotier.h) set(NNG_DEFS ${NNG_DEFS} ${_DEFS} PARENT_SCOPE) set(NNG_LIBS ${NNG_LIBS} ${_LIBS} PARENT_SCOPE) diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 32e3ef7d..2667d027 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -14,6 +14,7 @@ #include <string.h> #include "core/nng_impl.h" +#include "zthash.h" #include "nng/transport/zerotier/zerotier.h" @@ -153,10 +154,10 @@ struct zt_node { nni_plat_udp * zn_udp6; nni_list zn_eplist; nni_list zn_plist; - nni_idhash * zn_ports; - nni_idhash * zn_eps; - nni_idhash * zn_lpipes; - nni_idhash * zn_rpipes; + zt_hash * zn_ports; + zt_hash * zn_eps; + zt_hash * zn_lpipes; + zt_hash * zn_rpipes; nni_aio * zn_rcv4_aio; uint8_t * zn_rcv4_buf; nng_sockaddr zn_rcv4_addr; @@ -657,8 +658,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) // Do we already have a matching pipe? If so, we can discard // the operation. This should not happen, since we normally, // deregister the endpoint when we create the pipe. - if ((nni_idhash_find(ztn->zn_lpipes, ep->ze_laddr, (void **) &p)) == - 0) { + if ((zt_hash_find(ztn->zn_lpipes, ep->ze_laddr, (void **) &p)) == 0) { return; } @@ -672,7 +672,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) // Reset the address of the endpoint, so that the next call to // ep_connect will bind a new one -- we are using this one for the // pipe. - nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + zt_hash_remove(ztn->zn_eps, ep->ze_laddr); ep->ze_laddr = 0; nni_aio_set_output(aio, 0, p); @@ -699,7 +699,7 @@ zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) // If we already have created a pipe for this connection // then just reply the conn ack. - if ((nni_idhash_find(ztn->zn_rpipes, raddr, (void **) &p)) == 0) { + if ((zt_hash_find(ztn->zn_rpipes, raddr, (void **) &p)) == 0) { zt_pipe_send_conn_ack(p); return; } @@ -1083,21 +1083,21 @@ zt_virtual_recv(ZT_Node *node, void *userptr, void *thr, uint64_t nwid, // vs. client pipes separately. // If its a local address match on a client pipe, process it. - if ((nni_idhash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) && + if ((zt_hash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) && (p->zp_nwid == nwid) && (p->zp_raddr == raddr)) { zt_pipe_virtual_recv(p, op, data, len); return; } // If its a remote address match on a server pipe, process it. - if ((nni_idhash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) && + if ((zt_hash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) && (p->zp_nwid == nwid) && (p->zp_laddr == laddr)) { zt_pipe_virtual_recv(p, op, data, len); return; } // No pipe, so look for an endpoint. - if ((nni_idhash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) && + if ((zt_hash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) && (ep->ze_nwid == nwid)) { // direct this to an endpoint. zt_ep_virtual_recv(ep, op, raddr, data, len); @@ -1408,9 +1408,9 @@ zt_node_destroy(zt_node *ztn) } nni_aio_free(ztn->zn_rcv4_aio); nni_aio_free(ztn->zn_rcv6_aio); - nni_idhash_fini(ztn->zn_eps); - nni_idhash_fini(ztn->zn_lpipes); - nni_idhash_fini(ztn->zn_rpipes); + zt_hash_fini(ztn->zn_eps); + zt_hash_fini(ztn->zn_lpipes); + zt_hash_fini(ztn->zn_rpipes); nni_cv_fini(&ztn->zn_bgcv); NNI_FREE_STRUCT(ztn); } @@ -1448,10 +1448,10 @@ zt_node_create(zt_node **ztnp, const char *path) zt_node_destroy(ztn); return (NNG_ENOMEM); } - if (((rv = nni_idhash_init(&ztn->zn_ports)) != 0) || - ((rv = nni_idhash_init(&ztn->zn_eps)) != 0) || - ((rv = nni_idhash_init(&ztn->zn_lpipes)) != 0) || - ((rv = nni_idhash_init(&ztn->zn_rpipes)) != 0) || + if (((rv = zt_hash_init(&ztn->zn_ports)) != 0) || + ((rv = zt_hash_init(&ztn->zn_eps)) != 0) || + ((rv = zt_hash_init(&ztn->zn_lpipes)) != 0) || + ((rv = zt_hash_init(&ztn->zn_rpipes)) != 0) || ((rv = nni_thr_init(&ztn->zn_bgthr, zt_bgthr, ztn)) != 0) || ((rv = nni_plat_udp_open(&ztn->zn_udp4, &sa4)) != 0) || ((rv = nni_plat_udp_open(&ztn->zn_udp6, &sa6)) != 0)) { @@ -1480,7 +1480,7 @@ zt_node_create(zt_node **ztnp, const char *path) // higher than the max port, and starting with an // initial random value. Note that this should give us // about 8 million possible ephemeral ports. - nni_idhash_set_limits(ztn->zn_ports, zt_ephemeral, zt_max_port, + zt_hash_limits(ztn->zn_ports, zt_ephemeral, zt_max_port, (nni_random() % (zt_max_port - zt_ephemeral)) + zt_ephemeral); nni_strlcpy(ztn->zn_path, path, sizeof(ztn->zn_path)); @@ -1745,9 +1745,9 @@ zt_pipe_fini(void *arg) // This tosses the connection details and all state. nni_mtx_lock(&zt_lk); - nni_idhash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask); - nni_idhash_remove(ztn->zn_lpipes, p->zp_laddr); - nni_idhash_remove(ztn->zn_rpipes, p->zp_raddr); + zt_hash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask); + zt_hash_remove(ztn->zn_lpipes, p->zp_laddr); + zt_hash_remove(ztn->zn_rpipes, p->zp_raddr); nni_mtx_unlock(&zt_lk); for (int i = 0; i < zt_recvq; i++) { @@ -1798,10 +1798,10 @@ zt_pipe_alloc( if (listener) { // listener - rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p); + rv = zt_hash_insert(ztn->zn_rpipes, raddr, p); } else { // dialer - rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); + rv = zt_hash_insert(ztn->zn_lpipes, laddr, p); } if ((rv != 0) || ((rv = nni_aio_alloc(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { @@ -2342,8 +2342,8 @@ zt_ep_close(void *arg) // If we're on the ztn node list, pull us off. if (ztn != NULL) { nni_list_node_remove(&ep->ze_link); - nni_idhash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask); - nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); + zt_hash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask); + zt_hash_remove(ztn->zn_eps, ep->ze_laddr); } nni_mtx_unlock(&zt_lk); @@ -2374,7 +2374,7 @@ zt_ep_bind_locked(zt_ep *ep) if ((ep->ze_laddr & zt_port_mask) == 0) { // ask for an ephemeral port - if ((rv = nni_idhash_alloc(ztn->zn_ports, &port, ep)) != 0) { + if ((rv = zt_hash_alloc(ztn->zn_ports, &port, ep)) != 0) { return (rv); } NNI_ASSERT(port & zt_ephemeral); @@ -2383,10 +2383,10 @@ zt_ep_bind_locked(zt_ep *ep) // make sure port requested is free. port = ep->ze_laddr & zt_port_mask; - if (nni_idhash_find(ztn->zn_ports, port, &conflict) == 0) { + if (zt_hash_find(ztn->zn_ports, port, &conflict) == 0) { return (NNG_EADDRINUSE); } - if ((rv = nni_idhash_insert(ztn->zn_ports, port, ep)) != 0) { + if ((rv = zt_hash_insert(ztn->zn_ports, port, ep)) != 0) { return (rv); } } @@ -2398,8 +2398,8 @@ zt_ep_bind_locked(zt_ep *ep) ep->ze_laddr |= port; ep->ze_running = true; - if ((rv = nni_idhash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) { - nni_idhash_remove(ztn->zn_ports, port); + if ((rv = zt_hash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) { + zt_hash_remove(ztn->zn_ports, port); return (rv); } diff --git a/src/transport/zerotier/zthash.c b/src/transport/zerotier/zthash.c new file mode 100644 index 00000000..ca46b373 --- /dev/null +++ b/src/transport/zerotier/zthash.c @@ -0,0 +1,302 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" +#include "zthash.h" + +struct zt_hash_entry { + uint64_t key; + void * val; + uint32_t skips; +}; + +int +zt_hash_init(zt_hash **hp) +{ + zt_hash *h; + + if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { + return (NNG_ENOMEM); + } + h->ih_entries = NULL; + h->ih_count = 0; + h->ih_load = 0; + h->ih_cap = 0; + h->ih_maxload = 0; + h->ih_minload = 0; // never shrink below this + h->ih_minval = 0; + h->ih_maxval = 0xffffffff; + h->ih_dynval = 0; + + *hp = h; + return (0); +} + +void +zt_hash_fini(zt_hash *h) +{ + if (h != NULL) { + if (h->ih_entries != NULL) { + NNI_FREE_STRUCTS(h->ih_entries, h->ih_cap); + h->ih_entries = NULL; + h->ih_cap = h->ih_count = 0; + h->ih_load = h->ih_minload = h->ih_maxload = 0; + } + + NNI_FREE_STRUCT(h); + } +} + +void +zt_hash_limits(zt_hash *h, uint64_t minval, uint64_t maxval, uint64_t start) +{ + if (start < minval) { + start = minval; + } + if (start > maxval) { + start = maxval; + } + + h->ih_minval = minval; + h->ih_maxval = maxval; + h->ih_dynval = start; + NNI_ASSERT(minval < maxval); + NNI_ASSERT(start >= minval); + NNI_ASSERT(start <= maxval); +} + +// Inspired by Python dict implementation. This probe will visit every +// cell. We always hash consecutively assigned IDs. +#define ZT_HASH_NEXT(h, j) ((((j) *5) + 1) & (h->ih_cap - 1)) +#define ZT_HASH_INDEX(h, j) ((j) & (h->ih_cap - 1)) + +static size_t +zt_hash_find_index(zt_hash *h, uint64_t id) +{ + size_t index; + size_t start; + if (h->ih_count == 0) { + return ((size_t) -1); + } + + index = ZT_HASH_INDEX(h, id); + start = index; + for (;;) { + // The value of ihe_key is only valid if ihe_val is not NULL. + if ((h->ih_entries[index].key == id) && + (h->ih_entries[index].val != NULL)) { + return (index); + } + if (h->ih_entries[index].skips == 0) { + return ((size_t) -1); + } + index = ZT_HASH_NEXT(h, index); + + if (index == start) { + break; + } + } + + return ((size_t) -1); +} + +int +zt_hash_find(zt_hash *h, uint64_t id, void **vp) +{ + size_t index; + if ((index = zt_hash_find_index(h, id)) == (size_t) -1) { + return (NNG_ENOENT); + } + *vp = h->ih_entries[index].val; + return (0); +} + +static int +zt_hash_resize(zt_hash *h) +{ + size_t newsize; + size_t oldsize; + zt_hash_entry *newents; + zt_hash_entry *oldents; + uint32_t i; + + if ((h->ih_load < h->ih_maxload) && (h->ih_load >= h->ih_minload)) { + // No resize needed. + return (0); + } + + oldsize = h->ih_cap; + + newsize = 8; + while (newsize < (h->ih_count * 2)) { + newsize *= 2; + } + if (newsize == oldsize) { + // Same size. + return (0); + } + + oldents = h->ih_entries; + newents = NNI_ALLOC_STRUCTS(newents, newsize); + if (newents == NULL) { + return (NNG_ENOMEM); + } + + h->ih_entries = newents; + h->ih_cap = newsize; + h->ih_load = 0; + if (newsize > 8) { + h->ih_minload = newsize / 8; + h->ih_maxload = newsize * 2 / 3; + } else { + h->ih_minload = 0; + h->ih_maxload = 5; + } + for (i = 0; i < oldsize; i++) { + size_t index; + if (oldents[i].val == NULL) { + continue; + } + index = oldents[i].key & (newsize - 1); + for (;;) { + // Increment the load unconditionally. It counts + // once for every item stored, plus once for each + // hashing operation we use to store the item (i.e. + // one for the item, plus once for each rehash.) + h->ih_load++; + if (newents[index].val == NULL) { + // As we are hitting this entry for the first + // time, it won't have any skips. + NNI_ASSERT(newents[index].skips == 0); + newents[index].val = oldents[i].val; + newents[index].key = oldents[i].key; + break; + } + newents[index].skips++; + index = ZT_HASH_NEXT(h, index); + } + } + if (oldsize != 0) { + NNI_FREE_STRUCTS(oldents, oldsize); + } + return (0); +} + +int +zt_hash_remove(zt_hash *h, uint64_t id) +{ + size_t index; + size_t probe; + + if ((index = zt_hash_find_index(h, id)) == (size_t) -1) { + return (NNG_ENOENT); + } + + // Now we have found the index where the object exists. We are going + // to restart the search, until the index matches, to decrement the + // skips counter. + probe = (int) ZT_HASH_INDEX(h, id); + + for (;;) { + zt_hash_entry *entry; + // The load was increased once each hashing operation we used + // to place the the item. Decrement it accordingly. + h->ih_load--; + entry = &h->ih_entries[probe]; + if (probe == index) { + entry->val = NULL; + entry->key = 0; + break; + } + NNI_ASSERT(entry->skips > 0); + entry->skips--; + probe = ZT_HASH_NEXT(h, probe); + } + + h->ih_count--; + + // Shrink -- but it's ok if we can't. + (void) zt_hash_resize(h); + + return (0); +} + +int +zt_hash_insert(zt_hash *h, uint64_t id, void *val) +{ + size_t index; + zt_hash_entry *ent; + + // Try to resize -- if we don't need to, this will be a no-op. + if (zt_hash_resize(h) != 0) { + return (NNG_ENOMEM); + } + + // If it already exists, just overwrite the old value. + if ((index = zt_hash_find_index(h, id)) != (size_t) -1) { + ent = &h->ih_entries[index]; + ent->val = val; + return (0); + } + + index = ZT_HASH_INDEX(h, id); + for (;;) { + ent = &h->ih_entries[index]; + + // Increment the load count. We do this each time time we + // rehash. This may over-count items that collide on the + // same rehashing, but this should just cause a table to + // grow sooner, which is probably a good thing. + h->ih_load++; + if (ent->val == NULL) { + h->ih_count++; + ent->key = id; + ent->val = val; + return (0); + } + // Record the skip count. This being non-zero informs + // that a rehash will be necessary. Without this we + // would need to scan the entire hash for the match. + ent->skips++; + index = ZT_HASH_NEXT(h, index); + } +} + +int +zt_hash_alloc(zt_hash *h, uint64_t *idp, void *val) +{ + uint64_t id; + int rv; + + NNI_ASSERT(val != NULL); + + if (h->ih_count > (h->ih_maxval - h->ih_minval)) { + // Really more like ENOSPC.. the table is filled to max. + return (NNG_ENOMEM); + } + + for (;;) { + id = h->ih_dynval; + h->ih_dynval++; + if (h->ih_dynval > h->ih_maxval) { + h->ih_dynval = h->ih_minval; + } + + if (zt_hash_find_index(h, id) == (size_t) -1) { + break; + } + } + + rv = zt_hash_insert(h, id, val); + if (rv == 0) { + *idp = id; + } + return (rv); +} diff --git a/src/transport/zerotier/zthash.h b/src/transport/zerotier/zthash.h new file mode 100644 index 00000000..249eabbf --- /dev/null +++ b/src/transport/zerotier/zthash.h @@ -0,0 +1,43 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef ZT_HASH_H +#define ZT_HASH_H + +#include <stdint.h> + +// This code is derived from id hash, but supports 64-bit IDs. + +typedef struct zt_hash zt_hash; +typedef struct zt_hash_entry zt_hash_entry; + +// NB: These details are entirely private to the hash implementation. +// They are provided here to facilitate inlining in structures. +struct zt_hash { + size_t ih_cap; + size_t ih_count; + size_t ih_load; + size_t ih_minload; // considers placeholders + size_t ih_maxload; + uint64_t ih_minval; + uint64_t ih_maxval; + uint64_t ih_dynval; + zt_hash_entry *ih_entries; +}; + +extern int zt_hash_init(zt_hash **); +extern void zt_hash_fini(zt_hash *); +extern void zt_hash_limits(zt_hash *, uint64_t, uint64_t, uint64_t); +extern int zt_hash_find(zt_hash *, uint64_t, void **); +extern int zt_hash_remove(zt_hash *, uint64_t); +extern int zt_hash_insert(zt_hash *, uint64_t, void *); +extern int zt_hash_alloc(zt_hash *, uint64_t *, void *); + +#endif // CORE_IDHASH_H diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 90d674ff..d7842bfb 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -127,6 +127,7 @@ endif () nng_test(aio) nng_test(bufsz) nng_test(bug1247) +nng_test(id) nng_test(platform) nng_test(reconnect) nng_test(sock) @@ -136,7 +137,6 @@ add_nng_test(errors 2) add_nng_test(files 5) add_nng_test1(httpclient 60 NNG_SUPP_HTTP) add_nng_test1(httpserver 30 NNG_SUPP_HTTP) -add_nng_test(idhash 30) add_nng_test(inproc 5) add_nng_test(ipc 5) add_nng_test(ipcperms 5) diff --git a/tests/id.c b/tests/id.c new file mode 100644 index 00000000..e8944547 --- /dev/null +++ b/tests/id.c @@ -0,0 +1,275 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "acutest.h" +#include "testutil.h" + +#include "core/idhash.h" + +void +test_basic(void) +{ + nni_id_map m; + char * five = "five"; + char * four = "four"; + + nni_id_map_init(&m, 0, 0, false); + + // insert it + TEST_NNG_PASS(nni_id_set(&m, 5, five)); + // retrieve it + TEST_CHECK(nni_id_get(&m, 5) == five); + + // change it + TEST_NNG_PASS(nni_id_set(&m, 5, four)); + TEST_CHECK(nni_id_get(&m, 5) == four); + + // delete + TEST_NNG_PASS(nni_id_remove(&m, 5)); + + nni_id_map_fini(&m); +} + +void +test_random(void) +{ + int i; + uint32_t id; + for (i = 0; i < 2; i++) { + nni_id_map m; + nni_id_map_init(&m, 0, 0, true); + TEST_NNG_PASS(nni_id_alloc(&m, &id, &id)); + nni_id_map_fini(&m); + TEST_CHECK(id != 0); + if (id != 1) { + break; + } + // one chance in 4 billion, but try again + } + + TEST_CHECK(id != 1); + TEST_CHECK(i < 2); +} + +void +test_collision(void) +{ + nni_id_map m; + char * five = "five"; + char * four = "four"; + + nni_id_map_init(&m, 0, 0, false); + + // Carefully crafted -- 13 % 8 == 5. + TEST_NNG_PASS(nni_id_set(&m, 5, five)); + TEST_NNG_PASS(nni_id_set(&m, 13, four)); + TEST_CHECK(nni_id_get(&m, 5) == five); + TEST_CHECK(nni_id_get(&m, 13) == four); + + // Delete the intermediate + TEST_NNG_PASS(nni_id_remove(&m, 5)); + TEST_CHECK(nni_id_get(&m, 13) == four); + + nni_id_map_fini(&m); +} + +void +test_empty(void) +{ + nni_id_map m; + nni_id_map_init(&m, 0, 0, false); + + TEST_CHECK(nni_id_get(&m, 42) == NULL); + TEST_NNG_FAIL(nni_id_remove(&m, 42), NNG_ENOENT); + TEST_NNG_FAIL(nni_id_remove(&m, 1), NNG_ENOENT); + nni_id_map_fini(&m); +} + +void +test_not_found(void) +{ + nni_id_map m; + uint32_t id; + nni_id_map_init(&m, 0, 0, false); + + TEST_NNG_PASS(nni_id_alloc(&m, &id, &id)); + TEST_NNG_FAIL(nni_id_remove(&m, 42), NNG_ENOENT); + TEST_NNG_FAIL(nni_id_remove(&m, 2), NNG_ENOENT); + TEST_NNG_PASS(nni_id_remove(&m, id)); + nni_id_map_fini(&m); +} + +void +test_resize(void) +{ + nni_id_map m; + int rv; + int i; + int expect[1024]; + + for (i = 0; i < 1024; i++) { + expect[i] = i; + } + + nni_id_map_init(&m, 0, 0, false); + + for (i = 0; i < 1024; i++) { + if ((rv = nni_id_set(&m, i, &expect[i])) != 0) { + TEST_NNG_PASS(rv); + } + } + + for (i = 0; i < 1024; i++) { + if ((rv = nni_id_remove(&m, i)) != 0) { + TEST_NNG_PASS(rv); + } + } + nni_id_map_fini(&m); +} + +void +test_dynamic(void) +{ + nni_id_map m; + int expect[5]; + uint32_t id; + + nni_id_map_init(&m, 10, 13, false); + + // We can fill the table. + TEST_NNG_PASS(nni_id_alloc(&m, &id, &expect[0])); + TEST_CHECK(id == 10); + TEST_NNG_PASS(nni_id_alloc(&m, &id, &expect[1])); + TEST_CHECK(id == 11); + TEST_NNG_PASS(nni_id_alloc(&m, &id, &expect[2])); + TEST_CHECK(id == 12); + TEST_NNG_PASS(nni_id_alloc(&m, &id, &expect[3])); + TEST_CHECK(id == 13); + + // Adding another fails. + TEST_NNG_FAIL(nni_id_alloc(&m, &id, &expect[4]), NNG_ENOMEM); + + // Delete one. + TEST_NNG_PASS(nni_id_remove(&m, 11)); + + // And now we can allocate one. + TEST_NNG_PASS(nni_id_alloc(&m, &id, &expect[4])); + TEST_CHECK(id == 11); + nni_id_map_fini(&m); +} + +void +test_set_out_of_range(void) +{ + nni_id_map m; + int x; + uint32_t id; + + nni_id_map_init(&m, 10, 13, false); + + // We can insert outside the range forcibly. + TEST_NNG_PASS(nni_id_set(&m, 1, &x)); + TEST_NNG_PASS(nni_id_set(&m, 100, &x)); + TEST_NNG_PASS(nni_id_alloc(&m, &id, &x)); + TEST_CHECK(id == 10); + nni_id_map_fini(&m); +} + +#define STRESS_LOAD 50000 +#define NUM_VALUES 1000 + +void +test_stress(void) +{ + void * values[NUM_VALUES]; + nni_id_map m; + size_t i; + int rv; + void * x; + int v; + + nni_id_map_init(&m, 0, 0, false); + for (i = 0; i < NUM_VALUES; i++) { + values[i] = NULL; + } + + for (i = 0; i < STRESS_LOAD; i++) { + v = rand() % NUM_VALUES; // Keep it constrained + + switch (rand() & 3) { + case 0: + x = &values[rand() % NUM_VALUES]; + values[v] = x; + if ((rv = nni_id_set(&m, v, x)) != 0) { + TEST_NNG_PASS(rv); + goto out; + } + break; + + case 1: + rv = nni_id_remove(&m, v); + if (values[v] == NULL) { + if (rv != NNG_ENOENT) { + TEST_NNG_FAIL(rv, NNG_ENOENT); + goto out; + } + } else { + values[v] = NULL; + if (rv != 0) { + TEST_NNG_PASS(rv); + goto out; + } + } + break; + case 2: + x = nni_id_get(&m, v); + if (x != values[v]) { + TEST_CHECK(x == values[v]); + goto out; + } + break; + } + } +out: + TEST_CHECK(i == STRESS_LOAD); + + // Post stress check. + for (i = 0; i < NUM_VALUES; i++) { + x = nni_id_get(&m, i); + if (x != values[i]) { + TEST_CHECK(x == values[i]); + break; + } + + // We only use the test macros if we know they are going + // to fail. Otherwise there will be too many errors reported. + rv = nni_id_remove(&m, i); + if ((x == NULL) && (rv != NNG_ENOENT)) { + TEST_NNG_FAIL(rv, NNG_ENOENT); + } else if ((x != NULL) && (rv != 0)) { + TEST_NNG_PASS(rv); + } + } + TEST_CHECK(i == NUM_VALUES); + + nni_id_map_fini(&m); +} + +TEST_LIST = { + { "basic", test_basic }, + { "random", test_random }, + { "collision", test_collision }, + { "empty", test_empty }, + { "not found", test_not_found }, + { "resize", test_resize }, + { "dynamic", test_dynamic }, + { "set out of range", test_set_out_of_range }, + { "stress", test_stress }, + { NULL, NULL }, +}; diff --git a/tests/idhash.c b/tests/idhash.c deleted file mode 100644 index 60a7133a..00000000 --- a/tests/idhash.c +++ /dev/null @@ -1,259 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/idhash.c" -#include "core/nng_impl.h" - -#include "convey.h" - -#define STRESSLOAD 50000 -#define NVALUES 1000 - -int -stress(nni_idhash *h, void **values, size_t nvalues, int iter) -{ - for (int i = 0; i < iter; i++) { - void *x; - int v = rand() % nvalues; // Keep it constrained - - switch (rand() & 3) { - case 0: - x = &values[rand() % nvalues]; - values[v] = x; - if (nni_idhash_insert(h, v, x) != 0) { - return (-1); - } - break; - - case 1: - if (values[v] == NULL) { - if (nni_idhash_remove(h, v) != NNG_ENOENT) { - return (-1); - } else { - break; - } - } else { - if (nni_idhash_remove(h, v) != 0) { - return (-1); - } - values[v] = NULL; - } - break; - case 2: - if (values[v] == NULL) { - if (nni_idhash_find(h, v, &x) != NNG_ENOENT) { - return (-1); - } - - } else { - if ((nni_idhash_find(h, v, &x) != 0) || - (x != values[v])) { - return (-1); - } - } - break; - } - } - return (0); -} - -int -poststress(nni_idhash *h, void **values, size_t nvalues) -{ - for (size_t i = 0; i < nvalues; i++) { - void *x; - if (values[i] == NULL) { - if ((nni_idhash_find(h, i, &x) != NNG_ENOENT) || - (nni_idhash_remove(h, i) != NNG_ENOENT)) { - return (-1); - } - continue; - } - if (((nni_idhash_find(h, i, &x) != 0) || (x != values[i])) || - (nni_idhash_remove(h, i) != 0)) { - return (-1); - } - } - return (0); -} - -Main({ - nni_init(); - atexit(nni_fini); - Test("General ID Hash", { - int rv; - - Convey("Given an id hash", { - nni_idhash *h = NULL; - - So(nni_idhash_init(&h) == 0); - So(h != NULL); - - Reset({ nni_idhash_fini(h); }); - - Convey("We can insert an element", { - char *five = "five"; - char *four = "four"; - rv = nni_idhash_insert(h, 5, five); - So(rv == 0); - - Convey("And we can find it", { - void *ptr; - rv = nni_idhash_find(h, 5, &ptr); - So(rv == 0); - So(ptr == five); - }); - Convey("We can delete it", { - void *ptr; - rv = nni_idhash_remove(h, 5); - So(rv == 0); - rv = nni_idhash_find(h, 5, &ptr); - So(rv == NNG_ENOENT); - }); - Convey("We can change the value", { - void *ptr; - So(nni_idhash_insert(h, 5, four) == 0); - So(nni_idhash_find(h, 5, &ptr) == 0); - So(ptr == four); - }); - Convey("We can insert a hash collision", { - void *ptr; - So(nni_idhash_insert(h, 13, four) == - 0); - So(nni_idhash_find(h, 5, &ptr) == 0); - So(ptr == five); - So(nni_idhash_find(h, 13, &ptr) == 0); - So(ptr == four); - Convey("And delete intermediate", { - So(nni_idhash_remove(h, 5) == - 0); - ptr = NULL; - So(nni_idhash_find( - h, 13, &ptr) == 0); - So(ptr == four); - }); - }); - }); - Convey("We cannot find bogus values", { - void *ptr; - ptr = NULL; - rv = nni_idhash_find(h, 42, &ptr); - So(rv == NNG_ENOENT); - So(ptr == NULL); - }); - - Convey("64-bit hash values work", { - char * huge = "huge"; - void * ptr = NULL; - uint64_t hugenum = 0x1234567890ULL; - - nni_idhash_set_limits(h, 1, 1ULL << 63, 1); - So(nni_idhash_insert(h, hugenum, huge) == 0); - So(nni_idhash_find(h, hugenum, &ptr) == 0); - So((char *) ptr == huge); - }); - - Convey("64-bit dynvals work", { - char * huge = "dynhuge"; - void * ptr = NULL; - uint64_t id; - - nni_idhash_set_limits( - h, 1ULL << 32, 1ULL << 63, 1); - So(nni_idhash_alloc(h, &id, huge) == 0); - So(id > 0xffffffff); - So(nni_idhash_find(h, id, &ptr) == 0); - So((char *) ptr == huge); - }); - }); - }); - - Test("Resize ID Hash", { - int expect[1024]; - int i; - - for (i = 0; i < 1024; i++) { - expect[i] = i; - } - Convey("Given an id hash", { - nni_idhash *h; - - So(nni_idhash_init(&h) == 0); - - Reset({ nni_idhash_fini(h); }); - - Convey("We can insert 1024 items", { - for (i = 0; i < 1024; i++) { - nni_idhash_insert(h, i, &expect[i]); - } - - Convey("We can remove them", { - for (i = 0; i < 1024; i++) { - nni_idhash_remove(h, i); - } - }); - }); - }); - }); - - Test("Dynamic ID generation", { - Convey("Given a small ID hash", { - nni_idhash *h; - int expect[5]; - uint64_t id; - So(nni_idhash_init(&h) == 0); - Reset({ nni_idhash_fini(h); }); - nni_idhash_set_limits(h, 10, 13, 10); - So(1); - Convey("We can fill the table", { - for (uint64_t i = 0; i < 4; i++) { - So(nni_idhash_alloc( - h, &id, &expect[i]) == 0); - So(id == (i + 10)); - } - Convey("Adding another fails", { - So(nni_idhash_alloc(h, &id, - &expect[5]) == NNG_ENOMEM); - }); - Convey("Deleting one lets us reinsert", { - nni_idhash_remove(h, 11); - So(nni_idhash_alloc( - h, &id, &expect[5]) == 0); - So(id == 11); - }); - }); - Convey("We can insert outside range forcibly", { - So(nni_idhash_insert(h, 1, &expect[0]) == 0); - So(nni_idhash_insert(h, 100, &expect[0]) == 0); - So(nni_idhash_alloc(h, &id, &expect[1]) == 0); - So(id >= 10); - So(id <= 13); - }); - }); - }); - - Test("Stress it", { - void *values[NVALUES]; - - Convey("Given a hash", { - nni_idhash *h; - So(nni_idhash_init(&h) == 0); - Reset({ nni_idhash_fini(h); }); - memset(values, 0, sizeof(values)); - - Convey("A stress run works", { - So(stress(h, values, NVALUES, STRESSLOAD) == - 0); - So(poststress(h, values, NVALUES) == 0); - So(nni_idhash_count(h) == 0); - }); - }); - }); -}) |
