diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-26 10:20:33 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-26 10:52:17 -0800 |
| commit | 06ebeefb102b223ff77dd47e16b64bd9575f7c34 (patch) | |
| tree | cf5d70cadfd2f7261bb5fd6443d2c449d1c1e812 | |
| parent | d02a320cffc82de8437c28869355a403069b3b21 (diff) | |
| download | nng-06ebeefb102b223ff77dd47e16b64bd9575f7c34.tar.gz nng-06ebeefb102b223ff77dd47e16b64bd9575f7c34.tar.bz2 nng-06ebeefb102b223ff77dd47e16b64bd9575f7c34.zip | |
aio: introduce NNG_ESTOPPED
This error code results when an AIO is stopped permanently, as a result
of nni_aio_close or nni_aio_stop. The associated AIO object cannot be
used again. This discrimantes against a file being closed, or a temporary
cancellation which might allow the aio to be reused.
Consumers must check for this error status in their callbacks, and not
resubmit an operation that failed with this error. Doing so, will result
in an infinite loop of submit / errors.
28 files changed, 371 insertions, 149 deletions
diff --git a/docs/ref/api/aio.md b/docs/ref/api/aio.md index 230744e8..966201ba 100644 --- a/docs/ref/api/aio.md +++ b/docs/ref/api/aio.md @@ -100,7 +100,7 @@ then [`nng_aio_result`] will return _err_. The {{i:`nng_aio_cancel`}} function acts like `nng_aio_abort`, but uses the error code [`NNG_ECANCELED`]{{hi:`NNG_ECANCELED`}}. -The {{i:`nng_aio_stop`}} function aborts the _aio_ operation with [`NNG_ECANCELED`], +The {{i:`nng_aio_stop`}} function aborts the _aio_ operation with [`NNG_ESTOPPED`], and then waits the operation and any associated callback to complete. This function also marks _aio_ itself permanently stopped, so that any new operations scheduled by I/O providers using [`nng_aio_begin`] diff --git a/docs/ref/api/errors.md b/docs/ref/api/errors.md index 6548c263..6954b658 100644 --- a/docs/ref/api/errors.md +++ b/docs/ref/api/errors.md @@ -69,6 +69,7 @@ future locale-specific strings may be presented instead. | `NNG_EAMBIGUOUS`<a name="NNG_EAMBIGUOUS"></a> | 29 | Ambiguous option. The command line option could not be unambiguously resolved. Only used with [`nng_opts_parse`]. | | `NNG_EBADTYPE`<a name="NNG_EBADTYPE"></a> | 30 | Incorrect type. A type-specific function was used for an object of the wrong type. | | `NNG_ECONNSHUT`<a name="NNG_ECONNSHUT"></a> | 31 | Connection shutdown. The connection was shut down and cannot be used. | +| `NNG_ESTOPPED`<a name="NNG_ESTOPPED"></a> | 1000 | Operation stopped. The operation was stopped with [`nng_aio_stop`] or [`nng_aio_close`]. | | `NNG_EINTERNAL`<a name="NNG_EINTERNAL"></a> | 1000 | An unidentifier internal error occurred. | | `NNG_ESYSERR`<a name="NNG_ESYSERR"></a> | 0x10000000 - 0x1FFFFFFF | An unidentified system error occurred. These are errors reported by the operating system. | | `NNG_ETRANERR`<a name="NNG_ETRANERR"></a> | 0x20000000 - 0x2FFFFFFF | An unidentified transport error occurred. | diff --git a/docs/ref/migrate/nng1.md b/docs/ref/migrate/nng1.md index 0d6ae86d..99afb4d3 100644 --- a/docs/ref/migrate/nng1.md +++ b/docs/ref/migrate/nng1.md @@ -1,7 +1,7 @@ # Migrating from NNG 1.x -There are some incompatibities from NNG 1.x. -This guide should help in migrating applications to use NNG 2.0. +There are some incompatibities from NNG 1.x, and applications must make certain changes for NNG 2.0. +This guide should help with this migration. ## Nanomsg Compatibility @@ -13,6 +13,14 @@ See the [Migrating From libnanomsg](nanomsg.md) chapter for details. It is now required for applications to initialize the library explicitly before using it. This is done using the [`nng_init`] function. +## New AIO Error Code NNG_ESTOPPED + +When an operation fails with [`NNG_ESTOPPED`], it means that the associated [`nni_aio`] object has +been permanently stopped and must not be reused. Applications must watch for this error code, and +not resubmit an operation that returns it. This is particularly important for callbacks that automatically +resubmit operations. Failure to observe this rule will lead to an infinite loop +as any further operations on the object will fail immediately with `NNG_ESTOPPED`. + ## Transport Specific Functions Transports have not needed to be registered for a long time now, diff --git a/include/nng/nng.h b/include/nng/nng.h index 4d21287c..2563bc1e 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -1078,6 +1078,7 @@ enum nng_errno_enum { NNG_EAMBIGUOUS = 29, NNG_EBADTYPE = 30, NNG_ECONNSHUT = 31, + NNG_ESTOPPED = 999, NNG_EINTERNAL = 1000, NNG_ESYSERR = 0x10000000, NNG_ETRANERR = 0x20000000 diff --git a/src/core/aio.c b/src/core/aio.c index 54ba7000..f039cdc8 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -60,11 +60,10 @@ static int nni_aio_expire_q_cnt; // when we want to know if the operation itself is complete. // // In order to guard against aio reuse during teardown, we set the a_stop -// flag. Any attempt to initialize for a new operation after that point -// will fail and the caller will get NNG_ECANCELED indicating this. The -// provider that calls nni_aio_begin() MUST check the return value, and -// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the -// request and return. +// flag. Any attempt to submit new operation after that point will fail with +// the status NNG_ESTOPPED indicating this. The provider that calls +// nni_aio_begin() MUST check the return value, and if it comes back +// NNG_ESTOPPED then it must simply discard the request and // return. // // Calling nni_aio_wait waits for the current outstanding operation to // complete, but does not block another one from being started on the @@ -118,7 +117,7 @@ nni_aio_fini(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); + fn(aio, arg, NNG_ESTOPPED); } nni_task_fini(&aio->a_task); @@ -201,7 +200,7 @@ nni_aio_stop(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECANCELED); + fn(aio, arg, NNG_ESTOPPED); } nni_aio_wait(aio); @@ -226,7 +225,7 @@ nni_aio_close(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); + fn(aio, arg, NNG_ESTOPPED); } } } @@ -353,12 +352,13 @@ nni_aio_begin(nni_aio *aio) // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { - aio->a_result = NNG_ECANCELED; + aio->a_result = NNG_ESTOPPED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; + aio->a_stop = true; nni_mtx_unlock(&eq->eq_mtx); - return (NNG_ECANCELED); + return (NNG_ESTOPPED); } nni_task_prep(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); @@ -386,15 +386,17 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; + nni_mtx_unlock(&eq->eq_mtx); + return (NNG_ESTOPPED); + } if (aio->a_abort) { int rv = aio->a_result; nni_mtx_unlock(&eq->eq_mtx); + NNI_ASSERT(rv != 0); return (rv); } - if (aio->a_stop || eq->eq_stop) { - nni_mtx_unlock(&eq->eq_mtx); - return (NNG_ECLOSED); - } NNI_ASSERT(aio->a_cancel_fn == NULL); aio->a_cancel_fn = cancel; @@ -434,22 +436,25 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (timeout) { + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_result = NNG_ESTOPPED; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { aio->a_sleep = false; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } - if (aio->a_stop || eq->eq_stop) { + if (timeout) { aio->a_sleep = false; - aio->a_result = NNG_ECLOSED; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -499,22 +504,26 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_task_prep(&aio->a_task); nni_mtx_lock(&eq->eq_mtx); - if (timeout) { + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_result = NNG_ESTOPPED; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { aio->a_sleep = false; + aio->a_abort = false; + NNI_ASSERT(aio->a_result != 0); nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } - if (aio->a_stop || eq->eq_stop) { + if (timeout) { aio->a_sleep = false; - aio->a_result = NNG_ECLOSED; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -767,7 +776,8 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; if (q->eq_stop) { - rv = NNG_ECANCELED; + rv = NNG_ESTOPPED; + aio->a_stop = true; } else if (aio->a_expire_ok) { aio->a_expire_ok = false; rv = 0; diff --git a/src/core/aio_test.c b/src/core/aio_test.c index f18eb5ed..66682460 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -76,8 +76,8 @@ static void sleep_reap(void *arg) { nng_aio *aio = *(nng_aio **) arg; - if (nng_aio_result(aio) != NNG_ECANCELED) { - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + if (nng_aio_result(aio) != NNG_ESTOPPED) { + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); } nng_aio_reap(aio); } diff --git a/src/core/dialer.c b/src/core/dialer.c index d684fd47..5bf9915c 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -421,6 +421,7 @@ dialer_connect_cb(void *arg) break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. + case NNG_ESTOPPED: // No further action. nni_dialer_bump_error(d, rv); break; case NNG_ECONNREFUSED: diff --git a/src/core/errors_test.c b/src/core/errors_test.c index 72a107f4..b02bd36b 100644 --- a/src/core/errors_test.c +++ b/src/core/errors_test.c @@ -18,6 +18,7 @@ test_known_errors(void) NUTS_MATCH(nng_strerror(0), "Hunky dory"); NUTS_MATCH(nng_strerror(NNG_ECLOSED), "Object closed"); NUTS_MATCH(nng_strerror(NNG_ETIMEDOUT), "Timed out"); + NUTS_MATCH(nng_strerror(NNG_ESTOPPED), "Operation stopped"); } static void diff --git a/src/core/listener.c b/src/core/listener.c index c212ce91..0ba745c9 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -417,6 +417,7 @@ listener_accept_cb(void *arg) nni_listener_bump_error(l, rv); listener_accept_start(l); break; + case NNG_ESTOPPED: // no further action case NNG_ECLOSED: // no further action case NNG_ECANCELED: // no further action nni_listener_bump_error(l, rv); diff --git a/src/core/protocol.h b/src/core/protocol.h index 0d4d12dc..5231c8f4 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -46,7 +46,7 @@ struct nni_proto_pipe_ops { // pipe_close is an idempotent, non-blocking, operation, called // when the pipe is being closed. Any operations pending on the // pipe should be canceled with NNG_ECLOSED. (Best option is to - // use nng_aio_close() on them) + // use nni_aio_close() on them) void (*pipe_close)(void *); // pipe_stop is called during finalization, to ensure that @@ -1356,6 +1356,7 @@ static const struct { { NNG_EAMBIGUOUS, "Ambiguous option" }, { NNG_EBADTYPE, "Incorrect type" }, { NNG_ECONNSHUT, "Connection shutdown" }, + { NNG_ESTOPPED, "Operation stopped"}, { NNG_EINTERNAL, "Internal error detected" }, { 0, NULL }, // clang-format on diff --git a/src/sp/multistress_test.c b/src/sp/multistress_test.c index 1e336205..22e15ff3 100644 --- a/src/sp/multistress_test.c +++ b/src/sp/multistress_test.c @@ -86,7 +86,8 @@ fatal(const char *msg, int rv) void error(test_case *c, const char *msg, int rv) { - if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || + (rv == NNG_ESTOPPED)) { return; } fprintf( diff --git a/src/sp/protocol/bus0/bus_test.c b/src/sp/protocol/bus0/bus_test.c index a6b04df9..c9952d81 100644 --- a/src/sp/protocol/bus0/bus_test.c +++ b/src/sp/protocol/bus0/bus_test.c @@ -189,11 +189,32 @@ test_bus_aio_stopped(void) nng_recv_aio(s1, aio); nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); nng_aio_set_msg(aio, msg); nng_send_aio(s1, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + + nng_aio_free(aio); + nng_msg_free(msg); + NUTS_CLOSE(s1); +} + +static void +test_bus_aio_canceled(void) +{ + nng_socket s1; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_recv_aio(s1, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); nng_aio_free(aio); @@ -393,6 +414,7 @@ TEST_LIST = { { "bus recv cancel", test_bus_recv_cancel }, { "bus close recv abort", test_bus_close_recv_abort }, { "bus aio stopped", test_bus_aio_stopped }, + { "bus aio canceled", test_bus_aio_canceled }, { "bus recv buf option", test_bus_recv_buf_option }, { "bus send buf option", test_bus_send_buf_option }, { "bus cooked", test_bus_cooked }, diff --git a/src/sp/protocol/pair0/pair0_test.c b/src/sp/protocol/pair0/pair0_test.c index b01bdf91..d883f1e3 100644 --- a/src/sp/protocol/pair0/pair0_test.c +++ b/src/sp/protocol/pair0/pair0_test.c @@ -199,7 +199,7 @@ test_raw_exchange(void) } void -test_pair0_send_closed_aio(void) +test_pair0_send_stopped_aio(void) { nng_socket s1; nng_aio *aio; @@ -212,6 +212,26 @@ test_pair0_send_closed_aio(void) nng_aio_stop(aio); nng_send_aio(s1, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + nng_msg_free(msg); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair0_send_canceled_aio(void) +{ + nng_socket s1; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_pair0_open(&s1)); + nng_aio_set_msg(aio, msg); + nng_send_aio(s1, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); nng_msg_free(msg); nng_aio_free(aio); @@ -219,6 +239,37 @@ test_pair0_send_closed_aio(void) } void +test_pair0_recv_stopped_aio(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_pair0_open(&s1)); + nng_aio_stop(aio); + nng_recv_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair0_recv_canceled_aio(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_pair0_open(&s1)); + nng_recv_aio(s1, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} +void test_pair0_raw(void) { nng_socket s1; @@ -417,7 +468,10 @@ NUTS_TESTS = { { "pair0 back pressure", test_back_pressure }, { "pair0 send no peer", test_send_no_peer }, { "pair0 raw exchange", test_raw_exchange }, - { "pair0 send closed aio", test_pair0_send_closed_aio }, + { "pair0 send stopped aio", test_pair0_send_stopped_aio }, + { "pair0 send canceled aio", test_pair0_send_canceled_aio }, + { "pair0 recv stopped aio", test_pair0_recv_stopped_aio }, + { "pair0 recv canceled aio", test_pair0_recv_canceled_aio }, { "pair0 raw", test_pair0_raw }, { "pair0 validate peer", test_pair0_validate_peer }, { "pair0 no context", test_pair0_no_context }, diff --git a/src/sp/protocol/pair1/pair1_test.c b/src/sp/protocol/pair1/pair1_test.c index 8d2ad940..55fbb09f 100644 --- a/src/sp/protocol/pair1/pair1_test.c +++ b/src/sp/protocol/pair1/pair1_test.c @@ -265,7 +265,7 @@ test_mono_raw_header(void) } void -test_pair1_send_closed_aio(void) +test_pair1_send_stopped_aio(void) { nng_socket s1; nng_aio *aio; @@ -278,6 +278,26 @@ test_pair1_send_closed_aio(void) nng_aio_stop(aio); nng_send_aio(s1, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + nng_msg_free(msg); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair1_send_canceled_aio(void) +{ + nng_socket s1; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_pair1_open(&s1)); + nng_aio_set_msg(aio, msg); + nng_send_aio(s1, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); nng_msg_free(msg); nng_aio_free(aio); @@ -285,6 +305,38 @@ test_pair1_send_closed_aio(void) } void +test_pair1_recv_stopped_aio(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_pair1_open(&s1)); + nng_aio_stop(aio); + nng_recv_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair1_recv_canceled_aio(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_pair1_open(&s1)); + nng_recv_aio(s1, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void test_pair1_raw(void) { nng_socket s1; @@ -601,7 +653,10 @@ NUTS_TESTS = { { "pair1 send no peer", test_send_no_peer }, { "pair1 mono raw exchange", test_mono_raw_exchange }, { "pair1 mono raw header", test_mono_raw_header }, - { "pair1 send closed aio", test_pair1_send_closed_aio }, + { "pair1 send stopped aio", test_pair1_send_stopped_aio }, + { "pair1 send canceled aio", test_pair1_send_canceled_aio }, + { "pair1 recv stopped aio", test_pair1_recv_stopped_aio }, + { "pair1 recv canceled aio", test_pair1_recv_canceled_aio }, { "pair1 raw", test_pair1_raw }, { "pair1 ttl", test_pair1_ttl }, { "pair1 validate peer", test_pair1_validate_peer }, diff --git a/src/sp/protocol/pipeline0/pull_test.c b/src/sp/protocol/pipeline0/pull_test.c index 74fd6046..fb57b9a2 100644 --- a/src/sp/protocol/pipeline0/pull_test.c +++ b/src/sp/protocol/pipeline0/pull_test.c @@ -175,6 +175,23 @@ test_pull_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(s, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_CLOSE(s); + nng_aio_free(aio); +} + +static void +test_pull_recv_aio_canceled(void) +{ + nng_socket s; + nng_aio *aio; + + NUTS_PASS(nng_pull0_open(&s)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_recv_aio(s, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_CLOSE(s); nng_aio_free(aio); @@ -216,7 +233,7 @@ test_pull_recv_nonblock(void) } static void -test_pull_recv_cancel(void) +test_pull_recv_abort(void) { nng_socket s; nng_aio *aio; @@ -226,10 +243,10 @@ test_pull_recv_cancel(void) nng_aio_set_timeout(aio, 1000); nng_recv_aio(s, aio); - nng_aio_abort(aio, NNG_ECANCELED); + nng_aio_abort(aio, NNG_EAMBIGUOUS); nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_FAIL(nng_aio_result(aio), NNG_EAMBIGUOUS); NUTS_CLOSE(s); nng_aio_free(aio); } @@ -255,9 +272,10 @@ TEST_LIST = { { "pull close pending", test_pull_close_pending }, { "pull validate peer", test_pull_validate_peer }, { "pull recv aio stopped", test_pull_recv_aio_stopped }, + { "pull recv aio canceled", test_pull_recv_aio_canceled }, { "pull close recv", test_pull_close_recv }, { "pull recv nonblock", test_pull_recv_nonblock }, - { "pull recv cancel", test_pull_recv_cancel }, + { "pull recv abort", test_pull_recv_abort }, { "pull cooked", test_pull_cooked }, { NULL, NULL }, }; diff --git a/src/sp/protocol/pipeline0/push_test.c b/src/sp/protocol/pipeline0/push_test.c index 5eb98844..e0d314ee 100644 --- a/src/sp/protocol/pipeline0/push_test.c +++ b/src/sp/protocol/pipeline0/push_test.c @@ -252,7 +252,7 @@ test_push_send_aio_stopped(void) nng_aio_stop(aio); nng_send_aio(s, aio); nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); NUTS_CLOSE(s); nng_aio_free(aio); nng_msg_free(m); diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c index a2a20dd7..8b3a5d78 100644 --- a/src/sp/protocol/pubsub0/pub_test.c +++ b/src/sp/protocol/pubsub0/pub_test.c @@ -160,103 +160,6 @@ test_pub_send_queued(void) NUTS_CLOSE(pub); NUTS_CLOSE(sub); } -static void -test_sub_recv_ctx_closed(void) -{ - nng_socket sub; - nng_ctx ctx; - nng_aio *aio; - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nng_ctx_close(ctx); - nng_ctx_recv(ctx, aio); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); - nng_aio_free(aio); - NUTS_CLOSE(sub); -} - -static void -test_sub_ctx_recv_aio_stopped(void) -{ - nng_socket sub; - nng_ctx ctx; - nng_aio *aio; - - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - - nng_aio_stop(aio); - nng_ctx_recv(ctx, aio); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_PASS(nng_ctx_close(ctx)); - NUTS_CLOSE(sub); - nng_aio_free(aio); -} - -static void -test_sub_close_context_recv(void) -{ - nng_socket sub; - nng_ctx ctx; - nng_aio *aio; - - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nng_aio_set_timeout(aio, 1000); - nng_ctx_recv(ctx, aio); - NUTS_PASS(nng_ctx_close(ctx)); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); - - NUTS_CLOSE(sub); - nng_aio_free(aio); -} - -static void -test_sub_ctx_recv_nonblock(void) -{ - nng_socket sub; - nng_ctx ctx; - nng_aio *aio; - - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - - nng_aio_set_timeout(aio, 0); // Instant timeout - nng_ctx_recv(ctx, aio); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); - NUTS_CLOSE(sub); - nng_aio_free(aio); -} - -static void -test_sub_ctx_recv_cancel(void) -{ - nng_socket sub; - nng_ctx ctx; - nng_aio *aio; - - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - - nng_aio_set_timeout(aio, 1000); - nng_ctx_recv(ctx, aio); - nng_aio_abort(aio, NNG_ECANCELED); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_CLOSE(sub); - nng_aio_free(aio); -} static void test_pub_send_buf_option(void) @@ -308,11 +211,6 @@ NUTS_TESTS = { { "pub validate peer", test_pub_validate_peer }, { "pub send queued", test_pub_send_queued }, { "pub send no pipes", test_pub_send_no_pipes }, - { "sub recv ctx closed", test_sub_recv_ctx_closed }, - { "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped }, - { "sub close context recv", test_sub_close_context_recv }, - { "sub context recv nonblock", test_sub_ctx_recv_nonblock }, - { "sub context recv cancel", test_sub_ctx_recv_cancel }, { "pub send buf option", test_pub_send_buf_option }, { "pub cooked", test_pub_cooked }, { NULL, NULL }, diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index 74a547f4..6899d47d 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -223,6 +223,26 @@ test_sub_ctx_recv_aio_stopped(void) nng_aio_stop(aio); nng_ctx_recv(ctx, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_ctx_recv_aio_canceled(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio *aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + + nng_ctx_recv(ctx, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_PASS(nng_ctx_close(ctx)); NUTS_CLOSE(sub); @@ -270,7 +290,7 @@ test_sub_ctx_recv_nonblock(void) } static void -test_sub_ctx_recv_cancel(void) +test_sub_ctx_recv_abort(void) { nng_socket sub; nng_ctx ctx; @@ -282,10 +302,10 @@ test_sub_ctx_recv_cancel(void) nng_aio_set_timeout(aio, 1000); nng_ctx_recv(ctx, aio); - nng_aio_abort(aio, NNG_ECANCELED); + nng_aio_abort(aio, NNG_EAMBIGUOUS); nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_FAIL(nng_aio_result(aio), NNG_EAMBIGUOUS); NUTS_CLOSE(sub); nng_aio_free(aio); } @@ -593,9 +613,10 @@ TEST_LIST = { { "sub recv late", test_sub_recv_late }, { "sub recv ctx closed", test_sub_recv_ctx_closed }, { "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped }, + { "sub recv aio ctx canceled", test_sub_ctx_recv_aio_canceled }, { "sub close context recv", test_sub_close_context_recv }, { "sub context recv nonblock", test_sub_ctx_recv_nonblock }, - { "sub context recv cancel", test_sub_ctx_recv_cancel }, + { "sub context recv abort", test_sub_ctx_recv_abort }, { "sub recv buf option", test_sub_recv_buf_option }, { "sub subscribe option", test_sub_subscribe_option }, { "sub unsubscribe option", test_sub_unsubscribe_option }, diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c index eb918e14..fd26467d 100644 --- a/src/sp/protocol/pubsub0/xsub_test.c +++ b/src/sp/protocol/pubsub0/xsub_test.c @@ -337,6 +337,23 @@ test_xsub_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(sub, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_xsub_recv_aio_canceled(void) +{ + nng_socket sub; + nng_aio *aio; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_recv_aio(sub, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_CLOSE(sub); nng_aio_free(aio); @@ -358,6 +375,7 @@ TEST_LIST = { { "xsub no context", test_xsub_no_context }, { "xsub raw", test_xsub_raw }, { "xsub recv aio stopped", test_xsub_recv_aio_stopped }, + { "xsub recv aio canceled", test_xsub_recv_aio_canceled }, { "xsub close during recv ", test_xsub_close_during_recv }, { "xsub close during pipe recv", test_xsub_close_during_pipe_recv }, { NULL, NULL }, diff --git a/src/sp/protocol/reqrep0/rep_test.c b/src/sp/protocol/reqrep0/rep_test.c index 579f795c..2a07ecbc 100644 --- a/src/sp/protocol/reqrep0/rep_test.c +++ b/src/sp/protocol/reqrep0/rep_test.c @@ -369,6 +369,26 @@ test_rep_ctx_recv_aio_stopped(void) nng_aio_stop(aio); nng_ctx_recv(ctx, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(rep); + nng_aio_free(aio); +} + +void +test_rep_ctx_recv_aio_canceled(void) +{ + nng_socket rep; + nng_ctx ctx; + nng_aio *aio; + + NUTS_PASS(nng_rep0_open(&rep)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, rep)); + + nng_ctx_recv(ctx, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_PASS(nng_ctx_close(ctx)); NUTS_CLOSE(rep); @@ -755,6 +775,7 @@ NUTS_TESTS = { { "rep close pipe before send", test_rep_close_pipe_before_send }, { "rep close pipe during send", test_rep_close_pipe_during_send }, { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped }, + { "rep recv aio ctx canceled", test_rep_ctx_recv_aio_canceled }, { "rep close pipe context send", test_rep_close_pipe_context_send }, { "rep close context send", test_rep_close_context_send }, { "rep close recv", test_rep_close_recv }, diff --git a/src/sp/protocol/reqrep0/reqstress_test.c b/src/sp/protocol/reqrep0/reqstress_test.c index cd0004a0..fd964003 100644 --- a/src/sp/protocol/reqrep0/reqstress_test.c +++ b/src/sp/protocol/reqrep0/reqstress_test.c @@ -79,7 +79,8 @@ fatal(const char *msg, int rv) void error(test_case *c, const char *msg, int rv) { - if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || + (rv == NNG_ESTOPPED)) { return; } fprintf( diff --git a/src/sp/protocol/reqrep0/xrep_test.c b/src/sp/protocol/reqrep0/xrep_test.c index d5110469..068d64a7 100644 --- a/src/sp/protocol/reqrep0/xrep_test.c +++ b/src/sp/protocol/reqrep0/xrep_test.c @@ -262,6 +262,23 @@ test_xrep_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(rep, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_CLOSE(rep); + nng_aio_free(aio); +} + +static void +test_xrep_recv_aio_canceled(void) +{ + nng_socket rep; + nng_aio *aio; + + NUTS_PASS(nng_rep0_open_raw(&rep)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_recv_aio(rep, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_CLOSE(rep); nng_aio_free(aio); @@ -415,6 +432,7 @@ NUTS_TESTS = { { "xrep close pipe during send", test_xrep_close_pipe_during_send }, { "xrep close during recv", test_xrep_close_during_recv }, { "xrep recv aio stopped", test_xrep_recv_aio_stopped }, + { "xrep recv aio canceled", test_xrep_recv_aio_canceled }, { "xrep send no header", test_xrep_send_no_header }, { "xrep recv garbage", test_xrep_recv_garbage }, { "xrep ttl option", test_xrep_ttl_option }, diff --git a/src/sp/protocol/reqrep0/xreq_test.c b/src/sp/protocol/reqrep0/xreq_test.c index 28f381fe..1f06eb17 100644 --- a/src/sp/protocol/reqrep0/xreq_test.c +++ b/src/sp/protocol/reqrep0/xreq_test.c @@ -168,7 +168,28 @@ test_xreq_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(req, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_CLOSE(req); + nng_aio_free(aio); +} + +static void +test_xreq_send_aio_canceled(void) +{ + nng_socket req; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_msg_alloc(&msg, 64)); + NUTS_PASS(nng_req0_open_raw(&req)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_msg(aio, msg); + nng_send_aio(req, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + nng_msg_free(msg); NUTS_CLOSE(req); nng_aio_free(aio); } @@ -348,6 +369,7 @@ NUTS_TESTS = { { "xreq poll writable", test_xreq_poll_writeable }, { "xreq validate peer", test_xreq_validate_peer }, { "xreq recv aio stopped", test_xreq_recv_aio_stopped }, + { "xreq send aio canceled", test_xreq_send_aio_canceled }, { "xreq recv garbage", test_xreq_recv_garbage }, { "xreq recv header", test_xreq_recv_header }, { "xreq close during recv", test_xreq_close_during_recv }, diff --git a/src/sp/protocol/survey0/respond_test.c b/src/sp/protocol/survey0/respond_test.c index 0260dc30..ad8f7e60 100644 --- a/src/sp/protocol/survey0/respond_test.c +++ b/src/sp/protocol/survey0/respond_test.c @@ -257,6 +257,26 @@ test_resp_ctx_recv_aio_stopped(void) nng_aio_stop(aio); nng_ctx_recv(ctx, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +void +test_resp_ctx_recv_aio_canceled(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio *aio; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + + nng_ctx_recv(ctx, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_PASS(nng_ctx_close(ctx)); NUTS_CLOSE(resp); @@ -548,6 +568,7 @@ TEST_LIST = { { "respond close pipe before send", test_resp_close_pipe_before_send }, { "respond close pipe during send", test_resp_close_pipe_during_send }, { "respond recv aio ctx stopped", test_resp_ctx_recv_aio_stopped }, + { "respond recv aio ctx canceled", test_resp_ctx_recv_aio_canceled }, { "respond close pipe context send", test_resp_close_pipe_context_send }, { "respond close context send", test_resp_close_context_send }, diff --git a/src/sp/protocol/survey0/xrespond_test.c b/src/sp/protocol/survey0/xrespond_test.c index 8106f161..579438ec 100644 --- a/src/sp/protocol/survey0/xrespond_test.c +++ b/src/sp/protocol/survey0/xrespond_test.c @@ -262,6 +262,23 @@ test_xresp_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(resp, aio); nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_xresp_recv_aio_canceled(void) +{ + nng_socket resp; + nng_aio *aio; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_recv_aio(resp, aio); + nng_aio_cancel(aio); + nng_aio_wait(aio); NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); NUTS_CLOSE(resp); nng_aio_free(aio); @@ -418,6 +435,7 @@ NUTS_TESTS = { test_xresp_close_pipe_during_send }, { "xrespond close during recv", test_xresp_close_during_recv }, { "xrespond recv aio stopped", test_xresp_recv_aio_stopped }, + { "xrespond recv aio canceled", test_xresp_recv_aio_canceled }, { "xrespond send no header", test_xresp_send_no_header }, { "xrespond recv garbage", test_xresp_recv_garbage }, { "xrespond ttl option", test_xresp_ttl_option }, diff --git a/src/sp/protocol/survey0/xsurvey_test.c b/src/sp/protocol/survey0/xsurvey_test.c index e90939e9..b151f230 100644 --- a/src/sp/protocol/survey0/xsurvey_test.c +++ b/src/sp/protocol/survey0/xsurvey_test.c @@ -167,7 +167,7 @@ test_xsurvey_recv_aio_stopped(void) nng_aio_stop(aio); nng_recv_aio(surv, aio); nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED); NUTS_CLOSE(surv); nng_aio_free(aio); } diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 6c0df5c4..97ffdc04 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -869,6 +869,7 @@ udp_rx_cb(void *arg) switch (nni_aio_result(aio)) { case NNG_ECLOSED: case NNG_ECANCELED: + case NNG_ESTOPPED: nni_mtx_unlock(&ep->mtx); return; case NNG_ETIMEDOUT: @@ -1183,9 +1184,18 @@ udp_timer_cb(void *arg) nni_mtx_lock(&ep->mtx); rv = nni_aio_result(&ep->timeaio); - if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || ep->closed) { + switch (rv) { + case NNG_ECLOSED: + case NNG_ECANCELED: + case NNG_ESTOPPED: nni_mtx_unlock(&ep->mtx); return; + default: + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + return; + } + break; } uint32_t cursor = 0; |
