summaryrefslogtreecommitdiff
path: root/src/protocol/pipeline/pull.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pipeline/pull.c')
-rw-r--r--src/protocol/pipeline/pull.c195
1 files changed, 195 insertions, 0 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
new file mode 100644
index 00000000..695b730e
--- /dev/null
+++ b/src/protocol/pipeline/pull.c
@@ -0,0 +1,195 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Pull protocol. The PULL protocol is the "read" side of a pipeline.
+
+typedef struct nni_pull_pipe nni_pull_pipe;
+typedef struct nni_pull_sock nni_pull_sock;
+
+// An nni_pull_sock is our per-socket protocol private structure.
+struct nni_pull_sock {
+ nni_mtx mx;
+ nni_msgq * urq;
+ int raw;
+};
+
+// An nni_pull_pipe is our per-pipe protocol private structure.
+struct nni_pull_pipe {
+ nni_pipe * pipe;
+ nni_pull_sock * pull;
+};
+
+static int
+nni_pull_init(void **pullp, nni_sock *sock)
+{
+ nni_pull_sock *pull;
+ int rv;
+
+ if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&pull->mx)) != 0) {
+ NNI_FREE_STRUCT(pull);
+ return (rv);
+ }
+ pull->raw = 0;
+ pull->urq = nni_sock_recvq(sock);
+ *pullp = pull;
+ nni_sock_senderr(sock, NNG_ENOTSUP);
+ return (0);
+}
+
+
+static void
+nni_pull_fini(void *arg)
+{
+ nni_pull_sock *pull = arg;
+
+ nni_mtx_fini(&pull->mx);
+ NNI_FREE_STRUCT(pull);
+}
+
+
+static int
+nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+{
+ nni_pull_pipe *pp;
+ int rv;
+
+ if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ pp->pipe = pipe;
+ pp->pull = psock;
+ *ppp = pp;
+ return (0);
+}
+
+
+static void
+nni_pull_pipe_fini(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ NNI_FREE_STRUCT(pp);
+}
+
+
+static int
+nni_pull_pipe_add(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) {
+ return (NNG_EPROTO);
+ }
+ return (0);
+}
+
+
+static void
+nni_pull_pipe_rem(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_pull_pipe_send(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static void
+nni_pull_pipe_recv(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_pull_sock *pull = pp->pull;
+ nni_msg *msg;
+
+ for (;;) {
+ if (nni_pipe_recv(pp->pipe, &msg) != 0) {
+ break;
+ }
+ if (nni_msgq_put(pull->urq, msg) != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_pipe_close(pp->pipe);
+}
+
+
+static int
+nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_pull_sock *pull = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&pull->mx);
+ rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&pull->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_pull_sock *pull = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ nni_mtx_lock(&pull->mx);
+ rv = nni_getopt_int(&pull->raw, buf, szp);
+ nni_mtx_unlock(&pull->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe nni_pull_proto_pipe = {
+ .pipe_init = nni_pull_pipe_init,
+ .pipe_fini = nni_pull_pipe_fini,
+ .pipe_add = nni_pull_pipe_add,
+ .pipe_rem = nni_pull_pipe_rem,
+ .pipe_send = nni_pull_pipe_send,
+ .pipe_recv = nni_pull_pipe_recv,
+};
+
+nni_proto nni_pull_proto = {
+ .proto_self = NNG_PROTO_PULL,
+ .proto_peer = NNG_PROTO_PUSH,
+ .proto_name = "pull",
+ .proto_pipe = &nni_pull_proto_pipe,
+ .proto_init = nni_pull_init,
+ .proto_fini = nni_pull_fini,
+ .proto_setopt = nni_pull_setopt,
+ .proto_getopt = nni_pull_getopt,
+ .proto_recv_filter = NULL,
+ .proto_send_filter = NULL,
+};