aboutsummaryrefslogtreecommitdiff
path: root/src/platform/windows
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-09 17:21:27 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-14 17:09:20 -0700
commit16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch)
tree9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/platform/windows
parente0beb13b066d27ce32347a1c18c9d441828dc553 (diff)
downloadnng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.gz
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.bz2
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.zip
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch() fixes #410 kqueue implementation could be smarter fixes #411 epoll_implementation could be smarter fixes #426 synchronous completion can lead to panic fixes #421 pipe close race condition/duplicate destroy This is a major refactoring of two significant parts of the code base, which are closely interrelated. First the aio and taskq framework have undergone a number of simplifications, and improvements. We have ditched a few parts of the internal API (for example tasks no longer support cancellation) that weren't terribly useful but added a lot of complexity, and we've made aio_schedule something that now checks for cancellation or other "premature" completions. The aio framework now uses the tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions. Second, the entire POSIX poller framework has been simplified, and made more robust, and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread, but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events. In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability. One area where we currently can do better is that there is still only a single poller thread run. Scaling this out is a task that has to be done differently for each poller, and carefuly to ensure that close conditions are safe on all pollers, and that no chance for deadlock/livelock waiting for pfd finalizers can occur.
Diffstat (limited to 'src/platform/windows')
-rw-r--r--src/platform/windows/win_impl.h1
-rw-r--r--src/platform/windows/win_iocp.c7
-rw-r--r--src/platform/windows/win_ipc.c7
-rw-r--r--src/platform/windows/win_resolv.c138
-rw-r--r--src/platform/windows/win_thread.c7
5 files changed, 102 insertions, 58 deletions
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b3c4738f..0bd12b24 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -34,6 +34,7 @@ struct nni_plat_thr {
void (*func)(void *);
void * arg;
HANDLE handle;
+ DWORD id;
};
struct nni_plat_mtx {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index a3ae3748..5ead9cbc 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -155,11 +155,16 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&evt->mtx);
- nni_aio_schedule(aio, nni_win_event_cancel, evt);
+ if ((rv = nni_aio_schedule(aio, nni_win_event_cancel, evt)) != 0) {
+ nni_mtx_unlock(&evt->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c6376cc7..169a2e00 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -572,17 +572,22 @@ void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
- nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 070bf40d..2bc68c32 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -29,49 +29,55 @@
#define NNG_WIN_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_win_resolv_tq = NULL;
-static nni_mtx nni_win_resolv_mtx;
-
-typedef struct nni_win_resolv_item nni_win_resolv_item;
-struct nni_win_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *win_resolv_tq = NULL;
+static nni_mtx win_resolv_mtx;
+
+typedef struct win_resolv_item win_resolv_item;
+struct win_resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
+win_resolv_finish(win_resolv_item *item, int rv)
{
- nni_aio *aio = item->aio;
-
- nni_aio_set_prov_data(aio, NULL);
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
+ nni_aio *aio;
+
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nni_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ }
}
static void
-nni_win_resolv_cancel(nni_aio *aio, int rv)
+win_resolv_cancel(nni_aio *aio, int rv)
{
- nni_win_resolv_item *item;
+ win_resolv_item *item;
- nni_mtx_lock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_unlock(&win_resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ item->aio = NULL;
+ nni_mtx_unlock(&win_resolv_mtx);
nni_aio_finish_error(aio, rv);
}
static int
-nni_win_gai_errno(int rv)
+win_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -98,17 +104,26 @@ nni_win_gai_errno(int rv)
}
static void
-nni_win_resolv_task(void *arg)
+win_resolv_task(void *arg)
{
- nni_win_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ win_resolv_item *item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
results = NULL;
+ nni_mtx_lock(&win_resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&win_resolv_mtx);
+
// We treat these all as IP addresses. The service and the
// host part are split.
memset(&hints, 0, sizeof(hints));
@@ -124,7 +139,7 @@ nni_win_resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_win_gai_errno(rv);
+ rv = win_gai_errno(rv);
goto done;
}
@@ -142,7 +157,7 @@ nni_win_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nni_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -167,17 +182,18 @@ done:
if (results != NULL) {
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ win_resolv_finish(item, rv);
+ nni_mtx_unlock(&win_resolv_mtx);
}
static void
-nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
+win_resolv_ip(const char *host, const char *serv, int passive, int family,
int proto, nni_aio *aio)
{
- nni_win_resolv_item *item;
- int fam;
+ win_resolv_item *item;
+ int fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -202,8 +218,12 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_task_init(
- nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
+ rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
item->passive = passive;
item->name = host;
@@ -212,24 +232,30 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_aio_schedule(aio, nni_win_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&win_resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -237,10 +263,10 @@ nni_win_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_win_resolv_mtx);
+ nni_mtx_init(&win_resolv_mtx);
- if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_win_resolv_mtx);
+ if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&win_resolv_mtx);
return (rv);
}
return (0);
@@ -249,11 +275,11 @@ nni_win_resolv_sysinit(void)
void
nni_win_resolv_sysfini(void)
{
- if (nni_win_resolv_tq != NULL) {
- nni_taskq_fini(nni_win_resolv_tq);
- nni_win_resolv_tq = NULL;
+ if (win_resolv_tq != NULL) {
+ nni_taskq_fini(win_resolv_tq);
+ win_resolv_tq = NULL;
}
- nni_mtx_fini(&nni_win_resolv_mtx);
+ nni_mtx_fini(&win_resolv_mtx);
}
#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index a2ae72fe..2e9d58d7 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -107,6 +107,7 @@ static unsigned int __stdcall nni_plat_thr_main(void *arg)
{
nni_plat_thr *thr = arg;
+ thr->id = GetCurrentThreadId();
thr->func(thr->arg);
return (0);
}
@@ -138,6 +139,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (GetCurrentThreadId() == thr->id);
+}
+
static LONG plat_inited = 0;
int