diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-09 13:33:11 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-22 21:31:28 -0800 |
| commit | c8ce57d668d73d92a071fa86f81e07ca403d8672 (patch) | |
| tree | ad7e602e43fefa64067fdac5fcd23987a50c3a90 /src/sp/transport | |
| parent | 013bb69c6be2f0a4572f4200de05e664692b6704 (diff) | |
| download | nng-c8ce57d668d73d92a071fa86f81e07ca403d8672.tar.gz nng-c8ce57d668d73d92a071fa86f81e07ca403d8672.tar.bz2 nng-c8ce57d668d73d92a071fa86f81e07ca403d8672.zip | |
aio: introduce nni_aio_defer
This will replace nni_aio_schedule, and it includes finishing the
task if needed. It does so without dropping the lock and so is
more efficient and race free.
This includes some conversion of some subsystems to it.
Diffstat (limited to 'src/sp/transport')
| -rw-r--r-- | src/sp/transport/inproc/inproc.c | 16 | ||||
| -rw-r--r-- | src/sp/transport/ws/websocket.c | 16 |
2 files changed, 8 insertions, 24 deletions
diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c index 90317499..fcf75566 100644 --- a/src/sp/transport/inproc/inproc.c +++ b/src/sp/transport/inproc/inproc.c @@ -193,7 +193,6 @@ inproc_pipe_send(void *arg, nni_aio *aio) { inproc_pipe *pipe = arg; inproc_queue *queue = pipe->send_queue; - int rv; if (nni_aio_begin(aio) != 0) { // No way to give the message back to the protocol, so @@ -204,9 +203,8 @@ inproc_pipe_send(void *arg, nni_aio *aio) } nni_mtx_lock(&queue->lock); - if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) { nni_mtx_unlock(&queue->lock); - nni_aio_finish_error(aio, rv); return; } nni_aio_list_append(&queue->writers, aio); @@ -219,16 +217,14 @@ inproc_pipe_recv(void *arg, nni_aio *aio) { inproc_pipe *pipe = arg; inproc_queue *queue = pipe->recv_queue; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&queue->lock); - if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) { nni_mtx_unlock(&queue->lock); - nni_aio_finish_error(aio, rv); return; } nni_aio_list_append(&queue->readers, aio); @@ -463,7 +459,6 @@ inproc_ep_connect(void *arg, nni_aio *aio) { inproc_ep *ep = arg; inproc_ep *server; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -486,9 +481,8 @@ inproc_ep_connect(void *arg, nni_aio *aio) // We don't have to worry about the case where a zero timeout // on connect was specified, as there is no option to specify // that in the upper API. - if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) { nni_mtx_unlock(&nni_inproc.mx); - nni_aio_finish_error(aio, rv); return; } @@ -523,7 +517,6 @@ static void inproc_ep_accept(void *arg, nni_aio *aio) { inproc_ep *ep = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -533,9 +526,8 @@ inproc_ep_accept(void *arg, nni_aio *aio) // We need not worry about the case where a non-blocking // accept was tried -- there is no API to do such a thing. - if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) { nni_mtx_unlock(&nni_inproc.mx); - nni_aio_finish_error(aio, rv); return; } diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index f962adbf..ab139a48 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -126,15 +126,13 @@ static void wstran_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) { + if (!nni_aio_defer(aio, wstran_pipe_recv_cancel, p)) { nni_mtx_unlock(&p->mtx); - nni_aio_finish_error(aio, rv); return; } p->user_rxaio = aio; @@ -161,7 +159,6 @@ static void wstran_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; - int rv; if (nni_aio_begin(aio) != 0) { // No way to give the message back to the protocol, so @@ -171,9 +168,8 @@ wstran_pipe_send(void *arg, nni_aio *aio) return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) { + if (!nni_aio_defer(aio, wstran_pipe_send_cancel, p)) { nni_mtx_unlock(&p->mtx); - nni_aio_finish_error(aio, rv); return; } p->user_txaio = aio; @@ -271,7 +267,6 @@ static void wstran_listener_accept(void *arg, nni_aio *aio) { ws_listener *l = arg; - int rv; // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. @@ -280,9 +275,8 @@ wstran_listener_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&l->mtx); - if ((rv = nni_aio_schedule(aio, wstran_listener_cancel, l)) != 0) { + if (!nni_aio_defer(aio, wstran_listener_cancel, l)) { nni_mtx_unlock(&l->mtx); - nni_aio_finish_error(aio, rv); return; } nni_list_append(&l->aios, aio); @@ -311,16 +305,14 @@ static void wstran_dialer_connect(void *arg, nni_aio *aio) { ws_dialer *d = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&d->mtx); - if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) { + if (!nni_aio_defer(aio, wstran_dialer_cancel, d)) { nni_mtx_unlock(&d->mtx); - nni_aio_finish_error(aio, rv); return; } NNI_ASSERT(nni_list_empty(&d->aios)); |
