summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-11-09 09:25:22 -0800
committerGarrett D'Amore <garrett@damore.org>2024-11-11 10:31:44 -0800
commit3163a56e06a58abb10c753fc77da388234d580c2 (patch)
treea35a02af1590d94cedb5fbcaef7de535d8e5b73e /src
parent18d7519234f456a487623d93bcb6daa121d0ce17 (diff)
downloadnng-3163a56e06a58abb10c753fc77da388234d580c2.tar.gz
nng-3163a56e06a58abb10c753fc77da388234d580c2.tar.bz2
nng-3163a56e06a58abb10c753fc77da388234d580c2.zip
Add nng_sub0_subscribe and friends.
These are new functions that replace `NNG_OPT_SUBSCRIBE` and `NNG_OPT_UNSUBSCRIBE`. They are provided here as a transition aid before those options are removed in NNG 2.0.
Diffstat (limited to 'src')
-rw-r--r--src/core/socket.c18
-rw-r--r--src/core/socket.h6
-rw-r--r--src/sp/protocol/pubsub0/sub.c88
-rw-r--r--src/sp/protocol/pubsub0/sub_test.c45
4 files changed, 136 insertions, 21 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index bf550a24..241b0401 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -889,12 +889,30 @@ nni_sock_proto_pipe_ops(nni_sock *sock)
return (&sock->s_pipe_ops);
}
+struct nni_proto_sock_ops *
+nni_sock_proto_ops(nni_sock *sock)
+{
+ return (&sock->s_sock_ops);
+}
+
+struct nni_proto_ctx_ops *
+nni_ctx_proto_ops(nni_ctx *ctx)
+{
+ return (&ctx->c_ops);
+}
+
void *
nni_sock_proto_data(nni_sock *sock)
{
return (sock->s_data);
}
+void *
+nni_ctx_proto_data(nni_ctx *ctx)
+{
+ return (ctx->c_data);
+}
+
int
nni_sock_add_listener(nni_sock *s, nni_listener *l)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 343310ca..d4bf1a97 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,7 +26,9 @@ extern bool nni_sock_raw(nni_sock *);
extern void *nni_sock_proto_data(nni_sock *);
extern void nni_sock_add_stat(nni_sock *, nni_stat_item *);
+extern struct nni_proto_sock_ops *nni_sock_proto_ops(nni_sock *);
extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
+extern struct nni_proto_ctx_ops *nni_ctx_proto_ops(nni_ctx *);
extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, nni_opt_type);
@@ -89,6 +91,8 @@ extern void nni_ctx_close(nni_ctx *);
// nni_ctx_id returns the context ID, which can be used with nni_ctx_find.
extern uint32_t nni_ctx_id(nni_ctx *);
+extern void *nni_ctx_proto_data(nni_ctx *);
+
// nni_ctx_recv receives asynchronously.
extern void nni_ctx_recv(nni_ctx *, nni_aio *);
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index ee911153..e8db6db9 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -718,6 +718,94 @@ static nni_option sub0_sock_options[] = {
},
};
+int
+nng_sub0_socket_subscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_subscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_socket_unsubscribe(nng_socket id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_sock *s;
+ sub0_sock *sock;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_find(&s, id.id)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_sock_proto_ops(s)->sock_init != sub0_sock_init) {
+ nni_sock_rele(s);
+ return (NNG_ENOTSUP);
+ }
+ sock = nni_sock_proto_data(s);
+ rv = sub0_ctx_unsubscribe(&sock->master, buf, sz, NNI_TYPE_OPAQUE);
+ nni_sock_rele(s);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_subscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_subscribe(ctx, buf, sz, NNI_TYPE_OPAQUE);
+ nni_ctx_rele(c);
+ return (rv);
+}
+
+int
+nng_sub0_ctx_unsubscribe(nng_ctx id, const void *buf, size_t sz)
+{
+ int rv;
+ nni_ctx *c;
+ sub0_ctx *ctx;
+
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_ctx_find(&c, id.id, false)) != 0)) {
+ return (rv);
+ }
+ // validate the socket type
+ if (nni_ctx_proto_ops(c)->ctx_init != sub0_ctx_init) {
+ nni_ctx_rele(c);
+ return (NNG_ENOTSUP);
+ }
+ ctx = nni_ctx_proto_data(c);
+ rv = sub0_ctx_unsubscribe(ctx, buf, sz, NNI_TYPE_OPAQUE);
+ nni_ctx_rele(c);
+ return (rv);
+}
+
static nni_proto_sock_ops sub0_sock_ops = {
.sock_size = sizeof(sub0_sock),
.sock_init = sub0_sock_init,
diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c
index b830ae80..01995943 100644
--- a/src/sp/protocol/pubsub0/sub_test.c
+++ b/src/sp/protocol/pubsub0/sub_test.c
@@ -14,7 +14,7 @@ test_sub_identity(void)
{
nng_socket s;
int p;
- char * n;
+ char *n;
NUTS_PASS(nng_sub0_open(&s));
NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p));
@@ -45,8 +45,8 @@ test_sub_context_cannot_send(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_msg * m;
- nng_aio * aio;
+ nng_msg *m;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_ctx_open(&ctx, sub));
@@ -121,8 +121,8 @@ test_sub_recv_late(void)
int fd;
nng_socket pub;
nng_socket sub;
- nng_aio * aio;
- nng_msg * msg;
+ nng_aio *aio;
+ nng_msg *msg;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
@@ -180,9 +180,9 @@ void
test_sub_validate_peer(void)
{
nng_socket s1, s2;
- nng_stat * stats;
- nng_stat * reject;
- char * addr;
+ nng_stat *stats;
+ nng_stat *reject;
+ char *addr;
NUTS_ADDR(addr, "inproc");
@@ -212,7 +212,7 @@ test_sub_recv_ctx_closed(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_aio * aio;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_ctx_open(&ctx, sub));
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
@@ -229,7 +229,7 @@ test_sub_ctx_recv_aio_stopped(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_aio * aio;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
@@ -249,7 +249,7 @@ test_sub_close_context_recv(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_aio * aio;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_ctx_open(&ctx, sub));
@@ -269,7 +269,7 @@ test_sub_ctx_recv_nonblock(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_aio * aio;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_ctx_open(&ctx, sub));
@@ -289,7 +289,7 @@ test_sub_ctx_recv_cancel(void)
{
nng_socket sub;
nng_ctx ctx;
- nng_aio * aio;
+ nng_aio *aio;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_ctx_open(&ctx, sub));
@@ -354,6 +354,9 @@ test_sub_subscribe_option(void)
NUTS_PASS(nng_socket_set_int(sub, opt, 32));
sz = sizeof(v);
NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY);
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3));
+ NUTS_PASS(nng_sub0_socket_subscribe(sub, "abc", 3));
+ NUTS_PASS(nng_sub0_socket_unsubscribe(sub, "abc", 3));
NUTS_CLOSE(sub);
}
@@ -413,7 +416,7 @@ test_sub_drop_new(void)
{
nng_socket sub;
nng_socket pub;
- nng_msg * msg;
+ nng_msg *msg;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
@@ -439,7 +442,7 @@ test_sub_drop_old(void)
{
nng_socket sub;
nng_socket pub;
- nng_msg * msg;
+ nng_msg *msg;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
@@ -485,7 +488,7 @@ test_sub_filter(void)
NUTS_PASS(nng_send(pub, "def", 3, 0));
NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through
NUTS_PASS(nng_send(pub, "abc123", 6, 0));
- NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match
+ NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match
NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub
NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0));
@@ -517,9 +520,9 @@ test_sub_multi_context(void)
nng_socket pub;
nng_ctx c1;
nng_ctx c2;
- nng_aio * aio1;
- nng_aio * aio2;
- nng_msg * m;
+ nng_aio *aio1;
+ nng_aio *aio2;
+ nng_msg *m;
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
@@ -532,7 +535,9 @@ test_sub_multi_context(void)
NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3));
NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3));
- NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c2, "all", 3));
+ NUTS_PASS(nng_sub0_ctx_subscribe(c2, "junk", 4));
+ NUTS_PASS(nng_sub0_ctx_unsubscribe(c2, "junk", 4));
nng_aio_set_timeout(aio1, 100);
nng_aio_set_timeout(aio2, 100);