diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-11-09 09:25:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-11-11 10:31:44 -0800 |
| commit | 3163a56e06a58abb10c753fc77da388234d580c2 (patch) | |
| tree | a35a02af1590d94cedb5fbcaef7de535d8e5b73e /src | |
| parent | 18d7519234f456a487623d93bcb6daa121d0ce17 (diff) | |
| download | nng-3163a56e06a58abb10c753fc77da388234d580c2.tar.gz nng-3163a56e06a58abb10c753fc77da388234d580c2.tar.bz2 nng-3163a56e06a58abb10c753fc77da388234d580c2.zip | |
Add nng_sub0_subscribe and friends.
These are new functions that replace `NNG_OPT_SUBSCRIBE` and
`NNG_OPT_UNSUBSCRIBE`. They are provided here as a transition
aid before those options are removed in NNG 2.0.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/socket.c | 18 | ||||
| -rw-r--r-- | src/core/socket.h | 6 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 88 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub_test.c | 45 |
4 files changed, 136 insertions, 21 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index bf550a24..241b0401 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -889,12 +889,30 @@ nni_sock_proto_pipe_ops(nni_sock *sock) return (&sock->s_pipe_ops); } +struct nni_proto_sock_ops * +nni_sock_proto_ops(nni_sock *sock) +{ + return (&sock->s_sock_ops); +} + +struct nni_proto_ctx_ops * +nni_ctx_proto_ops(nni_ctx *ctx) +{ + return (&ctx->c_ops); +} + void * nni_sock_proto_data(nni_sock *sock) { return (sock->s_data); } +void * +nni_ctx_proto_data(nni_ctx *ctx) +{ + return (ctx->c_data); +} + int nni_sock_add_listener(nni_sock *s, nni_listener *l) { diff --git a/src/core/socket.h b/src/core/socket.h index 343310ca..d4bf1a97 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *); extern void *nni_sock_proto_data(nni_sock *); extern void nni_sock_add_stat(nni_sock *, nni_stat_item *); +extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *); extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *); +extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *); extern int nni_sock_setopt( nni_sock *, const char *, const void *, size_t, nni_opt_type); @@ -89,6 +91,8 @@ extern void nni_ctx_close(nni_ctx *); // nni_ctx_id returns the context ID, which can be used with nni_ctx_find. extern uint32_t nni_ctx_id(nni_ctx *); +extern void *nni_ctx_proto_data(nni_ctx *); + // nni_ctx_recv receives asynchronously. extern void nni_ctx_recv(nni_ctx *, nni_aio *); diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index ee911153..e8db6db9 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = { }, }; +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + static nni_proto_sock_ops sub0_sock_ops = { .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index b830ae80..01995943 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -14,7 +14,7 @@ test_sub_identity(void) { nng_socket s; int p; - char * n; + char *n; NUTS_PASS(nng_sub0_open(&s)); NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); @@ -45,8 +45,8 @@ test_sub_context_cannot_send(void) { nng_socket sub; nng_ctx ctx; - nng_msg * m; - nng_aio * aio; + nng_msg *m; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -121,8 +121,8 @@ test_sub_recv_late(void) int fd; nng_socket pub; nng_socket sub; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -180,9 +180,9 @@ void test_sub_validate_peer(void) { nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; + nng_stat *stats; + nng_stat *reject; + char *addr; NUTS_ADDR(addr, "inproc"); @@ -212,7 +212,7 @@ test_sub_recv_ctx_closed(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -229,7 +229,7 @@ test_sub_ctx_recv_aio_stopped(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -249,7 +249,7 @@ test_sub_close_context_recv(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -269,7 +269,7 @@ test_sub_ctx_recv_nonblock(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -289,7 +289,7 @@ test_sub_ctx_recv_cancel(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -354,6 +354,9 @@ test_sub_subscribe_option(void) NUTS_PASS(nng_socket_set_int(sub, opt, 32)); sz = sizeof(v); NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3)); NUTS_CLOSE(sub); } @@ -413,7 +416,7 @@ test_sub_drop_new(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -439,7 +442,7 @@ test_sub_drop_old(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -485,7 +488,7 @@ test_sub_filter(void) NUTS_PASS(nng_send(pub, "def", 3, 0)); NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through NUTS_PASS(nng_send(pub, "abc123", 6, 0)); - NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match + NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); @@ -517,9 +520,9 @@ test_sub_multi_context(void) nng_socket pub; nng_ctx c1; nng_ctx c2; - nng_aio * aio1; - nng_aio * aio2; - nng_msg * m; + nng_aio *aio1; + nng_aio *aio2; + nng_msg *m; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -532,7 +535,9 @@ test_sub_multi_context(void) NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "junk", 4)); + NUTS_PASS(nng_sub0_ctx_unsubscribe(c2, "junk", 4)); nng_aio_set_timeout(aio1, 100); nng_aio_set_timeout(aio2, 100); |
