diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 63 | ||||
| -rw-r--r-- | src/core/aio.h | 5 | ||||
| -rw-r--r-- | src/core/aio_test.c | 3 | ||||
| -rw-r--r-- | src/nng.c | 50 | ||||
| -rw-r--r-- | src/testing/streams.c | 2 |
5 files changed, 20 insertions, 103 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 87c79d0d..fa6eb76f 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -428,69 +428,6 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } bool -nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) -{ - nni_aio_expire_q *eq = aio->a_expire_q; - bool timeout = false; - - if (!aio->a_sleep && !aio->a_use_expire) { - // Convert the relative timeout to an absolute timeout. - switch (aio->a_timeout) { - case NNG_DURATION_ZERO: - timeout = true; - break; - case NNG_DURATION_INFINITE: - case NNG_DURATION_DEFAULT: - aio->a_expire = NNI_TIME_NEVER; - break; - default: - aio->a_expire = nni_clock() + aio->a_timeout; - break; - } - } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { - timeout = true; - } - - nni_mtx_lock(&eq->eq_mtx); - if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; - aio->a_stopped = true; - nni_mtx_unlock(&eq->eq_mtx); - nni_task_dispatch(&aio->a_task); - return (false); - } - if (aio->a_abort) { - aio->a_sleep = false; - aio->a_abort = false; - nni_mtx_unlock(&eq->eq_mtx); - nni_task_dispatch(&aio->a_task); - return (false); - } - if (timeout) { - aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - aio->a_abort = false; - nni_mtx_unlock(&eq->eq_mtx); - nni_task_dispatch(&aio->a_task); - return (false); - } - - NNI_ASSERT(aio->a_cancel_fn == NULL); - aio->a_cancel_fn = cancel; - aio->a_cancel_arg = data; - - // We only schedule expiration if we have a way for the expiration - // handler to actively cancel it. - if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { - nni_aio_expire_add(aio); - } - nni_mtx_unlock(&eq->eq_mtx); - return (true); -} - -bool nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { nni_aio_expire_q *eq = aio->a_expire_q; diff --git a/src/core/aio.h b/src/core/aio.h index d909853f..5346d19b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -170,11 +170,6 @@ extern void nni_aio_bump_count(nni_aio *, size_t); // cannot be canceled, which can lead to apparent hangs. extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); -// nni_aio_defer is just like nni_io_schedule, but it also calls the callback -// automatically if the operation cannot be started because the AIO is stopped -// or was canceled before this call (but after nni_aio_begin). -extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *); - // nni_aio_reset is called by providers before doing any work -- it resets // counts other fields to their initial state. It will not reset the closed // state if the aio has been stopped or closed. diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 66682460..a41f608d 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -130,8 +130,7 @@ test_provider_cancel(void) int rv = 0; // We fake an empty provider that does not do anything. NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_TRUE(nng_aio_begin(aio) == true); - nng_aio_defer(aio, cancel, &rv); + nng_aio_start(aio, cancel, &rv); nng_aio_cancel(aio); nng_aio_wait(aio); NUTS_TRUE(rv == NNG_ECANCELED); @@ -212,10 +212,9 @@ nng_recv_aio(nng_socket s, nng_aio *aio) nni_sock *sock; int rv; + nni_aio_reset(aio); if ((rv = nni_sock_find(&sock, s.id)) != 0) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_sock_recv(sock, aio); @@ -228,16 +227,13 @@ nng_send_aio(nng_socket s, nng_aio *aio) nni_sock *sock; int rv; + nni_aio_reset(aio); if (nni_aio_get_msg(aio) == NULL) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, NNG_EINVAL); - } + nni_aio_finish_error(aio, NNG_EINVAL); return; } if ((rv = nni_sock_find(&sock, s.id)) != 0) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_sock_send(sock, aio); @@ -326,10 +322,9 @@ nng_ctx_recv(nng_ctx cid, nng_aio *aio) int rv; nni_ctx *ctx; + nni_aio_reset(aio); if ((rv = nni_ctx_find(&ctx, cid.id)) != 0) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_ctx_recv(ctx, aio); @@ -342,16 +337,13 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio) int rv; nni_ctx *ctx; + nni_aio_reset(aio); if (nni_aio_get_msg(aio) == NULL) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, NNG_EINVAL); - } + nni_aio_finish_error(aio, NNG_EINVAL); return; } if ((rv = nni_ctx_find(&ctx, cid.id)) != 0) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_ctx_send(ctx, aio); @@ -1279,20 +1271,17 @@ nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2) nni_sock *sock1 = NULL; nni_sock *sock2 = NULL; + nni_aio_reset(aio); if ((s1.id > 0) && (s1.id != (uint32_t) -1)) { if ((rv = nni_sock_find(&sock1, s1.id)) != 0) { - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } } if (((s2.id > 0) && (s2.id != (uint32_t) -1)) && (s2.id != s1.id)) { if ((rv = nni_sock_find(&sock2, s2.id)) != 0) { nni_sock_rele(sock1); - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } } @@ -2064,18 +2053,15 @@ nng_aio_finish(nng_aio *aio, int rv) } bool -nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg) +nng_aio_start(nng_aio *aio, nng_aio_cancelfn fn, void *arg) { - return (nni_aio_defer(aio, fn, arg)); + return (nni_aio_start(aio, fn, arg)); } -bool -nng_aio_begin(nng_aio *aio) +void +nng_aio_reset(nng_aio *aio) { - if (nni_aio_begin(aio) != 0) { - return (false); - } - return (true); + nni_aio_reset(aio); } #define xstr(a) str(a) diff --git a/src/testing/streams.c b/src/testing/streams.c index 92d52f37..9f4b01bc 100644 --- a/src/testing/streams.c +++ b/src/testing/streams.c @@ -95,7 +95,7 @@ stream_xfr_alloc(nng_stream *s, void (*submit)(nng_stream *, nng_aio *), nng_aio_set_timeout(x->upper_aio, 30000); nng_aio_set_timeout(x->lower_aio, 5000); - nng_aio_begin(x->upper_aio); + (void) nng_aio_start(x->upper_aio, NULL, NULL); x->s = s; x->rem = size; |
