aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/nng/nng.h5
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h9
-rw-r--r--src/core/device.c3
-rw-r--r--src/core/sockfd.c5
-rw-r--r--src/core/tcp.c4
-rw-r--r--src/nng.c4
-rw-r--r--src/platform/posix/posix_ipcconn.c4
-rw-r--r--src/sp/transport/inproc/inproc.c16
-rw-r--r--src/sp/transport/ws/websocket.c16
-rw-r--r--src/supplemental/http/http_conn.c10
11 files changed, 93 insertions, 53 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h
index d087805d..4d21287c 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -640,8 +640,11 @@ NNG_DECL void nng_aio_finish(nng_aio *, int);
// final argument is passed to the cancelfn. The final argument of the
// cancellation function is the error number (will not be zero) corresponding
// to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED.
+// This returns false if the operation cannot be deferred (because the AIO
+// has been stopped with nng_aio_stop.) If it does so, then the aio's
+// completion callback will fire with a result of NNG_ECLOSED.
typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int);
-NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
+NNG_DECL bool nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
// nng_aio_sleep does a "sleeping" operation, basically does nothing
// but wait for the specified number of milliseconds to expire, then
diff --git a/src/core/aio.c b/src/core/aio.c
index eaff5c80..0ac70a0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "core/taskq.h"
#include <string.h>
@@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
- aio->a_expire_ok = false;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECANCELED);
@@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
return (0);
}
+bool
+nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
+{
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ bool timeout = false;
+
+ if (!aio->a_sleep && !aio->a_use_expire) {
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ timeout = true;
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
+ break;
+ }
+ } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
+ timeout = true;
+ }
+
+ nni_mtx_lock(&eq->eq_mtx);
+ if (timeout) {
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_abort) {
+ aio->a_sleep = false;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_sleep = false;
+ aio->a_result = NNG_ECLOSED;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancel;
+ aio->a_cancel_arg = data;
+
+ // We only schedule expiration if we have a way for the expiration
+ // handler to actively cancel it.
+ if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
+ nni_aio_expire_add(aio);
+ }
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (true);
+}
+
// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
@@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
aio->a_expire =
ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms;
- if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
- nni_aio_finish_error(aio, rv);
- }
+ // we don't do anything else here, so we can ignore the return
+ (void) nni_aio_defer(aio, nni_sleep_cancel, NULL);
}
static bool
diff --git a/src/core/aio.h b/src/core/aio.h
index 9491a2fa..8628d8ef 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// is returned. (In that case the caller should probably either return an
// error to its caller, or possibly cause an asynchronous error by calling
// nni_aio_finish_error on this aio.)
+//
+// NB: This function should be called while holding the lock that will be used
+// to cancel the operation. Otherwise a race can occur where the operation
+// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
+// nni_aio_defer is just like nni_io_schedule, but it also calls the callback
+// automatically if the operation cannot be started because the AIO is stopped
+// or was canceled before this call (but after nni_aio_begin).
+extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
// nni_aio_completion_list is used after removing the aio from an
diff --git a/src/core/device.c b/src/core/device.c
index 815fafcc..7084d3e4 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, device_cancel, d)) {
nni_mtx_unlock(&device_mtx);
- nni_aio_finish_error(aio, rv);
nni_reap(&device_reap, d);
}
device_start(d, aio);
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
index 787a0783..1a0e792f 100644
--- a/src/core/sockfd.c
+++ b/src/core/sockfd.c
@@ -115,7 +115,6 @@ static void
sfd_listener_accept(void *arg, nng_aio *aio)
{
sfd_listener *l = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio)
if (l->listen_cnt) {
sfd_start_conn(l, aio);
- } else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) {
- nni_aio_finish_error(aio, rv);
- } else {
+ } else if (nni_aio_defer(aio, sfd_cancel_accept, l)) {
nni_aio_list_append(&l->accept_q, aio);
}
nni_mtx_unlock(&l->mtx);
diff --git a/src/core/tcp.c b/src/core/tcp.c
index d2e08493..75b938b0 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -177,7 +177,6 @@ static void
tcp_dialer_dial(void *arg, nng_aio *aio)
{
tcp_dialer *d = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&d->conaios, aio);
diff --git a/src/nng.c b/src/nng.c
index 2276f2e1..cdcdecd9 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -2062,10 +2062,10 @@ nng_aio_finish(nng_aio *aio, int rv)
nni_aio_finish(aio, rv, nni_aio_count(aio));
}
-void
+bool
nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg)
{
- nni_aio_schedule(aio, fn, arg);
+ return (nni_aio_defer(aio, fn, arg));
}
bool
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 224828ff..ea4f5828 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -246,16 +246,14 @@ static void
ipc_send(void *arg, nni_aio *aio)
{
ipc_conn *c = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
+ if (!nni_aio_defer(aio, ipc_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&c->writeq, aio);
diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c
index 90317499..fcf75566 100644
--- a/src/sp/transport/inproc/inproc.c
+++ b/src/sp/transport/inproc/inproc.c
@@ -193,7 +193,6 @@ inproc_pipe_send(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->send_queue;
- int rv;
if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
@@ -204,9 +203,8 @@ inproc_pipe_send(void *arg, nni_aio *aio)
}
nni_mtx_lock(&queue->lock);
- if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->writers, aio);
@@ -219,16 +217,14 @@ inproc_pipe_recv(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->recv_queue;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&queue->lock);
- if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->readers, aio);
@@ -463,7 +459,6 @@ inproc_ep_connect(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
inproc_ep *server;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -486,9 +481,8 @@ inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
- if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
+ if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
- nni_aio_finish_error(aio, rv);
return;
}
@@ -523,7 +517,6 @@ static void
inproc_ep_accept(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -533,9 +526,8 @@ inproc_ep_accept(void *arg, nni_aio *aio)
// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
- if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
+ if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
- nni_aio_finish_error(aio, rv);
return;
}
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index f962adbf..ab139a48 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -126,15 +126,13 @@ static void
wstran_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) {
+ if (!nni_aio_defer(aio, wstran_pipe_recv_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
p->user_rxaio = aio;
@@ -161,7 +159,6 @@ static void
wstran_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
@@ -171,9 +168,8 @@ wstran_pipe_send(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) {
+ if (!nni_aio_defer(aio, wstran_pipe_send_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
p->user_txaio = aio;
@@ -271,7 +267,6 @@ static void
wstran_listener_accept(void *arg, nni_aio *aio)
{
ws_listener *l = arg;
- int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -280,9 +275,8 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_listener_cancel, l)) != 0) {
+ if (!nni_aio_defer(aio, wstran_listener_cancel, l)) {
nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&l->aios, aio);
@@ -311,16 +305,14 @@ static void
wstran_dialer_connect(void *arg, nni_aio *aio)
{
ws_dialer *d = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&d->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, wstran_dialer_cancel, d)) {
nni_mtx_unlock(&d->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
NNI_ASSERT(nni_list_empty(&d->aios));
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 33ed70bb..9ff2c997 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -363,8 +363,6 @@ http_rd_cancel(nni_aio *aio, void *arg, int rv)
static void
http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -372,8 +370,7 @@ http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, http_rd_cancel, conn)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (!nni_aio_defer(aio, http_rd_cancel, conn)) {
return;
}
conn->rd_flavor = flavor;
@@ -483,8 +480,6 @@ http_wr_cancel(nni_aio *aio, void *arg, int rv)
static void
http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -492,8 +487,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, http_wr_cancel, conn)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (!nni_aio_defer(aio, http_wr_cancel, conn)) {
return;
}
conn->wr_flavor = flavor;