diff options
Diffstat (limited to 'src/sp/protocol/pubsub0')
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index 10f42724..e7540dee 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Nathan Kent <nate@nkent.net> // @@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *); struct sub0_topic { nni_list_node node; size_t len; - void * buf; + void *buf; }; // sub0_ctx is a context for a SUB socket. The advantage of contexts is // that different contexts can maintain different subscriptions. struct sub0_ctx { nni_list_node node; - sub0_sock * sock; + sub0_sock *sock; nni_list topics; // TODO: Consider patricia trie nni_list recv_queue; // can have multiple pending receives nni_lmq lmq; @@ -71,7 +71,7 @@ struct sub0_sock { // sub0_pipe is our per-pipe protocol private structure. struct sub0_pipe { - nni_pipe * pipe; + nni_pipe *pipe; sub0_sock *sub; nni_aio aio_recv; }; @@ -79,7 +79,7 @@ struct sub0_pipe { static void sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); if (nni_list_active(&ctx->recv_queue, aio)) { @@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) static void sub0_ctx_recv(void *arg, nni_aio *aio) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_msg * msg; + nni_msg *msg; if (nni_aio_begin(aio) != 0) { return; @@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio) static void sub0_ctx_close(void *arg) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_aio * aio; + nni_aio *aio; nni_mtx_lock(&sock->lk); while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { @@ -155,8 +155,8 @@ sub0_ctx_close(void *arg) static void sub0_ctx_fini(void *arg) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_ctx_close(ctx); @@ -179,7 +179,7 @@ static void sub0_ctx_init(void *ctx_arg, void *sock_arg) { sub0_sock *sock = sock_arg; - sub0_ctx * ctx = ctx_arg; + sub0_ctx *ctx = ctx_arg; size_t len; bool prefer_new; @@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) static void sub0_recv_cb(void *arg) { - sub0_pipe *p = arg; - sub0_sock *sock = p->sub; - sub0_ctx * ctx; - nni_msg * msg; - size_t len; - uint8_t * body; - nni_list finish; - nng_aio * aio; - nni_msg * dup_msg; + sub0_pipe *p = arg; + sub0_sock *sock = p->sub; + sub0_ctx *ctx; + nni_msg *msg; + size_t len; + uint8_t *body; + nng_aio *aio; + nni_msg *dup_msg; + nni_aio_completions finish; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - nni_aio_list_init(&finish); + nni_aio_completions_init(&finish); msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); @@ -370,7 +370,7 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(aio, dup_msg); // Save for synchronous completion - nni_list_append(&finish, aio); + nni_aio_completions_add(&finish, aio, 0, len); } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; @@ -401,10 +401,7 @@ sub0_recv_cb(void *arg) nni_msg_free(msg); } - while ((aio = nni_list_first(&finish)) != NULL) { - nni_list_remove(&finish, aio); - nni_aio_finish_sync(aio, 0, len); - } + nni_aio_completions_run(&finish); nni_pipe_recv(p->pipe, &p->aio_recv); } @@ -412,7 +409,7 @@ sub0_recv_cb(void *arg) static int sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; nni_mtx_lock(&sock->lk); @@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; int rv; @@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; NNI_ARG_UNUSED(t); @@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; size_t len; NNI_ARG_UNUSED(t); @@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; bool val; @@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; bool val; int rv; |
