aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h9
-rw-r--r--src/core/device.c3
-rw-r--r--src/core/sockfd.c5
-rw-r--r--src/core/tcp.c4
5 files changed, 76 insertions, 15 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index eaff5c80..0ac70a0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "core/taskq.h"
#include <string.h>
@@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
- aio->a_expire_ok = false;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECANCELED);
@@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
return (0);
}
+bool
+nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
+{
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ bool timeout = false;
+
+ if (!aio->a_sleep && !aio->a_use_expire) {
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ timeout = true;
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
+ break;
+ }
+ } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
+ timeout = true;
+ }
+
+ nni_mtx_lock(&eq->eq_mtx);
+ if (timeout) {
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_abort) {
+ aio->a_sleep = false;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_sleep = false;
+ aio->a_result = NNG_ECLOSED;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancel;
+ aio->a_cancel_arg = data;
+
+ // We only schedule expiration if we have a way for the expiration
+ // handler to actively cancel it.
+ if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
+ nni_aio_expire_add(aio);
+ }
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (true);
+}
+
// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
@@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
aio->a_expire =
ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms;
- if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
- nni_aio_finish_error(aio, rv);
- }
+ // we don't do anything else here, so we can ignore the return
+ (void) nni_aio_defer(aio, nni_sleep_cancel, NULL);
}
static bool
diff --git a/src/core/aio.h b/src/core/aio.h
index 9491a2fa..8628d8ef 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// is returned. (In that case the caller should probably either return an
// error to its caller, or possibly cause an asynchronous error by calling
// nni_aio_finish_error on this aio.)
+//
+// NB: This function should be called while holding the lock that will be used
+// to cancel the operation. Otherwise a race can occur where the operation
+// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
+// nni_aio_defer is just like nni_io_schedule, but it also calls the callback
+// automatically if the operation cannot be started because the AIO is stopped
+// or was canceled before this call (but after nni_aio_begin).
+extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
// nni_aio_completion_list is used after removing the aio from an
diff --git a/src/core/device.c b/src/core/device.c
index 815fafcc..7084d3e4 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, device_cancel, d)) {
nni_mtx_unlock(&device_mtx);
- nni_aio_finish_error(aio, rv);
nni_reap(&device_reap, d);
}
device_start(d, aio);
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
index 787a0783..1a0e792f 100644
--- a/src/core/sockfd.c
+++ b/src/core/sockfd.c
@@ -115,7 +115,6 @@ static void
sfd_listener_accept(void *arg, nng_aio *aio)
{
sfd_listener *l = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio)
if (l->listen_cnt) {
sfd_start_conn(l, aio);
- } else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) {
- nni_aio_finish_error(aio, rv);
- } else {
+ } else if (nni_aio_defer(aio, sfd_cancel_accept, l)) {
nni_aio_list_append(&l->accept_q, aio);
}
nni_mtx_unlock(&l->mtx);
diff --git a/src/core/tcp.c b/src/core/tcp.c
index d2e08493..75b938b0 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -177,7 +177,6 @@ static void
tcp_dialer_dial(void *arg, nng_aio *aio)
{
tcp_dialer *d = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&d->conaios, aio);