aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c63
-rw-r--r--src/core/aio.h5
-rw-r--r--src/core/aio_test.c3
-rw-r--r--src/nng.c50
-rw-r--r--src/testing/streams.c2
5 files changed, 20 insertions, 103 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 87c79d0d..fa6eb76f 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -428,69 +428,6 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
bool
-nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
-{
- nni_aio_expire_q *eq = aio->a_expire_q;
- bool timeout = false;
-
- if (!aio->a_sleep && !aio->a_use_expire) {
- // Convert the relative timeout to an absolute timeout.
- switch (aio->a_timeout) {
- case NNG_DURATION_ZERO:
- timeout = true;
- break;
- case NNG_DURATION_INFINITE:
- case NNG_DURATION_DEFAULT:
- aio->a_expire = NNI_TIME_NEVER;
- break;
- default:
- aio->a_expire = nni_clock() + aio->a_timeout;
- break;
- }
- } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
- timeout = true;
- }
-
- nni_mtx_lock(&eq->eq_mtx);
- if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
- aio->a_stopped = true;
- nni_mtx_unlock(&eq->eq_mtx);
- nni_task_dispatch(&aio->a_task);
- return (false);
- }
- if (aio->a_abort) {
- aio->a_sleep = false;
- aio->a_abort = false;
- nni_mtx_unlock(&eq->eq_mtx);
- nni_task_dispatch(&aio->a_task);
- return (false);
- }
- if (timeout) {
- aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- aio->a_abort = false;
- nni_mtx_unlock(&eq->eq_mtx);
- nni_task_dispatch(&aio->a_task);
- return (false);
- }
-
- NNI_ASSERT(aio->a_cancel_fn == NULL);
- aio->a_cancel_fn = cancel;
- aio->a_cancel_arg = data;
-
- // We only schedule expiration if we have a way for the expiration
- // handler to actively cancel it.
- if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
- nni_aio_expire_add(aio);
- }
- nni_mtx_unlock(&eq->eq_mtx);
- return (true);
-}
-
-bool
nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
nni_aio_expire_q *eq = aio->a_expire_q;
diff --git a/src/core/aio.h b/src/core/aio.h
index d909853f..5346d19b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -170,11 +170,6 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
-// nni_aio_defer is just like nni_io_schedule, but it also calls the callback
-// automatically if the operation cannot be started because the AIO is stopped
-// or was canceled before this call (but after nni_aio_begin).
-extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
-
// nni_aio_reset is called by providers before doing any work -- it resets
// counts other fields to their initial state. It will not reset the closed
// state if the aio has been stopped or closed.
diff --git a/src/core/aio_test.c b/src/core/aio_test.c
index 66682460..a41f608d 100644
--- a/src/core/aio_test.c
+++ b/src/core/aio_test.c
@@ -130,8 +130,7 @@ test_provider_cancel(void)
int rv = 0;
// We fake an empty provider that does not do anything.
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
- NUTS_TRUE(nng_aio_begin(aio) == true);
- nng_aio_defer(aio, cancel, &rv);
+ nng_aio_start(aio, cancel, &rv);
nng_aio_cancel(aio);
nng_aio_wait(aio);
NUTS_TRUE(rv == NNG_ECANCELED);
diff --git a/src/nng.c b/src/nng.c
index 5f3c6f12..0c96fd90 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -212,10 +212,9 @@ nng_recv_aio(nng_socket s, nng_aio *aio)
nni_sock *sock;
int rv;
+ nni_aio_reset(aio);
if ((rv = nni_sock_find(&sock, s.id)) != 0) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_sock_recv(sock, aio);
@@ -228,16 +227,13 @@ nng_send_aio(nng_socket s, nng_aio *aio)
nni_sock *sock;
int rv;
+ nni_aio_reset(aio);
if (nni_aio_get_msg(aio) == NULL) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- }
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
if ((rv = nni_sock_find(&sock, s.id)) != 0) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_sock_send(sock, aio);
@@ -326,10 +322,9 @@ nng_ctx_recv(nng_ctx cid, nng_aio *aio)
int rv;
nni_ctx *ctx;
+ nni_aio_reset(aio);
if ((rv = nni_ctx_find(&ctx, cid.id)) != 0) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_ctx_recv(ctx, aio);
@@ -342,16 +337,13 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio)
int rv;
nni_ctx *ctx;
+ nni_aio_reset(aio);
if (nni_aio_get_msg(aio) == NULL) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- }
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
if ((rv = nni_ctx_find(&ctx, cid.id)) != 0) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_ctx_send(ctx, aio);
@@ -1279,20 +1271,17 @@ nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2)
nni_sock *sock1 = NULL;
nni_sock *sock2 = NULL;
+ nni_aio_reset(aio);
if ((s1.id > 0) && (s1.id != (uint32_t) -1)) {
if ((rv = nni_sock_find(&sock1, s1.id)) != 0) {
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
}
if (((s2.id > 0) && (s2.id != (uint32_t) -1)) && (s2.id != s1.id)) {
if ((rv = nni_sock_find(&sock2, s2.id)) != 0) {
nni_sock_rele(sock1);
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
}
@@ -2064,18 +2053,15 @@ nng_aio_finish(nng_aio *aio, int rv)
}
bool
-nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg)
+nng_aio_start(nng_aio *aio, nng_aio_cancelfn fn, void *arg)
{
- return (nni_aio_defer(aio, fn, arg));
+ return (nni_aio_start(aio, fn, arg));
}
-bool
-nng_aio_begin(nng_aio *aio)
+void
+nng_aio_reset(nng_aio *aio)
{
- if (nni_aio_begin(aio) != 0) {
- return (false);
- }
- return (true);
+ nni_aio_reset(aio);
}
#define xstr(a) str(a)
diff --git a/src/testing/streams.c b/src/testing/streams.c
index 92d52f37..9f4b01bc 100644
--- a/src/testing/streams.c
+++ b/src/testing/streams.c
@@ -95,7 +95,7 @@ stream_xfr_alloc(nng_stream *s, void (*submit)(nng_stream *, nng_aio *),
nng_aio_set_timeout(x->upper_aio, 30000);
nng_aio_set_timeout(x->lower_aio, 5000);
- nng_aio_begin(x->upper_aio);
+ (void) nng_aio_start(x->upper_aio, NULL, NULL);
x->s = s;
x->rem = size;