aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/ref/migrate/nanomsg.md2
-rw-r--r--docs/ref/migrate/nng1.md7
-rw-r--r--include/nng/protocol/pubsub0/sub.h12
-rw-r--r--src/core/socket.c18
-rw-r--r--src/core/socket.h4
-rw-r--r--src/sp/protocol/pubsub0/pub_test.c2
-rw-r--r--src/sp/protocol/pubsub0/sub.c127
-rw-r--r--src/sp/protocol/pubsub0/sub_test.c59
-rw-r--r--src/sp/protocol/pubsub0/xsub_test.c13
-rw-r--r--src/tools/nngcat/nngcat.c3
-rw-r--r--src/tools/perf/pubdrop.c2
-rw-r--r--tests/multistress.c2
12 files changed, 162 insertions, 89 deletions
diff --git a/docs/ref/migrate/nanomsg.md b/docs/ref/migrate/nanomsg.md
index 695f771b..67e6505a 100644
--- a/docs/ref/migrate/nanomsg.md
+++ b/docs/ref/migrate/nanomsg.md
@@ -84,6 +84,8 @@ The following options are changed.
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |
+| `NN_SUB_SUBSCRIBE` | `nng_sub0_socket_subscribe` | No longer an option, use a function call. |
+| `NN_SUB_UNSUBSCRIBE` | `nng_sub0_socket_unsubscribe` | No longer an option, use a function call. |
## Error Codes
diff --git a/docs/ref/migrate/nng1.md b/docs/ref/migrate/nng1.md
index 3004b880..65af2382 100644
--- a/docs/ref/migrate/nng1.md
+++ b/docs/ref/migrate/nng1.md
@@ -80,6 +80,13 @@ matching the actual wire protocol values, instead of `int`.
The `NNG_OPT_RAW` option has aso been replaced by a function, `nng_socket_raw`.
+## Subscriptions
+
+The `NNG_OPT_SUB_SUBSCRIBE` and `NNG_OPT_SUB_UNSUBCRIBE` options have been replaced by
+the following functions: `nng_sub0_socket_subscribe`, `nng_sub0_socket_unsubscribe`,
+`nng_sub0_ctx_subscribe` and `nng_sub0_ctx_unsubscribe`. These functions, like the options
+they replace, are only applicable to SUB sockets.
+
## Statistics Use Constified Pointers
A number of the statistics functions take, or return, `const nng_stat *` instead
diff --git a/include/nng/protocol/pubsub0/sub.h b/include/nng/protocol/pubsub0/sub.h
index 81f50a80..a8a37823 100644
--- a/include/nng/protocol/pubsub0/sub.h
+++ b/include/nng/protocol/pubsub0/sub.h
@@ -11,6 +11,8 @@
#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H
#define NNG_PROTOCOL_PUBSUB0_SUB_H
+#include <nng/nng.h>
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -19,6 +21,13 @@ NNG_DECL int nng_sub0_open(nng_socket *);
NNG_DECL int nng_sub0_open_raw(nng_socket *);
+NNG_DECL int nng_sub0_socket_subscribe(
+ nng_socket id, const void *buf, size_t sz);
+NNG_DECL int nng_sub0_socket_unsubscribe(
+ nng_socket id, const void *buf, size_t sz);
+NNG_DECL int nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz);
+NNG_DECL int nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz);
+
#ifndef nng_sub_open
#define nng_sub_open nng_sub0_open
#endif
@@ -27,9 +36,6 @@ NNG_DECL int nng_sub0_open_raw(nng_socket *);
#define nng_sub_open_raw nng_sub0_open_raw
#endif
-#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
-#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
-
#define NNG_OPT_SUB_PREFNEW "sub:prefnew"
#ifdef __cplusplus
diff --git a/src/core/socket.c b/src/core/socket.c
index b01938e5..1e0b0404 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -824,6 +824,18 @@ nni_sock_proto_pipe_ops(nni_sock *sock)
return (&sock->s_pipe_ops);
}
+struct nni_proto_sock_ops *
+nni_sock_proto_ops(nni_sock *sock)
+{
+ return (&sock->s_sock_ops);
+}
+
+struct nni_proto_ctx_ops *
+nni_ctx_proto_ops(nni_ctx *ctx)
+{
+ return (&ctx->c_ops);
+}
+
void *
nni_sock_proto_data(nni_sock *sock)
{
@@ -1142,6 +1154,12 @@ nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
return (rv);
}
+void *
+nni_ctx_proto_data(nni_ctx *ctx)
+{
+ return (ctx->c_data);
+}
+
static void
nni_ctx_destroy(nni_ctx *ctx)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 8fab08d6..3eb943f4 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *);
extern void *nni_sock_proto_data(nni_sock *);
extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);
+extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
+extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *);
extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, nni_opt_type);
@@ -77,6 +79,8 @@ extern int nni_ctx_open(nni_ctx **, nni_sock *);
// NNG_ECLOSED unless the final argument is true.)
extern int nni_ctx_find(nni_ctx **, uint32_t, bool);
+extern void *nni_ctx_proto_data(nni_ctx *);
+
// nni_ctx_rele is called to release a hold on the context. These holds
// are acquired by either nni_ctx_open or nni_ctx_find. If the context
// is being closed (nni_ctx_close was called), and this is the last reference,
diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c
index 733d6fb5..f3cf6e2d 100644
--- a/src/sp/protocol/pubsub0/pub_test.c
+++ b/src/sp/protocol/pubsub0/pub_test.c
@@ -141,7 +141,7 @@ test_pub_send_queued(void)
// test to be really meaningful.
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "", 0));
NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 10));
NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index 5d6a2a05..a4741114 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -13,6 +13,7 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "core/socket.h"
#include "nng/protocol/pubsub0/sub.h"
// Subscriber protocol. The SUB protocol receives messages sent to
@@ -454,13 +455,11 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_subscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
- sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
- NNI_ARG_UNUSED(t);
nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
@@ -492,13 +491,11 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_unsubscribe(sub0_ctx *ctx, const void *buf, size_t sz)
{
- sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
- NNI_ARG_UNUSED(t);
nni_mtx_lock(&sock->lk);
NNI_LIST_FOREACH (&ctx->topics, topic) {
@@ -580,14 +577,6 @@ static nni_option sub0_ctx_options[] = {
.o_set = sub0_ctx_set_recv_buf_len,
},
{
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_ctx_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_ctx_unsubscribe,
- },
- {
.o_name = NNG_OPT_SUB_PREFNEW,
.o_get = sub0_ctx_get_prefer_new,
.o_set = sub0_ctx_set_prefer_new,
@@ -637,20 +626,6 @@ sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(&sock->master, buf, sz, t));
-}
-
-static int
-sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t));
-}
-
-static int
sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
@@ -686,14 +661,6 @@ static nni_proto_ctx_ops sub0_ctx_ops = {
static nni_option sub0_sock_options[] = {
{
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_sock_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_sock_unsubscribe,
- },
- {
.o_name = NNG_OPT_RECVBUF,
.o_get = sub0_sock_get_recv_buf_len,
.o_set = sub0_sock_set_recv_buf_len,
@@ -736,3 +703,91 @@ nng_sub0_open(nng_socket *sock)
{
return (nni_proto_open(sock, &sub0_proto));
}
+
+int
+nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_subscribe(&sock->master, buf, sz);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_unsubscribe(&sock->master, buf, sz);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_subscribe(ctx, buf, sz);
+ nni_ctx_rele(c);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_unsubscribe(ctx, buf, sz);
+ nni_ctx_rele(c);
+ return (rv);
+}
diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c
index 9802a35e..9929de64 100644
--- a/src/sp/protocol/pubsub0/sub_test.c
+++ b/src/sp/protocol/pubsub0/sub_test.c
@@ -82,7 +82,7 @@ test_sub_poll_readable(void)
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "a", 1));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
@@ -126,7 +126,7 @@ test_sub_recv_late(void)
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "", 0));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
@@ -325,19 +325,11 @@ test_sub_recv_buf_option(void)
static void
test_sub_subscribe_option(void)
{
- nng_socket sub;
- size_t sz;
- int v;
- const char *opt = NNG_OPT_SUB_SUBSCRIBE;
-
+ nng_socket sub;
NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_socket_set(sub, opt, "abc", 3));
- NUTS_PASS(nng_socket_set(sub, opt, "abc", 3)); // duplicate
- NUTS_PASS(nng_socket_set_bool(sub, opt, false));
- NUTS_PASS(nng_socket_set_int(sub, opt, 32));
- sz = sizeof(v);
- NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY);
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); // duplicate
NUTS_CLOSE(sub);
}
@@ -345,23 +337,14 @@ test_sub_subscribe_option(void)
static void
test_sub_unsubscribe_option(void)
{
- nng_socket sub;
- size_t sz;
- int v;
- const char *opt1 = NNG_OPT_SUB_SUBSCRIBE;
- const char *opt2 = NNG_OPT_SUB_UNSUBSCRIBE;
+ nng_socket sub;
NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_socket_set(sub, opt1, "abc", 3));
- NUTS_FAIL(nng_socket_set(sub, opt2, "abc123", 6), NNG_ENOENT);
- NUTS_PASS(nng_socket_set(sub, opt2, "abc", 3));
- NUTS_FAIL(nng_socket_set(sub, opt2, "abc", 3), NNG_ENOENT);
- NUTS_PASS(nng_socket_set_int(sub, opt1, 32));
- NUTS_FAIL(nng_socket_set_int(sub, opt2, 23), NNG_ENOENT);
- NUTS_PASS(nng_socket_set_int(sub, opt2, 32));
- sz = sizeof(v);
- NUTS_FAIL(nng_socket_get(sub, opt2, &v, &sz), NNG_EWRITEONLY);
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3));
+ NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc123", 6), NNG_ENOENT);
+ NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3));
+ NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc", 3), NNG_ENOENT);
NUTS_CLOSE(sub);
}
@@ -403,7 +386,7 @@ test_sub_drop_new(void)
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2));
NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, false));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, NULL, 0));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
NUTS_MARRY(pub, sub);
@@ -429,7 +412,7 @@ test_sub_drop_old(void)
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2));
NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, true));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, NULL, 0));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
NUTS_MARRY(pub, sub);
@@ -459,10 +442,10 @@ test_sub_filter(void)
NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10));
// Set up some default filters
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "abc", 3));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "def", 3));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "ghi", 3));
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "jkl", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "def", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "ghi", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "jkl", 3));
NUTS_MARRY(pub, sub);
@@ -474,7 +457,7 @@ test_sub_filter(void)
NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0));
NUTS_SLEEP(100);
- NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_UNSUBSCRIBE, "ghi", 3));
+ NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "ghi", 3));
sz = sizeof(buf);
NUTS_PASS(nng_recv(sub, buf, &sz, 0));
NUTS_TRUE(sz == 3);
@@ -512,11 +495,11 @@ test_sub_multi_context(void)
NUTS_PASS(nng_ctx_open(&c1, sub));
NUTS_PASS(nng_ctx_open(&c2, sub));
- NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "one", 3));
- NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c1, "one", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c1, "all", 3));
- NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3));
- NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c2, "two", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3));
nng_aio_set_timeout(aio1, 100);
nng_aio_set_timeout(aio2, 100);
diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c
index eec73505..fdbc5632 100644
--- a/src/sp/protocol/pubsub0/xsub_test.c
+++ b/src/sp/protocol/pubsub0/xsub_test.c
@@ -7,7 +7,8 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include "nng/nng.h"
+#include <nng/nng.h>
+#include <nng/protocol/pubsub0/sub.h>
#include <nuts.h>
static void
@@ -257,22 +258,20 @@ test_xsub_recv_buf_option(void)
static void
test_xsub_subscribe_option(void)
{
- nng_socket sub;
- const char *opt = NNG_OPT_SUB_SUBSCRIBE;
+ nng_socket sub;
NUTS_PASS(nng_sub0_open_raw(&sub));
- NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP);
+ NUTS_FAIL(nng_sub0_socket_subscribe(sub, "abc", 3), NNG_ENOTSUP);
NUTS_CLOSE(sub);
}
static void
test_xsub_unsubscribe_option(void)
{
- nng_socket sub;
- const char *opt = NNG_OPT_SUB_UNSUBSCRIBE;
+ nng_socket sub;
NUTS_PASS(nng_sub0_open_raw(&sub));
- NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP);
+ NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc", 3), NNG_ENOTSUP);
NUTS_CLOSE(sub);
}
diff --git a/src/tools/nngcat/nngcat.c b/src/tools/nngcat/nngcat.c
index bdf6823c..da3bd6d6 100644
--- a/src/tools/nngcat/nngcat.c
+++ b/src/tools/nngcat/nngcat.c
@@ -1065,8 +1065,7 @@ main(int ac, char **av)
}
for (struct topic *t = topics; t != NULL; t = t->next) {
- rv = nng_socket_set(
- sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val));
+ rv = nng_sub0_socket_subscribe(sock, t->val, strlen(t->val));
if (rv != 0) {
fatal("Unable to subscribe to topic %s: %s", t->val,
nng_strerror(rv));
diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c
index 0e493116..27d7802a 100644
--- a/src/tools/perf/pubdrop.c
+++ b/src/tools/perf/pubdrop.c
@@ -206,7 +206,7 @@ sub_client(void *arg)
if ((rv = nng_socket_set_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
die("setopt: %s", nng_strerror(rv));
}
- if ((rv = nng_socket_set(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
+ if ((rv = nng_sub0_socket_subscribe(sock, "", 0)) != 0) {
die("setopt: %s", nng_strerror(rv));
}
if ((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
diff --git a/tests/multistress.c b/tests/multistress.c
index e58b94d7..b9e48014 100644
--- a/tests/multistress.c
+++ b/tests/multistress.c
@@ -581,7 +581,7 @@ pubsub0_test(int ntests)
if ((rv = nng_aio_alloc(&cli->recd, sub0_recd, cli)) != 0) {
fatal("nng_aio_alloc", rv);
}
- rv = nng_socket_set(cli->sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);
+ rv = nng_sub0_socket_subscribe(cli->sock, "", 0);
if (rv != 0) {
fatal("subscribe", rv);
}