aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-12-10 22:17:23 -0800
committerGarrett D'Amore <garrett@damore.org>2020-12-12 11:32:51 -0800
commit2033988343bce413763d3e9664e3e8372da48591 (patch)
treef39ce75c40dd94f95884d7d4c43a76df510a86bf /src
parentb45f876d005371f62fc261a5584c4d7dafd7a0f7 (diff)
downloadnng-2033988343bce413763d3e9664e3e8372da48591.tar.gz
nng-2033988343bce413763d3e9664e3e8372da48591.tar.bz2
nng-2033988343bce413763d3e9664e3e8372da48591.zip
fixes #1313 support deferred nng_aio destruction
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c134
-rw-r--r--src/core/aio.h8
-rw-r--r--src/core/aio_test.c21
-rw-r--r--src/nng.c6
-rw-r--r--src/supplemental/http/http_client.c21
5 files changed, 145 insertions, 45 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 1a8739e4..b910a600 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -14,11 +14,18 @@
static nni_mtx nni_aio_lk;
// These are used for expiration.
static nni_cv nni_aio_expire_cv;
-static int nni_aio_expire_run;
+static bool nni_aio_expire_exit;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_list;
static nni_aio *nni_aio_expire_aio;
+// Reaping items.
+static nni_thr nni_aio_reap_thr;
+static nni_aio *nni_aio_reap_list;
+static nni_mtx nni_aio_reap_lk;
+static nni_cv nni_aio_reap_cv;
+static bool nni_aio_reap_exit;
+
// Design notes.
//
// AIOs are only ever "completed" by the provider, which must call
@@ -76,7 +83,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
// TODO: This probably could just use nni_aio_stop.
@@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio)
}
}
+void
+nni_aio_reap(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_mtx_lock(&nni_aio_reap_lk);
+ aio->a_reap_next = nni_aio_reap_list;
+ nni_aio_reap_list = aio;
+ nni_cv_wake1(&nni_aio_reap_cv);
+ nni_mtx_unlock(&nni_aio_reap_lk);
+ }
+}
+
int
nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
{
@@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -347,7 +366,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -471,13 +490,13 @@ nni_aio_expire_loop(void *unused)
NNI_ARG_UNUSED(unused);
- nni_thr_set_name(NULL, "nng:aio:expire");
+ nni_thr_set_name(NULL, "nng:aio:expire");
for (;;) {
nni_aio_cancel_fn fn;
- nni_time now;
- nni_aio * aio;
- int rv;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
now = nni_clock();
@@ -485,7 +504,7 @@ nni_aio_expire_loop(void *unused)
if ((aio = nni_list_first(list)) == NULL) {
- if (nni_aio_expire_run == 0) {
+ if (nni_aio_expire_exit) {
nni_mtx_unlock(&nni_aio_lk);
return;
}
@@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused)
}
}
+static void
+nni_aio_reap_loop(void *unused)
+{
+ NNI_ARG_UNUSED(unused);
+
+ nni_thr_set_name(NULL, "nng:aio:reap");
+
+ nni_mtx_lock(&nni_aio_reap_lk);
+
+ for (;;) {
+ nni_aio *aio;
+
+ if ((aio = nni_aio_reap_list) == NULL) {
+ if (nni_aio_reap_exit) {
+ break;
+ }
+
+ nni_cv_wait(&nni_aio_reap_cv);
+ continue;
+ }
+ nni_aio_reap_list = NULL;
+ nni_mtx_unlock(&nni_aio_reap_lk);
+
+ while (aio != NULL) {
+ nni_aio *old = aio;
+ aio = aio->a_reap_next;
+ nni_aio_free(old);
+ }
+
+ nni_mtx_lock(&nni_aio_reap_lk);
+ }
+
+ nni_mtx_unlock(&nni_aio_reap_lk);
+}
+
void *
nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
{
@@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
void
nni_aio_sys_fini(void)
{
- nni_mtx *mtx = &nni_aio_lk;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_thr *thr = &nni_aio_expire_thr;
+ nni_mtx *mtx1 = &nni_aio_lk;
+ nni_cv * cv1 = &nni_aio_expire_cv;
+ nni_thr *thr1 = &nni_aio_expire_thr;
+ nni_mtx *mtx2 = &nni_aio_reap_lk;
+ nni_cv * cv2 = &nni_aio_reap_cv;
+ nni_thr *thr2 = &nni_aio_reap_thr;
- if (nni_aio_expire_run) {
- nni_mtx_lock(mtx);
- nni_aio_expire_run = 0;
- nni_cv_wake(cv);
- nni_mtx_unlock(mtx);
+ if (!nni_aio_expire_exit) {
+ nni_mtx_lock(mtx1);
+ nni_aio_expire_exit = true;
+ nni_cv_wake(cv1);
+ nni_mtx_unlock(mtx1);
}
- nni_thr_fini(thr);
- nni_cv_fini(cv);
- nni_mtx_fini(mtx);
+ if (!nni_aio_reap_exit) {
+ nni_mtx_lock(mtx2);
+ nni_aio_reap_exit = true;
+ nni_cv_wake(cv2);
+ nni_mtx_unlock(mtx2);
+ }
+
+ nni_thr_fini(thr1);
+ nni_cv_fini(cv1);
+ nni_mtx_fini(mtx1);
+
+ nni_thr_fini(thr2);
+ nni_cv_fini(cv2);
+ nni_mtx_fini(mtx2);
}
int
nni_aio_sys_init(void)
{
- int rv;
- nni_mtx *mtx = &nni_aio_lk;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_thr *thr = &nni_aio_expire_thr;
+ int rv, rv1, rv2;
+ nni_thr *thr1 = &nni_aio_expire_thr;
+ nni_thr *thr2 = &nni_aio_reap_thr;
NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
- nni_mtx_init(mtx);
- nni_cv_init(cv, mtx);
+ nni_mtx_init(&nni_aio_lk);
+ nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
+ nni_mtx_init(&nni_aio_reap_lk);
+ nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk);
+
+ nni_aio_expire_exit = false;
+ nni_aio_reap_exit = false;
- if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
+ rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL);
+ rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL);
+ if (((rv = rv1) != 0) || ((rv = rv2) != 0)) {
nni_aio_sys_fini();
return (rv);
}
- nni_aio_expire_run = 1;
- nni_thr_run(thr);
+ nni_thr_run(thr1);
+ nni_thr_run(thr2);
return (0);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 80f48341..dbe7fbb9 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -28,6 +28,10 @@ extern void nni_aio_init(nni_aio *, nni_cb, void *arg);
// It waits for the callback to complete.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_reap is used to asynchronously reap the aio. It can
+// be called even from the callback of the aio itself.
+extern void nni_aio_reap(nni_aio *);
+
// nni_aio_alloc allocates an aio object and initializes it. The callback
// is called with the supplied argument when the operation is complete.
// If NULL is supplied for the callback, then nni_aio_wake is used in its
@@ -195,8 +199,8 @@ struct nng_aio {
nni_list_node a_prov_node; // Linkage on provider list.
void * a_prov_extra[2]; // Extra data used by provider
- // Expire node.
- nni_list_node a_expire_node;
+ nni_list_node a_expire_node; // Expiration node
+ struct nng_aio *a_reap_next;
};
#endif // CORE_AIO_H
diff --git a/src/core/aio_test.c b/src/core/aio_test.c
index 3dab4b04..acf3c129 100644
--- a/src/core/aio_test.c
+++ b/src/core/aio_test.c
@@ -35,7 +35,7 @@ void
test_sleep(void)
{
nng_time start;
- nng_time end = 0;
+ nng_time end = 0;
nng_aio *aio;
NUTS_PASS(nng_aio_alloc(&aio, sleep_done, &end));
@@ -55,7 +55,7 @@ void
test_sleep_timeout(void)
{
nng_time start;
- nng_time end = 0;
+ nng_time end = 0;
nng_aio *aio;
NUTS_TRUE(nng_aio_alloc(&aio, sleep_done, &end) == 0);
@@ -226,6 +226,22 @@ test_zero_timeout(void)
NUTS_PASS(nng_close(s));
}
+static void
+aio_sleep_cb(void *arg)
+{
+ nng_aio *aio = *(nng_aio **) arg;
+ nng_aio_reap(aio);
+}
+
+void
+test_aio_reap(void)
+{
+ nng_aio *a;
+ NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a));
+ nng_sleep_aio(10, a);
+ nng_msleep(20);
+}
+
NUTS_TESTS = {
{ "sleep", test_sleep },
{ "sleep timeout", test_sleep_timeout },
@@ -236,5 +252,6 @@ NUTS_TESTS = {
{ "explicit timeout", test_explicit_timeout },
{ "inherited timeout", test_inherited_timeout },
{ "zero timeout", test_zero_timeout },
+ { "aio reap", test_aio_reap },
{ NULL, NULL },
}; \ No newline at end of file
diff --git a/src/nng.c b/src/nng.c
index 10298035..3a3c3c5c 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1737,6 +1737,12 @@ nng_aio_free(nng_aio *aio)
}
void
+nng_aio_reap(nng_aio *aio)
+{
+ nni_aio_reap(aio);
+}
+
+void
nng_sleep_aio(nng_duration ms, nng_aio *aio)
{
nni_sleep_aio(ms, aio);
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index c6f5167a..3737cb7b 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -224,11 +224,10 @@ typedef struct http_txn {
nni_http_res * res;
nni_http_chunks *chunks;
http_txn_state state;
- nni_reap_item reap;
} http_txn;
static void
-http_txn_reap(void *arg)
+http_txn_fini(void *arg)
{
http_txn *txn = arg;
if (txn->client != NULL) {
@@ -239,7 +238,7 @@ http_txn_reap(void *arg)
}
}
nni_http_chunks_free(txn->chunks);
- nni_aio_free(txn->aio);
+ nni_aio_reap(txn->aio);
NNI_FREE_STRUCT(txn);
}
@@ -270,7 +269,7 @@ http_txn_cb(void *arg)
if ((rv = nni_aio_result(txn->aio)) != 0) {
http_txn_finish_aios(txn, rv);
nni_mtx_unlock(&http_txn_lk);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
switch (txn->state) {
@@ -314,7 +313,7 @@ http_txn_cb(void *arg)
// never transfers data), then we are done.
http_txn_finish_aios(txn, 0);
nni_mtx_unlock(&http_txn_lk);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
@@ -333,7 +332,7 @@ http_txn_cb(void *arg)
// All done!
http_txn_finish_aios(txn, 0);
nni_mtx_unlock(&http_txn_lk);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
case HTTP_RECVING_CHUNKS:
@@ -352,7 +351,7 @@ http_txn_cb(void *arg)
}
http_txn_finish_aios(txn, 0);
nni_mtx_unlock(&http_txn_lk);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
@@ -360,7 +359,7 @@ error:
http_txn_finish_aios(txn, rv);
nni_http_conn_close(txn->conn);
nni_mtx_unlock(&http_txn_lk);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
}
static void
@@ -411,7 +410,7 @@ nni_http_transact_conn(
if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) {
nni_mtx_unlock(&http_txn_lk);
nni_aio_finish_error(aio, rv);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
nni_http_res_reset(txn->res);
@@ -448,7 +447,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req,
if ((rv = nni_http_req_set_header(req, "Connection", "close")) != 0) {
nni_aio_finish_error(aio, rv);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
@@ -463,7 +462,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req,
if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) {
nni_mtx_unlock(&http_txn_lk);
nni_aio_finish_error(aio, rv);
- nni_reap(&txn->reap, http_txn_reap, txn);
+ http_txn_fini(txn);
return;
}
nni_http_res_reset(txn->res);