aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c150
1 files changed, 141 insertions, 9 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a036a606..341b218e 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -54,6 +54,57 @@ static nni_list nni_aio_expire_aios;
// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
// request and return.
+// An nni_aio is an async I/O handle.
+struct nng_aio {
+ int a_result; // Result code (nng_errno)
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
+
+ // These fields are private to the aio framework.
+ nni_cv a_cv;
+ unsigned a_fini : 1; // shutting down (no new operations)
+ unsigned a_done : 1; // operation has completed
+ unsigned a_pend : 1; // completion routine pending
+ unsigned a_active : 1; // aio was started
+ unsigned a_expiring : 1; // expiration callback in progress
+ unsigned a_waiting : 1; // a thread is waiting for this to finish
+ unsigned a_synch : 1; // run completion synchronously
+ nni_task a_task;
+
+ // Read/write operations.
+ nni_iov *a_iov;
+ int a_niov;
+ nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short
+ nni_iov *a_iovalloc; // dynamically allocated IOVs
+ int a_niovalloc; // number of allocated IOVs
+
+ // Message operations.
+ nni_msg *a_msg;
+
+ // Connect/accept operations.
+ void *a_pipe; // opaque pipe handle
+
+ // User scratch data. Consumers may store values here, which
+ // must be preserved by providers and the framework.
+ void *a_user_data[4];
+
+ // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
+ // specified. The semantics of these will vary, and depend on the
+ // specific operation.
+ void *a_inputs[4];
+ void *a_outputs[4];
+
+ // Provider-use fields.
+ nni_aio_cancelfn a_prov_cancel;
+ void * a_prov_data;
+ nni_list_node a_prov_node;
+ void * a_prov_extra[4]; // Extra data used by provider
+
+ // Expire node.
+ nni_list_node a_expire_node;
+};
+
static void nni_aio_expire_add(nni_aio *);
int
@@ -88,7 +139,7 @@ nni_aio_fini(nni_aio *aio)
nni_cv_fini(&aio->a_cv);
if (aio->a_niovalloc > 0) {
- NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc);
+ NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
NNI_FREE_STRUCT(aio);
@@ -96,21 +147,27 @@ nni_aio_fini(nni_aio *aio)
}
int
-nni_aio_set_iov(nni_aio *aio, int niov, nng_iov *iov)
+nni_aio_set_iov(nni_aio *aio, int niov, const nni_iov *iov)
{
- if ((niov > 4) && (niov > aio->a_niovalloc)) {
+ if ((niov > NNI_NUM_ELEMENTS(aio->a_iovinl)) &&
+ (niov > aio->a_niovalloc)) {
nni_iov *newiov = NNI_ALLOC_STRUCTS(newiov, niov);
if (newiov == NULL) {
return (NNG_ENOMEM);
}
if (aio->a_niovalloc > 0) {
- NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc);
+ NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
aio->a_iov = newiov;
+ aio->a_iovalloc = newiov;
aio->a_niovalloc = niov;
}
-
- memcpy(aio->a_iov, iov, niov * sizeof(nng_iov));
+ if (niov <= NNI_NUM_ELEMENTS(aio->a_iovinl)) {
+ aio->a_iov = aio->a_iovinl;
+ } else {
+ aio->a_iov = aio->a_iovalloc;
+ }
+ memcpy(aio->a_iov, iov, niov * sizeof(nni_iov));
aio->a_niov = niov;
return (0);
}
@@ -136,7 +193,7 @@ nni_aio_stop(nni_aio *aio)
aio->a_fini = 1;
nni_mtx_unlock(&nni_aio_lk);
- nni_aio_cancel(aio, NNG_ECANCELED);
+ nni_aio_abort(aio, NNG_ECANCELED);
nni_aio_wait(aio);
}
@@ -293,10 +350,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
return (0);
}
-// nni_aio_cancel is called by a consumer which guarantees that the aio
+// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
-nni_aio_cancel(nni_aio *aio, int rv)
+nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancelfn cancelfn;
@@ -502,6 +559,81 @@ nni_aio_expire_loop(void *arg)
}
}
+void *
+nni_aio_get_prov_data(nni_aio *aio)
+{
+ return (aio->a_prov_data);
+}
+
+void
+nni_aio_set_prov_data(nni_aio *aio, void *data)
+{
+ aio->a_prov_data = data;
+}
+
+void *
+nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
+{
+ return (aio->a_prov_extra[index]);
+}
+
+void
+nni_aio_set_prov_extra(nni_aio *aio, unsigned index, void *data)
+{
+ aio->a_prov_extra[index] = data;
+}
+
+void
+nni_aio_get_iov(nni_aio *aio, int *niovp, nni_iov **iovp)
+{
+ *niovp = aio->a_niov;
+ *iovp = aio->a_iov;
+}
+
+void
+nni_aio_normalize_timeout(nni_aio *aio, nng_duration dur)
+{
+ if (aio->a_timeout == NNG_DURATION_DEFAULT) {
+ aio->a_timeout = dur;
+ }
+}
+
+void
+nni_aio_bump_count(nni_aio *aio, size_t n)
+{
+ aio->a_count += n;
+}
+
+size_t
+nni_aio_iov_count(nni_aio *aio)
+{
+ size_t resid = 0;
+
+ for (int i = 0; i < aio->a_niov; i++) {
+ resid += aio->a_iov[i].iov_len;
+ }
+ return (resid);
+}
+
+size_t
+nni_aio_iov_advance(nni_aio *aio, size_t n)
+{
+ size_t resid = n;
+ while (n) {
+ NNI_ASSERT(aio->a_niov != 0);
+ if (aio->a_iov[0].iov_len > n) {
+ aio->a_iov[0].iov_len -= n;
+ NNI_INCPTR(aio->a_iov[0].iov_buf, n);
+ return (0); // we used all of "n"
+ }
+ resid -= aio->a_iov[0].iov_len;
+ n -= aio->a_iov[0].iov_len;
+ aio->a_iov = &aio->a_iov[1];
+ aio->a_niov--;
+ }
+ return (resid); // we might not have used all of n for this iov
+}
+
void
nni_aio_sys_fini(void)
{