aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/taskq.c22
-rw-r--r--src/platform/posix/posix_resolv_gai.c155
-rw-r--r--src/platform/windows/win_resolv.c193
3 files changed, 209 insertions, 161 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 15351840..f0712e59 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -18,7 +18,6 @@ struct nni_task {
nni_taskq * task_tq;
unsigned task_busy;
bool task_prep;
- bool task_fini;
nni_mtx task_mtx;
nni_cv task_cv;
};
@@ -56,14 +55,6 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
if (task->task_busy == 0) {
- if (task->task_fini) {
- task->task_fini = false;
- nni_mtx_unlock(&task->task_mtx);
- nni_task_fini(task);
-
- nni_mtx_lock(&tq->tq_mtx);
- continue;
- }
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
@@ -158,12 +149,6 @@ nni_task_exec(nni_task *task)
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
if (task->task_busy == 0) {
- if (task->task_fini) {
- task->task_fini = false;
- nni_mtx_unlock(&task->task_mtx);
- nni_task_fini(task);
- return;
- }
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
@@ -238,11 +223,8 @@ nni_task_fini(nni_task *task)
{
NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
- if (task->task_busy) {
- // destroy later.
- task->task_fini = true;
- nni_mtx_unlock(&task->task_mtx);
- return;
+ while (task->task_busy) {
+ nni_cv_wait(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
nni_cv_fini(&task->task_cv);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index ad1d5147..dcaef409 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,8 +37,11 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *resolv_tq = NULL;
-static nni_mtx resolv_mtx;
+static nni_mtx resolv_mtx;
+static nni_cv resolv_cv;
+static bool resolv_fini;
+static nni_list resolv_aios;
+static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY];
typedef struct resolv_item resolv_item;
struct resolv_item {
@@ -48,45 +51,40 @@ struct resolv_item {
const char * serv;
int proto;
nni_aio * aio;
- nni_task * task;
nng_sockaddr sa;
};
static void
-resolv_finish(resolv_item *item, int rv)
-{
- nni_aio *aio;
-
- if (((aio = item->aio) != NULL) &&
- (nni_aio_get_prov_data(aio) == item)) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
- nni_aio_finish(aio, rv, 0);
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- }
-}
-
-static void
resolv_cancel(nni_aio *aio, int rv)
{
resolv_item *item;
nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
+ // Already canceled?
nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_mtx_unlock(&resolv_mtx);
+ if (nni_aio_list_active(aio)) {
+ // We have not been picked up by a resolver thread yet,
+ // so we can just discard everything.
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&resolv_mtx);
+ NNI_FREE_STRUCT(item);
+ } else {
+ // This case indicates the resolver is still processing our
+ // node. We can discard our interest in the result, but we
+ // can't interrupt the resolver itself. (Too bad, name
+ // resolution is utterly synchronous for now.)
+ item->aio = NULL;
+ nni_mtx_unlock(&resolv_mtx);
+ }
nni_aio_finish_error(aio, rv);
}
static int
-nni_posix_gai_errno(int rv)
+posix_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv)
}
}
-static void
-resolv_task(void *arg)
+static int
+resolv_task(resolv_item *item)
{
- resolv_item * item = arg;
struct addrinfo hints;
struct addrinfo *results;
struct addrinfo *probe;
int rv;
- nni_mtx_lock(&resolv_mtx);
- if (item->aio == NULL) {
- nni_mtx_unlock(&resolv_mtx);
- // Caller canceled, and no longer cares about this.
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- return;
- }
- nni_mtx_unlock(&resolv_mtx);
-
results = NULL;
// We treat these all as IP addresses. The service and the
@@ -152,7 +139,7 @@ resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_posix_gai_errno(rv);
+ rv = posix_gai_errno(rv);
goto done;
}
@@ -196,9 +183,7 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&resolv_mtx);
- resolv_finish(item, rv);
- nni_mtx_unlock(&resolv_mtx);
+ return (rv);
}
static void
@@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
- 0) {
- NNI_FREE_STRUCT(item);
- nni_aio_finish_error(aio, rv);
- return;
- };
-
// NB: host and serv must remain valid until this is completed.
memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
@@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&resolv_mtx);
- if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ if (resolv_fini) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_schedule(aio, resolv_cancel, item);
+ }
+ if (rv != 0) {
nni_mtx_unlock(&resolv_mtx);
- nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
}
- nni_task_dispatch(item->task);
+ nni_list_append(&resolv_aios, aio);
+ nni_cv_wake1(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
}
@@ -274,27 +257,83 @@ nni_plat_udp_resolv(
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
+void
+resolv_worker(void *notused)
+{
+
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&resolv_mtx);
+ for (;;) {
+ nni_aio * aio;
+ resolv_item *item;
+ int rv;
+
+ if ((aio = nni_list_first(&resolv_aios)) == NULL) {
+ if (resolv_fini) {
+ break;
+ }
+ nni_cv_wait(&resolv_cv);
+ continue;
+ }
+
+ item = nni_aio_get_prov_data(aio);
+ nni_aio_list_remove(aio);
+
+ // Now attempt to do the work. This runs synchronously.
+ nni_mtx_unlock(&resolv_mtx);
+ rv = resolv_task(item);
+ nni_mtx_lock(&resolv_mtx);
+
+ // Check to make sure we were not canceled.
+ if ((aio = item->aio) != NULL) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+
+ NNI_FREE_STRUCT(item);
+ }
+ }
+ nni_mtx_unlock(&resolv_mtx);
+}
+
int
nni_posix_resolv_sysinit(void)
{
- int rv;
-
nni_mtx_init(&resolv_mtx);
+ nni_cv_init(&resolv_cv, &resolv_mtx);
+ nni_aio_list_init(&resolv_aios);
+
+ resolv_fini = false;
- if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
- nni_mtx_fini(&resolv_mtx);
- return (rv);
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
+ if (rv != 0) {
+ nni_posix_resolv_sysfini();
+ return (rv);
+ }
+ }
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_run(&resolv_thrs[i]);
}
+
return (0);
}
void
nni_posix_resolv_sysfini(void)
{
- if (resolv_tq != NULL) {
- nni_taskq_fini(resolv_tq);
- resolv_tq = NULL;
+ nni_mtx_lock(&resolv_mtx);
+ resolv_fini = true;
+ nni_cv_wake(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
+
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
}
+ nni_cv_fini(&resolv_cv);
nni_mtx_fini(&resolv_mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 54f1c61c..c0fa1e0a 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -16,68 +16,57 @@
// with it, where looking up names in DNS can poison results for other
// uses, because the asynchronous resolver *only* considers DNS -- ignoring
// host file, WINS, or other naming services. As a result, we just build
-// our own limited asynchronous using a taskq.
-
-// We use a single resolver taskq - but we allocate a few threads
-// for it to ensure that names can be looked up concurrently. This isn't
-// as elegant or scaleable as a true asynchronous resolver would be, but
-// it has the advantage of being fairly portable, and concurrent enough for
-// the vast, vast majority of use cases. The total thread count can be
-// changed with this define.
+// our own limited asynchronous resolver with threads.
#ifndef NNG_WIN_RESOLV_CONCURRENCY
#define NNG_WIN_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *win_resolv_tq = NULL;
-static nni_mtx win_resolv_mtx;
+static nni_mtx resolv_mtx;
+static nni_cv resolv_cv;
+static bool resolv_fini;
+static nni_list resolv_aios;
+static nni_thr resolv_thrs[NNG_WIN_RESOLV_CONCURRENCY];
-typedef struct win_resolv_item win_resolv_item;
-struct win_resolv_item {
+typedef struct resolv_item resolv_item;
+struct resolv_item {
int family;
int passive;
const char * name;
const char * serv;
int proto;
nni_aio * aio;
- nni_task * task;
nng_sockaddr sa;
};
static void
-win_resolv_finish(win_resolv_item *item, int rv)
+resolv_cancel(nni_aio *aio, int rv)
{
- nni_aio *aio;
-
- if (((aio = item->aio) != NULL) &&
- (nni_aio_get_prov_data(aio) == item)) {
- nni_sockaddr *sa = nni_aio_get_input(aio, 0);
- nni_aio_set_prov_data(aio, NULL);
- memcpy(sa, &item->sa, sizeof(*sa));
- nni_aio_finish(aio, rv, 0);
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- }
-}
+ resolv_item *item;
-static void
-win_resolv_cancel(nni_aio *aio, int rv)
-{
- win_resolv_item *item;
-
- nni_mtx_lock(&win_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&win_resolv_mtx);
+ nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_mtx_unlock(&win_resolv_mtx);
+ if (nni_aio_list_active(aio)) {
+ // We have not been picked up by a resolver thread yet,
+ // so we can just discard everything.
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&resolv_mtx);
+ NNI_FREE_STRUCT(item);
+ } else {
+ // Resolver still working, so just unlink our AIO to
+ // discard our interest in the results.
+ item->aio = NULL;
+ nni_mtx_unlock(&resolv_mtx);
+ }
nni_aio_finish_error(aio, rv);
}
static int
-win_gai_errno(int rv)
+resolv_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -103,10 +92,9 @@ win_gai_errno(int rv)
}
}
-static void
-win_resolv_task(void *arg)
+static int
+resolv_task(resolv_item *item)
{
- win_resolv_item *item = arg;
struct addrinfo hints;
struct addrinfo *results;
struct addrinfo *probe;
@@ -114,16 +102,6 @@ win_resolv_task(void *arg)
results = NULL;
- nni_mtx_lock(&win_resolv_mtx);
- if (item->aio == NULL) {
- nni_mtx_unlock(&win_resolv_mtx);
- // Caller canceled, and no longer cares about this.
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- return;
- }
- nni_mtx_unlock(&win_resolv_mtx);
-
// We treat these all as IP addresses. The service and the
// host part are split.
memset(&hints, 0, sizeof(hints));
@@ -136,7 +114,7 @@ win_resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = win_gai_errno(rv);
+ rv = resolv_gai_errno(rv);
goto done;
}
@@ -179,18 +157,16 @@ done:
if (results != NULL) {
freeaddrinfo(results);
}
- nni_mtx_lock(&win_resolv_mtx);
- win_resolv_finish(item, rv);
- nni_mtx_unlock(&win_resolv_mtx);
+ return (rv);
}
static void
-win_resolv_ip(const char *host, const char *serv, int passive, int family,
+resolv_ip(const char *host, const char *serv, int passive, int family,
int proto, nni_aio *aio)
{
- win_resolv_item *item;
- int fam;
- int rv;
+ resolv_item *item;
+ int fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -215,13 +191,6 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item);
- if (rv != 0) {
- NNI_FREE_STRUCT(item);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
item->passive = passive;
item->name = host;
item->serv = serv;
@@ -229,42 +198,96 @@ win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&win_resolv_mtx);
- if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&win_resolv_mtx);
- nni_task_fini(item->task);
+ nni_mtx_lock(&resolv_mtx);
+ if (resolv_fini) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_schedule(aio, resolv_cancel, item);
+ }
+ if (rv != 0) {
+ nni_mtx_unlock(&resolv_mtx);
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
}
- nni_task_dispatch(item->task);
- nni_mtx_unlock(&win_resolv_mtx);
+ nni_list_append(&resolv_aios, aio);
+ nni_cv_wake1(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
-int
-nni_win_resolv_sysinit(void)
+void
+resolv_worker(void *notused)
{
- int rv;
- nni_mtx_init(&win_resolv_mtx);
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&resolv_mtx);
+ for (;;) {
+ nni_aio * aio;
+ resolv_item *item;
+ int rv;
+
+ if ((aio = nni_list_first(&resolv_aios)) == NULL) {
+ if (resolv_fini) {
+ break;
+ }
+ nni_cv_wait(&resolv_cv);
+ continue;
+ }
+
+ item = nni_aio_get_prov_data(aio);
+ nni_aio_list_remove(aio);
+
+ // Now attempt to do the work. This runs synchronously.
+ nni_mtx_unlock(&resolv_mtx);
+ rv = resolv_task(item);
+ nni_mtx_lock(&resolv_mtx);
- if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&win_resolv_mtx);
- return (rv);
+ // Check to make sure we were not canceled.
+ if ((aio = item->aio) != NULL) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+
+ NNI_FREE_STRUCT(item);
+ }
+ }
+ nni_mtx_unlock(&resolv_mtx);
+}
+
+int
+nni_win_resolv_sysinit(void)
+{
+ nni_mtx_init(&resolv_mtx);
+ nni_cv_init(&resolv_cv, &resolv_mtx);
+ nni_aio_list_init(&resolv_aios);
+
+ resolv_fini = false;
+ for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) {
+ int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
+ if (rv != 0) {
+ nni_win_resolv_sysfini();
+ return (rv);
+ }
+ }
+ for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) {
+ nni_thr_run(&resolv_thrs[i]);
}
return (0);
}
@@ -272,11 +295,15 @@ nni_win_resolv_sysinit(void)
void
nni_win_resolv_sysfini(void)
{
- if (win_resolv_tq != NULL) {
- nni_taskq_fini(win_resolv_tq);
- win_resolv_tq = NULL;
+ nni_mtx_lock(&resolv_mtx);
+ resolv_fini = true;
+ nni_cv_wake(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
+ for (int i = 0; i < NNG_WIN_RESOLV_CONCURRENCY; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
}
- nni_mtx_fini(&win_resolv_mtx);
+ nni_cv_fini(&resolv_cv);
+ nni_mtx_fini(&resolv_mtx);
}
#endif // NNG_PLATFORM_WINDOWS