aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-09 13:33:11 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-22 21:31:28 -0800
commitc8ce57d668d73d92a071fa86f81e07ca403d8672 (patch)
treead7e602e43fefa64067fdac5fcd23987a50c3a90
parent013bb69c6be2f0a4572f4200de05e664692b6704 (diff)
downloadnng-c8ce57d668d73d92a071fa86f81e07ca403d8672.tar.gz
nng-c8ce57d668d73d92a071fa86f81e07ca403d8672.tar.bz2
nng-c8ce57d668d73d92a071fa86f81e07ca403d8672.zip
aio: introduce nni_aio_defer
This will replace nni_aio_schedule, and it includes finishing the task if needed. It does so without dropping the lock and so is more efficient and race free. This includes some conversion of some subsystems to it.
-rw-r--r--include/nng/nng.h5
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h9
-rw-r--r--src/core/device.c3
-rw-r--r--src/core/sockfd.c5
-rw-r--r--src/core/tcp.c4
-rw-r--r--src/nng.c4
-rw-r--r--src/platform/posix/posix_ipcconn.c4
-rw-r--r--src/sp/transport/inproc/inproc.c16
-rw-r--r--src/sp/transport/ws/websocket.c16
-rw-r--r--src/supplemental/http/http_conn.c10
11 files changed, 93 insertions, 53 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h
index d087805d..4d21287c 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -640,8 +640,11 @@ NNG_DECL void nng_aio_finish(nng_aio *, int);
// final argument is passed to the cancelfn. The final argument of the
// cancellation function is the error number (will not be zero) corresponding
// to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED.
+// This returns false if the operation cannot be deferred (because the AIO
+// has been stopped with nng_aio_stop.) If it does so, then the aio's
+// completion callback will fire with a result of NNG_ECLOSED.
typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int);
-NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
+NNG_DECL bool nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
// nng_aio_sleep does a "sleeping" operation, basically does nothing
// but wait for the specified number of milliseconds to expire, then
diff --git a/src/core/aio.c b/src/core/aio.c
index eaff5c80..0ac70a0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "core/aio.h"
#include "core/nng_impl.h"
#include "core/taskq.h"
#include <string.h>
@@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
- aio->a_expire_ok = false;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECANCELED);
@@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
return (0);
}
+bool
+nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
+{
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ bool timeout = false;
+
+ if (!aio->a_sleep && !aio->a_use_expire) {
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ timeout = true;
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
+ break;
+ }
+ } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
+ timeout = true;
+ }
+
+ nni_mtx_lock(&eq->eq_mtx);
+ if (timeout) {
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_abort) {
+ aio->a_sleep = false;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_sleep = false;
+ aio->a_result = NNG_ECLOSED;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancel;
+ aio->a_cancel_arg = data;
+
+ // We only schedule expiration if we have a way for the expiration
+ // handler to actively cancel it.
+ if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
+ nni_aio_expire_add(aio);
+ }
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (true);
+}
+
// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
@@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
aio->a_expire =
ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms;
- if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
- nni_aio_finish_error(aio, rv);
- }
+ // we don't do anything else here, so we can ignore the return
+ (void) nni_aio_defer(aio, nni_sleep_cancel, NULL);
}
static bool
diff --git a/src/core/aio.h b/src/core/aio.h
index 9491a2fa..8628d8ef 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// is returned. (In that case the caller should probably either return an
// error to its caller, or possibly cause an asynchronous error by calling
// nni_aio_finish_error on this aio.)
+//
+// NB: This function should be called while holding the lock that will be used
+// to cancel the operation. Otherwise a race can occur where the operation
+// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
+// nni_aio_defer is just like nni_io_schedule, but it also calls the callback
+// automatically if the operation cannot be started because the AIO is stopped
+// or was canceled before this call (but after nni_aio_begin).
+extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
// nni_aio_completion_list is used after removing the aio from an
diff --git a/src/core/device.c b/src/core/device.c
index 815fafcc..7084d3e4 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, device_cancel, d)) {
nni_mtx_unlock(&device_mtx);
- nni_aio_finish_error(aio, rv);
nni_reap(&device_reap, d);
}
device_start(d, aio);
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
index 787a0783..1a0e792f 100644
--- a/src/core/sockfd.c
+++ b/src/core/sockfd.c
@@ -115,7 +115,6 @@ static void
sfd_listener_accept(void *arg, nng_aio *aio)
{
sfd_listener *l = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio)
if (l->listen_cnt) {
sfd_start_conn(l, aio);
- } else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) {
- nni_aio_finish_error(aio, rv);
- } else {
+ } else if (nni_aio_defer(aio, sfd_cancel_accept, l)) {
nni_aio_list_append(&l->accept_q, aio);
}
nni_mtx_unlock(&l->mtx);
diff --git a/src/core/tcp.c b/src/core/tcp.c
index d2e08493..75b938b0 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -177,7 +177,6 @@ static void
tcp_dialer_dial(void *arg, nng_aio *aio)
{
tcp_dialer *d = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&d->conaios, aio);
diff --git a/src/nng.c b/src/nng.c
index 2276f2e1..cdcdecd9 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -2062,10 +2062,10 @@ nng_aio_finish(nng_aio *aio, int rv)
nni_aio_finish(aio, rv, nni_aio_count(aio));
}
-void
+bool
nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg)
{
- nni_aio_schedule(aio, fn, arg);
+ return (nni_aio_defer(aio, fn, arg));
}
bool
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 224828ff..ea4f5828 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -246,16 +246,14 @@ static void
ipc_send(void *arg, nni_aio *aio)
{
ipc_conn *c = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
+ if (!nni_aio_defer(aio, ipc_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&c->writeq, aio);
diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c
index 90317499..fcf75566 100644
--- a/src/sp/transport/inproc/inproc.c
+++ b/src/sp/transport/inproc/inproc.c
@@ -193,7 +193,6 @@ inproc_pipe_send(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->send_queue;
- int rv;
if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
@@ -204,9 +203,8 @@ inproc_pipe_send(void *arg, nni_aio *aio)
}
nni_mtx_lock(&queue->lock);
- if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->writers, aio);
@@ -219,16 +217,14 @@ inproc_pipe_recv(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->recv_queue;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&queue->lock);
- if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
+ if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->readers, aio);
@@ -463,7 +459,6 @@ inproc_ep_connect(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
inproc_ep *server;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -486,9 +481,8 @@ inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
- if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
+ if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
- nni_aio_finish_error(aio, rv);
return;
}
@@ -523,7 +517,6 @@ static void
inproc_ep_accept(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -533,9 +526,8 @@ inproc_ep_accept(void *arg, nni_aio *aio)
// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
- if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
+ if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
- nni_aio_finish_error(aio, rv);
return;
}
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index f962adbf..ab139a48 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -126,15 +126,13 @@ static void
wstran_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) {
+ if (!nni_aio_defer(aio, wstran_pipe_recv_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
p->user_rxaio = aio;
@@ -161,7 +159,6 @@ static void
wstran_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
@@ -171,9 +168,8 @@ wstran_pipe_send(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) {
+ if (!nni_aio_defer(aio, wstran_pipe_send_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
p->user_txaio = aio;
@@ -271,7 +267,6 @@ static void
wstran_listener_accept(void *arg, nni_aio *aio)
{
ws_listener *l = arg;
- int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -280,9 +275,8 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_listener_cancel, l)) != 0) {
+ if (!nni_aio_defer(aio, wstran_listener_cancel, l)) {
nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&l->aios, aio);
@@ -311,16 +305,14 @@ static void
wstran_dialer_connect(void *arg, nni_aio *aio)
{
ws_dialer *d = arg;
- int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&d->mtx);
- if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) {
+ if (!nni_aio_defer(aio, wstran_dialer_cancel, d)) {
nni_mtx_unlock(&d->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
NNI_ASSERT(nni_list_empty(&d->aios));
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 33ed70bb..9ff2c997 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -363,8 +363,6 @@ http_rd_cancel(nni_aio *aio, void *arg, int rv)
static void
http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -372,8 +370,7 @@ http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, http_rd_cancel, conn)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (!nni_aio_defer(aio, http_rd_cancel, conn)) {
return;
}
conn->rd_flavor = flavor;
@@ -483,8 +480,6 @@ http_wr_cancel(nni_aio *aio, void *arg, int rv)
static void
http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -492,8 +487,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, http_wr_cancel, conn)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (!nni_aio_defer(aio, http_wr_cancel, conn)) {
return;
}
conn->wr_flavor = flavor;