aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_resolv_gai.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix/posix_resolv_gai.c')
-rw-r--r--src/platform/posix/posix_resolv_gai.c365
1 files changed, 157 insertions, 208 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index b5f48ecc..f3f69f9f 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -45,33 +45,18 @@
#endif
#endif
-static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
-static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
-static bool resolv_fini = false;
-static nni_list resolv_aios;
-static nni_thr *resolv_thrs;
-static int16_t resolv_num_thr;
-
-typedef struct resolv_item resolv_item;
-struct resolv_item {
- int family;
- bool passive;
- char host[256];
- char serv[8];
- nni_aio *aio;
- nng_sockaddr *sa;
-};
-
-static void
-resolv_free_item(resolv_item *item)
-{
- NNI_FREE_STRUCT(item);
-}
+static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
+static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
+static bool resolv_fini = false;
+static nni_list resolv_aios;
+static nni_thr *resolv_thrs;
+static nni_aio **resolv_active;
+static int16_t resolv_num_thr;
static void
resolv_cancel(nni_aio *aio, void *arg, int rv)
{
- resolv_item *item = arg;
+ nni_resolv_item *item = arg;
nni_mtx_lock(&resolv_mtx);
if (item != nni_aio_get_prov_data(aio)) {
@@ -84,17 +69,18 @@ resolv_cancel(nni_aio *aio, void *arg, int rv)
// We have not been picked up by a resolver thread yet,
// so we can just discard everything.
nni_aio_list_remove(aio);
- nni_mtx_unlock(&resolv_mtx);
- resolv_free_item(item);
} else {
- // This case indicates the resolver is still processing our
- // node. We can discard our interest in the result, but we
- // can't interrupt the resolver itself. (Too bad, name
- // resolution is utterly synchronous for now.)
- item->aio = NULL;
- item->sa = NULL;
- nni_mtx_unlock(&resolv_mtx);
+ // Remove it from the thread pending list. We cannot abort
+ // the actual lookup, but this abandons interest in it, and
+ // the resolver thread will pick the next item when done.
+ for (int i = 0; i < resolv_num_thr; i++) {
+ if (resolv_active[i] == aio) {
+ resolv_active[i] = NULL;
+ break;
+ }
+ }
}
+ nni_mtx_unlock(&resolv_mtx);
nni_aio_finish_error(aio, rv);
}
@@ -139,157 +125,27 @@ posix_gai_errno(int rv)
}
}
-static int
-resolv_task(resolv_item *item)
-{
- struct addrinfo hints;
- struct addrinfo *results;
- struct addrinfo *probe;
- int rv;
-
- results = NULL;
-
- // We treat these all as IP addresses. The service and the
- // host part are split.
- memset(&hints, 0, sizeof(hints));
-#ifdef AI_ADDRCONFIG
- hints.ai_flags = AI_ADDRCONFIG;
-#endif
- if (item->passive) {
- hints.ai_flags |= AI_PASSIVE;
- }
- hints.ai_family = item->family;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags |= AI_NUMERICSERV;
-
- // We can pass any non-zero service number, but we have to pass
- // *something*, in case we are using a NULL hostname.
- if ((rv = getaddrinfo(item->host[0] != 0 ? item->host : NULL,
- item->serv, &hints, &results)) != 0) {
- rv = posix_gai_errno(rv);
- goto done;
- }
-
- // We only take the first matching address. Presumably
- // DNS load balancing is done by the resolver/server.
-
- rv = NNG_EADDRINVAL;
- for (probe = results; probe != NULL; probe = probe->ai_next) {
- if (probe->ai_addr->sa_family == AF_INET) {
- break;
- }
-#ifdef NNG_ENABLE_IPV6
- if (probe->ai_addr->sa_family == AF_INET6) {
- break;
- }
-#endif
- }
-
- nni_mtx_lock(&resolv_mtx);
- if ((probe != NULL) && (item->aio != NULL)) {
- struct sockaddr_in *sin;
-#ifdef NNG_ENABLE_IPV6
- struct sockaddr_in6 *sin6;
-#endif
- nng_sockaddr *sa = item->sa;
-
- switch (probe->ai_addr->sa_family) {
- case AF_INET:
- rv = 0;
- sin = (void *) probe->ai_addr;
- sa->s_in.sa_family = NNG_AF_INET;
- sa->s_in.sa_port = sin->sin_port;
- sa->s_in.sa_addr = sin->sin_addr.s_addr;
- break;
-#ifdef NNG_ENABLE_IPV6
- case AF_INET6:
- rv = 0;
- sin6 = (void *) probe->ai_addr;
- sa->s_in6.sa_family = NNG_AF_INET6;
- sa->s_in6.sa_port = sin6->sin6_port;
- sa->s_in6.sa_scope = sin6->sin6_scope_id;
- memcpy(sa->s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
- break;
-#endif
- }
- }
- nni_mtx_unlock(&resolv_mtx);
-
-done:
-
- if (results != NULL) {
- freeaddrinfo(results);
- }
-
- return (rv);
-}
-
void
-nni_resolv_ip(const char *host, uint16_t port, int af, bool passive,
- nng_sockaddr *sa, nni_aio *aio)
+nni_resolv(nni_resolv_item *item, nni_aio *aio)
{
- resolv_item *item;
- sa_family_t fam;
-
nni_aio_reset(aio);
- if (host != NULL) {
- if ((strlen(host) >= sizeof(item->host)) ||
- (strcmp(host, "*") == 0)) {
+ if (item->ri_host != NULL) {
+ if ((strlen(item->ri_host) >= 256) ||
+ (strcmp(item->ri_host, "*") == 0)) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
}
}
- switch (af) {
- case NNG_AF_INET:
- fam = AF_INET;
- break;
-
-#ifdef NNG_ENABLE_IPV6
- case NNG_AF_INET6:
- fam = AF_INET6;
- break;
- case NNG_AF_UNSPEC:
- fam = AF_UNSPEC;
- break;
-#else
- case NNG_AF_UNSPEC:
- fam = AF_INET;
- break;
-#endif
-
- default:
- nni_aio_finish_error(aio, NNG_ENOTSUP);
- return;
- }
-
- if ((item = NNI_ALLOC_STRUCT(item)) == NULL) {
- nni_aio_finish_error(aio, NNG_ENOMEM);
- return;
- }
-
- snprintf(item->serv, sizeof(item->serv), "%u", port);
- if (host == NULL) {
- item->host[0] = '\0';
- } else {
- snprintf(item->host, sizeof(item->host), "%s", host);
- }
-
- item->aio = aio;
- item->family = fam;
- item->passive = passive;
- item->sa = sa;
nni_mtx_lock(&resolv_mtx);
nni_aio_set_prov_data(aio, item);
if (!nni_aio_start(aio, resolv_cancel, item)) {
nni_mtx_unlock(&resolv_mtx);
- resolv_free_item(item);
return;
}
if (resolv_fini) {
nni_mtx_unlock(&resolv_mtx);
- resolv_free_item(item);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
@@ -299,18 +155,22 @@ nni_resolv_ip(const char *host, uint16_t port, int af, bool passive,
}
void
-resolv_worker(void *unused)
+resolv_worker(void *index)
{
-
- NNI_ARG_UNUSED(unused);
+ int tid = (int) (intptr_t) index;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
+ char serv[8];
+ char host[256];
+ nni_aio *aio;
+ nni_resolv_item *item;
nni_thr_set_name(NULL, "nng:resolver");
nni_mtx_lock(&resolv_mtx);
for (;;) {
- nni_aio *aio;
- resolv_item *item;
- int rv;
if ((aio = nni_list_first(&resolv_aios)) == NULL) {
if (resolv_fini) {
@@ -322,22 +182,111 @@ resolv_worker(void *unused)
item = nni_aio_get_prov_data(aio);
nni_aio_list_remove(aio);
+ resolv_active[tid] = aio;
+
+ snprintf(host, sizeof(host), "%s",
+ item->ri_host ? item->ri_host : "");
+ snprintf(serv, sizeof(serv), "%u", item->ri_port);
- // Now attempt to do the work. This runs synchronously.
+ // We treat these all as IP addresses. The service and the
+ // host part are split.
+ memset(&hints, 0, sizeof(hints));
+
+ results = NULL;
+
+ switch (item->ri_family) {
+ case NNG_AF_INET:
+ hints.ai_family = AF_INET;
+ break;
+
+#ifdef NNG_ENABLE_IPV6
+ case NNG_AF_INET6:
+ hints.ai_family = AF_INET6;
+ break;
+ case NNG_AF_UNSPEC:
+ hints.ai_family = AF_UNSPEC;
+ break;
+#else
+ case NNG_AF_UNSPEC:
+ hints.ai_family = AF_INET;
+ break;
+#endif
+ default:
+ resolv_active[tid] = NULL;
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ continue;
+ }
+
+#ifdef AI_ADDRCONFIG
+ hints.ai_flags = AI_ADDRCONFIG;
+#endif
+ if (item->ri_passive) {
+ hints.ai_flags |= AI_PASSIVE;
+ }
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags |= AI_NUMERICSERV;
+
+ // We can pass any non-zero service number, but we have to pass
+ // *something*, in case we are using a NULL hostname. The lock
+ // is dropped to allow other submissions, and other threads to
+ // process.
nni_mtx_unlock(&resolv_mtx);
- rv = resolv_task(item);
+ rv = getaddrinfo(
+ host[0] != 0 ? host : NULL, serv, &hints, &results);
nni_mtx_lock(&resolv_mtx);
- // Check to make sure we were not canceled.
- if ((aio = item->aio) != NULL) {
+ if ((aio = resolv_active[tid]) == NULL) {
+ // no more interest (canceled), so ignore the result
+ // and carry on
+ if (results != NULL) {
+ freeaddrinfo(results);
+ }
+ continue;
+ }
+ resolv_active[tid] = NULL;
+
+ if (rv != 0) {
+ rv = posix_gai_errno(rv);
+ nni_aio_finish_error(aio, rv);
+ if (results != NULL) {
+ freeaddrinfo(results);
+ }
+ continue;
+ }
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- item->sa = NULL;
+ // We only take the first matching address. Presumably
+ // DNS load balancing is done by the resolver/server.
- nni_aio_finish(aio, rv, 0);
+ rv = NNG_EADDRINVAL;
+ for (probe = results; probe != NULL; probe = probe->ai_next) {
+ if (probe->ai_addr->sa_family == AF_INET) {
+ break;
+ }
+#ifdef NNG_ENABLE_IPV6
+ if (probe->ai_addr->sa_family == AF_INET6) {
+ break;
+ }
+#endif
}
- resolv_free_item(item);
+
+ if (probe == NULL) {
+ // no match
+ nni_aio_finish_error(aio, rv);
+ if (results != NULL) {
+ freeaddrinfo(results);
+ }
+ continue;
+ }
+
+ item = nni_aio_get_prov_data(aio);
+ nni_aio_set_prov_data(aio, NULL);
+ NNI_ASSERT(item != NULL);
+
+ (void) nni_posix_sockaddr2nn(
+ item->ri_sa, probe->ai_addr, probe->ai_addrlen);
+
+ freeaddrinfo(results);
+ nni_aio_finish(aio, 0, 0);
}
nni_mtx_unlock(&resolv_mtx);
}
@@ -350,8 +299,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port)
int rv;
char *port;
char *host;
- char *buf;
- size_t buf_len;
+ char buf[64];
#ifdef NNG_ENABLE_IPV6
bool v6 = false;
@@ -363,11 +311,7 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port)
addr = "";
}
- buf_len = strlen(addr) + 1;
- if ((buf = nni_alloc(buf_len)) == NULL) {
- return (NNG_ENOMEM);
- }
- memcpy(buf, addr, buf_len);
+ snprintf(buf, sizeof(buf), "%s", addr);
host = buf;
#ifdef NNG_ENABLE_IPV6
if (*host == '[') {
@@ -446,7 +390,6 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port)
freeaddrinfo(results);
done:
- nni_free(buf, buf_len);
return (rv);
}
@@ -482,6 +425,25 @@ nni_get_port_by_name(const char *name, uint32_t *portp)
return (NNG_EADDRINVAL);
}
+void
+nni_posix_resolv_sysfini(void)
+{
+ nni_mtx_lock(&resolv_mtx);
+ resolv_fini = true;
+ nni_cv_wake(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
+
+ if (resolv_thrs != NULL) {
+ for (int i = 0; i < resolv_num_thr; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
+ }
+ NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr);
+ }
+ if (resolv_active != NULL) {
+ NNI_FREE_STRUCTS(resolv_active, resolv_num_thr);
+ }
+}
+
int
nni_posix_resolv_sysinit(nng_init_params *params)
{
@@ -494,13 +456,16 @@ nni_posix_resolv_sysinit(nng_init_params *params)
}
params->num_resolver_threads = resolv_num_thr;
// no limit on the maximum for now
- resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr);
- if (resolv_thrs == NULL) {
+ resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr);
+ resolv_active = NNI_ALLOC_STRUCTS(resolv_active, resolv_num_thr);
+ if (resolv_thrs == NULL || resolv_active == NULL) {
+ nni_posix_resolv_sysfini();
return (NNG_ENOMEM);
}
for (int i = 0; i < resolv_num_thr; i++) {
- int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
+ int rv = nni_thr_init(
+ &resolv_thrs[i], resolv_worker, (void *) (intptr_t) i);
if (rv != 0) {
nni_posix_resolv_sysfini();
return (rv);
@@ -513,20 +478,4 @@ nni_posix_resolv_sysinit(nng_init_params *params)
return (0);
}
-void
-nni_posix_resolv_sysfini(void)
-{
- nni_mtx_lock(&resolv_mtx);
- resolv_fini = true;
- nni_cv_wake(&resolv_cv);
- nni_mtx_unlock(&resolv_mtx);
-
- if (resolv_thrs != NULL) {
- for (int i = 0; i < resolv_num_thr; i++) {
- nni_thr_fini(&resolv_thrs[i]);
- }
- NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr);
- }
-}
-
#endif // NNG_USE_POSIX_RESOLV_GAI