diff options
Diffstat (limited to 'src')
27 files changed, 423 insertions, 362 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index fac62f12..388e6677 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios; // // In order to guard against aio reuse during teardown, we set a fini // flag. Any attempt to initialize for a new operation after that point -// will fail and the caller will get NNG_ESTATE indicating this. The -// provider that calls nni_aio_start() MUST check the return value, and -// if it comes back nonzero (NNG_ESTATE) then it must simply discard the +// will fail and the caller will get NNG_ECANCELED indicating this. The +// provider that calls nni_aio_begin() MUST check the return value, and +// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the // request and return. // An nni_aio is an async I/O handle. @@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio) // nni_aio_stop cancels any oustanding operation, and waits for the // callback to complete, if still running. It also marks the AIO as -// stopped, preventing further calls to nni_aio_start from succeeding. +// stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other // calles are not also stopped, before calling nni_aio_fini to release // actual memory. @@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio) } int -nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +nni_aio_begin(nni_aio *aio) { - nni_time now = nni_clock(); - nni_mtx_lock(&nni_aio_lk); - + // We should not reschedule anything at this point. if (aio->a_fini) { - // We should not reschedule anything at this point. aio->a_active = false; aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); @@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; aio->a_active = true; - for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } + nni_mtx_unlock(&nni_aio_lk); + return (0); +} +void +nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +{ if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: - aio->a_expire = NNI_TIME_ZERO; + aio->a_expire = nni_clock(); break; case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; break; default: - aio->a_expire = now + aio->a_timeout; + aio->a_expire = nni_clock() + aio->a_timeout; break; } } + nni_mtx_lock(&nni_aio_lk); + aio->a_prov_cancel = cancelfn; + aio->a_prov_data = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } nni_mtx_unlock(&nni_aio_lk); +} + +int +nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) +{ + + if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) { + return (NNG_ETIMEDOUT); + } + nni_aio_schedule(aio, cancelfn, data); return (0); } @@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + if (nni_aio_begin(aio) != 0) { + return; + } switch (aio->a_timeout) { case NNG_DURATION_DEFAULT: case NNG_DURATION_INFINITE: @@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { aio->a_sleep = false; - (void) nni_aio_start(aio, NULL, NULL); + (void) nni_aio_schedule(aio, NULL, NULL); return; } } aio->a_sleep = true; aio->a_expire = nni_clock() + ms; - (void) nni_aio_start(aio, NULL, NULL); + + // There is no cancellation, apart from just unexpiring. + nni_aio_schedule(aio, NULL, NULL); } void diff --git a/src/core/aio.h b/src/core/aio.h index a0f4934f..d23ddf5b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *); // with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.) extern void nni_aio_abort(nni_aio *, int rv); -extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *); +// nni_aio_begin is called by a provider to indicate it is starting the +// operation, and to check that the aio has not already been marked for +// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being +// torn down. (In that case, no operation should be aborted without any +// call to any other functions on this AIO, most especially not the +// nng_aio_finish family of functions.) +extern int nni_aio_begin(nni_aio *); + extern void *nni_aio_get_prov_data(nni_aio *); extern void nni_aio_set_prov_data(nni_aio *, void *); extern void *nni_aio_get_prov_extra(nni_aio *, unsigned); @@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); extern void nni_aio_bump_count(nni_aio *, size_t); +// nni_aio_schedule indicates that the AIO has begun, and is scheduled for +// asychronous completion. This also starts the expiration timer. Note that +// prior to this, the aio is uncancellable. +extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *); + +// nni_aio_schedule_verify is like nni_aio_schedule, except that if the +// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it +// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling +// if the operation could not be immediately completed. +extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *); + extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); diff --git a/src/core/device.c b/src/core/device.c index 6def9f64..e3b1d220 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user) { int i; - nni_mtx_lock(&dd->mtx); - dd->user = user; - if (nni_aio_start(user, nni_device_cancel, dd) != 0) { - nni_mtx_unlock(&dd->mtx); + if (nni_aio_begin(user) != 0) { return; } + nni_mtx_lock(&dd->mtx); + nni_aio_schedule(user, nni_device_cancel, dd); + dd->user = user; for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; p->user = user; diff --git a/src/core/endpt.c b/src/core/endpt.c index 0cb59a14..2741a8e6 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep) { nni_duration backoff; - if (ep->ep_closing) { + if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { return; } backoff = ep->ep_currtime; @@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep) ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); ep->ep_tmo_run = 1; - if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { - ep->ep_tmo_run = 0; - } + nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); } static void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 212b52c2..1bb5a762 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -22,10 +22,10 @@ struct nni_msgq { int mq_len; int mq_get; int mq_put; - int mq_closed; int mq_puterr; int mq_geterr; - int mq_besteffort; + bool mq_besteffort; + bool mq_closed; nni_msg **mq_msgs; nni_list mq_aio_putq; @@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) { nni_aio_list_remove(aio); @@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_aio_list_remove(aio); @@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { @@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq) } void -nni_msgq_set_best_effort(nni_msgq *mq, int on) +nni_msgq_set_best_effort(nni_msgq *mq, bool on) { nni_mtx_lock(&mq->mq_lock); mq->mq_besteffort = on; @@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; } + // If this is an instantaneous poll operation, and the queue has + // no room, nobody is waiting to receive, and we're not best effort + // (best effort discards), then report the error (NNG_ETIMEDOUT). + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && + (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) { + nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, mq->mq_geterr); return; } - if (mq->mq_geterr) { - nni_aio_finish_error(aio, mq->mq_geterr); + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len == 0) && + (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); return; } @@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq) nni_aio *aio; nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; + mq->mq_closed = true; + mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED; // Free the messages orphaned in the queue. while (mq->mq_len > 0) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2d23f448..2f1a46eb 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int); // What this does is treat the message queue condition as if it were // successful, returning 0, and discarding the message. If zero is // passed then this mode is reset to normal. -extern void nni_msgq_set_best_effort(nni_msgq *, int); +extern void nni_msgq_set_best_effort(nni_msgq *, bool); // nni_msgq_filter is a callback function used to filter messages. // The function is called on entry (put) or exit (get). The void diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 16c0d09f..196b206a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -319,26 +319,22 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) void nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) { - int rv; - // Accept is simpler than the connect case. With accept we just // need to wait for the socket to be readable to indicate an incoming // connection is ready for us. There isn't anything else for us to // do really, as that will have been done in listen. - nni_mtx_lock(&ed->mtx); - // If we can't start, it means that the AIO was stopped. - if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ed->mtx); if (ed->closed) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_list_append(&ed->acceptq, aio); + nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); nni_posix_pollq_arm(&ed->node, POLLIN); nni_mtx_unlock(&ed->mtx); } @@ -362,39 +358,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int rv; int fd; - nni_mtx_lock(&ed->mtx); - // If we can't start, it means that the AIO was stopped. - if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&ed->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ed->mtx); - fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); - if (fd < 0) { - nni_posix_epdesc_finish(aio, rv, 0); + if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, rv); return; } // Possibly bind. - if (ed->loclen != 0) { - rv = bind(fd, (void *) &ed->locaddr, ed->loclen); - if (rv != 0) { - (void) close(fd); - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&ed->mtx); - return; - } + if ((ed->loclen != 0) && + (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) { + rv = nni_plat_errno(errno); + nni_mtx_unlock(&ed->mtx); + (void) close(fd); + nni_aio_finish_error(aio, rv); + return; } (void) fcntl(fd, F_SETFL, O_NONBLOCK); - rv = connect(fd, (void *) &ed->remaddr, ed->remlen); - - if (rv == 0) { + if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. - nni_posix_epdesc_finish(aio, 0, fd); nni_mtx_unlock(&ed->mtx); return; @@ -402,24 +392,27 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno != EINPROGRESS) { // Some immediate failure occurred. - if (errno == ENOENT) { + if (errno == ENOENT) { // For UNIX domain sockets errno = ECONNREFUSED; } - (void) close(fd); - nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); + rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); + (void) close(fd); + nni_aio_finish_error(aio, rv); return; } // We have to submit to the pollq, because the connection is pending. ed->node.fd = fd; if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { - (void) close(fd); - nni_posix_epdesc_finish(aio, rv, 0); + ed->node.fd = -1; nni_mtx_unlock(&ed->mtx); + (void) close(fd); + nni_aio_finish_error(aio, rv); return; } + nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); nni_aio_list_append(&ed->connectq, aio); nni_posix_pollq_arm(&ed->node, POLLOUT); nni_mtx_unlock(&ed->mtx); diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index aebea4b8..62531f9c 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -254,20 +254,20 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv) void nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) { - int rv; - - nni_mtx_lock(&pd->mtx); - if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pd->mtx); + if (pd->closed) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } nni_aio_list_append(&pd->readq, aio); + nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); + // If we are only job on the list, go ahead and try to do an immediate // transfer. This allows for faster completions in many cases. We // also need not arm a list if it was already armed. @@ -285,20 +285,20 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) void nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) { - int rv; - - nni_mtx_lock(&pd->mtx); - if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pd->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pd->mtx); + if (pd->closed) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } nni_aio_list_append(&pd->writeq, aio); + nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); + if (nni_list_first(&pd->writeq) == aio) { nni_posix_pipedesc_dowrite(pd); // If we are still the first thing on the list, that means we diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index f3ac7a19..d86a2008 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -206,9 +206,11 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, int family, int proto, nni_aio *aio) { nni_posix_resolv_item *item; - int rv; sa_family_t fam; + if (nni_aio_begin(aio) != 0) { + return; + } switch (family) { case NNG_AF_INET: fam = AF_INET; @@ -241,12 +243,7 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, item->family = fam; nni_mtx_lock(&nni_posix_resolv_mtx); - // If we were stopped, we're done... - if ((rv = nni_aio_start(aio, nni_posix_resolv_cancel, item)) != 0) { - nni_mtx_unlock(&nni_posix_resolv_mtx); - NNI_FREE_STRUCT(item); - return; - } + nni_aio_schedule(aio, nni_posix_resolv_cancel, item); nni_task_dispatch(&item->task); nni_mtx_unlock(&nni_posix_resolv_mtx); } diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 8a402acc..b414fa95 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -287,22 +287,26 @@ nni_plat_udp_cancel(nni_aio *aio, int rv) void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { - nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { - nni_list_append(&udp->udp_recvq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + if (nni_aio_begin(aio) != 0) { + return; } + nni_mtx_lock(&udp->udp_mtx); + nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + nni_list_append(&udp->udp_recvq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); nni_mtx_unlock(&udp->udp_mtx); } void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { - nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { - nni_list_append(&udp->udp_sendq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); + if (nni_aio_begin(aio) != 0) { + return; } + nni_mtx_lock(&udp->udp_mtx); + nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + nni_list_append(&udp->udp_sendq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); nni_mtx_unlock(&udp->udp_mtx); } diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 1189433a..a3ae3748 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -155,12 +155,11 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) void nni_win_event_submit(nni_win_event *evt, nni_aio *aio) { - nni_mtx_lock(&evt->mtx); - if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) { - // the aio was aborted - nni_mtx_unlock(&evt->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&evt->mtx); + nni_aio_schedule(aio, nni_win_event_cancel, evt); nni_aio_list_append(&evt->aios, aio); nni_win_event_start(evt); nni_mtx_unlock(&evt->mtx); diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 5843917c..d372f639 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -491,16 +491,16 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&w->mtx); NNI_ASSERT(!nni_list_active(&w->waiters, ep)); - if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) { - nni_mtx_unlock(&w->mtx); - return; - } ep->con_aio = aio; nni_list_append(&w->waiters, ep); + nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep); nni_cv_wake(&w->cv); nni_mtx_unlock(&w->mtx); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index d07b4fd5..72e6b439 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -180,6 +180,9 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, int rv; int fam; + if (nni_aio_begin(aio) != 0) { + return; + } switch (family) { case NNG_AF_INET: fam = AF_INET; @@ -211,12 +214,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, item->family = fam; nni_mtx_lock(&nni_win_resolv_mtx); - // If we were stopped, we're done... - if ((rv = nni_aio_start(aio, nni_win_resolv_cancel, item)) != 0) { - nni_mtx_unlock(&nni_win_resolv_mtx); - NNI_FREE_STRUCT(item); - return; - } + nni_aio_schedule(aio, nni_win_resolv_cancel, item); nni_task_dispatch(&item->task); nni_mtx_unlock(&nni_win_resolv_mtx); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index e18675ee..afc52629 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -170,6 +170,10 @@ rep0_ctx_send(void *arg, nni_aio *aio) msg = nni_aio_get_msg(aio); nni_msg_header_clear(msg); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->lk); len = ctx->btrace_len; p_id = ctx->pipe_id; @@ -187,10 +191,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_pollable_clear(s->sendable); } - if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) { - nni_mtx_unlock(&s->lk); - return; - } if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -201,12 +201,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ESTATE); return; } - if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer @@ -217,7 +211,18 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_msg_free(msg); return; } + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } if (p->busy) { + rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); @@ -456,18 +461,23 @@ rep0_ctx_recv(void *arg, nni_aio *aio) size_t len; nni_msg * msg; - nni_mtx_lock(&s->lk); - if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) { - nni_mtx_unlock(&s->lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&s->lk); if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); return; } if ((p = nni_list_first(&s->recvpipes)) == NULL) { - nni_pollable_clear(s->recvable); + int rv; + rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->raio = aio; nni_list_append(&s->recvq, ctx); nni_mtx_unlock(&s->lk); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 8149ce08..3ecc8604 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -617,11 +617,10 @@ req0_ctx_recv(void *arg, nni_aio *aio) req0_sock *s = ctx->sock; nni_msg * msg; - nni_mtx_lock(&s->mtx); - if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) { - nni_mtx_unlock(&s->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&s->mtx); if (s->closed) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -638,6 +637,13 @@ req0_ctx_recv(void *arg, nni_aio *aio) } if ((msg = ctx->repmsg) == NULL) { + int rv; + rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } ctx->raio = aio; nni_mtx_unlock(&s->mtx); return; @@ -697,14 +703,11 @@ req0_ctx_send(void *arg, nni_aio *aio) uint64_t id; int rv; - nni_mtx_lock(&s->mtx); - // Even though we always complete synchronously, this guards against - // restarting a request that was stopped. - if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) { - nni_mtx_unlock(&s->mtx); + if (nni_aio_begin(aio) != 0) { return; } - // Sending a new requst cancels the old one, including any + nni_mtx_lock(&s->mtx); + // Sending a new request cancels the old one, including any // outstanding reply. if (ctx->raio != NULL) { nni_aio_finish_error(ctx->raio, NNG_ECANCELED); @@ -735,6 +738,15 @@ req0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } + // If no pipes are ready, and the request was a poll (no background + // schedule), then fail it. Should be NNG_TIMEDOUT. + rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx); + if ((rv != 0) && (nni_list_empty(&s->readypipes))) { + nni_idhash_remove(s->reqids, id); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } ctx->reqlen = nni_msg_len(msg); ctx->reqmsg = msg; ctx->saio = aio; @@ -743,6 +755,8 @@ req0_ctx_send(void *arg, nni_aio *aio) // Stick us on the sendq list. nni_list_append(&s->sendq, ctx); + // Note that this will be synchronous if the readypipes list was + // not empty. req0_run_sendq(s, NULL); nni_mtx_unlock(&s->mtx); } diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index c62a3c56..2a9be4bb 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -241,10 +241,11 @@ http_connect_cancel(nni_aio *aio, int rv) void nni_http_client_connect(nni_http_client *c, nni_aio *aio) { - if (nni_aio_start(aio, http_connect_cancel, aio) != 0) { + if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); + nni_aio_schedule(aio, http_connect_cancel, aio); nni_list_append(&c->aios, aio); if (nni_list_first(&c->aios) == aio) { http_conn_start(c); diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index d37c9a2f..a3039d0d 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -360,13 +360,14 @@ http_rd_cancel(nni_aio *aio, int rv) static void http_rd_submit(nni_http_conn *conn, nni_aio *aio) { - if (nni_aio_start(aio, http_rd_cancel, conn) != 0) { + if (nni_aio_begin(aio) != 0) { return; } if (conn->closed) { nni_aio_finish_error(aio, NNG_ECLOSED); return; } + nni_aio_schedule(aio, http_rd_cancel, conn); nni_list_append(&conn->rdq, aio); if (conn->rd_uaio == NULL) { http_rd_start(conn); @@ -473,7 +474,7 @@ http_wr_cancel(nni_aio *aio, int rv) static void http_wr_submit(nni_http_conn *conn, nni_aio *aio) { - if (nni_aio_start(aio, http_wr_cancel, conn) != 0) { + if (nni_aio_begin(aio) != 0) { return; } if (conn->closed) { @@ -481,6 +482,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio) return; } nni_list_append(&conn->wrq, aio); + nni_aio_schedule(aio, http_wr_cancel, conn); if (conn->wr_uaio == NULL) { http_wr_start(conn); } diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c index 7107c029..7aae45b2 100644 --- a/src/supplemental/http/http_public.c +++ b/src/supplemental/http/http_public.c @@ -9,9 +9,9 @@ // #include "core/nng_impl.h" -#include "supplemental/tls/tls.h" #include "http.h" #include "http_api.h" +#include "supplemental/tls/tls.h" // Symbols in this file are "public" versions of the HTTP API. // These are suitable for exposure to applications. @@ -382,7 +382,7 @@ nng_http_conn_read(nng_http_conn *conn, nng_aio *aio) nni_http_read(conn, aio); #else NNI_ARG_UNUSED(conn); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -395,7 +395,7 @@ nng_http_conn_read_all(nng_http_conn *conn, nng_aio *aio) nni_http_read_full(conn, aio); #else NNI_ARG_UNUSED(conn); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -408,7 +408,7 @@ nng_http_conn_write(nng_http_conn *conn, nng_aio *aio) nni_http_write(conn, aio); #else NNI_ARG_UNUSED(conn); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -433,7 +433,7 @@ nng_http_conn_write_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio) #else NNI_ARG_UNUSED(conn); NNI_ARG_UNUSED(req); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -447,7 +447,7 @@ nng_http_conn_write_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio) #else NNI_ARG_UNUSED(conn); NNI_ARG_UNUSED(res); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -461,7 +461,7 @@ nng_http_conn_read_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio) #else NNI_ARG_UNUSED(conn); NNI_ARG_UNUSED(req); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -475,7 +475,7 @@ nng_http_conn_read_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio) #else NNI_ARG_UNUSED(conn); NNI_ARG_UNUSED(res); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif @@ -574,7 +574,8 @@ nng_http_handler_set_host(nng_http_handler *h, const char *host) #endif } -int nng_http_handler_set_tree(nng_http_handler *h) +int +nng_http_handler_set_tree(nng_http_handler *h) { #ifdef NNG_SUPP_HTTP return (nni_http_handler_set_tree(h)); @@ -698,12 +699,13 @@ nng_http_server_get_tls(nng_http_server *srv, nng_tls_config **cfgp) #endif } -int nng_http_hijack(nng_http_conn * conn) +int +nng_http_hijack(nng_http_conn *conn) { #ifdef NNG_SUPP_HTTP - return (nni_http_hijack(conn)); + return (nni_http_hijack(conn)); #else - NNI_ARG_UNUSED(conn); + NNI_ARG_UNUSED(conn); return (NNG_ENOTSUP); #endif } @@ -762,7 +764,7 @@ nng_http_client_connect(nng_http_client *cli, nng_aio *aio) nni_http_client_connect(cli, aio); #else NNI_ARG_UNUSED(cli); - if (nni_aio_start(aio, NULL, NULL)) { + if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } #endif diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 7feadc96..c92de586 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -590,8 +590,8 @@ http_sconn_rxdone(void *arg) // Technically, probably callback should initialize this with // start, but we do it instead. - - if (nni_aio_start(sc->cbaio, NULL, NULL) != 0) { + // This operation cannot at present canceled or timed out. + if (nni_aio_begin(sc->cbaio) != 0) { nni_mtx_unlock(&s->mtx); return; } diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index 8faf4e46..c2a0fb52 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -13,7 +13,6 @@ #include <stdlib.h> #include <string.h> - #include "mbedtls/version.h" // Must be first in order to pick up version #include "mbedtls/error.h" @@ -562,16 +561,16 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len) void nni_tls_send(nni_tls *tp, nni_aio *aio) { - nni_mtx_lock(&tp->lk); - if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) { - nni_mtx_unlock(&tp->lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&tp->lk); if (tp->tls_closed) { nni_mtx_unlock(&tp->lk); nni_aio_finish_error(aio, NNG_ECLOSED); return; } + nni_aio_schedule(aio, nni_tls_cancel, tp); nni_list_append(&tp->sends, aio); nni_tls_do_send(tp); nni_mtx_unlock(&tp->lk); @@ -580,16 +579,16 @@ nni_tls_send(nni_tls *tp, nni_aio *aio) void nni_tls_recv(nni_tls *tp, nni_aio *aio) { - nni_mtx_lock(&tp->lk); - if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) { - nni_mtx_unlock(&tp->lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&tp->lk); if (tp->tls_closed) { nni_mtx_unlock(&tp->lk); nni_aio_finish_error(aio, NNG_ECLOSED); return; } + nni_aio_schedule(aio, nni_tls_cancel, tp); nni_list_append(&tp->recvs, aio); nni_tls_do_recv(tp); nni_mtx_unlock(&tp->lk); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 7bd698bd..74b56437 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -655,9 +655,7 @@ ws_send_close(nni_ws *ws, uint16_t code) ws->closed = true; aio = ws->closeaio; - // We don't care about cancellation here. If this times out, - // we will still shut all the physical I/O down in the callback. - if (nni_aio_start(aio, ws_cancel_close, ws) != 0) { + if (nni_aio_begin(aio) != 0) { return; } ws->wclose = true; @@ -669,6 +667,7 @@ ws_send_close(nni_ws *ws, uint16_t code) } // Close frames get priority! nni_list_prepend(&ws->txmsgs, wm); + nni_aio_schedule(aio, ws_cancel_close, ws); ws_start_write(ws); } @@ -722,26 +721,22 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio) msg = nni_aio_get_msg(aio); + if (nni_aio_begin(aio) != 0) { + return; + } if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) { - if (nni_aio_start(aio, NULL, NULL) == 0) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_mtx_lock(&ws->mtx); - - if (nni_aio_start(aio, ws_write_cancel, ws) != 0) { - nni_mtx_unlock(&ws->mtx); - ws_msg_fini(wm); - return; - } if (ws->closed) { nni_mtx_unlock(&ws->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); ws_msg_fini(wm); return; } + nni_aio_schedule(aio, ws_write_cancel, ws); nni_aio_set_prov_extra(aio, 0, wm); nni_list_append(&ws->sendq, aio); nni_list_append(&ws->txmsgs, wm); @@ -1056,19 +1051,20 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) ws_msg *wm; int rv; + if (nni_aio_begin(aio) != 0) { + return; + } if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) { - if (nni_aio_start(aio, NULL, NULL)) { - nni_aio_finish_error(aio, rv); - } + nni_aio_finish_error(aio, rv); return; } nni_mtx_lock(&ws->mtx); - if (nni_aio_start(aio, ws_read_cancel, ws) == 0) { - nni_aio_set_prov_extra(aio, 0, wm); - nni_list_append(&ws->recvq, aio); - nni_list_append(&ws->rxmsgs, wm); - ws_start_read(ws); - } + nni_aio_schedule(aio, ws_read_cancel, ws); + nni_aio_set_prov_extra(aio, 0, wm); + nni_list_append(&ws->recvq, aio); + nni_list_append(&ws->rxmsgs, wm); + ws_start_read(ws); + nni_mtx_unlock(&ws->mtx); } @@ -1651,11 +1647,10 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) { nni_ws *ws; - nni_mtx_lock(&l->mtx); - if (nni_aio_start(aio, ws_accept_cancel, l) != 0) { - nni_mtx_unlock(&l->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&l->mtx); if (l->closed) { nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&l->mtx); @@ -1668,11 +1663,13 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) } if ((ws = nni_list_first(&l->pend)) != NULL) { nni_list_remove(&l->pend, ws); + nni_mtx_unlock(&l->mtx); nni_aio_set_output(aio, 0, ws); nni_aio_finish(aio, 0, 0); - } else { - nni_list_append(&l->aios, aio); + return; } + nni_aio_schedule(aio, ws_accept_cancel, l); + nni_list_append(&l->aios, aio); nni_mtx_unlock(&l->mtx); } @@ -1983,19 +1980,17 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) nni_ws *ws; int rv; + if (nni_aio_begin(aio) != 0) { + return; + } if ((rv = ws_init(&ws)) != 0) { nni_aio_finish_error(aio, rv); return; } nni_mtx_lock(&d->mtx); if (d->closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&d->mtx); - ws_fini(ws); - return; - } - if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) { nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); ws_fini(ws); return; } @@ -2003,6 +1998,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) ws->useraio = aio; ws->mode = NNI_EP_MODE_DIAL; nni_list_append(&d->wspend, ws); + nni_aio_schedule(aio, ws_dial_cancel, ws); nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 82f46fc4..8bfb097e 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; nni_inproc_ep *server; - int rv; + if (nni_aio_begin(aio) != 0) { + return; + } if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); return; } nni_mtx_lock(&nni_inproc.mx); - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&nni_inproc.mx); - return; - } - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); + // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; } + // We don't have to worry about the case where a zero timeout + // on connect was specified, as there is no option to specify + // that in the upper API. + nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -404,15 +407,17 @@ static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; - int rv; - nni_mtx_lock(&nni_inproc.mx); - - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&nni_inproc.mx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&nni_inproc.mx); + + // We need not worry about the case where a non-blocking + // accept was tried -- there is no API to do such a thing. + nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + // We are already on the master list of servers, thanks to bind. // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index ecc9a962..61b89f20 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_mtx_lock(&pipe->mtx); - if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pipe->mtx); + nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe); nni_list_append(&pipe->sendq, aio); if (nni_list_first(&pipe->sendq) == aio) { nni_ipc_pipe_dosend(pipe, aio); @@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_mtx_lock(&pipe->mtx); - - if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pipe->mtx); + + // Transports never have a zero length timeout. + (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe); nni_list_append(&pipe->recvq, aio); if (nni_list_first(&pipe->recvq) == aio) { @@ -492,10 +493,12 @@ static void nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - int rv; nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; pipe->txhead[1] = 'S'; @@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &pipe->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); - if (rv != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } + nni_aio_schedule(aio, nni_ipc_cancel_start, pipe); nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -680,16 +679,14 @@ static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; - - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); ep->user_aio = aio; nni_plat_ipc_ep_accept(ep->iep, ep->aio); @@ -700,18 +697,14 @@ static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; - - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - // If we can't start, then its dying and we can't report - // either. - if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); ep->user_aio = aio; nni_plat_ipc_ep_connect(ep->iep, ep->aio); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 7ea77035..a551c5be 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tcp_cancel_tx, p); nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tcp_pipe_dosend(p, aio); @@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tcp_cancel_rx, p); nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tcp_pipe_dorecv(p); @@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&p->mtx); p->txlen[0] = 0; p->txlen[1] = 'S'; @@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } + nni_aio_schedule(aio, nni_tcp_cancel_nego, p); nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -721,16 +721,14 @@ static void nni_tcp_ep_accept(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); @@ -741,17 +739,14 @@ static void nni_tcp_ep_connect(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index a852ddd8..2ea14e93 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tls_cancel_tx, p); nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tls_pipe_dosend(p, aio); @@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + + nni_aio_schedule(aio, nni_tls_cancel_rx, p); nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tls_pipe_dorecv(p); @@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&p->mtx); p->txlen[0] = 0; p->txlen[1] = 'S'; @@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } + nni_aio_schedule(aio, nni_tls_cancel_nego, p); nni_tls_send(p->tls, negaio); nni_mtx_unlock(&p->mtx); } @@ -743,18 +741,14 @@ static void nni_tls_ep_accept(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tls_cancel_ep, ep); ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -763,19 +757,14 @@ static void nni_tls_ep_connect(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tls_cancel_ep, ep); ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 0542d0c7..761ba824 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, ws_pipe_recv_cancel, p); p->user_rxaio = aio; - nni_ws_recv_msg(p->ws, p->rxaio); nni_mtx_unlock(&p->mtx); } @@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, ws_pipe_send_cancel, p); p->user_txaio = aio; nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); @@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio) // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. // Otherwise we stick the AIO in the accept list. - nni_mtx_lock(&ep->mtx); - if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + nni_aio_schedule(aio, ws_ep_cancel, ep); nni_list_append(&ep->aios, aio); if (aio == nni_list_first(&ep->aios)) { nni_ws_listener_accept(ep->listener, ep->accaio); @@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio) int rv = 0; ws_hdr *h; + if (nni_aio_begin(aio) != 0) { + return; + } if (!ep->started) { NNI_LIST_FOREACH (&ep->headers, h) { rv = nni_ws_dialer_header( @@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(nni_list_empty(&ep->aios)); - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } + nni_aio_schedule(aio, ws_ep_cancel, ep); ep->started = true; nni_list_append(&ep->aios, aio); nni_ws_dialer_dial(ep->dialer, ep->connaio); diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d0790b37..05866cfb 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio) size_t bytes; nni_msg *m; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, NULL, p) != 0) { - nni_mtx_unlock(&zt_lk); + if (nni_aio_begin(aio) != 0) { + return; + } + if ((m = nni_aio_get_msg(aio)) == NULL) { + nni_aio_finish_error(aio, NNG_EINVAL); return; } + nni_mtx_lock(&zt_lk); + if (p->zp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data); - if ((m = nni_aio_get_msg(aio)) == NULL) { - nni_aio_finish_error(aio, NNG_EINVAL); - nni_mtx_unlock(&zt_lk); - return; - }; - bytes = nni_msg_header_len(m) + nni_msg_len(m); if (bytes >= (0xfffe * fragsz)) { nni_aio_finish_error(aio, NNG_EMSGSIZE); @@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio) zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr, p->zp_laddr, data, fraglen + zt_offset_data_data); } while (nni_msg_len(m) != 0); + nni_mtx_unlock(&zt_lk); + + // NB, We never bothered to call nn_aio_sched, because we run this + // synchronously, relying on UDP to simply discard messages if we + // cannot deliver them. This means that pipe send operations with + // this transport are not cancellable. nni_aio_set_msg(aio, NULL); nni_msg_free(m); nni_aio_finish(aio, 0, offset); - nni_mtx_unlock(&zt_lk); } static void @@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio) { zt_pipe *p = arg; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) { - nni_mtx_unlock(&zt_lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&zt_lk); if (p->zp_closed) { + nni_mtx_unlock(&zt_lk); nni_aio_finish_error(aio, NNG_ECLOSED); - } else { - p->zp_user_rxaio = aio; - zt_pipe_dorecv(p); + return; } + nni_aio_schedule(aio, zt_pipe_cancel_recv, p); + p->zp_user_rxaio = aio; + zt_pipe_dorecv(p); nni_mtx_unlock(&zt_lk); } @@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg) // use the the timer to wake us up even if we aren't // going to send a ping. (We don't increment the try count // unless we actually do send one though.) - if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + if (nni_aio_begin(aio) == 0) { + nni_aio_schedule(aio, zt_pipe_cancel_ping, p); p->zp_ping_active = 1; if (now > (p->zp_last_recv + p->zp_ping_time)) { - // We have to send a ping to keep the session - // up. p->zp_ping_try++; zt_pipe_send_ping(p); } @@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio) { zt_pipe *p = arg; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&zt_lk); p->zp_ping_active = 0; // send a gratuitous ping, and start the ping interval timer. @@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio) (p->zp_ping_aio != NULL)) { p->zp_ping_try = 0; nni_aio_set_timeout(aio, p->zp_ping_time); - if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) == - 0) { + if (nni_aio_begin(p->zp_ping_aio) == 0) { + nni_aio_schedule( + p->zp_ping_aio, zt_pipe_cancel_ping, p); p->zp_ping_active = 1; zt_pipe_send_ping(p); } @@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio) { zt_ep *ep = arg; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { - nni_aio_list_append(&ep->ze_aios, aio); - zt_ep_doaccept(ep); + if (nni_aio_begin(aio) != 0) { + return; } + nni_mtx_lock(&zt_lk); + nni_aio_schedule(aio, zt_ep_cancel, ep); + nni_aio_list_append(&ep->ze_aios, aio); + zt_ep_doaccept(ep); nni_mtx_unlock(&zt_lk); } @@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg) if (nni_list_first(&ep->ze_aios) != NULL) { nni_aio_set_timeout(aio, ep->ze_conn_time); - if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) { + if (nni_aio_begin(aio) == 0) { + nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); ep->ze_creq_active = 1; ep->ze_creq_try++; zt_ep_send_conn_req(ep); @@ -2493,43 +2503,41 @@ static void zt_ep_connect(void *arg, nni_aio *aio) { zt_ep *ep = arg; + int rv; + if (nni_aio_begin(aio) != 0) { + return; + } // We bind locally. We'll use the address later when we give // it to the pipe, but this allows us to receive the initial // ack back from the server. (This gives us an ephemeral // address to work with.) nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { - int rv; - - // Clear the port so we get an ephemeral port. - ep->ze_laddr &= ~((uint64_t) zt_port_mask); - - if ((rv = zt_ep_bind_locked(ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&zt_lk); - return; - } - - if ((ep->ze_raddr >> 24) == 0) { - ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); - } - nni_aio_list_append(&ep->ze_aios, aio); - - ep->ze_running = 1; - - nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); + // Clear the port so we get an ephemeral port. + ep->ze_laddr &= ~((uint64_t) zt_port_mask); - if (nni_aio_start( - ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) { + if ((rv = zt_ep_bind_locked(ep)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&zt_lk); + return; + } - // Send out the first connect message; if not - // yet attached to network message will be dropped. - ep->ze_creq_try = 1; - ep->ze_creq_active = 1; - zt_ep_send_conn_req(ep); - } + if ((ep->ze_raddr >> 24) == 0) { + ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); + } + nni_aio_list_append(&ep->ze_aios, aio); + ep->ze_running = 1; + nni_aio_schedule(aio, zt_ep_cancel, ep); + + nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); + if (nni_aio_begin(ep->ze_creq_aio) == 0) { + nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); + // Send out the first connect message; if not + // yet attached to network message will be dropped. + ep->ze_creq_try = 1; + ep->ze_creq_active = 1; + zt_ep_send_conn_req(ep); } nni_mtx_unlock(&zt_lk); } |
