aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c56
-rw-r--r--src/core/aio_test.c4
-rw-r--r--src/core/dialer.c1
-rw-r--r--src/core/errors_test.c1
-rw-r--r--src/core/listener.c1
-rw-r--r--src/core/protocol.h2
-rw-r--r--src/nng.c1
-rw-r--r--src/sp/multistress_test.c3
-rw-r--r--src/sp/protocol/bus0/bus_test.c24
-rw-r--r--src/sp/protocol/pair0/pair0_test.c58
-rw-r--r--src/sp/protocol/pair1/pair1_test.c59
-rw-r--r--src/sp/protocol/pipeline0/pull_test.c26
-rw-r--r--src/sp/protocol/pipeline0/push_test.c2
-rw-r--r--src/sp/protocol/pubsub0/pub_test.c102
-rw-r--r--src/sp/protocol/pubsub0/sub_test.c29
-rw-r--r--src/sp/protocol/pubsub0/xsub_test.c18
-rw-r--r--src/sp/protocol/reqrep0/rep_test.c21
-rw-r--r--src/sp/protocol/reqrep0/reqstress_test.c3
-rw-r--r--src/sp/protocol/reqrep0/xrep_test.c18
-rw-r--r--src/sp/protocol/reqrep0/xreq_test.c22
-rw-r--r--src/sp/protocol/survey0/respond_test.c21
-rw-r--r--src/sp/protocol/survey0/xrespond_test.c18
-rw-r--r--src/sp/protocol/survey0/xsurvey_test.c2
-rw-r--r--src/sp/transport/udp/udp.c12
24 files changed, 358 insertions, 146 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 54ba7000..f039cdc8 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -60,11 +60,10 @@ static int nni_aio_expire_q_cnt;
// when we want to know if the operation itself is complete.
//
// In order to guard against aio reuse during teardown, we set the a_stop
-// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ECANCELED indicating this. The
-// provider that calls nni_aio_begin() MUST check the return value, and
-// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
-// request and return.
+// flag. Any attempt to submit new operation after that point will fail with
+// the status NNG_ESTOPPED indicating this. The provider that calls
+// nni_aio_begin() MUST check the return value, and if it comes back
+// NNG_ESTOPPED then it must simply discard the request and // return.
//
// Calling nni_aio_wait waits for the current outstanding operation to
// complete, but does not block another one from being started on the
@@ -118,7 +117,7 @@ nni_aio_fini(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
+ fn(aio, arg, NNG_ESTOPPED);
}
nni_task_fini(&aio->a_task);
@@ -201,7 +200,7 @@ nni_aio_stop(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECANCELED);
+ fn(aio, arg, NNG_ESTOPPED);
}
nni_aio_wait(aio);
@@ -226,7 +225,7 @@ nni_aio_close(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
+ fn(aio, arg, NNG_ESTOPPED);
}
}
}
@@ -353,12 +352,13 @@ nni_aio_begin(nni_aio *aio)
// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
- aio->a_result = NNG_ECANCELED;
+ aio->a_result = NNG_ESTOPPED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
+ aio->a_stop = true;
nni_mtx_unlock(&eq->eq_mtx);
- return (NNG_ECANCELED);
+ return (NNG_ESTOPPED);
}
nni_task_prep(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
@@ -386,15 +386,17 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (NNG_ESTOPPED);
+ }
if (aio->a_abort) {
int rv = aio->a_result;
nni_mtx_unlock(&eq->eq_mtx);
+ NNI_ASSERT(rv != 0);
return (rv);
}
- if (aio->a_stop || eq->eq_stop) {
- nni_mtx_unlock(&eq->eq_mtx);
- return (NNG_ECLOSED);
- }
NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
@@ -434,22 +436,25 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
- if (timeout) {
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_result = NNG_ESTOPPED;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
aio->a_sleep = false;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
- if (aio->a_stop || eq->eq_stop) {
+ if (timeout) {
aio->a_sleep = false;
- aio->a_result = NNG_ECLOSED;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -499,22 +504,26 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);
nni_mtx_lock(&eq->eq_mtx);
- if (timeout) {
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_result = NNG_ESTOPPED;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
aio->a_sleep = false;
+ aio->a_abort = false;
+ NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
- if (aio->a_stop || eq->eq_stop) {
+ if (timeout) {
aio->a_sleep = false;
- aio->a_result = NNG_ECLOSED;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -767,7 +776,8 @@ nni_aio_expire_loop(void *arg)
for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
if (q->eq_stop) {
- rv = NNG_ECANCELED;
+ rv = NNG_ESTOPPED;
+ aio->a_stop = true;
} else if (aio->a_expire_ok) {
aio->a_expire_ok = false;
rv = 0;
diff --git a/src/core/aio_test.c b/src/core/aio_test.c
index f18eb5ed..66682460 100644
--- a/src/core/aio_test.c
+++ b/src/core/aio_test.c
@@ -76,8 +76,8 @@ static void
sleep_reap(void *arg)
{
nng_aio *aio = *(nng_aio **) arg;
- if (nng_aio_result(aio) != NNG_ECANCELED) {
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ if (nng_aio_result(aio) != NNG_ESTOPPED) {
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
}
nng_aio_reap(aio);
}
diff --git a/src/core/dialer.c b/src/core/dialer.c
index d684fd47..5bf9915c 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -421,6 +421,7 @@ dialer_connect_cb(void *arg)
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
+ case NNG_ESTOPPED: // No further action.
nni_dialer_bump_error(d, rv);
break;
case NNG_ECONNREFUSED:
diff --git a/src/core/errors_test.c b/src/core/errors_test.c
index 72a107f4..b02bd36b 100644
--- a/src/core/errors_test.c
+++ b/src/core/errors_test.c
@@ -18,6 +18,7 @@ test_known_errors(void)
NUTS_MATCH(nng_strerror(0), "Hunky dory");
NUTS_MATCH(nng_strerror(NNG_ECLOSED), "Object closed");
NUTS_MATCH(nng_strerror(NNG_ETIMEDOUT), "Timed out");
+ NUTS_MATCH(nng_strerror(NNG_ESTOPPED), "Operation stopped");
}
static void
diff --git a/src/core/listener.c b/src/core/listener.c
index c212ce91..0ba745c9 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -417,6 +417,7 @@ listener_accept_cb(void *arg)
nni_listener_bump_error(l, rv);
listener_accept_start(l);
break;
+ case NNG_ESTOPPED: // no further action
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
nni_listener_bump_error(l, rv);
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 0d4d12dc..5231c8f4 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -46,7 +46,7 @@ struct nni_proto_pipe_ops {
// pipe_close is an idempotent, non-blocking, operation, called
// when the pipe is being closed. Any operations pending on the
// pipe should be canceled with NNG_ECLOSED. (Best option is to
- // use nng_aio_close() on them)
+ // use nni_aio_close() on them)
void (*pipe_close)(void *);
// pipe_stop is called during finalization, to ensure that
diff --git a/src/nng.c b/src/nng.c
index cdcdecd9..5f3c6f12 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1356,6 +1356,7 @@ static const struct {
{ NNG_EAMBIGUOUS, "Ambiguous option" },
{ NNG_EBADTYPE, "Incorrect type" },
{ NNG_ECONNSHUT, "Connection shutdown" },
+ { NNG_ESTOPPED, "Operation stopped"},
{ NNG_EINTERNAL, "Internal error detected" },
{ 0, NULL },
// clang-format on
diff --git a/src/sp/multistress_test.c b/src/sp/multistress_test.c
index 1e336205..22e15ff3 100644
--- a/src/sp/multistress_test.c
+++ b/src/sp/multistress_test.c
@@ -86,7 +86,8 @@ fatal(const char *msg, int rv)
void
error(test_case *c, const char *msg, int rv)
{
- if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) ||
+ (rv == NNG_ESTOPPED)) {
return;
}
fprintf(
diff --git a/src/sp/protocol/bus0/bus_test.c b/src/sp/protocol/bus0/bus_test.c
index a6b04df9..c9952d81 100644
--- a/src/sp/protocol/bus0/bus_test.c
+++ b/src/sp/protocol/bus0/bus_test.c
@@ -189,11 +189,32 @@ test_bus_aio_stopped(void)
nng_recv_aio(s1, aio);
nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
nng_aio_set_msg(aio, msg);
nng_send_aio(s1, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+
+ nng_aio_free(aio);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s1);
+}
+
+static void
+test_bus_aio_canceled(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+ nng_msg *msg;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_recv_aio(s1, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
nng_aio_free(aio);
@@ -393,6 +414,7 @@ TEST_LIST = {
{ "bus recv cancel", test_bus_recv_cancel },
{ "bus close recv abort", test_bus_close_recv_abort },
{ "bus aio stopped", test_bus_aio_stopped },
+ { "bus aio canceled", test_bus_aio_canceled },
{ "bus recv buf option", test_bus_recv_buf_option },
{ "bus send buf option", test_bus_send_buf_option },
{ "bus cooked", test_bus_cooked },
diff --git a/src/sp/protocol/pair0/pair0_test.c b/src/sp/protocol/pair0/pair0_test.c
index b01bdf91..d883f1e3 100644
--- a/src/sp/protocol/pair0/pair0_test.c
+++ b/src/sp/protocol/pair0/pair0_test.c
@@ -199,7 +199,7 @@ test_raw_exchange(void)
}
void
-test_pair0_send_closed_aio(void)
+test_pair0_send_stopped_aio(void)
{
nng_socket s1;
nng_aio *aio;
@@ -212,6 +212,26 @@ test_pair0_send_closed_aio(void)
nng_aio_stop(aio);
nng_send_aio(s1, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ nng_msg_free(msg);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair0_send_canceled_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+ nng_msg *msg;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_pair0_open(&s1));
+ nng_aio_set_msg(aio, msg);
+ nng_send_aio(s1, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
nng_msg_free(msg);
nng_aio_free(aio);
@@ -219,6 +239,37 @@ test_pair0_send_closed_aio(void)
}
void
+test_pair0_recv_stopped_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_pair0_open(&s1));
+ nng_aio_stop(aio);
+ nng_recv_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair0_recv_canceled_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_pair0_open(&s1));
+ nng_recv_aio(s1, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+void
test_pair0_raw(void)
{
nng_socket s1;
@@ -417,7 +468,10 @@ NUTS_TESTS = {
{ "pair0 back pressure", test_back_pressure },
{ "pair0 send no peer", test_send_no_peer },
{ "pair0 raw exchange", test_raw_exchange },
- { "pair0 send closed aio", test_pair0_send_closed_aio },
+ { "pair0 send stopped aio", test_pair0_send_stopped_aio },
+ { "pair0 send canceled aio", test_pair0_send_canceled_aio },
+ { "pair0 recv stopped aio", test_pair0_recv_stopped_aio },
+ { "pair0 recv canceled aio", test_pair0_recv_canceled_aio },
{ "pair0 raw", test_pair0_raw },
{ "pair0 validate peer", test_pair0_validate_peer },
{ "pair0 no context", test_pair0_no_context },
diff --git a/src/sp/protocol/pair1/pair1_test.c b/src/sp/protocol/pair1/pair1_test.c
index 8d2ad940..55fbb09f 100644
--- a/src/sp/protocol/pair1/pair1_test.c
+++ b/src/sp/protocol/pair1/pair1_test.c
@@ -265,7 +265,7 @@ test_mono_raw_header(void)
}
void
-test_pair1_send_closed_aio(void)
+test_pair1_send_stopped_aio(void)
{
nng_socket s1;
nng_aio *aio;
@@ -278,6 +278,26 @@ test_pair1_send_closed_aio(void)
nng_aio_stop(aio);
nng_send_aio(s1, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ nng_msg_free(msg);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair1_send_canceled_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+ nng_msg *msg;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_pair1_open(&s1));
+ nng_aio_set_msg(aio, msg);
+ nng_send_aio(s1, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
nng_msg_free(msg);
nng_aio_free(aio);
@@ -285,6 +305,38 @@ test_pair1_send_closed_aio(void)
}
void
+test_pair1_recv_stopped_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_pair1_open(&s1));
+ nng_aio_stop(aio);
+ nng_recv_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair1_recv_canceled_aio(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_pair1_open(&s1));
+ nng_recv_aio(s1, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
test_pair1_raw(void)
{
nng_socket s1;
@@ -601,7 +653,10 @@ NUTS_TESTS = {
{ "pair1 send no peer", test_send_no_peer },
{ "pair1 mono raw exchange", test_mono_raw_exchange },
{ "pair1 mono raw header", test_mono_raw_header },
- { "pair1 send closed aio", test_pair1_send_closed_aio },
+ { "pair1 send stopped aio", test_pair1_send_stopped_aio },
+ { "pair1 send canceled aio", test_pair1_send_canceled_aio },
+ { "pair1 recv stopped aio", test_pair1_recv_stopped_aio },
+ { "pair1 recv canceled aio", test_pair1_recv_canceled_aio },
{ "pair1 raw", test_pair1_raw },
{ "pair1 ttl", test_pair1_ttl },
{ "pair1 validate peer", test_pair1_validate_peer },
diff --git a/src/sp/protocol/pipeline0/pull_test.c b/src/sp/protocol/pipeline0/pull_test.c
index 74fd6046..fb57b9a2 100644
--- a/src/sp/protocol/pipeline0/pull_test.c
+++ b/src/sp/protocol/pipeline0/pull_test.c
@@ -175,6 +175,23 @@ test_pull_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(s, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_CLOSE(s);
+ nng_aio_free(aio);
+}
+
+static void
+test_pull_recv_aio_canceled(void)
+{
+ nng_socket s;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_pull0_open(&s));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_recv_aio(s, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_CLOSE(s);
nng_aio_free(aio);
@@ -216,7 +233,7 @@ test_pull_recv_nonblock(void)
}
static void
-test_pull_recv_cancel(void)
+test_pull_recv_abort(void)
{
nng_socket s;
nng_aio *aio;
@@ -226,10 +243,10 @@ test_pull_recv_cancel(void)
nng_aio_set_timeout(aio, 1000);
nng_recv_aio(s, aio);
- nng_aio_abort(aio, NNG_ECANCELED);
+ nng_aio_abort(aio, NNG_EAMBIGUOUS);
nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio), NNG_EAMBIGUOUS);
NUTS_CLOSE(s);
nng_aio_free(aio);
}
@@ -255,9 +272,10 @@ TEST_LIST = {
{ "pull close pending", test_pull_close_pending },
{ "pull validate peer", test_pull_validate_peer },
{ "pull recv aio stopped", test_pull_recv_aio_stopped },
+ { "pull recv aio canceled", test_pull_recv_aio_canceled },
{ "pull close recv", test_pull_close_recv },
{ "pull recv nonblock", test_pull_recv_nonblock },
- { "pull recv cancel", test_pull_recv_cancel },
+ { "pull recv abort", test_pull_recv_abort },
{ "pull cooked", test_pull_cooked },
{ NULL, NULL },
};
diff --git a/src/sp/protocol/pipeline0/push_test.c b/src/sp/protocol/pipeline0/push_test.c
index 5eb98844..e0d314ee 100644
--- a/src/sp/protocol/pipeline0/push_test.c
+++ b/src/sp/protocol/pipeline0/push_test.c
@@ -252,7 +252,7 @@ test_push_send_aio_stopped(void)
nng_aio_stop(aio);
nng_send_aio(s, aio);
nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
NUTS_CLOSE(s);
nng_aio_free(aio);
nng_msg_free(m);
diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c
index a2a20dd7..8b3a5d78 100644
--- a/src/sp/protocol/pubsub0/pub_test.c
+++ b/src/sp/protocol/pubsub0/pub_test.c
@@ -160,103 +160,6 @@ test_pub_send_queued(void)
NUTS_CLOSE(pub);
NUTS_CLOSE(sub);
}
-static void
-test_sub_recv_ctx_closed(void)
-{
- nng_socket sub;
- nng_ctx ctx;
- nng_aio *aio;
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
- NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
- nng_ctx_close(ctx);
- nng_ctx_recv(ctx, aio);
- nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED);
- nng_aio_free(aio);
- NUTS_CLOSE(sub);
-}
-
-static void
-test_sub_ctx_recv_aio_stopped(void)
-{
- nng_socket sub;
- nng_ctx ctx;
- nng_aio *aio;
-
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
-
- nng_aio_stop(aio);
- nng_ctx_recv(ctx, aio);
- nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
- NUTS_PASS(nng_ctx_close(ctx));
- NUTS_CLOSE(sub);
- nng_aio_free(aio);
-}
-
-static void
-test_sub_close_context_recv(void)
-{
- nng_socket sub;
- nng_ctx ctx;
- nng_aio *aio;
-
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
- NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
- nng_aio_set_timeout(aio, 1000);
- nng_ctx_recv(ctx, aio);
- NUTS_PASS(nng_ctx_close(ctx));
- nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED);
-
- NUTS_CLOSE(sub);
- nng_aio_free(aio);
-}
-
-static void
-test_sub_ctx_recv_nonblock(void)
-{
- nng_socket sub;
- nng_ctx ctx;
- nng_aio *aio;
-
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
- NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
-
- nng_aio_set_timeout(aio, 0); // Instant timeout
- nng_ctx_recv(ctx, aio);
-
- nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT);
- NUTS_CLOSE(sub);
- nng_aio_free(aio);
-}
-
-static void
-test_sub_ctx_recv_cancel(void)
-{
- nng_socket sub;
- nng_ctx ctx;
- nng_aio *aio;
-
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
- NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
-
- nng_aio_set_timeout(aio, 1000);
- nng_ctx_recv(ctx, aio);
- nng_aio_abort(aio, NNG_ECANCELED);
-
- nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
- NUTS_CLOSE(sub);
- nng_aio_free(aio);
-}
static void
test_pub_send_buf_option(void)
@@ -308,11 +211,6 @@ NUTS_TESTS = {
{ "pub validate peer", test_pub_validate_peer },
{ "pub send queued", test_pub_send_queued },
{ "pub send no pipes", test_pub_send_no_pipes },
- { "sub recv ctx closed", test_sub_recv_ctx_closed },
- { "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped },
- { "sub close context recv", test_sub_close_context_recv },
- { "sub context recv nonblock", test_sub_ctx_recv_nonblock },
- { "sub context recv cancel", test_sub_ctx_recv_cancel },
{ "pub send buf option", test_pub_send_buf_option },
{ "pub cooked", test_pub_cooked },
{ NULL, NULL },
diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c
index 74a547f4..6899d47d 100644
--- a/src/sp/protocol/pubsub0/sub_test.c
+++ b/src/sp/protocol/pubsub0/sub_test.c
@@ -223,6 +223,26 @@ test_sub_ctx_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_ctx_recv(ctx, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(sub);
+ nng_aio_free(aio);
+}
+
+static void
+test_sub_ctx_recv_aio_canceled(void)
+{
+ nng_socket sub;
+ nng_ctx ctx;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_sub0_open(&sub));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_ctx_open(&ctx, sub));
+
+ nng_ctx_recv(ctx, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_PASS(nng_ctx_close(ctx));
NUTS_CLOSE(sub);
@@ -270,7 +290,7 @@ test_sub_ctx_recv_nonblock(void)
}
static void
-test_sub_ctx_recv_cancel(void)
+test_sub_ctx_recv_abort(void)
{
nng_socket sub;
nng_ctx ctx;
@@ -282,10 +302,10 @@ test_sub_ctx_recv_cancel(void)
nng_aio_set_timeout(aio, 1000);
nng_ctx_recv(ctx, aio);
- nng_aio_abort(aio, NNG_ECANCELED);
+ nng_aio_abort(aio, NNG_EAMBIGUOUS);
nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio), NNG_EAMBIGUOUS);
NUTS_CLOSE(sub);
nng_aio_free(aio);
}
@@ -593,9 +613,10 @@ TEST_LIST = {
{ "sub recv late", test_sub_recv_late },
{ "sub recv ctx closed", test_sub_recv_ctx_closed },
{ "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped },
+ { "sub recv aio ctx canceled", test_sub_ctx_recv_aio_canceled },
{ "sub close context recv", test_sub_close_context_recv },
{ "sub context recv nonblock", test_sub_ctx_recv_nonblock },
- { "sub context recv cancel", test_sub_ctx_recv_cancel },
+ { "sub context recv abort", test_sub_ctx_recv_abort },
{ "sub recv buf option", test_sub_recv_buf_option },
{ "sub subscribe option", test_sub_subscribe_option },
{ "sub unsubscribe option", test_sub_unsubscribe_option },
diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c
index eb918e14..fd26467d 100644
--- a/src/sp/protocol/pubsub0/xsub_test.c
+++ b/src/sp/protocol/pubsub0/xsub_test.c
@@ -337,6 +337,23 @@ test_xsub_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(sub, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_CLOSE(sub);
+ nng_aio_free(aio);
+}
+
+static void
+test_xsub_recv_aio_canceled(void)
+{
+ nng_socket sub;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_sub0_open_raw(&sub));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_recv_aio(sub, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_CLOSE(sub);
nng_aio_free(aio);
@@ -358,6 +375,7 @@ TEST_LIST = {
{ "xsub no context", test_xsub_no_context },
{ "xsub raw", test_xsub_raw },
{ "xsub recv aio stopped", test_xsub_recv_aio_stopped },
+ { "xsub recv aio canceled", test_xsub_recv_aio_canceled },
{ "xsub close during recv ", test_xsub_close_during_recv },
{ "xsub close during pipe recv", test_xsub_close_during_pipe_recv },
{ NULL, NULL },
diff --git a/src/sp/protocol/reqrep0/rep_test.c b/src/sp/protocol/reqrep0/rep_test.c
index 579f795c..2a07ecbc 100644
--- a/src/sp/protocol/reqrep0/rep_test.c
+++ b/src/sp/protocol/reqrep0/rep_test.c
@@ -369,6 +369,26 @@ test_rep_ctx_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_ctx_recv(ctx, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+void
+test_rep_ctx_recv_aio_canceled(void)
+{
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_ctx_open(&ctx, rep));
+
+ nng_ctx_recv(ctx, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_PASS(nng_ctx_close(ctx));
NUTS_CLOSE(rep);
@@ -755,6 +775,7 @@ NUTS_TESTS = {
{ "rep close pipe before send", test_rep_close_pipe_before_send },
{ "rep close pipe during send", test_rep_close_pipe_during_send },
{ "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped },
+ { "rep recv aio ctx canceled", test_rep_ctx_recv_aio_canceled },
{ "rep close pipe context send", test_rep_close_pipe_context_send },
{ "rep close context send", test_rep_close_context_send },
{ "rep close recv", test_rep_close_recv },
diff --git a/src/sp/protocol/reqrep0/reqstress_test.c b/src/sp/protocol/reqrep0/reqstress_test.c
index cd0004a0..fd964003 100644
--- a/src/sp/protocol/reqrep0/reqstress_test.c
+++ b/src/sp/protocol/reqrep0/reqstress_test.c
@@ -79,7 +79,8 @@ fatal(const char *msg, int rv)
void
error(test_case *c, const char *msg, int rv)
{
- if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) ||
+ (rv == NNG_ESTOPPED)) {
return;
}
fprintf(
diff --git a/src/sp/protocol/reqrep0/xrep_test.c b/src/sp/protocol/reqrep0/xrep_test.c
index d5110469..068d64a7 100644
--- a/src/sp/protocol/reqrep0/xrep_test.c
+++ b/src/sp/protocol/reqrep0/xrep_test.c
@@ -262,6 +262,23 @@ test_xrep_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(rep, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+static void
+test_xrep_recv_aio_canceled(void)
+{
+ nng_socket rep;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_recv_aio(rep, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_CLOSE(rep);
nng_aio_free(aio);
@@ -415,6 +432,7 @@ NUTS_TESTS = {
{ "xrep close pipe during send", test_xrep_close_pipe_during_send },
{ "xrep close during recv", test_xrep_close_during_recv },
{ "xrep recv aio stopped", test_xrep_recv_aio_stopped },
+ { "xrep recv aio canceled", test_xrep_recv_aio_canceled },
{ "xrep send no header", test_xrep_send_no_header },
{ "xrep recv garbage", test_xrep_recv_garbage },
{ "xrep ttl option", test_xrep_ttl_option },
diff --git a/src/sp/protocol/reqrep0/xreq_test.c b/src/sp/protocol/reqrep0/xreq_test.c
index 28f381fe..1f06eb17 100644
--- a/src/sp/protocol/reqrep0/xreq_test.c
+++ b/src/sp/protocol/reqrep0/xreq_test.c
@@ -168,7 +168,28 @@ test_xreq_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(req, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_CLOSE(req);
+ nng_aio_free(aio);
+}
+
+static void
+test_xreq_send_aio_canceled(void)
+{
+ nng_socket req;
+ nng_aio *aio;
+ nng_msg *msg;
+
+ NUTS_PASS(nng_msg_alloc(&msg, 64));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_set_msg(aio, msg);
+ nng_send_aio(req, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_msg_free(msg);
NUTS_CLOSE(req);
nng_aio_free(aio);
}
@@ -348,6 +369,7 @@ NUTS_TESTS = {
{ "xreq poll writable", test_xreq_poll_writeable },
{ "xreq validate peer", test_xreq_validate_peer },
{ "xreq recv aio stopped", test_xreq_recv_aio_stopped },
+ { "xreq send aio canceled", test_xreq_send_aio_canceled },
{ "xreq recv garbage", test_xreq_recv_garbage },
{ "xreq recv header", test_xreq_recv_header },
{ "xreq close during recv", test_xreq_close_during_recv },
diff --git a/src/sp/protocol/survey0/respond_test.c b/src/sp/protocol/survey0/respond_test.c
index 0260dc30..ad8f7e60 100644
--- a/src/sp/protocol/survey0/respond_test.c
+++ b/src/sp/protocol/survey0/respond_test.c
@@ -257,6 +257,26 @@ test_resp_ctx_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_ctx_recv(ctx, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(resp);
+ nng_aio_free(aio);
+}
+
+void
+test_resp_ctx_recv_aio_canceled(void)
+{
+ nng_socket resp;
+ nng_ctx ctx;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_respondent0_open(&resp));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_ctx_open(&ctx, resp));
+
+ nng_ctx_recv(ctx, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_PASS(nng_ctx_close(ctx));
NUTS_CLOSE(resp);
@@ -548,6 +568,7 @@ TEST_LIST = {
{ "respond close pipe before send", test_resp_close_pipe_before_send },
{ "respond close pipe during send", test_resp_close_pipe_during_send },
{ "respond recv aio ctx stopped", test_resp_ctx_recv_aio_stopped },
+ { "respond recv aio ctx canceled", test_resp_ctx_recv_aio_canceled },
{ "respond close pipe context send",
test_resp_close_pipe_context_send },
{ "respond close context send", test_resp_close_context_send },
diff --git a/src/sp/protocol/survey0/xrespond_test.c b/src/sp/protocol/survey0/xrespond_test.c
index 8106f161..579438ec 100644
--- a/src/sp/protocol/survey0/xrespond_test.c
+++ b/src/sp/protocol/survey0/xrespond_test.c
@@ -262,6 +262,23 @@ test_xresp_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(resp, aio);
nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
+ NUTS_CLOSE(resp);
+ nng_aio_free(aio);
+}
+
+static void
+test_xresp_recv_aio_canceled(void)
+{
+ nng_socket resp;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_respondent0_open_raw(&resp));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_recv_aio(resp, aio);
+ nng_aio_cancel(aio);
+ nng_aio_wait(aio);
NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
NUTS_CLOSE(resp);
nng_aio_free(aio);
@@ -418,6 +435,7 @@ NUTS_TESTS = {
test_xresp_close_pipe_during_send },
{ "xrespond close during recv", test_xresp_close_during_recv },
{ "xrespond recv aio stopped", test_xresp_recv_aio_stopped },
+ { "xrespond recv aio canceled", test_xresp_recv_aio_canceled },
{ "xrespond send no header", test_xresp_send_no_header },
{ "xrespond recv garbage", test_xresp_recv_garbage },
{ "xrespond ttl option", test_xresp_ttl_option },
diff --git a/src/sp/protocol/survey0/xsurvey_test.c b/src/sp/protocol/survey0/xsurvey_test.c
index e90939e9..b151f230 100644
--- a/src/sp/protocol/survey0/xsurvey_test.c
+++ b/src/sp/protocol/survey0/xsurvey_test.c
@@ -167,7 +167,7 @@ test_xsurvey_recv_aio_stopped(void)
nng_aio_stop(aio);
nng_recv_aio(surv, aio);
nng_aio_wait(aio);
- NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ESTOPPED);
NUTS_CLOSE(surv);
nng_aio_free(aio);
}
diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c
index 6c0df5c4..97ffdc04 100644
--- a/src/sp/transport/udp/udp.c
+++ b/src/sp/transport/udp/udp.c
@@ -869,6 +869,7 @@ udp_rx_cb(void *arg)
switch (nni_aio_result(aio)) {
case NNG_ECLOSED:
case NNG_ECANCELED:
+ case NNG_ESTOPPED:
nni_mtx_unlock(&ep->mtx);
return;
case NNG_ETIMEDOUT:
@@ -1183,9 +1184,18 @@ udp_timer_cb(void *arg)
nni_mtx_lock(&ep->mtx);
rv = nni_aio_result(&ep->timeaio);
- if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || ep->closed) {
+ switch (rv) {
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ case NNG_ESTOPPED:
nni_mtx_unlock(&ep->mtx);
return;
+ default:
+ if (ep->closed) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ break;
}
uint32_t cursor = 0;