diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-11-03 00:51:15 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-11-03 00:58:37 -0700 |
| commit | b3fc8a44119d7ab90366a1b92a5e1327ebcb8145 (patch) | |
| tree | 994c0199184fdea3d56eeb61b252e10733588947 | |
| parent | 02ec0b55cbee5de4d0fd688ce0ebddf08178dc98 (diff) | |
| download | nng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.tar.gz nng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.tar.bz2 nng-b3fc8a44119d7ab90366a1b92a5e1327ebcb8145.zip | |
Replace NNG_OPT_SUB_SUBSCRIBE/UNSUBSCRIBE with functions.
The main purpose is to eliminate the NNI_TYPE_OPAQUE options,
by putting these into their own first class, protocol-specific, functions.
| -rw-r--r-- | docs/ref/migrate/nanomsg.md | 2 | ||||
| -rw-r--r-- | docs/ref/migrate/nng1.md | 7 | ||||
| -rw-r--r-- | include/nng/protocol/pubsub0/sub.h | 12 | ||||
| -rw-r--r-- | src/core/socket.c | 18 | ||||
| -rw-r--r-- | src/core/socket.h | 4 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/pub_test.c | 2 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 127 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub_test.c | 59 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/xsub_test.c | 13 | ||||
| -rw-r--r-- | src/tools/nngcat/nngcat.c | 3 | ||||
| -rw-r--r-- | src/tools/perf/pubdrop.c | 2 | ||||
| -rw-r--r-- | tests/multistress.c | 2 |
12 files changed, 162 insertions, 89 deletions
diff --git a/docs/ref/migrate/nanomsg.md b/docs/ref/migrate/nanomsg.md index 695f771b..67e6505a 100644 --- a/docs/ref/migrate/nanomsg.md +++ b/docs/ref/migrate/nanomsg.md @@ -84,6 +84,8 @@ The following options are changed. | `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. | | `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` | | `NN_MAXTTL` | `NNG_OPT_MAXTTL` | +| `NN_SUB_SUBSCRIBE` | `nng_sub0_socket_subscribe` | No longer an option, use a function call. | +| `NN_SUB_UNSUBSCRIBE` | `nng_sub0_socket_unsubscribe` | No longer an option, use a function call. | ## Error Codes diff --git a/docs/ref/migrate/nng1.md b/docs/ref/migrate/nng1.md index 3004b880..65af2382 100644 --- a/docs/ref/migrate/nng1.md +++ b/docs/ref/migrate/nng1.md @@ -80,6 +80,13 @@ matching the actual wire protocol values, instead of `int`. The `NNG_OPT_RAW` option has aso been replaced by a function, `nng_socket_raw`. +## Subscriptions + +The `NNG_OPT_SUB_SUBSCRIBE` and `NNG_OPT_SUB_UNSUBCRIBE` options have been replaced by +the following functions: `nng_sub0_socket_subscribe`, `nng_sub0_socket_unsubscribe`, +`nng_sub0_ctx_subscribe` and `nng_sub0_ctx_unsubscribe`. These functions, like the options +they replace, are only applicable to SUB sockets. + ## Statistics Use Constified Pointers A number of the statistics functions take, or return, `const nng_stat *` instead diff --git a/include/nng/protocol/pubsub0/sub.h b/include/nng/protocol/pubsub0/sub.h index 81f50a80..a8a37823 100644 --- a/include/nng/protocol/pubsub0/sub.h +++ b/include/nng/protocol/pubsub0/sub.h @@ -11,6 +11,8 @@ #ifndef NNG_PROTOCOL_PUBSUB0_SUB_H #define NNG_PROTOCOL_PUBSUB0_SUB_H +#include <nng/nng.h> + #ifdef __cplusplus extern "C" { #endif @@ -19,6 +21,13 @@ NNG_DECL int nng_sub0_open(nng_socket *); NNG_DECL int nng_sub0_open_raw(nng_socket *); +NNG_DECL int nng_sub0_socket_subscribe( + nng_socket id, const void *buf, size_t sz); +NNG_DECL int nng_sub0_socket_unsubscribe( + nng_socket id, const void *buf, size_t sz); +NNG_DECL int nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz); +NNG_DECL int nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz); + #ifndef nng_sub_open #define nng_sub_open nng_sub0_open #endif @@ -27,9 +36,6 @@ NNG_DECL int nng_sub0_open_raw(nng_socket *); #define nng_sub_open_raw nng_sub0_open_raw #endif -#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" -#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" - #define NNG_OPT_SUB_PREFNEW "sub:prefnew" #ifdef __cplusplus diff --git a/src/core/socket.c b/src/core/socket.c index b01938e5..1e0b0404 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -824,6 +824,18 @@ nni_sock_proto_pipe_ops(nni_sock *sock) return (&sock->s_pipe_ops); } +struct nni_proto_sock_ops * +nni_sock_proto_ops(nni_sock *sock) +{ + return (&sock->s_sock_ops); +} + +struct nni_proto_ctx_ops * +nni_ctx_proto_ops(nni_ctx *ctx) +{ + return (&ctx->c_ops); +} + void * nni_sock_proto_data(nni_sock *sock) { @@ -1142,6 +1154,12 @@ nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing) return (rv); } +void * +nni_ctx_proto_data(nni_ctx *ctx) +{ + return (ctx->c_data); +} + static void nni_ctx_destroy(nni_ctx *ctx) { diff --git a/src/core/socket.h b/src/core/socket.h index 8fab08d6..3eb943f4 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *); extern void *nni_sock_proto_data(nni_sock *); extern void nni_sock_add_stat(nni_sock *, nni_stat_item *); +extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *); extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *); +extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *); extern int nni_sock_setopt( nni_sock *, const char *, const void *, size_t, nni_opt_type); @@ -77,6 +79,8 @@ extern int nni_ctx_open(nni_ctx **, nni_sock *); // NNG_ECLOSED unless the final argument is true.) extern int nni_ctx_find(nni_ctx **, uint32_t, bool); +extern void *nni_ctx_proto_data(nni_ctx *); + // nni_ctx_rele is called to release a hold on the context. These holds // are acquired by either nni_ctx_open or nni_ctx_find. If the context // is being closed (nni_ctx_close was called), and this is the last reference, diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c index 733d6fb5..f3cf6e2d 100644 --- a/src/sp/protocol/pubsub0/pub_test.c +++ b/src/sp/protocol/pubsub0/pub_test.c @@ -141,7 +141,7 @@ test_pub_send_queued(void) // test to be really meaningful. NUTS_PASS(nng_pub0_open(&pub)); NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "", 0)); NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 10)); NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index 5d6a2a05..a4741114 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -13,6 +13,7 @@ #include <string.h> #include "core/nng_impl.h" +#include "core/socket.h" #include "nng/protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to @@ -454,13 +455,11 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) // to replace this with a patricia trie, like old nanomsg had. static int -sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_subscribe(sub0_ctx *ctx, const void *buf, size_t sz) { - sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; - NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { @@ -492,13 +491,11 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) } static int -sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_unsubscribe(sub0_ctx *ctx, const void *buf, size_t sz) { - sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; sub0_topic *topic; size_t len; - NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { @@ -580,14 +577,6 @@ static nni_option sub0_ctx_options[] = { .o_set = sub0_ctx_set_recv_buf_len, }, { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_ctx_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_ctx_unsubscribe, - }, - { .o_name = NNG_OPT_SUB_PREFNEW, .o_get = sub0_ctx_get_prefer_new, .o_set = sub0_ctx_set_prefer_new, @@ -637,20 +626,6 @@ sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) } static int -sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); -} - -static int -sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); -} - -static int sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; @@ -686,14 +661,6 @@ static nni_proto_ctx_ops sub0_ctx_ops = { static nni_option sub0_sock_options[] = { { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_sock_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_sock_unsubscribe, - }, - { .o_name = NNG_OPT_RECVBUF, .o_get = sub0_sock_get_recv_buf_len, .o_set = sub0_sock_set_recv_buf_len, @@ -736,3 +703,91 @@ nng_sub0_open(nng_socket *sock) { return (nni_proto_open(sock, &sub0_proto)); } + +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz); + nni_ctx_rele(c); + return (rv); +} diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index 9802a35e..9929de64 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -82,7 +82,7 @@ test_sub_poll_readable(void) NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "a", 1)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); @@ -126,7 +126,7 @@ test_sub_recv_late(void) NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "", 0)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); @@ -325,19 +325,11 @@ test_sub_recv_buf_option(void) static void test_sub_subscribe_option(void) { - nng_socket sub; - size_t sz; - int v; - const char *opt = NNG_OPT_SUB_SUBSCRIBE; - + nng_socket sub; NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_socket_set(sub, opt, "abc", 3)); - NUTS_PASS(nng_socket_set(sub, opt, "abc", 3)); // duplicate - NUTS_PASS(nng_socket_set_bool(sub, opt, false)); - NUTS_PASS(nng_socket_set_int(sub, opt, 32)); - sz = sizeof(v); - NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); // duplicate NUTS_CLOSE(sub); } @@ -345,23 +337,14 @@ test_sub_subscribe_option(void) static void test_sub_unsubscribe_option(void) { - nng_socket sub; - size_t sz; - int v; - const char *opt1 = NNG_OPT_SUB_SUBSCRIBE; - const char *opt2 = NNG_OPT_SUB_UNSUBSCRIBE; + nng_socket sub; NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_socket_set(sub, opt1, "abc", 3)); - NUTS_FAIL(nng_socket_set(sub, opt2, "abc123", 6), NNG_ENOENT); - NUTS_PASS(nng_socket_set(sub, opt2, "abc", 3)); - NUTS_FAIL(nng_socket_set(sub, opt2, "abc", 3), NNG_ENOENT); - NUTS_PASS(nng_socket_set_int(sub, opt1, 32)); - NUTS_FAIL(nng_socket_set_int(sub, opt2, 23), NNG_ENOENT); - NUTS_PASS(nng_socket_set_int(sub, opt2, 32)); - sz = sizeof(v); - NUTS_FAIL(nng_socket_get(sub, opt2, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc123", 6), NNG_ENOENT); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3)); + NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc", 3), NNG_ENOENT); NUTS_CLOSE(sub); } @@ -403,7 +386,7 @@ test_sub_drop_new(void) NUTS_PASS(nng_pub0_open(&pub)); NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2)); NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, false)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, NULL, 0)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); NUTS_MARRY(pub, sub); @@ -429,7 +412,7 @@ test_sub_drop_old(void) NUTS_PASS(nng_pub0_open(&pub)); NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2)); NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, true)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, NULL, 0)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); NUTS_MARRY(pub, sub); @@ -459,10 +442,10 @@ test_sub_filter(void) NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10)); // Set up some default filters - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "abc", 3)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "def", 3)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "ghi", 3)); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "jkl", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "def", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "ghi", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "jkl", 3)); NUTS_MARRY(pub, sub); @@ -474,7 +457,7 @@ test_sub_filter(void) NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); NUTS_SLEEP(100); - NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_UNSUBSCRIBE, "ghi", 3)); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "ghi", 3)); sz = sizeof(buf); NUTS_PASS(nng_recv(sub, buf, &sz, 0)); NUTS_TRUE(sz == 3); @@ -512,11 +495,11 @@ test_sub_multi_context(void) NUTS_PASS(nng_ctx_open(&c1, sub)); NUTS_PASS(nng_ctx_open(&c2, sub)); - NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "one", 3)); - NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c1, "one", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c1, "all", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "two", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3)); nng_aio_set_timeout(aio1, 100); nng_aio_set_timeout(aio2, 100); diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c index eec73505..fdbc5632 100644 --- a/src/sp/protocol/pubsub0/xsub_test.c +++ b/src/sp/protocol/pubsub0/xsub_test.c @@ -7,7 +7,8 @@ // found online at https://opensource.org/licenses/MIT. // -#include "nng/nng.h" +#include <nng/nng.h> +#include <nng/protocol/pubsub0/sub.h> #include <nuts.h> static void @@ -257,22 +258,20 @@ test_xsub_recv_buf_option(void) static void test_xsub_subscribe_option(void) { - nng_socket sub; - const char *opt = NNG_OPT_SUB_SUBSCRIBE; + nng_socket sub; NUTS_PASS(nng_sub0_open_raw(&sub)); - NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP); + NUTS_FAIL(nng_sub0_socket_subscribe(sub, "abc", 3), NNG_ENOTSUP); NUTS_CLOSE(sub); } static void test_xsub_unsubscribe_option(void) { - nng_socket sub; - const char *opt = NNG_OPT_SUB_UNSUBSCRIBE; + nng_socket sub; NUTS_PASS(nng_sub0_open_raw(&sub)); - NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP); + NUTS_FAIL(nng_sub0_socket_unsubscribe(sub, "abc", 3), NNG_ENOTSUP); NUTS_CLOSE(sub); } diff --git a/src/tools/nngcat/nngcat.c b/src/tools/nngcat/nngcat.c index bdf6823c..da3bd6d6 100644 --- a/src/tools/nngcat/nngcat.c +++ b/src/tools/nngcat/nngcat.c @@ -1065,8 +1065,7 @@ main(int ac, char **av) } for (struct topic *t = topics; t != NULL; t = t->next) { - rv = nng_socket_set( - sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val)); + rv = nng_sub0_socket_subscribe(sock, t->val, strlen(t->val)); if (rv != 0) { fatal("Unable to subscribe to topic %s: %s", t->val, nng_strerror(rv)); diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c index 0e493116..27d7802a 100644 --- a/src/tools/perf/pubdrop.c +++ b/src/tools/perf/pubdrop.c @@ -206,7 +206,7 @@ sub_client(void *arg) if ((rv = nng_socket_set_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { die("setopt: %s", nng_strerror(rv)); } - if ((rv = nng_socket_set(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + if ((rv = nng_sub0_socket_subscribe(sock, "", 0)) != 0) { die("setopt: %s", nng_strerror(rv)); } if ((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { diff --git a/tests/multistress.c b/tests/multistress.c index e58b94d7..b9e48014 100644 --- a/tests/multistress.c +++ b/tests/multistress.c @@ -581,7 +581,7 @@ pubsub0_test(int ntests) if ((rv = nng_aio_alloc(&cli->recd, sub0_recd, cli)) != 0) { fatal("nng_aio_alloc", rv); } - rv = nng_socket_set(cli->sock, NNG_OPT_SUB_SUBSCRIBE, "", 0); + rv = nng_sub0_socket_subscribe(cli->sock, "", 0); if (rv != 0) { fatal("subscribe", rv); } |
