aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-17 20:41:06 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-17 21:52:07 -0700
commit846c30081a67e961b4a060bdca192ddafb87cce9 (patch)
treec795cd6241979418f651c2e2d5e42091988591d6 /src/platform/posix
parent109a559590abfe3017dd317c3068e2457188541c (diff)
downloadnng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.gz
nng-846c30081a67e961b4a060bdca192ddafb87cce9.tar.bz2
nng-846c30081a67e961b4a060bdca192ddafb87cce9.zip
fixes #451 task finalization could be better/smarter (resolver)
This changes nni_task_fini to always run synchronously, waiting for the task to finish before cleaning up. Much simpler code. Additionally, we've refactored the resolver code to avoid the use of taskqs, which added complexity and inefficiency. The approach of just allocating its own threads and a work queue to process them turns out to be vastly simpler, and actually reduces extra allocations and context switches. wip POSIX resolv threads. (Taskqs are just overhead and complexity here.) Windows resolver changes. Task cleanup. fix up windows mutex.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_resolv_gai.c155
1 files changed, 97 insertions, 58 deletions
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index ad1d5147..dcaef409 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,8 +37,11 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *resolv_tq = NULL;
-static nni_mtx resolv_mtx;
+static nni_mtx resolv_mtx;
+static nni_cv resolv_cv;
+static bool resolv_fini;
+static nni_list resolv_aios;
+static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY];
typedef struct resolv_item resolv_item;
struct resolv_item {
@@ -48,45 +51,40 @@ struct resolv_item {
const char * serv;
int proto;
nni_aio * aio;
- nni_task * task;
nng_sockaddr sa;
};
static void
-resolv_finish(resolv_item *item, int rv)
-{
- nni_aio *aio;
-
- if (((aio = item->aio) != NULL) &&
- (nni_aio_get_prov_data(aio) == item)) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
- nni_aio_finish(aio, rv, 0);
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- }
-}
-
-static void
resolv_cancel(nni_aio *aio, int rv)
{
resolv_item *item;
nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
+ // Already canceled?
nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_mtx_unlock(&resolv_mtx);
+ if (nni_aio_list_active(aio)) {
+ // We have not been picked up by a resolver thread yet,
+ // so we can just discard everything.
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&resolv_mtx);
+ NNI_FREE_STRUCT(item);
+ } else {
+ // This case indicates the resolver is still processing our
+ // node. We can discard our interest in the result, but we
+ // can't interrupt the resolver itself. (Too bad, name
+ // resolution is utterly synchronous for now.)
+ item->aio = NULL;
+ nni_mtx_unlock(&resolv_mtx);
+ }
nni_aio_finish_error(aio, rv);
}
static int
-nni_posix_gai_errno(int rv)
+posix_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv)
}
}
-static void
-resolv_task(void *arg)
+static int
+resolv_task(resolv_item *item)
{
- resolv_item * item = arg;
struct addrinfo hints;
struct addrinfo *results;
struct addrinfo *probe;
int rv;
- nni_mtx_lock(&resolv_mtx);
- if (item->aio == NULL) {
- nni_mtx_unlock(&resolv_mtx);
- // Caller canceled, and no longer cares about this.
- nni_task_fini(item->task);
- NNI_FREE_STRUCT(item);
- return;
- }
- nni_mtx_unlock(&resolv_mtx);
-
results = NULL;
// We treat these all as IP addresses. The service and the
@@ -152,7 +139,7 @@ resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_posix_gai_errno(rv);
+ rv = posix_gai_errno(rv);
goto done;
}
@@ -196,9 +183,7 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&resolv_mtx);
- resolv_finish(item, rv);
- nni_mtx_unlock(&resolv_mtx);
+ return (rv);
}
static void
@@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
- 0) {
- NNI_FREE_STRUCT(item);
- nni_aio_finish_error(aio, rv);
- return;
- };
-
// NB: host and serv must remain valid until this is completed.
memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
@@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&resolv_mtx);
- if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ if (resolv_fini) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_schedule(aio, resolv_cancel, item);
+ }
+ if (rv != 0) {
nni_mtx_unlock(&resolv_mtx);
- nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
}
- nni_task_dispatch(item->task);
+ nni_list_append(&resolv_aios, aio);
+ nni_cv_wake1(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
}
@@ -274,27 +257,83 @@ nni_plat_udp_resolv(
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
+void
+resolv_worker(void *notused)
+{
+
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&resolv_mtx);
+ for (;;) {
+ nni_aio * aio;
+ resolv_item *item;
+ int rv;
+
+ if ((aio = nni_list_first(&resolv_aios)) == NULL) {
+ if (resolv_fini) {
+ break;
+ }
+ nni_cv_wait(&resolv_cv);
+ continue;
+ }
+
+ item = nni_aio_get_prov_data(aio);
+ nni_aio_list_remove(aio);
+
+ // Now attempt to do the work. This runs synchronously.
+ nni_mtx_unlock(&resolv_mtx);
+ rv = resolv_task(item);
+ nni_mtx_lock(&resolv_mtx);
+
+ // Check to make sure we were not canceled.
+ if ((aio = item->aio) != NULL) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+
+ NNI_FREE_STRUCT(item);
+ }
+ }
+ nni_mtx_unlock(&resolv_mtx);
+}
+
int
nni_posix_resolv_sysinit(void)
{
- int rv;
-
nni_mtx_init(&resolv_mtx);
+ nni_cv_init(&resolv_cv, &resolv_mtx);
+ nni_aio_list_init(&resolv_aios);
+
+ resolv_fini = false;
- if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
- nni_mtx_fini(&resolv_mtx);
- return (rv);
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
+ if (rv != 0) {
+ nni_posix_resolv_sysfini();
+ return (rv);
+ }
+ }
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_run(&resolv_thrs[i]);
}
+
return (0);
}
void
nni_posix_resolv_sysfini(void)
{
- if (resolv_tq != NULL) {
- nni_taskq_fini(resolv_tq);
- resolv_tq = NULL;
+ nni_mtx_lock(&resolv_mtx);
+ resolv_fini = true;
+ nni_cv_wake(&resolv_cv);
+ nni_mtx_unlock(&resolv_mtx);
+
+ for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
}
+ nni_cv_fini(&resolv_cv);
nni_mtx_fini(&resolv_mtx);
}