From 3163a56e06a58abb10c753fc77da388234d580c2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 9 Nov 2024 09:25:22 -0800 Subject: Add nng_sub0_subscribe and friends. These are new functions that replace `NNG_OPT_SUBSCRIBE` and `NNG_OPT_UNSUBSCRIBE`. They are provided here as a transition aid before those options are removed in NNG 2.0. --- src/core/socket.c | 18 ++++++++ src/core/socket.h | 6 ++- src/sp/protocol/pubsub0/sub.c | 88 ++++++++++++++++++++++++++++++++++++++ src/sp/protocol/pubsub0/sub_test.c | 45 ++++++++++--------- 4 files changed, 136 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/core/socket.c b/src/core/socket.c index bf550a24..241b0401 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -889,12 +889,30 @@ nni_sock_proto_pipe_ops(nni_sock *sock) return (&sock->s_pipe_ops); } +struct nni_proto_sock_ops * +nni_sock_proto_ops(nni_sock *sock) +{ + return (&sock->s_sock_ops); +} + +struct nni_proto_ctx_ops * +nni_ctx_proto_ops(nni_ctx *ctx) +{ + return (&ctx->c_ops); +} + void * nni_sock_proto_data(nni_sock *sock) { return (sock->s_data); } +void * +nni_ctx_proto_data(nni_ctx *ctx) +{ + return (ctx->c_data); +} + int nni_sock_add_listener(nni_sock *s, nni_listener *l) { diff --git a/src/core/socket.h b/src/core/socket.h index 343310ca..d4bf1a97 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *); extern void *nni_sock_proto_data(nni_sock *); extern void nni_sock_add_stat(nni_sock *, nni_stat_item *); +extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *); extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *); +extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *); extern int nni_sock_setopt( nni_sock *, const char *, const void *, size_t, nni_opt_type); @@ -89,6 +91,8 @@ extern void nni_ctx_close(nni_ctx *); // nni_ctx_id returns the context ID, which can be used with nni_ctx_find. extern uint32_t nni_ctx_id(nni_ctx *); +extern void *nni_ctx_proto_data(nni_ctx *); + // nni_ctx_recv receives asynchronously. extern void nni_ctx_recv(nni_ctx *, nni_aio *); diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index ee911153..e8db6db9 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = { }, }; +int +nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz) +{ + int rv; + nni_sock *s; + sub0_sock *sock; + + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_find(&s, id.id)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) { + nni_sock_rele(s); + return (NNG_ENOTSUP); + } + sock = nni_sock_proto_data(s); + rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE); + nni_sock_rele(s); + return (rv); +} + +int +nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + +int +nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz) +{ + int rv; + nni_ctx *c; + sub0_ctx *ctx; + + if (((rv = nni_init()) != 0) || + ((rv = nni_ctx_find(&c, id.id, false)) != 0)) { + return (rv); + } + // validate the socket type + if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) { + nni_ctx_rele(c); + return (NNG_ENOTSUP); + } + ctx = nni_ctx_proto_data(c); + rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE); + nni_ctx_rele(c); + return (rv); +} + static nni_proto_sock_ops sub0_sock_ops = { .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index b830ae80..01995943 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -14,7 +14,7 @@ test_sub_identity(void) { nng_socket s; int p; - char * n; + char *n; NUTS_PASS(nng_sub0_open(&s)); NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); @@ -45,8 +45,8 @@ test_sub_context_cannot_send(void) { nng_socket sub; nng_ctx ctx; - nng_msg * m; - nng_aio * aio; + nng_msg *m; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -121,8 +121,8 @@ test_sub_recv_late(void) int fd; nng_socket pub; nng_socket sub; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -180,9 +180,9 @@ void test_sub_validate_peer(void) { nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; + nng_stat *stats; + nng_stat *reject; + char *addr; NUTS_ADDR(addr, "inproc"); @@ -212,7 +212,7 @@ test_sub_recv_ctx_closed(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -229,7 +229,7 @@ test_sub_ctx_recv_aio_stopped(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -249,7 +249,7 @@ test_sub_close_context_recv(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -269,7 +269,7 @@ test_sub_ctx_recv_nonblock(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -289,7 +289,7 @@ test_sub_ctx_recv_cancel(void) { nng_socket sub; nng_ctx ctx; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_ctx_open(&ctx, sub)); @@ -354,6 +354,9 @@ test_sub_subscribe_option(void) NUTS_PASS(nng_socket_set_int(sub, opt, 32)); sz = sizeof(v); NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3)); + NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3)); NUTS_CLOSE(sub); } @@ -413,7 +416,7 @@ test_sub_drop_new(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -439,7 +442,7 @@ test_sub_drop_old(void) { nng_socket sub; nng_socket pub; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -485,7 +488,7 @@ test_sub_filter(void) NUTS_PASS(nng_send(pub, "def", 3, 0)); NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through NUTS_PASS(nng_send(pub, "abc123", 6, 0)); - NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match + NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); @@ -517,9 +520,9 @@ test_sub_multi_context(void) nng_socket pub; nng_ctx c1; nng_ctx c2; - nng_aio * aio1; - nng_aio * aio2; - nng_msg * m; + nng_aio *aio1; + nng_aio *aio2; + nng_msg *m; NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); @@ -532,7 +535,9 @@ test_sub_multi_context(void) NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); - NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3)); + NUTS_PASS(nng_sub0_ctx_subscribe(c2, "junk", 4)); + NUTS_PASS(nng_sub0_ctx_unsubscribe(c2, "junk", 4)); nng_aio_set_timeout(aio1, 100); nng_aio_set_timeout(aio2, 100); -- cgit v1.2.3-70-g09d2