aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2023-11-25 17:35:35 -0800
committerGarrett D'Amore <garrett@damore.org>2023-11-25 17:47:02 -0800
commite623dedab28a1fec6270c05f9643e68bfb98b7c3 (patch)
tree2860475e583380771ac900c0d6417627efdcbd03
parente5fcc317c5f75d8fc6ea053c9f960e35e09ac38f (diff)
downloadnng-e623dedab28a1fec6270c05f9643e68bfb98b7c3.tar.gz
nng-e623dedab28a1fec6270c05f9643e68bfb98b7c3.tar.bz2
nng-e623dedab28a1fec6270c05f9643e68bfb98b7c3.zip
fixes #1523 rare SEGV in sub nni_list_remove
Credit goes to Wu Xuan (@willwu1217) for diagnosing and proposing a fix as part of #1695. This approach takes a revised approach to avoid adding extra memory, and it also is slightly faster as we do not need to update both pointers in the linked list, by reusing the reap node. As part of this a new internal API, nni_aio_completions, is introduced. In all likelihood we will be able to use this to solve some similar crashes in other areas of the code.
-rw-r--r--src/core/aio.c33
-rw-r--r--src/core/aio.h26
-rw-r--r--src/sp/protocol/pubsub0/sub.c67
3 files changed, 90 insertions, 36 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 564e91a3..e849b33d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio)
return (nni_list_node_active(&aio->a_prov_node));
}
+// completions list.
+// Implementation note: in order to avoid wasting space, we
+// reuse the reap node -- which will be inactive here.
+void
+nni_aio_completions_init(nni_aio_completions *clp)
+{
+ *clp = NULL;
+}
+
+void
+nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
+{
+ NNI_ASSERT(!nni_aio_list_active(aio));
+ aio->a_reap_node.rn_next = *clp;
+ aio->a_result = result;
+ aio->a_count = count;
+ *clp = aio;
+}
+
+void
+nni_aio_completions_run(nni_aio_completions *clp)
+{
+ nni_aio *aio;
+ nni_aio *cl = *clp;
+ *clp = NULL;
+
+ while ((aio = cl) != NULL) {
+ cl = (void *)aio->a_reap_node.rn_next;
+ aio->a_reap_node.rn_next = NULL;
+ nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
+ }
+}
+
static void
nni_aio_expire_add(nni_aio *aio)
{
diff --git a/src/core/aio.h b/src/core/aio.h
index 6315e90c..a2ebf70a 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,5 +1,5 @@
//
-// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
extern void nni_sleep_aio(nni_duration, nni_aio *);
+// nni_aio_completion_list is used after removing the aio from an
+// active work queue, and keeping them so that the completions can
+// be run in a deferred manner. These lists are simple, and intended
+// to be used as local variables. It's important to initialize the
+// list before using it. Also, any AIO added to a completion list must
+// not be in active use anywhere.
+typedef void *nni_aio_completions;
+
+// nni_aio_completions_init just initializes a completions list.
+// This just sets the pointed value to NULL.
+extern void nni_aio_completions_init(nni_aio_completions *);
+
+// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects
+// that have been added to the completions. The result code and count used
+// are those supplied in nni_aio_completions_add. Callers should not hold
+// locks when calling this.
+extern void nni_aio_completions_run(nni_aio_completions *);
+
+// nni_aio_completions_add adds an aio (with the result code and length as
+// appropriate) to the completion list. This should be done while the
+// appropriate lock is held. The aio must not be scheduled.
+extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
+ int, size_t);
+
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index 10f42724..e7540dee 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *);
struct sub0_topic {
nni_list_node node;
size_t len;
- void * buf;
+ void *buf;
};
// sub0_ctx is a context for a SUB socket. The advantage of contexts is
// that different contexts can maintain different subscriptions.
struct sub0_ctx {
nni_list_node node;
- sub0_sock * sock;
+ sub0_sock *sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
nni_lmq lmq;
@@ -71,7 +71,7 @@ struct sub0_sock {
// sub0_pipe is our per-pipe protocol private structure.
struct sub0_pipe {
- nni_pipe * pipe;
+ nni_pipe *pipe;
sub0_sock *sub;
nni_aio aio_recv;
};
@@ -79,7 +79,7 @@ struct sub0_pipe {
static void
sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
if (nni_list_active(&ctx->recv_queue, aio)) {
@@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
static void
sub0_ctx_recv(void *arg, nni_aio *aio)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_msg * msg;
+ nni_msg *msg;
if (nni_aio_begin(aio) != 0) {
return;
@@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio)
static void
sub0_ctx_close(void *arg)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_aio * aio;
+ nni_aio *aio;
nni_mtx_lock(&sock->lk);
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
@@ -155,8 +155,8 @@ sub0_ctx_close(void *arg)
static void
sub0_ctx_fini(void *arg)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_ctx_close(ctx);
@@ -179,7 +179,7 @@ static void
sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
sub0_sock *sock = sock_arg;
- sub0_ctx * ctx = ctx_arg;
+ sub0_ctx *ctx = ctx_arg;
size_t len;
bool prefer_new;
@@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
static void
sub0_recv_cb(void *arg)
{
- sub0_pipe *p = arg;
- sub0_sock *sock = p->sub;
- sub0_ctx * ctx;
- nni_msg * msg;
- size_t len;
- uint8_t * body;
- nni_list finish;
- nng_aio * aio;
- nni_msg * dup_msg;
+ sub0_pipe *p = arg;
+ sub0_sock *sock = p->sub;
+ sub0_ctx *ctx;
+ nni_msg *msg;
+ size_t len;
+ uint8_t *body;
+ nng_aio *aio;
+ nni_msg *dup_msg;
+ nni_aio_completions finish;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- nni_aio_list_init(&finish);
+ nni_aio_completions_init(&finish);
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
@@ -370,7 +370,7 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(aio, dup_msg);
// Save for synchronous completion
- nni_list_append(&finish, aio);
+ nni_aio_completions_add(&finish, aio, 0, len);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
@@ -401,10 +401,7 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}
- while ((aio = nni_list_first(&finish)) != NULL) {
- nni_list_remove(&finish, aio);
- nni_aio_finish_sync(aio, 0, len);
- }
+ nni_aio_completions_run(&finish);
nni_pipe_recv(p->pipe, &p->aio_recv);
}
@@ -412,7 +409,7 @@ sub0_recv_cb(void *arg)
static int
sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
nni_mtx_lock(&sock->lk);
@@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
int rv;
@@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
@@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);
@@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
@@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
int rv;