aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-12-19 10:21:54 -0800
committerGarrett D'Amore <garrett@damore.org>2020-12-19 12:50:05 -0800
commitd12e169c1e733b255d146847ed57037b74681285 (patch)
treee4a59142a6cf097dfdda8620635f173f53db9e7a /src/core
parent2033988343bce413763d3e9664e3e8372da48591 (diff)
downloadnng-d12e169c1e733b255d146847ed57037b74681285.tar.gz
nng-d12e169c1e733b255d146847ed57037b74681285.tar.bz2
nng-d12e169c1e733b255d146847ed57037b74681285.zip
fixes #1372 nni_reap could be smaller
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c101
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/aio_test.c4
-rw-r--r--src/core/dialer.c10
-rw-r--r--src/core/listener.c9
-rw-r--r--src/core/pipe.c67
-rw-r--r--src/core/pipe.h4
-rw-r--r--src/core/reap.c102
-rw-r--r--src/core/reap.h42
-rw-r--r--src/core/socket.c44
-rw-r--r--src/core/sockimpl.h7
11 files changed, 206 insertions, 187 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index b910a600..272f5e9d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -19,13 +19,6 @@ static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_list;
static nni_aio *nni_aio_expire_aio;
-// Reaping items.
-static nni_thr nni_aio_reap_thr;
-static nni_aio *nni_aio_reap_list;
-static nni_mtx nni_aio_reap_lk;
-static nni_cv nni_aio_reap_cv;
-static bool nni_aio_reap_exit;
-
// Design notes.
//
// AIOs are only ever "completed" by the provider, which must call
@@ -68,6 +61,11 @@ static bool nni_aio_reap_exit;
// operations from starting, without waiting for any existing one to
// complete, call nni_aio_close.
+static nni_reap_list aio_reap_list = {
+ .rl_offset = offsetof(nni_aio, a_reap_node),
+ .rl_func = (nni_cb) nni_aio_free,
+};
+
static void nni_aio_expire_add(nni_aio *);
void
@@ -145,11 +143,7 @@ void
nni_aio_reap(nni_aio *aio)
{
if (aio != NULL) {
- nni_mtx_lock(&nni_aio_reap_lk);
- aio->a_reap_next = nni_aio_reap_list;
- nni_aio_reap_list = aio;
- nni_cv_wake1(&nni_aio_reap_cv);
- nni_mtx_unlock(&nni_aio_reap_lk);
+ nni_reap(&aio_reap_list, aio);
}
}
@@ -549,41 +543,6 @@ nni_aio_expire_loop(void *unused)
}
}
-static void
-nni_aio_reap_loop(void *unused)
-{
- NNI_ARG_UNUSED(unused);
-
- nni_thr_set_name(NULL, "nng:aio:reap");
-
- nni_mtx_lock(&nni_aio_reap_lk);
-
- for (;;) {
- nni_aio *aio;
-
- if ((aio = nni_aio_reap_list) == NULL) {
- if (nni_aio_reap_exit) {
- break;
- }
-
- nni_cv_wait(&nni_aio_reap_cv);
- continue;
- }
- nni_aio_reap_list = NULL;
- nni_mtx_unlock(&nni_aio_reap_lk);
-
- while (aio != NULL) {
- nni_aio *old = aio;
- aio = aio->a_reap_next;
- nni_aio_free(old);
- }
-
- nni_mtx_lock(&nni_aio_reap_lk);
- }
-
- nni_mtx_unlock(&nni_aio_reap_lk);
-}
-
void *
nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
{
@@ -699,60 +658,40 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
void
nni_aio_sys_fini(void)
{
- nni_mtx *mtx1 = &nni_aio_lk;
- nni_cv * cv1 = &nni_aio_expire_cv;
- nni_thr *thr1 = &nni_aio_expire_thr;
- nni_mtx *mtx2 = &nni_aio_reap_lk;
- nni_cv * cv2 = &nni_aio_reap_cv;
- nni_thr *thr2 = &nni_aio_reap_thr;
+ nni_mtx *mtx = &nni_aio_lk;
+ nni_cv * cv = &nni_aio_expire_cv;
+ nni_thr *thr = &nni_aio_expire_thr;
if (!nni_aio_expire_exit) {
- nni_mtx_lock(mtx1);
+ nni_mtx_lock(mtx);
nni_aio_expire_exit = true;
- nni_cv_wake(cv1);
- nni_mtx_unlock(mtx1);
+ nni_cv_wake(cv);
+ nni_mtx_unlock(mtx);
}
- if (!nni_aio_reap_exit) {
- nni_mtx_lock(mtx2);
- nni_aio_reap_exit = true;
- nni_cv_wake(cv2);
- nni_mtx_unlock(mtx2);
- }
-
- nni_thr_fini(thr1);
- nni_cv_fini(cv1);
- nni_mtx_fini(mtx1);
-
- nni_thr_fini(thr2);
- nni_cv_fini(cv2);
- nni_mtx_fini(mtx2);
+ nni_thr_fini(thr);
+ nni_cv_fini(cv);
+ nni_mtx_fini(mtx);
}
int
nni_aio_sys_init(void)
{
- int rv, rv1, rv2;
- nni_thr *thr1 = &nni_aio_expire_thr;
- nni_thr *thr2 = &nni_aio_reap_thr;
+ int rv;
+ nni_thr *thr = &nni_aio_expire_thr;
NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
nni_mtx_init(&nni_aio_lk);
nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
- nni_mtx_init(&nni_aio_reap_lk);
- nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk);
nni_aio_expire_exit = false;
- nni_aio_reap_exit = false;
- rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL);
- rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL);
- if (((rv = rv1) != 0) || ((rv = rv2) != 0)) {
+ rv = nni_thr_init(thr, nni_aio_expire_loop, NULL);
+ if (rv != 0) {
nni_aio_sys_fini();
return (rv);
}
- nni_thr_run(thr1);
- nni_thr_run(thr2);
+ nni_thr_run(thr);
return (0);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index dbe7fbb9..e108e4b8 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -13,6 +13,7 @@
#include "core/defs.h"
#include "core/list.h"
+#include "core/reap.h"
#include "core/taskq.h"
#include "core/thread.h"
@@ -200,7 +201,7 @@ struct nng_aio {
void * a_prov_extra[2]; // Extra data used by provider
nni_list_node a_expire_node; // Expiration node
- struct nng_aio *a_reap_next;
+ nni_reap_node a_reap_node;
};
#endif // CORE_AIO_H
diff --git a/src/core/aio_test.c b/src/core/aio_test.c
index acf3c129..01024af6 100644
--- a/src/core/aio_test.c
+++ b/src/core/aio_test.c
@@ -236,10 +236,10 @@ aio_sleep_cb(void *arg)
void
test_aio_reap(void)
{
- nng_aio *a;
+ static nng_aio *a;
NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a));
nng_sleep_aio(10, a);
- nng_msleep(20);
+ nng_msleep(100);
}
NUTS_TESTS = {
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 3efbcabd..1f9a5e58 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -326,12 +326,16 @@ nni_dialer_hold(nni_dialer *d)
void
nni_dialer_rele(nni_dialer *d)
{
+ bool reap;
+
nni_mtx_lock(&dialers_lk);
d->d_ref--;
- if ((d->d_ref == 0) && (d->d_closed)) {
- nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
- }
+ reap = ((d->d_ref == 0) && (d->d_closed));
nni_mtx_unlock(&dialers_lk);
+
+ if (reap) {
+ nni_dialer_reap(d);
+ }
}
void
diff --git a/src/core/listener.c b/src/core/listener.c
index 83882389..13d5c6a6 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -314,12 +314,15 @@ nni_listener_hold(nni_listener *l)
void
nni_listener_rele(nni_listener *l)
{
+ bool reap;
+
nni_mtx_lock(&listeners_lk);
l->l_ref--;
- if ((l->l_ref == 0) && (l->l_closed)) {
- nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
- }
+ reap = ((l->l_ref == 0) && (l->l_closed));
nni_mtx_unlock(&listeners_lk);
+ if (reap) {
+ nni_listener_reap(l);
+ }
}
void
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 4e767605..47304fe7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -22,6 +22,13 @@
static nni_id_map pipes;
static nni_mtx pipes_lk;
+static void pipe_destroy(void *);
+
+static nni_reap_list pipe_reap_list = {
+ .rl_offset = offsetof(nni_pipe, p_reap),
+ .rl_func = pipe_destroy,
+};
+
int
nni_pipe_sys_init(void)
{
@@ -43,8 +50,9 @@ nni_pipe_sys_fini(void)
}
static void
-pipe_destroy(nni_pipe *p)
+pipe_destroy(void *arg)
{
+ nni_pipe *p = arg;
if (p == NULL) {
return;
}
@@ -158,7 +166,7 @@ nni_pipe_close(nni_pipe *p)
p->p_tran_ops.p_close(p->p_tran_data);
}
- nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
+ nni_reap(&pipe_reap_list, p);
}
uint16_t
@@ -201,25 +209,25 @@ pipe_stats_init(nni_pipe *p)
.si_atomic = true,
};
static const nni_stat_info tx_msgs_info = {
- .si_name = "tx_msgs",
- .si_desc = "messages sent",
- .si_type = NNG_STAT_COUNTER,
- .si_unit = NNG_UNIT_MESSAGES,
- .si_atomic = true,
+ .si_name = "tx_msgs",
+ .si_desc = "messages sent",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
};
static const nni_stat_info rx_bytes_info = {
- .si_name = "rx_bytes",
- .si_desc = "bytes received",
- .si_type = NNG_STAT_COUNTER,
- .si_unit = NNG_UNIT_BYTES,
- .si_atomic = true,
+ .si_name = "rx_bytes",
+ .si_desc = "bytes received",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
};
static const nni_stat_info tx_bytes_info = {
- .si_name = "tx_bytes",
- .si_desc = "bytes sent",
- .si_type = NNG_STAT_COUNTER,
- .si_unit = NNG_UNIT_BYTES,
- .si_atomic = true,
+ .si_name = "tx_bytes",
+ .si_desc = "bytes sent",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
};
nni_stat_init(&p->st_root, &root_info);
@@ -294,9 +302,9 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int
nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
{
- int rv;
- nni_tran * tran = d->d_tran;
- nni_pipe * p;
+ int rv;
+ nni_tran *tran = d->d_tran;
+ nni_pipe *p;
if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
return (rv);
@@ -318,9 +326,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
int
nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
{
- int rv;
- nni_tran * tran = l->l_tran;
- nni_pipe * p;
+ int rv;
+ nni_tran *tran = l->l_tran;
+ nni_pipe *p;
if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
return (rv);
@@ -328,9 +336,9 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
p->p_listener = l;
#if NNG_ENABLE_STATS
static const nni_stat_info listener_info = {
- .si_name = "listener",
- .si_desc = "listener for pipe",
- .si_type = NNG_STAT_ID,
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
};
pipe_stat_init(p, &p->st_ep_id, &listener_info);
nni_stat_set_id(&p->st_ep_id, nni_listener_id(l));
@@ -361,12 +369,6 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
-void *
-nni_pipe_get_proto_data(nni_pipe *p)
-{
- return (p->p_proto_data);
-}
-
uint32_t
nni_pipe_sock_id(nni_pipe *p)
{
@@ -385,7 +387,6 @@ nni_pipe_dialer_id(nni_pipe *p)
return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}
-
void
nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 20d34e82..79c48500 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -42,10 +42,6 @@ extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_getopt(
nni_pipe *, const char *, void *, size_t *, nni_opt_type);
-// nni_pipe_get_proto_data gets the protocol private data set with the
-// nni_pipe_set_proto_data function. No locking is performed.
-extern void *nni_pipe_get_proto_data(nni_pipe *);
-
// nni_pipe_find finds a pipe given its ID. It places a hold on the
// pipe, which must be released by the caller when it is done.
extern int nni_pipe_find(nni_pipe **, uint32_t);
diff --git a/src/core/reap.c b/src/core/reap.c
index ddd2a06e..8be5ee12 100644
--- a/src/core/reap.c
+++ b/src/core/reap.c
@@ -14,53 +14,79 @@
#include <stdbool.h>
-static nni_list reap_list;
-static nni_mtx reap_mtx;
-static nni_cv reap_cv;
-static nni_cv reap_empty_cv;
-static bool reap_exit = false;
-static bool reap_empty = false;
-static nni_thr reap_thr;
+// New stuff.
+static nni_reap_list *reap_list = NULL;
+static nni_thr reap_thr;
+static bool reap_exit;
+static nni_mtx reap_mtx;
+static bool reap_empty;
+static nni_cv reap_work_cv;
+static nni_cv reap_empty_cv;
static void
-reap_worker(void *notused)
+reap_worker(void *unused)
{
- NNI_ARG_UNUSED(notused);
+ NNI_ARG_UNUSED(unused);
+ nni_thr_set_name(NULL, "nng:reap2");
- nni_thr_set_name(NULL, "nng:reap");
-
- nni_mtx_lock(&reap_mtx);
+ nni_mtx_lock(&reap_mtx);
for (;;) {
- nni_reap_item *item;
- while ((item = nni_list_first(&reap_list)) != NULL) {
- nni_list_remove(&reap_list, item);
- nni_mtx_unlock(&reap_mtx);
+ nni_reap_list *list;
+ bool reaped = false;
- item->r_func(item->r_ptr);
- nni_mtx_lock(&reap_mtx);
- }
+ for (list = reap_list; list != NULL; list = list->rl_next) {
+ nni_reap_node *node;
+ size_t offset;
+ nni_cb func;
- reap_empty = true;
- nni_cv_wake(&reap_empty_cv);
+ if ((node = list->rl_nodes) == NULL) {
+ continue;
+ }
- if (reap_exit) {
- break;
- }
+ reaped = true;
+ offset = list->rl_offset;
+ func = list->rl_func;
+ list->rl_nodes = NULL;
- nni_cv_wait(&reap_cv);
+ // We process our list of nodes while not holding
+ // the lock.
+ nni_mtx_unlock(&reap_mtx);
+ while (node != NULL) {
+ void *ptr;
+ ptr = ((char *) node) - offset;
+ node = node->rn_next;
+ func(ptr);
+ }
+ nni_mtx_lock(&reap_mtx);
+ }
+ if (!reaped) {
+ reap_empty = true;
+ nni_cv_wake(&reap_empty_cv);
+ if (reap_exit) {
+ nni_mtx_unlock(&reap_mtx);
+ return;
+ }
+ nni_cv_wait(&reap_work_cv);
+ }
}
- nni_mtx_unlock(&reap_mtx);
}
void
-nni_reap(nni_reap_item *item, nni_cb func, void *ptr)
+nni_reap(nni_reap_list *rl, void *item)
{
+ nni_reap_node *node;
+
nni_mtx_lock(&reap_mtx);
- item->r_func = func;
- item->r_ptr = ptr;
- nni_list_append(&reap_list, item);
- reap_empty = false;
- nni_cv_wake(&reap_cv);
+ if (!rl->rl_inited) {
+ rl->rl_inited = true;
+ rl->rl_next = reap_list;
+ reap_list = rl;
+ }
+ reap_empty = false;
+ node = (void *) ((char *) item + rl->rl_offset);
+ node->rn_next = rl->rl_nodes;
+ rl->rl_nodes = node;
+ nni_cv_wake1(&reap_work_cv);
nni_mtx_unlock(&reap_mtx);
}
@@ -79,16 +105,15 @@ nni_reap_sys_init(void)
{
int rv;
- NNI_LIST_INIT(&reap_list, nni_reap_item, r_link);
+ reap_exit = false;
nni_mtx_init(&reap_mtx);
- nni_cv_init(&reap_cv, &reap_mtx);
+ nni_cv_init(&reap_work_cv, &reap_mtx);
nni_cv_init(&reap_empty_cv, &reap_mtx);
- reap_exit = false;
// If this fails, we don't fail init, instead we will try to
// start up at reap time.
if ((rv = nni_thr_init(&reap_thr, reap_worker, NULL)) != 0) {
- nni_cv_fini(&reap_cv);
+ nni_cv_fini(&reap_work_cv);
nni_cv_fini(&reap_empty_cv);
nni_mtx_fini(&reap_mtx);
return (rv);
@@ -102,7 +127,10 @@ nni_reap_sys_fini(void)
{
nni_mtx_lock(&reap_mtx);
reap_exit = true;
- nni_cv_wake(&reap_cv);
+ nni_cv_wake1(&reap_work_cv);
nni_mtx_unlock(&reap_mtx);
nni_thr_fini(&reap_thr);
+
+ // NB: The subsystem linkages remain in place. We don't need
+ // to reinitialize them across future initializations.
}
diff --git a/src/core/reap.h b/src/core/reap.h
index 40c27a5d..5f631885 100644
--- a/src/core/reap.h
+++ b/src/core/reap.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -14,19 +14,35 @@
#include "core/defs.h"
#include "core/list.h"
-// nni_reap_item is defined here so that it can be inlined into
-// structures. Callers must access its members directly.
-typedef struct nni_reap_item {
- nni_list_node r_link;
- void * r_ptr;
- nni_cb r_func;
-} nni_reap_item;
+// nni_reap_node is to be inserted inline into structures
+// for subsystems that wish to support deferred reaping.
+// It should be zeroed at object initialization, but apart
+// from that it must not be touched directly except by the
+// reap subsystem.
+typedef struct nni_reap_node nni_reap_node;
+struct nni_reap_node {
+ nni_reap_node *rn_next;
+};
+
+// nni_reap_list is for subsystems to define their own reap lists.
+// This allows for the reap linkage to be restricted to a single
+// pointer. Subsystems should initialize rl_offset and rl_func,
+// and leave the rest zeroed. The intention is that this is a global
+// static member for each subsystem.
+typedef struct nni_reap_list nni_reap_list;
+struct nni_reap_list {
+ nni_reap_list *rl_next; // linkage in global reap list
+ nni_reap_node *rl_nodes; // list of nodes to reap
+ size_t rl_offset; // offset of reap_node within member.
+ nni_cb rl_func; // function called to reap the item
+ bool rl_inited; // initialized means it is linked in the list
+};
// nni_reap performs an asynchronous reap of an item. This allows functions
// it calls to acquire locks or resources without worrying about deadlocks
// (such as from a completion callback.) The called function should avoid
// blocking for too long if possible, since only one reap thread is present
-// in the system. The intended usage is for an nni_reap_item to be a member
+// in the system. The intended usage is for an nni_reap_node to be a member
// of the structure to be reaped, and and then this function is called to
// finalize it.
//
@@ -35,10 +51,14 @@ typedef struct nni_reap_item {
// is busy. These will be queued at the end of the reap list. This will
// allow a dependency to defer reaping until its dependents have first been
// reaped. HOWEVER, it is important that the item in question actually be
-// part of a fully reapable graph; otherwise this can lead to an infinite
+// part of a fully reap-able graph; otherwise this can lead to an infinite
// loop in the reap thread.
-extern void nni_reap(nni_reap_item *, nni_cb, void *);
+
+extern void nni_reap(nni_reap_list *, void *);
+
+// nni_reap_drain waits for the reap queue to be drained.
extern void nni_reap_drain(void);
+
extern int nni_reap_sys_init(void);
extern void nni_reap_sys_fini(void);
diff --git a/src/core/socket.c b/src/core/socket.c
index f741b6f0..82267287 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1581,10 +1581,18 @@ nni_dialer_shutdown(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-void
-nni_dialer_reap(nni_dialer *d)
+static void dialer_reap(void *);
+
+static nni_reap_list dialer_reap_list = {
+ .rl_offset = offsetof(nni_dialer, d_reap),
+ .rl_func = dialer_reap,
+};
+
+static void
+dialer_reap(void *arg)
{
- nni_sock *s = d->d_sock;
+ nni_dialer *d = arg;
+ nni_sock * s = d->d_sock;
nni_aio_stop(&d->d_tmo_aio);
nni_aio_stop(&d->d_con_aio);
@@ -1602,7 +1610,7 @@ nni_dialer_reap(nni_dialer *d)
}
nni_mtx_unlock(&s->s_mx);
// Go back to the end of reap list.
- nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
+ nni_dialer_reap(d);
return;
}
@@ -1617,6 +1625,12 @@ nni_dialer_reap(nni_dialer *d)
}
void
+nni_dialer_reap(nni_dialer *d)
+{
+ nni_reap(&dialer_reap_list, d);
+}
+
+void
nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_sock *s = l->l_sock;
@@ -1703,10 +1717,18 @@ nni_listener_shutdown(nni_listener *l)
nni_mtx_unlock(&s->s_mx);
}
-void
-nni_listener_reap(nni_listener *l)
+static void listener_reap(void *);
+
+static nni_reap_list listener_reap_list = {
+ .rl_offset = offsetof(nni_listener, l_reap),
+ .rl_func = listener_reap,
+};
+
+static void
+listener_reap(void *arg)
{
- nni_sock *s = l->l_sock;
+ nni_listener *l = arg;
+ nni_sock * s = l->l_sock;
nni_aio_stop(&l->l_tmo_aio);
nni_aio_stop(&l->l_acc_aio);
@@ -1724,7 +1746,7 @@ nni_listener_reap(nni_listener *l)
}
nni_mtx_unlock(&s->s_mx);
// Go back to the end of reap list.
- nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
+ nni_reap(&listener_reap_list, l);
return;
}
@@ -1739,6 +1761,12 @@ nni_listener_reap(nni_listener *l)
}
void
+nni_listener_reap(nni_listener *l)
+{
+ nni_reap(&listener_reap_list, l);
+}
+
+void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
nni_sock * s = p->p_sock;
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 804fa00c..638a3bfd 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -37,7 +37,7 @@ struct nni_dialer {
nni_duration d_currtime; // current time for reconnect
nni_duration d_inirtime; // initial time for reconnect
nni_time d_conntime; // time of last good connect
- nni_reap_item d_reap;
+ nni_reap_node d_reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
@@ -73,7 +73,7 @@ struct nni_listener {
nni_list l_pipes;
nni_aio l_acc_aio;
nni_aio l_tmo_aio;
- nni_reap_item l_reap;
+ nni_reap_node l_reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
@@ -111,7 +111,7 @@ struct nni_pipe {
int p_ref;
nni_mtx p_mtx;
nni_cv p_cv;
- nni_reap_item p_reap;
+ nni_reap_node p_reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
@@ -122,7 +122,6 @@ struct nni_pipe {
nni_stat_item st_tx_msgs;
nni_stat_item st_rx_bytes;
nni_stat_item st_tx_bytes;
-
#endif
};