diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-17 20:41:06 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-17 21:52:07 -0700 |
| commit | 846c30081a67e961b4a060bdca192ddafb87cce9 (patch) | |
| tree | c795cd6241979418f651c2e2d5e42091988591d6 /src/platform/posix | |
| parent | 109a559590abfe3017dd317c3068e2457188541c (diff) | |
| download | nng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.gz nng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.bz2 nng-846c30081a67e961b4a060bdca192ddafb87cce9.zip | |
fixes #451 task finalization could be better/smarter (resolver)
This changes nni_task_fini to always run synchronously, waiting
for the task to finish before cleaning up. Much simpler code.
Additionally, we've refactored the resolver code to avoid the
use of taskqs, which added complexity and inefficiency. The
approach of just allocating its own threads and a work queue
to process them turns out to be vastly simpler, and actually
reduces extra allocations and context switches.
wip
POSIX resolv threads.
(Taskqs are just overhead and complexity here.)
Windows resolver changes.
Task cleanup.
fix up windows mutex.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 155 |
1 files changed, 97 insertions, 58 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index ad1d5147..dcaef409 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,8 +37,11 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *resolv_tq = NULL; -static nni_mtx resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY]; typedef struct resolv_item resolv_item; struct resolv_item { @@ -48,45 +51,40 @@ struct resolv_item { const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; static void -resolv_finish(resolv_item *item, int rv) -{ - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} - -static void resolv_cancel(nni_aio *aio, int rv) { resolv_item *item; nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { + // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // This case indicates the resolver is still processing our + // node. We can discard our interest in the result, but we + // can't interrupt the resolver itself. (Too bad, name + // resolution is utterly synchronous for now.) + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -nni_posix_gai_errno(int rv) +posix_gai_errno(int rv) { switch (rv) { case 0: @@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv) } } -static void -resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - resolv_item * item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; int rv; - nni_mtx_lock(&resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&resolv_mtx); - results = NULL; // We treat these all as IP addresses. The service and the @@ -152,7 +139,7 @@ resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = nni_posix_gai_errno(rv); + rv = posix_gai_errno(rv); goto done; } @@ -196,9 +183,7 @@ done: freeaddrinfo(results); } - nni_mtx_lock(&resolv_mtx); - resolv_finish(item, rv); - nni_mtx_unlock(&resolv_mtx); + return (rv); } static void @@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family, return; } - if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != - 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - }; - // NB: host and serv must remain valid until this is completed. memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; @@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family, item->family = fam; nni_mtx_lock(&resolv_mtx); - if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { nni_mtx_unlock(&resolv_mtx); - nni_task_fini(item->task); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); nni_mtx_unlock(&resolv_mtx); } @@ -274,27 +257,83 @@ nni_plat_udp_resolv( resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } +void +resolv_worker(void *notused) +{ + + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); + + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + int nni_posix_resolv_sysinit(void) { - int rv; - nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; - if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { - nni_mtx_fini(&resolv_mtx); - return (rv); + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_posix_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } + return (0); } void nni_posix_resolv_sysfini(void) { - if (resolv_tq != NULL) { - nni_taskq_fini(resolv_tq); - resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } + nni_cv_fini(&resolv_cv); nni_mtx_fini(&resolv_mtx); } |
