aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-04 17:17:42 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-04 21:20:00 -0700
commitdc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch)
tree1eebf2773745a3a25e8a071fbe4f51cd5490d4e4
parent6887900ae033add30ee0151b72abe927c5239588 (diff)
downloadnng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak checks. This represents a complete rethink of how the AIOs work, and much simpler synchronization; the provider API is a bit simpler to boot, as a number of failure modes have been simply eliminated. While here a few other minor bugs were squashed.
-rw-r--r--src/core/aio.c320
-rw-r--r--src/core/aio.h32
-rw-r--r--src/core/endpt.c11
-rw-r--r--src/core/msgqueue.c103
-rw-r--r--src/core/taskq.c1
-rw-r--r--src/platform/posix/posix_epdesc.c16
-rw-r--r--src/platform/posix/posix_pipedesc.c9
-rw-r--r--src/platform/posix/posix_resolv_gai.c3
-rw-r--r--src/platform/windows/win_impl.h9
-rw-r--r--src/platform/windows/win_iocp.c101
-rw-r--r--src/platform/windows/win_ipc.c58
-rw-r--r--src/platform/windows/win_net.c66
-rw-r--r--src/platform/windows/win_resolv.c28
-rw-r--r--src/transport/inproc/inproc.c110
-rw-r--r--src/transport/ipc/ipc.c69
-rw-r--r--src/transport/tcp/tcp.c61
-rw-r--r--tests/compat_bug777.c1
-rw-r--r--tests/inproc.c3
18 files changed, 468 insertions, 533 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index e6157786..792b63f2 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -11,22 +11,50 @@
#include "core/nng_impl.h"
#include <string.h>
-enum nni_aio_flags {
- NNI_AIO_INIT = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
-};
-
+static nni_mtx nni_aio_lk;
// These are used for expiration.
-static nni_mtx nni_aio_expire_mtx;
static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_exit;
-static nni_list nni_aio_expire_aios;
static nni_thr nni_aio_expire_thr;
-static nni_aio *nni_aio_expire_current;
+static nni_list nni_aio_expire_aios;
+
+// Design notes.
+//
+// AIOs are only ever "completed" by the provider, which must call
+// one of the nni_aio_finish variants. Until this occurs, the provider
+// guarantees that the AIO is valid. The provider must guarantee that
+// an AIO will be "completed" (with a call to nni_aio_finish & friends)
+// exactly once.
+//
+// Note that the cancellation routine may be called by the framework
+// several times. The framework (or the consumer) guarantees that the
+// AIO will remain valid across these calls, so that the provider is
+// free to examine the aio for list membership, etc. The provider must
+// not call finish more than once though.
+//
+// A single lock, nni_aio_lk, is used to protect the flags on the AIO,
+// as well as the expire list on the AIOs. We will not permit an AIO
+// to be marked done if an expiration is outstanding.
+//
+// In order to synchronize with the expiration, we set a flag when we
+// are going to cancel due to expiration, and then let the expiration
+// thread dispatch the notification to the user (after ensuring that
+// the provider is done with the aio.) This ensures that the completion
+// task will be dispatch *exactly* once, and only after nothing in
+// the provider or the framework is using it further. (The consumer
+// will probably still be using, but if the consumer calls nni_aio_wait
+// or nni_aio_stop, then the consumer will have exclusive access to it.
+// Provided, of course, that the consumer does not reuse the aio for
+// another operation in the callback.)
+//
+// In order to guard against aio reuse during teardown, we set a fini
+// flag. Any attempt to initialize for a new operation after that point
+// will fail and the caller will get NNG_ESTATE indicating this. The
+// provider that calls nni_aio_start() MUST check the return value, and
+// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// request and return.
static void nni_aio_expire_add(nni_aio *);
-static void nni_aio_expire_remove(nni_aio *);
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -34,15 +62,11 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
int rv;
memset(aio, 0, sizeof(*aio));
- if ((rv = nni_mtx_init(&aio->a_lk)) != 0) {
- return (rv);
- }
- if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) {
- nni_mtx_fini(&aio->a_lk);
+ if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) {
return (rv);
}
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_INIT;
+ aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
return (0);
@@ -55,7 +79,6 @@ nni_aio_fini(nni_aio *aio)
// At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
@@ -71,30 +94,23 @@ nni_aio_fini(nni_aio *aio)
void
nni_aio_stop(nni_aio *aio)
{
- if ((aio->a_flags & NNI_AIO_INIT) == 0) {
+ if (!aio->a_init) {
// Never initialized, so nothing should have happened.
return;
}
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_fini = 1;
+ nni_mtx_unlock(&nni_aio_lk);
nni_aio_cancel(aio, NNG_ECANCELED);
- // Wait for any outstanding task to complete. We won't schedule
- // new stuff because nni_aio_start will fail (due to AIO_FINI).
- nni_task_wait(&aio->a_task);
+ nni_aio_wait(aio);
}
int
nni_aio_result(nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&aio->a_lk);
- rv = aio->a_result;
- nni_mtx_unlock(&aio->a_lk);
- return (rv);
+ return (aio->a_result);
}
size_t
@@ -106,131 +122,116 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&aio->a_lk);
- while ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) == 0) {
+ nni_mtx_lock(&nni_aio_lk);
+ while ((aio->a_active) && (!aio->a_done)) {
nni_cv_wait(&aio->a_cv);
}
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_unlock(&nni_aio_lk);
nni_task_wait(&aio->a_task);
}
int
-nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
+nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags &= ~NNI_AIO_DONE;
- if (aio->a_flags & NNI_AIO_FINI) {
+ nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_fini) {
// We should not reschedule anything at this point.
- nni_mtx_unlock(&aio->a_lk);
+ aio->a_active = 0;
+ aio->a_result = NNG_ECANCELED;
+ nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
+ aio->a_done = 0;
+ aio->a_pend = 0;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancel;
+ aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
+ aio->a_active = 1;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_unlock(&nni_aio_lk);
return (0);
}
+// nni_aio_cancel is called by a consumer which guarantees that the aio
+// is still valid.
void
nni_aio_cancel(nni_aio *aio, int rv)
{
- void (*cancelfn)(nni_aio *);
-
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // The operation already completed - so there's nothing
- // left for us to do.
- nni_mtx_unlock(&aio->a_lk);
- return;
- }
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = rv;
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
-
- aio->a_refcnt++;
- nni_mtx_unlock(&aio->a_lk);
+ nni_aio_cancelfn cancelfn;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
- cancelfn(aio);
+ cancelfn(aio, rv);
}
-
- nni_mtx_lock(&aio->a_lk);
-
- aio->a_refcnt--;
- nni_cv_wake(&aio->a_cv);
-
- // These should have already been cleared by the cancel function.
- aio->a_prov_data = NULL;
- aio->a_prov_cancel = NULL;
-
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
}
// I/O provider related functions.
-int
-nni_aio_finish(nni_aio *aio, int result, size_t count)
+static void
+nni_aio_finish_impl(
+ nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg)
{
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // Operation already done (canceled or timed out?)
- nni_mtx_unlock(&aio->a_lk);
- return (NNG_ESTATE);
- }
- aio->a_flags |= NNI_AIO_DONE;
+ nni_mtx_lock(&nni_aio_lk);
+ NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once*
+
+ nni_list_node_remove(&aio->a_expire_node);
+ aio->a_pend = 1;
aio->a_result = result;
aio->a_count = count;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
+ if (pipe) {
+ aio->a_pipe = pipe;
+ }
+ if (msg) {
+ aio->a_msg = msg;
+ }
- // This is guaranteed to just be a list operation at this point,
- // because done wasn't set.
- nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
- nni_cv_wake(&aio->a_cv);
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
- return (0);
+ // If we are expiring, then we rely on the expiration thread to
+ // complete this; we must not because the expiration thread is
+ // still holding the reference.
+ if (!aio->a_expiring) {
+ aio->a_done = 1;
+ nni_cv_wake(&aio->a_cv);
+ nni_task_dispatch(&aio->a_task);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
}
-int
-nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
+void
+nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // Operation already done (canceled or timed out?)
- nni_mtx_unlock(&aio->a_lk);
- return (NNG_ESTATE);
- }
- aio->a_flags |= NNI_AIO_DONE;
+ nni_aio_finish_impl(aio, result, count, NULL, NULL);
+}
- aio->a_result = result;
- aio->a_count = 0;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
- aio->a_pipe = pipe;
+void
+nni_aio_finish_error(nni_aio *aio, int result)
+{
+ nni_aio_finish_impl(aio, result, 0, NULL, NULL);
+}
- // This is guaranteed to just be a list operation at this point,
- // because done wasn't set.
- nni_aio_expire_remove(aio);
- aio->a_expire = NNI_TIME_NEVER;
- nni_cv_wake(&aio->a_cv);
+void
+nni_aio_finish_pipe(nni_aio *aio, void *pipe)
+{
+ NNI_ASSERT(pipe != NULL);
+ nni_aio_finish_impl(aio, 0, 0, pipe, NULL);
+}
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
- return (0);
+void
+nni_aio_finish_msg(nni_aio *aio, nni_msg *msg)
+{
+ NNI_ASSERT(msg != NULL);
+ nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg);
}
void
@@ -261,12 +262,9 @@ nni_aio_list_active(nni_aio *aio)
static void
nni_aio_expire_add(nni_aio *aio)
{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
nni_list *list = &nni_aio_expire_aios;
nni_aio * naio;
- nni_mtx_lock(mtx);
// This is a reverse walk of the list. We're more likely to find
// a match at the end of the list.
for (naio = nni_list_last(list); naio != NULL;
@@ -280,105 +278,69 @@ nni_aio_expire_add(nni_aio *aio)
// This has the shortest time, so insert at the start.
nni_list_prepend(list, aio);
// And, as we are the latest, kick the thing.
- nni_cv_wake(cv);
+ nni_cv_wake(&nni_aio_expire_cv);
}
- nni_mtx_unlock(mtx);
-}
-
-static void
-nni_aio_expire_remove(nni_aio *aio)
-{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_list *list = &nni_aio_expire_aios;
-
- nni_mtx_lock(mtx);
- if (nni_list_active(list, aio)) {
- nni_list_remove(list, aio);
- }
- while (aio == nni_aio_expire_current) {
- nni_cv_wait(cv);
- }
- nni_mtx_unlock(mtx);
}
static void
nni_aio_expire_loop(void *arg)
{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_list *aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
-
- void (*cancelfn)(nni_aio *);
+ nni_list * aios = &nni_aio_expire_aios;
+ nni_aio * aio;
+ nni_time now;
+ nni_aio_cancelfn cancelfn;
NNI_ARG_UNUSED(arg);
for (;;) {
- nni_mtx_lock(mtx);
-
- // If we are resuming this loop after processing an AIO,
- // note that we are done with it, and wake anyone waiting
- // for that to clear up.
- if ((aio = nni_aio_expire_current) != NULL) {
- nni_aio_expire_current = NULL;
- nni_cv_wake(cv);
- }
+ nni_mtx_lock(&nni_aio_lk);
if (nni_aio_expire_exit) {
- nni_mtx_unlock(mtx);
+ nni_mtx_unlock(&nni_aio_lk);
return;
}
if ((aio = nni_list_first(aios)) == NULL) {
- nni_cv_wait(cv);
- nni_mtx_unlock(mtx);
+ nni_cv_wait(&nni_aio_expire_cv);
+ nni_mtx_unlock(&nni_aio_lk);
continue;
}
now = nni_clock();
if (now < aio->a_expire) {
// Unexpired; the list is ordered, so we just wait.
- nni_cv_until(cv, aio->a_expire);
- nni_mtx_unlock(mtx);
+ nni_cv_until(&nni_aio_expire_cv, aio->a_expire);
+ nni_mtx_unlock(&nni_aio_lk);
continue;
}
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
-
nni_list_remove(aios, aio);
- nni_aio_expire_current = aio;
- nni_mtx_unlock(mtx);
-
- cancelfn = NULL;
- nni_mtx_lock(&aio->a_lk);
- if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) {
- nni_mtx_unlock(&aio->a_lk);
- continue;
- }
+ // Mark it as expiring. This acts as a hold on
+ // the aio, similar to the consumers. The actual taskq
+ // dispatch on completion won't occur until this is cleared,
+ // and the done flag won't be set either.
+ aio->a_expiring = 1;
+ cancelfn = aio->a_prov_cancel;
- aio->a_flags |= NNI_AIO_DONE;
-
- aio->a_result = NNG_ETIMEDOUT;
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
-
- // Cancel any outstanding activity.
+ // Cancel any outstanding activity. This is always non-NULL
+ // for a valid aio, and becomes NULL only when an AIO is
+ // already being canceled or finished.
if (cancelfn != NULL) {
- cancelfn(aio);
+ nni_mtx_unlock(&nni_aio_lk);
+ cancelfn(aio, NNG_ETIMEDOUT);
+ nni_mtx_lock(&nni_aio_lk);
}
- // Arguably we could avoid dispatching, and execute the
- // callback inline here as we are already on a separate
- // thread. But keeping it separate is clearer, and more
- // consistent with other uses. And this should not be a
- // hot code path.
+ NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
+ aio->a_expiring = 0;
+ aio->a_done = 1;
+ nni_cv_wake(&aio->a_cv);
nni_task_dispatch(&aio->a_task);
+ nni_mtx_unlock(&nni_aio_lk);
}
}
@@ -386,7 +348,7 @@ int
nni_aio_sys_init(void)
{
int rv;
- nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
@@ -409,7 +371,7 @@ fail:
void
nni_aio_sys_fini(void)
{
- nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
diff --git a/src/core/aio.h b/src/core/aio.h
index 31a54f12..0f41c01f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -18,6 +18,8 @@
typedef struct nni_aio_ops nni_aio_ops;
+typedef void (*nni_aio_cancelfn)(nni_aio *, int);
+
// An nni_aio is an async I/O handle.
struct nni_aio {
int a_result; // Result code (nng_errno)
@@ -25,10 +27,14 @@ struct nni_aio {
nni_time a_expire;
// These fields are private to the aio framework.
- nni_mtx a_lk;
nni_cv a_cv;
- unsigned a_flags;
- int a_refcnt; // prevent use-after-free
+ unsigned a_init : 1; // initialized flag
+ unsigned a_fini : 1; // shutting down (no new operations)
+ unsigned a_done : 1; // operation has completed
+ unsigned a_pend : 1; // completion routine pending
+ unsigned a_active : 1; // aio was started
+ unsigned a_expiring : 1; // expiration callback in progress
+ unsigned a_pad : 27; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
@@ -47,9 +53,9 @@ struct nni_aio {
int a_naddrs;
// Provider-use fields.
- void (*a_prov_cancel)(nni_aio *);
- void * a_prov_data;
- nni_list_node a_prov_node;
+ nni_aio_cancelfn a_prov_cancel;
+ void * a_prov_data;
+ nni_list_node a_prov_node;
// Expire node.
nni_list_node a_expire_node;
@@ -106,21 +112,17 @@ extern void nni_aio_list_remove(nni_aio *);
extern int nni_aio_list_active(nni_aio *);
// nni_aio_finish is called by the provider when an operation is complete.
-// The provider gives the result code (0 for success, an NNG errno otherwise),
-// and the amount of data transferred (if any). If the return code is
-// non-zero, it indicates that the operation failed (usually because the aio
-// was already canceled.) This is important for providers that need to
-// prevent resources (new pipes for example) from accidentally leaking
-// during close operations.
-extern int nni_aio_finish(nni_aio *, int, size_t);
-extern int nni_aio_finish_pipe(nni_aio *, int, void *);
+extern void nni_aio_finish(nni_aio *, int, size_t);
+extern void nni_aio_finish_error(nni_aio *, int);
+extern void nni_aio_finish_pipe(nni_aio *, void *);
+extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// nni_aio_cancel is used to cancel an operation. Any pending I/O or
// timeouts are canceled if possible, and the callback will be returned
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
extern void nni_aio_cancel(nni_aio *, int rv);
-extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
+extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *);
// nni_aio_stop is used to abort all further operations on the AIO.
// When this is executed, no further operations or callbacks will be
diff --git a/src/core/endpt.c b/src/core/endpt.c
index c0f9d399..aa4c5a31 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -175,6 +175,7 @@ nni_ep_close(nni_ep *ep)
nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(&ep->ep_backoff, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -188,6 +189,7 @@ nni_ep_reap(nni_ep *ep)
nni_aio_stop(&ep->ep_acc_aio);
nni_aio_stop(&ep->ep_con_aio);
nni_aio_stop(&ep->ep_con_syn);
+ nni_aio_stop(&ep->ep_backoff);
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
@@ -233,6 +235,13 @@ nni_ep_stop(nni_ep *ep)
}
static void
+nni_ep_backoff_cancel(nni_aio *aio, int rv)
+{
+ // The only way this ever gets "finished", is via cancellation.
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
nni_ep_backoff_start(nni_ep *ep)
{
nni_duration backoff;
@@ -255,7 +264,7 @@ nni_ep_backoff_start(nni_ep *ep)
// random number, but this really doesn't matter.
ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff);
- nni_aio_start(&ep->ep_backoff, NULL, ep);
+ nni_aio_start(&ep->ep_backoff, nni_ep_backoff_cancel, ep);
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index d98c68be..2ebc4927 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -121,26 +121,17 @@ nni_msgq_fini(nni_msgq *mq)
NNI_FREE_STRUCT(mq);
}
-static void
-nni_msgq_finish(nni_aio *aio, int rv)
-{
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, 0);
-}
-
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_msgq_finish(aio, error);
+ while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_geterr = error;
@@ -150,16 +141,14 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_msgq_finish(aio, error);
+ while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_puterr = error;
@@ -169,21 +158,15 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
void
nni_msgq_set_error(nni_msgq *mq, int error)
{
- nni_aio *naio;
nni_aio *aio;
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
if (error != 0) {
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_msgq_finish(aio, error);
- }
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_msgq_finish(aio, error);
+ while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
+ ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, error);
}
}
mq->mq_puterr = error;
@@ -207,11 +190,12 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- raio->a_msg = msg;
waio->a_msg = NULL;
- nni_msgq_finish(raio, 0);
- nni_msgq_finish(waio, 0);
+ nni_aio_list_remove(raio);
+ nni_aio_list_remove(waio);
+ nni_aio_finish(waio, 0, len);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -224,7 +208,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
mq->mq_len++;
waio->a_msg = NULL;
- nni_msgq_finish(waio, 0);
+ nni_aio_finish(waio, 0, len);
continue;
}
@@ -243,14 +227,14 @@ nni_msgq_run_getq(nni_msgq *mq)
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
- nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
raio->a_msg = msg;
- nni_msgq_finish(raio, 0);
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -258,9 +242,11 @@ nni_msgq_run_getq(nni_msgq *mq)
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
msg = waio->a_msg;
waio->a_msg = NULL;
- raio->a_msg = msg;
- nni_msgq_finish(raio, 0);
- nni_msgq_finish(waio, 0);
+ nni_aio_list_remove(waio);
+ nni_aio_list_remove(raio);
+
+ nni_aio_finish(waio, 0, nni_msg_len(msg));
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -300,16 +286,15 @@ nni_msgq_run_notify(nni_msgq *mq)
}
static void
-nni_msgq_cancel(nni_aio *aio)
+nni_msgq_cancel(nni_aio *aio, int rv)
{
nni_msgq *mq = aio->a_prov_data;
- if (mq == NULL) {
- return;
- }
-
nni_mtx_lock(&mq->mq_lock);
- nni_aio_list_remove(aio);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&mq->mq_lock);
}
@@ -346,12 +331,12 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
if (mq->mq_closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&mq->mq_lock);
return;
}
if (mq->mq_puterr) {
- nni_aio_finish(aio, mq->mq_puterr, 0);
+ nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
@@ -372,12 +357,12 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
if (mq->mq_closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&mq->mq_lock);
return;
}
if (mq->mq_geterr) {
- nni_aio_finish(aio, mq->mq_geterr, 0);
+ nni_aio_finish_error(aio, mq->mq_geterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
@@ -439,9 +424,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_list_remove(&mq->mq_aio_getq, raio);
- raio->a_msg = msg;
-
- nni_aio_finish(raio, 0, len);
+ nni_aio_finish_msg(raio, msg);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -512,13 +495,16 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
break;
}
}
- // If we timedout, free any remaining messages in the queue.
- // Also complete the putq as NNG_ECLOSED.
+ // Timed out or writers drained.
+
+ // Complete the putq as NNG_ECLOSED.
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
+
+ // Free any remaining messages in the queue.
while (mq->mq_len > 0) {
nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_alloc) {
@@ -551,17 +537,10 @@ nni_msgq_close(nni_msgq *mq)
// Let all pending blockers know we are closing the queue.
naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- }
-
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
+ while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
+ ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&mq->mq_lock);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 5fbcdb33..14b04085 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -158,7 +158,6 @@ void
nni_taskq_fini(nni_taskq *tq)
{
int i;
- int busy;
// First drain the taskq completely. This is necessary since some
// tasks that are presently running may need to schedule additional
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 7a91b4ec..a8c8395e 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -46,13 +46,17 @@ struct nni_posix_epdesc {
};
static void
-nni_posix_epdesc_cancel(nni_aio *aio)
+nni_posix_epdesc_cancel(nni_aio *aio, int rv)
{
nni_posix_epdesc *ed = aio->a_prov_data;
+ NNI_ASSERT(rv != 0);
nni_mtx_lock(&ed->mtx);
- nni_aio_list_remove(aio);
- NNI_ASSERT(aio->a_pipe == NULL);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ NNI_ASSERT(aio->a_pipe == NULL);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&ed->mtx);
}
@@ -70,8 +74,10 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
(void) close(newfd);
}
}
- if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) {
- nni_posix_pipedesc_fini(pd);
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_finish_pipe(aio, pd);
}
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index 5dd77dcb..bd74e0c0 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -39,7 +39,7 @@ static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
nni_aio_list_remove(aio);
- (void) nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, rv, aio->a_count);
}
static void
@@ -233,12 +233,15 @@ nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cancel(nni_aio *aio)
+nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
{
nni_posix_pipedesc *pd = aio->a_prov_data;
nni_mtx_lock(&pd->mtx);
- nni_aio_list_remove(aio);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&pd->mtx);
}
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 5852f34c..09d40b94 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -62,7 +62,7 @@ nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv)
}
static void
-nni_posix_resolv_cancel(nni_aio *aio)
+nni_posix_resolv_cancel(nni_aio *aio, int rv)
{
nni_posix_resolv_item *item;
@@ -75,6 +75,7 @@ nni_posix_resolv_cancel(nni_aio *aio)
nni_mtx_unlock(&nni_posix_resolv_mtx);
nni_task_cancel(&item->task);
NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
}
static int
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index a77fcf0b..a2700485 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -65,17 +66,13 @@ struct nni_win_event {
nni_aio * aio;
nni_mtx mtx;
nni_cv cv;
- int flags;
+ unsigned run : 1;
+ unsigned fini : 1;
int count;
int status;
nni_win_event_ops ops;
};
-enum nni_win_event_flags {
- NNI_WIN_EVENT_RUNNING = 1,
- NNI_WIN_EVENT_ABORT = 2,
-};
-
extern int nni_win_error(int);
extern int nni_win_event_init(nni_win_event *, nni_win_event_ops *, void *);
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index df0357c8..c4cdcb8a 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -15,7 +15,7 @@
#define NNI_WIN_IOCP_NTHREADS 4
#include <stdio.h>
-// Windows IO Completion Port support. We basically creaet a single
+// Windows IO Completion Port support. We basically create a single
// IO completion port, then start threads on it. Handles are added
// to the port on an as needed basis. We use a single IO completion
// port for pretty much everything.
@@ -25,6 +25,18 @@ static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS];
static nni_mtx nni_win_iocp_mtx;
static void
+nni_win_event_finish(nni_win_event *evt, nni_aio *aio)
+{
+ evt->run = 0;
+ if (aio != NULL) {
+ evt->ops.wev_finish(evt, aio);
+ }
+ if (evt->fini) {
+ nni_cv_wake(&evt->cv);
+ }
+}
+
+static void
nni_win_iocp_handler(void *arg)
{
HANDLE iocp;
@@ -59,42 +71,30 @@ nni_win_iocp_handler(void *arg)
if (ok) {
rv = ERROR_SUCCESS;
- } else {
- rv = GetLastError();
+ } else if (evt->status == 0) {
+ evt->status = nni_win_error(GetLastError());
}
- aio = evt->aio;
- evt->aio = NULL;
- evt->status = rv;
- evt->count = cnt;
-
- // Aborted operations don't get the finish callback done.
- // All others do.
- evt->flags &= ~NNI_WIN_EVENT_RUNNING;
- if (evt->flags & NNI_WIN_EVENT_ABORT) {
- nni_cv_wake(&evt->cv);
- } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) {
- evt->ops.wev_finish(evt, aio);
- }
+ aio = evt->aio;
+ evt->aio = NULL;
+ evt->count = cnt;
+
+ nni_win_event_finish(evt, aio);
nni_mtx_unlock(&evt->mtx);
}
}
static void
-nni_win_event_cancel(nni_aio *aio)
+nni_win_event_cancel(nni_aio *aio, int rv)
{
nni_win_event *evt = aio->a_prov_data;
nni_mtx_lock(&evt->mtx);
- evt->flags |= NNI_WIN_EVENT_ABORT;
- evt->aio = NULL;
-
- // Use provider specific cancellation.
- evt->ops.wev_cancel(evt);
+ if (evt->aio == aio) {
+ evt->status = rv;
- // Wait for everything to stop referencing this.
- while (evt->flags & NNI_WIN_EVENT_RUNNING) {
- nni_cv_wait(&evt->cv);
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt);
}
nni_mtx_unlock(&evt->mtx);
}
@@ -107,28 +107,28 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
// The lock is held.
// Abort operation -- no further activity.
- if (evt->flags & NNI_WIN_EVENT_ABORT) {
+ if (evt->fini) {
+ evt->run = 0;
+ nni_cv_wake(&evt->cv);
return;
}
- evt->status = ERROR_SUCCESS;
+ evt->status = 0;
evt->count = 0;
if (!ResetEvent(evt->olpd.hEvent)) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
-
- evt->ops.wev_finish(evt, aio);
+ nni_win_event_finish(evt, aio);
return;
}
evt->aio = aio;
- evt->flags |= NNI_WIN_EVENT_RUNNING;
+ evt->run = 1;
if (evt->ops.wev_start(evt, aio) != 0) {
// Start completed synchronously. It will have stored
// the count and status in the evt.
- evt->flags &= ~NNI_WIN_EVENT_RUNNING;
evt->aio = NULL;
- evt->ops.wev_finish(evt, aio);
+ nni_win_event_finish(evt, aio);
}
}
@@ -154,20 +154,10 @@ nni_win_event_complete(nni_win_event *evt, int cnt)
void
nni_win_event_close(nni_win_event *evt)
{
- nni_aio *aio;
-
if (evt->ptr != NULL) {
nni_mtx_lock(&evt->mtx);
- evt->flags |= NNI_WIN_EVENT_ABORT;
+ evt->status = NNG_ECLOSED;
evt->ops.wev_cancel(evt);
- if ((aio = evt->aio) != NULL) {
- evt->aio = NULL;
- // We really don't care if we transferred data or not.
- // The caller indicates they have closed the pipe.
- evt->status = ERROR_INVALID_HANDLE;
- evt->count = 0;
- evt->ops.wev_finish(evt, aio);
- }
nni_mtx_unlock(&evt->mtx);
}
}
@@ -195,28 +185,27 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
return (rv); // NB: This will never happen on Windows.
}
- evt->ops = *ops;
- evt->aio = NULL;
- evt->ptr = ptr;
+ evt->ops = *ops;
+ evt->aio = NULL;
+ evt->ptr = ptr;
+ evt->fini = 0;
+ evt->run = 0;
return (0);
}
void
nni_win_event_fini(nni_win_event *evt)
{
- nni_aio *aio;
-
if (evt->ptr != NULL) {
nni_mtx_lock(&evt->mtx);
- if ((aio = evt->aio) != NULL) {
- evt->flags |= NNI_WIN_EVENT_ABORT;
- evt->aio = NULL;
- // Use provider specific cancellation.
- evt->ops.wev_cancel(evt);
- }
+ evt->fini = 1;
+
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt);
+
// Wait for everything to stop referencing this.
- while (evt->flags & NNI_WIN_EVENT_RUNNING) {
+ while (evt->run) {
nni_cv_wait(&evt->cv);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index bd9ce26d..c9eb20ec 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -65,7 +65,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
if (pipe->p == INVALID_HANDLE_VALUE) {
- evt->status = ERROR_INVALID_HANDLE;
+ evt->status = NNG_ECLOSED;
evt->count = 0;
return (1);
}
@@ -92,7 +92,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
}
if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -108,13 +108,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt)
{
nni_plat_ipc_pipe *pipe = evt->ptr;
- if (CancelIoEx(pipe->p, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult(pipe->p, &evt->olpd, &cnt, TRUE);
- }
+ CancelIoEx(pipe->p, &evt->olpd);
}
static void
@@ -146,7 +140,7 @@ nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
}
// All done; hopefully successfully.
- nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
+ nni_aio_finish(aio, rv, aio->a_count);
}
static int
@@ -294,7 +288,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
HANDLE newp, oldp;
if ((rv = evt->status) != 0) {
- nni_aio_finish(aio, nni_win_error(rv), 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -308,7 +302,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// We connected, but as we cannot get a new pipe,
// we have to disconnect the old one.
DisconnectNamedPipe(ep->p);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
if ((rv = nni_win_iocp_register(newp)) != 0) {
@@ -317,7 +311,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// And discard the half-baked new one.
DisconnectNamedPipe(newp);
(void) CloseHandle(newp);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -329,14 +323,11 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
// the old one, since failed to be able to use it.
DisconnectNamedPipe(oldp);
(void) CloseHandle(oldp);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
- // What if the pipe is already finished?
- if (nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
}
static void
@@ -344,13 +335,7 @@ nni_win_ipc_acc_cancel(nni_win_event *evt)
{
nni_plat_ipc_ep *ep = evt->ptr;
- if (CancelIoEx(ep->p, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult(ep->p, &evt->olpd, &cnt, TRUE);
- }
+ (void) CancelIoEx(ep->p, &evt->olpd);
// Just to be sure.
(void) DisconnectNamedPipe(ep->p);
}
@@ -376,7 +361,7 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio)
default:
// Fast-fail (synchronous).
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -468,9 +453,7 @@ nni_win_ipc_conn_thr(void *arg)
((rv = nni_win_iocp_register(p)) != 0)) {
goto fail;
}
- if (rv = nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_ipc_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
continue;
fail:
@@ -481,7 +464,7 @@ nni_win_ipc_conn_thr(void *arg)
if (pipe != NULL) {
nni_plat_ipc_pipe_fini(pipe);
}
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
}
if (nni_list_empty(&w->waiters)) {
@@ -496,16 +479,19 @@ nni_win_ipc_conn_thr(void *arg)
}
static void
-nni_win_ipc_conn_cancel(nni_aio *aio)
+nni_win_ipc_conn_cancel(nni_aio *aio, int rv)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
nni_plat_ipc_ep * ep = aio->a_prov_data;
nni_mtx_lock(&w->mtx);
- ep->con_aio = NULL;
- if (nni_list_active(&w->waiters, ep)) {
- nni_list_remove(&w->waiters, ep);
- nni_cv_wake(&w->cv);
+ if (aio == ep->con_aio) {
+ ep->con_aio = NULL;
+ if (nni_list_active(&w->waiters, ep)) {
+ nni_list_remove(&w->waiters, ep);
+ nni_cv_wake(&w->cv);
+ }
+ nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&w->mtx);
}
@@ -556,7 +542,7 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep)
}
if ((aio = ep->con_aio) != NULL) {
ep->con_aio = NULL;
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&w->mtx);
break;
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 633dd256..df6275ff 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -144,7 +144,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
}
if ((s = pipe->s) == INVALID_SOCKET) {
- evt->status = ERROR_INVALID_HANDLE;
+ evt->status = NNG_ECLOSED;
evt->count = 0;
return (1);
}
@@ -163,7 +163,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
if ((rv == SOCKET_ERROR) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -179,13 +179,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt)
{
nni_plat_tcp_pipe *pipe = evt->ptr;
- if (CancelIoEx((HANDLE) pipe->s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) pipe->s, &evt->olpd, &cnt, TRUE);
- }
+ (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd);
}
static void
@@ -228,7 +222,7 @@ nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio)
}
// All done; hopefully successfully.
- nni_aio_finish(aio, nni_win_error(rv), aio->a_count);
+ nni_aio_finish(aio, rv, aio->a_count);
}
static int
@@ -507,12 +501,8 @@ nni_win_tcp_acc_cancel(nni_win_event *evt)
nni_plat_tcp_ep *ep = evt->ptr;
SOCKET s = ep->s;
- if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE);
+ if (s != INVALID_SOCKET) {
+ CancelIoEx((HANDLE) s, &evt->olpd);
}
}
@@ -531,22 +521,15 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio)
return;
}
- if ((rv = evt->status) != 0) {
- closesocket(s);
- nni_aio_finish(aio, nni_win_error(rv), 0);
- return;
- }
-
- if (((rv = nni_win_iocp_register((HANDLE) s)) != 0) ||
+ if (((rv = evt->status) != 0) ||
+ ((rv = nni_win_iocp_register((HANDLE) s)) != 0) ||
((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
closesocket(s);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
- if (nni_aio_finish_pipe(aio, 0, pipe) != 0) {
- nni_plat_tcp_pipe_fini(pipe);
- }
+ nni_aio_finish_pipe(aio, pipe);
}
static int
@@ -559,7 +542,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
if (acc_s == INVALID_SOCKET) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
return (1);
}
@@ -575,7 +558,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
default:
// Fast-fail (synchronous).
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
@@ -599,12 +582,8 @@ nni_win_tcp_con_cancel(nni_win_event *evt)
nni_plat_tcp_ep *ep = evt->ptr;
SOCKET s = ep->s;
- if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) {
- DWORD cnt;
-
- // If we canceled, make sure that we've completely
- // finished with the overlapped.
- GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE);
+ if (s != INVALID_SOCKET) {
+ CancelIoEx((HANDLE) s, &evt->olpd);
}
}
@@ -619,19 +598,14 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio)
s = ep->s;
ep->s = INVALID_SOCKET;
- if ((rv = evt->status) != 0) {
- closesocket(s);
- nni_aio_finish(aio, nni_win_error(rv), 0);
- return;
- }
-
// The socket was already registere with the IOCP.
- if ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0) {
+ if (((rv = evt->status) != 0) ||
+ ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) {
// The new pipe is already fine for us. Discard
// the old one, since failed to be able to use it.
closesocket(s);
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -650,7 +624,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
return (1);
}
@@ -667,7 +641,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
len = ep->remlen;
}
if (bind(s, (struct sockaddr *) &bss, len) < 0) {
- evt->status = GetLastError();
+ evt->status = nni_win_error(GetLastError());
evt->count = 0;
closesocket(s);
return (1);
@@ -687,7 +661,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
if ((rv = GetLastError()) != ERROR_IO_PENDING) {
closesocket(s);
ep->s = INVALID_SOCKET;
- evt->status = rv;
+ evt->status = nni_win_error(rv);
evt->count = 0;
return (1);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 44d00c34..a01dc123 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -30,13 +30,13 @@ static nni_mtx nni_win_resolv_mtx;
typedef struct nni_win_resolv_item nni_win_resolv_item;
struct nni_win_resolv_item {
- int family;
- int passive;
- const char * name;
- const char * serv;
- int proto;
- nni_aio * aio;
- nni_taskq_ent tqe;
+ int family;
+ int passive;
+ const char *name;
+ const char *serv;
+ int proto;
+ nni_aio * aio;
+ nni_task task;
};
static void
@@ -50,7 +50,7 @@ nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
}
static void
-nni_win_resolv_cancel(nni_aio *aio)
+nni_win_resolv_cancel(nni_aio *aio, int rv)
{
nni_win_resolv_item *item;
@@ -61,8 +61,9 @@ nni_win_resolv_cancel(nni_aio *aio)
}
aio->a_prov_data = NULL;
nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_taskq_cancel(nni_win_resolv_tq, &item->tqe);
+ nni_task_cancel(&item->task);
NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
}
static int
@@ -209,7 +210,8 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_taskq_ent_init(&item->tqe, nni_win_resolv_task, item);
+ nni_task_init(
+ nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
switch (family) {
case NNG_AF_INET:
@@ -236,11 +238,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
NNI_FREE_STRUCT(item);
return;
}
- if ((rv = nni_taskq_dispatch(nni_win_resolv_tq, &item->tqe)) != 0) {
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- return;
- }
+ nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_win_resolv_mtx);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 226a31ce..9cc43ad7 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -241,27 +241,24 @@ static void
nni_inproc_conn_finish(nni_aio *aio, int rv)
{
nni_inproc_ep *ep = aio->a_endpt;
+ void * pipe;
- if (rv != 0) {
- if (aio->a_pipe != NULL) {
- nni_inproc_pipe_fini(aio->a_pipe);
- aio->a_pipe = NULL;
- }
- }
nni_aio_list_remove(aio);
- if (ep != NULL) {
- if ((ep->mode != NNI_EP_MODE_LISTEN) &&
- nni_list_empty(&ep->aios)) {
- if (nni_list_active(&ep->clients, ep)) {
- nni_list_remove(&ep->clients, ep);
- }
- }
+ pipe = aio->a_pipe;
+ aio->a_pipe = NULL;
+
+ if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) &&
+ nni_list_empty(&ep->aios)) {
+ nni_list_node_remove(&ep->node);
}
- if (nni_aio_finish(aio, rv, 0) != 0) {
- if (aio->a_pipe != NULL) {
- nni_inproc_pipe_fini(aio->a_pipe);
- aio->a_pipe = NULL;
+
+ if (rv == 0) {
+ nni_aio_finish_pipe(aio, pipe);
+ } else {
+ if (pipe != NULL) {
+ nni_inproc_pipe_fini(pipe);
}
+ nni_aio_finish_error(aio, rv);
}
}
@@ -291,29 +288,6 @@ nni_inproc_ep_close(void *arg)
}
static void
-nni_inproc_connect_abort(nni_aio *aio)
-{
- nni_inproc_ep *ep = aio->a_endpt;
-
- nni_mtx_lock(&nni_inproc.mx);
-
- if (aio->a_pipe != NULL) {
- nni_inproc_pipe_fini(aio->a_pipe);
- aio->a_pipe = NULL;
- }
- nni_aio_list_remove(aio);
- if (ep != NULL) {
- if ((ep->mode != NNI_EP_MODE_LISTEN) &&
- nni_list_empty(&ep->aios)) {
- if (nni_list_active(&ep->clients, ep)) {
- nni_list_remove(&ep->clients, ep);
- }
- }
- }
- nni_mtx_unlock(&nni_inproc.mx);
-}
-
-static void
nni_inproc_accept_clients(nni_inproc_ep *server)
{
nni_inproc_ep * client, *nclient;
@@ -369,23 +343,24 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
}
static void
-nni_inproc_ep_cancel(nni_aio *aio)
+nni_inproc_ep_cancel(nni_aio *aio, int rv)
{
- nni_inproc_ep *ep = aio->a_prov_data;
+ nni_inproc_ep * ep = aio->a_prov_data;
+ nni_inproc_pipe *pipe;
nni_mtx_lock(&nni_inproc.mx);
- if (nni_list_active(&ep->aios, aio)) {
- nni_list_remove(&ep->aios, aio);
- }
- // Arguably if the mode is a client... then we need to remove
- // it from the server's list. Notably this isn't *our* list,
- // but the offsets are the same and they're good enough using the
- // global lock to make it all safe.
- if (nni_list_active(&ep->clients, ep)) {
- nni_list_remove(&ep->clients, ep);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_list_node_remove(&ep->node);
+ if ((pipe = aio->a_pipe) != NULL) {
+ aio->a_pipe = NULL;
+ nni_inproc_pipe_fini(pipe);
+ }
+ nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&nni_inproc.mx);
}
+
static void
nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
@@ -394,7 +369,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
int rv;
if (ep->mode != NNI_EP_MODE_DIAL) {
- nni_aio_finish(aio, NNG_EINVAL, 0);
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
nni_mtx_lock(&nni_inproc.mx);
@@ -406,24 +381,24 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
if (nni_list_active(&ep->clients, ep)) {
// We already have a pending connection...
- nni_aio_finish(aio, NNG_EINVAL, 0);
+ nni_aio_finish_error(aio, NNG_EINVAL);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
if (ep->started) {
- nni_aio_finish(aio, NNG_EBUSY, 0);
+ nni_aio_finish_error(aio, NNG_EBUSY);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
if (ep->closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) {
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
@@ -491,32 +466,33 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
nni_inproc_ep *ep = arg;
int rv;
- if (ep->mode != NNI_EP_MODE_LISTEN) {
- nni_aio_finish(aio, NNG_EINVAL, 0);
+ nni_mtx_lock(&nni_inproc.mx);
+
+ if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
return;
}
- nni_mtx_lock(&nni_inproc.mx);
+ if (ep->mode != NNI_EP_MODE_LISTEN) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ nni_mtx_unlock(&nni_inproc.mx);
+ return;
+ }
// We are already on the master list of servers, thanks to bind.
if (ep->closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
if (!ep->started) {
- nni_aio_finish(aio, NNG_ESTATE, 0);
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
-
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_aio_finish_error(aio, NNG_ESTATE);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) {
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 96dae6de..e8c7968f 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -142,15 +142,20 @@ fail:
}
static void
-nni_ipc_cancel_start(nni_aio *aio)
+nni_ipc_cancel_start(nni_aio *aio, int rv)
{
nni_ipc_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_negaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->negaio, aio->a_result);
+ nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -239,10 +244,10 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_ipc_pipe *pipe = arg;
nni_aio * aio;
int rv;
+ nni_msg * msg;
nni_mtx_lock(&pipe->mtx);
- aio = pipe->user_rxaio;
- if (aio == NULL) {
+ if ((aio = pipe->user_rxaio) == NULL) {
// aio was canceled
nni_mtx_unlock(&pipe->mtx);
return;
@@ -257,7 +262,7 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->rxmsg = NULL;
}
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -270,7 +275,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
- nni_aio_finish(aio, NNG_EPROTO, 0);
+ nni_aio_finish_error(aio, NNG_EPROTO);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -282,7 +287,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// the caller will shut down the pipe.
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_aio_finish_error(aio, NNG_EMSGSIZE);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -294,7 +299,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// unlikely to be much of an issue though.
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -313,22 +318,27 @@ nni_ipc_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
pipe->user_rxaio = NULL;
- aio->a_msg = pipe->rxmsg;
+ msg = pipe->rxmsg;
pipe->rxmsg = NULL;
- nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_aio_finish_msg(aio, msg);
nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_ipc_cancel_tx(nni_aio *aio)
+nni_ipc_cancel_tx(nni_aio *aio, int rv)
{
nni_ipc_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_txaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->txaio, aio->a_result);
+ nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -364,15 +374,20 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
}
static void
-nni_ipc_cancel_rx(nni_aio *aio)
+nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
nni_ipc_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_rxaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->rxaio, aio->a_result);
+ nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -552,10 +567,18 @@ done:
aio = ep->user_aio;
ep->user_aio = NULL;
- if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) {
- if (pipe != NULL) {
- nni_ipc_pipe_fini(pipe);
- }
+ if ((aio != NULL) && (rv == 0)) {
+ NNI_ASSERT(pipe != NULL);
+ nni_aio_finish_pipe(aio, pipe);
+ return;
+ }
+
+ if (pipe != NULL) {
+ nni_ipc_pipe_fini(pipe);
+ }
+ if (aio != NULL) {
+ NNI_ASSERT(rv != 0);
+ nni_aio_finish_error(aio, rv);
}
}
@@ -570,15 +593,21 @@ nni_ipc_ep_cb(void *arg)
}
static void
-nni_ipc_cancel_ep(nni_aio *aio)
+nni_ipc_cancel_ep(nni_aio *aio, int rv)
{
nni_ipc_ep *ep = aio->a_prov_data;
+ NNI_ASSERT(rv != 0);
nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio != aio) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, aio->a_result);
+ nni_aio_cancel(&ep->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 28677f54..a42fa377 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -141,15 +141,20 @@ fail:
}
static void
-nni_tcp_cancel_nego(nni_aio *aio)
+nni_tcp_cancel_nego(nni_aio *aio, int rv)
{
nni_tcp_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_negaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->negaio, aio->a_result);
+ nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -239,6 +244,7 @@ nni_tcp_pipe_recv_cb(void *arg)
nni_tcp_pipe *pipe = arg;
nni_aio * aio;
int rv;
+ nni_msg * msg;
nni_mtx_lock(&pipe->mtx);
@@ -257,7 +263,7 @@ nni_tcp_pipe_recv_cb(void *arg)
pipe->rxmsg = NULL;
}
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -274,14 +280,14 @@ nni_tcp_pipe_recv_cb(void *arg)
// the caller will shut down the pipe.
if (len > pipe->rcvmax) {
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ nni_aio_finish_error(aio, NNG_EMSGSIZE);
nni_mtx_unlock(&pipe->mtx);
return;
}
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
pipe->user_rxaio = NULL;
- nni_aio_finish(aio, rv, 0);
+ nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -300,23 +306,28 @@ nni_tcp_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
pipe->user_rxaio = NULL;
- aio->a_msg = pipe->rxmsg;
+ msg = pipe->rxmsg;
pipe->rxmsg = NULL;
- nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+ nni_aio_finish_msg(aio, msg);
nni_mtx_unlock(&pipe->mtx);
}
static void
-nni_tcp_cancel_tx(nni_aio *aio)
+nni_tcp_cancel_tx(nni_aio *aio, int rv)
{
nni_tcp_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_txaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&pipe->txaio, aio->a_result);
+ nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -352,16 +363,21 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
}
static void
-nni_tcp_cancel_rx(nni_aio *aio)
+nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
nni_tcp_pipe *pipe = aio->a_prov_data;
nni_mtx_lock(&pipe->mtx);
+ if (pipe->user_rxaio != aio) {
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&pipe->rxaio, aio->a_result);
+ nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -619,10 +635,16 @@ done:
aio = ep->user_aio;
ep->user_aio = NULL;
- if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) {
- if (pipe != NULL) {
- nni_tcp_pipe_fini(pipe);
- }
+ if ((aio != NULL) && (rv == 0)) {
+ nni_aio_finish_pipe(aio, pipe);
+ return;
+ }
+ if (pipe != NULL) {
+ nni_tcp_pipe_fini(pipe);
+ }
+ if (aio != NULL) {
+ NNI_ASSERT(rv != 0);
+ nni_aio_finish_error(aio, rv);
}
}
@@ -637,15 +659,20 @@ nni_tcp_ep_cb(void *arg)
}
static void
-nni_tcp_cancel_ep(nni_aio *aio)
+nni_tcp_cancel_ep(nni_aio *aio, int rv)
{
nni_tcp_ep *ep = aio->a_prov_data;
nni_mtx_lock(&ep->mtx);
+ if (ep->user_aio != aio) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, aio->a_result);
+ nni_aio_cancel(&ep->aio, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
diff --git a/tests/compat_bug777.c b/tests/compat_bug777.c
index dc1f54f3..7bd6fce5 100644
--- a/tests/compat_bug777.c
+++ b/tests/compat_bug777.c
@@ -29,7 +29,6 @@ int main (int argc, const char *argv[])
int sb;
int sc1;
int sc2;
- char socket_address[128];
sb = test_socket (AF_SP, NN_PAIR);
test_bind (sb, "inproc://pair");
diff --git a/tests/inproc.c b/tests/inproc.c
index 0d1cafbf..2afb1ae0 100644
--- a/tests/inproc.c
+++ b/tests/inproc.c
@@ -8,9 +8,8 @@
//
#include "convey.h"
-#include "trantest.h"
#include "core/nng_impl.h"
-
+#include "trantest.h"
// Inproc tests.