aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/protocol/pubsub0/sub.c67
1 files changed, 32 insertions, 35 deletions
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index 10f42724..e7540dee 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *);
struct sub0_topic {
nni_list_node node;
size_t len;
- void * buf;
+ void *buf;
};
// sub0_ctx is a context for a SUB socket. The advantage of contexts is
// that different contexts can maintain different subscriptions.
struct sub0_ctx {
nni_list_node node;
- sub0_sock * sock;
+ sub0_sock *sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
nni_lmq lmq;
@@ -71,7 +71,7 @@ struct sub0_sock {
// sub0_pipe is our per-pipe protocol private structure.
struct sub0_pipe {
- nni_pipe * pipe;
+ nni_pipe *pipe;
sub0_sock *sub;
nni_aio aio_recv;
};
@@ -79,7 +79,7 @@ struct sub0_pipe {
static void
sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
if (nni_list_active(&ctx->recv_queue, aio)) {
@@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
static void
sub0_ctx_recv(void *arg, nni_aio *aio)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_msg * msg;
+ nni_msg *msg;
if (nni_aio_begin(aio) != 0) {
return;
@@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio)
static void
sub0_ctx_close(void *arg)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
- nni_aio * aio;
+ nni_aio *aio;
nni_mtx_lock(&sock->lk);
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
@@ -155,8 +155,8 @@ sub0_ctx_close(void *arg)
static void
sub0_ctx_fini(void *arg)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_ctx_close(ctx);
@@ -179,7 +179,7 @@ static void
sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
sub0_sock *sock = sock_arg;
- sub0_ctx * ctx = ctx_arg;
+ sub0_ctx *ctx = ctx_arg;
size_t len;
bool prefer_new;
@@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
static void
sub0_recv_cb(void *arg)
{
- sub0_pipe *p = arg;
- sub0_sock *sock = p->sub;
- sub0_ctx * ctx;
- nni_msg * msg;
- size_t len;
- uint8_t * body;
- nni_list finish;
- nng_aio * aio;
- nni_msg * dup_msg;
+ sub0_pipe *p = arg;
+ sub0_sock *sock = p->sub;
+ sub0_ctx *ctx;
+ nni_msg *msg;
+ size_t len;
+ uint8_t *body;
+ nng_aio *aio;
+ nni_msg *dup_msg;
+ nni_aio_completions finish;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- nni_aio_list_init(&finish);
+ nni_aio_completions_init(&finish);
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
@@ -370,7 +370,7 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(aio, dup_msg);
// Save for synchronous completion
- nni_list_append(&finish, aio);
+ nni_aio_completions_add(&finish, aio, 0, len);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
@@ -401,10 +401,7 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}
- while ((aio = nni_list_first(&finish)) != NULL) {
- nni_list_remove(&finish, aio);
- nni_aio_finish_sync(aio, 0, len);
- }
+ nni_aio_completions_run(&finish);
nni_pipe_recv(p->pipe, &p->aio_recv);
}
@@ -412,7 +409,7 @@ sub0_recv_cb(void *arg)
static int
sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
nni_mtx_lock(&sock->lk);
@@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
int rv;
@@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
@@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
+ sub0_ctx *ctx = arg;
+ sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);
@@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
@@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
+ sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
int rv;