diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 150 |
1 files changed, 141 insertions, 9 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a036a606..341b218e 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -54,6 +54,57 @@ static nni_list nni_aio_expire_aios; // if it comes back nonzero (NNG_ESTATE) then it must simply discard the // request and return. +// An nni_aio is an async I/O handle. +struct nng_aio { + int a_result; // Result code (nng_errno) + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + + // These fields are private to the aio framework. + nni_cv a_cv; + unsigned a_fini : 1; // shutting down (no new operations) + unsigned a_done : 1; // operation has completed + unsigned a_pend : 1; // completion routine pending + unsigned a_active : 1; // aio was started + unsigned a_expiring : 1; // expiration callback in progress + unsigned a_waiting : 1; // a thread is waiting for this to finish + unsigned a_synch : 1; // run completion synchronously + nni_task a_task; + + // Read/write operations. + nni_iov *a_iov; + int a_niov; + nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short + nni_iov *a_iovalloc; // dynamically allocated IOVs + int a_niovalloc; // number of allocated IOVs + + // Message operations. + nni_msg *a_msg; + + // Connect/accept operations. + void *a_pipe; // opaque pipe handle + + // User scratch data. Consumers may store values here, which + // must be preserved by providers and the framework. + void *a_user_data[4]; + + // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be + // specified. The semantics of these will vary, and depend on the + // specific operation. + void *a_inputs[4]; + void *a_outputs[4]; + + // Provider-use fields. + nni_aio_cancelfn a_prov_cancel; + void * a_prov_data; + nni_list_node a_prov_node; + void * a_prov_extra[4]; // Extra data used by provider + + // Expire node. + nni_list_node a_expire_node; +}; + static void nni_aio_expire_add(nni_aio *); int @@ -88,7 +139,7 @@ nni_aio_fini(nni_aio *aio) nni_cv_fini(&aio->a_cv); if (aio->a_niovalloc > 0) { - NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc); + NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc); } NNI_FREE_STRUCT(aio); @@ -96,21 +147,27 @@ nni_aio_fini(nni_aio *aio) } int -nni_aio_set_iov(nni_aio *aio, int niov, nng_iov *iov) +nni_aio_set_iov(nni_aio *aio, int niov, const nni_iov *iov) { - if ((niov > 4) && (niov > aio->a_niovalloc)) { + if ((niov > NNI_NUM_ELEMENTS(aio->a_iovinl)) && + (niov > aio->a_niovalloc)) { nni_iov *newiov = NNI_ALLOC_STRUCTS(newiov, niov); if (newiov == NULL) { return (NNG_ENOMEM); } if (aio->a_niovalloc > 0) { - NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc); + NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc); } aio->a_iov = newiov; + aio->a_iovalloc = newiov; aio->a_niovalloc = niov; } - - memcpy(aio->a_iov, iov, niov * sizeof(nng_iov)); + if (niov <= NNI_NUM_ELEMENTS(aio->a_iovinl)) { + aio->a_iov = aio->a_iovinl; + } else { + aio->a_iov = aio->a_iovalloc; + } + memcpy(aio->a_iov, iov, niov * sizeof(nni_iov)); aio->a_niov = niov; return (0); } @@ -136,7 +193,7 @@ nni_aio_stop(nni_aio *aio) aio->a_fini = 1; nni_mtx_unlock(&nni_aio_lk); - nni_aio_cancel(aio, NNG_ECANCELED); + nni_aio_abort(aio, NNG_ECANCELED); nni_aio_wait(aio); } @@ -293,10 +350,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (0); } -// nni_aio_cancel is called by a consumer which guarantees that the aio +// nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void -nni_aio_cancel(nni_aio *aio, int rv) +nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancelfn cancelfn; @@ -502,6 +559,81 @@ nni_aio_expire_loop(void *arg) } } +void * +nni_aio_get_prov_data(nni_aio *aio) +{ + return (aio->a_prov_data); +} + +void +nni_aio_set_prov_data(nni_aio *aio, void *data) +{ + aio->a_prov_data = data; +} + +void * +nni_aio_get_prov_extra(nni_aio *aio, unsigned index) +{ + return (aio->a_prov_extra[index]); +} + +void +nni_aio_set_prov_extra(nni_aio *aio, unsigned index, void *data) +{ + aio->a_prov_extra[index] = data; +} + +void +nni_aio_get_iov(nni_aio *aio, int *niovp, nni_iov **iovp) +{ + *niovp = aio->a_niov; + *iovp = aio->a_iov; +} + +void +nni_aio_normalize_timeout(nni_aio *aio, nng_duration dur) +{ + if (aio->a_timeout == NNG_DURATION_DEFAULT) { + aio->a_timeout = dur; + } +} + +void +nni_aio_bump_count(nni_aio *aio, size_t n) +{ + aio->a_count += n; +} + +size_t +nni_aio_iov_count(nni_aio *aio) +{ + size_t resid = 0; + + for (int i = 0; i < aio->a_niov; i++) { + resid += aio->a_iov[i].iov_len; + } + return (resid); +} + +size_t +nni_aio_iov_advance(nni_aio *aio, size_t n) +{ + size_t resid = n; + while (n) { + NNI_ASSERT(aio->a_niov != 0); + if (aio->a_iov[0].iov_len > n) { + aio->a_iov[0].iov_len -= n; + NNI_INCPTR(aio->a_iov[0].iov_buf, n); + return (0); // we used all of "n" + } + resid -= aio->a_iov[0].iov_len; + n -= aio->a_iov[0].iov_len; + aio->a_iov = &aio->a_iov[1]; + aio->a_niov--; + } + return (resid); // we might not have used all of n for this iov +} + void nni_aio_sys_fini(void) { |
