From b3fc8a44119d7ab90366a1b92a5e1327ebcb8145 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 3 Nov 2024 00:51:15 -0700 Subject: Replace NNG_OPT_SUB_SUBSCRIBE/UNSUBSCRIBE with functions. The main purpose is to eliminate the NNI_TYPE_OPAQUE options, by putting these into their own first class, protocol-specific, functions. --- src/sp/protocol/pubsub0/sub.c | 127 ++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 36 deletions(-) (limited to 'src/sp/protocol/pubsub0/sub.c') diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index 5d6a2a05..a4741114 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -13,6 +13,7 @@ #include #include "core/nng_impl.h" +#include "core/socket.h" #include "nng/protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to @@ -454,13 +455,11 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) // to replace this with a patricia trie, like old nanomsg had. static int -sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_subscribe(sub0_ctx *ctx, const void *buf, size_t sz) { - sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; - NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { @@ -492,13 +491,11 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) } static int -sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_unsubscribe(sub0_ctx *ctx, const void *buf, size_t sz) { - sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; sub0_topic *topic; size_t len; - NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { @@ -579,14 +576,6 @@ static nni_option sub0_ctx_options[] = { .o_get = sub0_ctx_get_recv_buf_len, .o_set = sub0_ctx_set_recv_buf_len, }, - { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_ctx_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_ctx_unsubscribe, - }, { .o_name = NNG_OPT_SUB_PREFNEW, .o_get = sub0_ctx_get_prefer_new, @@ -636,20 +625,6 @@ sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); } -static int -sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); -} - -static int -sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); -} - static int sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { @@ -685,14 +660,6 @@ static nni_proto_ctx_ops sub0_ctx_ops = { }; static nni_option sub0_sock_options[] = { - { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_sock_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_sock_unsubscribe, - }, { .o_name = NNG_OPT_RECVBUF, .o_get = sub0_sock_get_recv_buf_len, @@ -736,3 +703,91 @@ nng_sub0_open(nng_socket *sock) { return (nni_proto_open(sock, &sub0_proto)); } + +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz); + nni_ctx_rele(c); + return (rv); +} -- cgit v1.2.3-70-g09d2