aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-08 20:34:26 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-08 21:16:30 -0800
commitb21d7805523a407a14567017edbdef57ca81781f (patch)
treee07f08bdc047ee4dfb057b670766e3de5bf2f981 /src/core
parent8479b4c8861c77cfd9eb64e724615605bdd1cbcb (diff)
downloadnng-b21d7805523a407a14567017edbdef57ca81781f.tar.gz
nng-b21d7805523a407a14567017edbdef57ca81781f.tar.bz2
nng-b21d7805523a407a14567017edbdef57ca81781f.zip
fixes #1094 Consider in-lining task and aio
This only does it for rep, but it also has changes that should increase the overall test coverage for the REP protocol
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c134
-rw-r--r--src/core/aio.h74
-rw-r--r--src/core/device.c10
-rw-r--r--src/core/dialer.c12
-rw-r--r--src/core/listener.c8
5 files changed, 133 insertions, 105 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 9f025f4c..a15bb47b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -61,107 +61,75 @@ static nni_aio *nni_aio_expire_aio;
// operations from starting, without waiting for any existing one to
// complete, call nni_aio_close.
-// An nni_aio is an async I/O handle.
-struct nng_aio {
- int a_result; // Result code (nng_errno)
- size_t a_count; // Bytes transferred (I/O only)
- nni_time a_expire; // Absolute timeout
- nni_duration a_timeout; // Relative timeout
-
- bool a_stop; // shutting down (no new operations)
- bool a_sleep; // sleeping with no action
- bool a_expire_ok; // expire from sleep is ok
- nni_task a_task;
-
- // Read/write operations.
- nni_iov a_iov[8];
- unsigned a_niov;
-
- // Message operations.
- nni_msg *a_msg;
-
- // User scratch data. Consumers may store values here, which
- // must be preserved by providers and the framework.
- void *a_user_data[4];
-
- // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
- // specified. The semantics of these will vary, and depend on the
- // specific operation.
- void *a_inputs[4];
- void *a_outputs[4];
-
- // Provider-use fields.
- nni_aio_cancelfn a_cancel_fn;
- void * a_cancel_arg;
- nni_list_node a_prov_node;
- void * a_prov_extra[4]; // Extra data used by provider
-
- // Socket address. This turns out to be very useful, as we wind up
- // needing socket addresses for numerous connection related routines.
- // It would be cleaner to not have this and avoid burning the space,
- // but having this hear dramatically simplifies lots of code.
- nng_sockaddr a_sockaddr;
-
- // Expire node.
- nni_list_node a_expire_node;
-};
-
static void nni_aio_expire_add(nni_aio *);
-int
-nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
+void
+nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
- nni_aio *aio;
-
- if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
- return (NNG_ENOMEM);
- }
+ memset(aio, 0, sizeof(*aio));
nni_task_init(&aio->a_task, NULL, cb, arg);
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
- *aiop = aio;
- return (0);
}
void
nni_aio_fini(nni_aio *aio)
{
- if (aio != NULL) {
+ nni_aio_cancelfn fn;
+ void * arg;
- nni_aio_cancelfn fn;
- void * arg;
+ // TODO: This probably could just use nni_aio_stop.
- // This is like aio_close, but we don't want to dispatch
- // the task. And unlike aio_stop, we don't want to wait
- // for the task. (Because we implicitly do task_fini.)
- nni_mtx_lock(&nni_aio_lk);
- fn = aio->a_cancel_fn;
- arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- aio->a_stop = true;
- nni_mtx_unlock(&nni_aio_lk);
+ // This is like aio_close, but we don't want to dispatch
+ // the task. And unlike aio_stop, we don't want to wait
+ // for the task. (Because we implicitly do task_fini.)
+ nni_mtx_lock(&nni_aio_lk);
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
+ nni_mtx_unlock(&nni_aio_lk);
- if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
- }
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
+ }
- // Wait for the aio to be "done"; this ensures that we don't
- // destroy an aio from a "normal" completion callback while
- // the expiration thread is working.
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
- nni_mtx_lock(&nni_aio_lk);
- while (nni_aio_expire_aio == aio) {
- if (nni_thr_is_self(&nni_aio_expire_thr)) {
- nni_aio_expire_aio = NULL;
- break;
- }
- nni_cv_wait(&nni_aio_expire_cv);
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ // TODO: It should be possible to remove this check!
+ if (nni_thr_is_self(&nni_aio_expire_thr)) {
+ nni_aio_expire_aio = NULL;
+ break;
}
- nni_mtx_unlock(&nni_aio_lk);
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(&aio->a_task);
+}
- nni_task_fini(&aio->a_task);
+int
+nni_aio_alloc(nni_aio **aiop, nni_cb cb, void *arg)
+{
+ nni_aio *aio;
+
+ if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_aio_init(aio, cb, arg);
+ *aiop = aio;
+ return (0);
+}
+void
+nni_aio_free(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_aio_fini(aio);
NNI_FREE_STRUCT(aio);
}
}
@@ -189,7 +157,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned niov, const nni_iov *iov)
// callback to complete, if still running. It also marks the AIO as
// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
-// callers are not also stopped, before calling nni_aio_fini to release
+// callers are not also stopped, before calling nni_aio_free to release
// actual memory.
void
nni_aio_stop(nni_aio *aio)
diff --git a/src/core/aio.h b/src/core/aio.h
index 83c068b5..c2776bc0 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -22,19 +22,29 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int);
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern int nni_aio_init(nni_aio **, nni_cb, void *);
+extern void nni_aio_init(nni_aio *, nni_cb, void *arg);
-// nni_aio_fini finalizes the aio, releasing resources (locks)
-// associated with it. The caller is responsible for ensuring that any
-// associated I/O is unscheduled or complete. This is safe to call
-// on zero'd memory.
+// nni_aio_fini finalizes an aio object, releasing associated resources.
+// It waits for the callback to complete.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_alloc allocates an aio object and initializes it. The callback
+// is called with the supplied argument when the operation is complete.
+// If NULL is supplied for the callback, then nni_aio_wake is used in its
+// place, and the aio is used for the argument.
+extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg);
+
+// nni_aio_free frees the aio, releasing resources (locks)
+// associated with it. This is safe to call on zero'd memory.
+// This must only be called on an object that was allocated
+// with nni_aio_allocate.
+extern void nni_aio_free(nni_aio *aio);
+
// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
// but also prevents any new operations from starting (nni_aio_start will
-// return NNG_ESTATE). This should be called before nni_aio_fini(). The
+// return NNG_ESTATE). This should be called before nni_aio_free(). The
// best pattern is to call nni_aio_stop on all linked aios, before calling
-// nni_aio_fini on any of them. This function will block until any
+// nni_aio_free on any of them. This function will block until any
// callbacks are executed, and therefore it should never be executed
// from a callback itself. (To abort operations without blocking
// use nni_aio_cancel instead.)
@@ -162,4 +172,54 @@ extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
+
+// An nni_aio is an async I/O handle. The details of this aio structure
+// are private to the AIO framework. The structure has the public name
+// (nng_aio) so that we minimize the pollution in the public API namespace.
+// It is a coding error for anything out side of the AIO framework to access
+// any of these members -- the definition is provided here to facilitate
+// inlining, but that should be the only use.
+struct nng_aio {
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
+ int a_result; // Result code (nng_errno)
+ bool a_stop; // Shutting down (no new operations)
+ bool a_sleep; // Sleeping with no action
+ bool a_expire_ok; // Expire from sleep is ok
+ nni_task a_task;
+
+ // Read/write operations.
+ nni_iov a_iov[8];
+ unsigned a_niov;
+
+ // Message operations.
+ nni_msg *a_msg;
+
+ // User scratch data. Consumers may store values here, which
+ // must be preserved by providers and the framework.
+ void *a_user_data[4];
+
+ // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
+ // specified. The semantics of these will vary, and depend on the
+ // specific operation.
+ void *a_inputs[4];
+ void *a_outputs[4];
+
+ // Provider-use fields.
+ nni_aio_cancelfn a_cancel_fn;
+ void * a_cancel_arg;
+ nni_list_node a_prov_node; // Linkage on provider list.
+ void * a_prov_extra[4]; // Extra data used by provider
+
+ // Socket address. This turns out to be very useful, as we wind up
+ // needing socket addresses for numerous connection related routines.
+ // It would be cleaner to not have this and avoid burning the space,
+ // but having this hear dramatically simplifies lots of code.
+ nng_sockaddr a_sockaddr;
+
+ // Expire node.
+ nni_list_node a_expire_node;
+};
+
#endif // CORE_AIO_H
diff --git a/src/core/device.c b/src/core/device.c
index fee108a8..71480bbc 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -94,7 +94,7 @@ nni_device_fini(nni_device_data *dd)
}
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
- nni_aio_fini(p->aio);
+ nni_aio_free(p->aio);
}
nni_mtx_fini(&dd->mtx);
NNI_FREE_STRUCT(dd);
@@ -172,7 +172,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
p->dst = i == 0 ? s2 : s1;
p->state = NNI_DEVICE_STATE_INIT;
- if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) {
nni_device_fini(dd);
return (rv);
}
@@ -221,11 +221,11 @@ nni_device(nni_sock *s1, nni_sock *s2)
nni_aio * aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
return (rv);
}
if ((rv = nni_device_init(&dd, s1, s2)) != 0) {
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (rv);
}
nni_device_start(dd, aio);
@@ -233,6 +233,6 @@ nni_device(nni_sock *s1, nni_sock *s2)
rv = nni_aio_result(aio);
nni_device_fini(dd);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (rv);
}
diff --git a/src/core/dialer.c b/src/core/dialer.c
index e1178783..8a463452 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -60,8 +60,8 @@ nni_dialer_destroy(nni_dialer *d)
nni_aio_stop(d->d_con_aio);
nni_aio_stop(d->d_tmo_aio);
- nni_aio_fini(d->d_con_aio);
- nni_aio_fini(d->d_tmo_aio);
+ nni_aio_free(d->d_con_aio);
+ nni_aio_free(d->d_tmo_aio);
if (d->d_data != NULL) {
d->d_ops.d_fini(d->d_data);
@@ -204,8 +204,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_mtx_init(&d->d_mtx);
dialer_stats_init(d);
- if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
- ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
@@ -382,7 +382,7 @@ nni_dialer_start(nni_dialer *d, int flags)
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
aio = NULL;
} else {
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
nni_atomic_flag_reset(&d->d_started);
return (rv);
}
@@ -397,7 +397,7 @@ nni_dialer_start(nni_dialer *d, int flags)
if (aio != NULL) {
nni_aio_wait(aio);
rv = nni_aio_result(aio);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
}
return (rv);
diff --git a/src/core/listener.c b/src/core/listener.c
index d44a5456..87d2c532 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -61,8 +61,8 @@ nni_listener_destroy(nni_listener *l)
nni_aio_stop(l->l_acc_aio);
nni_aio_stop(l->l_tmo_aio);
- nni_aio_fini(l->l_acc_aio);
- nni_aio_fini(l->l_tmo_aio);
+ nni_aio_free(l->l_acc_aio);
+ nni_aio_free(l->l_tmo_aio);
if (l->l_data != NULL) {
l->l_ops.l_fini(l->l_data);
@@ -196,8 +196,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
listener_stats_init(l);
- if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
- ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
+ if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
+ ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) ||
((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) ||
((rv = nni_sock_add_listener(s, l)) != 0)) {