diff options
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 155 |
1 files changed, 97 insertions, 58 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index ad1d5147..dcaef409 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,8 +37,11 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *resolv_tq = NULL; -static nni_mtx resolv_mtx; +static nni_mtx resolv_mtx; +static nni_cv resolv_cv; +static bool resolv_fini; +static nni_list resolv_aios; +static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY]; typedef struct resolv_item resolv_item; struct resolv_item { @@ -48,45 +51,40 @@ struct resolv_item { const char * serv; int proto; nni_aio * aio; - nni_task * task; nng_sockaddr sa; }; static void -resolv_finish(resolv_item *item, int rv) -{ - nni_aio *aio; - - if (((aio = item->aio) != NULL) && - (nni_aio_get_prov_data(aio) == item)) { - nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - memcpy(sa, &item->sa, sizeof(*sa)); - nni_aio_finish(aio, rv, 0); - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - } -} - -static void resolv_cancel(nni_aio *aio, int rv) { resolv_item *item; nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { + // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_mtx_unlock(&resolv_mtx); + if (nni_aio_list_active(aio)) { + // We have not been picked up by a resolver thread yet, + // so we can just discard everything. + nni_aio_list_remove(aio); + nni_mtx_unlock(&resolv_mtx); + NNI_FREE_STRUCT(item); + } else { + // This case indicates the resolver is still processing our + // node. We can discard our interest in the result, but we + // can't interrupt the resolver itself. (Too bad, name + // resolution is utterly synchronous for now.) + item->aio = NULL; + nni_mtx_unlock(&resolv_mtx); + } nni_aio_finish_error(aio, rv); } static int -nni_posix_gai_errno(int rv) +posix_gai_errno(int rv) { switch (rv) { case 0: @@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv) } } -static void -resolv_task(void *arg) +static int +resolv_task(resolv_item *item) { - resolv_item * item = arg; struct addrinfo hints; struct addrinfo *results; struct addrinfo *probe; int rv; - nni_mtx_lock(&resolv_mtx); - if (item->aio == NULL) { - nni_mtx_unlock(&resolv_mtx); - // Caller canceled, and no longer cares about this. - nni_task_fini(item->task); - NNI_FREE_STRUCT(item); - return; - } - nni_mtx_unlock(&resolv_mtx); - results = NULL; // We treat these all as IP addresses. The service and the @@ -152,7 +139,7 @@ resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = nni_posix_gai_errno(rv); + rv = posix_gai_errno(rv); goto done; } @@ -196,9 +183,7 @@ done: freeaddrinfo(results); } - nni_mtx_lock(&resolv_mtx); - resolv_finish(item, rv); - nni_mtx_unlock(&resolv_mtx); + return (rv); } static void @@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family, return; } - if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != - 0) { - NNI_FREE_STRUCT(item); - nni_aio_finish_error(aio, rv); - return; - }; - // NB: host and serv must remain valid until this is completed. memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; @@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family, item->family = fam; nni_mtx_lock(&resolv_mtx); - if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + if (resolv_fini) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_schedule(aio, resolv_cancel, item); + } + if (rv != 0) { nni_mtx_unlock(&resolv_mtx); - nni_task_fini(item->task); NNI_FREE_STRUCT(item); nni_aio_finish_error(aio, rv); return; } - nni_task_dispatch(item->task); + nni_list_append(&resolv_aios, aio); + nni_cv_wake1(&resolv_cv); nni_mtx_unlock(&resolv_mtx); } @@ -274,27 +257,83 @@ nni_plat_udp_resolv( resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } +void +resolv_worker(void *notused) +{ + + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&resolv_mtx); + for (;;) { + nni_aio * aio; + resolv_item *item; + int rv; + + if ((aio = nni_list_first(&resolv_aios)) == NULL) { + if (resolv_fini) { + break; + } + nni_cv_wait(&resolv_cv); + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_list_remove(aio); + + // Now attempt to do the work. This runs synchronously. + nni_mtx_unlock(&resolv_mtx); + rv = resolv_task(item); + nni_mtx_lock(&resolv_mtx); + + // Check to make sure we were not canceled. + if ((aio = item->aio) != NULL) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + + NNI_FREE_STRUCT(item); + } + } + nni_mtx_unlock(&resolv_mtx); +} + int nni_posix_resolv_sysinit(void) { - int rv; - nni_mtx_init(&resolv_mtx); + nni_cv_init(&resolv_cv, &resolv_mtx); + nni_aio_list_init(&resolv_aios); + + resolv_fini = false; - if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { - nni_mtx_fini(&resolv_mtx); - return (rv); + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + if (rv != 0) { + nni_posix_resolv_sysfini(); + return (rv); + } + } + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_run(&resolv_thrs[i]); } + return (0); } void nni_posix_resolv_sysfini(void) { - if (resolv_tq != NULL) { - nni_taskq_fini(resolv_tq); - resolv_tq = NULL; + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) { + nni_thr_fini(&resolv_thrs[i]); } + nni_cv_fini(&resolv_cv); nni_mtx_fini(&resolv_mtx); } |
