From 846c30081a67e961b4a060bdca192ddafb87cce9 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 17 May 2018 20:41:06 -0700 Subject: fixes #451 task finalization could be better/smarter (resolver) This changes nni_task_fini to always run synchronously, waiting for the task to finish before cleaning up. Much simpler code. Additionally, we've refactored the resolver code to avoid the use of taskqs, which added complexity and inefficiency. The approach of just allocating its own threads and a work queue to process them turns out to be vastly simpler, and actually reduces extra allocations and context switches. wip POSIX resolv threads. (Taskqs are just overhead and complexity here.) Windows resolver changes. Task cleanup. fix up windows mutex. --- src/core/taskq.c | 22 +--- src/platform/posix/posix_resolv_gai.c | 155 +++++++++++++++++---------- src/platform/windows/win_resolv.c | 193 +++++++++++++++++++--------------- 3 files changed, 209 insertions(+), 161 deletions(-) diff --git a/src/core/taskq.c b/src/core/taskq.c index 15351840..f0712e59 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -18,7 +18,6 @@ struct nni_task { nni_taskq * task_tq; unsigned task_busy; bool task_prep; - bool task_fini; nni_mtx task_mtx; nni_cv task_cv; }; @@ -56,14 +55,6 @@ nni_taskq_thread(void *self) nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - - nni_mtx_lock(&tq->tq_mtx); - continue; - } nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -158,12 +149,6 @@ nni_task_exec(nni_task *task) nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - return; - } nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -238,11 +223,8 @@ nni_task_fini(nni_task *task) { NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); - if (task->task_busy) { - // destroy later. - task->task_fini = true; - nni_mtx_unlock(&task->task_mtx); - return; + while (task->task_busy) { + nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); nni_cv_fini(&task->task_cv); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index ad1d5147..dcaef409 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,8 +37,11 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *resolv_tq = NULL; -static nni_mtx resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY]; typedef struct resolv_item resolv_item; struct resolv_item { @@ -48,27 +51,9 @@ struct resolv_item { const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; -static void -resolv_finish(resolv_item *item, int rv) -{ - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} - static void resolv_cancel(nni_aio *aio, int rv) { @@ -76,17 +61,30 @@ resolv_cancel(nni_aio *aio, int rv) nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { + // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // This case indicates the resolver is still processing our + // node. We can discard our interest in the result, but we + // can't interrupt the resolver itself. (Too bad, name + // resolution is utterly synchronous for now.) + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -nni_posix_gai_errno(int rv) +posix_gai_errno(int rv) { switch (rv) { case 0: @@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv) } } -static void -resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - resolv_item * item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; int rv; - nni_mtx_lock(&resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&resolv_mtx); - results = NULL; // We treat these all as IP addresses. The service and the @@ -152,7 +139,7 @@ resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = nni_posix_gai_errno(rv); + rv = posix_gai_errno(rv); goto done; } @@ -196,9 +183,7 @@ done: freeaddrinfo(results); } - nni_mtx_lock(&resolv_mtx); - resolv_finish(item, rv); - nni_mtx_unlock(&resolv_mtx); + return (rv); } static void @@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family, return; } - if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != - 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - }; - // NB: host and serv must remain valid until this is completed. memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; @@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family, item->family = fam; nni_mtx_lock(&resolv_mtx); - if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { nni_mtx_unlock(&resolv_mtx); - nni_task_fini(item->task); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); nni_mtx_unlock(&resolv_mtx); } @@ -274,27 +257,83 @@ nni_plat_udp_resolv( resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } +void +resolv_worker(void *notused) +{ + + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); + + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + int nni_posix_resolv_sysinit(void) { - int rv; - nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; - if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { - nni_mtx_fini(&resolv_mtx); - return (rv); + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_posix_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } + return (0); } void nni_posix_resolv_sysfini(void) { - if (resolv_tq != NULL) { - nni_taskq_fini(resolv_tq); - resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } + nni_cv_fini(&resolv_cv); nni_mtx_fini(&resolv_mtx); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 54f1c61c..c0fa1e0a 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -16,68 +16,57 @@ // with it, where looking up names in DNS can poison results for other // uses, because the asynchronous resolver *only* considers DNS -- ignoring // host file, WINS, or other naming services. As a result, we just build -// our own limited asynchronous using a taskq. - -// We use a single resolver taskq - but we allocate a few threads -// for it to ensure that names can be looked up concurrently. This isn't -// as elegant or scaleable as a true asynchronous resolver would be, but -// it has the advantage of being fairly portable, and concurrent enough for -// the vast, vast majority of use cases. The total thread count can be -// changed with this define. +// our own limited asynchronous resolver with threads. #ifndef NNG_WIN_RESOLV_CONCURRENCY #define NNG_WIN_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *win_resolv_tq = NULL; -static nni_mtx win_resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_WIN_RESOLV_CONCURRENCY]; -typedef struct win_resolv_item win_resolv_item; -struct win_resolv_item { +typedef struct resolv_item resolv_item; +struct resolv_item { int family; int passive; const char * name; const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; static void -win_resolv_finish(win_resolv_item *item, int rv) +resolv_cancel(nni_aio *aio, int rv) { - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nni_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} + resolv_item *item; -static void -win_resolv_cancel(nni_aio *aio, int rv) -{ - win_resolv_item *item; - - nni_mtx_lock(&win_resolv_mtx); + nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&win_resolv_mtx); + nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&win_resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // Resolver still working, so just unlink our AIO to + // discard our interest in the results. + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -win_gai_errno(int rv) +resolv_gai_errno(int rv) { switch (rv) { case 0: @@ -103,10 +92,9 @@ win_gai_errno(int rv) } } -static void -win_resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - win_resolv_item *item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; @@ -114,16 +102,6 @@ win_resolv_task(void *arg) results = NULL; - nni_mtx_lock(&win_resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&win_resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&win_resolv_mtx); - // We treat these all as IP addresses. The service and the // host part are split. memset(&hints, 0, sizeof(hints)); @@ -136,7 +114,7 @@ win_resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = win_gai_errno(rv); + rv = resolv_gai_errno(rv); goto done; } @@ -179,18 +157,16 @@ done: if (results != NULL) { freeaddrinfo(results); } - nni_mtx_lock(&win_resolv_mtx); - win_resolv_finish(item, rv); - nni_mtx_unlock(&win_resolv_mtx); + return (rv); } static void -win_resolv_ip(const char *host, const char *serv, int passive, int family, +resolv_ip(const char *host, const char *serv, int passive, int family, int proto, nni_aio *aio) { - win_resolv_item *item; - int fam; - int rv; + resolv_item *item; + int fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -215,13 +191,6 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item); - if (rv != 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - } - item->passive = passive; item->name = host; item->serv = serv; @@ -229,42 +198,96 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, item->aio = aio; item->family = fam; - nni_mtx_lock(&win_resolv_mtx); - if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) { - nni_mtx_unlock(&win_resolv_mtx); - nni_task_fini(item->task); + nni_mtx_lock(&resolv_mtx); + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { + nni_mtx_unlock(&resolv_mtx); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); - nni_mtx_unlock(&win_resolv_mtx); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } -int -nni_win_resolv_sysinit(void) +void +resolv_worker(void *notused) { - int rv; - nni_mtx_init(&win_resolv_mtx); + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); - if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) { - nni_mtx_fini(&win_resolv_mtx); - return (rv); + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + +int +nni_win_resolv_sysinit(void) +{ + nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_win_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } return (0); } @@ -272,11 +295,15 @@ nni_win_resolv_sysinit(void) void nni_win_resolv_sysfini(void) { - if (win_resolv_tq != NULL) { - nni_taskq_fini(win_resolv_tq); - win_resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } - nni_mtx_fini(&win_resolv_mtx); + nni_cv_fini(&resolv_cv); + nni_mtx_fini(&resolv_mtx); } #endif // NNG_PLATFORM_WINDOWS -- cgit v1.2.3-70-g09d2