aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_resolv_gai.c155
1 files changed, 97 insertions, 58 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index ad1d5147..dcaef409 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,8 +37,11 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *resolv_tq = NULL;
-static nni_mtx resolv_mtx;
+static nni_mtx resolv_mtx;
+static nni_cv resolv_cv;
+static bool resolv_fini;
+static nni_list resolv_aios;
+static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY];
typedef struct resolv_item resolv_item;
struct resolv_item {
@@ -48,45 +51,40 @@ struct resolv_item {
const char * serv;
int proto;
nni_aio * aio;
- nni_task * task;
nng_sockaddr sa;
};
static void
-resolv_finish(resolv_item *item, int rv)
-{
- nni_aio *aio;
-
- if (((aio = item->aio) != NULL) &&
- (nni_aio_get_prov_data(aio) == item)) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
- nni_aio_finish(aio, rv, 0);
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- }
-}
-
-static void
resolv_cancel(nni_aio *aio, int rv)
{
resolv_item *item;
nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
+ // Already canceled?
nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_mtx_unlock(&resolv_mtx);
+ if (nni_aio_list_active(aio)) {
+ // We have not been picked up by a resolver thread yet,
+ // so we can just discard everything.
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&resolv_mtx);
+ NNI_FREE_STRUCT(item);
+ } else {
+ // This case indicates the resolver is still processing our
+ // node. We can discard our interest in the result, but we
+ // can't interrupt the resolver itself. (Too bad, name
+ // resolution is utterly synchronous for now.)
+ item->aio = NULL;
+ nni_mtx_unlock(&resolv_mtx);
+ }
nni_aio_finish_error(aio, rv);
}
static int
-nni_posix_gai_errno(int rv)
+posix_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv)
}
}
-static void
-resolv_task(void *arg)
+static int
+resolv_task(resolv_item *item)
{
- resolv_item * item = arg;
struct addrinfo hints;
struct addrinfo *results;
struct addrinfo *probe;
int rv;
- nni_mtx_lock(&resolv_mtx);
- if (item->aio == NULL) {
- nni_mtx_unlock(&resolv_mtx);
- // Caller canceled, and no longer cares about this.
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- return;
- }
- nni_mtx_unlock(&resolv_mtx);
-
results = NULL;
// We treat these all as IP addresses. The service and the
@@ -152,7 +139,7 @@ resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_posix_gai_errno(rv);
+ rv = posix_gai_errno(rv);
goto done;
}
@@ -196,9 +183,7 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&resolv_mtx);
- resolv_finish(item, rv);
- nni_mtx_unlock(&resolv_mtx);
+ return (rv);
}
static void
@@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
- 0) {
- NNI_FREE_STRUCT(item);
- nni_aio_finish_error(aio, rv);
- return;
- };
-
// NB: host and serv must remain valid until this is completed.
memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
@@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&resolv_mtx);
- if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ if (resolv_fini) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_schedule(aio, resolv_cancel, item);
+ }
+ if (rv != 0) {
nni_mtx_unlock(&resolv_mtx);
- nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
}
- nni_task_dispatch(item->task);
+ nni_list_append(&resolv_aios, aio);
+ nni_cv_wake1(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
}
@@ -274,27 +257,83 @@ nni_plat_udp_resolv(
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
+void
+resolv_worker(void *notused)
+{
+
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&resolv_mtx);
+ for (;;) {
+ nni_aio * aio;
+ resolv_item *item;
+ int rv;
+
+ if ((aio = nni_list_first(&resolv_aios)) == NULL) {
+ if (resolv_fini) {
+ break;
+ }
+ nni_cv_wait(&resolv_cv);
+ continue;
+ }
+
+ item = nni_aio_get_prov_data(aio);
+ nni_aio_list_remove(aio);
+
+ // Now attempt to do the work. This runs synchronously.
+ nni_mtx_unlock(&resolv_mtx);
+ rv = resolv_task(item);
+ nni_mtx_lock(&resolv_mtx);
+
+ // Check to make sure we were not canceled.
+ if ((aio = item->aio) != NULL) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+
+ NNI_FREE_STRUCT(item);
+ }
+ }
+ nni_mtx_unlock(&resolv_mtx);
+}
+
int
nni_posix_resolv_sysinit(void)
{
- int rv;
-
nni_mtx_init(&resolv_mtx);
+ nni_cv_init(&resolv_cv, &resolv_mtx);
+ nni_aio_list_init(&resolv_aios);
+
+ resolv_fini = false;
- if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
- nni_mtx_fini(&resolv_mtx);
- return (rv);
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
+ if (rv != 0) {
+ nni_posix_resolv_sysfini();
+ return (rv);
+ }
+ }
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_run(&resolv_thrs[i]);
}
+
return (0);
}
void
nni_posix_resolv_sysfini(void)
{
- if (resolv_tq != NULL) {
- nni_taskq_fini(resolv_tq);
- resolv_tq = NULL;
+ nni_mtx_lock(&resolv_mtx);
+ resolv_fini = true;
+ nni_cv_wake(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
+
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
}
+ nni_cv_fini(&resolv_cv);
nni_mtx_fini(&resolv_mtx);
}