diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/sp/protocol/pipeline0/push.c | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/protocol/pipeline0/push.c')
| -rw-r--r-- | src/sp/protocol/pipeline0/push.c | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/src/sp/protocol/pipeline0/push.c b/src/sp/protocol/pipeline0/push.c new file mode 100644 index 00000000..ad43d967 --- /dev/null +++ b/src/sp/protocol/pipeline0/push.c @@ -0,0 +1,442 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/pipeline0/push.h" + +// Push protocol. The PUSH protocol is the "write" side of a pipeline. +// Push distributes fairly, or tries to, by giving messages in round-robin +// order. + +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif + +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif + +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_pipe_ready(push0_pipe *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { + nni_lmq wq; // list of messages queued + nni_list aq; // list of aio senders waiting + nni_list pl; // list of pipes ready to send + nni_pollable writable; + nni_mtx m; +}; + +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { + nni_pipe * pipe; + push0_sock * push; + nni_list_node node; + + nni_aio aio_recv; + nni_aio aio_send; +}; + +static int +push0_sock_init(void *arg, nni_sock *sock) +{ + push0_sock *s = arg; + NNI_ARG_UNUSED(sock); + + nni_mtx_init(&s->m); + nni_aio_list_init(&s->aq); + NNI_LIST_INIT(&s->pl, push0_pipe, node); + nni_lmq_init(&s->wq, 0); // initially we start unbuffered. + nni_pollable_init(&s->writable); + + return (0); +} + +static void +push0_sock_fini(void *arg) +{ + push0_sock *s = arg; + nni_pollable_fini(&s->writable); + nni_lmq_fini(&s->wq); + nni_mtx_fini(&s->m); +} + +static void +push0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +push0_sock_close(void *arg) +{ + push0_sock *s = arg; + nni_aio * a; + nni_mtx_lock(&s->m); + while ((a = nni_list_first(&s->aq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + nni_mtx_unlock(&s->m); +} + +static void +push0_pipe_stop(void *arg) +{ + push0_pipe *p = arg; + + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_send); +} + +static void +push0_pipe_fini(void *arg) +{ + push0_pipe *p = arg; + + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_send); +} + +static int +push0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + push0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, push0_recv_cb, p); + nni_aio_init(&p->aio_send, push0_send_cb, p); + NNI_LIST_NODE_INIT(&p->node); + p->pipe = pipe; + p->push = s; + return (0); +} + +static int +push0_pipe_start(void *arg) +{ + push0_pipe *p = arg; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { + return (NNG_EPROTO); + } + + // Schedule a receiver. This is mostly so that we can detect + // a closed transport pipe. + nni_pipe_recv(p->pipe, &p->aio_recv); + push0_pipe_ready(p); + + return (0); +} + +static void +push0_pipe_close(void *arg) +{ + push0_pipe *p = arg; + push0_sock *s = p->push; + + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_send); + + nni_mtx_lock(&s->m); + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); + + if (nni_list_empty(&s->pl) && nni_lmq_full(&s->wq)) { + nni_pollable_clear(&s->writable); + } + } + nni_mtx_unlock(&s->m); +} + +static void +push0_recv_cb(void *arg) +{ + push0_pipe *p = arg; + + // We normally expect to receive an error. If a pipe actually + // sends us data, we just discard it. + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + nni_msg_free(nni_aio_get_msg(&p->aio_recv)); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static void +push0_pipe_ready(push0_pipe *p) +{ + push0_sock *s = p->push; + nni_msg * m; + nni_aio * a = NULL; + size_t l; + bool blocked; + + nni_mtx_lock(&s->m); + + blocked = nni_lmq_full(&s->wq) && nni_list_empty(&s->pl); + + // if message is waiting in the buffered queue + // then we prefer that. + if (nni_lmq_getq(&s->wq, &m) == 0) { + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + + if ((a = nni_list_first(&s->aq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wq, m); + } + + } else if ((a = nni_list_first(&s->aq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + // We had nothing to send. Just put us in the ready list. + nni_list_append(&s->pl, p); + } + + if (blocked) { + // if we blocked, then toggle the status. + if ((!nni_lmq_full(&s->wq)) || (!nni_list_empty(&s->pl))) { + nni_pollable_raise(&s->writable); + } + } + + nni_mtx_unlock(&s->m); + + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } +} + +static void +push0_send_cb(void *arg) +{ + push0_pipe *p = arg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->pipe); + return; + } + + push0_pipe_ready(p); +} + +static void +push0_cancel(nni_aio *aio, void *arg, int rv) +{ + push0_sock *s = arg; + + nni_mtx_lock(&s->m); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->m); +} + +static void +push0_sock_send(void *arg, nni_aio *aio) +{ + push0_sock *s = arg; + push0_pipe *p; + nni_msg * m; + size_t l; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + m = nni_aio_get_msg(aio); + l = nni_msg_len(m); + + nni_mtx_lock(&s->m); + + // First we want to see if we can send it right now. + // Note that we don't block the sender until the read is complete, + // only until we have committed to send it. + if ((p = nni_list_first(&s->pl)) != NULL) { + nni_list_remove(&s->pl, p); + // NB: We won't have had any waiters in the message queue + // or the aio queue, because we would not put the pipe + // in the ready list in that case. Note though that the + // wq may be "full" if we are unbuffered. + if (nni_list_empty(&s->pl) && (nni_lmq_full(&s->wq))) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, l); + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + nni_mtx_unlock(&s->m); + return; + } + + // Can we maybe queue it. + if (nni_lmq_putq(&s->wq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, l); + if (nni_lmq_full(&s->wq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->m); + return; + } + + if ((rv = nni_aio_schedule(aio, push0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->m); + return; + } + nni_aio_list_append(&s->aq, aio); + nni_mtx_unlock(&s->m); +} + +static void +push0_sock_recv(void *arg, nni_aio *aio) +{ + NNI_ARG_UNUSED(arg); + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static int +push0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + push0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->m); + rv = nni_lmq_resize(&s->wq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wq)) { + nni_pollable_raise(&s->writable); + } else if (nni_list_empty(&s->pl)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->m); + return (rv); +} + +static int +push0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + push0_sock *s = arg; + int val; + + nni_mtx_lock(&s->m); + val = nni_lmq_cap(&s->wq); + nni_mtx_unlock(&s->m); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +push0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + push0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_size = sizeof(push0_pipe), + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_close = push0_pipe_close, + .pipe_stop = push0_pipe_stop, +}; + +static nni_option push0_sock_options[] = { + { + .o_name = NNG_OPT_SENDFD, + .o_get = push0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = push0_get_send_buf_len, + .o_set = push0_set_send_buf_len, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops push0_sock_ops = { + .sock_size = sizeof(push0_sock), + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, +}; + +static nni_proto push0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = NNI_PROTO_FLAG_SND, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, +}; + +static nni_proto push0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, +}; + +int +nng_push0_open(nng_socket *s) +{ + return (nni_proto_open(s, &push0_proto)); +} + +int +nng_push0_open_raw(nng_socket *s) +{ + return (nni_proto_open(s, &push0_proto_raw)); +} |
