diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/sp/protocol/pubsub0 | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/protocol/pubsub0')
| -rw-r--r-- | src/sp/protocol/pubsub0/CMakeLists.txt | 24 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/pub.c | 383 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/pub_test.c | 331 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 755 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub_test.c | 624 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/xsub.c | 211 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/xsub_test.c | 376 |
7 files changed, 2704 insertions, 0 deletions
diff --git a/src/sp/protocol/pubsub0/CMakeLists.txt b/src/sp/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..160b7462 --- /dev/null +++ b/src/sp/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,24 @@ +# +# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol +nng_directory(pubsub0) + +nng_sources_if(NNG_PROTO_PUB0 pub.c) +nng_headers_if(NNG_PROTO_PUB0 nng/protocol/pubsub0/pub.h) +nng_defines_if(NNG_PROTO_PUB0 NNG_HAVE_PUB0) + +nng_sources_if(NNG_PROTO_SUB0 sub.c xsub.c) +nng_headers_if(NNG_PROTO_SUB0 nng/protocol/pubsub0/sub.h) +nng_defines_if(NNG_PROTO_SUB0 NNG_HAVE_SUB0) + +nng_test(pub_test) +nng_test(sub_test) +nng_test(xsub_test) diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c new file mode 100644 index 00000000..e3d4f16a --- /dev/null +++ b/src/sp/protocol/pubsub0/pub.c @@ -0,0 +1,383 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <string.h> + +#include "core/nng_impl.h" +#include "nng/protocol/pubsub0/pub.h" + +// Publish protocol. The PUB protocol simply sends messages out, as +// a broadcast. It has nothing more sophisticated because it does not +// perform sender-side filtering. Its best effort delivery, so anything +// that can't receive the message won't get one. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { + nni_list pipes; + nni_mtx mtx; + bool closed; + size_t sendbuf; + nni_pollable *sendable; +}; + +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { + nni_pipe * pipe; + pub0_sock * pub; + nni_lmq sendq; + bool closed; + bool busy; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node node; +}; + +static void +pub0_sock_fini(void *arg) +{ + pub0_sock *s = arg; + + nni_pollable_free(s->sendable); + nni_mtx_fini(&s->mtx); +} + +static int +pub0_sock_init(void *arg, nni_sock *nsock) +{ + pub0_sock *sock = arg; + int rv; + NNI_ARG_UNUSED(nsock); + + if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { + return (rv); + } + nni_mtx_init(&sock->mtx); + NNI_LIST_INIT(&sock->pipes, pub0_pipe, node); + sock->sendbuf = 16; // fairly arbitrary + return (0); +} + +static void +pub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +pub0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +pub0_pipe_stop(void *arg) +{ + pub0_pipe *p = arg; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); +} + +static void +pub0_pipe_fini(void *arg) +{ + pub0_pipe *p = arg; + + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_lmq_fini(&p->sendq); +} + +static int +pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + pub0_pipe *p = arg; + pub0_sock *sock = s; + int rv; + size_t len; + + nni_mtx_lock(&sock->mtx); + len = sock->sendbuf; + nni_mtx_unlock(&sock->mtx); + + // XXX: consider making this depth tunable + if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { + + pub0_pipe_fini(p); + return (rv); + } + + p->busy = false; + p->pipe = pipe; + p->pub = s; + return (0); +} + +static int +pub0_pipe_start(void *arg) +{ + pub0_pipe *p = arg; + pub0_sock *sock = p->pub; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { + return (NNG_EPROTO); + } + nni_mtx_lock(&sock->mtx); + nni_list_append(&sock->pipes, p); + nni_mtx_unlock(&sock->mtx); + + // Start the receiver. + nni_pipe_recv(p->pipe, p->aio_recv); + + return (0); +} + +static void +pub0_pipe_close(void *arg) +{ + pub0_pipe *p = arg; + pub0_sock *sock = p->pub; + + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + + nni_mtx_lock(&sock->mtx); + p->closed = true; + nni_lmq_flush(&p->sendq); + + if (nni_list_active(&sock->pipes, p)) { + nni_list_remove(&sock->pipes, p); + } + nni_mtx_unlock(&sock->mtx); +} + +static void +pub0_pipe_recv_cb(void *arg) +{ + pub0_pipe *p = arg; + + // We should never receive a message -- the only valid reason for us to + // be here is on pipe close. + if (nni_aio_result(p->aio_recv) == 0) { + nni_msg_free(nni_aio_get_msg(p->aio_recv)); + } + nni_pipe_close(p->pipe); +} + +static void +pub0_pipe_send_cb(void *arg) +{ + pub0_pipe *p = arg; + pub0_sock *sock = p->pub; + nni_msg * msg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_close(p->pipe); + return; + } + + nni_mtx_lock(&sock->mtx); + if (p->closed) { + nni_mtx_unlock(&sock->mtx); + return; + } + if (nni_lmq_getq(&p->sendq, &msg) == 0) { + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +pub0_sock_recv(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_ENOTSUP); + } +} + +static void +pub0_sock_send(void *arg, nni_aio *aio) +{ + pub0_sock *sock = arg; + pub0_pipe *p; + nng_msg * msg; + size_t len; + + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_mtx_lock(&sock->mtx); + NNI_LIST_FOREACH (&sock->pipes, p) { + + nni_msg_clone(msg); + if (p->busy) { + if (nni_lmq_full(&p->sendq)) { + // Make space for the new message. + nni_msg *old; + (void) nni_lmq_getq(&p->sendq, &old); + nni_msg_free(old); + } + nni_lmq_putq(&p->sendq, msg); + } else { + p->busy = true; + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + } + } + nni_mtx_unlock(&sock->mtx); + nng_msg_free(msg); + nni_aio_finish(aio, 0, len); +} + +static int +pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t) +{ + pub0_sock *sock = arg; + int fd; + int rv; + nni_mtx_lock(&sock->mtx); + // PUB sockets are *always* writable. + nni_pollable_raise(sock->sendable); + rv = nni_pollable_getfd(sock->sendable, &fd); + nni_mtx_unlock(&sock->mtx); + + if (rv == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); +} + +static int +pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t) +{ + pub0_sock *sock = arg; + pub0_pipe *p; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&sock->mtx); + sock->sendbuf = (size_t) val; + NNI_LIST_FOREACH (&sock->pipes, p) { + // If we fail part way thru (should only be ENOMEM), we + // stop short. The others would likely fail for ENOMEM as + // well anyway. There is a weird effect here where the + // buffers may have been set for *some* of the pipes, but + // we have no way to correct partial failure. + if ((rv = nni_lmq_resize(&p->sendq, (size_t) val)) != 0) { + break; + } + } + nni_mtx_unlock(&sock->mtx); + return (rv); +} + +static int +pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) +{ + pub0_sock *sock = arg; + int val; + nni_mtx_lock(&sock->mtx); + val = (int) sock->sendbuf; + nni_mtx_unlock(&sock->mtx); + return (nni_copyout_int(val, buf, szp, t)); +} + +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_size = sizeof(pub0_pipe), + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_close = pub0_pipe_close, + .pipe_stop = pub0_pipe_stop, +}; + +static nni_option pub0_sock_options[] = { + // terminate list + { + .o_name = NNG_OPT_SENDFD, + .o_get = pub0_sock_get_sendfd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = pub0_sock_get_sendbuf, + .o_set = pub0_sock_set_sendbuf, + }, + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops pub0_sock_ops = { + .sock_size = sizeof(pub0_sock), + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, +}; + +static nni_proto pub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + +static nni_proto pub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + +int +nng_pub0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto)); +} + +int +nng_pub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto_raw)); +} diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c new file mode 100644 index 00000000..a430b610 --- /dev/null +++ b/src/sp/protocol/pubsub0/pub_test.c @@ -0,0 +1,331 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_pub_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_pub0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 0u)); // 32 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 1u)); // 33 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, "pub"); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, "sub"); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_pub_cannot_recv(void) +{ + nng_socket pub; + + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_FAIL(nng_recv(pub, "", 0, 0), NNG_ENOTSUP); + NUTS_CLOSE(pub); +} + +static void +test_pub_no_context(void) +{ + nng_socket pub; + nng_ctx ctx; + + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_FAIL(nng_ctx_open(&ctx, pub), NNG_ENOTSUP); + NUTS_CLOSE(pub); +} + +static void +test_pub_not_readable(void) +{ + int fd; + nng_socket pub; + + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_FAIL(nng_socket_get_int(pub, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_CLOSE(pub); +} + +static void +test_pub_poll_writeable(void) +{ + int fd; + nng_socket pub; + nng_socket sub; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_get_int(pub, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Pub is *always* writeable + NUTS_TRUE(nuts_poll_fd(fd)); + + // Even after connect (no message yet) + NUTS_MARRY(pub, sub); + NUTS_TRUE(nuts_poll_fd(fd)); + + // Even if we send messages. + NUTS_SEND(pub, "abc"); + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +static void +test_pub_send_no_pipes(void) +{ + nng_socket pub; + + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_SEND(pub, "DROP1"); + NUTS_SEND(pub, "DROP2"); + NUTS_CLOSE(pub); +} + +void +test_pub_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char *addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_pub0_open(&s1)); + NUTS_PASS(nng_pub0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_pub_send_queued(void) +{ + nng_socket pub; + nng_socket sub; + + // MB: What we really need is a mock so that we can send harder + // than we receive -- we need a way to apply back-pressure for this + // test to be really meaningful. + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0)); + NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 10)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_MARRY(pub, sub); + NUTS_SEND(pub, "first"); + NUTS_SEND(pub, "second"); + NUTS_SEND(pub, "three musketeers"); + NUTS_SEND(pub, "four"); + NUTS_SLEEP(50); + NUTS_RECV(sub, "first"); + NUTS_RECV(sub, "second"); + NUTS_RECV(sub, "three musketeers"); + NUTS_RECV(sub, "four"); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} +static void +test_sub_recv_ctx_closed(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_ctx_close(ctx); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); + NUTS_CLOSE(sub); +} + +static void +test_sub_ctx_recv_aio_stopped(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_close_context_recv(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_set_timeout(aio, 1000); + nng_ctx_recv(ctx, aio); + NUTS_PASS(nng_ctx_close(ctx)); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_ctx_recv_nonblock(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_ctx_recv_cancel(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 1000); + nng_ctx_recv(ctx, aio); + nng_aio_abort(aio, NNG_ECANCELED); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_pub_send_buf_option(void) +{ + nng_socket pub; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_SENDBUF; + + NUTS_PASS(nng_pub0_open(&pub)); + + NUTS_PASS(nng_socket_set_int(pub, opt, 1)); + NUTS_FAIL(nng_socket_set_int(pub, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(pub, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(pub, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(pub, opt, 3)); + NUTS_PASS(nng_socket_get_int(pub, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(pub, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(pub, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(pub, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(pub, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(pub, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(pub); +} + +static void +test_pub_cooked(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_pub0_open(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(!b); + NUTS_FAIL(nng_socket_set_bool(s, NNG_OPT_RAW, true), NNG_EREADONLY); + NUTS_PASS(nng_close(s)); + + // raw pub only differs in the option setting + NUTS_PASS(nng_pub0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +NUTS_TESTS = { + { "pub identity", test_pub_identity }, + { "pub cannot recv", test_pub_cannot_recv }, + { "put no context", test_pub_no_context }, + { "pub not readable", test_pub_not_readable }, + { "pub poll writeable", test_pub_poll_writeable }, + { "pub validate peer", test_pub_validate_peer }, + { "pub send queued", test_pub_send_queued }, + { "pub send no pipes", test_pub_send_no_pipes }, + { "sub recv ctx closed", test_sub_recv_ctx_closed }, + { "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped }, + { "sub close context recv", test_sub_close_context_recv }, + { "sub context recv nonblock", test_sub_ctx_recv_nonblock }, + { "sub context recv cancel", test_sub_ctx_recv_cancel }, + { "pub send buf option", test_pub_send_buf_option }, + { "pub cooked", test_pub_cooked }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c new file mode 100644 index 00000000..9f3f2283 --- /dev/null +++ b/src/sp/protocol/pubsub0/sub.c @@ -0,0 +1,755 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Nathan Kent <nate@nkent.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "nng/protocol/pubsub0/sub.h" + +// Subscriber protocol. The SUB protocol receives messages sent to +// it from publishers, and filters out those it is not interested in, +// only passing up ones that match known subscriptions. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +// By default we accept 128 messages. +#define SUB0_DEFAULT_RECV_BUF_LEN 128 + +// By default, prefer new messages when the queue is full. +#define SUB0_DEFAULT_PREFER_NEW true + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_ctx sub0_ctx; +typedef struct sub0_topic sub0_topic; + +static void sub0_recv_cb(void *); +static void sub0_pipe_fini(void *); + +struct sub0_topic { + nni_list_node node; + size_t len; + void * buf; +}; + +// sub0_ctx is a context for a SUB socket. The advantage of contexts is +// that different contexts can maintain different subscriptions. +struct sub0_ctx { + nni_list_node node; + sub0_sock * sock; + nni_list topics; // TODO: Consider patricia trie + nni_list recv_queue; // can have multiple pending receives + nni_lmq lmq; + bool prefer_new; +}; + +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { + nni_pollable readable; + sub0_ctx master; // default context + nni_list contexts; // all contexts + int num_contexts; + size_t recv_buf_len; + bool prefer_new; + nni_mtx lk; +}; + +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio aio_recv; +}; + +static void +sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_mtx_lock(&sock->lk); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&sock->lk); +} + +static void +sub0_ctx_recv(void *arg, nni_aio *aio) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->lk); + +again: + if (nni_lmq_empty(&ctx->lmq)) { + int rv; + if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { + nni_mtx_unlock(&sock->lk); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ctx->recv_queue, aio); + nni_mtx_unlock(&sock->lk); + return; + } + + (void) nni_lmq_getq(&ctx->lmq, &msg); + + if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { + nni_pollable_clear(&sock->readable); + } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + nni_aio_set_msg(aio, msg); + nni_mtx_unlock(&sock->lk); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +sub0_ctx_send(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_ENOTSUP); + } +} + +static void +sub0_ctx_close(void *arg) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&sock->lk); + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&sock->lk); +} + +static void +sub0_ctx_fini(void *arg) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + + sub0_ctx_close(ctx); + + nni_mtx_lock(&sock->lk); + nni_list_remove(&sock->contexts, ctx); + sock->num_contexts--; + nni_mtx_unlock(&sock->lk); + + while ((topic = nni_list_first(&ctx->topics)) != 0) { + nni_list_remove(&ctx->topics, topic); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + } + + nni_lmq_fini(&ctx->lmq); +} + +static int +sub0_ctx_init(void *ctx_arg, void *sock_arg) +{ + sub0_sock *sock = sock_arg; + sub0_ctx * ctx = ctx_arg; + size_t len; + bool prefer_new; + int rv; + + nni_mtx_lock(&sock->lk); + len = sock->recv_buf_len; + prefer_new = sock->prefer_new; + + if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { + return (rv); + } + ctx->prefer_new = prefer_new; + + nni_aio_list_init(&ctx->recv_queue); + NNI_LIST_INIT(&ctx->topics, sub0_topic, node); + + ctx->sock = sock; + + nni_list_append(&sock->contexts, ctx); + sock->num_contexts++; + nni_mtx_unlock(&sock->lk); + + return (0); +} + +static void +sub0_sock_fini(void *arg) +{ + sub0_sock *sock = arg; + + sub0_ctx_fini(&sock->master); + nni_pollable_fini(&sock->readable); + nni_mtx_fini(&sock->lk); +} + +static int +sub0_sock_init(void *arg, nni_sock *unused) +{ + sub0_sock *sock = arg; + int rv; + + NNI_ARG_UNUSED(unused); + + NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); + nni_mtx_init(&sock->lk); + sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; + sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; + nni_pollable_init(&sock->readable); + + if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { + sub0_sock_fini(sock); + return (rv); + } + + return (0); +} + +static void +sub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +sub0_sock_close(void *arg) +{ + sub0_sock *sock = arg; + sub0_ctx_close(&sock->master); +} + +static void +sub0_pipe_stop(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_stop(&p->aio_recv); +} + +static void +sub0_pipe_fini(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_fini(&p->aio_recv); +} + +static int +sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + sub0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, sub0_recv_cb, p); + + p->pipe = pipe; + p->sub = s; + return (0); +} + +static int +sub0_pipe_start(void *arg) +{ + sub0_pipe *p = arg; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + + nni_pipe_recv(p->pipe, &p->aio_recv); + return (0); +} + +static void +sub0_pipe_close(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_close(&p->aio_recv); +} + +static bool +sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) +{ + sub0_topic *topic; + + // This is a naive and trivial matcher. Replace with a real + // patricia trie later. + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (len < topic->len) { + continue; + } + if ((topic->len == 0) || + (memcmp(topic->buf, body, topic->len) == 0)) { + return (true); + } + } + return (false); +} + +static void +sub0_recv_cb(void *arg) +{ + sub0_pipe *p = arg; + sub0_sock *sock = p->sub; + sub0_ctx * ctx; + nni_msg * msg; + size_t len; + uint8_t * body; + nni_list finish; + nng_aio * aio; + nni_msg * dup_msg; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + nni_aio_list_init(&finish); + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + body = nni_msg_body(msg); + len = nni_msg_len(msg); + dup_msg = NULL; + + nni_mtx_lock(&sock->lk); + // Go through all contexts. We will try to send up. + NNI_LIST_FOREACH (&sock->contexts, ctx) { + bool queued = false; + + if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { + // Cannot deliver here, as receive buffer is full. + continue; + } + + if (!sub0_matches(ctx, body, len)) { + continue; + } + + // This is a performance optimization, that ensures we + // do not duplicate a message in the common case, where there + // is only a single context. + if (sock->num_contexts > 1) { + if (nni_msg_dup(&dup_msg, msg) != 0) { + // if we cannot dup it, continue on + continue; + } + } else { + // We only have one context, so it's the only + // possible message. + dup_msg = msg; + } + + if (!nni_list_empty(&ctx->recv_queue)) { + aio = nni_list_first(&ctx->recv_queue); + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_set_msg(aio, dup_msg); + + // Save for synchronous completion + nni_list_append(&finish, aio); + } else if (nni_lmq_full(&ctx->lmq)) { + // Make space for the new message. + nni_msg *old; + (void) nni_lmq_getq(&ctx->lmq, &old); + nni_msg_free(old); + + (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + + } else { + (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } + if (queued && ctx == &sock->master) { + nni_pollable_raise(&sock->readable); + } + } + nni_mtx_unlock(&sock->lk); + + // NB: This is slightly less efficient in that we may have + // created an extra copy in the face of e.g. two subscriptions, + // but optimizing this further would require checking the subscription + // list twice, adding complexity. If this turns out to be a problem + // we could probably add some other sophistication with a counter + // and flags on the contexts themselves. + if (msg != dup_msg) { + // If we didn't just use the message, then free our copy. + nni_msg_free(msg); + } + + while ((aio = nni_list_first(&finish)) != NULL) { + nni_list_remove(&finish, aio); + nni_aio_finish_sync(aio, 0, len); + } + + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static int +sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + int val; + nni_mtx_lock(&sock->lk); + val = (int) nni_lmq_cap(&ctx->lmq); + nni_mtx_unlock(&sock->lk); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&sock->lk); + if ((rv = nni_lmq_resize(&ctx->lmq, (size_t) val)) != 0) { + nni_mtx_unlock(&sock->lk); + return (rv); + } + + // If we change the socket, then this will change the queue for + // any new contexts. (Previously constructed contexts are unaffected.) + if (&sock->master == ctx) { + sock->recv_buf_len = (size_t) val; + } + nni_mtx_unlock(&sock->lk); + return (0); +} + +// For now we maintain subscriptions on a sorted linked list. As we do not +// expect to have huge numbers of subscriptions, and as the operation is +// really O(n), we think this is acceptable. In the future we might decide +// to replace this with a patricia trie, like old nanomsg had. + +static int +sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + sub0_topic *new_topic; + NNI_ARG_UNUSED(t); + + nni_mtx_lock(&sock->lk); + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (topic->len != sz) { + continue; + } + if (memcmp(topic->buf, buf, sz) == 0) { + // Already have it. + nni_mtx_unlock(&sock->lk); + return (0); + } + } + if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { + nni_mtx_unlock(&sock->lk); + return (NNG_ENOMEM); + } + if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { + nni_mtx_unlock(&sock->lk); + NNI_FREE_STRUCT(new_topic); + return (NNG_ENOMEM); + } + memcpy(new_topic->buf, buf, sz); + new_topic->len = sz; + nni_list_append(&ctx->topics, new_topic); + nni_mtx_unlock(&sock->lk); + return (0); +} + +static int +sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + size_t len; + NNI_ARG_UNUSED(t); + + nni_mtx_lock(&sock->lk); + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (topic->len != sz) { + continue; + } + if (memcmp(topic->buf, buf, sz) == 0) { + // Matched! + break; + } + } + if (topic == NULL) { + nni_mtx_unlock(&sock->lk); + return (NNG_ENOENT); + } + nni_list_remove(&ctx->topics, topic); + + // Now we need to make sure that any messages that are waiting still + // match the subscription. We basically just run through the queue + // and requeue those messages we need. + len = nni_lmq_len(&ctx->lmq); + for (size_t i = 0; i < len; i++) { + nni_msg *msg; + + (void) nni_lmq_getq(&ctx->lmq, &msg); + if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { + (void) nni_lmq_putq(&ctx->lmq, msg); + } else { + nni_msg_free(msg); + } + } + nni_mtx_unlock(&sock->lk); + + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + return (0); +} + +static int +sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; + + nni_mtx_lock(&sock->lk); + val = ctx->prefer_new; + nni_mtx_unlock(&sock->lk); + + return (nni_copyout_bool(val, buf, szp, t)); +} + +static int +sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; + int rv; + + if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&sock->lk); + ctx->prefer_new = val; + if (&sock->master == ctx) { + sock->prefer_new = val; + } + nni_mtx_unlock(&sock->lk); + + return (0); +} + +static nni_option sub0_ctx_options[] = { + { + .o_name = NNG_OPT_RECVBUF, + .o_get = sub0_ctx_get_recv_buf_len, + .o_set = sub0_ctx_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SUB_SUBSCRIBE, + .o_set = sub0_ctx_subscribe, + }, + { + .o_name = NNG_OPT_SUB_UNSUBSCRIBE, + .o_set = sub0_ctx_unsubscribe, + }, + { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_ctx_get_prefer_new, + .o_set = sub0_ctx_set_prefer_new, + }, + { + .o_name = NULL, + }, +}; + +static void +sub0_sock_send(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_ENOTSUP); + } +} + +static void +sub0_sock_recv(void *arg, nni_aio *aio) +{ + sub0_sock *sock = arg; + + sub0_ctx_recv(&sock->master, aio); +} + +static int +sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + sub0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); +} + +static int +sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); +} + +static int +sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_size = sizeof(sub0_pipe), + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_close = sub0_pipe_close, + .pipe_stop = sub0_pipe_stop, +}; + +static nni_proto_ctx_ops sub0_ctx_ops = { + .ctx_size = sizeof(sub0_ctx), + .ctx_init = sub0_ctx_init, + .ctx_fini = sub0_ctx_fini, + .ctx_send = sub0_ctx_send, + .ctx_recv = sub0_ctx_recv, + .ctx_options = sub0_ctx_options, +}; + +static nni_option sub0_sock_options[] = { + { + .o_name = NNG_OPT_SUB_SUBSCRIBE, + .o_set = sub0_sock_subscribe, + }, + { + .o_name = NNG_OPT_SUB_UNSUBSCRIBE, + .o_set = sub0_sock_unsubscribe, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = sub0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = sub0_sock_get_recv_buf_len, + .o_set = sub0_sock_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_sock_get_prefer_new, + .o_set = sub0_sock_set_prefer_new, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops sub0_sock_ops = { + .sock_size = sizeof(sub0_sock), + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_options = sub0_sock_options, +}; + +static nni_proto sub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, + .proto_ctx_ops = &sub0_ctx_ops, +}; + +int +nng_sub0_open(nng_socket *sock) +{ + return (nni_proto_open(sock, &sub0_proto)); +} diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c new file mode 100644 index 00000000..b830ae80 --- /dev/null +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -0,0 +1,624 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_sub_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_sub0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 1u)); // 33 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 0u)); // 32 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, "sub"); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, "pub"); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_sub_cannot_send(void) +{ + nng_socket sub; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_FAIL(nng_send(sub, "", 0, 0), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_sub_context_cannot_send(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_msg * m; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_set_msg(aio, m); + nng_aio_set_timeout(aio, 1000); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(sub); + nng_aio_free(aio); + nng_msg_free(m); +} + +static void +test_sub_not_writeable(void) +{ + int fd; + nng_socket sub; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_sub_poll_readable(void) +{ + int fd; + nng_socket pub; + nng_socket sub; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(pub, sub); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // If we send a message we didn't subscribe to, that doesn't matter. + NUTS_SEND(pub, "def"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(pub, "abc"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(sub, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +static void +test_sub_recv_late(void) +{ + int fd; + nng_socket pub; + nng_socket sub; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(pub, sub); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + nng_recv_aio(sub, aio); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(pub, "abc"); + NUTS_SLEEP(200); + + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + msg = nng_aio_get_msg(aio); + nng_aio_set_msg(aio, NULL); + NUTS_TRUE(nng_msg_len(msg) == 4); + NUTS_MATCH(nng_msg_body(msg), "abc"); + + nng_msg_free(msg); + nng_aio_free(aio); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +void +test_sub_context_no_poll(void) +{ + int fd; + nng_socket sub; + nng_ctx ctx; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(sub); +} + +void +test_sub_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_sub0_open(&s1)); + NUTS_PASS(nng_sub0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_sub_recv_ctx_closed(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_ctx_close(ctx); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); + NUTS_CLOSE(sub); +} + +static void +test_sub_ctx_recv_aio_stopped(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_close_context_recv(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_set_timeout(aio, 1000); + nng_ctx_recv(ctx, aio); + NUTS_PASS(nng_ctx_close(ctx)); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_ctx_recv_nonblock(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_ctx_recv_cancel(void) +{ + nng_socket sub; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_ctx_open(&ctx, sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 1000); + nng_ctx_recv(ctx, aio); + nng_aio_abort(aio, NNG_ECANCELED); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_sub_recv_buf_option(void) +{ + nng_socket sub; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_RECVBUF; + + NUTS_PASS(nng_sub0_open(&sub)); + + NUTS_PASS(nng_socket_set_int(sub, opt, 1)); + NUTS_FAIL(nng_socket_set_int(sub, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(sub, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(sub, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(sub, opt, 3)); + NUTS_PASS(nng_socket_get_int(sub, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(sub, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(sub, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(sub, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(sub, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(sub); +} + +static void +test_sub_subscribe_option(void) +{ + nng_socket sub; + size_t sz; + int v; + const char *opt = NNG_OPT_SUB_SUBSCRIBE; + + NUTS_PASS(nng_sub0_open(&sub)); + + NUTS_PASS(nng_socket_set(sub, opt, "abc", 3)); + NUTS_PASS(nng_socket_set(sub, opt, "abc", 3)); // duplicate + NUTS_PASS(nng_socket_set_bool(sub, opt, false)); + NUTS_PASS(nng_socket_set_int(sub, opt, 32)); + sz = sizeof(v); + NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EWRITEONLY); + + NUTS_CLOSE(sub); +} + +static void +test_sub_unsubscribe_option(void) +{ + nng_socket sub; + size_t sz; + int v; + const char *opt1 = NNG_OPT_SUB_SUBSCRIBE; + const char *opt2 = NNG_OPT_SUB_UNSUBSCRIBE; + + NUTS_PASS(nng_sub0_open(&sub)); + + NUTS_PASS(nng_socket_set(sub, opt1, "abc", 3)); + NUTS_FAIL(nng_socket_set(sub, opt2, "abc123", 6), NNG_ENOENT); + NUTS_PASS(nng_socket_set(sub, opt2, "abc", 3)); + NUTS_FAIL(nng_socket_set(sub, opt2, "abc", 3), NNG_ENOENT); + NUTS_PASS(nng_socket_set_int(sub, opt1, 32)); + NUTS_FAIL(nng_socket_set_int(sub, opt2, 23), NNG_ENOENT); + NUTS_PASS(nng_socket_set_int(sub, opt2, 32)); + sz = sizeof(v); + NUTS_FAIL(nng_socket_get(sub, opt2, &v, &sz), NNG_EWRITEONLY); + + NUTS_CLOSE(sub); +} + +static void +test_sub_prefer_new_option(void) +{ + nng_socket sub; + bool b; + size_t sz; + const char *opt = NNG_OPT_SUB_PREFNEW; + + NUTS_PASS(nng_sub0_open(&sub)); + + NUTS_PASS(nng_socket_set_bool(sub, opt, true)); + NUTS_PASS(nng_socket_set_bool(sub, opt, false)); + NUTS_PASS(nng_socket_get_bool(sub, opt, &b)); + NUTS_TRUE(b == false); + sz = sizeof(b); + b = true; + NUTS_PASS(nng_socket_get(sub, opt, &b, &sz)); + NUTS_TRUE(b == false); + NUTS_TRUE(sz == sizeof(bool)); + + NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(sub, opt, 1), NNG_EBADTYPE); + + NUTS_CLOSE(sub); +} + +void +test_sub_drop_new(void) +{ + nng_socket sub; + nng_socket pub; + nng_msg * msg; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2)); + NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, false)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_MARRY(pub, sub); + NUTS_SEND(pub, "one"); + NUTS_SEND(pub, "two"); + NUTS_SEND(pub, "three"); + NUTS_SLEEP(100); + NUTS_RECV(sub, "one"); + NUTS_RECV(sub, "two"); + NUTS_FAIL(nng_recvmsg(sub, &msg, 0), NNG_ETIMEDOUT); + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +void +test_sub_drop_old(void) +{ + nng_socket sub; + nng_socket pub; + nng_msg * msg; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 2)); + NUTS_PASS(nng_socket_set_bool(sub, NNG_OPT_SUB_PREFNEW, true)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, NULL, 0)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_MARRY(pub, sub); + NUTS_SEND(pub, "one"); + NUTS_SEND(pub, "two"); + NUTS_SEND(pub, "three"); + NUTS_SLEEP(100); + NUTS_RECV(sub, "two"); + NUTS_RECV(sub, "three"); + NUTS_FAIL(nng_recvmsg(sub, &msg, 0), NNG_ETIMEDOUT); + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +static void +test_sub_filter(void) +{ + nng_socket sub; + nng_socket pub; + char buf[32]; + size_t sz; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 10)); + + // Set up some default filters + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "abc", 3)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "def", 3)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "ghi", 3)); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "jkl", 3)); + + NUTS_MARRY(pub, sub); + + NUTS_PASS(nng_send(pub, "def", 3, 0)); + NUTS_PASS(nng_send(pub, "de", 2, 0)); // will not go through + NUTS_PASS(nng_send(pub, "abc123", 6, 0)); + NUTS_PASS(nng_send(pub, "xzy", 3, 0)); // does not match + NUTS_PASS(nng_send(pub, "ghi-drop", 7, 0)); // dropped by unsub + NUTS_PASS(nng_send(pub, "jkl-mno", 6, 0)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_UNSUBSCRIBE, "ghi", 3)); + sz = sizeof(buf); + NUTS_PASS(nng_recv(sub, buf, &sz, 0)); + NUTS_TRUE(sz == 3); + NUTS_TRUE(memcmp(buf, "def", 3) == 0); + + sz = sizeof(buf); + NUTS_PASS(nng_recv(sub, buf, &sz, 0)); + NUTS_TRUE(sz == 6); + NUTS_TRUE(memcmp(buf, "abc123", 6) == 0); + + sz = sizeof(buf); + NUTS_PASS(nng_recv(sub, buf, &sz, 0)); + NUTS_TRUE(sz == 6); + NUTS_TRUE(memcmp(buf, "jkl-mno", 6) == 0); + + NUTS_CLOSE(sub); + NUTS_CLOSE(pub); +} + +static void +test_sub_multi_context(void) +{ + nng_socket sub; + nng_socket pub; + nng_ctx c1; + nng_ctx c2; + nng_aio * aio1; + nng_aio * aio2; + nng_msg * m; + + NUTS_PASS(nng_sub0_open(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&c1, sub)); + NUTS_PASS(nng_ctx_open(&c2, sub)); + + NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "one", 3)); + NUTS_PASS(nng_ctx_set(c1, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + + NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "two", 3)); + NUTS_PASS(nng_ctx_set(c2, NNG_OPT_SUB_SUBSCRIBE, "all", 3)); + + nng_aio_set_timeout(aio1, 100); + nng_aio_set_timeout(aio2, 100); + + NUTS_MARRY(pub, sub); + + NUTS_SEND(pub, "one for the money"); + NUTS_SEND(pub, "all dogs go to heaven"); + NUTS_SEND(pub, "nobody likes a snitch"); + NUTS_SEND(pub, "two for the show"); + + nng_ctx_recv(c1, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + m = nng_aio_get_msg(aio1); + NUTS_MATCH(nng_msg_body(m), "one for the money"); + nng_msg_free(m); + + nng_ctx_recv(c1, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + m = nng_aio_get_msg(aio1); + NUTS_MATCH(nng_msg_body(m), "all dogs go to heaven"); + nng_msg_free(m); + + nng_ctx_recv(c2, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + m = nng_aio_get_msg(aio1); + NUTS_MATCH(nng_msg_body(m), "all dogs go to heaven"); + nng_msg_free(m); + + nng_ctx_recv(c2, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + m = nng_aio_get_msg(aio1); + NUTS_MATCH(nng_msg_body(m), "two for the show"); + nng_msg_free(m); + + nng_ctx_recv(c1, aio1); + nng_ctx_recv(c2, aio2); + + nng_aio_wait(aio1); + nng_aio_wait(aio2); + NUTS_FAIL(nng_aio_result(aio1), NNG_ETIMEDOUT); + NUTS_FAIL(nng_aio_result(aio2), NNG_ETIMEDOUT); + NUTS_CLOSE(sub); + NUTS_CLOSE(pub); + nng_aio_free(aio1); + nng_aio_free(aio2); +} + +static void +test_sub_cooked(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_sub0_open(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(!b); + NUTS_CLOSE(s); +} + +TEST_LIST = { + { "sub identity", test_sub_identity }, + { "sub cannot send", test_sub_cannot_send }, + { "sub context cannot send", test_sub_context_cannot_send }, + { "sub not writeable", test_sub_not_writeable }, + { "sub poll readable", test_sub_poll_readable }, + { "sub context does not poll", test_sub_context_no_poll }, + { "sub validate peer", test_sub_validate_peer }, + { "sub recv late", test_sub_recv_late }, + { "sub recv ctx closed", test_sub_recv_ctx_closed }, + { "sub recv aio ctx stopped", test_sub_ctx_recv_aio_stopped }, + { "sub close context recv", test_sub_close_context_recv }, + { "sub context recv nonblock", test_sub_ctx_recv_nonblock }, + { "sub context recv cancel", test_sub_ctx_recv_cancel }, + { "sub recv buf option", test_sub_recv_buf_option }, + { "sub subscribe option", test_sub_subscribe_option }, + { "sub unsubscribe option", test_sub_unsubscribe_option }, + { "sub prefer new option", test_sub_prefer_new_option }, + { "sub drop new", test_sub_drop_new }, + { "sub drop old", test_sub_drop_old }, + { "sub filter", test_sub_filter }, + { "sub multi context", test_sub_multi_context }, + { "sub cooked", test_sub_cooked }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/pubsub0/xsub.c b/src/sp/protocol/pubsub0/xsub.c new file mode 100644 index 00000000..0013b8b3 --- /dev/null +++ b/src/sp/protocol/pubsub0/xsub.c @@ -0,0 +1,211 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/pubsub0/sub.h" + +// Subscriber protocol. The SUB protocol receives messages sent to +// it from publishers, and filters out those it is not interested in, +// only passing up ones that match known subscriptions. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct xsub0_pipe xsub0_pipe; +typedef struct xsub0_sock xsub0_sock; + +static void xsub0_recv_cb(void *); +static void xsub0_pipe_fini(void *); + +// xsub0_sock is our per-socket protocol private structure. +struct xsub0_sock { + nni_msgq *urq; + nni_mtx lk; +}; + +// sub0_pipe is our per-pipe protocol private structure. +struct xsub0_pipe { + nni_pipe * pipe; + xsub0_sock *sub; + nni_aio aio_recv; +}; + +static int +xsub0_sock_init(void *arg, nni_sock *sock) +{ + xsub0_sock *s = arg; + + s->urq = nni_sock_recvq(sock); + return (0); +} + +static void +xsub0_sock_fini(void *arg) +{ + xsub0_sock *s = arg; + nni_mtx_fini(&s->lk); +} + +static void +xsub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +xsub0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +xsub0_pipe_stop(void *arg) +{ + xsub0_pipe *p = arg; + + nni_aio_stop(&p->aio_recv); +} + +static void +xsub0_pipe_fini(void *arg) +{ + xsub0_pipe *p = arg; + + nni_aio_fini(&p->aio_recv); +} + +static int +xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + xsub0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, xsub0_recv_cb, p); + + p->pipe = pipe; + p->sub = s; + return (0); +} + +static int +xsub0_pipe_start(void *arg) +{ + xsub0_pipe *p = arg; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + + nni_pipe_recv(p->pipe, &p->aio_recv); + return (0); +} + +static void +xsub0_pipe_close(void *arg) +{ + xsub0_pipe *p = arg; + + nni_aio_close(&p->aio_recv); +} + +static void +xsub0_recv_cb(void *arg) +{ + xsub0_pipe *p = arg; + xsub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + if (nni_msgq_tryput(urq, msg) != 0) { + // This only happens for two reasons. For flow control, + // in which case we just want to discard the message and + // carry on, and for a close of the socket (which is very + // hard to achieve, since we close the pipes.) In either + // case the easiest thing to do is just free the message + // and try again. + nni_msg_free(msg); + } + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static void +xsub0_sock_send(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +xsub0_sock_recv(void *arg, nni_aio *aio) +{ + xsub0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops xsub0_pipe_ops = { + .pipe_size = sizeof(xsub0_pipe), + .pipe_init = xsub0_pipe_init, + .pipe_fini = xsub0_pipe_fini, + .pipe_start = xsub0_pipe_start, + .pipe_close = xsub0_pipe_close, + .pipe_stop = xsub0_pipe_stop, +}; + +static nni_option xsub0_sock_options[] = { + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops xsub0_sock_ops = { + .sock_size = sizeof(xsub0_sock), + .sock_init = xsub0_sock_init, + .sock_fini = xsub0_sock_fini, + .sock_open = xsub0_sock_open, + .sock_close = xsub0_sock_close, + .sock_send = xsub0_sock_send, + .sock_recv = xsub0_sock_recv, + .sock_options = xsub0_sock_options, +}; + +static nni_proto xsub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xsub0_sock_ops, + .proto_pipe_ops = &xsub0_pipe_ops, +}; + +int +nng_sub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xsub0_proto)); +} diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c new file mode 100644 index 00000000..19815661 --- /dev/null +++ b/src/sp/protocol/pubsub0/xsub_test.c @@ -0,0 +1,376 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_xsub_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_sub0_open_raw(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 1u)); // 33 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NUTS_PROTO(2u, 0u)); // 32 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, "sub"); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, "pub"); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_xsub_cannot_send(void) +{ + nng_socket sub; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_FAIL(nng_send(sub, "", 0, 0), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_xsub_not_writeable(void) +{ + int fd; + nng_socket sub; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_xsub_poll_readable(void) +{ + int fd; + nng_socket pub; + nng_socket sub; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(pub, sub); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(pub, "abc"); + NUTS_SLEEP(200); + + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(sub, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +static void +test_xsub_recv_late(void) +{ + int fd; + nng_socket pub; + nng_socket sub; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(pub, sub); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + nng_recv_aio(sub, aio); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(pub, "abc"); + NUTS_SLEEP(200); + + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + msg = nng_aio_get_msg(aio); + nng_aio_set_msg(aio, NULL); + NUTS_TRUE(nng_msg_len(msg) == 4); + NUTS_TRUE(strcmp(nng_msg_body(msg), "abc") == 0); + + nng_msg_free(msg); + nng_aio_free(aio); + + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +void +test_xsub_no_context(void) +{ + nng_socket sub; + nng_ctx ctx; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_FAIL(nng_ctx_open(&ctx, sub), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +void +test_xsub_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_sub0_open_raw(&s1)); + NUTS_PASS(nng_sub0_open_raw(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_xsub_recv_closed(void) +{ + nng_socket sub; + nng_aio * aio; + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_CLOSE(sub); + nng_recv_aio(sub, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); +} + +static void +test_xsub_close_recv(void) +{ + nng_socket sub; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_set_timeout(aio, 1000); + nng_recv_aio(sub, aio); + NUTS_CLOSE(sub); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + + nng_aio_free(aio); +} + +static void +test_xsub_recv_nonblock(void) +{ + nng_socket sub; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_recv_aio(sub, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +static void +test_xsub_recv_buf_option(void) +{ + nng_socket sub; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_RECVBUF; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + + NUTS_PASS(nng_socket_set_int(sub, opt, 1)); + NUTS_FAIL(nng_socket_set_int(sub, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(sub, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(sub, opt, 3)); + NUTS_PASS(nng_socket_get_int(sub, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(sub, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(sub, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(sub, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(sub, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(sub, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(sub); +} + +static void +test_xsub_subscribe_option(void) +{ + nng_socket sub; + const char *opt = NNG_OPT_SUB_SUBSCRIBE; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_xsub_unsubscribe_option(void) +{ + nng_socket sub; + const char *opt = NNG_OPT_SUB_UNSUBSCRIBE; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_FAIL(nng_socket_set(sub, opt, "abc", 3), NNG_ENOTSUP); + NUTS_CLOSE(sub); +} + +static void +test_xsub_raw(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_sub0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_xsub_close_during_recv(void) +{ + nng_socket sub; + nng_socket pub; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY(pub, sub); + + for (unsigned i = 0; i < 100; i++) { + NUTS_PASS(nng_send(pub, "abc", 3, 0)); + } + NUTS_CLOSE(pub); + NUTS_CLOSE(sub); +} + +static void +test_xsub_close_during_pipe_recv(void) +{ + nng_socket sub; + nng_socket pub; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_pub0_open(&pub)); + NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(sub, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(pub, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY(pub, sub); + + for (unsigned i = 0; i < 100; i++) { + int rv; + rv = nng_send(pub, "abc", 3, 0); + if (rv == NNG_ETIMEDOUT) { + break; + } + NUTS_SLEEP(1); + } + NUTS_CLOSE(sub); +} + +static void +test_xsub_recv_aio_stopped(void) +{ + nng_socket sub; + nng_aio * aio; + + NUTS_PASS(nng_sub0_open_raw(&sub)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_stop(aio); + nng_recv_aio(sub, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(sub); + nng_aio_free(aio); +} + +TEST_LIST = { + { "xsub identity", test_xsub_identity }, + { "xsub cannot send", test_xsub_cannot_send }, + { "xsub not writeable", test_xsub_not_writeable }, + { "xsub poll readable", test_xsub_poll_readable }, + { "xsub validate peer", test_xsub_validate_peer }, + { "xsub recv late", test_xsub_recv_late }, + { "xsub recv closed", test_xsub_recv_closed }, + { "xsub close recv", test_xsub_close_recv }, + { "xsub recv nonblock", test_xsub_recv_nonblock }, + { "xsub recv buf option", test_xsub_recv_buf_option }, + { "xsub subscribe option", test_xsub_subscribe_option }, + { "xsub unsubscribe option", test_xsub_unsubscribe_option }, + { "xsub no context", test_xsub_no_context }, + { "xsub raw", test_xsub_raw }, + { "xsub recv aio stopped", test_xsub_recv_aio_stopped }, + { "xsub close during recv ", test_xsub_close_during_recv }, + { "xsub close during pipe recv", test_xsub_close_during_pipe_recv }, + { NULL, NULL }, +}; |
