aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub/sub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub/sub.c')
-rw-r--r--src/protocol/pubsub/sub.c58
1 files changed, 34 insertions, 24 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 5f4b497d..7b6f4904 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -13,6 +13,9 @@
#include "core/nng_impl.h"
+const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE;
+const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE;
+
// Subscriber protocol. The SUB protocol receives messages sent to
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
@@ -178,8 +181,9 @@ sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub_subscribe(sub_sock *s, const void *buf, size_t sz)
+sub_subscribe(void *arg, const void *buf, size_t sz)
{
+ sub_sock * s = arg;
sub_topic *topic;
sub_topic *newtopic;
@@ -222,8 +226,9 @@ sub_subscribe(sub_sock *s, const void *buf, size_t sz)
}
static int
-sub_unsubscribe(sub_sock *s, const void *buf, size_t sz)
+sub_unsubscribe(void *arg, const void *buf, size_t sz)
{
+ sub_sock * s = arg;
sub_topic *topic;
int rv;
@@ -252,31 +257,17 @@ sub_unsubscribe(sub_sock *s, const void *buf, size_t sz)
}
static int
-sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+sub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- sub_sock *s = arg;
- int rv = NNG_ENOTSUP;
-
- if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
- } else if (opt == nng_optid_sub_subscribe) {
- rv = sub_subscribe(s, buf, sz);
- } else if (opt == nng_optid_sub_unsubscribe) {
- rv = sub_unsubscribe(s, buf, sz);
- }
- return (rv);
+ sub_sock *s = arg;
+ return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- sub_sock *s = arg;
- int rv = NNG_ENOTSUP;
-
- if (opt == nng_optid_raw) {
- rv = nni_getopt_int(s->raw, buf, szp);
- }
- return (rv);
+ sub_sock *s = arg;
+ return (nni_getopt_int(s->raw, buf, szp));
}
static nni_msg *
@@ -330,13 +321,32 @@ static nni_proto_pipe_ops sub_pipe_ops = {
.pipe_stop = sub_pipe_stop,
};
+static nni_proto_sock_option sub_sock_options[] = {
+ {
+ .pso_name = NNG_OPT_RAW,
+ .pso_getopt = sub_sock_getopt_raw,
+ .pso_setopt = sub_sock_setopt_raw,
+ },
+ {
+ .pso_name = NNG_OPT_SUB_SUBSCRIBE,
+ .pso_getopt = NULL,
+ .pso_setopt = sub_subscribe,
+ },
+ {
+ .pso_name = NNG_OPT_SUB_UNSUBSCRIBE,
+ .pso_getopt = NULL,
+ .pso_setopt = sub_unsubscribe,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
static nni_proto_sock_ops sub_sock_ops = {
.sock_init = sub_sock_init,
.sock_fini = sub_sock_fini,
.sock_open = sub_sock_open,
.sock_close = sub_sock_close,
- .sock_setopt = sub_sock_setopt,
- .sock_getopt = sub_sock_getopt,
+ .sock_options = sub_sock_options,
.sock_rfilter = sub_sock_rfilter,
};