aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-15 16:50:49 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-15 16:50:49 -0700
commit0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e (patch)
treef1b0dd28372a92d756e6cd42eb949829d007a591
parent7f95fde8d752dd93c20ff0a209334f4aec549111 (diff)
downloadnng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.tar.gz
nng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.tar.bz2
nng-0d48c9d4f359ec79f9cc10db3e0e04cb7a58623e.zip
Race conditions removed... TCP tests work well know.
-rw-r--r--src/core/aio.c40
-rw-r--r--src/core/aio.h17
-rw-r--r--src/core/endpt.c11
-rw-r--r--src/core/pipe.c2
-rw-r--r--src/transport/tcp/tcp.c23
5 files changed, 61 insertions, 32 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index b69671b0..ba09b608 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -48,8 +49,14 @@ nni_aio_fini(nni_aio *aio)
nni_mtx_lock(&aio->a_lk);
aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- cancelfn = aio->a_prov_cancel;
- nni_cv_wake(&aio->a_cv);
+ if ((aio->a_flags & NNI_AIO_DONE) == 0) {
+ aio->a_flags |= NNI_AIO_DONE;
+ aio->a_result = NNG_ECANCELED;
+ cancelfn = aio->a_prov_cancel;
+ } else {
+ cancelfn = NULL;
+ }
+ nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any
nni_mtx_unlock(&aio->a_lk);
// Cancel the AIO if it was scheduled.
@@ -78,9 +85,6 @@ nni_aio_result(nni_aio *aio)
nni_mtx_lock(&aio->a_lk);
rv = aio->a_result;
- if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) {
- rv = NNG_ECANCELED;
- }
nni_mtx_unlock(&aio->a_lk);
return (rv);
}
@@ -131,12 +135,18 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
return (0);
}
+// XXX: REMOVE ME...
void
nni_aio_stop(nni_aio *aio)
{
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) {
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_result = NNG_ECANCELED;
aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP;
cancelfn = aio->a_prov_cancel;
nni_mtx_unlock(&aio->a_lk);
@@ -158,23 +168,25 @@ nni_aio_stop(nni_aio *aio)
}
void
-nni_aio_cancel(nni_aio *aio)
+nni_aio_cancel(nni_aio *aio, int rv)
{
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
+ if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
+ aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
+
+ // XXX: Think about the synchronization with nni_aio_fini...
nni_mtx_unlock(&aio->a_lk);
- // This unregisters the AIO from the provider.
+ // Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
}
@@ -184,9 +196,12 @@ nni_aio_cancel(nni_aio *aio)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP))) {
+ // XXX: mark unbusy
+ if (!(aio->a_flags & NNI_AIO_FINI)) {
+ // If we are finalizing, then we are done.
nni_taskq_dispatch(NULL, &aio->a_tqe);
}
+ // XXX: else wake aio_cv.. because there is someone watching.
nni_mtx_unlock(&aio->a_lk);
}
@@ -196,7 +211,7 @@ void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
+ if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return;
@@ -207,7 +222,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- if (!(aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP))) {
+ // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think.
+ if (!(aio->a_flags & NNI_AIO_STOP)) {
nni_taskq_dispatch(NULL, &aio->a_tqe);
}
nni_mtx_unlock(&aio->a_lk);
diff --git a/src/core/aio.h b/src/core/aio.h
index 1b1f58e1..296b0682 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,6 +30,7 @@ struct nni_aio {
nni_mtx a_lk;
nni_cv a_cv;
unsigned a_flags;
+ int a_refcnt; // prevent use-after-free
nni_taskq_ent a_tqe;
// Read/write operations.
@@ -102,7 +104,18 @@ extern int nni_aio_list_active(nni_aio *);
// and the amount of data transferred (if any).
extern void nni_aio_finish(nni_aio *, int, size_t);
-extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
-extern void nni_aio_stop(nni_aio *);
+// nni_aio_cancel is used to cancel an operation. Any pending I/O or
+// timeouts are canceled if possible, and the callback will be returned
+// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
+extern void nni_aio_cancel(nni_aio *, int rv);
+
+extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
+
+// nni_aio_stop is used to abort all further operations on the AIO.
+// When this is executed, no further operations or callbacks will be
+// executed, and if callbacks or I/O is in progress this will block
+// until they are either canceled or aborted. (Question: why not just
+// nni_fini?)
+// extern void nni_aio_stop(nni_aio *);
#endif // CORE_AIO_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
index a99cd21a..f221be18 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -154,6 +154,12 @@ nni_ep_close(nni_ep *ep)
ep->ep_closed = 1;
nni_mtx_unlock(&ep->ep_mtx);
+ // Abort any remaining in-flight operations.
+ nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
+
+ // Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
}
@@ -164,11 +170,6 @@ nni_ep_reap(nni_ep *ep)
nni_ep_close(ep); // Extra sanity.
- // Abort any in-flight operations.
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
-
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f1e8014e..3d0f96ab 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -100,7 +100,7 @@ nni_pipe_close(nni_pipe *p)
p->p_reap = 1;
// abort any pending negotiation/start process.
- nni_aio_stop(&p->p_start_aio);
+ nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
// Close the underlying transport.
if (p->p_tran_data != NULL) {
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 875eb71a..dad4c46e 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -141,11 +142,10 @@ nni_tcp_cancel_nego(nni_aio *aio)
nni_tcp_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_negaio) != NULL) {
- pipe->user_negaio = NULL;
- nni_aio_stop(aio);
- }
+ pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
+
+ nni_aio_cancel(&pipe->negaio, aio->a_result);
}
static void
@@ -311,8 +311,8 @@ nni_tcp_cancel_tx(nni_aio *aio)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- // stop the underlying aio ... we don't want a result for it.
- nni_aio_stop(&pipe->txaio);
+ // cancel the underlying operation.
+ nni_aio_cancel(&pipe->txaio, aio->a_result);
}
static void
@@ -356,8 +356,8 @@ nni_tcp_cancel_rx(nni_aio *aio)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- // stop the underlying aio ... we don't want a result for it.
- nni_aio_stop(&pipe->rxaio);
+ // cancel the underlying operation.
+ nni_aio_cancel(&pipe->rxaio, aio->a_result);
}
static void
@@ -637,11 +637,10 @@ nni_tcp_cancel_ep(nni_aio *aio)
nni_tcp_ep *ep = aio->a_prov_data;
nni_mtx_lock(&ep->mtx);
- if (ep->user_aio == aio) {
- ep->user_aio = NULL;
- }
- nni_aio_stop(&ep->aio);
+ ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
+
+ nni_aio_cancel(&ep->aio, aio->a_result);
}
static void