From 3163a56e06a58abb10c753fc77da388234d580c2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 9 Nov 2024 09:25:22 -0800 Subject: Add nng_sub0_subscribe and friends. These are new functions that replace `NNG_OPT_SUBSCRIBE` and `NNG_OPT_UNSUBSCRIBE`. They are provided here as a transition aid before those options are removed in NNG 2.0. --- src/sp/protocol/pubsub0/sub.c | 88 ++++++++++++++++++++++++++++++++++++++ src/sp/protocol/pubsub0/sub_test.c | 45 ++++++++++--------- 2 files changed, 113 insertions(+), 20 deletions(-) (limited to 'src/sp') diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index ee911153..e8db6db9 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = { }, }; +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + static nni_proto_sock_ops sub0_sock_ops = { .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index b830ae80..01995943 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -14,7 +14,7 @@ test_sub_identity(void) { nng_socket s; int p; - char * n; + char *n; NUTS_PASS(nng_sub0_open(&s)); NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); @@ -45,8 +45,8 @@ test_sub_context_cannot_send(void) { nng_socket sub; nng_ctx ctx; - nng_msg * m; - nng_aio * aio; + nng_msg *m; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -121,8 +121,8 @@ test_sub_recv_late(void) int fd; nng_socket pub; nng_socket sub; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -180,9 +180,9 @@ void test_sub_validate_peer(void) { nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; + nng_stat *stats; + nng_stat *reject; + char *addr; NUTS_ADDR(addr, "inproc"); @@ -212,7 +212,7 @@ test_sub_recv_ctx_closed(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -229,7 +229,7 @@ test_sub_ctx_recv_aio_stopped(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -249,7 +249,7 @@ test_sub_close_context_recv(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -269,7 +269,7 @@ test_sub_ctx_recv_nonblock(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -289,7 +289,7 @@ test_sub_ctx_recv_cancel(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -354,6 +354,9 @@ test_sub_subscribe_option(void) NUTS_PASS(nng_socket_set_int(sub, opt, 32)); sz = sizeof(v); NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3)); NUTS_CLOSE(sub); } @@ -413,7 +416,7 @@ test_sub_drop_new(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -439,7 +442,7 @@ test_sub_drop_old(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -485,7 +488,7 @@ test_sub_filter(void) NUTS_PASS(nng_send(pub, "def", 3, 0)); NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through NUTS_PASS(nng_send(pub, "abc123", 6, 0)); - NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match + NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); @@ -517,9 +520,9 @@ test_sub_multi_context(void) nng_socket pub; nng_ctx c1; nng_ctx c2; - nng_aio * aio1; - nng_aio * aio2; - nng_msg * m; + nng_aio *aio1; + nng_aio *aio2; + nng_msg *m; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -532,7 +535,9 @@ test_sub_multi_context(void) NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "junk", 4)); + NUTS_PASS(nng_sub0_ctx_unsubscribe(c2, "junk", 4)); nng_aio_set_timeout(aio1, 100); nng_aio_set_timeout(aio2, 100); -- cgit v1.2.3-70-g09d2