aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-18 20:38:00 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-20 07:34:16 -0700
commit5902d02ad0a056a146231568f1293ffbcd59f61c (patch)
treebe38584c02d703ec2322ab941d4d723c752fe187 /src/platform
parent40542e7af0f5003d7ad67876ea580a59174031ca (diff)
downloadnng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.gz
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.bz2
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.zip
fixes #346 nng_recv() sometimes acts on null `msg` pointer
This closes a fundamental flaw in the way aio structures were handled. In paticular, aio expiration could race ahead, and fire before the aio was properly registered by the provider. This ultimately led to the possibility of duplicate completions on the same aio. The solution involved breaking up nni_aio_start into two functions. nni_aio_begin (which can be run outside of external locks) simply validates that nni_aio_fini() has not been called, and clears certain fields in the aio to make it ready for use by the provider. nni_aio_schedule does the work to register the aio with the expiration thread, and should only be called when the aio is actually scheduled for asynchronous completion. nni_aio_schedule_verify does the same thing, but returns NNG_ETIMEDOUT if the aio has a zero length timeout. This change has a small negative performance impact. We have plans to rectify that by converting nni_aio_begin to use a locklesss flag for the aio->a_fini bit. While we were here, we fixed some error paths in the POSIX subsystem, which would have returned incorrect error codes, and we made some optmizations in the message queues to reduce conditionals while holding locks in the hot code path.
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_epdesc.c57
-rw-r--r--src/platform/posix/posix_pipedesc.c24
-rw-r--r--src/platform/posix/posix_resolv_gai.c11
-rw-r--r--src/platform/posix/posix_udp.c20
-rw-r--r--src/platform/windows/win_iocp.c11
-rw-r--r--src/platform/windows/win_ipc.c8
-rw-r--r--src/platform/windows/win_resolv.c10
7 files changed, 66 insertions, 75 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 16c0d09f..196b206a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -319,26 +319,22 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
- int rv;
-
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
if (ed->closed) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
-
nni_aio_list_append(&ed->acceptq, aio);
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -362,39 +358,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int rv;
int fd;
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
- fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (fd < 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
// Possibly bind.
- if (ed->loclen != 0) {
- rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
- if (rv != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
+ if ((ed->loclen != 0) &&
+ (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
-
- if (rv == 0) {
+ if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
-
nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
@@ -402,24 +392,27 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
- if (errno == ENOENT) {
+ if (errno == ENOENT) { // For UNIX domain sockets
errno = ECONNREFUSED;
}
- (void) close(fd);
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
// We have to submit to the pollq, because the connection is pending.
ed->node.fd = fd;
if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
+ ed->node.fd = -1;
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_aio_list_append(&ed->connectq, aio);
nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index aebea4b8..62531f9c 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -254,20 +254,20 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->readq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
// If we are only job on the list, go ahead and try to do an immediate
// transfer. This allows for faster completions in many cases. We
// also need not arm a list if it was already armed.
@@ -285,20 +285,20 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->writeq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
// If we are still the first thing on the list, that means we
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index f3ac7a19..d86a2008 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -206,9 +206,11 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
int family, int proto, nni_aio *aio)
{
nni_posix_resolv_item *item;
- int rv;
sa_family_t fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -241,12 +243,7 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->family = fam;
nni_mtx_lock(&nni_posix_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_posix_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_posix_resolv_mtx);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8a402acc..b414fa95 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -287,22 +287,26 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 1189433a..a3ae3748 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -155,12 +155,11 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
- nni_mtx_lock(&evt->mtx);
- if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) {
- // the aio was aborted
- nni_mtx_unlock(&evt->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&evt->mtx);
+ nni_aio_schedule(aio, nni_win_event_cancel, evt);
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 5843917c..d372f639 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -491,16 +491,16 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&w->mtx);
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
- if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
- nni_mtx_unlock(&w->mtx);
- return;
- }
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
+ nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index d07b4fd5..72e6b439 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -180,6 +180,9 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
int rv;
int fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -211,12 +214,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&nni_win_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_win_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_win_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_win_resolv_mtx);
}