diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-17 20:41:06 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-17 21:52:07 -0700 |
| commit | 846c30081a67e961b4a060bdca192ddafb87cce9 (patch) | |
| tree | c795cd6241979418f651c2e2d5e42091988591d6 /src/platform/windows/win_resolv.c | |
| parent | 109a559590abfe3017dd317c3068e2457188541c (diff) | |
| download | nng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.gz nng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.bz2 nng-846c30081a67e961b4a060bdca192ddafb87cce9.zip | |
fixes #451 task finalization could be better/smarter (resolver)
This changes nni_task_fini to always run synchronously, waiting
for the task to finish before cleaning up. Much simpler code.
Additionally, we've refactored the resolver code to avoid the
use of taskqs, which added complexity and inefficiency. The
approach of just allocating its own threads and a work queue
to process them turns out to be vastly simpler, and actually
reduces extra allocations and context switches.
wip
POSIX resolv threads.
(Taskqs are just overhead and complexity here.)
Windows resolver changes.
Task cleanup.
fix up windows mutex.
Diffstat (limited to 'src/platform/windows/win_resolv.c')
| -rw-r--r-- | src/platform/windows/win_resolv.c | 193 |
1 files changed, 110 insertions, 83 deletions
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 54f1c61c..c0fa1e0a 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -16,68 +16,57 @@ // with it, where looking up names in DNS can poison results for other // uses, because the asynchronous resolver *only* considers DNS -- ignoring // host file, WINS, or other naming services. As a result, we just build -// our own limited asynchronous using a taskq. - -// We use a single resolver taskq - but we allocate a few threads -// for it to ensure that names can be looked up concurrently. This isn't -// as elegant or scaleable as a true asynchronous resolver would be, but -// it has the advantage of being fairly portable, and concurrent enough for -// the vast, vast majority of use cases. The total thread count can be -// changed with this define. +// our own limited asynchronous resolver with threads. #ifndef NNG_WIN_RESOLV_CONCURRENCY #define NNG_WIN_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *win_resolv_tq = NULL; -static nni_mtx win_resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_WIN_RESOLV_CONCURRENCY]; -typedef struct win_resolv_item win_resolv_item; -struct win_resolv_item { +typedef struct resolv_item resolv_item; +struct resolv_item { int family; int passive; const char * name; const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; static void -win_resolv_finish(win_resolv_item *item, int rv) +resolv_cancel(nni_aio *aio, int rv) { - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nni_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} + resolv_item *item; -static void -win_resolv_cancel(nni_aio *aio, int rv) -{ - win_resolv_item *item; - - nni_mtx_lock(&win_resolv_mtx); + nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&win_resolv_mtx); + nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&win_resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // Resolver still working, so just unlink our AIO to + // discard our interest in the results. + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -win_gai_errno(int rv) +resolv_gai_errno(int rv) { switch (rv) { case 0: @@ -103,10 +92,9 @@ win_gai_errno(int rv) } } -static void -win_resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - win_resolv_item *item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; @@ -114,16 +102,6 @@ win_resolv_task(void *arg) results = NULL; - nni_mtx_lock(&win_resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&win_resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&win_resolv_mtx); - // We treat these all as IP addresses. The service and the // host part are split. memset(&hints, 0, sizeof(hints)); @@ -136,7 +114,7 @@ win_resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = win_gai_errno(rv); + rv = resolv_gai_errno(rv); goto done; } @@ -179,18 +157,16 @@ done: if (results != NULL) { freeaddrinfo(results); } - nni_mtx_lock(&win_resolv_mtx); - win_resolv_finish(item, rv); - nni_mtx_unlock(&win_resolv_mtx); + return (rv); } static void -win_resolv_ip(const char *host, const char *serv, int passive, int family, +resolv_ip(const char *host, const char *serv, int passive, int family, int proto, nni_aio *aio) { - win_resolv_item *item; - int fam; - int rv; + resolv_item *item; + int fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -215,13 +191,6 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item); - if (rv != 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - } - item->passive = passive; item->name = host; item->serv = serv; @@ -229,42 +198,96 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family, item->aio = aio; item->family = fam; - nni_mtx_lock(&win_resolv_mtx); - if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) { - nni_mtx_unlock(&win_resolv_mtx); - nni_task_fini(item->task); + nni_mtx_lock(&resolv_mtx); + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { + nni_mtx_unlock(&resolv_mtx); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); - nni_mtx_unlock(&win_resolv_mtx); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } -int -nni_win_resolv_sysinit(void) +void +resolv_worker(void *notused) { - int rv; - nni_mtx_init(&win_resolv_mtx); + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); - if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) { - nni_mtx_fini(&win_resolv_mtx); - return (rv); + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + +int +nni_win_resolv_sysinit(void) +{ + nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_win_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } return (0); } @@ -272,11 +295,15 @@ nni_win_resolv_sysinit(void) void nni_win_resolv_sysfini(void) { - if (win_resolv_tq != NULL) { - nni_taskq_fini(win_resolv_tq); - win_resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } - nni_mtx_fini(&win_resolv_mtx); + nni_cv_fini(&resolv_cv); + nni_mtx_fini(&resolv_mtx); } #endif // NNG_PLATFORM_WINDOWS |
