Diffstat (limited to 'src/core')
 src/core/aio.c      | 52
 src/core/aio.h      | 20
 src/core/device.c   |  8
 src/core/endpt.c    |  6
 src/core/msgqueue.c | 60
 src/core/msgqueue.h |  2
 6 files changed, 103 insertions(+), 45 deletions(-)
diff --git a/src/core/aio.c b/src/core/aio.c
index fac62f12..388e6677 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios;
//
// In order to guard against aio reuse during teardown, we set a fini
// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ESTATE indicating this. The
-// provider that calls nni_aio_start() MUST check the return value, and
-// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// will fail and the caller will get NNG_ECANCELED indicating this. The
+// provider that calls nni_aio_begin() MUST check the return value, and
+// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
// request and return.
// An nni_aio is an async I/O handle.
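For illustration, the protocol described in this comment might look like the
following in a provider (a minimal sketch; my_provider_t, my_cancel, and the
lock and pending list are hypothetical names, not part of this change):

    void
    my_provider_send(my_provider_t *p, nni_aio *aio)
    {
        if (nni_aio_begin(aio) != 0) {
            // The aio is being torn down: discard the request, and
            // do not call nni_aio_finish or any other routine on it.
            return;
        }
        nni_mtx_lock(&p->lk);
        nni_aio_schedule(aio, my_cancel, p);
        nni_aio_list_append(&p->pending, aio);
        nni_mtx_unlock(&p->lk);
    }

This mirrors the call-site changes in device.c below: begin outside the
provider lock, then schedule (registering the cancel function) under it.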
@@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio)
// nni_aio_stop cancels any outstanding operation, and waits for the
// callback to complete, if still running. It also marks the AIO as
-// stopped, preventing further calls to nni_aio_start from succeeding.
+// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure no other
// callers are still using it, before calling nni_aio_fini to release
// the actual memory.
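In consumer code the prescribed teardown order is simply (sketch):

    nni_aio_stop(aio); // cancel any pending op, wait for the callback
    nni_aio_fini(aio); // safe now: nothing can restart the aio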
@@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio)
}
int
-nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+nni_aio_begin(nni_aio *aio)
{
- nni_time now = nni_clock();
-
nni_mtx_lock(&nni_aio_lk);
-
+ // We should not reschedule anything at this point.
if (aio->a_fini) {
- // We should not reschedule anything at this point.
aio->a_active = false;
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
@@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
aio->a_active = true;
-
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_mtx_unlock(&nni_aio_lk);
+ return (0);
+}
+void
+nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = NNI_TIME_ZERO;
+ aio->a_expire = nni_clock();
break;
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
- aio->a_expire = now + aio->a_timeout;
+ aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
}
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_prov_cancel = cancelfn;
+ aio->a_prov_data = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
+}
+
+int
+nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
+ if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
+ return (NNG_ETIMEDOUT);
+ }
+ nni_aio_schedule(aio, cancelfn, data);
return (0);
}
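A hedged sketch of how a provider that cannot always complete immediately
might use the new pair (my_try_complete, my_cancel, and the waiting list are
illustrative names, not part of this change):

    void
    my_provider_recv(my_provider_t *p, nni_aio *aio)
    {
        if (nni_aio_begin(aio) != 0) {
            return;
        }
        nni_mtx_lock(&p->lk);
        if (my_try_complete(p, aio)) {
            // Completed synchronously; no scheduling needed.
            nni_mtx_unlock(&p->lk);
            nni_aio_finish(aio, 0, 0);
            return;
        }
        // Zero timeout (poll): fail rather than queue the request.
        if (nni_aio_schedule_verify(aio, my_cancel, p) != 0) {
            nni_mtx_unlock(&p->lk);
            nni_aio_finish_error(aio, NNG_ETIMEDOUT);
            return;
        }
        nni_aio_list_append(&p->waiting, aio);
        nni_mtx_unlock(&p->lk);
    }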
@@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
aio->a_sleep = false;
- (void) nni_aio_start(aio, NULL, NULL);
+		nni_aio_schedule(aio, NULL, NULL);
return;
}
}
aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- (void) nni_aio_start(aio, NULL, NULL);
+
+ // There is no cancellation, apart from just unexpiring.
+ nni_aio_schedule(aio, NULL, NULL);
}
void
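At the public API level the early-wake rule above is observable with
something like the following sketch (error handling elided):

    nng_aio *aio;
    nng_aio_alloc(&aio, NULL, NULL);
    nng_aio_set_timeout(aio, 100); // aio timeout shorter than the sleep
    nng_sleep_aio(1000, aio);      // ask to sleep 1000 ms
    nng_aio_wait(aio);
    // Wakes after ~100 ms with nng_aio_result(aio) == NNG_ETIMEDOUT.
    nng_aio_free(aio);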
diff --git a/src/core/aio.h b/src/core/aio.h
index a0f4934f..d23ddf5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended).
extern void nni_aio_abort(nni_aio *, int rv);
-extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *);
+// nni_aio_begin is called by a provider to indicate it is starting the
+// operation, and to check that the aio has not already been marked for
+// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being
+// torn down. (In that case, the operation must simply be discarded,
+// without calling any other functions on this AIO, most especially not
+// the nni_aio_finish family of functions.)
+extern int nni_aio_begin(nni_aio *);
+
extern void *nni_aio_get_prov_data(nni_aio *);
extern void nni_aio_set_prov_data(nni_aio *, void *);
extern void *nni_aio_get_prov_extra(nni_aio *, unsigned);
@@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);
+// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
+// asynchronous completion. This also starts the expiration timer. Note that
+// prior to this, the aio is uncancellable.
+extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
+
+// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
+// operation was submitted with a zero timeout (NNG_FLAG_NONBLOCK), it
+// schedules nothing and returns NNG_ETIMEDOUT instead. This lets a
+// provider fail fast when the operation cannot complete immediately.
+extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
diff --git a/src/core/device.c b/src/core/device.c
index 6def9f64..e3b1d220 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
- nni_mtx_lock(&dd->mtx);
- dd->user = user;
- if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
- nni_mtx_unlock(&dd->mtx);
+ if (nni_aio_begin(user) != 0) {
return;
}
+ nni_mtx_lock(&dd->mtx);
+ nni_aio_schedule(user, nni_device_cancel, dd);
+ dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
p->user = user;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0cb59a14..2741a8e6 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
- if (ep->ep_closing) {
+ if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
}
backoff = ep->ep_currtime;
@@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep)
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
- if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
- ep->ep_tmo_run = 0;
- }
+ nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_besteffort;
+ bool mq_besteffort;
+ bool mq_closed;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
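For context, best-effort mode turns a put that would otherwise block into a
silent discard. A sketch, assuming mq and aio are already set up:

    nni_msgq_set_best_effort(mq, true);
    nni_msgq_aio_put(mq, aio); // queue full: completes with result 0,
                               // message discarded rather than queued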
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ // If this is an instantaneous poll operation, and the queue has
+ // no room, nobody is waiting to receive, and we're not best effort
+ // (best effort discards), then report the error (NNG_ETIMEDOUT).
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+ (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
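At the socket level, this is what gives a zero-timeout send its poll
semantics. A sketch with the public API, assuming sock is an open nng_socket
whose write side is currently full (error handling elided):

    nng_aio *aio;
    nng_msg *msg;

    nng_aio_alloc(&aio, NULL, NULL);
    nng_msg_alloc(&msg, 0);
    nng_aio_set_msg(aio, msg);
    nng_aio_set_timeout(aio, 0); // NNG_DURATION_ZERO: poll, don't wait
    nng_send_aio(sock, aio);
    nng_aio_wait(aio);
    // With the queue full, the result is NNG_ETIMEDOUT rather than a
    // blocked or silently queued operation; the unsent message stays
    // on the aio and must be freed by the caller.
    if (nng_aio_result(aio) != 0) {
        nng_msg_free(nng_aio_get_msg(aio));
    }
    nng_aio_free(aio);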
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- if (mq->mq_geterr) {
- nni_aio_finish_error(aio, mq->mq_geterr);
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len == 0) &&
+ (nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = true;
+ mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2d23f448..2f1a46eb 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int);
// What this does is treat a put to a full message queue as if it were
// successful, returning 0 and discarding the message. If false is
// passed then this mode is reset to normal.
-extern void nni_msgq_set_best_effort(nni_msgq *, int);
+extern void nni_msgq_set_best_effort(nni_msgq *, bool);
// nni_msgq_filter is a callback function used to filter messages.
// The function is called on entry (put) or exit (get). The void