diff options
Diffstat (limited to 'src/platform/posix/posix_resolv_gai.c')
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 365 |
1 files changed, 157 insertions, 208 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index b5f48ecc..f3f69f9f 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -45,33 +45,18 @@ #endif #endif -static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; -static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); -static bool resolv_fini = false; -static nni_list resolv_aios; -static nni_thr *resolv_thrs; -static int16_t resolv_num_thr; - -typedef struct resolv_item resolv_item; -struct resolv_item { - int family; - bool passive; - char host[256]; - char serv[8]; - nni_aio *aio; - nng_sockaddr *sa; -}; - -static void -resolv_free_item(resolv_item *item) -{ - NNI_FREE_STRUCT(item); -} +static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; +static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); +static bool resolv_fini = false; +static nni_list resolv_aios; +static nni_thr *resolv_thrs; +static nni_aio **resolv_active; +static int16_t resolv_num_thr; static void resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item = arg; + nni_resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); if (item != nni_aio_get_prov_data(aio)) { @@ -84,17 +69,18 @@ resolv_cancel(nni_aio *aio, void *arg, int rv) // We have not been picked up by a resolver thread yet, // so we can just discard everything. nni_aio_list_remove(aio); - nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); } else { - // This case indicates the resolver is still processing our - // node. We can discard our interest in the result, but we - // can't interrupt the resolver itself. (Too bad, name - // resolution is utterly synchronous for now.) - item->aio = NULL; - item->sa = NULL; - nni_mtx_unlock(&resolv_mtx); + // Remove it from the thread pending list. We cannot abort + // the actual lookup, but this abandons interest in it, and + // the resolver thread will pick the next item when done. + for (int i = 0; i < resolv_num_thr; i++) { + if (resolv_active[i] == aio) { + resolv_active[i] = NULL; + break; + } + } } + nni_mtx_unlock(&resolv_mtx); nni_aio_finish_error(aio, rv); } @@ -139,157 +125,27 @@ posix_gai_errno(int rv) } } -static int -resolv_task(resolv_item *item) -{ - struct addrinfo hints; - struct addrinfo *results; - struct addrinfo *probe; - int rv; - - results = NULL; - - // We treat these all as IP addresses. The service and the - // host part are split. - memset(&hints, 0, sizeof(hints)); -#ifdef AI_ADDRCONFIG - hints.ai_flags = AI_ADDRCONFIG; -#endif - if (item->passive) { - hints.ai_flags |= AI_PASSIVE; - } - hints.ai_family = item->family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags |= AI_NUMERICSERV; - - // We can pass any non-zero service number, but we have to pass - // *something*, in case we are using a NULL hostname. - if ((rv = getaddrinfo(item->host[0] != 0 ? item->host : NULL, - item->serv, &hints, &results)) != 0) { - rv = posix_gai_errno(rv); - goto done; - } - - // We only take the first matching address. Presumably - // DNS load balancing is done by the resolver/server. - - rv = NNG_EADDRINVAL; - for (probe = results; probe != NULL; probe = probe->ai_next) { - if (probe->ai_addr->sa_family == AF_INET) { - break; - } -#ifdef NNG_ENABLE_IPV6 - if (probe->ai_addr->sa_family == AF_INET6) { - break; - } -#endif - } - - nni_mtx_lock(&resolv_mtx); - if ((probe != NULL) && (item->aio != NULL)) { - struct sockaddr_in *sin; -#ifdef NNG_ENABLE_IPV6 - struct sockaddr_in6 *sin6; -#endif - nng_sockaddr *sa = item->sa; - - switch (probe->ai_addr->sa_family) { - case AF_INET: - rv = 0; - sin = (void *) probe->ai_addr; - sa->s_in.sa_family = NNG_AF_INET; - sa->s_in.sa_port = sin->sin_port; - sa->s_in.sa_addr = sin->sin_addr.s_addr; - break; -#ifdef NNG_ENABLE_IPV6 - case AF_INET6: - rv = 0; - sin6 = (void *) probe->ai_addr; - sa->s_in6.sa_family = NNG_AF_INET6; - sa->s_in6.sa_port = sin6->sin6_port; - sa->s_in6.sa_scope = sin6->sin6_scope_id; - memcpy(sa->s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - break; -#endif - } - } - nni_mtx_unlock(&resolv_mtx); - -done: - - if (results != NULL) { - freeaddrinfo(results); - } - - return (rv); -} - void -nni_resolv_ip(const char *host, uint16_t port, int af, bool passive, - nng_sockaddr *sa, nni_aio *aio) +nni_resolv(nni_resolv_item *item, nni_aio *aio) { - resolv_item *item; - sa_family_t fam; - nni_aio_reset(aio); - if (host != NULL) { - if ((strlen(host) >= sizeof(item->host)) || - (strcmp(host, "*") == 0)) { + if (item->ri_host != NULL) { + if ((strlen(item->ri_host) >= 256) || + (strcmp(item->ri_host, "*") == 0)) { nni_aio_finish_error(aio, NNG_EADDRINVAL); return; } } - switch (af) { - case NNG_AF_INET: - fam = AF_INET; - break; - -#ifdef NNG_ENABLE_IPV6 - case NNG_AF_INET6: - fam = AF_INET6; - break; - case NNG_AF_UNSPEC: - fam = AF_UNSPEC; - break; -#else - case NNG_AF_UNSPEC: - fam = AF_INET; - break; -#endif - - default: - nni_aio_finish_error(aio, NNG_ENOTSUP); - return; - } - - if ((item = NNI_ALLOC_STRUCT(item)) == NULL) { - nni_aio_finish_error(aio, NNG_ENOMEM); - return; - } - - snprintf(item->serv, sizeof(item->serv), "%u", port); - if (host == NULL) { - item->host[0] = '\0'; - } else { - snprintf(item->host, sizeof(item->host), "%s", host); - } - - item->aio = aio; - item->family = fam; - item->passive = passive; - item->sa = sa; nni_mtx_lock(&resolv_mtx); nni_aio_set_prov_data(aio, item); if (!nni_aio_start(aio, resolv_cancel, item)) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); return; } if (resolv_fini) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); nni_aio_finish_error(aio, NNG_ECLOSED); return; } @@ -299,18 +155,22 @@ nni_resolv_ip(const char *host, uint16_t port, int af, bool passive, } void -resolv_worker(void *unused) +resolv_worker(void *index) { - - NNI_ARG_UNUSED(unused); + int tid = (int) (intptr_t) index; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; + char serv[8]; + char host[256]; + nni_aio *aio; + nni_resolv_item *item; nni_thr_set_name(NULL, "nng:resolver"); nni_mtx_lock(&resolv_mtx); for (;;) { - nni_aio *aio; - resolv_item *item; - int rv; if ((aio = nni_list_first(&resolv_aios)) == NULL) { if (resolv_fini) { @@ -322,22 +182,111 @@ resolv_worker(void *unused) item = nni_aio_get_prov_data(aio); nni_aio_list_remove(aio); + resolv_active[tid] = aio; + + snprintf(host, sizeof(host), "%s", + item->ri_host ? item->ri_host : ""); + snprintf(serv, sizeof(serv), "%u", item->ri_port); - // Now attempt to do the work. This runs synchronously. + // We treat these all as IP addresses. The service and the + // host part are split. + memset(&hints, 0, sizeof(hints)); + + results = NULL; + + switch (item->ri_family) { + case NNG_AF_INET: + hints.ai_family = AF_INET; + break; + +#ifdef NNG_ENABLE_IPV6 + case NNG_AF_INET6: + hints.ai_family = AF_INET6; + break; + case NNG_AF_UNSPEC: + hints.ai_family = AF_UNSPEC; + break; +#else + case NNG_AF_UNSPEC: + hints.ai_family = AF_INET; + break; +#endif + default: + resolv_active[tid] = NULL; + nni_aio_finish_error(aio, NNG_ENOTSUP); + continue; + } + +#ifdef AI_ADDRCONFIG + hints.ai_flags = AI_ADDRCONFIG; +#endif + if (item->ri_passive) { + hints.ai_flags |= AI_PASSIVE; + } + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags |= AI_NUMERICSERV; + + // We can pass any non-zero service number, but we have to pass + // *something*, in case we are using a NULL hostname. The lock + // is dropped to allow other submissions, and other threads to + // process. nni_mtx_unlock(&resolv_mtx); - rv = resolv_task(item); + rv = getaddrinfo( + host[0] != 0 ? host : NULL, serv, &hints, &results); nni_mtx_lock(&resolv_mtx); - // Check to make sure we were not canceled. - if ((aio = item->aio) != NULL) { + if ((aio = resolv_active[tid]) == NULL) { + // no more interest (canceled), so ignore the result + // and carry on + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + resolv_active[tid] = NULL; + + if (rv != 0) { + rv = posix_gai_errno(rv); + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - item->sa = NULL; + // We only take the first matching address. Presumably + // DNS load balancing is done by the resolver/server. - nni_aio_finish(aio, rv, 0); + rv = NNG_EADDRINVAL; + for (probe = results; probe != NULL; probe = probe->ai_next) { + if (probe->ai_addr->sa_family == AF_INET) { + break; + } +#ifdef NNG_ENABLE_IPV6 + if (probe->ai_addr->sa_family == AF_INET6) { + break; + } +#endif } - resolv_free_item(item); + + if (probe == NULL) { + // no match + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_set_prov_data(aio, NULL); + NNI_ASSERT(item != NULL); + + (void) nni_posix_sockaddr2nn( + item->ri_sa, probe->ai_addr, probe->ai_addrlen); + + freeaddrinfo(results); + nni_aio_finish(aio, 0, 0); } nni_mtx_unlock(&resolv_mtx); } @@ -350,8 +299,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) int rv; char *port; char *host; - char *buf; - size_t buf_len; + char buf[64]; #ifdef NNG_ENABLE_IPV6 bool v6 = false; @@ -363,11 +311,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) addr = ""; } - buf_len = strlen(addr) + 1; - if ((buf = nni_alloc(buf_len)) == NULL) { - return (NNG_ENOMEM); - } - memcpy(buf, addr, buf_len); + snprintf(buf, sizeof(buf), "%s", addr); host = buf; #ifdef NNG_ENABLE_IPV6 if (*host == '[') { @@ -446,7 +390,6 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) freeaddrinfo(results); done: - nni_free(buf, buf_len); return (rv); } @@ -482,6 +425,25 @@ nni_get_port_by_name(const char *name, uint32_t *portp) return (NNG_EADDRINVAL); } +void +nni_posix_resolv_sysfini(void) +{ + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + if (resolv_thrs != NULL) { + for (int i = 0; i < resolv_num_thr; i++) { + nni_thr_fini(&resolv_thrs[i]); + } + NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); + } + if (resolv_active != NULL) { + NNI_FREE_STRUCTS(resolv_active, resolv_num_thr); + } +} + int nni_posix_resolv_sysinit(nng_init_params *params) { @@ -494,13 +456,16 @@ nni_posix_resolv_sysinit(nng_init_params *params) } params->num_resolver_threads = resolv_num_thr; // no limit on the maximum for now - resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); - if (resolv_thrs == NULL) { + resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); + resolv_active = NNI_ALLOC_STRUCTS(resolv_active, resolv_num_thr); + if (resolv_thrs == NULL || resolv_active == NULL) { + nni_posix_resolv_sysfini(); return (NNG_ENOMEM); } for (int i = 0; i < resolv_num_thr; i++) { - int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + int rv = nni_thr_init( + &resolv_thrs[i], resolv_worker, (void *) (intptr_t) i); if (rv != 0) { nni_posix_resolv_sysfini(); return (rv); @@ -513,20 +478,4 @@ nni_posix_resolv_sysinit(nng_init_params *params) return (0); } -void -nni_posix_resolv_sysfini(void) -{ - nni_mtx_lock(&resolv_mtx); - resolv_fini = true; - nni_cv_wake(&resolv_cv); - nni_mtx_unlock(&resolv_mtx); - - if (resolv_thrs != NULL) { - for (int i = 0; i < resolv_num_thr; i++) { - nni_thr_fini(&resolv_thrs[i]); - } - NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); - } -} - #endif // NNG_USE_POSIX_RESOLV_GAI |
