diff options
| -rw-r--r-- | src/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/core/protocol.c | 4 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 195 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 326 |
4 files changed, 528 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0f040f35..28fcb026 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -67,6 +67,9 @@ set (NNG_SOURCES protocol/pair/pair.c + protocol/pipeline/pull.c + protocol/pipeline/push.c + protocol/pubsub/pub.c protocol/pubsub/sub.c diff --git a/src/core/protocol.c b/src/core/protocol.c index 5f23bf63..8cd1048f 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -21,6 +21,8 @@ extern nni_proto nni_rep_proto; extern nni_proto nni_req_proto; extern nni_proto nni_pub_proto; extern nni_proto nni_sub_proto; +extern nni_proto nni_push_proto; +extern nni_proto nni_pull_proto; static nni_proto *protocols[] = { &nni_pair_proto, @@ -28,6 +30,8 @@ static nni_proto *protocols[] = { &nni_req_proto, &nni_pub_proto, &nni_sub_proto, + &nni_push_proto, + &nni_pull_proto, NULL }; diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c new file mode 100644 index 00000000..695b730e --- /dev/null +++ b/src/protocol/pipeline/pull.c @@ -0,0 +1,195 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Pull protocol. The PULL protocol is the "read" side of a pipeline. + +typedef struct nni_pull_pipe nni_pull_pipe; +typedef struct nni_pull_sock nni_pull_sock; + +// An nni_pull_sock is our per-socket protocol private structure. +struct nni_pull_sock { + nni_mtx mx; + nni_msgq * urq; + int raw; +}; + +// An nni_pull_pipe is our per-pipe protocol private structure. +struct nni_pull_pipe { + nni_pipe * pipe; + nni_pull_sock * pull; +}; + +static int +nni_pull_init(void **pullp, nni_sock *sock) +{ + nni_pull_sock *pull; + int rv; + + if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&pull->mx)) != 0) { + NNI_FREE_STRUCT(pull); + return (rv); + } + pull->raw = 0; + pull->urq = nni_sock_recvq(sock); + *pullp = pull; + nni_sock_senderr(sock, NNG_ENOTSUP); + return (0); +} + + +static void +nni_pull_fini(void *arg) +{ + nni_pull_sock *pull = arg; + + nni_mtx_fini(&pull->mx); + NNI_FREE_STRUCT(pull); +} + + +static int +nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) +{ + nni_pull_pipe *pp; + int rv; + + if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + return (NNG_ENOMEM); + } + pp->pipe = pipe; + pp->pull = psock; + *ppp = pp; + return (0); +} + + +static void +nni_pull_pipe_fini(void *arg) +{ + nni_pull_pipe *pp = arg; + + NNI_FREE_STRUCT(pp); +} + + +static int +nni_pull_pipe_add(void *arg) +{ + nni_pull_pipe *pp = arg; + + if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) { + return (NNG_EPROTO); + } + return (0); +} + + +static void +nni_pull_pipe_rem(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_pull_pipe_send(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_pull_pipe_recv(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_pull_sock *pull = pp->pull; + nni_msg *msg; + + for (;;) { + if (nni_pipe_recv(pp->pipe, &msg) != 0) { + break; + } + if (nni_msgq_put(pull->urq, msg) != 0) { + nni_msg_free(msg); + break; + } + } + nni_pipe_close(pp->pipe); +} + + +static int +nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_pull_sock *pull = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&pull->mx); + rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); + nni_mtx_unlock(&pull->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_pull_sock *pull = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&pull->mx); + rv = nni_getopt_int(&pull->raw, buf, szp); + nni_mtx_unlock(&pull->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe nni_pull_proto_pipe = { + .pipe_init = nni_pull_pipe_init, + .pipe_fini = nni_pull_pipe_fini, + .pipe_add = nni_pull_pipe_add, + .pipe_rem = nni_pull_pipe_rem, + .pipe_send = nni_pull_pipe_send, + .pipe_recv = nni_pull_pipe_recv, +}; + +nni_proto nni_pull_proto = { + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_pipe = &nni_pull_proto_pipe, + .proto_init = nni_pull_init, + .proto_fini = nni_pull_fini, + .proto_setopt = nni_pull_setopt, + .proto_getopt = nni_pull_getopt, + .proto_recv_filter = NULL, + .proto_send_filter = NULL, +}; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c new file mode 100644 index 00000000..10c0ddf9 --- /dev/null +++ b/src/protocol/pipeline/push.c @@ -0,0 +1,326 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Push protocol. The PUSH protocol is the "write" side of a pipeline. +// Push distributes fairly, or tries to, by giving messages in round-robin +// order. + +typedef struct nni_push_pipe nni_push_pipe; +typedef struct nni_push_sock nni_push_sock; + +// An nni_push_sock is our per-socket protocol private structure. +struct nni_push_sock { + nni_mtx mx; + nni_cv cv; + nni_msgq * uwq; + nni_thr sender; + int raw; + int closing; + int wantw; + nni_list pipes; + nni_push_pipe * nextpipe; + int npipes; +}; + +// An nni_push_pipe is our per-pipe protocol private structure. +struct nni_push_pipe { + nni_pipe * pipe; + nni_push_sock * push; + nni_msgq * mq; + int sigclose; + nni_list_node node; +}; + +static void nni_push_rrdist(void *); + +static int +nni_push_init(void **pushp, nni_sock *sock) +{ + nni_push_sock *push; + int rv; + + if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&push->mx)) != 0) { + NNI_FREE_STRUCT(push); + return (rv); + } + if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) { + nni_mtx_fini(&push->mx); + NNI_FREE_STRUCT(push); + return (rv); + } + NNI_LIST_INIT(&push->pipes, nni_push_pipe, node); + push->raw = 0; + push->npipes = 0; + push->wantw = 0; + push->uwq = nni_sock_sendq(sock); + *pushp = push; + nni_sock_recverr(sock, NNG_ENOTSUP); + rv = nni_thr_init(&push->sender, nni_push_rrdist, push); + if (rv != 0) { + nni_cv_fini(&push->cv); + nni_mtx_fini(&push->mx); + NNI_FREE_STRUCT(push); + return (rv); + } + nni_thr_run(&push->sender); + return (0); +} + + +static void +nni_push_fini(void *arg) +{ + nni_push_sock *push = arg; + + // Shut down the resender. We request it to exit by clearing + // its old value, then kick it. + nni_mtx_lock(&push->mx); + push->closing = 1; + nni_cv_wake(&push->cv); + nni_mtx_unlock(&push->mx); + + nni_thr_fini(&push->sender); + nni_cv_fini(&push->cv); + nni_mtx_fini(&push->mx); + NNI_FREE_STRUCT(push); +} + + +static int +nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) +{ + nni_push_pipe *pp; + int rv; + + if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_msgq_init(&pp->mq, 0)) != 0) { + NNI_FREE_STRUCT(pp); + return (rv); + } + NNI_LIST_NODE_INIT(&pp->node); + pp->pipe = pipe; + pp->sigclose = 0; + pp->push = psock; + *ppp = pp; + return (0); +} + + +static void +nni_push_pipe_fini(void *arg) +{ + nni_push_pipe *pp = arg; + + nni_msgq_fini(pp->mq); + NNI_FREE_STRUCT(pp); +} + + +static int +nni_push_pipe_add(void *arg) +{ + nni_push_pipe *pp = arg; + nni_push_sock *push = pp->push; + + if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) { + return (NNG_EPROTO); + } + // Wake the sender since we have a new pipe. + nni_mtx_lock(&push->mx); + if (push->nextpipe) { + // Inject us right before the next pipe, so that we're next. + nni_list_insert_before(&push->pipes, pp, push); + } else { + nni_list_append(&push->pipes, pp); + } + // Wake the top sender, as we can accept a job. + push->npipes++; + nni_cv_wake(&push->cv); + nni_mtx_unlock(&push->mx); + return (0); +} + + +static void +nni_push_pipe_rem(void *arg) +{ + nni_push_pipe *pp = arg; + nni_push_sock *push = pp->push; + + nni_mtx_lock(&push->mx); + if (pp == push->nextpipe) { + push->nextpipe = nni_list_next(&push->pipes, pp); + } + push->npipes--; + nni_list_remove(&push->pipes, pp); + nni_mtx_unlock(&push->mx); +} + + +static void +nni_push_pipe_send(void *arg) +{ + nni_push_pipe *pp = arg; + nni_push_sock *push = pp->push; + nni_msg *msg; + + for (;;) { + if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { + break; + } + nni_mtx_lock(&push->mx); + if (push->wantw) { + nni_cv_wake(&push->cv); + } + nni_mtx_unlock(&push->mx); + if (nni_pipe_send(pp->pipe, msg) != 0) { + nni_msg_free(msg); + break; + } + } + nni_pipe_close(pp->pipe); +} + + +static void +nni_push_pipe_recv(void *arg) +{ + nni_push_pipe *pp = arg; + nni_msg *msg; + + for (;;) { + if (nni_pipe_recv(pp->pipe, &msg) != 0) { + break; + } + nni_msg_free(msg); + } + nni_msgq_signal(pp->mq, &pp->sigclose); + nni_pipe_close(pp->pipe); +} + + +static int +nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_push_sock *push = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&push->mx); + rv = nni_setopt_int(&push->raw, buf, sz, 0, 1); + nni_mtx_unlock(&push->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_push_sock *push = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&push->mx); + rv = nni_getopt_int(&push->raw, buf, szp); + nni_mtx_unlock(&push->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static void +nni_push_rrdist(void *arg) +{ + nni_push_sock *push = arg; + nni_push_pipe *pp; + nni_msgq *uwq = push->uwq; + nni_msg *msg = NULL; + int rv; + int i; + + for (;;) { + if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) { + // Should only be NNG_ECLOSED + return; + } + + nni_mtx_lock(&push->mx); + if (push->closing) { + if (msg != NULL) { + nni_mtx_unlock(&push->mx); + nni_msg_free(msg); + return; + } + } + for (i = 0; i < push->npipes; i++) { + pp = push->nextpipe; + if (pp == NULL) { + pp = nni_list_first(&push->pipes); + } + push->nextpipe = nni_list_next(&push->pipes, pp); + if (nni_msgq_tryput(pp->mq, msg) == 0) { + msg = NULL; + break; + } + } + if (msg != NULL) { + // We weren't able to deliver it, so keep it and + // wait for a sender to let us know its ready. + push->wantw = 1; + nni_cv_wait(&push->cv); + } else { + push->wantw = 0; + } + nni_mtx_unlock(&push->mx); + } +} + + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe nni_push_proto_pipe = { + .pipe_init = nni_push_pipe_init, + .pipe_fini = nni_push_pipe_fini, + .pipe_add = nni_push_pipe_add, + .pipe_rem = nni_push_pipe_rem, + .pipe_send = nni_push_pipe_send, + .pipe_recv = nni_push_pipe_recv, +}; + +nni_proto nni_push_proto = { + .proto_self = NNG_PROTO_PUSH, + .proto_peer = NNG_PROTO_PULL, + .proto_name = "push", + .proto_pipe = &nni_push_proto_pipe, + .proto_init = nni_push_init, + .proto_fini = nni_push_fini, + .proto_setopt = nni_push_setopt, + .proto_getopt = nni_push_getopt, + .proto_recv_filter = NULL, + .proto_send_filter = NULL, +}; |
