diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-06 18:26:28 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-06 18:26:28 -0800 |
| commit | e1d963b2d35dac0e0332a8654befdb327dc0094c (patch) | |
| tree | dee3d4cdd0fd56e203723413542ddf0d737e74b6 /src/protocol/pipeline/pull.c | |
| parent | f0cf0d11a0ec35fb6c0b65351d5a8bb311dc091d (diff) | |
| download | nng-e1d963b2d35dac0e0332a8654befdb327dc0094c.tar.gz nng-e1d963b2d35dac0e0332a8654befdb327dc0094c.tar.bz2 nng-e1d963b2d35dac0e0332a8654befdb327dc0094c.zip | |
Initial swag at pipeline (PUSH/PULL).
PUSH attempts to do a round-robin based distribution. However, I
noticed that there is a bug in REQ, because REQ sockets will continue
to pull down work until the first one no longer has room. This can
in theory lead to scheduliung imbalances when the load is very light.
(Under heavy load, the backpressure dominates.)
Also, I note that mangos suffers the same problem. It does not
make any attempt to deliver work equally, basically each pipe winds
up pulling messages until its own buffers are full. This is bad.
We can borrow the logic here for both REQ and mangos.
None of this is tested yet.
Diffstat (limited to 'src/protocol/pipeline/pull.c')
| -rw-r--r-- | src/protocol/pipeline/pull.c | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c new file mode 100644 index 00000000..695b730e --- /dev/null +++ b/src/protocol/pipeline/pull.c @@ -0,0 +1,195 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Pull protocol. The PULL protocol is the "read" side of a pipeline. + +typedef struct nni_pull_pipe nni_pull_pipe; +typedef struct nni_pull_sock nni_pull_sock; + +// An nni_pull_sock is our per-socket protocol private structure. +struct nni_pull_sock { + nni_mtx mx; + nni_msgq * urq; + int raw; +}; + +// An nni_pull_pipe is our per-pipe protocol private structure. +struct nni_pull_pipe { + nni_pipe * pipe; + nni_pull_sock * pull; +}; + +static int +nni_pull_init(void **pullp, nni_sock *sock) +{ + nni_pull_sock *pull; + int rv; + + if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mtx_init(&pull->mx)) != 0) { + NNI_FREE_STRUCT(pull); + return (rv); + } + pull->raw = 0; + pull->urq = nni_sock_recvq(sock); + *pullp = pull; + nni_sock_senderr(sock, NNG_ENOTSUP); + return (0); +} + + +static void +nni_pull_fini(void *arg) +{ + nni_pull_sock *pull = arg; + + nni_mtx_fini(&pull->mx); + NNI_FREE_STRUCT(pull); +} + + +static int +nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) +{ + nni_pull_pipe *pp; + int rv; + + if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + return (NNG_ENOMEM); + } + pp->pipe = pipe; + pp->pull = psock; + *ppp = pp; + return (0); +} + + +static void +nni_pull_pipe_fini(void *arg) +{ + nni_pull_pipe *pp = arg; + + NNI_FREE_STRUCT(pp); +} + + +static int +nni_pull_pipe_add(void *arg) +{ + nni_pull_pipe *pp = arg; + + if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) { + return (NNG_EPROTO); + } + return (0); +} + + +static void +nni_pull_pipe_rem(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_pull_pipe_send(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static void +nni_pull_pipe_recv(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_pull_sock *pull = pp->pull; + nni_msg *msg; + + for (;;) { + if (nni_pipe_recv(pp->pipe, &msg) != 0) { + break; + } + if (nni_msgq_put(pull->urq, msg) != 0) { + nni_msg_free(msg); + break; + } + } + nni_pipe_close(pp->pipe); +} + + +static int +nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_pull_sock *pull = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&pull->mx); + rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); + nni_mtx_unlock(&pull->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_pull_sock *pull = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + nni_mtx_lock(&pull->mx); + rv = nni_getopt_int(&pull->raw, buf, szp); + nni_mtx_unlock(&pull->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe nni_pull_proto_pipe = { + .pipe_init = nni_pull_pipe_init, + .pipe_fini = nni_pull_pipe_fini, + .pipe_add = nni_pull_pipe_add, + .pipe_rem = nni_pull_pipe_rem, + .pipe_send = nni_pull_pipe_send, + .pipe_recv = nni_pull_pipe_recv, +}; + +nni_proto nni_pull_proto = { + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_pipe = &nni_pull_proto_pipe, + .proto_init = nni_pull_init, + .proto_fini = nni_pull_fini, + .proto_setopt = nni_pull_setopt, + .proto_getopt = nni_pull_getopt, + .proto_recv_filter = NULL, + .proto_send_filter = NULL, +}; |
