diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/sp/protocol/pubsub0/sub.c | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/protocol/pubsub0/sub.c')
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 755 |
1 files changed, 755 insertions, 0 deletions
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c new file mode 100644 index 00000000..9f3f2283 --- /dev/null +++ b/src/sp/protocol/pubsub0/sub.c @@ -0,0 +1,755 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Nathan Kent <nate@nkent.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "nng/protocol/pubsub0/sub.h" + +// Subscriber protocol. The SUB protocol receives messages sent to +// it from publishers, and filters out those it is not interested in, +// only passing up ones that match known subscriptions. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +// By default we accept 128 messages. +#define SUB0_DEFAULT_RECV_BUF_LEN 128 + +// By default, prefer new messages when the queue is full. +#define SUB0_DEFAULT_PREFER_NEW true + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_ctx sub0_ctx; +typedef struct sub0_topic sub0_topic; + +static void sub0_recv_cb(void *); +static void sub0_pipe_fini(void *); + +struct sub0_topic { + nni_list_node node; + size_t len; + void * buf; +}; + +// sub0_ctx is a context for a SUB socket. The advantage of contexts is +// that different contexts can maintain different subscriptions. +struct sub0_ctx { + nni_list_node node; + sub0_sock * sock; + nni_list topics; // TODO: Consider patricia trie + nni_list recv_queue; // can have multiple pending receives + nni_lmq lmq; + bool prefer_new; +}; + +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { + nni_pollable readable; + sub0_ctx master; // default context + nni_list contexts; // all contexts + int num_contexts; + size_t recv_buf_len; + bool prefer_new; + nni_mtx lk; +}; + +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio aio_recv; +}; + +static void +sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_mtx_lock(&sock->lk); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&sock->lk); +} + +static void +sub0_ctx_recv(void *arg, nni_aio *aio) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->lk); + +again: + if (nni_lmq_empty(&ctx->lmq)) { + int rv; + if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { + nni_mtx_unlock(&sock->lk); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ctx->recv_queue, aio); + nni_mtx_unlock(&sock->lk); + return; + } + + (void) nni_lmq_getq(&ctx->lmq, &msg); + + if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { + nni_pollable_clear(&sock->readable); + } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + nni_aio_set_msg(aio, msg); + nni_mtx_unlock(&sock->lk); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +sub0_ctx_send(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_ENOTSUP); + } +} + +static void +sub0_ctx_close(void *arg) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&sock->lk); + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&sock->lk); +} + +static void +sub0_ctx_fini(void *arg) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + + sub0_ctx_close(ctx); + + nni_mtx_lock(&sock->lk); + nni_list_remove(&sock->contexts, ctx); + sock->num_contexts--; + nni_mtx_unlock(&sock->lk); + + while ((topic = nni_list_first(&ctx->topics)) != 0) { + nni_list_remove(&ctx->topics, topic); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + } + + nni_lmq_fini(&ctx->lmq); +} + +static int +sub0_ctx_init(void *ctx_arg, void *sock_arg) +{ + sub0_sock *sock = sock_arg; + sub0_ctx * ctx = ctx_arg; + size_t len; + bool prefer_new; + int rv; + + nni_mtx_lock(&sock->lk); + len = sock->recv_buf_len; + prefer_new = sock->prefer_new; + + if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { + return (rv); + } + ctx->prefer_new = prefer_new; + + nni_aio_list_init(&ctx->recv_queue); + NNI_LIST_INIT(&ctx->topics, sub0_topic, node); + + ctx->sock = sock; + + nni_list_append(&sock->contexts, ctx); + sock->num_contexts++; + nni_mtx_unlock(&sock->lk); + + return (0); +} + +static void +sub0_sock_fini(void *arg) +{ + sub0_sock *sock = arg; + + sub0_ctx_fini(&sock->master); + nni_pollable_fini(&sock->readable); + nni_mtx_fini(&sock->lk); +} + +static int +sub0_sock_init(void *arg, nni_sock *unused) +{ + sub0_sock *sock = arg; + int rv; + + NNI_ARG_UNUSED(unused); + + NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); + nni_mtx_init(&sock->lk); + sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; + sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; + nni_pollable_init(&sock->readable); + + if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { + sub0_sock_fini(sock); + return (rv); + } + + return (0); +} + +static void +sub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +sub0_sock_close(void *arg) +{ + sub0_sock *sock = arg; + sub0_ctx_close(&sock->master); +} + +static void +sub0_pipe_stop(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_stop(&p->aio_recv); +} + +static void +sub0_pipe_fini(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_fini(&p->aio_recv); +} + +static int +sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + sub0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, sub0_recv_cb, p); + + p->pipe = pipe; + p->sub = s; + return (0); +} + +static int +sub0_pipe_start(void *arg) +{ + sub0_pipe *p = arg; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + + nni_pipe_recv(p->pipe, &p->aio_recv); + return (0); +} + +static void +sub0_pipe_close(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_close(&p->aio_recv); +} + +static bool +sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) +{ + sub0_topic *topic; + + // This is a naive and trivial matcher. Replace with a real + // patricia trie later. + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (len < topic->len) { + continue; + } + if ((topic->len == 0) || + (memcmp(topic->buf, body, topic->len) == 0)) { + return (true); + } + } + return (false); +} + +static void +sub0_recv_cb(void *arg) +{ + sub0_pipe *p = arg; + sub0_sock *sock = p->sub; + sub0_ctx * ctx; + nni_msg * msg; + size_t len; + uint8_t * body; + nni_list finish; + nng_aio * aio; + nni_msg * dup_msg; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + nni_aio_list_init(&finish); + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + body = nni_msg_body(msg); + len = nni_msg_len(msg); + dup_msg = NULL; + + nni_mtx_lock(&sock->lk); + // Go through all contexts. We will try to send up. + NNI_LIST_FOREACH (&sock->contexts, ctx) { + bool queued = false; + + if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { + // Cannot deliver here, as receive buffer is full. + continue; + } + + if (!sub0_matches(ctx, body, len)) { + continue; + } + + // This is a performance optimization, that ensures we + // do not duplicate a message in the common case, where there + // is only a single context. + if (sock->num_contexts > 1) { + if (nni_msg_dup(&dup_msg, msg) != 0) { + // if we cannot dup it, continue on + continue; + } + } else { + // We only have one context, so it's the only + // possible message. + dup_msg = msg; + } + + if (!nni_list_empty(&ctx->recv_queue)) { + aio = nni_list_first(&ctx->recv_queue); + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_set_msg(aio, dup_msg); + + // Save for synchronous completion + nni_list_append(&finish, aio); + } else if (nni_lmq_full(&ctx->lmq)) { + // Make space for the new message. + nni_msg *old; + (void) nni_lmq_getq(&ctx->lmq, &old); + nni_msg_free(old); + + (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + + } else { + (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } + if (queued && ctx == &sock->master) { + nni_pollable_raise(&sock->readable); + } + } + nni_mtx_unlock(&sock->lk); + + // NB: This is slightly less efficient in that we may have + // created an extra copy in the face of e.g. two subscriptions, + // but optimizing this further would require checking the subscription + // list twice, adding complexity. If this turns out to be a problem + // we could probably add some other sophistication with a counter + // and flags on the contexts themselves. + if (msg != dup_msg) { + // If we didn't just use the message, then free our copy. + nni_msg_free(msg); + } + + while ((aio = nni_list_first(&finish)) != NULL) { + nni_list_remove(&finish, aio); + nni_aio_finish_sync(aio, 0, len); + } + + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static int +sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + int val; + nni_mtx_lock(&sock->lk); + val = (int) nni_lmq_cap(&ctx->lmq); + nni_mtx_unlock(&sock->lk); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&sock->lk); + if ((rv = nni_lmq_resize(&ctx->lmq, (size_t) val)) != 0) { + nni_mtx_unlock(&sock->lk); + return (rv); + } + + // If we change the socket, then this will change the queue for + // any new contexts. (Previously constructed contexts are unaffected.) + if (&sock->master == ctx) { + sock->recv_buf_len = (size_t) val; + } + nni_mtx_unlock(&sock->lk); + return (0); +} + +// For now we maintain subscriptions on a sorted linked list. As we do not +// expect to have huge numbers of subscriptions, and as the operation is +// really O(n), we think this is acceptable. In the future we might decide +// to replace this with a patricia trie, like old nanomsg had. + +static int +sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + sub0_topic *new_topic; + NNI_ARG_UNUSED(t); + + nni_mtx_lock(&sock->lk); + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (topic->len != sz) { + continue; + } + if (memcmp(topic->buf, buf, sz) == 0) { + // Already have it. + nni_mtx_unlock(&sock->lk); + return (0); + } + } + if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { + nni_mtx_unlock(&sock->lk); + return (NNG_ENOMEM); + } + if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { + nni_mtx_unlock(&sock->lk); + NNI_FREE_STRUCT(new_topic); + return (NNG_ENOMEM); + } + memcpy(new_topic->buf, buf, sz); + new_topic->len = sz; + nni_list_append(&ctx->topics, new_topic); + nni_mtx_unlock(&sock->lk); + return (0); +} + +static int +sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + sub0_topic *topic; + size_t len; + NNI_ARG_UNUSED(t); + + nni_mtx_lock(&sock->lk); + NNI_LIST_FOREACH (&ctx->topics, topic) { + if (topic->len != sz) { + continue; + } + if (memcmp(topic->buf, buf, sz) == 0) { + // Matched! + break; + } + } + if (topic == NULL) { + nni_mtx_unlock(&sock->lk); + return (NNG_ENOENT); + } + nni_list_remove(&ctx->topics, topic); + + // Now we need to make sure that any messages that are waiting still + // match the subscription. We basically just run through the queue + // and requeue those messages we need. + len = nni_lmq_len(&ctx->lmq); + for (size_t i = 0; i < len; i++) { + nni_msg *msg; + + (void) nni_lmq_getq(&ctx->lmq, &msg); + if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { + (void) nni_lmq_putq(&ctx->lmq, msg); + } else { + nni_msg_free(msg); + } + } + nni_mtx_unlock(&sock->lk); + + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + return (0); +} + +static int +sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; + + nni_mtx_lock(&sock->lk); + val = ctx->prefer_new; + nni_mtx_unlock(&sock->lk); + + return (nni_copyout_bool(val, buf, szp, t)); +} + +static int +sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; + int rv; + + if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&sock->lk); + ctx->prefer_new = val; + if (&sock->master == ctx) { + sock->prefer_new = val; + } + nni_mtx_unlock(&sock->lk); + + return (0); +} + +static nni_option sub0_ctx_options[] = { + { + .o_name = NNG_OPT_RECVBUF, + .o_get = sub0_ctx_get_recv_buf_len, + .o_set = sub0_ctx_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SUB_SUBSCRIBE, + .o_set = sub0_ctx_subscribe, + }, + { + .o_name = NNG_OPT_SUB_UNSUBSCRIBE, + .o_set = sub0_ctx_unsubscribe, + }, + { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_ctx_get_prefer_new, + .o_set = sub0_ctx_set_prefer_new, + }, + { + .o_name = NULL, + }, +}; + +static void +sub0_sock_send(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_ENOTSUP); + } +} + +static void +sub0_sock_recv(void *arg, nni_aio *aio) +{ + sub0_sock *sock = arg; + + sub0_ctx_recv(&sock->master, aio); +} + +static int +sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + sub0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); +} + +static int +sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); +} + +static int +sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); +} + +static int +sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_size = sizeof(sub0_pipe), + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_close = sub0_pipe_close, + .pipe_stop = sub0_pipe_stop, +}; + +static nni_proto_ctx_ops sub0_ctx_ops = { + .ctx_size = sizeof(sub0_ctx), + .ctx_init = sub0_ctx_init, + .ctx_fini = sub0_ctx_fini, + .ctx_send = sub0_ctx_send, + .ctx_recv = sub0_ctx_recv, + .ctx_options = sub0_ctx_options, +}; + +static nni_option sub0_sock_options[] = { + { + .o_name = NNG_OPT_SUB_SUBSCRIBE, + .o_set = sub0_sock_subscribe, + }, + { + .o_name = NNG_OPT_SUB_UNSUBSCRIBE, + .o_set = sub0_sock_unsubscribe, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = sub0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = sub0_sock_get_recv_buf_len, + .o_set = sub0_sock_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_sock_get_prefer_new, + .o_set = sub0_sock_set_prefer_new, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops sub0_sock_ops = { + .sock_size = sizeof(sub0_sock), + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_options = sub0_sock_options, +}; + +static nni_proto sub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, + .proto_ctx_ops = &sub0_ctx_ops, +}; + +int +nng_sub0_open(nng_socket *sock) +{ + return (nni_proto_open(sock, &sub0_proto)); +} |
