diff options
| -rw-r--r-- | src/core/aio.c | 320 | ||||
| -rw-r--r-- | src/core/aio.h | 32 | ||||
| -rw-r--r-- | src/core/endpt.c | 11 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 103 | ||||
| -rw-r--r-- | src/core/taskq.c | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 16 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 9 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 3 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 9 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 101 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 58 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 66 | ||||
| -rw-r--r-- | src/platform/windows/win_resolv.c | 28 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 110 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 69 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 61 | ||||
| -rw-r--r-- | tests/compat_bug777.c | 1 | ||||
| -rw-r--r-- | tests/inproc.c | 3 |
18 files changed, 468 insertions, 533 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index e6157786..792b63f2 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,22 +11,50 @@ #include "core/nng_impl.h" #include <string.h> -enum nni_aio_flags { - NNI_AIO_INIT = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, -}; - +static nni_mtx nni_aio_lk; // These are used for expiration. -static nni_mtx nni_aio_expire_mtx; static nni_cv nni_aio_expire_cv; static int nni_aio_expire_exit; -static nni_list nni_aio_expire_aios; static nni_thr nni_aio_expire_thr; -static nni_aio *nni_aio_expire_current; +static nni_list nni_aio_expire_aios; + +// Design notes. +// +// AIOs are only ever "completed" by the provider, which must call +// one of the nni_aio_finish variants. Until this occurs, the provider +// guarantees that the AIO is valid. The provider must guarantee that +// an AIO will be "completed" (with a call to nni_aio_finish & friends) +// exactly once. +// +// Note that the cancellation routine may be called by the framework +// several times. The framework (or the consumer) guarantees that the +// AIO will remain valid across these calls, so that the provider is +// free to examine the aio for list membership, etc. The provider must +// not call finish more than once though. +// +// A single lock, nni_aio_lk, is used to protect the flags on the AIO, +// as well as the expire list on the AIOs. We will not permit an AIO +// to be marked done if an expiration is outstanding. +// +// In order to synchronize with the expiration, we set a flag when we +// are going to cancel due to expiration, and then let the expiration +// thread dispatch the notification to the user (after ensuring that +// the provider is done with the aio.) This ensures that the completion +// task will be dispatch *exactly* once, and only after nothing in +// the provider or the framework is using it further. (The consumer +// will probably still be using, but if the consumer calls nni_aio_wait +// or nni_aio_stop, then the consumer will have exclusive access to it. +// Provided, of course, that the consumer does not reuse the aio for +// another operation in the callback.) +// +// In order to guard against aio reuse during teardown, we set a fini +// flag. Any attempt to initialize for a new operation after that point +// will fail and the caller will get NNG_ESTATE indicating this. The +// provider that calls nni_aio_start() MUST check the return value, and +// if it comes back nonzero (NNG_ESTATE) then it must simply discard the +// request and return. static void nni_aio_expire_add(nni_aio *); -static void nni_aio_expire_remove(nni_aio *); int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -34,15 +62,11 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) int rv; memset(aio, 0, sizeof(*aio)); - if ((rv = nni_mtx_init(&aio->a_lk)) != 0) { - return (rv); - } - if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) { - nni_mtx_fini(&aio->a_lk); + if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) { return (rv); } aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = NNI_AIO_INIT; + aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); return (0); @@ -55,7 +79,6 @@ nni_aio_fini(nni_aio *aio) // At this point the AIO is done. nni_cv_fini(&aio->a_cv); - nni_mtx_fini(&aio->a_lk); if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); @@ -71,30 +94,23 @@ nni_aio_fini(nni_aio *aio) void nni_aio_stop(nni_aio *aio) { - if ((aio->a_flags & NNI_AIO_INIT) == 0) { + if (!aio->a_init) { // Never initialized, so nothing should have happened. return; } - nni_mtx_lock(&aio->a_lk); - aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled - nni_mtx_unlock(&aio->a_lk); + nni_mtx_lock(&nni_aio_lk); + aio->a_fini = 1; + nni_mtx_unlock(&nni_aio_lk); nni_aio_cancel(aio, NNG_ECANCELED); - // Wait for any outstanding task to complete. We won't schedule - // new stuff because nni_aio_start will fail (due to AIO_FINI). - nni_task_wait(&aio->a_task); + nni_aio_wait(aio); } int nni_aio_result(nni_aio *aio) { - int rv; - - nni_mtx_lock(&aio->a_lk); - rv = aio->a_result; - nni_mtx_unlock(&aio->a_lk); - return (rv); + return (aio->a_result); } size_t @@ -106,131 +122,116 @@ nni_aio_count(nni_aio *aio) void nni_aio_wait(nni_aio *aio) { - nni_mtx_lock(&aio->a_lk); - while ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) == 0) { + nni_mtx_lock(&nni_aio_lk); + while ((aio->a_active) && (!aio->a_done)) { nni_cv_wait(&aio->a_cv); } - nni_mtx_unlock(&aio->a_lk); + nni_mtx_unlock(&nni_aio_lk); nni_task_wait(&aio->a_task); } int -nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) +nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { - nni_mtx_lock(&aio->a_lk); - aio->a_flags &= ~NNI_AIO_DONE; - if (aio->a_flags & NNI_AIO_FINI) { + nni_mtx_lock(&nni_aio_lk); + if (aio->a_fini) { // We should not reschedule anything at this point. - nni_mtx_unlock(&aio->a_lk); + aio->a_active = 0; + aio->a_result = NNG_ECANCELED; + nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } + aio->a_done = 0; + aio->a_pend = 0; aio->a_result = 0; aio->a_count = 0; - aio->a_prov_cancel = cancel; + aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; + aio->a_active = 1; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } - nni_mtx_unlock(&aio->a_lk); + nni_mtx_unlock(&nni_aio_lk); return (0); } +// nni_aio_cancel is called by a consumer which guarantees that the aio +// is still valid. void nni_aio_cancel(nni_aio *aio, int rv) { - void (*cancelfn)(nni_aio *); - - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // The operation already completed - so there's nothing - // left for us to do. - nni_mtx_unlock(&aio->a_lk); - return; - } - aio->a_flags |= NNI_AIO_DONE; - aio->a_result = rv; - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - - aio->a_refcnt++; - nni_mtx_unlock(&aio->a_lk); + nni_aio_cancelfn cancelfn; - // Guaraneed to just be a list operation. - nni_aio_expire_remove(aio); + nni_mtx_lock(&nni_aio_lk); + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. if (cancelfn != NULL) { - cancelfn(aio); + cancelfn(aio, rv); } - - nni_mtx_lock(&aio->a_lk); - - aio->a_refcnt--; - nni_cv_wake(&aio->a_cv); - - // These should have already been cleared by the cancel function. - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; - - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); } // I/O provider related functions. -int -nni_aio_finish(nni_aio *aio, int result, size_t count) +static void +nni_aio_finish_impl( + nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg) { - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // Operation already done (canceled or timed out?) - nni_mtx_unlock(&aio->a_lk); - return (NNG_ESTATE); - } - aio->a_flags |= NNI_AIO_DONE; + nni_mtx_lock(&nni_aio_lk); + NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once* + + nni_list_node_remove(&aio->a_expire_node); + aio->a_pend = 1; aio->a_result = result; aio->a_count = count; aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; + if (pipe) { + aio->a_pipe = pipe; + } + if (msg) { + aio->a_msg = msg; + } - // This is guaranteed to just be a list operation at this point, - // because done wasn't set. - nni_aio_expire_remove(aio); aio->a_expire = NNI_TIME_NEVER; - nni_cv_wake(&aio->a_cv); - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); - return (0); + // If we are expiring, then we rely on the expiration thread to + // complete this; we must not because the expiration thread is + // still holding the reference. + if (!aio->a_expiring) { + aio->a_done = 1; + nni_cv_wake(&aio->a_cv); + nni_task_dispatch(&aio->a_task); + } + nni_mtx_unlock(&nni_aio_lk); } -int -nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) +void +nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // Operation already done (canceled or timed out?) - nni_mtx_unlock(&aio->a_lk); - return (NNG_ESTATE); - } - aio->a_flags |= NNI_AIO_DONE; + nni_aio_finish_impl(aio, result, count, NULL, NULL); +} - aio->a_result = result; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; - aio->a_pipe = pipe; +void +nni_aio_finish_error(nni_aio *aio, int result) +{ + nni_aio_finish_impl(aio, result, 0, NULL, NULL); +} - // This is guaranteed to just be a list operation at this point, - // because done wasn't set. - nni_aio_expire_remove(aio); - aio->a_expire = NNI_TIME_NEVER; - nni_cv_wake(&aio->a_cv); +void +nni_aio_finish_pipe(nni_aio *aio, void *pipe) +{ + NNI_ASSERT(pipe != NULL); + nni_aio_finish_impl(aio, 0, 0, pipe, NULL); +} - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); - return (0); +void +nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) +{ + NNI_ASSERT(msg != NULL); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg); } void @@ -261,12 +262,9 @@ nni_aio_list_active(nni_aio *aio) static void nni_aio_expire_add(nni_aio *aio) { - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; nni_list *list = &nni_aio_expire_aios; nni_aio * naio; - nni_mtx_lock(mtx); // This is a reverse walk of the list. We're more likely to find // a match at the end of the list. for (naio = nni_list_last(list); naio != NULL; @@ -280,105 +278,69 @@ nni_aio_expire_add(nni_aio *aio) // This has the shortest time, so insert at the start. nni_list_prepend(list, aio); // And, as we are the latest, kick the thing. - nni_cv_wake(cv); + nni_cv_wake(&nni_aio_expire_cv); } - nni_mtx_unlock(mtx); -} - -static void -nni_aio_expire_remove(nni_aio *aio) -{ - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; - nni_list *list = &nni_aio_expire_aios; - - nni_mtx_lock(mtx); - if (nni_list_active(list, aio)) { - nni_list_remove(list, aio); - } - while (aio == nni_aio_expire_current) { - nni_cv_wait(cv); - } - nni_mtx_unlock(mtx); } static void nni_aio_expire_loop(void *arg) { - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; - nni_list *aios = &nni_aio_expire_aios; - nni_aio * aio; - nni_time now; - - void (*cancelfn)(nni_aio *); + nni_list * aios = &nni_aio_expire_aios; + nni_aio * aio; + nni_time now; + nni_aio_cancelfn cancelfn; NNI_ARG_UNUSED(arg); for (;;) { - nni_mtx_lock(mtx); - - // If we are resuming this loop after processing an AIO, - // note that we are done with it, and wake anyone waiting - // for that to clear up. - if ((aio = nni_aio_expire_current) != NULL) { - nni_aio_expire_current = NULL; - nni_cv_wake(cv); - } + nni_mtx_lock(&nni_aio_lk); if (nni_aio_expire_exit) { - nni_mtx_unlock(mtx); + nni_mtx_unlock(&nni_aio_lk); return; } if ((aio = nni_list_first(aios)) == NULL) { - nni_cv_wait(cv); - nni_mtx_unlock(mtx); + nni_cv_wait(&nni_aio_expire_cv); + nni_mtx_unlock(&nni_aio_lk); continue; } now = nni_clock(); if (now < aio->a_expire) { // Unexpired; the list is ordered, so we just wait. - nni_cv_until(cv, aio->a_expire); - nni_mtx_unlock(mtx); + nni_cv_until(&nni_aio_expire_cv, aio->a_expire); + nni_mtx_unlock(&nni_aio_lk); continue; } // This aio's time has come. Expire it, canceling any // outstanding I/O. - nni_list_remove(aios, aio); - nni_aio_expire_current = aio; - nni_mtx_unlock(mtx); - - cancelfn = NULL; - nni_mtx_lock(&aio->a_lk); - if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) { - nni_mtx_unlock(&aio->a_lk); - continue; - } + // Mark it as expiring. This acts as a hold on + // the aio, similar to the consumers. The actual taskq + // dispatch on completion won't occur until this is cleared, + // and the done flag won't be set either. + aio->a_expiring = 1; + cancelfn = aio->a_prov_cancel; - aio->a_flags |= NNI_AIO_DONE; - - aio->a_result = NNG_ETIMEDOUT; - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - nni_cv_wake(&aio->a_cv); - nni_mtx_unlock(&aio->a_lk); - - // Cancel any outstanding activity. + // Cancel any outstanding activity. This is always non-NULL + // for a valid aio, and becomes NULL only when an AIO is + // already being canceled or finished. if (cancelfn != NULL) { - cancelfn(aio); + nni_mtx_unlock(&nni_aio_lk); + cancelfn(aio, NNG_ETIMEDOUT); + nni_mtx_lock(&nni_aio_lk); } - // Arguably we could avoid dispatching, and execute the - // callback inline here as we are already on a separate - // thread. But keeping it separate is clearer, and more - // consistent with other uses. And this should not be a - // hot code path. + NNI_ASSERT(aio->a_pend); // nni_aio_finish was run + NNI_ASSERT(aio->a_prov_cancel == NULL); + aio->a_expiring = 0; + aio->a_done = 1; + nni_cv_wake(&aio->a_cv); nni_task_dispatch(&aio->a_task); + nni_mtx_unlock(&nni_aio_lk); } } @@ -386,7 +348,7 @@ int nni_aio_sys_init(void) { int rv; - nni_mtx *mtx = &nni_aio_expire_mtx; + nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; @@ -409,7 +371,7 @@ fail: void nni_aio_sys_fini(void) { - nni_mtx *mtx = &nni_aio_expire_mtx; + nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; diff --git a/src/core/aio.h b/src/core/aio.h index 31a54f12..0f41c01f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -18,6 +18,8 @@ typedef struct nni_aio_ops nni_aio_ops; +typedef void (*nni_aio_cancelfn)(nni_aio *, int); + // An nni_aio is an async I/O handle. struct nni_aio { int a_result; // Result code (nng_errno) @@ -25,10 +27,14 @@ struct nni_aio { nni_time a_expire; // These fields are private to the aio framework. - nni_mtx a_lk; nni_cv a_cv; - unsigned a_flags; - int a_refcnt; // prevent use-after-free + unsigned a_init : 1; // initialized flag + unsigned a_fini : 1; // shutting down (no new operations) + unsigned a_done : 1; // operation has completed + unsigned a_pend : 1; // completion routine pending + unsigned a_active : 1; // aio was started + unsigned a_expiring : 1; // expiration callback in progress + unsigned a_pad : 27; // ensure 32-bit alignment nni_task a_task; // Read/write operations. @@ -47,9 +53,9 @@ struct nni_aio { int a_naddrs; // Provider-use fields. - void (*a_prov_cancel)(nni_aio *); - void * a_prov_data; - nni_list_node a_prov_node; + nni_aio_cancelfn a_prov_cancel; + void * a_prov_data; + nni_list_node a_prov_node; // Expire node. nni_list_node a_expire_node; @@ -106,21 +112,17 @@ extern void nni_aio_list_remove(nni_aio *); extern int nni_aio_list_active(nni_aio *); // nni_aio_finish is called by the provider when an operation is complete. -// The provider gives the result code (0 for success, an NNG errno otherwise), -// and the amount of data transferred (if any). If the return code is -// non-zero, it indicates that the operation failed (usually because the aio -// was already canceled.) This is important for providers that need to -// prevent resources (new pipes for example) from accidentally leaking -// during close operations. -extern int nni_aio_finish(nni_aio *, int, size_t); -extern int nni_aio_finish_pipe(nni_aio *, int, void *); +extern void nni_aio_finish(nni_aio *, int, size_t); +extern void nni_aio_finish_error(nni_aio *, int); +extern void nni_aio_finish_pipe(nni_aio *, void *); +extern void nni_aio_finish_msg(nni_aio *, nni_msg *); // nni_aio_cancel is used to cancel an operation. Any pending I/O or // timeouts are canceled if possible, and the callback will be returned // with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.) extern void nni_aio_cancel(nni_aio *, int rv); -extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); +extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *); // nni_aio_stop is used to abort all further operations on the AIO. // When this is executed, no further operations or callbacks will be diff --git a/src/core/endpt.c b/src/core/endpt.c index c0f9d399..aa4c5a31 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -175,6 +175,7 @@ nni_ep_close(nni_ep *ep) nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); + nni_aio_cancel(&ep->ep_backoff, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); @@ -188,6 +189,7 @@ nni_ep_reap(nni_ep *ep) nni_aio_stop(&ep->ep_acc_aio); nni_aio_stop(&ep->ep_con_aio); nni_aio_stop(&ep->ep_con_syn); + nni_aio_stop(&ep->ep_backoff); // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); @@ -233,6 +235,13 @@ nni_ep_stop(nni_ep *ep) } static void +nni_ep_backoff_cancel(nni_aio *aio, int rv) +{ + // The only way this ever gets "finished", is via cancellation. + nni_aio_finish_error(aio, rv); +} + +static void nni_ep_backoff_start(nni_ep *ep) { nni_duration backoff; @@ -255,7 +264,7 @@ nni_ep_backoff_start(nni_ep *ep) // random number, but this really doesn't matter. ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff); - nni_aio_start(&ep->ep_backoff, NULL, ep); + nni_aio_start(&ep->ep_backoff, nni_ep_backoff_cancel, ep); } static void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index d98c68be..2ebc4927 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -121,26 +121,17 @@ nni_msgq_fini(nni_msgq *mq) NNI_FREE_STRUCT(mq); } -static void -nni_msgq_finish(nni_aio *aio, int rv) -{ - nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, 0); -} - void nni_msgq_set_get_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_msgq_finish(aio, error); + while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_geterr = error; @@ -150,16 +141,14 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) void nni_msgq_set_put_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_msgq_finish(aio, error); + while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_puterr = error; @@ -169,21 +158,15 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) void nni_msgq_set_error(nni_msgq *mq, int error) { - nni_aio *naio; nni_aio *aio; // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); if (error != 0) { - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_msgq_finish(aio, error); - } - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_msgq_finish(aio, error); + while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || + ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, error); } } mq->mq_puterr = error; @@ -207,11 +190,12 @@ nni_msgq_run_putq(nni_msgq *mq) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - raio->a_msg = msg; waio->a_msg = NULL; - nni_msgq_finish(raio, 0); - nni_msgq_finish(waio, 0); + nni_aio_list_remove(raio); + nni_aio_list_remove(waio); + nni_aio_finish(waio, 0, len); + nni_aio_finish_msg(raio, msg); continue; } @@ -224,7 +208,7 @@ nni_msgq_run_putq(nni_msgq *mq) } mq->mq_len++; waio->a_msg = NULL; - nni_msgq_finish(waio, 0); + nni_aio_finish(waio, 0, len); continue; } @@ -243,14 +227,14 @@ nni_msgq_run_getq(nni_msgq *mq) while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { - nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; raio->a_msg = msg; - nni_msgq_finish(raio, 0); + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -258,9 +242,11 @@ nni_msgq_run_getq(nni_msgq *mq) if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { msg = waio->a_msg; waio->a_msg = NULL; - raio->a_msg = msg; - nni_msgq_finish(raio, 0); - nni_msgq_finish(waio, 0); + nni_aio_list_remove(waio); + nni_aio_list_remove(raio); + + nni_aio_finish(waio, 0, nni_msg_len(msg)); + nni_aio_finish_msg(raio, msg); continue; } @@ -300,16 +286,15 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio) +nni_msgq_cancel(nni_aio *aio, int rv) { nni_msgq *mq = aio->a_prov_data; - if (mq == NULL) { - return; - } - nni_mtx_lock(&mq->mq_lock); - nni_aio_list_remove(aio); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&mq->mq_lock); } @@ -346,12 +331,12 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } if (mq->mq_closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&mq->mq_lock); return; } if (mq->mq_puterr) { - nni_aio_finish(aio, mq->mq_puterr, 0); + nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; } @@ -372,12 +357,12 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } if (mq->mq_closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&mq->mq_lock); return; } if (mq->mq_geterr) { - nni_aio_finish(aio, mq->mq_geterr, 0); + nni_aio_finish_error(aio, mq->mq_geterr); nni_mtx_unlock(&mq->mq_lock); return; } @@ -439,9 +424,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { nni_list_remove(&mq->mq_aio_getq, raio); - raio->a_msg = msg; - - nni_aio_finish(raio, 0, len); + nni_aio_finish_msg(raio, msg); nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -512,13 +495,16 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) break; } } - // If we timedout, free any remaining messages in the queue. - // Also complete the putq as NNG_ECLOSED. + // Timed out or writers drained. + + // Complete the putq as NNG_ECLOSED. while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } + + // Free any remaining messages in the queue. while (mq->mq_len > 0) { nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_alloc) { @@ -551,17 +537,10 @@ nni_msgq_close(nni_msgq *mq) // Let all pending blockers know we are closing the queue. naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); + while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || + ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&mq->mq_lock); diff --git a/src/core/taskq.c b/src/core/taskq.c index 5fbcdb33..14b04085 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -158,7 +158,6 @@ void nni_taskq_fini(nni_taskq *tq) { int i; - int busy; // First drain the taskq completely. This is necessary since some // tasks that are presently running may need to schedule additional diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 7a91b4ec..a8c8395e 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -46,13 +46,17 @@ struct nni_posix_epdesc { }; static void -nni_posix_epdesc_cancel(nni_aio *aio) +nni_posix_epdesc_cancel(nni_aio *aio, int rv) { nni_posix_epdesc *ed = aio->a_prov_data; + NNI_ASSERT(rv != 0); nni_mtx_lock(&ed->mtx); - nni_aio_list_remove(aio); - NNI_ASSERT(aio->a_pipe == NULL); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + NNI_ASSERT(aio->a_pipe == NULL); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&ed->mtx); } @@ -70,8 +74,10 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) (void) close(newfd); } } - if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) { - nni_posix_pipedesc_fini(pd); + if (rv != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_finish_pipe(aio, pd); } } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index 5dd77dcb..bd74e0c0 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -39,7 +39,7 @@ static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { nni_aio_list_remove(aio); - (void) nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, rv, aio->a_count); } static void @@ -233,12 +233,15 @@ nni_posix_pipedesc_close(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_cancel(nni_aio *aio) +nni_posix_pipedesc_cancel(nni_aio *aio, int rv) { nni_posix_pipedesc *pd = aio->a_prov_data; nni_mtx_lock(&pd->mtx); - nni_aio_list_remove(aio); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&pd->mtx); } diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 5852f34c..09d40b94 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -62,7 +62,7 @@ nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv) } static void -nni_posix_resolv_cancel(nni_aio *aio) +nni_posix_resolv_cancel(nni_aio *aio, int rv) { nni_posix_resolv_item *item; @@ -75,6 +75,7 @@ nni_posix_resolv_cancel(nni_aio *aio) nni_mtx_unlock(&nni_posix_resolv_mtx); nni_task_cancel(&item->task); NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); } static int diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index a77fcf0b..a2700485 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -65,17 +66,13 @@ struct nni_win_event { nni_aio * aio; nni_mtx mtx; nni_cv cv; - int flags; + unsigned run : 1; + unsigned fini : 1; int count; int status; nni_win_event_ops ops; }; -enum nni_win_event_flags { - NNI_WIN_EVENT_RUNNING = 1, - NNI_WIN_EVENT_ABORT = 2, -}; - extern int nni_win_error(int); extern int nni_win_event_init(nni_win_event *, nni_win_event_ops *, void *); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index df0357c8..c4cdcb8a 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -15,7 +15,7 @@ #define NNI_WIN_IOCP_NTHREADS 4 #include <stdio.h> -// Windows IO Completion Port support. We basically creaet a single +// Windows IO Completion Port support. We basically create a single // IO completion port, then start threads on it. Handles are added // to the port on an as needed basis. We use a single IO completion // port for pretty much everything. @@ -25,6 +25,18 @@ static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS]; static nni_mtx nni_win_iocp_mtx; static void +nni_win_event_finish(nni_win_event *evt, nni_aio *aio) +{ + evt->run = 0; + if (aio != NULL) { + evt->ops.wev_finish(evt, aio); + } + if (evt->fini) { + nni_cv_wake(&evt->cv); + } +} + +static void nni_win_iocp_handler(void *arg) { HANDLE iocp; @@ -59,42 +71,30 @@ nni_win_iocp_handler(void *arg) if (ok) { rv = ERROR_SUCCESS; - } else { - rv = GetLastError(); + } else if (evt->status == 0) { + evt->status = nni_win_error(GetLastError()); } - aio = evt->aio; - evt->aio = NULL; - evt->status = rv; - evt->count = cnt; - - // Aborted operations don't get the finish callback done. - // All others do. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; - if (evt->flags & NNI_WIN_EVENT_ABORT) { - nni_cv_wake(&evt->cv); - } else if ((rv != ERROR_OPERATION_ABORTED) && (aio != NULL)) { - evt->ops.wev_finish(evt, aio); - } + aio = evt->aio; + evt->aio = NULL; + evt->count = cnt; + + nni_win_event_finish(evt, aio); nni_mtx_unlock(&evt->mtx); } } static void -nni_win_event_cancel(nni_aio *aio) +nni_win_event_cancel(nni_aio *aio, int rv) { nni_win_event *evt = aio->a_prov_data; nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); + if (evt->aio == aio) { + evt->status = rv; - // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { - nni_cv_wait(&evt->cv); + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); } nni_mtx_unlock(&evt->mtx); } @@ -107,28 +107,28 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) // The lock is held. // Abort operation -- no further activity. - if (evt->flags & NNI_WIN_EVENT_ABORT) { + if (evt->fini) { + evt->run = 0; + nni_cv_wake(&evt->cv); return; } - evt->status = ERROR_SUCCESS; + evt->status = 0; evt->count = 0; if (!ResetEvent(evt->olpd.hEvent)) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; - - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); return; } evt->aio = aio; - evt->flags |= NNI_WIN_EVENT_RUNNING; + evt->run = 1; if (evt->ops.wev_start(evt, aio) != 0) { // Start completed synchronously. It will have stored // the count and status in the evt. - evt->flags &= ~NNI_WIN_EVENT_RUNNING; evt->aio = NULL; - evt->ops.wev_finish(evt, aio); + nni_win_event_finish(evt, aio); } } @@ -154,20 +154,10 @@ nni_win_event_complete(nni_win_event *evt, int cnt) void nni_win_event_close(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - evt->flags |= NNI_WIN_EVENT_ABORT; + evt->status = NNG_ECLOSED; evt->ops.wev_cancel(evt); - if ((aio = evt->aio) != NULL) { - evt->aio = NULL; - // We really don't care if we transferred data or not. - // The caller indicates they have closed the pipe. - evt->status = ERROR_INVALID_HANDLE; - evt->count = 0; - evt->ops.wev_finish(evt, aio); - } nni_mtx_unlock(&evt->mtx); } } @@ -195,28 +185,27 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { return (rv); // NB: This will never happen on Windows. } - evt->ops = *ops; - evt->aio = NULL; - evt->ptr = ptr; + evt->ops = *ops; + evt->aio = NULL; + evt->ptr = ptr; + evt->fini = 0; + evt->run = 0; return (0); } void nni_win_event_fini(nni_win_event *evt) { - nni_aio *aio; - if (evt->ptr != NULL) { nni_mtx_lock(&evt->mtx); - if ((aio = evt->aio) != NULL) { - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); - } + evt->fini = 1; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); + // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { + while (evt->run) { nni_cv_wait(&evt->cv); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index bd9ce26d..c9eb20ec 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -65,7 +65,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); if (pipe->p == INVALID_HANDLE_VALUE) { - evt->status = ERROR_INVALID_HANDLE; + evt->status = NNG_ECLOSED; evt->count = 0; return (1); } @@ -92,7 +92,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) } if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -108,13 +108,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt) { nni_plat_ipc_pipe *pipe = evt->ptr; - if (CancelIoEx(pipe->p, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult(pipe->p, &evt->olpd, &cnt, TRUE); - } + CancelIoEx(pipe->p, &evt->olpd); } static void @@ -146,7 +140,7 @@ nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) } // All done; hopefully successfully. - nni_aio_finish(aio, nni_win_error(rv), aio->a_count); + nni_aio_finish(aio, rv, aio->a_count); } static int @@ -294,7 +288,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) HANDLE newp, oldp; if ((rv = evt->status) != 0) { - nni_aio_finish(aio, nni_win_error(rv), 0); + nni_aio_finish_error(aio, rv); return; } @@ -308,7 +302,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // We connected, but as we cannot get a new pipe, // we have to disconnect the old one. DisconnectNamedPipe(ep->p); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } if ((rv = nni_win_iocp_register(newp)) != 0) { @@ -317,7 +311,7 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // And discard the half-baked new one. DisconnectNamedPipe(newp); (void) CloseHandle(newp); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } @@ -329,14 +323,11 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) // the old one, since failed to be able to use it. DisconnectNamedPipe(oldp); (void) CloseHandle(oldp); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } - // What if the pipe is already finished? - if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_ipc_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); } static void @@ -344,13 +335,7 @@ nni_win_ipc_acc_cancel(nni_win_event *evt) { nni_plat_ipc_ep *ep = evt->ptr; - if (CancelIoEx(ep->p, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult(ep->p, &evt->olpd, &cnt, TRUE); - } + (void) CancelIoEx(ep->p, &evt->olpd); // Just to be sure. (void) DisconnectNamedPipe(ep->p); } @@ -376,7 +361,7 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) default: // Fast-fail (synchronous). - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -468,9 +453,7 @@ nni_win_ipc_conn_thr(void *arg) ((rv = nni_win_iocp_register(p)) != 0)) { goto fail; } - if (rv = nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_ipc_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); continue; fail: @@ -481,7 +464,7 @@ nni_win_ipc_conn_thr(void *arg) if (pipe != NULL) { nni_plat_ipc_pipe_fini(pipe); } - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); } if (nni_list_empty(&w->waiters)) { @@ -496,16 +479,19 @@ nni_win_ipc_conn_thr(void *arg) } static void -nni_win_ipc_conn_cancel(nni_aio *aio) +nni_win_ipc_conn_cancel(nni_aio *aio, int rv) { nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; nni_plat_ipc_ep * ep = aio->a_prov_data; nni_mtx_lock(&w->mtx); - ep->con_aio = NULL; - if (nni_list_active(&w->waiters, ep)) { - nni_list_remove(&w->waiters, ep); - nni_cv_wake(&w->cv); + if (aio == ep->con_aio) { + ep->con_aio = NULL; + if (nni_list_active(&w->waiters, ep)) { + nni_list_remove(&w->waiters, ep); + nni_cv_wake(&w->cv); + } + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&w->mtx); } @@ -556,7 +542,7 @@ nni_plat_ipc_ep_close(nni_plat_ipc_ep *ep) } if ((aio = ep->con_aio) != NULL) { ep->con_aio = NULL; - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&w->mtx); break; diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 633dd256..df6275ff 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -144,7 +144,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) } if ((s = pipe->s) == INVALID_SOCKET) { - evt->status = ERROR_INVALID_HANDLE; + evt->status = NNG_ECLOSED; evt->count = 0; return (1); } @@ -163,7 +163,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) if ((rv == SOCKET_ERROR) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -179,13 +179,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt) { nni_plat_tcp_pipe *pipe = evt->ptr; - if (CancelIoEx((HANDLE) pipe->s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) pipe->s, &evt->olpd, &cnt, TRUE); - } + (void) CancelIoEx((HANDLE) pipe->s, &evt->olpd); } static void @@ -228,7 +222,7 @@ nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) } // All done; hopefully successfully. - nni_aio_finish(aio, nni_win_error(rv), aio->a_count); + nni_aio_finish(aio, rv, aio->a_count); } static int @@ -507,12 +501,8 @@ nni_win_tcp_acc_cancel(nni_win_event *evt) nni_plat_tcp_ep *ep = evt->ptr; SOCKET s = ep->s; - if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); } } @@ -531,22 +521,15 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - if ((rv = evt->status) != 0) { - closesocket(s); - nni_aio_finish(aio, nni_win_error(rv), 0); - return; - } - - if (((rv = nni_win_iocp_register((HANDLE) s)) != 0) || + if (((rv = evt->status) != 0) || + ((rv = nni_win_iocp_register((HANDLE) s)) != 0) || ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { closesocket(s); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } - if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { - nni_plat_tcp_pipe_fini(pipe); - } + nni_aio_finish_pipe(aio, pipe); } static int @@ -559,7 +542,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); if (acc_s == INVALID_SOCKET) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; return (1); } @@ -575,7 +558,7 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) default: // Fast-fail (synchronous). - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } @@ -599,12 +582,8 @@ nni_win_tcp_con_cancel(nni_win_event *evt) nni_plat_tcp_ep *ep = evt->ptr; SOCKET s = ep->s; - if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { - DWORD cnt; - - // If we canceled, make sure that we've completely - // finished with the overlapped. - GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); + if (s != INVALID_SOCKET) { + CancelIoEx((HANDLE) s, &evt->olpd); } } @@ -619,19 +598,14 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) s = ep->s; ep->s = INVALID_SOCKET; - if ((rv = evt->status) != 0) { - closesocket(s); - nni_aio_finish(aio, nni_win_error(rv), 0); - return; - } - // The socket was already registere with the IOCP. - if ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0) { + if (((rv = evt->status) != 0) || + ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { // The new pipe is already fine for us. Discard // the old one, since failed to be able to use it. closesocket(s); - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); return; } @@ -650,7 +624,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; return (1); } @@ -667,7 +641,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) len = ep->remlen; } if (bind(s, (struct sockaddr *) &bss, len) < 0) { - evt->status = GetLastError(); + evt->status = nni_win_error(GetLastError()); evt->count = 0; closesocket(s); return (1); @@ -687,7 +661,7 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) if ((rv = GetLastError()) != ERROR_IO_PENDING) { closesocket(s); ep->s = INVALID_SOCKET; - evt->status = rv; + evt->status = nni_win_error(rv); evt->count = 0; return (1); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 44d00c34..a01dc123 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -30,13 +30,13 @@ static nni_mtx nni_win_resolv_mtx; typedef struct nni_win_resolv_item nni_win_resolv_item; struct nni_win_resolv_item { - int family; - int passive; - const char * name; - const char * serv; - int proto; - nni_aio * aio; - nni_taskq_ent tqe; + int family; + int passive; + const char *name; + const char *serv; + int proto; + nni_aio * aio; + nni_task task; }; static void @@ -50,7 +50,7 @@ nni_win_resolv_finish(nni_win_resolv_item *item, int rv) } static void -nni_win_resolv_cancel(nni_aio *aio) +nni_win_resolv_cancel(nni_aio *aio, int rv) { nni_win_resolv_item *item; @@ -61,8 +61,9 @@ nni_win_resolv_cancel(nni_aio *aio) } aio->a_prov_data = NULL; nni_mtx_unlock(&nni_win_resolv_mtx); - nni_taskq_cancel(nni_win_resolv_tq, &item->tqe); + nni_task_cancel(&item->task); NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); } static int @@ -209,7 +210,8 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - nni_taskq_ent_init(&item->tqe, nni_win_resolv_task, item); + nni_task_init( + nni_win_resolv_tq, &item->task, nni_win_resolv_task, item); switch (family) { case NNG_AF_INET: @@ -236,11 +238,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, NNI_FREE_STRUCT(item); return; } - if ((rv = nni_taskq_dispatch(nni_win_resolv_tq, &item->tqe)) != 0) { - nni_win_resolv_finish(item, rv); - nni_mtx_unlock(&nni_win_resolv_mtx); - return; - } + nni_task_dispatch(&item->task); nni_mtx_unlock(&nni_win_resolv_mtx); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 226a31ce..9cc43ad7 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -241,27 +241,24 @@ static void nni_inproc_conn_finish(nni_aio *aio, int rv) { nni_inproc_ep *ep = aio->a_endpt; + void * pipe; - if (rv != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - } nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } + pipe = aio->a_pipe; + aio->a_pipe = NULL; + + if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && + nni_list_empty(&ep->aios)) { + nni_list_node_remove(&ep->node); } - if (nni_aio_finish(aio, rv, 0) != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; + + if (rv == 0) { + nni_aio_finish_pipe(aio, pipe); + } else { + if (pipe != NULL) { + nni_inproc_pipe_fini(pipe); } + nni_aio_finish_error(aio, rv); } } @@ -291,29 +288,6 @@ nni_inproc_ep_close(void *arg) } static void -nni_inproc_connect_abort(nni_aio *aio) -{ - nni_inproc_ep *ep = aio->a_endpt; - - nni_mtx_lock(&nni_inproc.mx); - - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } - } - nni_mtx_unlock(&nni_inproc.mx); -} - -static void nni_inproc_accept_clients(nni_inproc_ep *server) { nni_inproc_ep * client, *nclient; @@ -369,23 +343,24 @@ nni_inproc_accept_clients(nni_inproc_ep *server) } static void -nni_inproc_ep_cancel(nni_aio *aio) +nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep *ep = aio->a_prov_data; + nni_inproc_ep * ep = aio->a_prov_data; + nni_inproc_pipe *pipe; nni_mtx_lock(&nni_inproc.mx); - if (nni_list_active(&ep->aios, aio)) { - nni_list_remove(&ep->aios, aio); - } - // Arguably if the mode is a client... then we need to remove - // it from the server's list. Notably this isn't *our* list, - // but the offsets are the same and they're good enough using the - // global lock to make it all safe. - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_list_node_remove(&ep->node); + if ((pipe = aio->a_pipe) != NULL) { + aio->a_pipe = NULL; + nni_inproc_pipe_fini(pipe); + } + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); } + static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { @@ -394,7 +369,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) int rv; if (ep->mode != NNI_EP_MODE_DIAL) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); return; } nni_mtx_lock(&nni_inproc.mx); @@ -406,24 +381,24 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) if (nni_list_active(&ep->clients, ep)) { // We already have a pending connection... - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->started) { - nni_aio_finish(aio, NNG_EBUSY, 0); + nni_aio_finish_error(aio, NNG_EBUSY); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -491,32 +466,33 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_ep *ep = arg; int rv; - if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_mtx_lock(&nni_inproc.mx); + + if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); return; } - nni_mtx_lock(&nni_inproc.mx); + if (ep->mode != NNI_EP_MODE_LISTEN) { + nni_aio_finish_error(aio, NNG_EINVAL); + nni_mtx_unlock(&nni_inproc.mx); + return; + } // We are already on the master list of servers, thanks to bind. if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if (!ep->started) { - nni_aio_finish(aio, NNG_ESTATE, 0); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_aio_finish_error(aio, NNG_ESTATE); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 96dae6de..e8c7968f 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -142,15 +142,20 @@ fail: } static void -nni_ipc_cancel_start(nni_aio *aio) +nni_ipc_cancel_start(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_negaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, aio->a_result); + nni_aio_cancel(&pipe->negaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -239,10 +244,10 @@ nni_ipc_pipe_recv_cb(void *arg) nni_ipc_pipe *pipe = arg; nni_aio * aio; int rv; + nni_msg * msg; nni_mtx_lock(&pipe->mtx); - aio = pipe->user_rxaio; - if (aio == NULL) { + if ((aio = pipe->user_rxaio) == NULL) { // aio was canceled nni_mtx_unlock(&pipe->mtx); return; @@ -257,7 +262,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxmsg = NULL; } pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -270,7 +275,7 @@ nni_ipc_pipe_recv_cb(void *arg) // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { - nni_aio_finish(aio, NNG_EPROTO, 0); + nni_aio_finish_error(aio, NNG_EPROTO); nni_mtx_unlock(&pipe->mtx); return; } @@ -282,7 +287,7 @@ nni_ipc_pipe_recv_cb(void *arg) // the caller will shut down the pipe. if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_aio_finish_error(aio, NNG_EMSGSIZE); nni_mtx_unlock(&pipe->mtx); return; } @@ -294,7 +299,7 @@ nni_ipc_pipe_recv_cb(void *arg) // unlikely to be much of an issue though. if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -313,22 +318,27 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; + msg = pipe->rxmsg; pipe->rxmsg = NULL; - nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_aio_finish_msg(aio, msg); nni_mtx_unlock(&pipe->mtx); } static void -nni_ipc_cancel_tx(nni_aio *aio) +nni_ipc_cancel_tx(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_txaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->txaio, aio->a_result); + nni_aio_cancel(&pipe->txaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -364,15 +374,20 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) } static void -nni_ipc_cancel_rx(nni_aio *aio) +nni_ipc_cancel_rx(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_rxaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->rxaio, aio->a_result); + nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -552,10 +567,18 @@ done: aio = ep->user_aio; ep->user_aio = NULL; - if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) { - if (pipe != NULL) { - nni_ipc_pipe_fini(pipe); - } + if ((aio != NULL) && (rv == 0)) { + NNI_ASSERT(pipe != NULL); + nni_aio_finish_pipe(aio, pipe); + return; + } + + if (pipe != NULL) { + nni_ipc_pipe_fini(pipe); + } + if (aio != NULL) { + NNI_ASSERT(rv != 0); + nni_aio_finish_error(aio, rv); } } @@ -570,15 +593,21 @@ nni_ipc_ep_cb(void *arg) } static void -nni_ipc_cancel_ep(nni_aio *aio) +nni_ipc_cancel_ep(nni_aio *aio, int rv) { nni_ipc_ep *ep = aio->a_prov_data; + NNI_ASSERT(rv != 0); nni_mtx_lock(&ep->mtx); + if (ep->user_aio != aio) { + nni_mtx_unlock(&ep->mtx); + return; + } ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, aio->a_result); + nni_aio_cancel(&ep->aio, rv); + nni_aio_finish_error(aio, rv); } static void diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 28677f54..a42fa377 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -141,15 +141,20 @@ fail: } static void -nni_tcp_cancel_nego(nni_aio *aio) +nni_tcp_cancel_nego(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_negaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, aio->a_result); + nni_aio_cancel(&pipe->negaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -239,6 +244,7 @@ nni_tcp_pipe_recv_cb(void *arg) nni_tcp_pipe *pipe = arg; nni_aio * aio; int rv; + nni_msg * msg; nni_mtx_lock(&pipe->mtx); @@ -257,7 +263,7 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxmsg = NULL; } pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -274,14 +280,14 @@ nni_tcp_pipe_recv_cb(void *arg) // the caller will shut down the pipe. if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_aio_finish_error(aio, NNG_EMSGSIZE); nni_mtx_unlock(&pipe->mtx); return; } if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -300,23 +306,28 @@ nni_tcp_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; + msg = pipe->rxmsg; pipe->rxmsg = NULL; - nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_aio_finish_msg(aio, msg); nni_mtx_unlock(&pipe->mtx); } static void -nni_tcp_cancel_tx(nni_aio *aio) +nni_tcp_cancel_tx(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_txaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); // cancel the underlying operation. - nni_aio_cancel(&pipe->txaio, aio->a_result); + nni_aio_cancel(&pipe->txaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -352,16 +363,21 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) } static void -nni_tcp_cancel_rx(nni_aio *aio) +nni_tcp_cancel_rx(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_rxaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); // cancel the underlying operation. - nni_aio_cancel(&pipe->rxaio, aio->a_result); + nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -619,10 +635,16 @@ done: aio = ep->user_aio; ep->user_aio = NULL; - if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) { - if (pipe != NULL) { - nni_tcp_pipe_fini(pipe); - } + if ((aio != NULL) && (rv == 0)) { + nni_aio_finish_pipe(aio, pipe); + return; + } + if (pipe != NULL) { + nni_tcp_pipe_fini(pipe); + } + if (aio != NULL) { + NNI_ASSERT(rv != 0); + nni_aio_finish_error(aio, rv); } } @@ -637,15 +659,20 @@ nni_tcp_ep_cb(void *arg) } static void -nni_tcp_cancel_ep(nni_aio *aio) +nni_tcp_cancel_ep(nni_aio *aio, int rv) { nni_tcp_ep *ep = aio->a_prov_data; nni_mtx_lock(&ep->mtx); + if (ep->user_aio != aio) { + nni_mtx_unlock(&ep->mtx); + return; + } ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, aio->a_result); + nni_aio_cancel(&ep->aio, rv); + nni_aio_finish_error(aio, rv); } static void diff --git a/tests/compat_bug777.c b/tests/compat_bug777.c index dc1f54f3..7bd6fce5 100644 --- a/tests/compat_bug777.c +++ b/tests/compat_bug777.c @@ -29,7 +29,6 @@ int main (int argc, const char *argv[]) int sb; int sc1; int sc2; - char socket_address[128]; sb = test_socket (AF_SP, NN_PAIR); test_bind (sb, "inproc://pair"); diff --git a/tests/inproc.c b/tests/inproc.c index 0d1cafbf..2afb1ae0 100644 --- a/tests/inproc.c +++ b/tests/inproc.c @@ -8,9 +8,8 @@ // #include "convey.h" -#include "trantest.h" #include "core/nng_impl.h" - +#include "trantest.h" // Inproc tests. |
