aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c33
-rw-r--r--src/core/aio.h26
-rw-r--r--src/sp/protocol/pubsub0/sub.c67
3 files changed, 90 insertions, 36 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 564e91a3..e849b33d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio)
return (nni_list_node_active(&aio->a_prov_node));
}
+// completions list.
+// Implementation note: in order to avoid wasting space, we
+// reuse the reap node -- which will be inactive here.
+void
+nni_aio_completions_init(nni_aio_completions *clp)
+{
+ *clp = NULL;
+}
+
+void
+nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
+{
+ NNI_ASSERT(!nni_aio_list_active(aio));
+ aio->a_reap_node.rn_next = *clp;
+ aio->a_result = result;
+ aio->a_count = count;
+ *clp = aio;
+}
+
+void
+nni_aio_completions_run(nni_aio_completions *clp)
+{
+ nni_aio *aio;
+ nni_aio *cl = *clp;
+ *clp = NULL;
+
+ while ((aio = cl) != NULL) {
+ cl = (void *)aio->a_reap_node.rn_next;
+ aio->a_reap_node.rn_next = NULL;
+ nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
+ }
+}
+
static void
nni_aio_expire_add(nni_aio *aio)
{
diff --git a/src/core/aio.h b/src/core/aio.h
index 6315e90c..a2ebf70a 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,5 +1,5 @@
//
-// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
extern void nni_sleep_aio(nni_duration, nni_aio *);
+// nni_aio_completion_list is used after removing the aio from an
+// active work queue, and keeping them so that the completions can
+// be run in a deferred manner. These lists are simple, and intended
+// to be used as local variables. It's important to initialize the
+// list before using it. Also, any AIO added to a completion list must
+// not be in active use anywhere.
+typedef void *nni_aio_completions;
+
+// nni_aio_completions_init just initializes a completions list.
+// This just sets the pointed value to NULL.
+extern void nni_aio_completions_init(nni_aio_completions *);
+
+// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects
+// that have been added to the completions. The result code and count used
+// are those supplied in nni_aio_completions_add. Callers should not hold
+// locks when calling this.
+extern void nni_aio_completions_run(nni_aio_completions *);
+
+// nni_aio_completions_add adds an aio (with the result code and length as
+// appropriate) to the completion list. This should be done while the
+// appropriate lock is held. The aio must not be scheduled.
+extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
+ int, size_t);
+
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index 10f42724..e7540dee 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *);
struct sub0_topic {
nni_list_node node;
size_t len;
- void * buf;
+ void *buf;
};
// sub0_ctx is a context for a SUB socket. The advantage of contexts is
// that different contexts can maintain different subscriptions.
struct sub0_ctx {
nni_list_node node;
- sub0_sock * sock;
+ sub0_sock *sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
nni_lmq lmq;
@@ -71,7 +71,7 @@ struct sub0_sock {
// sub0_pipe is our per-pipe protocol private structure.
struct sub0_pipe {
- nni_pipe * pipe;
+ nni_pipe *pipe;
sub0_sock *sub;
nni_aio aio_recv;
};
@@ -79,7 +79,7 @@ struct sub0_pipe {
static void
sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
if (nni_list_active(&ctx->recv_queue, aio)) {
@@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
static void
sub0_ctx_recv(void *arg, nni_aio *aio)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_msg * msg;
+ nni_msg *msg;
if (nni_aio_begin(aio) != 0) {
return;
@@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio)
static void
sub0_ctx_close(void *arg)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_aio * aio;
+ nni_aio *aio;
nni_mtx_lock(&sock->lk);
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
@@ -155,8 +155,8 @@ sub0_ctx_close(void *arg)
static void
sub0_ctx_fini(void *arg)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_ctx_close(ctx);
@@ -179,7 +179,7 @@ static void
sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
sub0_sock *sock = sock_arg;
- sub0_ctx * ctx = ctx_arg;
+ sub0_ctx *ctx = ctx_arg;
size_t len;
bool prefer_new;
@@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
static void
sub0_recv_cb(void *arg)
{
- sub0_pipe *p = arg;
- sub0_sock *sock = p->sub;
- sub0_ctx * ctx;
- nni_msg * msg;
- size_t len;
- uint8_t * body;
- nni_list finish;
- nng_aio * aio;
- nni_msg * dup_msg;
+ sub0_pipe *p = arg;
+ sub0_sock *sock = p->sub;
+ sub0_ctx *ctx;
+ nni_msg *msg;
+ size_t len;
+ uint8_t *body;
+ nng_aio *aio;
+ nni_msg *dup_msg;
+ nni_aio_completions finish;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- nni_aio_list_init(&finish);
+ nni_aio_completions_init(&finish);
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
@@ -370,7 +370,7 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(aio, dup_msg);
// Save for synchronous completion
- nni_list_append(&finish, aio);
+ nni_aio_completions_add(&finish, aio, 0, len);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
@@ -401,10 +401,7 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}
- while ((aio = nni_list_first(&finish)) != NULL) {
- nni_list_remove(&finish, aio);
- nni_aio_finish_sync(aio, 0, len);
- }
+ nni_aio_completions_run(&finish);
nni_pipe_recv(p->pipe, &p->aio_recv);
}
@@ -412,7 +409,7 @@ sub0_recv_cb(void *arg)
static int
sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
nni_mtx_lock(&sock->lk);
@@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
int rv;
@@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
@@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);
@@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
@@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
int rv;