diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-12-19 10:21:54 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-12-19 12:50:05 -0800 |
| commit | d12e169c1e733b255d146847ed57037b74681285 (patch) | |
| tree | e4a59142a6cf097dfdda8620635f173f53db9e7a /src | |
| parent | 2033988343bce413763d3e9664e3e8372da48591 (diff) | |
| download | nng-d12e169c1e733b255d146847ed57037b74681285.tar.gz nng-d12e169c1e733b255d146847ed57037b74681285.tar.bz2 nng-d12e169c1e733b255d146847ed57037b74681285.zip | |
fixes #1372 nni_reap could be smaller
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 101 | ||||
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/aio_test.c | 4 | ||||
| -rw-r--r-- | src/core/dialer.c | 10 | ||||
| -rw-r--r-- | src/core/listener.c | 9 | ||||
| -rw-r--r-- | src/core/pipe.c | 67 | ||||
| -rw-r--r-- | src/core/pipe.h | 4 | ||||
| -rw-r--r-- | src/core/reap.c | 102 | ||||
| -rw-r--r-- | src/core/reap.h | 42 | ||||
| -rw-r--r-- | src/core/socket.c | 44 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcconn.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 10 | ||||
| -rw-r--r-- | src/platform/windows/win_ipcconn.c | 13 | ||||
| -rw-r--r-- | src/platform/windows/win_tcpdial.c | 11 | ||||
| -rw-r--r-- | src/platform/windows/win_tcplisten.c | 11 | ||||
| -rw-r--r-- | src/supplemental/http/http_server.c | 38 | ||||
| -rw-r--r-- | src/supplemental/tls/tls_common.c | 14 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 11 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 19 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 19 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 25 |
24 files changed, 338 insertions, 236 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index b910a600..272f5e9d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -19,13 +19,6 @@ static nni_thr nni_aio_expire_thr; static nni_list nni_aio_expire_list; static nni_aio *nni_aio_expire_aio; -// Reaping items. -static nni_thr nni_aio_reap_thr; -static nni_aio *nni_aio_reap_list; -static nni_mtx nni_aio_reap_lk; -static nni_cv nni_aio_reap_cv; -static bool nni_aio_reap_exit; - // Design notes. // // AIOs are only ever "completed" by the provider, which must call @@ -68,6 +61,11 @@ static bool nni_aio_reap_exit; // operations from starting, without waiting for any existing one to // complete, call nni_aio_close. +static nni_reap_list aio_reap_list = { + .rl_offset = offsetof(nni_aio, a_reap_node), + .rl_func = (nni_cb) nni_aio_free, +}; + static void nni_aio_expire_add(nni_aio *); void @@ -145,11 +143,7 @@ void nni_aio_reap(nni_aio *aio) { if (aio != NULL) { - nni_mtx_lock(&nni_aio_reap_lk); - aio->a_reap_next = nni_aio_reap_list; - nni_aio_reap_list = aio; - nni_cv_wake1(&nni_aio_reap_cv); - nni_mtx_unlock(&nni_aio_reap_lk); + nni_reap(&aio_reap_list, aio); } } @@ -549,41 +543,6 @@ nni_aio_expire_loop(void *unused) } } -static void -nni_aio_reap_loop(void *unused) -{ - NNI_ARG_UNUSED(unused); - - nni_thr_set_name(NULL, "nng:aio:reap"); - - nni_mtx_lock(&nni_aio_reap_lk); - - for (;;) { - nni_aio *aio; - - if ((aio = nni_aio_reap_list) == NULL) { - if (nni_aio_reap_exit) { - break; - } - - nni_cv_wait(&nni_aio_reap_cv); - continue; - } - nni_aio_reap_list = NULL; - nni_mtx_unlock(&nni_aio_reap_lk); - - while (aio != NULL) { - nni_aio *old = aio; - aio = aio->a_reap_next; - nni_aio_free(old); - } - - nni_mtx_lock(&nni_aio_reap_lk); - } - - nni_mtx_unlock(&nni_aio_reap_lk); -} - void * nni_aio_get_prov_extra(nni_aio *aio, unsigned index) { @@ -699,60 +658,40 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) void nni_aio_sys_fini(void) { - nni_mtx *mtx1 = &nni_aio_lk; - nni_cv * cv1 = &nni_aio_expire_cv; - nni_thr *thr1 = &nni_aio_expire_thr; - nni_mtx *mtx2 = &nni_aio_reap_lk; - nni_cv * cv2 = &nni_aio_reap_cv; - nni_thr *thr2 = &nni_aio_reap_thr; + nni_mtx *mtx = &nni_aio_lk; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; if (!nni_aio_expire_exit) { - nni_mtx_lock(mtx1); + nni_mtx_lock(mtx); nni_aio_expire_exit = true; - nni_cv_wake(cv1); - nni_mtx_unlock(mtx1); + nni_cv_wake(cv); + nni_mtx_unlock(mtx); } - if (!nni_aio_reap_exit) { - nni_mtx_lock(mtx2); - nni_aio_reap_exit = true; - nni_cv_wake(cv2); - nni_mtx_unlock(mtx2); - } - - nni_thr_fini(thr1); - nni_cv_fini(cv1); - nni_mtx_fini(mtx1); - - nni_thr_fini(thr2); - nni_cv_fini(cv2); - nni_mtx_fini(mtx2); + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); } int nni_aio_sys_init(void) { - int rv, rv1, rv2; - nni_thr *thr1 = &nni_aio_expire_thr; - nni_thr *thr2 = &nni_aio_reap_thr; + int rv; + nni_thr *thr = &nni_aio_expire_thr; NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node); nni_mtx_init(&nni_aio_lk); nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk); - nni_mtx_init(&nni_aio_reap_lk); - nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk); nni_aio_expire_exit = false; - nni_aio_reap_exit = false; - rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL); - rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL); - if (((rv = rv1) != 0) || ((rv = rv2) != 0)) { + rv = nni_thr_init(thr, nni_aio_expire_loop, NULL); + if (rv != 0) { nni_aio_sys_fini(); return (rv); } - nni_thr_run(thr1); - nni_thr_run(thr2); + nni_thr_run(thr); return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index dbe7fbb9..e108e4b8 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -13,6 +13,7 @@ #include "core/defs.h" #include "core/list.h" +#include "core/reap.h" #include "core/taskq.h" #include "core/thread.h" @@ -200,7 +201,7 @@ struct nng_aio { void * a_prov_extra[2]; // Extra data used by provider nni_list_node a_expire_node; // Expiration node - struct nng_aio *a_reap_next; + nni_reap_node a_reap_node; }; #endif // CORE_AIO_H diff --git a/src/core/aio_test.c b/src/core/aio_test.c index acf3c129..01024af6 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -236,10 +236,10 @@ aio_sleep_cb(void *arg) void test_aio_reap(void) { - nng_aio *a; + static nng_aio *a; NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a)); nng_sleep_aio(10, a); - nng_msleep(20); + nng_msleep(100); } NUTS_TESTS = { diff --git a/src/core/dialer.c b/src/core/dialer.c index 3efbcabd..1f9a5e58 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -326,12 +326,16 @@ nni_dialer_hold(nni_dialer *d) void nni_dialer_rele(nni_dialer *d) { + bool reap; + nni_mtx_lock(&dialers_lk); d->d_ref--; - if ((d->d_ref == 0) && (d->d_closed)) { - nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); - } + reap = ((d->d_ref == 0) && (d->d_closed)); nni_mtx_unlock(&dialers_lk); + + if (reap) { + nni_dialer_reap(d); + } } void diff --git a/src/core/listener.c b/src/core/listener.c index 83882389..13d5c6a6 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -314,12 +314,15 @@ nni_listener_hold(nni_listener *l) void nni_listener_rele(nni_listener *l) { + bool reap; + nni_mtx_lock(&listeners_lk); l->l_ref--; - if ((l->l_ref == 0) && (l->l_closed)) { - nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); - } + reap = ((l->l_ref == 0) && (l->l_closed)); nni_mtx_unlock(&listeners_lk); + if (reap) { + nni_listener_reap(l); + } } void diff --git a/src/core/pipe.c b/src/core/pipe.c index 4e767605..47304fe7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -22,6 +22,13 @@ static nni_id_map pipes; static nni_mtx pipes_lk; +static void pipe_destroy(void *); + +static nni_reap_list pipe_reap_list = { + .rl_offset = offsetof(nni_pipe, p_reap), + .rl_func = pipe_destroy, +}; + int nni_pipe_sys_init(void) { @@ -43,8 +50,9 @@ nni_pipe_sys_fini(void) } static void -pipe_destroy(nni_pipe *p) +pipe_destroy(void *arg) { + nni_pipe *p = arg; if (p == NULL) { return; } @@ -158,7 +166,7 @@ nni_pipe_close(nni_pipe *p) p->p_tran_ops.p_close(p->p_tran_data); } - nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); + nni_reap(&pipe_reap_list, p); } uint16_t @@ -201,25 +209,25 @@ pipe_stats_init(nni_pipe *p) .si_atomic = true, }; static const nni_stat_info tx_msgs_info = { - .si_name = "tx_msgs", - .si_desc = "messages sent", - .si_type = NNG_STAT_COUNTER, - .si_unit = NNG_UNIT_MESSAGES, - .si_atomic = true, + .si_name = "tx_msgs", + .si_desc = "messages sent", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_MESSAGES, + .si_atomic = true, }; static const nni_stat_info rx_bytes_info = { - .si_name = "rx_bytes", - .si_desc = "bytes received", - .si_type = NNG_STAT_COUNTER, - .si_unit = NNG_UNIT_BYTES, - .si_atomic = true, + .si_name = "rx_bytes", + .si_desc = "bytes received", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, }; static const nni_stat_info tx_bytes_info = { - .si_name = "tx_bytes", - .si_desc = "bytes sent", - .si_type = NNG_STAT_COUNTER, - .si_unit = NNG_UNIT_BYTES, - .si_atomic = true, + .si_name = "tx_bytes", + .si_desc = "bytes sent", + .si_type = NNG_STAT_COUNTER, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, }; nni_stat_init(&p->st_root, &root_info); @@ -294,9 +302,9 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata) { - int rv; - nni_tran * tran = d->d_tran; - nni_pipe * p; + int rv; + nni_tran *tran = d->d_tran; + nni_pipe *p; if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) { return (rv); @@ -318,9 +326,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata) int nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata) { - int rv; - nni_tran * tran = l->l_tran; - nni_pipe * p; + int rv; + nni_tran *tran = l->l_tran; + nni_pipe *p; if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) { return (rv); @@ -328,9 +336,9 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata) p->p_listener = l; #if NNG_ENABLE_STATS static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, }; pipe_stat_init(p, &p->st_ep_id, &listener_info); nni_stat_set_id(&p->st_ep_id, nni_listener_id(l)); @@ -361,12 +369,6 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } -void * -nni_pipe_get_proto_data(nni_pipe *p) -{ - return (p->p_proto_data); -} - uint32_t nni_pipe_sock_id(nni_pipe *p) { @@ -385,7 +387,6 @@ nni_pipe_dialer_id(nni_pipe *p) return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } - void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 20d34e82..79c48500 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -42,10 +42,6 @@ extern uint16_t nni_pipe_peer(nni_pipe *); extern int nni_pipe_getopt( nni_pipe *, const char *, void *, size_t *, nni_opt_type); -// nni_pipe_get_proto_data gets the protocol private data set with the -// nni_pipe_set_proto_data function. No locking is performed. -extern void *nni_pipe_get_proto_data(nni_pipe *); - // nni_pipe_find finds a pipe given its ID. It places a hold on the // pipe, which must be released by the caller when it is done. extern int nni_pipe_find(nni_pipe **, uint32_t); diff --git a/src/core/reap.c b/src/core/reap.c index ddd2a06e..8be5ee12 100644 --- a/src/core/reap.c +++ b/src/core/reap.c @@ -14,53 +14,79 @@ #include <stdbool.h> -static nni_list reap_list; -static nni_mtx reap_mtx; -static nni_cv reap_cv; -static nni_cv reap_empty_cv; -static bool reap_exit = false; -static bool reap_empty = false; -static nni_thr reap_thr; +// New stuff. +static nni_reap_list *reap_list = NULL; +static nni_thr reap_thr; +static bool reap_exit; +static nni_mtx reap_mtx; +static bool reap_empty; +static nni_cv reap_work_cv; +static nni_cv reap_empty_cv; static void -reap_worker(void *notused) +reap_worker(void *unused) { - NNI_ARG_UNUSED(notused); + NNI_ARG_UNUSED(unused); + nni_thr_set_name(NULL, "nng:reap2"); - nni_thr_set_name(NULL, "nng:reap"); - - nni_mtx_lock(&reap_mtx); + nni_mtx_lock(&reap_mtx); for (;;) { - nni_reap_item *item; - while ((item = nni_list_first(&reap_list)) != NULL) { - nni_list_remove(&reap_list, item); - nni_mtx_unlock(&reap_mtx); + nni_reap_list *list; + bool reaped = false; - item->r_func(item->r_ptr); - nni_mtx_lock(&reap_mtx); - } + for (list = reap_list; list != NULL; list = list->rl_next) { + nni_reap_node *node; + size_t offset; + nni_cb func; - reap_empty = true; - nni_cv_wake(&reap_empty_cv); + if ((node = list->rl_nodes) == NULL) { + continue; + } - if (reap_exit) { - break; - } + reaped = true; + offset = list->rl_offset; + func = list->rl_func; + list->rl_nodes = NULL; - nni_cv_wait(&reap_cv); + // We process our list of nodes while not holding + // the lock. + nni_mtx_unlock(&reap_mtx); + while (node != NULL) { + void *ptr; + ptr = ((char *) node) - offset; + node = node->rn_next; + func(ptr); + } + nni_mtx_lock(&reap_mtx); + } + if (!reaped) { + reap_empty = true; + nni_cv_wake(&reap_empty_cv); + if (reap_exit) { + nni_mtx_unlock(&reap_mtx); + return; + } + nni_cv_wait(&reap_work_cv); + } } - nni_mtx_unlock(&reap_mtx); } void -nni_reap(nni_reap_item *item, nni_cb func, void *ptr) +nni_reap(nni_reap_list *rl, void *item) { + nni_reap_node *node; + nni_mtx_lock(&reap_mtx); - item->r_func = func; - item->r_ptr = ptr; - nni_list_append(&reap_list, item); - reap_empty = false; - nni_cv_wake(&reap_cv); + if (!rl->rl_inited) { + rl->rl_inited = true; + rl->rl_next = reap_list; + reap_list = rl; + } + reap_empty = false; + node = (void *) ((char *) item + rl->rl_offset); + node->rn_next = rl->rl_nodes; + rl->rl_nodes = node; + nni_cv_wake1(&reap_work_cv); nni_mtx_unlock(&reap_mtx); } @@ -79,16 +105,15 @@ nni_reap_sys_init(void) { int rv; - NNI_LIST_INIT(&reap_list, nni_reap_item, r_link); + reap_exit = false; nni_mtx_init(&reap_mtx); - nni_cv_init(&reap_cv, &reap_mtx); + nni_cv_init(&reap_work_cv, &reap_mtx); nni_cv_init(&reap_empty_cv, &reap_mtx); - reap_exit = false; // If this fails, we don't fail init, instead we will try to // start up at reap time. if ((rv = nni_thr_init(&reap_thr, reap_worker, NULL)) != 0) { - nni_cv_fini(&reap_cv); + nni_cv_fini(&reap_work_cv); nni_cv_fini(&reap_empty_cv); nni_mtx_fini(&reap_mtx); return (rv); @@ -102,7 +127,10 @@ nni_reap_sys_fini(void) { nni_mtx_lock(&reap_mtx); reap_exit = true; - nni_cv_wake(&reap_cv); + nni_cv_wake1(&reap_work_cv); nni_mtx_unlock(&reap_mtx); nni_thr_fini(&reap_thr); + + // NB: The subsystem linkages remain in place. We don't need + // to reinitialize them across future initializations. } diff --git a/src/core/reap.h b/src/core/reap.h index 40c27a5d..5f631885 100644 --- a/src/core/reap.h +++ b/src/core/reap.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -14,19 +14,35 @@ #include "core/defs.h" #include "core/list.h" -// nni_reap_item is defined here so that it can be inlined into -// structures. Callers must access its members directly. -typedef struct nni_reap_item { - nni_list_node r_link; - void * r_ptr; - nni_cb r_func; -} nni_reap_item; +// nni_reap_node is to be inserted inline into structures +// for subsystems that wish to support deferred reaping. +// It should be zeroed at object initialization, but apart +// from that it must not be touched directly except by the +// reap subsystem. +typedef struct nni_reap_node nni_reap_node; +struct nni_reap_node { + nni_reap_node *rn_next; +}; + +// nni_reap_list is for subsystems to define their own reap lists. +// This allows for the reap linkage to be restricted to a single +// pointer. Subsystems should initialize rl_offset and rl_func, +// and leave the rest zeroed. The intention is that this is a global +// static member for each subsystem. +typedef struct nni_reap_list nni_reap_list; +struct nni_reap_list { + nni_reap_list *rl_next; // linkage in global reap list + nni_reap_node *rl_nodes; // list of nodes to reap + size_t rl_offset; // offset of reap_node within member. + nni_cb rl_func; // function called to reap the item + bool rl_inited; // initialized means it is linked in the list +}; // nni_reap performs an asynchronous reap of an item. This allows functions // it calls to acquire locks or resources without worrying about deadlocks // (such as from a completion callback.) The called function should avoid // blocking for too long if possible, since only one reap thread is present -// in the system. The intended usage is for an nni_reap_item to be a member +// in the system. The intended usage is for an nni_reap_node to be a member // of the structure to be reaped, and and then this function is called to // finalize it. // @@ -35,10 +51,14 @@ typedef struct nni_reap_item { // is busy. These will be queued at the end of the reap list. This will // allow a dependency to defer reaping until its dependents have first been // reaped. HOWEVER, it is important that the item in question actually be -// part of a fully reapable graph; otherwise this can lead to an infinite +// part of a fully reap-able graph; otherwise this can lead to an infinite // loop in the reap thread. -extern void nni_reap(nni_reap_item *, nni_cb, void *); + +extern void nni_reap(nni_reap_list *, void *); + +// nni_reap_drain waits for the reap queue to be drained. extern void nni_reap_drain(void); + extern int nni_reap_sys_init(void); extern void nni_reap_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index f741b6f0..82267287 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1581,10 +1581,18 @@ nni_dialer_shutdown(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_reap(nni_dialer *d) +static void dialer_reap(void *); + +static nni_reap_list dialer_reap_list = { + .rl_offset = offsetof(nni_dialer, d_reap), + .rl_func = dialer_reap, +}; + +static void +dialer_reap(void *arg) { - nni_sock *s = d->d_sock; + nni_dialer *d = arg; + nni_sock * s = d->d_sock; nni_aio_stop(&d->d_tmo_aio); nni_aio_stop(&d->d_con_aio); @@ -1602,7 +1610,7 @@ nni_dialer_reap(nni_dialer *d) } nni_mtx_unlock(&s->s_mx); // Go back to the end of reap list. - nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); + nni_dialer_reap(d); return; } @@ -1617,6 +1625,12 @@ nni_dialer_reap(nni_dialer *d) } void +nni_dialer_reap(nni_dialer *d) +{ + nni_reap(&dialer_reap_list, d); +} + +void nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; @@ -1703,10 +1717,18 @@ nni_listener_shutdown(nni_listener *l) nni_mtx_unlock(&s->s_mx); } -void -nni_listener_reap(nni_listener *l) +static void listener_reap(void *); + +static nni_reap_list listener_reap_list = { + .rl_offset = offsetof(nni_listener, l_reap), + .rl_func = listener_reap, +}; + +static void +listener_reap(void *arg) { - nni_sock *s = l->l_sock; + nni_listener *l = arg; + nni_sock * s = l->l_sock; nni_aio_stop(&l->l_tmo_aio); nni_aio_stop(&l->l_acc_aio); @@ -1724,7 +1746,7 @@ nni_listener_reap(nni_listener *l) } nni_mtx_unlock(&s->s_mx); // Go back to the end of reap list. - nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); + nni_reap(&listener_reap_list, l); return; } @@ -1739,6 +1761,12 @@ nni_listener_reap(nni_listener *l) } void +nni_listener_reap(nni_listener *l) +{ + nni_reap(&listener_reap_list, l); +} + +void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { nni_sock * s = p->p_sock; diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 804fa00c..638a3bfd 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -37,7 +37,7 @@ struct nni_dialer { nni_duration d_currtime; // current time for reconnect nni_duration d_inirtime; // initial time for reconnect nni_time d_conntime; // time of last good connect - nni_reap_item d_reap; + nni_reap_node d_reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; @@ -73,7 +73,7 @@ struct nni_listener { nni_list l_pipes; nni_aio l_acc_aio; nni_aio l_tmo_aio; - nni_reap_item l_reap; + nni_reap_node l_reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; @@ -111,7 +111,7 @@ struct nni_pipe { int p_ref; nni_mtx p_mtx; nni_cv p_cv; - nni_reap_item p_reap; + nni_reap_node p_reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; @@ -122,7 +122,6 @@ struct nni_pipe { nni_stat_item st_tx_msgs; nni_stat_item st_rx_bytes; nni_stat_item st_tx_bytes; - #endif }; diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h index 2c2c9af7..4ef5fa80 100644 --- a/src/platform/posix/posix_ipc.h +++ b/src/platform/posix/posix_ipc.h @@ -30,7 +30,7 @@ struct nni_ipc_conn { nni_aio * dial_aio; nni_ipc_dialer *dialer; nng_sockaddr sa; - nni_reap_item reap; + nni_reap_node reap; }; struct nni_ipc_dialer { diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 825ecd4b..04eddd5f 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -493,11 +493,15 @@ ipc_reap(void *arg) NNI_FREE_STRUCT(c); } +static nni_reap_list ipc_reap_list = { + .rl_offset = offsetof(ipc_conn, reap), + .rl_func = ipc_reap, +}; static void ipc_free(void *arg) { ipc_conn *c = arg; - nni_reap(&c->reap, ipc_reap, c); + nni_reap(&ipc_reap_list, c); } static const nni_option ipc_options[] = { diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h index 87312dff..a8d1308e 100644 --- a/src/platform/posix/posix_tcp.h +++ b/src/platform/posix/posix_tcp.h @@ -25,7 +25,7 @@ struct nni_tcp_conn { nni_mtx mtx; nni_aio * dial_aio; nni_tcp_dialer *dialer; - nni_reap_item reap; + nni_reap_node reap; }; struct nni_tcp_dialer { diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 9684fe27..2494b05e 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -217,11 +217,15 @@ tcp_fini(void *arg) NNI_FREE_STRUCT(c); } +static nni_reap_list tcp_reap_list = { + .rl_offset = offsetof(nni_tcp_conn, reap), + .rl_func = tcp_fini, +}; static void tcp_free(void *arg) { nni_tcp_conn *c = arg; - nni_reap(&c->reap, tcp_fini, arg); + nni_reap(&tcp_reap_list, c); } static void @@ -336,7 +340,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn * c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(c->pfd); int rv; nng_sockaddr sa; @@ -355,7 +359,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn * c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(c->pfd); int rv; nng_sockaddr sa; diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index b8dc62d3..135a961e 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -34,7 +34,7 @@ typedef struct ipc_conn { bool closed; nni_mtx mtx; nni_cv cv; - nni_reap_item reap; + nni_reap_node reap; } ipc_conn; static void @@ -309,8 +309,10 @@ ipc_close(void *arg) } static void -ipc_conn_reap(ipc_conn *c) +ipc_conn_reap(void *arg) { + ipc_conn *c = arg; + nni_mtx_lock(&c->mtx); while ((!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { @@ -330,13 +332,18 @@ ipc_conn_reap(ipc_conn *c) NNI_FREE_STRUCT(c); } +static nni_reap_list ipc_reap_list = { + .rl_offset = offsetof(ipc_conn, reap), + .rl_func = ipc_conn_reap, +}; + static void ipc_free(void *arg) { ipc_conn *c = arg; ipc_close(c); - nni_reap(&c->reap, (nni_cb) ipc_conn_reap, CONN(c)); + nni_reap(&ipc_reap_list, c); } static int diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 12ebdca6..3353d380 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -25,7 +25,7 @@ struct nni_tcp_dialer { SOCKADDR_STORAGE src; // source address size_t srclen; nni_mtx mtx; - nni_reap_item reap; + nni_reap_node reap; }; int @@ -88,6 +88,11 @@ nni_tcp_dialer_close(nni_tcp_dialer *d) nni_mtx_unlock(&d->mtx); } +static nni_reap_list tcp_dialer_reap_list = { + .rl_offset = offsetof(nni_tcp_dialer, reap), + .rl_func = (nni_cb) nni_tcp_dialer_fini, +}; + void nni_tcp_dialer_fini(nni_tcp_dialer *d) { @@ -95,7 +100,7 @@ nni_tcp_dialer_fini(nni_tcp_dialer *d) nni_mtx_lock(&d->mtx); if (!nni_list_empty(&d->aios)) { nni_mtx_unlock(&d->mtx); - nni_reap(&d->reap, (nni_cb) nni_tcp_dialer_fini, d); + nni_reap(&tcp_dialer_reap_list, nni_tcp_dialer_fini); return; } nni_mtx_unlock(&d->mtx); @@ -164,7 +169,7 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) (void) setsockopt( c->s, IPPROTO_TCP, TCP_NODELAY, (char *) &nd, sizeof(nd)); - len = sizeof (SOCKADDR_STORAGE); + len = sizeof(SOCKADDR_STORAGE); (void) getsockname(c->s, (SOCKADDR *) &c->sockname, &len); nni_aio_set_output(aio, 0, c); diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 9c5fec3c..1e87fd31 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -28,7 +28,7 @@ struct nni_tcp_listener { LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; SOCKADDR_STORAGE ss; nni_mtx mtx; - nni_reap_item reap; + nni_reap_node reap; }; // tcp_listener_funcs looks up function pointers we need for advanced accept @@ -184,6 +184,11 @@ nni_tcp_listener_close(nni_tcp_listener *l) nni_mtx_unlock(&l->mtx); } +static nni_reap_list tcp_listener_reap_list = { + .rl_offset = offsetof(nni_tcp_listener, reap), + .rl_func = (nni_cb) nni_tcp_listener_fini, +}; + void nni_tcp_listener_fini(nni_tcp_listener *l) { @@ -191,7 +196,7 @@ nni_tcp_listener_fini(nni_tcp_listener *l) nni_mtx_lock(&l->mtx); if (!nni_list_empty(&l->aios)) { nni_mtx_unlock(&l->mtx); - nni_reap(&l->reap, (nni_cb) nni_tcp_listener_fini, l); + nni_reap(&tcp_listener_reap_list, l); return; } nni_mtx_unlock(&l->mtx); @@ -302,7 +307,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) return; } - // Windows requires us to explicity create the socket before + // Windows requires us to explicitly create the socket before // calling accept on it. if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { rv = nni_win_error(GetLastError()); diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index e1f51e52..1ebc8907 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -64,7 +64,7 @@ typedef struct http_sconn { nni_aio * rxaio; nni_aio * txaio; nni_aio * txdataio; - nni_reap_item reap; + nni_reap_node reap; } http_sconn; typedef struct http_error { @@ -90,7 +90,21 @@ struct nng_http_server { char * hostname; nni_list errors; nni_mtx errors_mtx; - nni_reap_item reap; + nni_reap_node reap; +}; + +static void http_sc_reap(void *); + +static nni_reap_list http_sc_reap_list = { + .rl_offset = offsetof(http_sconn, reap), + .rl_func = http_sc_reap, +}; + +static void http_server_fini(nni_http_server *); + +static nni_reap_list http_server_reap_list = { + .rl_offset = offsetof(nni_http_server, reap), + .rl_func = (nni_cb) http_server_fini, }; int @@ -269,7 +283,7 @@ static nni_list http_servers; static nni_mtx http_servers_lk; static void -http_sconn_reap(void *arg) +http_sc_reap(void *arg) { http_sconn * sc = arg; nni_http_server *s = sc->server; @@ -304,7 +318,7 @@ http_sconn_reap(void *arg) } static void -http_sconn_close_locked(http_sconn *sc) +http_sc_close_locked(http_sconn *sc) { nni_http_conn *conn; @@ -322,7 +336,7 @@ http_sconn_close_locked(http_sconn *sc) if ((conn = sc->conn) != NULL) { nni_http_conn_close(conn); } - nni_reap(&sc->reap, http_sconn_reap, sc); + nni_reap(&http_sc_reap_list, sc); } static void @@ -332,7 +346,7 @@ http_sconn_close(http_sconn *sc) s = sc->server; nni_mtx_lock(&s->mtx); - http_sconn_close_locked(sc); + http_sc_close_locked(sc); nni_mtx_unlock(&s->mtx); } @@ -704,7 +718,7 @@ http_sconn_rxdone(void *arg) if ((h->getbody) && ((cls = nni_http_req_get_header(req, "Content-Length")) != NULL)) { uint64_t len; - char *end; + char * end; len = strtoull(cls, &end, 10); if ((end == NULL) || (*end != '\0') || (len > h->maxbody)) { @@ -900,9 +914,9 @@ http_server_fini(nni_http_server *s) nni_mtx_lock(&s->mtx); if (!nni_list_empty(&s->conns)) { - // Try to reap later, after the sconns are done reaping. - // (Note, sconns will all have been closed already.) - nni_reap(&s->reap, (nni_cb) http_server_fini, s); + // Try to reap later, after the connections are done reaping. + // (Note, connections will all have been closed already.) + nni_reap(&http_server_reap_list, s); nni_mtx_unlock(&s->mtx); return; } @@ -1059,7 +1073,7 @@ http_server_stop(nni_http_server *s) // Stopping the server is a hard stop -- it aborts any work // being done by clients. (No graceful shutdown). NNI_LIST_FOREACH (&s->conns, sc) { - http_sconn_close_locked(sc); + http_sc_close_locked(sc); } while (!nni_list_empty(&s->conns)) { @@ -1904,7 +1918,7 @@ nni_http_server_fini(nni_http_server *s) http_server_stop(s); nni_mtx_unlock(&s->mtx); nni_list_remove(&http_servers, s); - nni_reap(&s->reap, (nni_cb) http_server_fini, s); + nni_reap(&http_server_reap_list, s); } nni_mtx_unlock(&http_servers_lk); } diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index e23fb4d8..6e5e6757 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -78,7 +78,7 @@ typedef struct { size_t tcp_send_len; size_t tcp_send_head; size_t tcp_send_tail; - struct nni_reap_item reap; + nni_reap_node reap; // ... engine connection data follows } tls_conn; @@ -89,10 +89,16 @@ static void tls_do_send(tls_conn *); static void tls_do_recv(tls_conn *); static void tls_tcp_send_start(tls_conn *); static void tls_free(void *); +static void tls_reap(void *); static int tls_alloc(tls_conn **, nng_tls_config *, nng_aio *); static int tls_start(tls_conn *, nng_stream *); static void tls_tcp_error(tls_conn *, int); +static nni_reap_list tls_conn_reap_list = { + .rl_offset = offsetof(tls_conn, reap), + .rl_func = tls_reap, +}; + typedef struct { nng_stream_dialer ops; nng_stream_dialer *d; // underlying TCP dialer @@ -331,8 +337,7 @@ static const nni_option tls_dialer_opts[] = { }; static int -tls_dialer_get( - void *arg, const char *name, void *buf, size_t *szp, nni_type t) +tls_dialer_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { tls_dialer *d = arg; int rv; @@ -873,7 +878,7 @@ tls_free(void *arg) { tls_conn *conn = arg; - nni_reap(&conn->reap, tls_reap, conn); + nni_reap(&tls_conn_reap_list, conn); } static int @@ -1501,6 +1506,7 @@ nni_tls_sys_init(void) void nni_tls_sys_fini(void) { + nni_reap_drain(); NNG_TLS_ENGINE_FINI(); } diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 20adf626..daf1be13 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -47,7 +47,7 @@ typedef struct ws_header { struct nni_ws { nng_stream ops; nni_list_node node; - nni_reap_item reap; + nni_reap_node reap; bool server; bool closed; bool ready; @@ -1251,10 +1251,15 @@ ws_fini(void *arg) NNI_FREE_STRUCT(ws); } +static nni_reap_list ws_reap_list = { + .rl_offset = offsetof(nni_ws, reap), + .rl_func = ws_fini, +}; + static void ws_reap(nni_ws *ws) { - nni_reap(&ws->reap, ws_fini, ws); + nni_reap(&ws_reap_list, ws); } static void @@ -2656,7 +2661,7 @@ static void ws_str_free(void *arg) { nni_ws *ws = arg; - nni_reap(&ws->reap, ws_fini, ws); + ws_reap(ws); } static void diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9e75f13a..efaa823c 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -36,7 +36,7 @@ struct ipc_pipe { nni_pipe * pipe; nni_list_node node; nni_atomic_flag reaped; - nni_reap_item reap; + nni_reap_node reap; uint8_t tx_head[1 + sizeof(uint64_t)]; uint8_t rx_head[1 + sizeof(uint64_t)]; size_t got_tx_head; @@ -69,7 +69,7 @@ struct ipc_ep { nni_list busy_pipes; // busy pipes -- ones passed to socket nni_list wait_pipes; // pipes waiting to match to socket nni_list neg_pipes; // pipes busy negotiating - nni_reap_item reap; + nni_reap_node reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; #endif @@ -80,8 +80,19 @@ static void ipc_pipe_recv_start(ipc_pipe *p); static void ipc_pipe_send_cb(void *); static void ipc_pipe_recv_cb(void *); static void ipc_pipe_neg_cb(void *); +static void ipc_pipe_fini(void *); static void ipc_ep_fini(void *); +static nni_reap_list ipc_ep_reap_list = { + .rl_offset = offsetof(ipc_ep, reap), + .rl_func = ipc_ep_fini, +}; + +static nni_reap_list ipc_pipe_reap_list = { + .rl_offset = offsetof(ipc_pipe, reap), + .rl_func = ipc_pipe_fini, +}; + static int ipc_tran_init(void) { @@ -139,7 +150,7 @@ ipc_pipe_fini(void *arg) nni_list_node_remove(&p->node); ep->ref_cnt--; if (ep->fini && (ep->ref_cnt == 0)) { - nni_reap(&ep->reap, ipc_ep_fini, ep); + nni_reap(&ipc_ep_reap_list, ep); } nni_mtx_unlock(&ep->mtx); } @@ -161,7 +172,7 @@ ipc_pipe_reap(ipc_pipe *p) if (p->conn != NULL) { nng_stream_close(p->conn); } - nni_reap(&p->reap, ipc_pipe_fini, p); + nni_reap(&ipc_pipe_reap_list, p); } } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index cf0bfeb1..524c6988 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -32,7 +32,7 @@ struct tcptran_pipe { nni_list_node node; tcptran_ep * ep; nni_atomic_flag reaped; - nni_reap_item reap; + nni_reap_node reap; uint8_t txlen[sizeof(uint64_t)]; uint8_t rxlen[sizeof(uint64_t)]; size_t gottxhead; @@ -65,7 +65,7 @@ struct tcptran_ep { nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_item reap; + nni_reap_node reap; nng_stream_dialer * dialer; nng_stream_listener *listener; @@ -80,6 +80,17 @@ static void tcptran_pipe_send_cb(void *); static void tcptran_pipe_recv_cb(void *); static void tcptran_pipe_nego_cb(void *); static void tcptran_ep_fini(void *); +static void tcptran_pipe_fini(void *); + +static nni_reap_list tcptran_ep_reap_list = { + .rl_offset = offsetof(tcptran_ep, reap), + .rl_func = tcptran_ep_fini, +}; + +static nni_reap_list tcptran_pipe_reap_list = { + .rl_offset = offsetof (tcptran_pipe, reap), + .rl_func = tcptran_pipe_fini, +}; static int tcptran_init(void) @@ -139,7 +150,7 @@ tcptran_pipe_fini(void *arg) nni_list_node_remove(&p->node); ep->refcnt--; if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&ep->reap, tcptran_ep_fini, ep); + nni_reap(&tcptran_ep_reap_list, ep); } nni_mtx_unlock(&ep->mtx); } @@ -160,7 +171,7 @@ tcptran_pipe_reap(tcptran_pipe *p) if (p->conn != NULL) { nng_stream_close(p->conn); } - nni_reap(&p->reap, tcptran_pipe_fini, p); + nni_reap(&tcptran_pipe_reap_list, p); } } diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 62393d22..b6623733 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -40,7 +40,7 @@ struct tlstran_pipe { tlstran_ep * ep; nni_sockaddr sa; nni_atomic_flag reaped; - nni_reap_item reap; + nni_reap_node reap; uint8_t txlen[sizeof(uint64_t)]; uint8_t rxlen[sizeof(uint64_t)]; size_t gottxhead; @@ -66,7 +66,7 @@ struct tlstran_ep { int authmode; nni_url * url; nni_list pipes; - nni_reap_item reap; + nni_reap_node reap; nng_stream_dialer * dialer; nng_stream_listener *listener; nni_aio * useraio; @@ -87,6 +87,17 @@ static void tlstran_pipe_send_cb(void *); static void tlstran_pipe_recv_cb(void *); static void tlstran_pipe_nego_cb(void *); static void tlstran_ep_fini(void *); +static void tlstran_pipe_fini(void *); + +static nni_reap_list tlstran_ep_reap_list = { + .rl_offset = offsetof(tlstran_ep, reap), + .rl_func = tlstran_ep_fini, +}; + +static nni_reap_list tlstran_pipe_reap_list = { + .rl_offset = offsetof(tlstran_pipe, reap), + .rl_func = tlstran_pipe_fini, +}; static int tlstran_init(void) @@ -141,7 +152,7 @@ tlstran_pipe_fini(void *arg) nni_list_node_remove(&p->node); ep->refcnt--; if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&ep->reap, tlstran_ep_fini, ep); + nni_reap(&tlstran_ep_reap_list, ep); } nni_mtx_unlock(&ep->mtx); } @@ -186,7 +197,7 @@ tlstran_pipe_reap(tlstran_pipe *p) if (p->tls != NULL) { nng_stream_close(p->tls); } - nni_reap(&p->reap, tlstran_pipe_fini, p); + nni_reap(&tlstran_pipe_reap_list, p); } } @@ -952,9 +963,9 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) if ((rv != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) || - ((rv = nni_stream_listener_set(ep->listener, - NNG_OPT_TLS_AUTH_MODE, &ep->authmode, sizeof(ep->authmode), - NNI_TYPE_INT32)) != 0)) { + ((rv = nni_stream_listener_set(ep->listener, NNG_OPT_TLS_AUTH_MODE, + &ep->authmode, sizeof(ep->authmode), NNI_TYPE_INT32)) != + 0)) { tlstran_ep_fini(ep); return (rv); } |
