diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-15 16:50:49 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-15 16:50:49 -0700 |
| commit | 0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e (patch) | |
| tree | f1b0dd28372a92d756e6cd42eb949829d007a591 /src | |
| parent | 7f95fde8d752dd93c20ff0a209334f4aec549111 (diff) | |
| download | nng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.tar.gz nng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.tar.bz2 nng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.zip | |
Race conditions removed... TCP tests work well know.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 40 | ||||
| -rw-r--r-- | src/core/aio.h | 17 | ||||
| -rw-r--r-- | src/core/endpt.c | 11 | ||||
| -rw-r--r-- | src/core/pipe.c | 2 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 23 |
5 files changed, 61 insertions, 32 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index b69671b0..ba09b608 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -48,8 +49,14 @@ nni_aio_fini(nni_aio *aio) nni_mtx_lock(&aio->a_lk); aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled - cancelfn = aio->a_prov_cancel; - nni_cv_wake(&aio->a_cv); + if ((aio->a_flags & NNI_AIO_DONE) == 0) { + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ECANCELED; + cancelfn = aio->a_prov_cancel; + } else { + cancelfn = NULL; + } + nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any nni_mtx_unlock(&aio->a_lk); // Cancel the AIO if it was scheduled. @@ -78,9 +85,6 @@ nni_aio_result(nni_aio *aio) nni_mtx_lock(&aio->a_lk); rv = aio->a_result; - if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) { - rv = NNG_ECANCELED; - } nni_mtx_unlock(&aio->a_lk); return (rv); } @@ -131,12 +135,18 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) return (0); } +// XXX: REMOVE ME... void nni_aio_stop(nni_aio *aio) { void (*cancelfn)(nni_aio *); nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) { + nni_mtx_unlock(&aio->a_lk); + return; + } + aio->a_result = NNG_ECANCELED; aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP; cancelfn = aio->a_prov_cancel; nni_mtx_unlock(&aio->a_lk); @@ -158,23 +168,25 @@ nni_aio_stop(nni_aio *aio) } void -nni_aio_cancel(nni_aio *aio) +nni_aio_cancel(nni_aio *aio, int rv) { void (*cancelfn)(nni_aio *); nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { + if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { // The operation already completed - so there's nothing // left for us to do. nni_mtx_unlock(&aio->a_lk); return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_result = NNG_ECANCELED; + aio->a_result = rv; cancelfn = aio->a_prov_cancel; + + // XXX: Think about the synchronization with nni_aio_fini... nni_mtx_unlock(&aio->a_lk); - // This unregisters the AIO from the provider. + // Stop any I/O at the provider level. if (cancelfn != NULL) { cancelfn(aio); } @@ -184,9 +196,12 @@ nni_aio_cancel(nni_aio *aio) aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - if (!(aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP))) { + // XXX: mark unbusy + if (!(aio->a_flags & NNI_AIO_FINI)) { + // If we are finalizing, then we are done. nni_taskq_dispatch(NULL, &aio->a_tqe); } + // XXX: else wake aio_cv.. because there is someone watching. nni_mtx_unlock(&aio->a_lk); } @@ -196,7 +211,7 @@ void nni_aio_finish(nni_aio *aio, int result, size_t count) { nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { + if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { // Operation already done (canceled or timed out?) nni_mtx_unlock(&aio->a_lk); return; @@ -207,7 +222,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - if (!(aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP))) { + // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think. + if (!(aio->a_flags & NNI_AIO_STOP)) { nni_taskq_dispatch(NULL, &aio->a_tqe); } nni_mtx_unlock(&aio->a_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 1b1f58e1..296b0682 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -29,6 +30,7 @@ struct nni_aio { nni_mtx a_lk; nni_cv a_cv; unsigned a_flags; + int a_refcnt; // prevent use-after-free nni_taskq_ent a_tqe; // Read/write operations. @@ -102,7 +104,18 @@ extern int nni_aio_list_active(nni_aio *); // and the amount of data transferred (if any). extern void nni_aio_finish(nni_aio *, int, size_t); -extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); -extern void nni_aio_stop(nni_aio *); +// nni_aio_cancel is used to cancel an operation. Any pending I/O or +// timeouts are canceled if possible, and the callback will be returned +// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.) +extern void nni_aio_cancel(nni_aio *, int rv); + +extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); + +// nni_aio_stop is used to abort all further operations on the AIO. +// When this is executed, no further operations or callbacks will be +// executed, and if callbacks or I/O is in progress this will block +// until they are either canceled or aborted. (Question: why not just +// nni_fini?) +// extern void nni_aio_stop(nni_aio *); #endif // CORE_AIO_H diff --git a/src/core/endpt.c b/src/core/endpt.c index a99cd21a..f221be18 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -154,6 +154,12 @@ nni_ep_close(nni_ep *ep) ep->ep_closed = 1; nni_mtx_unlock(&ep->ep_mtx); + // Abort any remaining in-flight operations. + nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); + nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); + nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); + + // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); } @@ -164,11 +170,6 @@ nni_ep_reap(nni_ep *ep) nni_ep_close(ep); // Extra sanity. - // Abort any in-flight operations. - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); diff --git a/src/core/pipe.c b/src/core/pipe.c index f1e8014e..3d0f96ab 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -100,7 +100,7 @@ nni_pipe_close(nni_pipe *p) p->p_reap = 1; // abort any pending negotiation/start process. - nni_aio_stop(&p->p_start_aio); + nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); // Close the underlying transport. if (p->p_tran_data != NULL) { diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 875eb71a..dad4c46e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -141,11 +142,10 @@ nni_tcp_cancel_nego(nni_aio *aio) nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); - if ((aio = pipe->user_negaio) != NULL) { - pipe->user_negaio = NULL; - nni_aio_stop(aio); - } + pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); + + nni_aio_cancel(&pipe->negaio, aio->a_result); } static void @@ -311,8 +311,8 @@ nni_tcp_cancel_tx(nni_aio *aio) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - // stop the underlying aio ... we don't want a result for it. - nni_aio_stop(&pipe->txaio); + // cancel the underlying operation. + nni_aio_cancel(&pipe->txaio, aio->a_result); } static void @@ -356,8 +356,8 @@ nni_tcp_cancel_rx(nni_aio *aio) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - // stop the underlying aio ... we don't want a result for it. - nni_aio_stop(&pipe->rxaio); + // cancel the underlying operation. + nni_aio_cancel(&pipe->rxaio, aio->a_result); } static void @@ -637,11 +637,10 @@ nni_tcp_cancel_ep(nni_aio *aio) nni_tcp_ep *ep = aio->a_prov_data; nni_mtx_lock(&ep->mtx); - if (ep->user_aio == aio) { - ep->user_aio = NULL; - } - nni_aio_stop(&ep->aio); + ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); + + nni_aio_cancel(&ep->aio, aio->a_result); } static void |
