From d574649899a29ce7eb96485c0a4c606f14f87011 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 28 Dec 2024 23:29:35 -0800 Subject: resolver: use explicit resolver item provided by caller This avoids the need to perform multiple allocations for dialing, eliminating additional potential failures. Cancellation is also made simpler and more perfectly robust. --- src/platform/posix/posix_resolv_gai.c | 365 +++++++++++++++------------------- src/platform/posix/posix_sockaddr.c | 2 +- src/platform/resolver_test.c | 114 ++++++++--- src/platform/windows/win_impl.h | 2 +- src/platform/windows/win_resolv.c | 322 +++++++++++++----------------- src/platform/windows/win_sockaddr.c | 22 +- src/platform/windows/win_tcpconn.c | 4 +- src/platform/windows/win_tcpdial.c | 2 +- src/platform/windows/win_tcplisten.c | 2 +- src/platform/windows/win_udp.c | 4 +- 10 files changed, 407 insertions(+), 432 deletions(-) (limited to 'src/platform') diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index b5f48ecc..f3f69f9f 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -45,33 +45,18 @@ #endif #endif -static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; -static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); -static bool resolv_fini = false; -static nni_list resolv_aios; -static nni_thr *resolv_thrs; -static int16_t resolv_num_thr; - -typedef struct resolv_item resolv_item; -struct resolv_item { - int family; - bool passive; - char host[256]; - char serv[8]; - nni_aio *aio; - nng_sockaddr *sa; -}; - -static void -resolv_free_item(resolv_item *item) -{ - NNI_FREE_STRUCT(item); -} +static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; +static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); +static bool resolv_fini = false; +static nni_list resolv_aios; +static nni_thr *resolv_thrs; +static nni_aio **resolv_active; +static int16_t resolv_num_thr; static void resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item = arg; + nni_resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); if (item != nni_aio_get_prov_data(aio)) { @@ -84,17 +69,18 @@ resolv_cancel(nni_aio *aio, void *arg, int rv) // We have not been picked up by a resolver thread yet, // so we can just discard everything. nni_aio_list_remove(aio); - nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); } else { - // This case indicates the resolver is still processing our - // node. We can discard our interest in the result, but we - // can't interrupt the resolver itself. (Too bad, name - // resolution is utterly synchronous for now.) - item->aio = NULL; - item->sa = NULL; - nni_mtx_unlock(&resolv_mtx); + // Remove it from the thread pending list. We cannot abort + // the actual lookup, but this abandons interest in it, and + // the resolver thread will pick the next item when done. + for (int i = 0; i < resolv_num_thr; i++) { + if (resolv_active[i] == aio) { + resolv_active[i] = NULL; + break; + } + } } + nni_mtx_unlock(&resolv_mtx); nni_aio_finish_error(aio, rv); } @@ -139,157 +125,27 @@ posix_gai_errno(int rv) } } -static int -resolv_task(resolv_item *item) -{ - struct addrinfo hints; - struct addrinfo *results; - struct addrinfo *probe; - int rv; - - results = NULL; - - // We treat these all as IP addresses. The service and the - // host part are split. - memset(&hints, 0, sizeof(hints)); -#ifdef AI_ADDRCONFIG - hints.ai_flags = AI_ADDRCONFIG; -#endif - if (item->passive) { - hints.ai_flags |= AI_PASSIVE; - } - hints.ai_family = item->family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags |= AI_NUMERICSERV; - - // We can pass any non-zero service number, but we have to pass - // *something*, in case we are using a NULL hostname. - if ((rv = getaddrinfo(item->host[0] != 0 ? item->host : NULL, - item->serv, &hints, &results)) != 0) { - rv = posix_gai_errno(rv); - goto done; - } - - // We only take the first matching address. Presumably - // DNS load balancing is done by the resolver/server. - - rv = NNG_EADDRINVAL; - for (probe = results; probe != NULL; probe = probe->ai_next) { - if (probe->ai_addr->sa_family == AF_INET) { - break; - } -#ifdef NNG_ENABLE_IPV6 - if (probe->ai_addr->sa_family == AF_INET6) { - break; - } -#endif - } - - nni_mtx_lock(&resolv_mtx); - if ((probe != NULL) && (item->aio != NULL)) { - struct sockaddr_in *sin; -#ifdef NNG_ENABLE_IPV6 - struct sockaddr_in6 *sin6; -#endif - nng_sockaddr *sa = item->sa; - - switch (probe->ai_addr->sa_family) { - case AF_INET: - rv = 0; - sin = (void *) probe->ai_addr; - sa->s_in.sa_family = NNG_AF_INET; - sa->s_in.sa_port = sin->sin_port; - sa->s_in.sa_addr = sin->sin_addr.s_addr; - break; -#ifdef NNG_ENABLE_IPV6 - case AF_INET6: - rv = 0; - sin6 = (void *) probe->ai_addr; - sa->s_in6.sa_family = NNG_AF_INET6; - sa->s_in6.sa_port = sin6->sin6_port; - sa->s_in6.sa_scope = sin6->sin6_scope_id; - memcpy(sa->s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - break; -#endif - } - } - nni_mtx_unlock(&resolv_mtx); - -done: - - if (results != NULL) { - freeaddrinfo(results); - } - - return (rv); -} - void -nni_resolv_ip(const char *host, uint16_t port, int af, bool passive, - nng_sockaddr *sa, nni_aio *aio) +nni_resolv(nni_resolv_item *item, nni_aio *aio) { - resolv_item *item; - sa_family_t fam; - nni_aio_reset(aio); - if (host != NULL) { - if ((strlen(host) >= sizeof(item->host)) || - (strcmp(host, "*") == 0)) { + if (item->ri_host != NULL) { + if ((strlen(item->ri_host) >= 256) || + (strcmp(item->ri_host, "*") == 0)) { nni_aio_finish_error(aio, NNG_EADDRINVAL); return; } } - switch (af) { - case NNG_AF_INET: - fam = AF_INET; - break; - -#ifdef NNG_ENABLE_IPV6 - case NNG_AF_INET6: - fam = AF_INET6; - break; - case NNG_AF_UNSPEC: - fam = AF_UNSPEC; - break; -#else - case NNG_AF_UNSPEC: - fam = AF_INET; - break; -#endif - - default: - nni_aio_finish_error(aio, NNG_ENOTSUP); - return; - } - - if ((item = NNI_ALLOC_STRUCT(item)) == NULL) { - nni_aio_finish_error(aio, NNG_ENOMEM); - return; - } - - snprintf(item->serv, sizeof(item->serv), "%u", port); - if (host == NULL) { - item->host[0] = '\0'; - } else { - snprintf(item->host, sizeof(item->host), "%s", host); - } - - item->aio = aio; - item->family = fam; - item->passive = passive; - item->sa = sa; nni_mtx_lock(&resolv_mtx); nni_aio_set_prov_data(aio, item); if (!nni_aio_start(aio, resolv_cancel, item)) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); return; } if (resolv_fini) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); nni_aio_finish_error(aio, NNG_ECLOSED); return; } @@ -299,18 +155,22 @@ nni_resolv_ip(const char *host, uint16_t port, int af, bool passive, } void -resolv_worker(void *unused) +resolv_worker(void *index) { - - NNI_ARG_UNUSED(unused); + int tid = (int) (intptr_t) index; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; + char serv[8]; + char host[256]; + nni_aio *aio; + nni_resolv_item *item; nni_thr_set_name(NULL, "nng:resolver"); nni_mtx_lock(&resolv_mtx); for (;;) { - nni_aio *aio; - resolv_item *item; - int rv; if ((aio = nni_list_first(&resolv_aios)) == NULL) { if (resolv_fini) { @@ -322,22 +182,111 @@ resolv_worker(void *unused) item = nni_aio_get_prov_data(aio); nni_aio_list_remove(aio); + resolv_active[tid] = aio; + + snprintf(host, sizeof(host), "%s", + item->ri_host ? item->ri_host : ""); + snprintf(serv, sizeof(serv), "%u", item->ri_port); - // Now attempt to do the work. This runs synchronously. + // We treat these all as IP addresses. The service and the + // host part are split. + memset(&hints, 0, sizeof(hints)); + + results = NULL; + + switch (item->ri_family) { + case NNG_AF_INET: + hints.ai_family = AF_INET; + break; + +#ifdef NNG_ENABLE_IPV6 + case NNG_AF_INET6: + hints.ai_family = AF_INET6; + break; + case NNG_AF_UNSPEC: + hints.ai_family = AF_UNSPEC; + break; +#else + case NNG_AF_UNSPEC: + hints.ai_family = AF_INET; + break; +#endif + default: + resolv_active[tid] = NULL; + nni_aio_finish_error(aio, NNG_ENOTSUP); + continue; + } + +#ifdef AI_ADDRCONFIG + hints.ai_flags = AI_ADDRCONFIG; +#endif + if (item->ri_passive) { + hints.ai_flags |= AI_PASSIVE; + } + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags |= AI_NUMERICSERV; + + // We can pass any non-zero service number, but we have to pass + // *something*, in case we are using a NULL hostname. The lock + // is dropped to allow other submissions, and other threads to + // process. nni_mtx_unlock(&resolv_mtx); - rv = resolv_task(item); + rv = getaddrinfo( + host[0] != 0 ? host : NULL, serv, &hints, &results); nni_mtx_lock(&resolv_mtx); - // Check to make sure we were not canceled. - if ((aio = item->aio) != NULL) { + if ((aio = resolv_active[tid]) == NULL) { + // no more interest (canceled), so ignore the result + // and carry on + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + resolv_active[tid] = NULL; + + if (rv != 0) { + rv = posix_gai_errno(rv); + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - item->sa = NULL; + // We only take the first matching address. Presumably + // DNS load balancing is done by the resolver/server. - nni_aio_finish(aio, rv, 0); + rv = NNG_EADDRINVAL; + for (probe = results; probe != NULL; probe = probe->ai_next) { + if (probe->ai_addr->sa_family == AF_INET) { + break; + } +#ifdef NNG_ENABLE_IPV6 + if (probe->ai_addr->sa_family == AF_INET6) { + break; + } +#endif } - resolv_free_item(item); + + if (probe == NULL) { + // no match + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_set_prov_data(aio, NULL); + NNI_ASSERT(item != NULL); + + (void) nni_posix_sockaddr2nn( + item->ri_sa, probe->ai_addr, probe->ai_addrlen); + + freeaddrinfo(results); + nni_aio_finish(aio, 0, 0); } nni_mtx_unlock(&resolv_mtx); } @@ -350,8 +299,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) int rv; char *port; char *host; - char *buf; - size_t buf_len; + char buf[64]; #ifdef NNG_ENABLE_IPV6 bool v6 = false; @@ -363,11 +311,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) addr = ""; } - buf_len = strlen(addr) + 1; - if ((buf = nni_alloc(buf_len)) == NULL) { - return (NNG_ENOMEM); - } - memcpy(buf, addr, buf_len); + snprintf(buf, sizeof(buf), "%s", addr); host = buf; #ifdef NNG_ENABLE_IPV6 if (*host == '[') { @@ -446,7 +390,6 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) freeaddrinfo(results); done: - nni_free(buf, buf_len); return (rv); } @@ -482,6 +425,25 @@ nni_get_port_by_name(const char *name, uint32_t *portp) return (NNG_EADDRINVAL); } +void +nni_posix_resolv_sysfini(void) +{ + nni_mtx_lock(&resolv_mtx); + resolv_fini = true; + nni_cv_wake(&resolv_cv); + nni_mtx_unlock(&resolv_mtx); + + if (resolv_thrs != NULL) { + for (int i = 0; i < resolv_num_thr; i++) { + nni_thr_fini(&resolv_thrs[i]); + } + NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); + } + if (resolv_active != NULL) { + NNI_FREE_STRUCTS(resolv_active, resolv_num_thr); + } +} + int nni_posix_resolv_sysinit(nng_init_params *params) { @@ -494,13 +456,16 @@ nni_posix_resolv_sysinit(nng_init_params *params) } params->num_resolver_threads = resolv_num_thr; // no limit on the maximum for now - resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); - if (resolv_thrs == NULL) { + resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); + resolv_active = NNI_ALLOC_STRUCTS(resolv_active, resolv_num_thr); + if (resolv_thrs == NULL || resolv_active == NULL) { + nni_posix_resolv_sysfini(); return (NNG_ENOMEM); } for (int i = 0; i < resolv_num_thr; i++) { - int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); + int rv = nni_thr_init( + &resolv_thrs[i], resolv_worker, (void *) (intptr_t) i); if (rv != 0) { nni_posix_resolv_sysfini(); return (rv); @@ -513,20 +478,4 @@ nni_posix_resolv_sysinit(nng_init_params *params) return (0); } -void -nni_posix_resolv_sysfini(void) -{ - nni_mtx_lock(&resolv_mtx); - resolv_fini = true; - nni_cv_wake(&resolv_cv); - nni_mtx_unlock(&resolv_mtx); - - if (resolv_thrs != NULL) { - for (int i = 0; i < resolv_num_thr; i++) { - nni_thr_fini(&resolv_thrs[i]); - } - NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); - } -} - #endif // NNG_USE_POSIX_RESOLV_GAI diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c index 1906c78d..c86b6fb5 100644 --- a/src/platform/posix/posix_sockaddr.c +++ b/src/platform/posix/posix_sockaddr.c @@ -122,7 +122,7 @@ nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa, size_t sz) nng_sockaddr_in6 *nsin6; #endif - switch (((struct sockaddr *) sa)->sa_family) { + switch (((const struct sockaddr *) sa)->sa_family) { case AF_INET: if (sz < sizeof(*sin)) { return (-1); diff --git a/src/platform/resolver_test.c b/src/platform/resolver_test.c index 005affe3..09a2e51e 100644 --- a/src/platform/resolver_test.c +++ b/src/platform/resolver_test.c @@ -38,12 +38,18 @@ has_v6(void) void test_google_dns(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; + + item.ri_host = "google-public-dns-a.google.com"; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + item.ri_family = NNG_AF_INET; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip( - "google-public-dns-a.google.com", 80, NNG_AF_INET, true, &sa, aio); + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); @@ -55,14 +61,20 @@ test_google_dns(void) void test_hostname_too_long(void) { - nng_aio *aio; - nng_sockaddr sa; - char buffer[512]; + nng_aio *aio; + nng_sockaddr sa; + char buffer[512]; + nni_resolv_item item = { 0 }; memset(buffer, 'a', sizeof(buffer) - 1); buffer[sizeof(buffer) - 1] = '\0'; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip(buffer, 80, NNG_AF_INET, true, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_passive = true; + item.ri_host = buffer; + item.ri_port = 80; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_EADDRINVAL); nng_aio_free(aio); @@ -71,11 +83,17 @@ test_hostname_too_long(void) void test_numeric_addr(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip("8.8.4.4", 69, NNG_AF_INET, true, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_host = "8.8.4.4"; + item.ri_port = 69; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); @@ -88,15 +106,21 @@ test_numeric_addr(void) void test_numeric_v6(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; if (!has_v6()) { return; } NUTS_MSG("IPV6 support present"); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip("::1", 80, NNG_AF_INET6, true, &sa, aio); + item.ri_family = NNG_AF_INET6; + item.ri_host = "::1"; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in6.sa_family == NNG_AF_INET6); @@ -109,12 +133,18 @@ test_numeric_v6(void) void test_service_names(void) { - nng_aio *aio; - nng_sockaddr sa; - uint32_t port; + nng_aio *aio; + nng_sockaddr sa; + uint32_t port; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip("8.8.4.4", 80, NNG_AF_INET, true, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_host = "8.8.4.4"; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in.sa_port == nuts_be16(80)); @@ -131,11 +161,17 @@ test_service_names(void) void test_localhost_v4(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip("localhost", 80, NNG_AF_INET, true, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_host = "localhost"; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); @@ -147,11 +183,17 @@ test_localhost_v4(void) void test_localhost_unspecified(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip("localhost", 80, NNG_AF_UNSPEC, true, &sa, aio); + item.ri_family = NNG_AF_UNSPEC; + item.ri_host = "localhost"; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE( @@ -174,11 +216,17 @@ test_localhost_unspecified(void) void test_null_passive(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip(NULL, 80, NNG_AF_INET, true, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_host = NULL; + item.ri_port = 80; + item.ri_passive = true; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); NUTS_PASS(nng_aio_result(aio)); NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); @@ -190,11 +238,17 @@ test_null_passive(void) void test_null_not_passive(void) { - nng_aio *aio; - nng_sockaddr sa; + nng_aio *aio; + nng_sockaddr sa; + nni_resolv_item item = { 0 }; NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nni_resolv_ip(NULL, 80, NNG_AF_INET, false, &sa, aio); + item.ri_family = NNG_AF_INET; + item.ri_host = NULL; + item.ri_port = 80; + item.ri_passive = false; + item.ri_sa = &sa; + nni_resolv(&item, aio); nng_aio_wait(aio); // We can either get invalid address, or a loopback address. // Most systems do the former, but Linux does the latter. diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index d94e1547..fd02e080 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -126,7 +126,7 @@ extern void nni_win_io_init(nni_win_io *, nni_win_io_cb, void *); extern int nni_win_io_register(HANDLE); -extern int nni_win_sockaddr2nn(nni_sockaddr *, const SOCKADDR_STORAGE *); +extern int nni_win_sockaddr2nn(nni_sockaddr *, const void *, size_t); extern int nni_win_nn2sockaddr(SOCKADDR_STORAGE *, const nni_sockaddr *); #define NNG_PLATFORM_DIR_SEP "\\" diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 5f742d42..83da0b56 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -22,33 +22,18 @@ // host file, WINS, or other naming services. As a result, we just build // our own limited asynchronous resolver with threads. -static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; -static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); -static bool resolv_fini = false; -static nni_list resolv_aios; -static nni_thr *resolv_thrs; -static int16_t resolv_num_thr; - -typedef struct resolv_item resolv_item; -struct resolv_item { - int family; - bool passive; - char host[256]; - char serv[8]; - nni_aio *aio; - nng_sockaddr *sa; -}; - -static void -resolv_free_item(resolv_item *item) -{ - NNI_FREE_STRUCT(item); -} +static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; +static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); +static bool resolv_fini = false; +static nni_list resolv_aios; +static nni_thr *resolv_thrs; +static nni_aio **resolv_active; +static int16_t resolv_num_thr; static void resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item = arg; + nni_resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); if (item != nni_aio_get_prov_data(aio)) { @@ -60,15 +45,15 @@ resolv_cancel(nni_aio *aio, void *arg, int rv) // We have not been picked up by a resolver thread yet, // so we can just discard everything. nni_aio_list_remove(aio); - nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); } else { - // Resolver still working, so just unlink our AIO to - // discard our interest in the results. - item->aio = NULL; - item->sa = NULL; - nni_mtx_unlock(&resolv_mtx); + for (int i = 0; i < resolv_num_thr; i++) { + if (resolv_active[i] == aio) { + resolv_active[i] = NULL; + break; + } + } } + nni_mtx_unlock(&resolv_mtx); nni_aio_finish_error(aio, rv); } @@ -99,169 +84,55 @@ resolv_errno(int rv) } } -static int -resolv_task(resolv_item *item) -{ - struct addrinfo hints; - struct addrinfo *results; - struct addrinfo *probe; - int rv; - - results = NULL; - - memset(&hints, 0, sizeof(hints)); - hints.ai_flags = AI_ADDRCONFIG; - if (item->passive) { - hints.ai_flags |= AI_PASSIVE; - } - hints.ai_family = item->family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags |= AI_NUMERICSERV; - - if ((rv = getaddrinfo(item->host[0] != 0 ? item->host : NULL, - item->serv, &hints, &results)) != 0) { - rv = resolv_errno(rv); - goto done; - } - - // We only take the first matching address. Presumably - // DNS load balancing is done by the resolver/server. - - rv = NNG_EADDRINVAL; - for (probe = results; probe != NULL; probe = probe->ai_next) { - if (probe->ai_addr->sa_family == AF_INET) { - break; - } -#if NNG_ENABLE_IPV6 - if (probe->ai_addr->sa_family == AF_INET6) { - break; - } -#endif - } - - nni_mtx_lock(&resolv_mtx); - if ((probe != NULL) && (item->aio != NULL)) { - struct sockaddr_in *sin; -#ifdef NNG_ENABLE_IPV6 - struct sockaddr_in6 *sin6; -#endif - nni_sockaddr *sa; - - sa = item->sa; - - switch (probe->ai_addr->sa_family) { - case AF_INET: - rv = 0; - sin = (void *) probe->ai_addr; - sa->s_in.sa_family = NNG_AF_INET; - sa->s_in.sa_port = sin->sin_port; - sa->s_in.sa_addr = sin->sin_addr.s_addr; - break; -#ifdef NNG_ENABLE_IPV6 - case AF_INET6: - rv = 0; - sin6 = (void *) probe->ai_addr; - sa->s_in6.sa_family = NNG_AF_INET6; - sa->s_in6.sa_port = sin6->sin6_port; - sa->s_in6.sa_scope = sin6->sin6_scope_id; - memcpy(sa->s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - break; -#endif - } - } - nni_mtx_unlock(&resolv_mtx); - -done: - - if (results != NULL) { - freeaddrinfo(results); - } - return (rv); -} - void -nni_resolv_ip(const char *host, uint16_t port, int family, bool passive, - nng_sockaddr *sa, nni_aio *aio) +nni_resolv(nni_resolv_item *item, nni_aio *aio) { - resolv_item *item; - int fam; - int rv; - nni_aio_reset(aio); - if (host != NULL) { - if ((strlen(host) >= sizeof(item->host)) || - (strcmp(host, "*") == 0)) { + if (item->ri_host != NULL) { + if ((strlen(item->ri_host) >= 256) || + (strcmp(item->ri_host, "*") == 0)) { nni_aio_finish_error(aio, NNG_EADDRINVAL); return; } } - switch (family) { - case NNG_AF_INET: - fam = AF_INET; - break; -#ifdef NNG_ENABLE_IPV6 - case NNG_AF_INET6: - fam = AF_INET6; - break; - case NNG_AF_UNSPEC: - fam = AF_UNSPEC; - break; -#else - case NNG_AF_UNSPEC: - fam = AF_INET; - break; -#endif - default: - nni_aio_finish_error(aio, NNG_ENOTSUP); - return; - } - - if ((item = NNI_ALLOC_STRUCT(item)) == NULL) { - nni_aio_finish_error(aio, NNG_ENOMEM); - return; - } - - snprintf(item->serv, sizeof(item->serv), "%u", port); - if (host == NULL) { - item->host[0] = '\0'; - } else { - snprintf(item->host, sizeof(item->host), "%s", host); - } - - item->sa = sa; - item->passive = passive; - item->aio = aio; - item->family = fam; nni_mtx_lock(&resolv_mtx); + nni_aio_set_prov_data(aio, item); if (!nni_aio_start(aio, resolv_cancel, item)) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); return; } + if (resolv_fini) { nni_mtx_unlock(&resolv_mtx); - resolv_free_item(item); nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_set_prov_data(aio, item); nni_list_append(&resolv_aios, aio); nni_cv_wake1(&resolv_cv); nni_mtx_unlock(&resolv_mtx); } void -resolv_worker(void *notused) +resolv_worker(void *index) { + int tid = (int) (intptr_t) index; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; + char serv[8]; + char host[256]; + nni_aio *aio; + nni_resolv_item *item; - NNI_ARG_UNUSED(notused); + nni_thr_set_name(NULL, "nng:resolver"); nni_mtx_lock(&resolv_mtx); for (;;) { - nni_aio *aio; - resolv_item *item; - int rv; + nni_aio *aio; + nni_resolv_item *item; + int rv; if ((aio = nni_list_first(&resolv_aios)) == NULL) { if (resolv_fini) { @@ -273,21 +144,107 @@ resolv_worker(void *notused) item = nni_aio_get_prov_data(aio); nni_aio_list_remove(aio); + resolv_active[tid] = aio; + nni_aio_list_remove(aio); + + snprintf(host, sizeof(host), "%s", + item->ri_host ? item->ri_host : ""); + snprintf(serv, sizeof(serv), "%u", item->ri_port); + + // We treat these all as IP addresses. The service and the + // host part are split. + memset(&hints, 0, sizeof(hints)); + + results = NULL; + switch (item->ri_family) { + case NNG_AF_INET: + hints.ai_family = AF_INET; + break; + +#ifdef NNG_ENABLE_IPV6 + case NNG_AF_INET6: + hints.ai_family = AF_INET6; + break; + case NNG_AF_UNSPEC: + hints.ai_family = AF_UNSPEC; + break; +#else + case NNG_AF_UNSPEC: + hints.ai_family = AF_INET; + break; +#endif + default: + resolv_active[tid] = NULL; + nni_aio_finish_error(aio, NNG_ENOTSUP); + continue; + } + +#ifdef AI_ADDRCONFIG + hints.ai_flags = AI_ADDRCONFIG; +#endif + if (item->ri_passive) { + hints.ai_flags |= AI_PASSIVE; + } + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags |= AI_NUMERICSERV; - // Now attempt to do the work. This runs synchronously. nni_mtx_unlock(&resolv_mtx); - rv = resolv_task(item); + rv = getaddrinfo( + host[0] != 0 ? host : NULL, serv, &hints, &results); nni_mtx_lock(&resolv_mtx); - // Check to make sure we were not canceled. - if ((aio = item->aio) != NULL) { - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - item->sa = NULL; + if ((aio = resolv_active[tid]) == NULL) { + // no more interest (canceled), so ignore the result + // and carry on + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + resolv_active[tid] = NULL; - nni_aio_finish(aio, rv, 0); + if (rv != 0) { + rv = resolv_errno(rv); + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; } - resolv_free_item(item); + + // We only take the first matching address. Presumably + // DNS load balancing is done by the resolver/server. + + rv = NNG_EADDRINVAL; + for (probe = results; probe != NULL; probe = probe->ai_next) { + if (probe->ai_addr->sa_family == AF_INET) { + break; + } +#ifdef NNG_ENABLE_IPV6 + if (probe->ai_addr->sa_family == AF_INET6) { + break; + } +#endif + } + + if (probe == NULL) { + // no match + nni_aio_finish_error(aio, rv); + if (results != NULL) { + freeaddrinfo(results); + } + continue; + } + + item = nni_aio_get_prov_data(aio); + nni_aio_set_prov_data(aio, NULL); + NNI_ASSERT(item != NULL); + + (void) nni_win_sockaddr2nn( + item->ri_sa, probe->ai_addr, probe->ai_addrlen); + + freeaddrinfo(results); + nni_aio_finish(aio, 0, 0); } nni_mtx_unlock(&resolv_mtx); } @@ -389,7 +346,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) rv = nni_win_error(rv); goto done; } - nni_win_sockaddr2nn(sa, (void *) results->ai_addr); + nni_win_sockaddr2nn(sa, results->ai_addr, results->ai_addrlen); freeaddrinfo(results); done: @@ -442,8 +399,10 @@ nni_win_resolv_sysinit(nng_init_params *params) params->num_resolver_threads = resolv_num_thr; // no limit on the maximum for now - resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); - if (resolv_thrs == NULL) { + resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); + resolv_active = NNI_ALLOC_STRUCTS(resolv_active, resolv_num_thr); + if (resolv_thrs == NULL || resolv_active == NULL) { + nni_win_resolv_sysfini(); return (NNG_ENOMEM); } @@ -468,10 +427,15 @@ nni_win_resolv_sysfini(void) resolv_fini = true; nni_cv_wake(&resolv_cv); nni_mtx_unlock(&resolv_mtx); - for (int i = 0; i < resolv_num_thr; i++) { - nni_thr_fini(&resolv_thrs[i]); + if (resolv_thrs != NULL) { + for (int i = 0; i < resolv_num_thr; i++) { + nni_thr_fini(&resolv_thrs[i]); + } + NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); + } + if (resolv_active != NULL) { + NNI_FREE_STRUCTS(resolv_active, resolv_num_thr); } - NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); } #endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_sockaddr.c b/src/platform/windows/win_sockaddr.c index 585096e9..60f6250d 100644 --- a/src/platform/windows/win_sockaddr.c +++ b/src/platform/windows/win_sockaddr.c @@ -47,19 +47,24 @@ nni_win_nn2sockaddr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) } int -nni_win_sockaddr2nn(nni_sockaddr *sa, const SOCKADDR_STORAGE *ss) +nni_win_sockaddr2nn(nni_sockaddr *sa, const void *s, size_t sz) { - SOCKADDR_IN *sin; + SOCKADDR_IN *sin; + nng_sockaddr_in *nsin; #ifdef NNG_ENABLE_IPV6 - SOCKADDR_IN6 *sin6; + SOCKADDR_IN6 *sin6; + nng_sockaddr_in6 *nsin6; #endif - if ((ss == NULL) || (sa == NULL)) { + if ((s == NULL) || (sa == NULL)) { return (-1); } - switch (ss->ss_family) { + switch (((const SOCKADDR *) s)->sa_family) { case PF_INET: - sin = (void *) ss; + if (sz < sizeof(*sin)) { + return -1; + } + sin = (void *) s; sa->s_in.sa_family = NNG_AF_INET; sa->s_in.sa_port = sin->sin_port; sa->s_in.sa_addr = sin->sin_addr.s_addr; @@ -67,7 +72,10 @@ nni_win_sockaddr2nn(nni_sockaddr *sa, const SOCKADDR_STORAGE *ss) #ifdef NNG_ENABLE_IPV6 case PF_INET6: - sin6 = (void *) ss; + if (sz < sizeof(*sin6)) { + return (-1); + } + sin6 = (void *) s; sa->s_in6.sa_family = NNG_AF_INET6; sa->s_in6.sa_port = sin6->sin6_port; sa->s_in6.sa_scope = sin6->sin6_scope_id; diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index bd7f8504..e8e66f0e 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -268,7 +268,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; nng_sockaddr sa; - if (nni_win_sockaddr2nn(&sa, &c->peername) < 0) { + if (nni_win_sockaddr2nn(&sa, &c->peername, sizeof(c->peername)) < 0) { return (NNG_EADDRINVAL); } return (nni_copyout_sockaddr(&sa, buf, szp, t)); @@ -280,7 +280,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; nng_sockaddr sa; - if (nni_win_sockaddr2nn(&sa, &c->sockname) < 0) { + if (nni_win_sockaddr2nn(&sa, &c->sockname, sizeof(c->sockname)) < 0) { return (NNG_EADDRINVAL); } return (nni_copyout_sockaddr(&sa, buf, szp, t)); diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 11f4f669..12bedf0a 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -307,7 +307,7 @@ tcp_dialer_get_locaddr(void *arg, void *buf, size_t *szp, nni_type t) nng_sockaddr sa; nni_mtx_lock(&d->mtx); - if (nni_win_sockaddr2nn(&sa, &d->src) != 0) { + if (nni_win_sockaddr2nn(&sa, &d->src, sizeof(d->src)) != 0) { sa.s_family = NNG_AF_UNSPEC; } nni_mtx_unlock(&d->mtx); diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index c247ed4f..0dd5ae16 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -337,7 +337,7 @@ tcp_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_type t) nng_sockaddr sa; nni_mtx_lock(&l->mtx); if (l->started) { - nni_win_sockaddr2nn(&sa, &l->ss); + nni_win_sockaddr2nn(&sa, &l->ss, sizeof(l->ss)); } else { sa.s_family = NNG_AF_UNSPEC; } diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c index c74b3f77..a280e116 100644 --- a/src/platform/windows/win_udp.c +++ b/src/platform/windows/win_udp.c @@ -204,7 +204,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num) // convert address from Windows form... if ((sa = nni_aio_get_input(aio, 0)) != NULL) { - if (nni_win_sockaddr2nn(sa, &u->rxsa) != 0) { + if (nni_win_sockaddr2nn(sa, &u->rxsa, sizeof(u->rxsa)) != 0) { rv = NNG_EADDRINVAL; num = 0; } @@ -306,7 +306,7 @@ nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa) if (getsockname(udp->s, (SOCKADDR *) &ss, &sz) < 0) { return (nni_win_error(GetLastError())); } - return (nni_win_sockaddr2nn(sa, &ss)); + return (nni_win_sockaddr2nn(sa, &ss, sz)); } // Joining a multicast group is different than binding to a multicast -- cgit v1.2.3-70-g09d2