aboutsummaryrefslogtreecommitdiff
path: root/src/sp/protocol/pubsub0/sub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-11-03 00:51:15 -0700
committerGarrett D'Amore <garrett@damore.org>2024-11-03 00:58:37 -0700
commitb3fc8a44119d7ab90366a1b92a5e1327ebcb8145 (patch)
tree994c0199184fdea3d56eeb61b252e10733588947 /src/sp/protocol/pubsub0/sub.c
parent02ec0b55cbee5de4d0fd688ce0ebddf08178dc98 (diff)
downloadnng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.tar.gz
nng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.tar.bz2
nng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.zip
Replace NNG_OPT_SUB_SUBSCRIBE/UNSUBSCRIBE with functions.
The main purpose is to eliminate the NNI_TYPE_OPAQUE options, by putting these into their own first class, protocol-specific, functions.
Diffstat (limited to 'src/sp/protocol/pubsub0/sub.c')
-rw-r--r--src/sp/protocol/pubsub0/sub.c127
1 files changed, 91 insertions, 36 deletions
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index 5d6a2a05..a4741114 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -13,6 +13,7 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "core/socket.h"
#include "nng/protocol/pubsub0/sub.h"
// Subscriber protocol. The SUB protocol receives messages sent to
@@ -454,13 +455,11 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_subscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
- sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
- NNI_ARG_UNUSED(t);
nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
@@ -492,13 +491,11 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_unsubscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
- sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
- NNI_ARG_UNUSED(t);
nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
@@ -580,14 +577,6 @@ static nni_option sub0_ctx_options[] = {
.o_set = sub0_ctx_set_recv_buf_len,
},
{
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_ctx_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_ctx_unsubscribe,
- },
- {
.o_name = NNG_OPT_SUB_PREFNEW,
.o_get = sub0_ctx_get_prefer_new,
.o_set = sub0_ctx_set_prefer_new,
@@ -637,20 +626,6 @@ sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(&sock->master, buf, sz, t));
-}
-
-static int
-sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t));
-}
-
-static int
sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
@@ -686,14 +661,6 @@ static nni_proto_ctx_ops sub0_ctx_ops = {
static nni_option sub0_sock_options[] = {
{
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_sock_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_sock_unsubscribe,
- },
- {
.o_name = NNG_OPT_RECVBUF,
.o_get = sub0_sock_get_recv_buf_len,
.o_set = sub0_sock_set_recv_buf_len,
@@ -736,3 +703,91 @@ nng_sub0_open(nng_socket *sock)
{
return (nni_proto_open(sock, &sub0_proto));
}
+
+int
+nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_subscribe(&sock->master, buf, sz);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_unsubscribe(&sock->master, buf, sz);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_subscribe(ctx, buf, sz);
+ nni_ctx_rele(c);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_unsubscribe(ctx, buf, sz);
+ nni_ctx_rele(c);
+ return (rv);
+}