author     Garrett D'Amore <garrett@damore.org>    2018-04-18 20:38:00 -0700
committer  Garrett D'Amore <garrett@damore.org>    2018-04-20 07:34:16 -0700
commit     5902d02ad0a056a146231568f1293ffbcd59f61c (patch)
tree       be38584c02d703ec2322ab941d4d723c752fe187 /src/core
parent     40542e7af0f5003d7ad67876ea580a59174031ca (diff)
fixes #346 nng_recv() sometimes acts on null `msg` pointer
This closes a fundamental flaw in the way aio structures were handled. In particular, aio expiration could race ahead and fire before the aio was properly registered by the provider. This ultimately led to the possibility of duplicate completions on the same aio.

The solution involved breaking up nni_aio_start into two functions. nni_aio_begin (which can be run outside of external locks) simply validates that nni_aio_fini() has not been called, and clears certain fields in the aio to make it ready for use by the provider. nni_aio_schedule does the work to register the aio with the expiration thread, and should only be called when the aio is actually scheduled for asynchronous completion. nni_aio_schedule_verify does the same thing, but returns NNG_ETIMEDOUT if the aio has a zero-length timeout.

This change has a small negative performance impact. We have plans to rectify that by converting nni_aio_begin to use a lockless flag for the aio->a_fini bit.

While we were here, we fixed some error paths in the POSIX subsystem, which would have returned incorrect error codes, and we made some optimizations in the message queues to reduce conditionals while holding locks in the hot code path.
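To make the new contract concrete, here is a minimal sketch of the provider-side pattern this change establishes. It is not code from this commit: the my_provider type, my_cancel_cb, and my_submit are hypothetical, and only the nni_* calls are the real API.

// Hypothetical provider state; only the nni_* calls below are the
// actual API introduced or relied on by this change.
typedef struct {
    nni_mtx  lock;
    nni_list pending; // aios awaiting asynchronous completion
} my_provider;

// Cancel callback registered via nni_aio_schedule.  The
// (nni_aio *, int) signature matches nni_msgq_cancel in this change.
static void
my_cancel_cb(nni_aio *aio, int rv)
{
    my_provider *p = nni_aio_get_prov_data(aio);

    nni_mtx_lock(&p->lock);
    if (nni_aio_list_active(aio)) {
        nni_aio_list_remove(aio);
        nni_aio_finish_error(aio, rv);
    }
    nni_mtx_unlock(&p->lock);
}

static void
my_submit(my_provider *p, nni_aio *aio)
{
    // Step 1: may run outside provider locks.  A nonzero return
    // (NNG_ECANCELED) means the aio is being torn down; discard the
    // request without calling any of the nni_aio_finish functions.
    if (nni_aio_begin(aio) != 0) {
        return;
    }

    nni_mtx_lock(&p->lock);
    // Step 2: register the cancel function and arm the expiration
    // timer only once the operation is genuinely going asynchronous.
    // Until this call the aio can neither be cancelled nor expire.
    nni_aio_schedule(aio, my_cancel_cb, p);
    nni_aio_list_append(&p->pending, aio);
    nni_mtx_unlock(&p->lock);
}

The crucial ordering is that nni_aio_schedule runs under the same lock that serializes completion, so the expiration thread can no longer fire before the provider has registered the aio.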
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/aio.c       | 52
-rw-r--r--  src/core/aio.h       | 20
-rw-r--r--  src/core/device.c    |  8
-rw-r--r--  src/core/endpt.c     |  6
-rw-r--r--  src/core/msgqueue.c  | 60
-rw-r--r--  src/core/msgqueue.h  |  2
6 files changed, 103 insertions, 45 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index fac62f12..388e6677 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios;
//
// In order to guard against aio reuse during teardown, we set a fini
// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ESTATE indicating this. The
-// provider that calls nni_aio_start() MUST check the return value, and
-// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// will fail and the caller will get NNG_ECANCELED indicating this. The
+// provider that calls nni_aio_begin() MUST check the return value, and
+// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
// request and return.
// An nni_aio is an async I/O handle.
@@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio)
// nni_aio_stop cancels any outstanding operation, and waits for the
// callback to complete, if still running.  It also marks the AIO as
-// stopped, preventing further calls to nni_aio_start from succeeding.
+// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
// callers are not still using it, before calling nni_aio_fini to
// release the actual memory.
@@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio)
}
int
-nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+nni_aio_begin(nni_aio *aio)
{
- nni_time now = nni_clock();
-
nni_mtx_lock(&nni_aio_lk);
-
+ // We should not reschedule anything at this point.
if (aio->a_fini) {
- // We should not reschedule anything at this point.
aio->a_active = false;
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
@@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
aio->a_active = true;
-
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_mtx_unlock(&nni_aio_lk);
+ return (0);
+}
+void
+nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = NNI_TIME_ZERO;
+ aio->a_expire = nni_clock();
break;
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
- aio->a_expire = now + aio->a_timeout;
+ aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
}
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_prov_cancel = cancelfn;
+ aio->a_prov_data = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
+}
+
+int
+nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
+
+ if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
+ return (NNG_ETIMEDOUT);
+ }
+ nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
aio->a_sleep = false;
- (void) nni_aio_start(aio, NULL, NULL);
+ (void) nni_aio_schedule(aio, NULL, NULL);
return;
}
}
aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- (void) nni_aio_start(aio, NULL, NULL);
+
+ // There is no cancel function; completion (or abort) happens
+ // via the expiration machinery alone.
+ nni_aio_schedule(aio, NULL, NULL);
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index a0f4934f..d23ddf5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
extern void nni_aio_abort(nni_aio *, int rv);
-extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *);
+// nni_aio_begin is called by a provider to indicate it is starting the
+// operation, and to check that the aio has not already been marked for
+// teardown.  It returns 0 on success, or NNG_ECANCELED if the aio is
+// being torn down.  (In that case, the request must simply be discarded,
+// without calling any other functions on this AIO, most especially not
+// the nni_aio_finish family of functions.)
+extern int nni_aio_begin(nni_aio *);
+
extern void *nni_aio_get_prov_data(nni_aio *);
extern void nni_aio_set_prov_data(nni_aio *, void *);
extern void *nni_aio_get_prov_extra(nni_aio *, unsigned);
@@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);
+// nni_aio_schedule indicates that the AIO has begun, and is scheduled
+// for asynchronous completion.  This also starts the expiration timer.
+// Note that prior to this, the aio is uncancellable.
+extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
+
+// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
+// operation was submitted with a zero timeout (NNG_FLAG_NONBLOCK), it
+// returns NNG_ETIMEDOUT without scheduling anything, letting the
+// provider fail fast when the operation cannot complete immediately.
+extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
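nni_aio_schedule_verify exists for poll-style operations (zero timeout, as with NNG_FLAG_NONBLOCK). Continuing the hypothetical provider from the sketch above -- my_can_complete_now is a made-up readiness check -- the intended shape is roughly:

static void
my_submit_poll(my_provider *p, nni_aio *aio)
{
    int rv;

    if (nni_aio_begin(aio) != 0) {
        return;
    }
    nni_mtx_lock(&p->lock);
    // With a zero timeout this returns NNG_ETIMEDOUT and does not arm
    // the expiration timer; otherwise it acts like nni_aio_schedule.
    rv = nni_aio_schedule_verify(aio, my_cancel_cb, p);
    if ((rv != 0) && (!my_can_complete_now(p))) {
        // Poll semantics: nothing can complete right now, so fail
        // fast instead of queueing the aio.
        nni_mtx_unlock(&p->lock);
        nni_aio_finish_error(aio, rv);
        return;
    }
    nni_aio_list_append(&p->pending, aio);
    // ... run the completion path: if the operation can complete now,
    // the aio finishes immediately even though it was never scheduled.
    nni_mtx_unlock(&p->lock);
}

This is exactly the shape of the reworked nni_msgq_aio_put and nni_msgq_aio_get in the msgqueue.c diff below.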
diff --git a/src/core/device.c b/src/core/device.c
index 6def9f64..e3b1d220 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
- nni_mtx_lock(&dd->mtx);
- dd->user = user;
- if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
- nni_mtx_unlock(&dd->mtx);
+ if (nni_aio_begin(user) != 0) {
return;
}
+ nni_mtx_lock(&dd->mtx);
+ nni_aio_schedule(user, nni_device_cancel, dd);
+ dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
p->user = user;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0cb59a14..2741a8e6 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
- if (ep->ep_closing) {
+ if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
}
backoff = ep->ep_currtime;
@@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep)
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
- if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
- ep->ep_tmo_run = 0;
- }
+ nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_besteffort;
+ bool mq_besteffort;
+ bool mq_closed;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ // If this is an instantaneous poll operation, and the queue has
+ // no room, nobody is waiting to receive, and we're not best effort
+ // (best effort discards), then report the error (NNG_ETIMEDOUT).
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+ (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- if (mq->mq_geterr) {
- nni_aio_finish_error(aio, mq->mq_geterr);
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len == 0) &&
+ (nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = true;
+ mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2d23f448..2f1a46eb 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int);
// What this does is treat a put to a full message queue as if it were
// successful, returning 0 and discarding the message.  If false is
// passed then this mode is reset to normal.
-extern void nni_msgq_set_best_effort(nni_msgq *, int);
+extern void nni_msgq_set_best_effort(nni_msgq *, bool);
// nni_msgq_filter is a callback function used to filter messages.
// The function is called on entry (put) or exit (get). The void