diff options
Diffstat (limited to 'src/protocol/pubsub/sub.c')
| -rw-r--r-- | src/protocol/pubsub/sub.c | 58 |
1 files changed, 34 insertions, 24 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 5f4b497d..7b6f4904 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -13,6 +13,9 @@ #include "core/nng_impl.h" +const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; +const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; + // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. @@ -178,8 +181,9 @@ sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -sub_subscribe(sub_sock *s, const void *buf, size_t sz) +sub_subscribe(void *arg, const void *buf, size_t sz) { + sub_sock * s = arg; sub_topic *topic; sub_topic *newtopic; @@ -222,8 +226,9 @@ sub_subscribe(sub_sock *s, const void *buf, size_t sz) } static int -sub_unsubscribe(sub_sock *s, const void *buf, size_t sz) +sub_unsubscribe(void *arg, const void *buf, size_t sz) { + sub_sock * s = arg; sub_topic *topic; int rv; @@ -252,31 +257,17 @@ sub_unsubscribe(sub_sock *s, const void *buf, size_t sz) } static int -sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - sub_sock *s = arg; - int rv = NNG_ENOTSUP; - - if (opt == nng_optid_raw) { - rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - } else if (opt == nng_optid_sub_subscribe) { - rv = sub_subscribe(s, buf, sz); - } else if (opt == nng_optid_sub_unsubscribe) { - rv = sub_unsubscribe(s, buf, sz); - } - return (rv); + sub_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - sub_sock *s = arg; - int rv = NNG_ENOTSUP; - - if (opt == nng_optid_raw) { - rv = nni_getopt_int(s->raw, buf, szp); - } - return (rv); + sub_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); } static nni_msg * @@ -330,13 +321,32 @@ static nni_proto_pipe_ops sub_pipe_ops = { .pipe_stop = sub_pipe_stop, }; +static nni_proto_sock_option sub_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = sub_sock_getopt_raw, + .pso_setopt = sub_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_SUB_SUBSCRIBE, + .pso_getopt = NULL, + .pso_setopt = sub_subscribe, + }, + { + .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, + .pso_getopt = NULL, + .pso_setopt = sub_unsubscribe, + }, + // terminate list + { NULL, NULL, NULL }, +}; + static nni_proto_sock_ops sub_sock_ops = { .sock_init = sub_sock_init, .sock_fini = sub_sock_fini, .sock_open = sub_sock_open, .sock_close = sub_sock_close, - .sock_setopt = sub_sock_setopt, - .sock_getopt = sub_sock_getopt, + .sock_options = sub_sock_options, .sock_rfilter = sub_sock_rfilter, }; |
