diff options
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 755 |
1 files changed, 0 insertions, 755 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c deleted file mode 100644 index 9f3f2283..00000000 --- a/src/protocol/pubsub0/sub.c +++ /dev/null @@ -1,755 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2019 Nathan Kent <nate@nkent.net> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <stdbool.h> -#include <string.h> - -#include "core/nng_impl.h" -#include "nng/protocol/pubsub0/sub.h" - -// Subscriber protocol. The SUB protocol receives messages sent to -// it from publishers, and filters out those it is not interested in, -// only passing up ones that match known subscriptions. - -#ifndef NNI_PROTO_SUB_V0 -#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) -#endif - -#ifndef NNI_PROTO_PUB_V0 -#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) -#endif - -// By default we accept 128 messages. -#define SUB0_DEFAULT_RECV_BUF_LEN 128 - -// By default, prefer new messages when the queue is full. -#define SUB0_DEFAULT_PREFER_NEW true - -typedef struct sub0_pipe sub0_pipe; -typedef struct sub0_sock sub0_sock; -typedef struct sub0_ctx sub0_ctx; -typedef struct sub0_topic sub0_topic; - -static void sub0_recv_cb(void *); -static void sub0_pipe_fini(void *); - -struct sub0_topic { - nni_list_node node; - size_t len; - void * buf; -}; - -// sub0_ctx is a context for a SUB socket. The advantage of contexts is -// that different contexts can maintain different subscriptions. -struct sub0_ctx { - nni_list_node node; - sub0_sock * sock; - nni_list topics; // TODO: Consider patricia trie - nni_list recv_queue; // can have multiple pending receives - nni_lmq lmq; - bool prefer_new; -}; - -// sub0_sock is our per-socket protocol private structure. -struct sub0_sock { - nni_pollable readable; - sub0_ctx master; // default context - nni_list contexts; // all contexts - int num_contexts; - size_t recv_buf_len; - bool prefer_new; - nni_mtx lk; -}; - -// sub0_pipe is our per-pipe protocol private structure. -struct sub0_pipe { - nni_pipe * pipe; - sub0_sock *sub; - nni_aio aio_recv; -}; - -static void -sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - nni_mtx_lock(&sock->lk); - if (nni_list_active(&ctx->recv_queue, aio)) { - nni_list_remove(&ctx->recv_queue, aio); - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&sock->lk); -} - -static void -sub0_ctx_recv(void *arg, nni_aio *aio) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - nni_msg * msg; - - if (nni_aio_begin(aio) != 0) { - return; - } - - nni_mtx_lock(&sock->lk); - -again: - if (nni_lmq_empty(&ctx->lmq)) { - int rv; - if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { - nni_mtx_unlock(&sock->lk); - nni_aio_finish_error(aio, rv); - return; - } - nni_list_append(&ctx->recv_queue, aio); - nni_mtx_unlock(&sock->lk); - return; - } - - (void) nni_lmq_getq(&ctx->lmq, &msg); - - if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { - nni_pollable_clear(&sock->readable); - } - if ((msg = nni_msg_unique(msg)) == NULL) { - goto again; - } - nni_aio_set_msg(aio, msg); - nni_mtx_unlock(&sock->lk); - nni_aio_finish(aio, 0, nni_msg_len(msg)); -} - -static void -sub0_ctx_send(void *arg, nni_aio *aio) -{ - NNI_ARG_UNUSED(arg); - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, NNG_ENOTSUP); - } -} - -static void -sub0_ctx_close(void *arg) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - nni_aio * aio; - - nni_mtx_lock(&sock->lk); - while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { - nni_list_remove(&ctx->recv_queue, aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - nni_mtx_unlock(&sock->lk); -} - -static void -sub0_ctx_fini(void *arg) -{ - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; - sub0_topic *topic; - - sub0_ctx_close(ctx); - - nni_mtx_lock(&sock->lk); - nni_list_remove(&sock->contexts, ctx); - sock->num_contexts--; - nni_mtx_unlock(&sock->lk); - - while ((topic = nni_list_first(&ctx->topics)) != 0) { - nni_list_remove(&ctx->topics, topic); - nni_free(topic->buf, topic->len); - NNI_FREE_STRUCT(topic); - } - - nni_lmq_fini(&ctx->lmq); -} - -static int -sub0_ctx_init(void *ctx_arg, void *sock_arg) -{ - sub0_sock *sock = sock_arg; - sub0_ctx * ctx = ctx_arg; - size_t len; - bool prefer_new; - int rv; - - nni_mtx_lock(&sock->lk); - len = sock->recv_buf_len; - prefer_new = sock->prefer_new; - - if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { - return (rv); - } - ctx->prefer_new = prefer_new; - - nni_aio_list_init(&ctx->recv_queue); - NNI_LIST_INIT(&ctx->topics, sub0_topic, node); - - ctx->sock = sock; - - nni_list_append(&sock->contexts, ctx); - sock->num_contexts++; - nni_mtx_unlock(&sock->lk); - - return (0); -} - -static void -sub0_sock_fini(void *arg) -{ - sub0_sock *sock = arg; - - sub0_ctx_fini(&sock->master); - nni_pollable_fini(&sock->readable); - nni_mtx_fini(&sock->lk); -} - -static int -sub0_sock_init(void *arg, nni_sock *unused) -{ - sub0_sock *sock = arg; - int rv; - - NNI_ARG_UNUSED(unused); - - NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); - nni_mtx_init(&sock->lk); - sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; - sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; - nni_pollable_init(&sock->readable); - - if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { - sub0_sock_fini(sock); - return (rv); - } - - return (0); -} - -static void -sub0_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -sub0_sock_close(void *arg) -{ - sub0_sock *sock = arg; - sub0_ctx_close(&sock->master); -} - -static void -sub0_pipe_stop(void *arg) -{ - sub0_pipe *p = arg; - - nni_aio_stop(&p->aio_recv); -} - -static void -sub0_pipe_fini(void *arg) -{ - sub0_pipe *p = arg; - - nni_aio_fini(&p->aio_recv); -} - -static int -sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) -{ - sub0_pipe *p = arg; - - nni_aio_init(&p->aio_recv, sub0_recv_cb, p); - - p->pipe = pipe; - p->sub = s; - return (0); -} - -static int -sub0_pipe_start(void *arg) -{ - sub0_pipe *p = arg; - - if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { - // Peer protocol mismatch. - return (NNG_EPROTO); - } - - nni_pipe_recv(p->pipe, &p->aio_recv); - return (0); -} - -static void -sub0_pipe_close(void *arg) -{ - sub0_pipe *p = arg; - - nni_aio_close(&p->aio_recv); -} - -static bool -sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) -{ - sub0_topic *topic; - - // This is a naive and trivial matcher. Replace with a real - // patricia trie later. - NNI_LIST_FOREACH (&ctx->topics, topic) { - if (len < topic->len) { - continue; - } - if ((topic->len == 0) || - (memcmp(topic->buf, body, topic->len) == 0)) { - return (true); - } - } - return (false); -} - -static void -sub0_recv_cb(void *arg) -{ - sub0_pipe *p = arg; - sub0_sock *sock = p->sub; - sub0_ctx * ctx; - nni_msg * msg; - size_t len; - uint8_t * body; - nni_list finish; - nng_aio * aio; - nni_msg * dup_msg; - - if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->pipe); - return; - } - - nni_aio_list_init(&finish); - - msg = nni_aio_get_msg(&p->aio_recv); - nni_aio_set_msg(&p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - body = nni_msg_body(msg); - len = nni_msg_len(msg); - dup_msg = NULL; - - nni_mtx_lock(&sock->lk); - // Go through all contexts. We will try to send up. - NNI_LIST_FOREACH (&sock->contexts, ctx) { - bool queued = false; - - if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { - // Cannot deliver here, as receive buffer is full. - continue; - } - - if (!sub0_matches(ctx, body, len)) { - continue; - } - - // This is a performance optimization, that ensures we - // do not duplicate a message in the common case, where there - // is only a single context. - if (sock->num_contexts > 1) { - if (nni_msg_dup(&dup_msg, msg) != 0) { - // if we cannot dup it, continue on - continue; - } - } else { - // We only have one context, so it's the only - // possible message. - dup_msg = msg; - } - - if (!nni_list_empty(&ctx->recv_queue)) { - aio = nni_list_first(&ctx->recv_queue); - nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup_msg); - - // Save for synchronous completion - nni_list_append(&finish, aio); - } else if (nni_lmq_full(&ctx->lmq)) { - // Make space for the new message. - nni_msg *old; - (void) nni_lmq_getq(&ctx->lmq, &old); - nni_msg_free(old); - - (void) nni_lmq_putq(&ctx->lmq, dup_msg); - queued = true; - - } else { - (void) nni_lmq_putq(&ctx->lmq, dup_msg); - queued = true; - } - if (queued && ctx == &sock->master) { - nni_pollable_raise(&sock->readable); - } - } - nni_mtx_unlock(&sock->lk); - - // NB: This is slightly less efficient in that we may have - // created an extra copy in the face of e.g. two subscriptions, - // but optimizing this further would require checking the subscription - // list twice, adding complexity. If this turns out to be a problem - // we could probably add some other sophistication with a counter - // and flags on the contexts themselves. - if (msg != dup_msg) { - // If we didn't just use the message, then free our copy. - nni_msg_free(msg); - } - - while ((aio = nni_list_first(&finish)) != NULL) { - nni_list_remove(&finish, aio); - nni_aio_finish_sync(aio, 0, len); - } - - nni_pipe_recv(p->pipe, &p->aio_recv); -} - -static int -sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - int val; - nni_mtx_lock(&sock->lk); - val = (int) nni_lmq_cap(&ctx->lmq); - nni_mtx_unlock(&sock->lk); - - return (nni_copyout_int(val, buf, szp, t)); -} - -static int -sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - int val; - int rv; - - if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { - return (rv); - } - nni_mtx_lock(&sock->lk); - if ((rv = nni_lmq_resize(&ctx->lmq, (size_t) val)) != 0) { - nni_mtx_unlock(&sock->lk); - return (rv); - } - - // If we change the socket, then this will change the queue for - // any new contexts. (Previously constructed contexts are unaffected.) - if (&sock->master == ctx) { - sock->recv_buf_len = (size_t) val; - } - nni_mtx_unlock(&sock->lk); - return (0); -} - -// For now we maintain subscriptions on a sorted linked list. As we do not -// expect to have huge numbers of subscriptions, and as the operation is -// really O(n), we think this is acceptable. In the future we might decide -// to replace this with a patricia trie, like old nanomsg had. - -static int -sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; - sub0_topic *topic; - sub0_topic *new_topic; - NNI_ARG_UNUSED(t); - - nni_mtx_lock(&sock->lk); - NNI_LIST_FOREACH (&ctx->topics, topic) { - if (topic->len != sz) { - continue; - } - if (memcmp(topic->buf, buf, sz) == 0) { - // Already have it. - nni_mtx_unlock(&sock->lk); - return (0); - } - } - if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { - nni_mtx_unlock(&sock->lk); - return (NNG_ENOMEM); - } - if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { - nni_mtx_unlock(&sock->lk); - NNI_FREE_STRUCT(new_topic); - return (NNG_ENOMEM); - } - memcpy(new_topic->buf, buf, sz); - new_topic->len = sz; - nni_list_append(&ctx->topics, new_topic); - nni_mtx_unlock(&sock->lk); - return (0); -} - -static int -sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; - sub0_topic *topic; - size_t len; - NNI_ARG_UNUSED(t); - - nni_mtx_lock(&sock->lk); - NNI_LIST_FOREACH (&ctx->topics, topic) { - if (topic->len != sz) { - continue; - } - if (memcmp(topic->buf, buf, sz) == 0) { - // Matched! - break; - } - } - if (topic == NULL) { - nni_mtx_unlock(&sock->lk); - return (NNG_ENOENT); - } - nni_list_remove(&ctx->topics, topic); - - // Now we need to make sure that any messages that are waiting still - // match the subscription. We basically just run through the queue - // and requeue those messages we need. - len = nni_lmq_len(&ctx->lmq); - for (size_t i = 0; i < len; i++) { - nni_msg *msg; - - (void) nni_lmq_getq(&ctx->lmq, &msg); - if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { - (void) nni_lmq_putq(&ctx->lmq, msg); - } else { - nni_msg_free(msg); - } - } - nni_mtx_unlock(&sock->lk); - - nni_free(topic->buf, topic->len); - NNI_FREE_STRUCT(topic); - return (0); -} - -static int -sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - bool val; - - nni_mtx_lock(&sock->lk); - val = ctx->prefer_new; - nni_mtx_unlock(&sock->lk); - - return (nni_copyout_bool(val, buf, szp, t)); -} - -static int -sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_ctx * ctx = arg; - sub0_sock *sock = ctx->sock; - bool val; - int rv; - - if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { - return (rv); - } - - nni_mtx_lock(&sock->lk); - ctx->prefer_new = val; - if (&sock->master == ctx) { - sock->prefer_new = val; - } - nni_mtx_unlock(&sock->lk); - - return (0); -} - -static nni_option sub0_ctx_options[] = { - { - .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_ctx_get_recv_buf_len, - .o_set = sub0_ctx_set_recv_buf_len, - }, - { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_ctx_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_ctx_unsubscribe, - }, - { - .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_ctx_get_prefer_new, - .o_set = sub0_ctx_set_prefer_new, - }, - { - .o_name = NULL, - }, -}; - -static void -sub0_sock_send(void *arg, nni_aio *aio) -{ - NNI_ARG_UNUSED(arg); - if (nni_aio_begin(aio) == 0) { - nni_aio_finish_error(aio, NNG_ENOTSUP); - } -} - -static void -sub0_sock_recv(void *arg, nni_aio *aio) -{ - sub0_sock *sock = arg; - - sub0_ctx_recv(&sock->master, aio); -} - -static int -sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) -{ - sub0_sock *sock = arg; - int rv; - int fd; - - if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { - return (rv); - } - return (nni_copyout_int(fd, buf, szp, t)); -} - -static int -sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); -} - -static int -sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); -} - -static int -sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); -} - -static int -sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); -} - -static int -sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); -} - -static int -sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) -{ - sub0_sock *sock = arg; - return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); -} - -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub0_pipe_ops = { - .pipe_size = sizeof(sub0_pipe), - .pipe_init = sub0_pipe_init, - .pipe_fini = sub0_pipe_fini, - .pipe_start = sub0_pipe_start, - .pipe_close = sub0_pipe_close, - .pipe_stop = sub0_pipe_stop, -}; - -static nni_proto_ctx_ops sub0_ctx_ops = { - .ctx_size = sizeof(sub0_ctx), - .ctx_init = sub0_ctx_init, - .ctx_fini = sub0_ctx_fini, - .ctx_send = sub0_ctx_send, - .ctx_recv = sub0_ctx_recv, - .ctx_options = sub0_ctx_options, -}; - -static nni_option sub0_sock_options[] = { - { - .o_name = NNG_OPT_SUB_SUBSCRIBE, - .o_set = sub0_sock_subscribe, - }, - { - .o_name = NNG_OPT_SUB_UNSUBSCRIBE, - .o_set = sub0_sock_unsubscribe, - }, - { - .o_name = NNG_OPT_RECVFD, - .o_get = sub0_sock_get_recv_fd, - }, - { - .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_sock_get_recv_buf_len, - .o_set = sub0_sock_set_recv_buf_len, - }, - { - .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_sock_get_prefer_new, - .o_set = sub0_sock_set_prefer_new, - }, - // terminate list - { - .o_name = NULL, - }, -}; - -static nni_proto_sock_ops sub0_sock_ops = { - .sock_size = sizeof(sub0_sock), - .sock_init = sub0_sock_init, - .sock_fini = sub0_sock_fini, - .sock_open = sub0_sock_open, - .sock_close = sub0_sock_close, - .sock_send = sub0_sock_send, - .sock_recv = sub0_sock_recv, - .sock_options = sub0_sock_options, -}; - -static nni_proto sub0_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SUB_V0, "sub" }, - .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, - .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub0_sock_ops, - .proto_pipe_ops = &sub0_pipe_ops, - .proto_ctx_ops = &sub0_ctx_ops, -}; - -int -nng_sub0_open(nng_socket *sock) -{ - return (nni_proto_open(sock, &sub0_proto)); -} |
