diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-05 01:55:27 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-05 01:55:27 -0800 |
| commit | 5d0fb985db60c0807efc76f05edc8d4d3e5ffa95 (patch) | |
| tree | ddb150db90c1e72d6d8b3ecf0e3ad8e58b751fe5 /src | |
| parent | e1f991bd194dcbc8f2547ac4f583c998c727a1ec (diff) | |
| download | nng-5d0fb985db60c0807efc76f05edc8d4d3e5ffa95.tar.gz nng-5d0fb985db60c0807efc76f05edc8d4d3e5ffa95.tar.bz2 nng-5d0fb985db60c0807efc76f05edc8d4d3e5ffa95.zip | |
SUB protocol implemented (uses sorted linked list for topics).
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/list.c | 32 | ||||
| -rw-r--r-- | src/core/list.h | 2 | ||||
| -rw-r--r-- | src/core/protocol.c | 2 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 348 |
5 files changed, 386 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a1c0fac7..f07a6696 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -67,6 +67,8 @@ set (NNG_SOURCES protocol/pair/pair.c + protocol/pubsub/sub.c + protocol/reqrep/rep.c protocol/reqrep/req.c diff --git a/src/core/list.c b/src/core/list.c index ca200415..b1adf2cd 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -83,6 +83,38 @@ nni_list_prepend(nni_list *list, void *item) } +void +nni_list_insert_before(nni_list *list, void *item, void *before) +{ + nni_list_node *node = NODE(list, item); + nni_list_node *where = NODE(list, before); + + if ((node->ln_next != NULL) || (node->ln_prev != NULL)) { + nni_panic("prepending node already on a list or not inited"); + } + node->ln_next = where; + node->ln_prev = where->ln_prev; + node->ln_next->ln_prev = node; + node->ln_prev->ln_next = node; +} + + +void +nni_list_insert_after(nni_list *list, void *item, void *after) +{ + nni_list_node *node = NODE(list, item); + nni_list_node *where = NODE(list, after); + + if ((node->ln_next != NULL) || (node->ln_prev != NULL)) { + nni_panic("prepending node already on a list or not inited"); + } + node->ln_prev = where; + node->ln_next = where->ln_next; + node->ln_next->ln_prev = node; + node->ln_prev->ln_next = node; +} + + void * nni_list_next(const nni_list *list, void *item) { diff --git a/src/core/list.h b/src/core/list.h index a6540f5e..7d8b50be 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -36,6 +36,8 @@ extern void *nni_list_first(const nni_list *); extern void *nni_list_last(const nni_list *); extern void nni_list_append(nni_list *, void *); extern void nni_list_prepend(nni_list *, void *); +extern void nni_list_insert_before(nni_list *, void *, void *); +extern void nni_list_insert_after(nni_list *, void *, void *); extern void *nni_list_next(const nni_list *, void *); extern void *nni_list_prev(const nni_list *, void *); extern void nni_list_remove(nni_list *, void *); diff --git a/src/core/protocol.c b/src/core/protocol.c index 27430771..765c2e9a 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -19,11 +19,13 @@ extern nni_proto nni_pair_proto; extern nni_proto nni_rep_proto; extern nni_proto nni_req_proto; +extern nni_proto nni_sub_proto; static nni_proto *protocols[] = { &nni_pair_proto, &nni_rep_proto, &nni_req_proto, + &nni_sub_proto, NULL }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c new file mode 100644 index 00000000..a1f23607 --- /dev/null +++ b/src/protocol/pubsub/sub.c @@ -0,0 +1,348 @@ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Subscriber protocol. The SUB protocol receives messages sent to +// it from publishers, and filters out those it is not interested in, +// only passing up ones that match known subscriptions. + +typedef struct nni_sub_pipe nni_sub_pipe; +typedef struct nni_sub_sock nni_sub_sock; +typedef struct nni_sub_topic nni_sub_topic; + +struct nni_sub_topic { + nni_list_node node; + size_t len; + void * buf; +}; + +// An nni_rep_sock is our per-socket protocol private structure. +struct nni_sub_sock { + nni_sock * sock; + nni_mtx mx; + nni_list topics; + nni_msgq * urq; + int raw; +}; + +// An nni_rep_pipe is our per-pipe protocol private structure. +struct nni_sub_pipe { + nni_pipe * pipe; + nni_sub_sock * sub; +}; + +static int +nni_sub_init(void **subp, nni_sock *sock) +{ + nni_sub_sock *sub; + int rv; + + if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&sub->mx)) != 0) { + NNI_FREE_STRUCT(sub); + return (rv); + } + NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); + sub->sock = sock; + sub->raw = 0; + + sub->urq = nni_sock_recvq(sock); + nni_sock_senderr(sock, NNG_ENOTSUP); + *subp = sub; + return (0); +} + + +static void +nni_sub_fini(void *arg) +{ + nni_sub_sock *sub = arg; + + // XXX: free subscriptions... + nni_mtx_fini(&sub->mx); + NNI_FREE_STRUCT(sub); +} + + +static int +nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) +{ + nni_sub_pipe *sp; + int rv; + + if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) { + return (NNG_ENOMEM); + } + sp->pipe = pipe; + sp->sub = ssock; + *spp = sp; + return (0); +} + + +static void +nni_sub_pipe_fini(void *arg) +{ + nni_sub_pipe *sp = arg; + + NNI_FREE_STRUCT(sp); +} + + +static int +nni_sub_pipe_add(void *arg) +{ + nni_sub_pipe *sp = arg; + + if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) { + return (NNG_EPROTO); + } + return (0); +} + + +static void +nni_sub_pipe_rem(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_sub_pipe_send(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_sub_pipe_recv(void *arg) +{ + nni_sub_pipe *sp = arg; + nni_sub_sock *sub = sp->sub; + nni_msgq *urq = sub->urq; + nni_pipe *pipe = sp->pipe; + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_pipe_recv(pipe, &msg); + if (rv != 0) { + break; + } + + // Now send it up. + rv = nni_msgq_put(urq, msg); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + // Nobody else to signal... + nni_pipe_close(pipe); +} + + +// For now we maintain subscriptions on a sorted linked list. As we do not +// expect to have huge numbers of subscriptions, and as the operation is +// really O(n), we think this is acceptable. In the future we might decide +// to replace this with a patricia trie, like old nanomsg had. + +static int +nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz) +{ + nni_sub_topic *topic; + nni_sub_topic *newtopic; + + NNI_LIST_FOREACH (&sub->topics, topic) { + int rv; + if (topic->len >= sz) { + rv = memcmp(topic->buf, buf, sz); + } else { + rv = memcmp(topic->buf, buf, topic->len); + } + if (rv == 0) { + if (topic->len == sz) { + // Already inserted. + return (0); + } + if (topic->len > sz) { + break; + } + } else if (rv > 0) { + break; + } + } + + if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + return (NNG_ENOMEM); + } + if ((newtopic->buf = nni_alloc(sz)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&newtopic->node); + newtopic->len = sz; + if (topic != NULL) { + nni_list_insert_before(&sub->topics, newtopic, topic); + } + return (0); +} + + +static int +nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) +{ + nni_sub_topic *topic; + int rv; + + NNI_LIST_FOREACH (&sub->topics, topic) { + if (topic->len >= sz) { + rv = memcmp(topic->buf, buf, sz); + } else { + rv = memcmp(topic->buf, buf, topic->len); + } + if (rv == 0) { + if (topic->len == sz) { + nni_list_remove(&sub->topics, topic); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + return (0); + } + if (topic->len > sz) { + return (NNG_ENOENT); + } + } + if (rv > 0) { + return (NNG_ENOENT); + } + } + return (NNG_ENOENT); +} + + +static int +nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_sub_sock *sub = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&sub->mx); + rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); + nni_mtx_unlock(&sub->mx); + break; + case NNG_OPT_SUBSCRIBE: + nni_mtx_lock(&sub->mx); + rv = nni_sub_subscribe(sub, buf, sz); + nni_mtx_unlock(&sub->mx); + break; + case NNG_OPT_UNSUBSCRIBE: + nni_mtx_lock(&sub->mx); + rv = nni_sub_unsubscribe(sub, buf, sz); + nni_mtx_unlock(&sub->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_sub_sock *sub = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&sub->mx); + rv = nni_getopt_int(&sub->raw, buf, szp); + nni_mtx_unlock(&sub->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static nni_msg * +nni_sub_recvfilter(void *arg, nni_msg *msg) +{ + nni_sub_sock *sub = arg; + nni_sub_topic *topic; + char *body; + size_t len; + int match; + + nni_mtx_lock(&sub->mx); + if (sub->raw) { + nni_mtx_unlock(&sub->mx); + return (msg); + } + + body = nni_msg_body(msg, &len); + + // Check to see if the message matches one of our subscriptions. + NNI_LIST_FOREACH (&sub->topics, topic) { + if (len >= topic->len) { + int rv = memcmp(topic->buf, body, topic->len); + if (rv == 0) { + // Matched! + match = 1; + break; + } + if (rv > 0) { + match = 0; + break; + } + } else if (memcmp(topic->buf, body, len) >= 0) { + match = 0; + break; + } + } + nni_mtx_unlock(&sub->mx); + if (!match) { + nni_msg_free(msg); + return (NULL); + } + return (0); +} + + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe nni_sub_proto_pipe = { + .pipe_init = nni_sub_pipe_init, + .pipe_fini = nni_sub_pipe_fini, + .pipe_add = nni_sub_pipe_add, + .pipe_rem = nni_sub_pipe_rem, + .pipe_send = nni_sub_pipe_send, + .pipe_recv = nni_sub_pipe_recv, +}; + +nni_proto nni_sub_proto = { + .proto_self = NNG_PROTO_SUB, + .proto_peer = NNG_PROTO_PUB, + .proto_name = "sub", + .proto_pipe = &nni_sub_proto_pipe, + .proto_init = nni_sub_init, + .proto_fini = nni_sub_fini, + .proto_setopt = nni_sub_setopt, + .proto_getopt = nni_sub_getopt, + .proto_recv_filter = nni_sub_recvfilter, +}; |
