diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-11-09 09:25:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-11-11 10:31:44 -0800 |
| commit | 3163a56e06a58abb10c753fc77da388234d580c2 (patch) | |
| tree | a35a02af1590d94cedb5fbcaef7de535d8e5b73e /src/sp | |
| parent | 18d7519234f456a487623d93bcb6daa121d0ce17 (diff) | |
| download | nng-3163a56e06a58abb10c753fc77da388234d580c2.tar.gz nng-3163a56e06a58abb10c753fc77da388234d580c2.tar.bz2 nng-3163a56e06a58abb10c753fc77da388234d580c2.zip | |
Add nng_sub0_subscribe and friends.
These are new functions that replace `NNG_OPT_SUBSCRIBE` and
`NNG_OPT_UNSUBSCRIBE`. They are provided here as a transition
aid before those options are removed in NNG 2.0.
Diffstat (limited to 'src/sp')
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 88 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub_test.c | 45 |
2 files changed, 113 insertions, 20 deletions
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index ee911153..e8db6db9 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = { }, }; +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + static nni_proto_sock_ops sub0_sock_ops = { .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index b830ae80..01995943 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -14,7 +14,7 @@ test_sub_identity(void) { nng_socket s; int p; - char * n; + char *n; NUTS_PASS(nng_sub0_open(&s)); NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); @@ -45,8 +45,8 @@ test_sub_context_cannot_send(void) { nng_socket sub; nng_ctx ctx; - nng_msg * m; - nng_aio * aio; + nng_msg *m; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -121,8 +121,8 @@ test_sub_recv_late(void) int fd; nng_socket pub; nng_socket sub; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -180,9 +180,9 @@ void test_sub_validate_peer(void) { nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; + nng_stat *stats; + nng_stat *reject; + char *addr; NUTS_ADDR(addr, "inproc"); @@ -212,7 +212,7 @@ test_sub_recv_ctx_closed(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -229,7 +229,7 @@ test_sub_ctx_recv_aio_stopped(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -249,7 +249,7 @@ test_sub_close_context_recv(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -269,7 +269,7 @@ test_sub_ctx_recv_nonblock(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -289,7 +289,7 @@ test_sub_ctx_recv_cancel(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -354,6 +354,9 @@ test_sub_subscribe_option(void) NUTS_PASS(nng_socket_set_int(sub, opt, 32)); sz = sizeof(v); NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3)); NUTS_CLOSE(sub); } @@ -413,7 +416,7 @@ test_sub_drop_new(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -439,7 +442,7 @@ test_sub_drop_old(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -485,7 +488,7 @@ test_sub_filter(void) NUTS_PASS(nng_send(pub, "def", 3, 0)); NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through NUTS_PASS(nng_send(pub, "abc123", 6, 0)); - NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match + NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); @@ -517,9 +520,9 @@ test_sub_multi_context(void) nng_socket pub; nng_ctx c1; nng_ctx c2; - nng_aio * aio1; - nng_aio * aio2; - nng_msg * m; + nng_aio *aio1; + nng_aio *aio2; + nng_msg *m; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -532,7 +535,9 @@ test_sub_multi_context(void) NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "junk", 4)); + NUTS_PASS(nng_sub0_ctx_unsubscribe(c2, "junk", 4)); nng_aio_set_timeout(aio1, 100); nng_aio_set_timeout(aio2, 100); |
