aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/core/protocol.c4
-rw-r--r--src/protocol/pipeline/pull.c195
-rw-r--r--src/protocol/pipeline/push.c326
4 files changed, 528 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0f040f35..28fcb026 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -67,6 +67,9 @@ set (NNG_SOURCES
protocol/pair/pair.c
+ protocol/pipeline/pull.c
+ protocol/pipeline/push.c
+
protocol/pubsub/pub.c
protocol/pubsub/sub.c
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 5f23bf63..8cd1048f 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -21,6 +21,8 @@ extern nni_proto nni_rep_proto;
extern nni_proto nni_req_proto;
extern nni_proto nni_pub_proto;
extern nni_proto nni_sub_proto;
+extern nni_proto nni_push_proto;
+extern nni_proto nni_pull_proto;
static nni_proto *protocols[] = {
&nni_pair_proto,
@@ -28,6 +30,8 @@ static nni_proto *protocols[] = {
&nni_req_proto,
&nni_pub_proto,
&nni_sub_proto,
+ &nni_push_proto,
+ &nni_pull_proto,
NULL
};
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
new file mode 100644
index 00000000..695b730e
--- /dev/null
+++ b/src/protocol/pipeline/pull.c
@@ -0,0 +1,195 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Pull protocol. The PULL protocol is the "read" side of a pipeline.
+
+typedef struct nni_pull_pipe nni_pull_pipe;
+typedef struct nni_pull_sock nni_pull_sock;
+
+// An nni_pull_sock is our per-socket protocol private structure.
+struct nni_pull_sock {
+ nni_mtx mx;
+ nni_msgq * urq;
+ int raw;
+};
+
+// An nni_pull_pipe is our per-pipe protocol private structure.
+struct nni_pull_pipe {
+ nni_pipe * pipe;
+ nni_pull_sock * pull;
+};
+
+static int
+nni_pull_init(void **pullp, nni_sock *sock)
+{
+ nni_pull_sock *pull;
+ int rv;
+
+ if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&pull->mx)) != 0) {
+ NNI_FREE_STRUCT(pull);
+ return (rv);
+ }
+ pull->raw = 0;
+ pull->urq = nni_sock_recvq(sock);
+ *pullp = pull;
+ nni_sock_senderr(sock, NNG_ENOTSUP);
+ return (0);
+}
+
+
+static void
+nni_pull_fini(void *arg)
+{
+ nni_pull_sock *pull = arg;
+
+ nni_mtx_fini(&pull->mx);
+ NNI_FREE_STRUCT(pull);
+}
+
+
+static int
+nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+{
+ nni_pull_pipe *pp;
+ int rv;
+
+ if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ pp->pipe = pipe;
+ pp->pull = psock;
+ *ppp = pp;
+ return (0);
+}
+
+
+static void
+nni_pull_pipe_fini(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ NNI_FREE_STRUCT(pp);
+}
+
+
+static int
+nni_pull_pipe_add(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) {
+ return (NNG_EPROTO);
+ }
+ return (0);
+}
+
+
+static void
+nni_pull_pipe_rem(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_pull_pipe_send(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_pull_pipe_recv(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_pull_sock *pull = pp->pull;
+ nni_msg *msg;
+
+ for (;;) {
+ if (nni_pipe_recv(pp->pipe, &msg) != 0) {
+ break;
+ }
+ if (nni_msgq_put(pull->urq, msg) != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_pipe_close(pp->pipe);
+}
+
+
+static int
+nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_pull_sock *pull = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&pull->mx);
+ rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&pull->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_pull_sock *pull = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&pull->mx);
+ rv = nni_getopt_int(&pull->raw, buf, szp);
+ nni_mtx_unlock(&pull->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe nni_pull_proto_pipe = {
+ .pipe_init = nni_pull_pipe_init,
+ .pipe_fini = nni_pull_pipe_fini,
+ .pipe_add = nni_pull_pipe_add,
+ .pipe_rem = nni_pull_pipe_rem,
+ .pipe_send = nni_pull_pipe_send,
+ .pipe_recv = nni_pull_pipe_recv,
+};
+
+nni_proto nni_pull_proto = {
+ .proto_self = NNG_PROTO_PULL,
+ .proto_peer = NNG_PROTO_PUSH,
+ .proto_name = "pull",
+ .proto_pipe = &nni_pull_proto_pipe,
+ .proto_init = nni_pull_init,
+ .proto_fini = nni_pull_fini,
+ .proto_setopt = nni_pull_setopt,
+ .proto_getopt = nni_pull_getopt,
+ .proto_recv_filter = NULL,
+ .proto_send_filter = NULL,
+};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
new file mode 100644
index 00000000..10c0ddf9
--- /dev/null
+++ b/src/protocol/pipeline/push.c
@@ -0,0 +1,326 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Push protocol. The PUSH protocol is the "write" side of a pipeline.
+// Push distributes fairly, or tries to, by giving messages in round-robin
+// order.
+
+typedef struct nni_push_pipe nni_push_pipe;
+typedef struct nni_push_sock nni_push_sock;
+
+// An nni_push_sock is our per-socket protocol private structure.
+struct nni_push_sock {
+ nni_mtx mx;
+ nni_cv cv;
+ nni_msgq * uwq;
+ nni_thr sender;
+ int raw;
+ int closing;
+ int wantw;
+ nni_list pipes;
+ nni_push_pipe * nextpipe;
+ int npipes;
+};
+
+// An nni_push_pipe is our per-pipe protocol private structure.
+struct nni_push_pipe {
+ nni_pipe * pipe;
+ nni_push_sock * push;
+ nni_msgq * mq;
+ int sigclose;
+ nni_list_node node;
+};
+
+static void nni_push_rrdist(void *);
+
+static int
+nni_push_init(void **pushp, nni_sock *sock)
+{
+ nni_push_sock *push;
+ int rv;
+
+ if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&push->mx)) != 0) {
+ NNI_FREE_STRUCT(push);
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) {
+ nni_mtx_fini(&push->mx);
+ NNI_FREE_STRUCT(push);
+ return (rv);
+ }
+ NNI_LIST_INIT(&push->pipes, nni_push_pipe, node);
+ push->raw = 0;
+ push->npipes = 0;
+ push->wantw = 0;
+ push->uwq = nni_sock_sendq(sock);
+ *pushp = push;
+ nni_sock_recverr(sock, NNG_ENOTSUP);
+ rv = nni_thr_init(&push->sender, nni_push_rrdist, push);
+ if (rv != 0) {
+ nni_cv_fini(&push->cv);
+ nni_mtx_fini(&push->mx);
+ NNI_FREE_STRUCT(push);
+ return (rv);
+ }
+ nni_thr_run(&push->sender);
+ return (0);
+}
+
+
+static void
+nni_push_fini(void *arg)
+{
+ nni_push_sock *push = arg;
+
+ // Shut down the resender. We request it to exit by clearing
+ // its old value, then kick it.
+ nni_mtx_lock(&push->mx);
+ push->closing = 1;
+ nni_cv_wake(&push->cv);
+ nni_mtx_unlock(&push->mx);
+
+ nni_thr_fini(&push->sender);
+ nni_cv_fini(&push->cv);
+ nni_mtx_fini(&push->mx);
+ NNI_FREE_STRUCT(push);
+}
+
+
+static int
+nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+{
+ nni_push_pipe *pp;
+ int rv;
+
+ if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_msgq_init(&pp->mq, 0)) != 0) {
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&pp->node);
+ pp->pipe = pipe;
+ pp->sigclose = 0;
+ pp->push = psock;
+ *ppp = pp;
+ return (0);
+}
+
+
+static void
+nni_push_pipe_fini(void *arg)
+{
+ nni_push_pipe *pp = arg;
+
+ nni_msgq_fini(pp->mq);
+ NNI_FREE_STRUCT(pp);
+}
+
+
+static int
+nni_push_pipe_add(void *arg)
+{
+ nni_push_pipe *pp = arg;
+ nni_push_sock *push = pp->push;
+
+ if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
+ return (NNG_EPROTO);
+ }
+ // Wake the sender since we have a new pipe.
+ nni_mtx_lock(&push->mx);
+ if (push->nextpipe) {
+ // Inject us right before the next pipe, so that we're next.
+ nni_list_insert_before(&push->pipes, pp, push);
+ } else {
+ nni_list_append(&push->pipes, pp);
+ }
+ // Wake the top sender, as we can accept a job.
+ push->npipes++;
+ nni_cv_wake(&push->cv);
+ nni_mtx_unlock(&push->mx);
+ return (0);
+}
+
+
+static void
+nni_push_pipe_rem(void *arg)
+{
+ nni_push_pipe *pp = arg;
+ nni_push_sock *push = pp->push;
+
+ nni_mtx_lock(&push->mx);
+ if (pp == push->nextpipe) {
+ push->nextpipe = nni_list_next(&push->pipes, pp);
+ }
+ push->npipes--;
+ nni_list_remove(&push->pipes, pp);
+ nni_mtx_unlock(&push->mx);
+}
+
+
+static void
+nni_push_pipe_send(void *arg)
+{
+ nni_push_pipe *pp = arg;
+ nni_push_sock *push = pp->push;
+ nni_msg *msg;
+
+ for (;;) {
+ if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
+ break;
+ }
+ nni_mtx_lock(&push->mx);
+ if (push->wantw) {
+ nni_cv_wake(&push->cv);
+ }
+ nni_mtx_unlock(&push->mx);
+ if (nni_pipe_send(pp->pipe, msg) != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_pipe_close(pp->pipe);
+}
+
+
+static void
+nni_push_pipe_recv(void *arg)
+{
+ nni_push_pipe *pp = arg;
+ nni_msg *msg;
+
+ for (;;) {
+ if (nni_pipe_recv(pp->pipe, &msg) != 0) {
+ break;
+ }
+ nni_msg_free(msg);
+ }
+ nni_msgq_signal(pp->mq, &pp->sigclose);
+ nni_pipe_close(pp->pipe);
+}
+
+
+static int
+nni_push_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_push_sock *push = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&push->mx);
+ rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&push->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_push_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_push_sock *push = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&push->mx);
+ rv = nni_getopt_int(&push->raw, buf, szp);
+ nni_mtx_unlock(&push->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static void
+nni_push_rrdist(void *arg)
+{
+ nni_push_sock *push = arg;
+ nni_push_pipe *pp;
+ nni_msgq *uwq = push->uwq;
+ nni_msg *msg = NULL;
+ int rv;
+ int i;
+
+ for (;;) {
+ if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
+ // Should only be NNG_ECLOSED
+ return;
+ }
+
+ nni_mtx_lock(&push->mx);
+ if (push->closing) {
+ if (msg != NULL) {
+ nni_mtx_unlock(&push->mx);
+ nni_msg_free(msg);
+ return;
+ }
+ }
+ for (i = 0; i < push->npipes; i++) {
+ pp = push->nextpipe;
+ if (pp == NULL) {
+ pp = nni_list_first(&push->pipes);
+ }
+ push->nextpipe = nni_list_next(&push->pipes, pp);
+ if (nni_msgq_tryput(pp->mq, msg) == 0) {
+ msg = NULL;
+ break;
+ }
+ }
+ if (msg != NULL) {
+ // We weren't able to deliver it, so keep it and
+ // wait for a sender to let us know its ready.
+ push->wantw = 1;
+ nni_cv_wait(&push->cv);
+ } else {
+ push->wantw = 0;
+ }
+ nni_mtx_unlock(&push->mx);
+ }
+}
+
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe nni_push_proto_pipe = {
+ .pipe_init = nni_push_pipe_init,
+ .pipe_fini = nni_push_pipe_fini,
+ .pipe_add = nni_push_pipe_add,
+ .pipe_rem = nni_push_pipe_rem,
+ .pipe_send = nni_push_pipe_send,
+ .pipe_recv = nni_push_pipe_recv,
+};
+
+nni_proto nni_push_proto = {
+ .proto_self = NNG_PROTO_PUSH,
+ .proto_peer = NNG_PROTO_PULL,
+ .proto_name = "push",
+ .proto_pipe = &nni_push_proto_pipe,
+ .proto_init = nni_push_init,
+ .proto_fini = nni_push_fini,
+ .proto_setopt = nni_push_setopt,
+ .proto_getopt = nni_push_getopt,
+ .proto_recv_filter = NULL,
+ .proto_send_filter = NULL,
+};