summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/list.c32
-rw-r--r--src/core/list.h2
-rw-r--r--src/core/protocol.c2
-rw-r--r--src/protocol/pubsub/sub.c348
5 files changed, 386 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a1c0fac7..f07a6696 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -67,6 +67,8 @@ set (NNG_SOURCES
protocol/pair/pair.c
+ protocol/pubsub/sub.c
+
protocol/reqrep/rep.c
protocol/reqrep/req.c
diff --git a/src/core/list.c b/src/core/list.c
index ca200415..b1adf2cd 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -83,6 +83,38 @@ nni_list_prepend(nni_list *list, void *item)
}
+void
+nni_list_insert_before(nni_list *list, void *item, void *before)
+{
+ nni_list_node *node = NODE(list, item);
+ nni_list_node *where = NODE(list, before);
+
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("prepending node already on a list or not inited");
+ }
+ node->ln_next = where;
+ node->ln_prev = where->ln_prev;
+ node->ln_next->ln_prev = node;
+ node->ln_prev->ln_next = node;
+}
+
+
+void
+nni_list_insert_after(nni_list *list, void *item, void *after)
+{
+ nni_list_node *node = NODE(list, item);
+ nni_list_node *where = NODE(list, after);
+
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("prepending node already on a list or not inited");
+ }
+ node->ln_prev = where;
+ node->ln_next = where->ln_next;
+ node->ln_next->ln_prev = node;
+ node->ln_prev->ln_next = node;
+}
+
+
void *
nni_list_next(const nni_list *list, void *item)
{
diff --git a/src/core/list.h b/src/core/list.h
index a6540f5e..7d8b50be 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -36,6 +36,8 @@ extern void *nni_list_first(const nni_list *);
extern void *nni_list_last(const nni_list *);
extern void nni_list_append(nni_list *, void *);
extern void nni_list_prepend(nni_list *, void *);
+extern void nni_list_insert_before(nni_list *, void *, void *);
+extern void nni_list_insert_after(nni_list *, void *, void *);
extern void *nni_list_next(const nni_list *, void *);
extern void *nni_list_prev(const nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 27430771..765c2e9a 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -19,11 +19,13 @@
extern nni_proto nni_pair_proto;
extern nni_proto nni_rep_proto;
extern nni_proto nni_req_proto;
+extern nni_proto nni_sub_proto;
static nni_proto *protocols[] = {
&nni_pair_proto,
&nni_rep_proto,
&nni_req_proto,
+ &nni_sub_proto,
NULL
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
new file mode 100644
index 00000000..a1f23607
--- /dev/null
+++ b/src/protocol/pubsub/sub.c
@@ -0,0 +1,348 @@
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Subscriber protocol. The SUB protocol receives messages sent to
+// it from publishers, and filters out those it is not interested in,
+// only passing up ones that match known subscriptions.
+
+typedef struct nni_sub_pipe nni_sub_pipe;
+typedef struct nni_sub_sock nni_sub_sock;
+typedef struct nni_sub_topic nni_sub_topic;
+
+struct nni_sub_topic {
+ nni_list_node node;
+ size_t len;
+ void * buf;
+};
+
+// An nni_rep_sock is our per-socket protocol private structure.
+struct nni_sub_sock {
+ nni_sock * sock;
+ nni_mtx mx;
+ nni_list topics;
+ nni_msgq * urq;
+ int raw;
+};
+
+// An nni_rep_pipe is our per-pipe protocol private structure.
+struct nni_sub_pipe {
+ nni_pipe * pipe;
+ nni_sub_sock * sub;
+};
+
+static int
+nni_sub_init(void **subp, nni_sock *sock)
+{
+ nni_sub_sock *sub;
+ int rv;
+
+ if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&sub->mx)) != 0) {
+ NNI_FREE_STRUCT(sub);
+ return (rv);
+ }
+ NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
+ sub->sock = sock;
+ sub->raw = 0;
+
+ sub->urq = nni_sock_recvq(sock);
+ nni_sock_senderr(sock, NNG_ENOTSUP);
+ *subp = sub;
+ return (0);
+}
+
+
+static void
+nni_sub_fini(void *arg)
+{
+ nni_sub_sock *sub = arg;
+
+ // XXX: free subscriptions...
+ nni_mtx_fini(&sub->mx);
+ NNI_FREE_STRUCT(sub);
+}
+
+
+static int
+nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
+{
+ nni_sub_pipe *sp;
+ int rv;
+
+ if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ sp->pipe = pipe;
+ sp->sub = ssock;
+ *spp = sp;
+ return (0);
+}
+
+
+static void
+nni_sub_pipe_fini(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ NNI_FREE_STRUCT(sp);
+}
+
+
+static int
+nni_sub_pipe_add(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) {
+ return (NNG_EPROTO);
+ }
+ return (0);
+}
+
+
+static void
+nni_sub_pipe_rem(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_sub_pipe_send(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_sub_pipe_recv(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+ nni_sub_sock *sub = sp->sub;
+ nni_msgq *urq = sub->urq;
+ nni_pipe *pipe = sp->pipe;
+ nni_msg *msg;
+ int rv;
+
+ for (;;) {
+ rv = nni_pipe_recv(pipe, &msg);
+ if (rv != 0) {
+ break;
+ }
+
+ // Now send it up.
+ rv = nni_msgq_put(urq, msg);
+ if (rv != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ // Nobody else to signal...
+ nni_pipe_close(pipe);
+}
+
+
+// For now we maintain subscriptions on a sorted linked list. As we do not
+// expect to have huge numbers of subscriptions, and as the operation is
+// really O(n), we think this is acceptable. In the future we might decide
+// to replace this with a patricia trie, like old nanomsg had.
+
+static int
+nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+{
+ nni_sub_topic *topic;
+ nni_sub_topic *newtopic;
+
+ NNI_LIST_FOREACH (&sub->topics, topic) {
+ int rv;
+ if (topic->len >= sz) {
+ rv = memcmp(topic->buf, buf, sz);
+ } else {
+ rv = memcmp(topic->buf, buf, topic->len);
+ }
+ if (rv == 0) {
+ if (topic->len == sz) {
+ // Already inserted.
+ return (0);
+ }
+ if (topic->len > sz) {
+ break;
+ }
+ } else if (rv > 0) {
+ break;
+ }
+ }
+
+ if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((newtopic->buf = nni_alloc(sz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_NODE_INIT(&newtopic->node);
+ newtopic->len = sz;
+ if (topic != NULL) {
+ nni_list_insert_before(&sub->topics, newtopic, topic);
+ }
+ return (0);
+}
+
+
+static int
+nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+{
+ nni_sub_topic *topic;
+ int rv;
+
+ NNI_LIST_FOREACH (&sub->topics, topic) {
+ if (topic->len >= sz) {
+ rv = memcmp(topic->buf, buf, sz);
+ } else {
+ rv = memcmp(topic->buf, buf, topic->len);
+ }
+ if (rv == 0) {
+ if (topic->len == sz) {
+ nni_list_remove(&sub->topics, topic);
+ nni_free(topic->buf, topic->len);
+ NNI_FREE_STRUCT(topic);
+ return (0);
+ }
+ if (topic->len > sz) {
+ return (NNG_ENOENT);
+ }
+ }
+ if (rv > 0) {
+ return (NNG_ENOENT);
+ }
+ }
+ return (NNG_ENOENT);
+}
+
+
+static int
+nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_sub_sock *sub = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&sub->mx);
+ rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&sub->mx);
+ break;
+ case NNG_OPT_SUBSCRIBE:
+ nni_mtx_lock(&sub->mx);
+ rv = nni_sub_subscribe(sub, buf, sz);
+ nni_mtx_unlock(&sub->mx);
+ break;
+ case NNG_OPT_UNSUBSCRIBE:
+ nni_mtx_lock(&sub->mx);
+ rv = nni_sub_unsubscribe(sub, buf, sz);
+ nni_mtx_unlock(&sub->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_sub_sock *sub = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&sub->mx);
+ rv = nni_getopt_int(&sub->raw, buf, szp);
+ nni_mtx_unlock(&sub->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static nni_msg *
+nni_sub_recvfilter(void *arg, nni_msg *msg)
+{
+ nni_sub_sock *sub = arg;
+ nni_sub_topic *topic;
+ char *body;
+ size_t len;
+ int match;
+
+ nni_mtx_lock(&sub->mx);
+ if (sub->raw) {
+ nni_mtx_unlock(&sub->mx);
+ return (msg);
+ }
+
+ body = nni_msg_body(msg, &len);
+
+ // Check to see if the message matches one of our subscriptions.
+ NNI_LIST_FOREACH (&sub->topics, topic) {
+ if (len >= topic->len) {
+ int rv = memcmp(topic->buf, body, topic->len);
+ if (rv == 0) {
+ // Matched!
+ match = 1;
+ break;
+ }
+ if (rv > 0) {
+ match = 0;
+ break;
+ }
+ } else if (memcmp(topic->buf, body, len) >= 0) {
+ match = 0;
+ break;
+ }
+ }
+ nni_mtx_unlock(&sub->mx);
+ if (!match) {
+ nni_msg_free(msg);
+ return (NULL);
+ }
+ return (0);
+}
+
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe nni_sub_proto_pipe = {
+ .pipe_init = nni_sub_pipe_init,
+ .pipe_fini = nni_sub_pipe_fini,
+ .pipe_add = nni_sub_pipe_add,
+ .pipe_rem = nni_sub_pipe_rem,
+ .pipe_send = nni_sub_pipe_send,
+ .pipe_recv = nni_sub_pipe_recv,
+};
+
+nni_proto nni_sub_proto = {
+ .proto_self = NNG_PROTO_SUB,
+ .proto_peer = NNG_PROTO_PUB,
+ .proto_name = "sub",
+ .proto_pipe = &nni_sub_proto_pipe,
+ .proto_init = nni_sub_init,
+ .proto_fini = nni_sub_fini,
+ .proto_setopt = nni_sub_setopt,
+ .proto_getopt = nni_sub_getopt,
+ .proto_recv_filter = nni_sub_recvfilter,
+};