aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/sub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
-rw-r--r--src/protocol/pubsub0/sub.c755
1 files changed, 0 insertions, 755 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
deleted file mode 100644
index 9f3f2283..00000000
--- a/src/protocol/pubsub0/sub.c
+++ /dev/null
@@ -1,755 +0,0 @@
-//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2019 Nathan Kent <nate@nkent.net>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <stdbool.h>
-#include <string.h>
-
-#include "core/nng_impl.h"
-#include "nng/protocol/pubsub0/sub.h"
-
-// Subscriber protocol. The SUB protocol receives messages sent to
-// it from publishers, and filters out those it is not interested in,
-// only passing up ones that match known subscriptions.
-
-#ifndef NNI_PROTO_SUB_V0
-#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
-#endif
-
-#ifndef NNI_PROTO_PUB_V0
-#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
-#endif
-
-// By default we accept 128 messages.
-#define SUB0_DEFAULT_RECV_BUF_LEN 128
-
-// By default, prefer new messages when the queue is full.
-#define SUB0_DEFAULT_PREFER_NEW true
-
-typedef struct sub0_pipe sub0_pipe;
-typedef struct sub0_sock sub0_sock;
-typedef struct sub0_ctx sub0_ctx;
-typedef struct sub0_topic sub0_topic;
-
-static void sub0_recv_cb(void *);
-static void sub0_pipe_fini(void *);
-
-struct sub0_topic {
- nni_list_node node;
- size_t len;
- void * buf;
-};
-
-// sub0_ctx is a context for a SUB socket. The advantage of contexts is
-// that different contexts can maintain different subscriptions.
-struct sub0_ctx {
- nni_list_node node;
- sub0_sock * sock;
- nni_list topics; // TODO: Consider patricia trie
- nni_list recv_queue; // can have multiple pending receives
- nni_lmq lmq;
- bool prefer_new;
-};
-
-// sub0_sock is our per-socket protocol private structure.
-struct sub0_sock {
- nni_pollable readable;
- sub0_ctx master; // default context
- nni_list contexts; // all contexts
- int num_contexts;
- size_t recv_buf_len;
- bool prefer_new;
- nni_mtx lk;
-};
-
-// sub0_pipe is our per-pipe protocol private structure.
-struct sub0_pipe {
- nni_pipe * pipe;
- sub0_sock *sub;
- nni_aio aio_recv;
-};
-
-static void
-sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- nni_mtx_lock(&sock->lk);
- if (nni_list_active(&ctx->recv_queue, aio)) {
- nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&sock->lk);
-}
-
-static void
-sub0_ctx_recv(void *arg, nni_aio *aio)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- nni_msg * msg;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
-
- nni_mtx_lock(&sock->lk);
-
-again:
- if (nni_lmq_empty(&ctx->lmq)) {
- int rv;
- if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
- nni_mtx_unlock(&sock->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_list_append(&ctx->recv_queue, aio);
- nni_mtx_unlock(&sock->lk);
- return;
- }
-
- (void) nni_lmq_getq(&ctx->lmq, &msg);
-
- if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
- nni_pollable_clear(&sock->readable);
- }
- if ((msg = nni_msg_unique(msg)) == NULL) {
- goto again;
- }
- nni_aio_set_msg(aio, msg);
- nni_mtx_unlock(&sock->lk);
- nni_aio_finish(aio, 0, nni_msg_len(msg));
-}
-
-static void
-sub0_ctx_send(void *arg, nni_aio *aio)
-{
- NNI_ARG_UNUSED(arg);
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, NNG_ENOTSUP);
- }
-}
-
-static void
-sub0_ctx_close(void *arg)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- nni_aio * aio;
-
- nni_mtx_lock(&sock->lk);
- while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
- nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- nni_mtx_unlock(&sock->lk);
-}
-
-static void
-sub0_ctx_fini(void *arg)
-{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- sub0_topic *topic;
-
- sub0_ctx_close(ctx);
-
- nni_mtx_lock(&sock->lk);
- nni_list_remove(&sock->contexts, ctx);
- sock->num_contexts--;
- nni_mtx_unlock(&sock->lk);
-
- while ((topic = nni_list_first(&ctx->topics)) != 0) {
- nni_list_remove(&ctx->topics, topic);
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
- }
-
- nni_lmq_fini(&ctx->lmq);
-}
-
-static int
-sub0_ctx_init(void *ctx_arg, void *sock_arg)
-{
- sub0_sock *sock = sock_arg;
- sub0_ctx * ctx = ctx_arg;
- size_t len;
- bool prefer_new;
- int rv;
-
- nni_mtx_lock(&sock->lk);
- len = sock->recv_buf_len;
- prefer_new = sock->prefer_new;
-
- if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
- return (rv);
- }
- ctx->prefer_new = prefer_new;
-
- nni_aio_list_init(&ctx->recv_queue);
- NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
-
- ctx->sock = sock;
-
- nni_list_append(&sock->contexts, ctx);
- sock->num_contexts++;
- nni_mtx_unlock(&sock->lk);
-
- return (0);
-}
-
-static void
-sub0_sock_fini(void *arg)
-{
- sub0_sock *sock = arg;
-
- sub0_ctx_fini(&sock->master);
- nni_pollable_fini(&sock->readable);
- nni_mtx_fini(&sock->lk);
-}
-
-static int
-sub0_sock_init(void *arg, nni_sock *unused)
-{
- sub0_sock *sock = arg;
- int rv;
-
- NNI_ARG_UNUSED(unused);
-
- NNI_LIST_INIT(&sock->contexts, sub0_ctx, node);
- nni_mtx_init(&sock->lk);
- sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN;
- sock->prefer_new = SUB0_DEFAULT_PREFER_NEW;
- nni_pollable_init(&sock->readable);
-
- if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) {
- sub0_sock_fini(sock);
- return (rv);
- }
-
- return (0);
-}
-
-static void
-sub0_sock_open(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-static void
-sub0_sock_close(void *arg)
-{
- sub0_sock *sock = arg;
- sub0_ctx_close(&sock->master);
-}
-
-static void
-sub0_pipe_stop(void *arg)
-{
- sub0_pipe *p = arg;
-
- nni_aio_stop(&p->aio_recv);
-}
-
-static void
-sub0_pipe_fini(void *arg)
-{
- sub0_pipe *p = arg;
-
- nni_aio_fini(&p->aio_recv);
-}
-
-static int
-sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
-{
- sub0_pipe *p = arg;
-
- nni_aio_init(&p->aio_recv, sub0_recv_cb, p);
-
- p->pipe = pipe;
- p->sub = s;
- return (0);
-}
-
-static int
-sub0_pipe_start(void *arg)
-{
- sub0_pipe *p = arg;
-
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) {
- // Peer protocol mismatch.
- return (NNG_EPROTO);
- }
-
- nni_pipe_recv(p->pipe, &p->aio_recv);
- return (0);
-}
-
-static void
-sub0_pipe_close(void *arg)
-{
- sub0_pipe *p = arg;
-
- nni_aio_close(&p->aio_recv);
-}
-
-static bool
-sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
-{
- sub0_topic *topic;
-
- // This is a naive and trivial matcher. Replace with a real
- // patricia trie later.
- NNI_LIST_FOREACH (&ctx->topics, topic) {
- if (len < topic->len) {
- continue;
- }
- if ((topic->len == 0) ||
- (memcmp(topic->buf, body, topic->len) == 0)) {
- return (true);
- }
- }
- return (false);
-}
-
-static void
-sub0_recv_cb(void *arg)
-{
- sub0_pipe *p = arg;
- sub0_sock *sock = p->sub;
- sub0_ctx * ctx;
- nni_msg * msg;
- size_t len;
- uint8_t * body;
- nni_list finish;
- nng_aio * aio;
- nni_msg * dup_msg;
-
- if (nni_aio_result(&p->aio_recv) != 0) {
- nni_pipe_close(p->pipe);
- return;
- }
-
- nni_aio_list_init(&finish);
-
- msg = nni_aio_get_msg(&p->aio_recv);
- nni_aio_set_msg(&p->aio_recv, NULL);
- nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
-
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
- dup_msg = NULL;
-
- nni_mtx_lock(&sock->lk);
- // Go through all contexts. We will try to send up.
- NNI_LIST_FOREACH (&sock->contexts, ctx) {
- bool queued = false;
-
- if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
- // Cannot deliver here, as receive buffer is full.
- continue;
- }
-
- if (!sub0_matches(ctx, body, len)) {
- continue;
- }
-
- // This is a performance optimization, that ensures we
- // do not duplicate a message in the common case, where there
- // is only a single context.
- if (sock->num_contexts > 1) {
- if (nni_msg_dup(&dup_msg, msg) != 0) {
- // if we cannot dup it, continue on
- continue;
- }
- } else {
- // We only have one context, so it's the only
- // possible message.
- dup_msg = msg;
- }
-
- if (!nni_list_empty(&ctx->recv_queue)) {
- aio = nni_list_first(&ctx->recv_queue);
- nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, dup_msg);
-
- // Save for synchronous completion
- nni_list_append(&finish, aio);
- } else if (nni_lmq_full(&ctx->lmq)) {
- // Make space for the new message.
- nni_msg *old;
- (void) nni_lmq_getq(&ctx->lmq, &old);
- nni_msg_free(old);
-
- (void) nni_lmq_putq(&ctx->lmq, dup_msg);
- queued = true;
-
- } else {
- (void) nni_lmq_putq(&ctx->lmq, dup_msg);
- queued = true;
- }
- if (queued && ctx == &sock->master) {
- nni_pollable_raise(&sock->readable);
- }
- }
- nni_mtx_unlock(&sock->lk);
-
- // NB: This is slightly less efficient in that we may have
- // created an extra copy in the face of e.g. two subscriptions,
- // but optimizing this further would require checking the subscription
- // list twice, adding complexity. If this turns out to be a problem
- // we could probably add some other sophistication with a counter
- // and flags on the contexts themselves.
- if (msg != dup_msg) {
- // If we didn't just use the message, then free our copy.
- nni_msg_free(msg);
- }
-
- while ((aio = nni_list_first(&finish)) != NULL) {
- nni_list_remove(&finish, aio);
- nni_aio_finish_sync(aio, 0, len);
- }
-
- nni_pipe_recv(p->pipe, &p->aio_recv);
-}
-
-static int
-sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- int val;
- nni_mtx_lock(&sock->lk);
- val = (int) nni_lmq_cap(&ctx->lmq);
- nni_mtx_unlock(&sock->lk);
-
- return (nni_copyout_int(val, buf, szp, t));
-}
-
-static int
-sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- int val;
- int rv;
-
- if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) {
- return (rv);
- }
- nni_mtx_lock(&sock->lk);
- if ((rv = nni_lmq_resize(&ctx->lmq, (size_t) val)) != 0) {
- nni_mtx_unlock(&sock->lk);
- return (rv);
- }
-
- // If we change the socket, then this will change the queue for
- // any new contexts. (Previously constructed contexts are unaffected.)
- if (&sock->master == ctx) {
- sock->recv_buf_len = (size_t) val;
- }
- nni_mtx_unlock(&sock->lk);
- return (0);
-}
-
-// For now we maintain subscriptions on a sorted linked list. As we do not
-// expect to have huge numbers of subscriptions, and as the operation is
-// really O(n), we think this is acceptable. In the future we might decide
-// to replace this with a patricia trie, like old nanomsg had.
-
-static int
-sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- sub0_topic *topic;
- sub0_topic *new_topic;
- NNI_ARG_UNUSED(t);
-
- nni_mtx_lock(&sock->lk);
- NNI_LIST_FOREACH (&ctx->topics, topic) {
- if (topic->len != sz) {
- continue;
- }
- if (memcmp(topic->buf, buf, sz) == 0) {
- // Already have it.
- nni_mtx_unlock(&sock->lk);
- return (0);
- }
- }
- if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) {
- nni_mtx_unlock(&sock->lk);
- return (NNG_ENOMEM);
- }
- if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) {
- nni_mtx_unlock(&sock->lk);
- NNI_FREE_STRUCT(new_topic);
- return (NNG_ENOMEM);
- }
- memcpy(new_topic->buf, buf, sz);
- new_topic->len = sz;
- nni_list_append(&ctx->topics, new_topic);
- nni_mtx_unlock(&sock->lk);
- return (0);
-}
-
-static int
-sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- sub0_topic *topic;
- size_t len;
- NNI_ARG_UNUSED(t);
-
- nni_mtx_lock(&sock->lk);
- NNI_LIST_FOREACH (&ctx->topics, topic) {
- if (topic->len != sz) {
- continue;
- }
- if (memcmp(topic->buf, buf, sz) == 0) {
- // Matched!
- break;
- }
- }
- if (topic == NULL) {
- nni_mtx_unlock(&sock->lk);
- return (NNG_ENOENT);
- }
- nni_list_remove(&ctx->topics, topic);
-
- // Now we need to make sure that any messages that are waiting still
- // match the subscription. We basically just run through the queue
- // and requeue those messages we need.
- len = nni_lmq_len(&ctx->lmq);
- for (size_t i = 0; i < len; i++) {
- nni_msg *msg;
-
- (void) nni_lmq_getq(&ctx->lmq, &msg);
- if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) {
- (void) nni_lmq_putq(&ctx->lmq, msg);
- } else {
- nni_msg_free(msg);
- }
- }
- nni_mtx_unlock(&sock->lk);
-
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
- return (0);
-}
-
-static int
-sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- bool val;
-
- nni_mtx_lock(&sock->lk);
- val = ctx->prefer_new;
- nni_mtx_unlock(&sock->lk);
-
- return (nni_copyout_bool(val, buf, szp, t));
-}
-
-static int
-sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_ctx * ctx = arg;
- sub0_sock *sock = ctx->sock;
- bool val;
- int rv;
-
- if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) {
- return (rv);
- }
-
- nni_mtx_lock(&sock->lk);
- ctx->prefer_new = val;
- if (&sock->master == ctx) {
- sock->prefer_new = val;
- }
- nni_mtx_unlock(&sock->lk);
-
- return (0);
-}
-
-static nni_option sub0_ctx_options[] = {
- {
- .o_name = NNG_OPT_RECVBUF,
- .o_get = sub0_ctx_get_recv_buf_len,
- .o_set = sub0_ctx_set_recv_buf_len,
- },
- {
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_ctx_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_ctx_unsubscribe,
- },
- {
- .o_name = NNG_OPT_SUB_PREFNEW,
- .o_get = sub0_ctx_get_prefer_new,
- .o_set = sub0_ctx_set_prefer_new,
- },
- {
- .o_name = NULL,
- },
-};
-
-static void
-sub0_sock_send(void *arg, nni_aio *aio)
-{
- NNI_ARG_UNUSED(arg);
- if (nni_aio_begin(aio) == 0) {
- nni_aio_finish_error(aio, NNG_ENOTSUP);
- }
-}
-
-static void
-sub0_sock_recv(void *arg, nni_aio *aio)
-{
- sub0_sock *sock = arg;
-
- sub0_ctx_recv(&sock->master, aio);
-}
-
-static int
-sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
-{
- sub0_sock *sock = arg;
- int rv;
- int fd;
-
- if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) {
- return (rv);
- }
- return (nni_copyout_int(fd, buf, szp, t));
-}
-
-static int
-sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t));
-}
-
-static int
-sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t));
-}
-
-static int
-sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(&sock->master, buf, sz, t));
-}
-
-static int
-sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t));
-}
-
-static int
-sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t));
-}
-
-static int
-sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
-{
- sub0_sock *sock = arg;
- return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t));
-}
-
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops sub0_pipe_ops = {
- .pipe_size = sizeof(sub0_pipe),
- .pipe_init = sub0_pipe_init,
- .pipe_fini = sub0_pipe_fini,
- .pipe_start = sub0_pipe_start,
- .pipe_close = sub0_pipe_close,
- .pipe_stop = sub0_pipe_stop,
-};
-
-static nni_proto_ctx_ops sub0_ctx_ops = {
- .ctx_size = sizeof(sub0_ctx),
- .ctx_init = sub0_ctx_init,
- .ctx_fini = sub0_ctx_fini,
- .ctx_send = sub0_ctx_send,
- .ctx_recv = sub0_ctx_recv,
- .ctx_options = sub0_ctx_options,
-};
-
-static nni_option sub0_sock_options[] = {
- {
- .o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_sock_subscribe,
- },
- {
- .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_sock_unsubscribe,
- },
- {
- .o_name = NNG_OPT_RECVFD,
- .o_get = sub0_sock_get_recv_fd,
- },
- {
- .o_name = NNG_OPT_RECVBUF,
- .o_get = sub0_sock_get_recv_buf_len,
- .o_set = sub0_sock_set_recv_buf_len,
- },
- {
- .o_name = NNG_OPT_SUB_PREFNEW,
- .o_get = sub0_sock_get_prefer_new,
- .o_set = sub0_sock_set_prefer_new,
- },
- // terminate list
- {
- .o_name = NULL,
- },
-};
-
-static nni_proto_sock_ops sub0_sock_ops = {
- .sock_size = sizeof(sub0_sock),
- .sock_init = sub0_sock_init,
- .sock_fini = sub0_sock_fini,
- .sock_open = sub0_sock_open,
- .sock_close = sub0_sock_close,
- .sock_send = sub0_sock_send,
- .sock_recv = sub0_sock_recv,
- .sock_options = sub0_sock_options,
-};
-
-static nni_proto sub0_proto = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SUB_V0, "sub" },
- .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
- .proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &sub0_sock_ops,
- .proto_pipe_ops = &sub0_pipe_ops,
- .proto_ctx_ops = &sub0_ctx_ops,
-};
-
-int
-nng_sub0_open(nng_socket *sock)
-{
- return (nni_proto_open(sock, &sub0_proto));
-}