From 7bf591e20a94b8d926f92ab9b320f3b75d342345 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 31 Oct 2017 13:06:38 -0700 Subject: fixes #143 Protocols and transports should be "configurable" This makes all the protocols and transports optional. All of them except ZeroTier are enabled by default, but you can now disable them (remove from the build) with cmake options. The test suite is modified so that tests still run as much as they can, but skip over things caused by missing functionality from the library (due to configuration). Further, the constant definitions and prototypes for functions that are specific to transports or protocols are moved into appropriate headers, which should be included directly by applications wishing to use these. We have also added and improved documentation -- all of the transports are documented, and several more man pages for protocols have been added. (Req/Rep and Surveyor are still missing.) --- src/protocol/bus/bus.c | 401 -------------------- src/protocol/bus0/CMakeLists.txt | 18 + src/protocol/bus0/bus.c | 406 ++++++++++++++++++++ src/protocol/bus0/bus.h | 28 ++ src/protocol/pair/pair_v0.c | 301 --------------- src/protocol/pair/pair_v1.c | 513 -------------------------- src/protocol/pair0/CMakeLists.txt | 18 + src/protocol/pair0/pair.c | 305 +++++++++++++++ src/protocol/pair0/pair.h | 28 ++ src/protocol/pair1/CMakeLists.txt | 18 + src/protocol/pair1/pair.c | 519 ++++++++++++++++++++++++++ src/protocol/pair1/pair.h | 30 ++ src/protocol/pipeline/pull.c | 242 ------------ src/protocol/pipeline/push.c | 259 ------------- src/protocol/pipeline0/CMakeLists.txt | 23 ++ src/protocol/pipeline0/pull.c | 250 +++++++++++++ src/protocol/pipeline0/pull.h | 28 ++ src/protocol/pipeline0/push.c | 267 ++++++++++++++ src/protocol/pipeline0/push.h | 28 ++ src/protocol/pubsub/pub.c | 335 ----------------- src/protocol/pubsub/sub.c | 398 -------------------- src/protocol/pubsub0/CMakeLists.txt | 23 ++ src/protocol/pubsub0/pub.c | 344 +++++++++++++++++ src/protocol/pubsub0/pub.h | 28 ++ src/protocol/pubsub0/sub.c | 404 ++++++++++++++++++++ src/protocol/pubsub0/sub.h | 31 ++ src/protocol/reqrep/rep.c | 506 ------------------------- src/protocol/reqrep/req.c | 668 --------------------------------- src/protocol/reqrep0/CMakeLists.txt | 23 ++ src/protocol/reqrep0/rep.c | 515 ++++++++++++++++++++++++++ src/protocol/reqrep0/rep.h | 28 ++ src/protocol/reqrep0/req.c | 675 ++++++++++++++++++++++++++++++++++ src/protocol/reqrep0/req.h | 30 ++ src/protocol/survey/respond.c | 500 ------------------------- src/protocol/survey/survey.c | 475 ------------------------ src/protocol/survey0/CMakeLists.txt | 23 ++ src/protocol/survey0/respond.c | 509 +++++++++++++++++++++++++ src/protocol/survey0/respond.h | 28 ++ src/protocol/survey0/survey.c | 484 ++++++++++++++++++++++++ src/protocol/survey0/survey.h | 30 ++ 40 files changed, 5141 insertions(+), 4598 deletions(-) delete mode 100644 src/protocol/bus/bus.c create mode 100644 src/protocol/bus0/CMakeLists.txt create mode 100644 src/protocol/bus0/bus.c create mode 100644 src/protocol/bus0/bus.h delete mode 100644 src/protocol/pair/pair_v0.c delete mode 100644 src/protocol/pair/pair_v1.c create mode 100644 src/protocol/pair0/CMakeLists.txt create mode 100644 src/protocol/pair0/pair.c create mode 100644 src/protocol/pair0/pair.h create mode 100644 src/protocol/pair1/CMakeLists.txt create mode 100644 src/protocol/pair1/pair.c create mode 100644 src/protocol/pair1/pair.h delete mode 100644 src/protocol/pipeline/pull.c delete mode 100644 src/protocol/pipeline/push.c create mode 100644 src/protocol/pipeline0/CMakeLists.txt create mode 100644 src/protocol/pipeline0/pull.c create mode 100644 src/protocol/pipeline0/pull.h create mode 100644 src/protocol/pipeline0/push.c create mode 100644 src/protocol/pipeline0/push.h delete mode 100644 src/protocol/pubsub/pub.c delete mode 100644 src/protocol/pubsub/sub.c create mode 100644 src/protocol/pubsub0/CMakeLists.txt create mode 100644 src/protocol/pubsub0/pub.c create mode 100644 src/protocol/pubsub0/pub.h create mode 100644 src/protocol/pubsub0/sub.c create mode 100644 src/protocol/pubsub0/sub.h delete mode 100644 src/protocol/reqrep/rep.c delete mode 100644 src/protocol/reqrep/req.c create mode 100644 src/protocol/reqrep0/CMakeLists.txt create mode 100644 src/protocol/reqrep0/rep.c create mode 100644 src/protocol/reqrep0/rep.h create mode 100644 src/protocol/reqrep0/req.c create mode 100644 src/protocol/reqrep0/req.h delete mode 100644 src/protocol/survey/respond.c delete mode 100644 src/protocol/survey/survey.c create mode 100644 src/protocol/survey0/CMakeLists.txt create mode 100644 src/protocol/survey0/respond.c create mode 100644 src/protocol/survey0/respond.h create mode 100644 src/protocol/survey0/survey.c create mode 100644 src/protocol/survey0/survey.h (limited to 'src/protocol') diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c deleted file mode 100644 index 6ec0066b..00000000 --- a/src/protocol/bus/bus.c +++ /dev/null @@ -1,401 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Bus protocol. The BUS protocol, each peer sends a message to its peers. -// However, bus protocols do not "forward" (absent a device). So in order -// for each participant to receive the message, each sender must be connected -// to every other node in the network (full mesh). - -typedef struct bus_pipe bus_pipe; -typedef struct bus_sock bus_sock; - -static void bus_sock_getq(bus_sock *); -static void bus_sock_send(void *, nni_aio *); -static void bus_sock_recv(void *, nni_aio *); - -static void bus_pipe_getq(bus_pipe *); -static void bus_pipe_send(bus_pipe *); -static void bus_pipe_recv(bus_pipe *); - -static void bus_sock_getq_cb(void *); -static void bus_pipe_getq_cb(void *); -static void bus_pipe_send_cb(void *); -static void bus_pipe_recv_cb(void *); -static void bus_pipe_putq_cb(void *); - -// A bus_sock is our per-socket protocol private structure. -struct bus_sock { - int raw; - nni_aio * aio_getq; - nni_list pipes; - nni_mtx mtx; - nni_msgq *uwq; - nni_msgq *urq; -}; - -// A bus_pipe is our per-pipe protocol private structure. -struct bus_pipe { - nni_pipe * npipe; - bus_sock * psock; - nni_msgq * sendq; - nni_list_node node; - nni_aio * aio_getq; - nni_aio * aio_recv; - nni_aio * aio_send; - nni_aio * aio_putq; - nni_mtx mtx; -}; - -static void -bus_sock_fini(void *arg) -{ - bus_sock *s = arg; - - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static int -bus_sock_init(void **sp, nni_sock *nsock) -{ - bus_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&s->pipes, bus_pipe, node); - nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { - bus_sock_fini(s); - return (rv); - } - s->raw = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - - *sp = s; - return (0); -} - -static void -bus_sock_open(void *arg) -{ - bus_sock *s = arg; - - bus_sock_getq(s); -} - -static void -bus_sock_close(void *arg) -{ - bus_sock *s = arg; - - nni_aio_cancel(s->aio_getq, NNG_ECLOSED); -} - -static void -bus_pipe_fini(void *arg) -{ - bus_pipe *p = arg; - - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_msgq_fini(p->sendq); - nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static int -bus_pipe_init(void **pp, nni_pipe *npipe, void *s) -{ - bus_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_NODE_INIT(&p->node); - nni_mtx_init(&p->mtx); - if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { - bus_pipe_fini(p); - return (rv); - } - - p->npipe = npipe; - p->psock = s; - *pp = p; - return (0); -} - -static int -bus_pipe_start(void *arg) -{ - bus_pipe *p = arg; - bus_sock *s = p->psock; - - nni_mtx_lock(&s->mtx); - nni_list_append(&s->pipes, p); - nni_mtx_unlock(&s->mtx); - - bus_pipe_recv(p); - bus_pipe_getq(p); - - return (0); -} - -static void -bus_pipe_stop(void *arg) -{ - bus_pipe *p = arg; - bus_sock *s = p->psock; - - nni_msgq_close(p->sendq); - - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - - nni_mtx_lock(&s->mtx); - if (nni_list_active(&s->pipes, p)) { - nni_list_remove(&s->pipes, p); - } - nni_mtx_unlock(&s->mtx); -} - -static void -bus_pipe_getq_cb(void *arg) -{ - bus_pipe *p = arg; - - if (nni_aio_result(p->aio_getq) != 0) { - // closed? - nni_pipe_stop(p->npipe); - return; - } - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->npipe, p->aio_send); -} - -static void -bus_pipe_send_cb(void *arg) -{ - bus_pipe *p = arg; - - if (nni_aio_result(p->aio_send) != 0) { - // closed? - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); - return; - } - - bus_pipe_getq(p); -} - -static void -bus_pipe_recv_cb(void *arg) -{ - bus_pipe *p = arg; - bus_sock *s = p->psock; - nni_msg * msg; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); - return; - } - msg = nni_aio_get_msg(p->aio_recv); - - if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) { - // XXX: bump a nomemory stat - nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_aio_set_msg(p->aio_putq, msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msgq_aio_put(s->urq, p->aio_putq); -} - -static void -bus_pipe_putq_cb(void *arg) -{ - bus_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - - // Wait for another recv. - bus_pipe_recv(p); -} - -static void -bus_sock_getq_cb(void *arg) -{ - bus_sock *s = arg; - bus_pipe *p; - bus_pipe *lastp; - nni_msg * msg; - nni_msg * dup; - uint32_t sender; - - if (nni_aio_result(s->aio_getq) != 0) { - return; - } - - msg = nni_aio_get_msg(s->aio_getq); - - // The header being present indicates that the message - // was received locally and we are rebroadcasting. (Device - // is doing this probably.) In this case grab the pipe - // ID from the header, so we can exclude it. - if (nni_msg_header_len(msg) >= 4) { - sender = nni_msg_header_trim_u32(msg); - } else { - sender = 0; - } - - nni_mtx_lock(&s->mtx); - lastp = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (nni_pipe_id(p->npipe) == sender) { - continue; - } - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); - } - } - nni_mtx_unlock(&s->mtx); - - if (lastp == NULL) { - nni_msg_free(msg); - } - - bus_sock_getq(s); -} - -static void -bus_sock_getq(bus_sock *s) -{ - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -bus_pipe_getq(bus_pipe *p) -{ - nni_msgq_aio_get(p->sendq, p->aio_getq); -} - -static void -bus_pipe_recv(bus_pipe *p) -{ - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static int -bus_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - bus_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - bus_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -bus_sock_send(void *arg, nni_aio *aio) -{ - bus_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); -} - -static void -bus_sock_recv(void *arg, nni_aio *aio) -{ - bus_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops bus_pipe_ops = { - .pipe_init = bus_pipe_init, - .pipe_fini = bus_pipe_fini, - .pipe_start = bus_pipe_start, - .pipe_stop = bus_pipe_stop, -}; - -static nni_proto_sock_option bus_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = bus_sock_getopt_raw, - .pso_setopt = bus_sock_setopt_raw, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops bus_sock_ops = { - .sock_init = bus_sock_init, - .sock_fini = bus_sock_fini, - .sock_open = bus_sock_open, - .sock_close = bus_sock_close, - .sock_send = bus_sock_send, - .sock_recv = bus_sock_recv, - .sock_options = bus_sock_options, -}; - -static nni_proto bus_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_BUS_V0, "bus" }, - .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &bus_sock_ops, - .proto_pipe_ops = &bus_pipe_ops, -}; - -int -nng_bus0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &bus_proto)); -} diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt new file mode 100644 index 00000000..5071054a --- /dev/null +++ b/src/protocol/bus0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Bus protocol + +if (NNG_PROTO_BUS0) + set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h) + install(FILES bus.h DESTINATION include/nng/protocol/bus0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c new file mode 100644 index 00000000..3e15000b --- /dev/null +++ b/src/protocol/bus0/bus.c @@ -0,0 +1,406 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/bus0/bus.h" + +// Bus protocol. The BUS protocol, each peer sends a message to its peers. +// However, bus protocols do not "forward" (absent a device). So in order +// for each participant to receive the message, each sender must be connected +// to every other node in the network (full mesh). + +#ifndef NNI_PROTO_BUS_V0 +#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0) +#endif + +typedef struct bus0_pipe bus0_pipe; +typedef struct bus0_sock bus0_sock; + +static void bus0_sock_getq(bus0_sock *); +static void bus0_sock_send(void *, nni_aio *); +static void bus0_sock_recv(void *, nni_aio *); + +static void bus0_pipe_getq(bus0_pipe *); +static void bus0_pipe_send(bus0_pipe *); +static void bus0_pipe_recv(bus0_pipe *); + +static void bus0_sock_getq_cb(void *); +static void bus0_pipe_getq_cb(void *); +static void bus0_pipe_send_cb(void *); +static void bus0_pipe_recv_cb(void *); +static void bus0_pipe_putq_cb(void *); + +// bus0_sock is our per-socket protocol private structure. +struct bus0_sock { + int raw; + nni_aio * aio_getq; + nni_list pipes; + nni_mtx mtx; + nni_msgq *uwq; + nni_msgq *urq; +}; + +// bus0_pipe is our per-pipe protocol private structure. +struct bus0_pipe { + nni_pipe * npipe; + bus0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio * aio_getq; + nni_aio * aio_recv; + nni_aio * aio_send; + nni_aio * aio_putq; + nni_mtx mtx; +}; + +static void +bus0_sock_fini(void *arg) +{ + bus0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +bus0_sock_init(void **sp, nni_sock *nsock) +{ + bus0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); + nni_mtx_init(&s->mtx); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + bus0_sock_fini(s); + return (rv); + } + s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + + *sp = s; + return (0); +} + +static void +bus0_sock_open(void *arg) +{ + bus0_sock *s = arg; + + bus0_sock_getq(s); +} + +static void +bus0_sock_close(void *arg) +{ + bus0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +bus0_pipe_fini(void *arg) +{ + bus0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static int +bus0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + bus0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&p->node); + nni_mtx_init(&p->mtx); + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + bus0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +bus0_pipe_start(void *arg) +{ + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + bus0_pipe_recv(p); + bus0_pipe_getq(p); + + return (0); +} + +static void +bus0_pipe_stop(void *arg) +{ + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + + nni_msgq_close(p->sendq); + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +bus0_pipe_getq_cb(void *arg) +{ + bus0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + // closed? + nni_pipe_stop(p->npipe); + return; + } + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +static void +bus0_pipe_send_cb(void *arg) +{ + bus0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + // closed? + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + bus0_pipe_getq(p); +} + +static void +bus0_pipe_recv_cb(void *arg) +{ + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); + return; + } + msg = nni_aio_get_msg(p->aio_recv); + + if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) { + // XXX: bump a nomemory stat + nni_msg_free(msg); + nni_aio_set_msg(p->aio_recv, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msgq_aio_put(s->urq, p->aio_putq); +} + +static void +bus0_pipe_putq_cb(void *arg) +{ + bus0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + + // Wait for another recv. + bus0_pipe_recv(p); +} + +static void +bus0_sock_getq_cb(void *arg) +{ + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; + uint32_t sender; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + + // The header being present indicates that the message + // was received locally and we are rebroadcasting. (Device + // is doing this probably.) In this case grab the pipe + // ID from the header, so we can exclude it. + if (nni_msg_header_len(msg) >= 4) { + sender = nni_msg_header_trim_u32(msg); + } else { + sender = 0; + } + + nni_mtx_lock(&s->mtx); + lastp = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (nni_pipe_id(p->npipe) == sender) { + continue; + } + if (p != lastp) { + if (nni_msg_dup(&dup, msg) != 0) { + continue; + } + } else { + dup = msg; + } + if (nni_msgq_tryput(p->sendq, dup) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(&s->mtx); + + if (lastp == NULL) { + nni_msg_free(msg); + } + + bus0_sock_getq(s); +} + +static void +bus0_sock_getq(bus0_sock *s) +{ + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +bus0_pipe_getq(bus0_pipe *p) +{ + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +bus0_pipe_recv(bus0_pipe *p) +{ + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + bus0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + bus0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +bus0_sock_send(void *arg, nni_aio *aio) +{ + bus0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +bus0_sock_recv(void *arg, nni_aio *aio) +{ + bus0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops bus0_pipe_ops = { + .pipe_init = bus0_pipe_init, + .pipe_fini = bus0_pipe_fini, + .pipe_start = bus0_pipe_start, + .pipe_stop = bus0_pipe_stop, +}; + +static nni_proto_sock_option bus0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = bus0_sock_getopt_raw, + .pso_setopt = bus0_sock_setopt_raw, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops bus0_sock_ops = { + .sock_init = bus0_sock_init, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, +}; + +static nni_proto bus0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_BUS_V0, "bus" }, + .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, +}; + +int +nng_bus0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &bus0_proto)); +} diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h new file mode 100644 index 00000000..0ef3d391 --- /dev/null +++ b/src/protocol/bus0/bus.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_BUS0_BUS_H +#define NNG_PROTOCOL_BUS0_BUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_bus0_open(nng_socket *); + +#ifndef nng_bus_open +#define nng_bus_open nng_bus0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_BUS0_BUS_H diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c deleted file mode 100644 index 93cd1497..00000000 --- a/src/protocol/pair/pair_v0.c +++ /dev/null @@ -1,301 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern. -// While a peer is connected to the server, all other peer connection -// attempts are discarded. - -typedef struct pair0_pipe pair0_pipe; -typedef struct pair0_sock pair0_sock; - -static void pair0_send_cb(void *); -static void pair0_recv_cb(void *); -static void pair0_getq_cb(void *); -static void pair0_putq_cb(void *); -static void pair0_pipe_fini(void *); - -// pair0_sock is our per-socket protocol private structure. -struct pair0_sock { - pair0_pipe *ppipe; - nni_msgq * uwq; - nni_msgq * urq; - int raw; - nni_mtx mtx; -}; - -// An pair0_pipe is our per-pipe protocol private structure. We keep -// one of these even though in theory we'd only have a single underlying -// pipe. The separate data structure is more like other protocols that do -// manage multiple pipes. -struct pair0_pipe { - nni_pipe * npipe; - pair0_sock *psock; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_getq; - nni_aio * aio_putq; -}; - -static int -pair0_sock_init(void **sp, nni_sock *nsock) -{ - pair0_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->mtx); - s->ppipe = NULL; - s->raw = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - *sp = s; - return (0); -} - -static void -pair0_sock_fini(void *arg) -{ - pair0_sock *s = arg; - - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static void -pair0_pipe_fini(void *arg) -{ - pair0_pipe *p = arg; - - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - NNI_FREE_STRUCT(p); -} - -static int -pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) -{ - pair0_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { - pair0_pipe_fini(p); - return (rv); - } - - p->npipe = npipe; - p->psock = psock; - *pp = p; - return (0); -} - -static int -pair0_pipe_start(void *arg) -{ - pair0_pipe *p = arg; - pair0_sock *s = p->psock; - - nni_mtx_lock(&s->mtx); - if (s->ppipe != NULL) { - nni_mtx_unlock(&s->mtx); - return (NNG_EBUSY); // Already have a peer, denied. - } - s->ppipe = p; - nni_mtx_unlock(&s->mtx); - - // Schedule a getq on the upper, and a read from the pipe. - // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, p->aio_getq); - nni_pipe_recv(p->npipe, p->aio_recv); - - return (0); -} - -static void -pair0_pipe_stop(void *arg) -{ - pair0_pipe *p = arg; - pair0_sock *s = p->psock; - - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); - - nni_mtx_lock(&s->mtx); - if (s->ppipe == p) { - s->ppipe = NULL; - } - nni_mtx_unlock(&s->mtx); -} - -static void -pair0_recv_cb(void *arg) -{ - pair0_pipe *p = arg; - pair0_sock *s = p->psock; - nni_msg * msg; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_putq, msg); - nni_aio_set_msg(p->aio_recv, NULL); - - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, p->aio_putq); -} - -static void -pair0_putq_cb(void *arg) -{ - pair0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static void -pair0_getq_cb(void *arg) -{ - pair0_pipe *p = arg; - pair0_sock *s = p->psock; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - nni_pipe_send(p->npipe, p->aio_send); -} - -static void -pair0_send_cb(void *arg) -{ - pair0_pipe *p = arg; - pair0_sock *s = p->psock; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_msgq_aio_get(s->uwq, p->aio_getq); -} - -static void -pair0_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -pair0_sock_close(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static int -pair0_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - pair0_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - pair0_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -pair0_sock_send(void *arg, nni_aio *aio) -{ - pair0_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); -} - -static void -pair0_sock_recv(void *arg, nni_aio *aio) -{ - pair0_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops pair0_pipe_ops = { - .pipe_init = pair0_pipe_init, - .pipe_fini = pair0_pipe_fini, - .pipe_start = pair0_pipe_start, - .pipe_stop = pair0_pipe_stop, -}; - -static nni_proto_sock_option pair0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = pair0_sock_getopt_raw, - .pso_setopt = pair0_sock_setopt_raw, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops pair0_sock_ops = { - .sock_init = pair0_sock_init, - .sock_fini = pair0_sock_fini, - .sock_open = pair0_sock_open, - .sock_close = pair0_sock_close, - .sock_send = pair0_sock_send, - .sock_recv = pair0_sock_recv, - .sock_options = pair0_sock_options, -}; - -// Legacy protocol (v0) -static nni_proto pair0_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PAIR_V0, "pair" }, - .proto_peer = { NNI_PROTO_PAIR_V0, "pair" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &pair0_sock_ops, - .proto_pipe_ops = &pair0_pipe_ops, -}; - -int -nng_pair0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &pair0_proto)); -} diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c deleted file mode 100644 index e14d06d5..00000000 --- a/src/protocol/pair/pair_v1.c +++ /dev/null @@ -1,513 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, -// usually, but it can support a polyamorous mode where a single server can -// communicate with multiple partners. - -typedef struct pair1_pipe pair1_pipe; -typedef struct pair1_sock pair1_sock; - -static void pair1_sock_getq_cb(void *); -static void pair1_pipe_send_cb(void *); -static void pair1_pipe_recv_cb(void *); -static void pair1_pipe_getq_cb(void *); -static void pair1_pipe_putq_cb(void *); -static void pair1_pipe_fini(void *); - -// pair1_sock is our per-socket protocol private structure. -struct pair1_sock { - nni_msgq * uwq; - nni_msgq * urq; - int raw; - int ttl; - nni_mtx mtx; - nni_idhash *pipes; - nni_list plist; - int started; - int poly; - nni_aio * aio_getq; -}; - -// pair1_pipe is our per-pipe protocol private structure. -struct pair1_pipe { - nni_pipe * npipe; - pair1_sock * psock; - nni_msgq * sendq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_list_node node; -}; - -static void -pair1_sock_fini(void *arg) -{ - pair1_sock *s = arg; - - nni_aio_fini(s->aio_getq); - nni_idhash_fini(s->pipes); - nni_mtx_fini(&s->mtx); - - NNI_FREE_STRUCT(s); -} - -static int -pair1_sock_init(void **sp, nni_sock *nsock) -{ - pair1_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - NNI_FREE_STRUCT(s); - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&s->plist, pair1_pipe, node); - - // Raw mode uses this. - nni_mtx_init(&s->mtx); - - if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { - pair1_sock_fini(s); - return (rv); - } - - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; - - return (0); -} - -static void -pair1_pipe_fini(void *arg) -{ - pair1_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - -static int -pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) -{ - pair1_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { - pair1_pipe_fini(p); - return (NNG_ENOMEM); - } - - p->npipe = npipe; - p->psock = psock; - *pp = p; - - return (rv); -} - -static int -pair1_pipe_start(void *arg) -{ - pair1_pipe *p = arg; - pair1_sock *s = p->psock; - uint32_t id; - int rv; - - id = nni_pipe_id(p->npipe); - nni_mtx_lock(&s->mtx); - if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { - nni_mtx_unlock(&s->mtx); - return (rv); - } - if (!s->poly) { - if (!nni_list_empty(&s->plist)) { - nni_idhash_remove(s->pipes, id); - nni_mtx_unlock(&s->mtx); - return (NNG_EBUSY); - } - } else { - if (!s->started) { - nni_msgq_aio_get(s->uwq, s->aio_getq); - } - } - nni_list_append(&s->plist, p); - s->started = 1; - nni_mtx_unlock(&s->mtx); - - // Schedule a getq. In polyamorous mode we get on the per pipe - // sendq, as the socket distributes to us. In monogamous mode - // we bypass and get from the upper writeq directly (saving a - // set of context switches). - if (s->poly) { - nni_msgq_aio_get(p->sendq, p->aio_getq); - } else { - nni_msgq_aio_get(s->uwq, p->aio_getq); - } - // And the pipe read of course. - nni_pipe_recv(p->npipe, p->aio_recv); - - return (0); -} - -static void -pair1_pipe_stop(void *arg) -{ - pair1_pipe *p = arg; - pair1_sock *s = p->psock; - - nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe)); - nni_list_node_remove(&p->node); - nni_mtx_unlock(&s->mtx); - - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); -} - -static void -pair1_pipe_recv_cb(void *arg) -{ - pair1_pipe *p = arg; - pair1_sock *s = p->psock; - nni_msg * msg; - uint32_t hdr; - nni_pipe * npipe = p->npipe; - int rv; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - - // Store the pipe ID. - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - - // If the message is missing the hop count header, scrap it. - if (nni_msg_len(msg) < sizeof(uint32_t)) { - nni_msg_free(msg); - nni_pipe_stop(npipe); - return; - } - hdr = nni_msg_trim_u32(msg); - if (hdr & 0xffffff00) { - nni_msg_free(msg); - nni_pipe_stop(npipe); - return; - } - - // If we bounced too many times, discard the message, but - // keep getting more. - if (hdr > (unsigned) s->ttl) { - nni_msg_free(msg); - nni_pipe_recv(npipe, p->aio_recv); - return; - } - - // Store the hop count in the header. - if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) { - nni_msg_free(msg); - nni_pipe_recv(npipe, p->aio_recv); - return; - } - - // Send the message up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(s->urq, p->aio_putq); -} - -static void -pair1_sock_getq_cb(void *arg) -{ - pair1_pipe *p; - pair1_sock *s = arg; - nni_msg * msg; - uint32_t id; - - if (nni_aio_result(s->aio_getq) != 0) { - // Socket closing... - return; - } - - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // By definition we are in polyamorous mode. - NNI_ASSERT(s->poly); - - p = NULL; - nni_mtx_lock(&s->mtx); - // If no pipe was requested, we look for any connected peer. - if (((id = nni_msg_get_pipe(msg)) == 0) && - (!nni_list_empty(&s->plist))) { - p = nni_list_first(&s->plist); - } else { - nni_idhash_find(s->pipes, id, (void **) &p); - } - if (p == NULL) { - // Pipe not present! - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - nni_msgq_aio_get(s->uwq, s->aio_getq); - return; - } - - // Try a non-blocking send. If this fails we just discard the - // message. We have to do this to avoid head-of-line blocking - // for messages sent to other pipes. Note that there is some - // buffering in the sendq. - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); - } - - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -pair1_pipe_putq_cb(void *arg) -{ - pair1_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static void -pair1_pipe_getq_cb(void *arg) -{ - pair1_pipe *p = arg; - pair1_sock *s = p->psock; - nni_msg * msg; - uint32_t hops; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - msg = nni_aio_get_msg(p->aio_getq); - nni_aio_set_msg(p->aio_getq, NULL); - - // Raw mode messages have the header already formed, with - // a hop count. Cooked mode messages have no - // header so we have to add one. - if (s->raw) { - if (nni_msg_header_len(msg) != sizeof(uint32_t)) { - goto badmsg; - } - hops = nni_msg_header_trim_u32(msg); - } else { - // Strip off any previously existing header, such as when - // replying to messages. - nni_msg_header_clear(msg); - hops = 0; - } - - hops++; - - // Insert the hops header. - if (nni_msg_header_append_u32(msg, hops) != 0) { - goto badmsg; - } - - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->npipe, p->aio_send); - return; - -badmsg: - nni_msg_free(msg); - nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); -} - -static void -pair1_pipe_send_cb(void *arg) -{ - pair1_pipe *p = arg; - pair1_sock *s = p->psock; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); - return; - } - - // In polyamorous mode, we want to get from the sendq; in - // monogamous we get from upper writeq. - nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); -} - -static void -pair1_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -pair1_sock_close(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static int -pair1_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - pair1_sock *s = arg; - int rv; - nni_mtx_lock(&s->mtx); - rv = s->started ? NNG_ESTATE : nni_setopt_int(&s->raw, buf, sz, 0, 1); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -pair1_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - pair1_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static int -pair1_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) -{ - pair1_sock *s = arg; - int rv; - nni_mtx_lock(&s->mtx); // Have to be locked against recv cb. - rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -pair1_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) -{ - pair1_sock *s = arg; - return (nni_getopt_int(s->ttl, buf, szp)); -} - -static int -pair1_sock_setopt_poly(void *arg, const void *buf, size_t sz) -{ - pair1_sock *s = arg; - int rv; - nni_mtx_lock(&s->mtx); - rv = s->started ? NNG_ESTATE : nni_setopt_int(&s->poly, buf, sz, 0, 1); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -pair1_sock_getopt_poly(void *arg, void *buf, size_t *szp) -{ - pair1_sock *s = arg; - return (nni_getopt_int(s->poly, buf, szp)); -} - -static void -pair1_sock_send(void *arg, nni_aio *aio) -{ - pair1_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); -} - -static void -pair1_sock_recv(void *arg, nni_aio *aio) -{ - pair1_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops pair1_pipe_ops = { - .pipe_init = pair1_pipe_init, - .pipe_fini = pair1_pipe_fini, - .pipe_start = pair1_pipe_start, - .pipe_stop = pair1_pipe_stop, -}; - -static nni_proto_sock_option pair1_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = pair1_sock_getopt_raw, - .pso_setopt = pair1_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = pair1_sock_getopt_maxttl, - .pso_setopt = pair1_sock_setopt_maxttl, - }, - { - .pso_name = NNG_OPT_PAIR1_POLY, - .pso_getopt = pair1_sock_getopt_poly, - .pso_setopt = pair1_sock_setopt_poly, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops pair1_sock_ops = { - .sock_init = pair1_sock_init, - .sock_fini = pair1_sock_fini, - .sock_open = pair1_sock_open, - .sock_close = pair1_sock_close, - .sock_recv = pair1_sock_recv, - .sock_send = pair1_sock_send, - .sock_options = pair1_sock_options, -}; - -static nni_proto pair1_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PAIR_V1, "pair1" }, - .proto_peer = { NNI_PROTO_PAIR_V1, "pair1" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &pair1_sock_ops, - .proto_pipe_ops = &pair1_pipe_ops, -}; - -int -nng_pair1_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &pair1_proto)); -} diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt new file mode 100644 index 00000000..68e7ad34 --- /dev/null +++ b/src/protocol/pair0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv0 protocol + +if (NNG_PROTO_PAIR0) + set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c new file mode 100644 index 00000000..bac405b8 --- /dev/null +++ b/src/protocol/pair0/pair.c @@ -0,0 +1,305 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern. +// While a peer is connected to the server, all other peer connection +// attempts are discarded. + +#ifndef NNI_PROTO_PAIR_V0 +#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0) +#endif + +typedef struct pair0_pipe pair0_pipe; +typedef struct pair0_sock pair0_sock; + +static void pair0_send_cb(void *); +static void pair0_recv_cb(void *); +static void pair0_getq_cb(void *); +static void pair0_putq_cb(void *); +static void pair0_pipe_fini(void *); + +// pair0_sock is our per-socket protocol private structure. +struct pair0_sock { + pair0_pipe *ppipe; + nni_msgq * uwq; + nni_msgq * urq; + int raw; + nni_mtx mtx; +}; + +// An pair0_pipe is our per-pipe protocol private structure. We keep +// one of these even though in theory we'd only have a single underlying +// pipe. The separate data structure is more like other protocols that do +// manage multiple pipes. +struct pair0_pipe { + nni_pipe * npipe; + pair0_sock *psock; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; +}; + +static int +pair0_sock_init(void **sp, nni_sock *nsock) +{ + pair0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + s->ppipe = NULL; + s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + *sp = s; + return (0); +} + +static void +pair0_sock_fini(void *arg) +{ + pair0_sock *s = arg; + + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static void +pair0_pipe_fini(void *arg) +{ + pair0_pipe *p = arg; + + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + NNI_FREE_STRUCT(p); +} + +static int +pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) +{ + pair0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + pair0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = psock; + *pp = p; + return (0); +} + +static int +pair0_pipe_start(void *arg) +{ + pair0_pipe *p = arg; + pair0_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + if (s->ppipe != NULL) { + nni_mtx_unlock(&s->mtx); + return (NNG_EBUSY); // Already have a peer, denied. + } + s->ppipe = p; + nni_mtx_unlock(&s->mtx); + + // Schedule a getq on the upper, and a read from the pipe. + // Each of these also sets up another hold on the pipe itself. + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); + + return (0); +} + +static void +pair0_pipe_stop(void *arg) +{ + pair0_pipe *p = arg; + pair0_sock *s = p->psock; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); + + nni_mtx_lock(&s->mtx); + if (s->ppipe == p) { + s->ppipe = NULL; + } + nni_mtx_unlock(&s->mtx); +} + +static void +pair0_recv_cb(void *arg) +{ + pair0_pipe *p = arg; + pair0_sock *s = p->psock; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); + + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + nni_msgq_aio_put(s->urq, p->aio_putq); +} + +static void +pair0_putq_cb(void *arg) +{ + pair0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +pair0_getq_cb(void *arg) +{ + pair0_pipe *p = arg; + pair0_sock *s = p->psock; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + nni_pipe_send(p->npipe, p->aio_send); +} + +static void +pair0_send_cb(void *arg) +{ + pair0_pipe *p = arg; + pair0_sock *s = p->psock; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(s->uwq, p->aio_getq); +} + +static void +pair0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +pair0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static int +pair0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + pair0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + pair0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +pair0_sock_send(void *arg, nni_aio *aio) +{ + pair0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +pair0_sock_recv(void *arg, nni_aio *aio) +{ + pair0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops pair0_pipe_ops = { + .pipe_init = pair0_pipe_init, + .pipe_fini = pair0_pipe_fini, + .pipe_start = pair0_pipe_start, + .pipe_stop = pair0_pipe_stop, +}; + +static nni_proto_sock_option pair0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = pair0_sock_getopt_raw, + .pso_setopt = pair0_sock_setopt_raw, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops pair0_sock_ops = { + .sock_init = pair0_sock_init, + .sock_fini = pair0_sock_fini, + .sock_open = pair0_sock_open, + .sock_close = pair0_sock_close, + .sock_send = pair0_sock_send, + .sock_recv = pair0_sock_recv, + .sock_options = pair0_sock_options, +}; + +// Legacy protocol (v0) +static nni_proto pair0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_peer = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &pair0_sock_ops, + .proto_pipe_ops = &pair0_pipe_ops, +}; + +int +nng_pair0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair0_proto)); +} diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h new file mode 100644 index 00000000..6828c921 --- /dev/null +++ b/src/protocol/pair0/pair.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR0_PAIR_H +#define NNG_PROTOCOL_PAIR0_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair0_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR0_PAIR_H diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt new file mode 100644 index 00000000..f35d6959 --- /dev/null +++ b/src/protocol/pair1/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv1 protocol + +if (NNG_PROTO_PAIR1) + set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair1) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c new file mode 100644 index 00000000..3f6f63fc --- /dev/null +++ b/src/protocol/pair1/pair.c @@ -0,0 +1,519 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +#include "protocol/pair1/pair.h" + +// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, +// usually, but it can support a polyamorous mode where a single server can +// communicate with multiple partners. + +#ifndef NNI_PROTO_PAIR_V1 +#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) +#endif + +typedef struct pair1_pipe pair1_pipe; +typedef struct pair1_sock pair1_sock; + +static void pair1_sock_getq_cb(void *); +static void pair1_pipe_send_cb(void *); +static void pair1_pipe_recv_cb(void *); +static void pair1_pipe_getq_cb(void *); +static void pair1_pipe_putq_cb(void *); +static void pair1_pipe_fini(void *); + +// pair1_sock is our per-socket protocol private structure. +struct pair1_sock { + nni_msgq * uwq; + nni_msgq * urq; + int raw; + int ttl; + nni_mtx mtx; + nni_idhash *pipes; + nni_list plist; + int started; + int poly; + nni_aio * aio_getq; +}; + +// pair1_pipe is our per-pipe protocol private structure. +struct pair1_pipe { + nni_pipe * npipe; + pair1_sock * psock; + nni_msgq * sendq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_list_node node; +}; + +static void +pair1_sock_fini(void *arg) +{ + pair1_sock *s = arg; + + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + nni_mtx_fini(&s->mtx); + + NNI_FREE_STRUCT(s); +} + +static int +pair1_sock_init(void **sp, nni_sock *nsock) +{ + pair1_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_idhash_init(&s->pipes)) != 0) { + NNI_FREE_STRUCT(s); + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&s->plist, pair1_pipe, node); + + // Raw mode uses this. + nni_mtx_init(&s->mtx); + + if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { + pair1_sock_fini(s); + return (rv); + } + + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; + + return (0); +} + +static void +pair1_pipe_fini(void *arg) +{ + pair1_pipe *p = arg; + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) +{ + pair1_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + pair1_pipe_fini(p); + return (NNG_ENOMEM); + } + + p->npipe = npipe; + p->psock = psock; + *pp = p; + + return (rv); +} + +static int +pair1_pipe_start(void *arg) +{ + pair1_pipe *p = arg; + pair1_sock *s = p->psock; + uint32_t id; + int rv; + + id = nni_pipe_id(p->npipe); + nni_mtx_lock(&s->mtx); + if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { + nni_mtx_unlock(&s->mtx); + return (rv); + } + if (!s->poly) { + if (!nni_list_empty(&s->plist)) { + nni_idhash_remove(s->pipes, id); + nni_mtx_unlock(&s->mtx); + return (NNG_EBUSY); + } + } else { + if (!s->started) { + nni_msgq_aio_get(s->uwq, s->aio_getq); + } + } + nni_list_append(&s->plist, p); + s->started = 1; + nni_mtx_unlock(&s->mtx); + + // Schedule a getq. In polyamorous mode we get on the per pipe + // sendq, as the socket distributes to us. In monogamous mode + // we bypass and get from the upper writeq directly (saving a + // set of context switches). + if (s->poly) { + nni_msgq_aio_get(p->sendq, p->aio_getq); + } else { + nni_msgq_aio_get(s->uwq, p->aio_getq); + } + // And the pipe read of course. + nni_pipe_recv(p->npipe, p->aio_recv); + + return (0); +} + +static void +pair1_pipe_stop(void *arg) +{ + pair1_pipe *p = arg; + pair1_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe)); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&s->mtx); + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); +} + +static void +pair1_pipe_recv_cb(void *arg) +{ + pair1_pipe *p = arg; + pair1_sock *s = p->psock; + nni_msg * msg; + uint32_t hdr; + nni_pipe * npipe = p->npipe; + int rv; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + + // Store the pipe ID. + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + + // If the message is missing the hop count header, scrap it. + if (nni_msg_len(msg) < sizeof(uint32_t)) { + nni_msg_free(msg); + nni_pipe_stop(npipe); + return; + } + hdr = nni_msg_trim_u32(msg); + if (hdr & 0xffffff00) { + nni_msg_free(msg); + nni_pipe_stop(npipe); + return; + } + + // If we bounced too many times, discard the message, but + // keep getting more. + if (hdr > (unsigned) s->ttl) { + nni_msg_free(msg); + nni_pipe_recv(npipe, p->aio_recv); + return; + } + + // Store the hop count in the header. + if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) { + nni_msg_free(msg); + nni_pipe_recv(npipe, p->aio_recv); + return; + } + + // Send the message up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); +} + +static void +pair1_sock_getq_cb(void *arg) +{ + pair1_pipe *p; + pair1_sock *s = arg; + nni_msg * msg; + uint32_t id; + + if (nni_aio_result(s->aio_getq) != 0) { + // Socket closing... + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // By definition we are in polyamorous mode. + NNI_ASSERT(s->poly); + + p = NULL; + nni_mtx_lock(&s->mtx); + // If no pipe was requested, we look for any connected peer. + if (((id = nni_msg_get_pipe(msg)) == 0) && + (!nni_list_empty(&s->plist))) { + p = nni_list_first(&s->plist); + } else { + nni_idhash_find(s->pipes, id, (void **) &p); + } + if (p == NULL) { + // Pipe not present! + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + nni_msgq_aio_get(s->uwq, s->aio_getq); + return; + } + + // Try a non-blocking send. If this fails we just discard the + // message. We have to do this to avoid head-of-line blocking + // for messages sent to other pipes. Note that there is some + // buffering in the sendq. + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); + } + + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +pair1_pipe_putq_cb(void *arg) +{ + pair1_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +pair1_pipe_getq_cb(void *arg) +{ + pair1_pipe *p = arg; + pair1_sock *s = p->psock; + nni_msg * msg; + uint32_t hops; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + msg = nni_aio_get_msg(p->aio_getq); + nni_aio_set_msg(p->aio_getq, NULL); + + // Raw mode messages have the header already formed, with + // a hop count. Cooked mode messages have no + // header so we have to add one. + if (s->raw) { + if (nni_msg_header_len(msg) != sizeof(uint32_t)) { + goto badmsg; + } + hops = nni_msg_header_trim_u32(msg); + } else { + // Strip off any previously existing header, such as when + // replying to messages. + nni_msg_header_clear(msg); + hops = 0; + } + + hops++; + + // Insert the hops header. + if (nni_msg_header_append_u32(msg, hops) != 0) { + goto badmsg; + } + + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); + return; + +badmsg: + nni_msg_free(msg); + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); +} + +static void +pair1_pipe_send_cb(void *arg) +{ + pair1_pipe *p = arg; + pair1_sock *s = p->psock; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + // In polyamorous mode, we want to get from the sendq; in + // monogamous we get from upper writeq. + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); +} + +static void +pair1_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +pair1_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static int +pair1_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + pair1_sock *s = arg; + int rv; + nni_mtx_lock(&s->mtx); + rv = s->started ? NNG_ESTATE : nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + pair1_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +pair1_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + pair1_sock *s = arg; + int rv; + nni_mtx_lock(&s->mtx); // Have to be locked against recv cb. + rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255); + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + pair1_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static int +pair1_sock_setopt_poly(void *arg, const void *buf, size_t sz) +{ + pair1_sock *s = arg; + int rv; + nni_mtx_lock(&s->mtx); + rv = s->started ? NNG_ESTATE : nni_setopt_int(&s->poly, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_sock_getopt_poly(void *arg, void *buf, size_t *szp) +{ + pair1_sock *s = arg; + return (nni_getopt_int(s->poly, buf, szp)); +} + +static void +pair1_sock_send(void *arg, nni_aio *aio) +{ + pair1_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +pair1_sock_recv(void *arg, nni_aio *aio) +{ + pair1_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops pair1_pipe_ops = { + .pipe_init = pair1_pipe_init, + .pipe_fini = pair1_pipe_fini, + .pipe_start = pair1_pipe_start, + .pipe_stop = pair1_pipe_stop, +}; + +static nni_proto_sock_option pair1_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = pair1_sock_getopt_raw, + .pso_setopt = pair1_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = pair1_sock_getopt_maxttl, + .pso_setopt = pair1_sock_setopt_maxttl, + }, + { + .pso_name = NNG_OPT_PAIR1_POLY, + .pso_getopt = pair1_sock_getopt_poly, + .pso_setopt = pair1_sock_setopt_poly, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops pair1_sock_ops = { + .sock_init = pair1_sock_init, + .sock_fini = pair1_sock_fini, + .sock_open = pair1_sock_open, + .sock_close = pair1_sock_close, + .sock_recv = pair1_sock_recv, + .sock_send = pair1_sock_send, + .sock_options = pair1_sock_options, +}; + +static nni_proto pair1_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_peer = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &pair1_sock_ops, + .proto_pipe_ops = &pair1_pipe_ops, +}; + +int +nng_pair1_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair1_proto)); +} diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h new file mode 100644 index 00000000..bc519d9f --- /dev/null +++ b/src/protocol/pair1/pair.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR1_PAIR_H +#define NNG_PROTOCOL_PAIR1_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair1_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair1_open +#endif + +#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR1_PAIR_H diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c deleted file mode 100644 index 9685f0a1..00000000 --- a/src/protocol/pipeline/pull.c +++ /dev/null @@ -1,242 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Pull protocol. The PULL protocol is the "read" side of a pipeline. - -typedef struct pull_pipe pull_pipe; -typedef struct pull_sock pull_sock; - -static void pull_putq_cb(void *); -static void pull_recv_cb(void *); -static void pull_putq(pull_pipe *, nni_msg *); - -// A pull_sock is our per-socket protocol private structure. -struct pull_sock { - nni_msgq *urq; - int raw; -}; - -// A pull_pipe is our per-pipe protocol private structure. -struct pull_pipe { - nni_pipe * pipe; - pull_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; -}; - -static int -pull_sock_init(void **sp, nni_sock *sock) -{ - pull_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - - *sp = s; - return (0); -} - -static void -pull_sock_fini(void *arg) -{ - pull_sock *s = arg; - - NNI_FREE_STRUCT(s); -} - -static void -pull_pipe_fini(void *arg) -{ - pull_pipe *p = arg; - - nni_aio_fini(p->putq_aio); - nni_aio_fini(p->recv_aio); - NNI_FREE_STRUCT(p); -} - -static int -pull_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - pull_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) { - pull_pipe_fini(p); - return (rv); - } - - p->pipe = pipe; - p->pull = s; - *pp = p; - return (0); -} - -static int -pull_pipe_start(void *arg) -{ - pull_pipe *p = arg; - - // Start the pending pull... - nni_pipe_recv(p->pipe, p->recv_aio); - - return (0); -} - -static void -pull_pipe_stop(void *arg) -{ - pull_pipe *p = arg; - - nni_aio_stop(p->putq_aio); - nni_aio_stop(p->recv_aio); -} - -static void -pull_recv_cb(void *arg) -{ - pull_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; - - if (nni_aio_result(aio) != 0) { - // Failed to get a message, probably the pipe is closed. - nni_pipe_stop(p->pipe); - return; - } - - // Got a message... start the put to send it up to the application. - msg = nni_aio_get_msg(aio); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_aio_set_msg(aio, NULL); - pull_putq(p, msg); -} - -static void -pull_putq_cb(void *arg) -{ - pull_pipe *p = arg; - nni_aio * aio = p->putq_aio; - - if (nni_aio_result(aio) != 0) { - // If we failed to put, probably NNG_ECLOSED, nothing else - // we can do. Just close the pipe. - nni_msg_free(nni_aio_get_msg(aio)); - nni_aio_set_msg(aio, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_pipe_recv(p->pipe, p->recv_aio); -} - -// nni_pull_putq schedules a put operation to the user socket (sendup). -static void -pull_putq(pull_pipe *p, nni_msg *msg) -{ - pull_sock *s = p->pull; - - nni_aio_set_msg(p->putq_aio, msg); - - nni_msgq_aio_put(s->urq, p->putq_aio); -} - -static void -pull_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -pull_sock_close(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static int -pull_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - pull_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - pull_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -pull_sock_send(void *arg, nni_aio *aio) -{ - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static void -pull_sock_recv(void *arg, nni_aio *aio) -{ - pull_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops pull_pipe_ops = { - .pipe_init = pull_pipe_init, - .pipe_fini = pull_pipe_fini, - .pipe_start = pull_pipe_start, - .pipe_stop = pull_pipe_stop, -}; - -static nni_proto_sock_option pull_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = pull_sock_getopt_raw, - .pso_setopt = pull_sock_setopt_raw, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops pull_sock_ops = { - .sock_init = pull_sock_init, - .sock_fini = pull_sock_fini, - .sock_open = pull_sock_open, - .sock_close = pull_sock_close, - .sock_send = pull_sock_send, - .sock_recv = pull_sock_recv, - .sock_options = pull_sock_options, -}; - -static nni_proto pull_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PULL_V0, "pull" }, - .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, - .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull_pipe_ops, - .proto_sock_ops = &pull_sock_ops, -}; - -int -nng_pull0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &pull_proto)); -} diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c deleted file mode 100644 index 9ff74558..00000000 --- a/src/protocol/pipeline/push.c +++ /dev/null @@ -1,259 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Push protocol. The PUSH protocol is the "write" side of a pipeline. -// Push distributes fairly, or tries to, by giving messages in round-robin -// order. - -typedef struct push_pipe push_pipe; -typedef struct push_sock push_sock; - -static void push_send_cb(void *); -static void push_recv_cb(void *); -static void push_getq_cb(void *); - -// An nni_push_sock is our per-socket protocol private structure. -struct push_sock { - nni_msgq *uwq; - int raw; -}; - -// An nni_push_pipe is our per-pipe protocol private structure. -struct push_pipe { - nni_pipe * pipe; - push_sock * push; - nni_list_node node; - - nni_aio *aio_recv; - nni_aio *aio_send; - nni_aio *aio_getq; -}; - -static int -push_sock_init(void **sp, nni_sock *sock) -{ - push_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - s->raw = 0; - s->uwq = nni_sock_sendq(sock); - *sp = s; - return (0); -} - -static void -push_sock_fini(void *arg) -{ - push_sock *s = arg; - - NNI_FREE_STRUCT(s); -} - -static void -push_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -push_sock_close(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -push_pipe_fini(void *arg) -{ - push_pipe *p = arg; - - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_getq); - NNI_FREE_STRUCT(p); -} - -static int -push_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - push_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) { - push_pipe_fini(p); - return (rv); - } - NNI_LIST_NODE_INIT(&p->node); - p->pipe = pipe; - p->push = s; - *pp = p; - return (0); -} - -static int -push_pipe_start(void *arg) -{ - push_pipe *p = arg; - push_sock *s = p->push; - - if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { - return (NNG_EPROTO); - } - - // Schedule a receiver. This is mostly so that we can detect - // a closed transport pipe. - nni_pipe_recv(p->pipe, p->aio_recv); - - // Schedule a sender. - nni_msgq_aio_get(s->uwq, p->aio_getq); - - return (0); -} - -static void -push_pipe_stop(void *arg) -{ - push_pipe *p = arg; - - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_getq); -} - -static void -push_recv_cb(void *arg) -{ - push_pipe *p = arg; - - // We normally expect to receive an error. If a pipe actually - // sends us data, we just discard it. - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); - return; - } - nni_msg_free(nni_aio_get_msg(p->aio_recv)); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); -} - -static void -push_send_cb(void *arg) -{ - push_pipe *p = arg; - push_sock *s = p->push; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_msgq_aio_get(s->uwq, p->aio_getq); -} - -static void -push_getq_cb(void *arg) -{ - push_pipe *p = arg; - nni_aio * aio = p->aio_getq; - - if (nni_aio_result(aio) != 0) { - // If the socket is closing, nothing else we can do. - nni_pipe_stop(p->pipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio)); - nni_aio_set_msg(aio, NULL); - - nni_pipe_send(p->pipe, p->aio_send); -} - -static int -push_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - push_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -push_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - push_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -push_sock_send(void *arg, nni_aio *aio) -{ - push_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); -} - -static void -push_sock_recv(void *arg, nni_aio *aio) -{ - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static nni_proto_pipe_ops push_pipe_ops = { - .pipe_init = push_pipe_init, - .pipe_fini = push_pipe_fini, - .pipe_start = push_pipe_start, - .pipe_stop = push_pipe_stop, -}; - -static nni_proto_sock_option push_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = push_sock_getopt_raw, - .pso_setopt = push_sock_setopt_raw, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops push_sock_ops = { - .sock_init = push_sock_init, - .sock_fini = push_sock_fini, - .sock_open = push_sock_open, - .sock_close = push_sock_close, - .sock_options = push_sock_options, - .sock_send = push_sock_send, - .sock_recv = push_sock_recv, -}; - -static nni_proto push_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PUSH_V0, "push" }, - .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, - .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push_pipe_ops, - .proto_sock_ops = &push_sock_ops, -}; - -int -nng_push0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &push_proto)); -} diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt new file mode 100644 index 00000000..6153c5a7 --- /dev/null +++ b/src/protocol/pipeline0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUSH0) + set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h) + install(FILES push.h DESTINATION include/nng/protocol/pipeline0) +endif() + +if (NNG_PROTO_PULL0) + set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h) + install(FILES pull.h DESTINATION include/nng/protocol/pipeline0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c new file mode 100644 index 00000000..8c16cb17 --- /dev/null +++ b/src/protocol/pipeline0/pull.c @@ -0,0 +1,250 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Pull protocol. The PULL protocol is the "read" side of a pipeline. + +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif + +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif + +typedef struct pull0_pipe pull0_pipe; +typedef struct pull0_sock pull0_sock; + +static void pull0_putq_cb(void *); +static void pull0_recv_cb(void *); +static void pull0_putq(pull0_pipe *, nni_msg *); + +// pull0_sock is our per-socket protocol private structure. +struct pull0_sock { + nni_msgq *urq; + int raw; +}; + +// pull0_pipe is our per-pipe protocol private structure. +struct pull0_pipe { + nni_pipe * pipe; + pull0_sock *pull; + nni_aio * putq_aio; + nni_aio * recv_aio; +}; + +static int +pull0_sock_init(void **sp, nni_sock *sock) +{ + pull0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + s->raw = 0; + s->urq = nni_sock_recvq(sock); + + *sp = s; + return (0); +} + +static void +pull0_sock_fini(void *arg) +{ + pull0_sock *s = arg; + + NNI_FREE_STRUCT(s); +} + +static void +pull0_pipe_fini(void *arg) +{ + pull0_pipe *p = arg; + + nni_aio_fini(p->putq_aio); + nni_aio_fini(p->recv_aio); + NNI_FREE_STRUCT(p); +} + +static int +pull0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + pull0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + pull0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->pull = s; + *pp = p; + return (0); +} + +static int +pull0_pipe_start(void *arg) +{ + pull0_pipe *p = arg; + + // Start the pending pull... + nni_pipe_recv(p->pipe, p->recv_aio); + + return (0); +} + +static void +pull0_pipe_stop(void *arg) +{ + pull0_pipe *p = arg; + + nni_aio_stop(p->putq_aio); + nni_aio_stop(p->recv_aio); +} + +static void +pull0_recv_cb(void *arg) +{ + pull0_pipe *p = arg; + nni_aio * aio = p->recv_aio; + nni_msg * msg; + + if (nni_aio_result(aio) != 0) { + // Failed to get a message, probably the pipe is closed. + nni_pipe_stop(p->pipe); + return; + } + + // Got a message... start the put to send it up to the application. + msg = nni_aio_get_msg(aio); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + nni_aio_set_msg(aio, NULL); + pull0_putq(p, msg); +} + +static void +pull0_putq_cb(void *arg) +{ + pull0_pipe *p = arg; + nni_aio * aio = p->putq_aio; + + if (nni_aio_result(aio) != 0) { + // If we failed to put, probably NNG_ECLOSED, nothing else + // we can do. Just close the pipe. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_pipe_recv(p->pipe, p->recv_aio); +} + +// pull0_putq schedules a put operation to the user socket (sendup). +static void +pull0_putq(pull0_pipe *p, nni_msg *msg) +{ + pull0_sock *s = p->pull; + + nni_aio_set_msg(p->putq_aio, msg); + + nni_msgq_aio_put(s->urq, p->putq_aio); +} + +static void +pull0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +pull0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static int +pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + pull0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + pull0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +pull0_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +pull0_sock_recv(void *arg, nni_aio *aio) +{ + pull0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops pull0_pipe_ops = { + .pipe_init = pull0_pipe_init, + .pipe_fini = pull0_pipe_fini, + .pipe_start = pull0_pipe_start, + .pipe_stop = pull0_pipe_stop, +}; + +static nni_proto_sock_option pull0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = pull0_sock_getopt_raw, + .pso_setopt = pull0_sock_setopt_raw, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops pull0_sock_ops = { + .sock_init = pull0_sock_init, + .sock_fini = pull0_sock_fini, + .sock_open = pull0_sock_open, + .sock_close = pull0_sock_close, + .sock_send = pull0_sock_send, + .sock_recv = pull0_sock_recv, + .sock_options = pull0_sock_options, +}; + +static nni_proto pull0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PULL_V0, "pull" }, + .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, + .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, +}; + +int +nng_pull0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pull0_proto)); +} diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h new file mode 100644 index 00000000..75bded03 --- /dev/null +++ b/src/protocol/pipeline0/pull.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H +#define NNG_PROTOCOL_PIPELINE0_PULL_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pull0_open(nng_socket *); + +#ifndef nng_pull_open +#define nng_pull_open nng_pull0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PULL_H diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c new file mode 100644 index 00000000..3dd83fe0 --- /dev/null +++ b/src/protocol/pipeline0/push.c @@ -0,0 +1,267 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Push protocol. The PUSH protocol is the "write" side of a pipeline. +// Push distributes fairly, or tries to, by giving messages in round-robin +// order. + +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif + +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif + +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_getq_cb(void *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { + nni_msgq *uwq; + int raw; +}; + +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { + nni_pipe * pipe; + push0_sock * push; + nni_list_node node; + + nni_aio *aio_recv; + nni_aio *aio_send; + nni_aio *aio_getq; +}; + +static int +push0_sock_init(void **sp, nni_sock *sock) +{ + push0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + s->raw = 0; + s->uwq = nni_sock_sendq(sock); + *sp = s; + return (0); +} + +static void +push0_sock_fini(void *arg) +{ + push0_sock *s = arg; + + NNI_FREE_STRUCT(s); +} + +static void +push0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +push0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +push0_pipe_fini(void *arg) +{ + push0_pipe *p = arg; + + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_getq); + NNI_FREE_STRUCT(p); +} + +static int +push0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + push0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + push0_pipe_fini(p); + return (rv); + } + NNI_LIST_NODE_INIT(&p->node); + p->pipe = pipe; + p->push = s; + *pp = p; + return (0); +} + +static int +push0_pipe_start(void *arg) +{ + push0_pipe *p = arg; + push0_sock *s = p->push; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { + return (NNG_EPROTO); + } + + // Schedule a receiver. This is mostly so that we can detect + // a closed transport pipe. + nni_pipe_recv(p->pipe, p->aio_recv); + + // Schedule a sender. + nni_msgq_aio_get(s->uwq, p->aio_getq); + + return (0); +} + +static void +push0_pipe_stop(void *arg) +{ + push0_pipe *p = arg; + + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_getq); +} + +static void +push0_recv_cb(void *arg) +{ + push0_pipe *p = arg; + + // We normally expect to receive an error. If a pipe actually + // sends us data, we just discard it. + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + nni_msg_free(nni_aio_get_msg(p->aio_recv)); + nni_aio_set_msg(p->aio_recv, NULL); + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +push0_send_cb(void *arg) +{ + push0_pipe *p = arg; + push0_sock *s = p->push; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_msgq_aio_get(s->uwq, p->aio_getq); +} + +static void +push0_getq_cb(void *arg) +{ + push0_pipe *p = arg; + nni_aio * aio = p->aio_getq; + + if (nni_aio_result(aio) != 0) { + // If the socket is closing, nothing else we can do. + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static int +push0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + push0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +push0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + push0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +push0_sock_send(void *arg, nni_aio *aio) +{ + push0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +push0_sock_recv(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_stop = push0_pipe_stop, +}; + +static nni_proto_sock_option push0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = push0_sock_getopt_raw, + .pso_setopt = push0_sock_setopt_raw, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops push0_sock_ops = { + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, +}; + +static nni_proto push0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = NNI_PROTO_FLAG_SND, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, +}; + +int +nng_push0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &push0_proto)); +} diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h new file mode 100644 index 00000000..c7303b92 --- /dev/null +++ b/src/protocol/pipeline0/push.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H +#define NNG_PROTOCOL_PIPELINE0_PUSH_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_push0_open(nng_socket *); + +#ifndef nng_push_open +#define nng_push_open nng_push0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c deleted file mode 100644 index 9e5cd67f..00000000 --- a/src/protocol/pubsub/pub.c +++ /dev/null @@ -1,335 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Publish protocol. The PUB protocol simply sends messages out, as -// a broadcast. It has nothing more sophisticated because it does not -// perform sender-side filtering. Its best effort delivery, so anything -// that can't receive the message won't get one. - -typedef struct pub_pipe pub_pipe; -typedef struct pub_sock pub_sock; - -static void pub_pipe_recv_cb(void *); -static void pub_pipe_send_cb(void *); -static void pub_pipe_getq_cb(void *); -static void pub_sock_getq_cb(void *); -static void pub_sock_fini(void *); -static void pub_pipe_fini(void *); - -// A pub_sock is our per-socket protocol private structure. -struct pub_sock { - nni_msgq *uwq; - int raw; - nni_aio * aio_getq; - nni_list pipes; - nni_mtx mtx; -}; - -// A pub_pipe is our per-pipe protocol private structure. -struct pub_pipe { - nni_pipe * pipe; - pub_sock * pub; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_list_node node; -}; - -static void -pub_sock_fini(void *arg) -{ - pub_sock *s = arg; - - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static int -pub_sock_init(void **sp, nni_sock *sock) -{ - pub_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) { - pub_sock_fini(s); - return (rv); - } - - s->raw = 0; - NNI_LIST_INIT(&s->pipes, pub_pipe, node); - - s->uwq = nni_sock_sendq(sock); - - *sp = s; - return (0); -} - -static void -pub_sock_open(void *arg) -{ - pub_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -pub_sock_close(void *arg) -{ - pub_sock *s = arg; - - nni_aio_cancel(s->aio_getq, NNG_ECLOSED); -} - -static void -pub_pipe_fini(void *arg) -{ - pub_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - -static int -pub_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - pub_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - - // XXX: consider making this depth tunable - if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) { - - pub_pipe_fini(p); - return (rv); - } - - p->pipe = pipe; - p->pub = s; - *pp = p; - return (0); -} - -static int -pub_pipe_start(void *arg) -{ - pub_pipe *p = arg; - pub_sock *s = p->pub; - - if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { - return (NNG_EPROTO); - } - nni_mtx_lock(&s->mtx); - nni_list_append(&s->pipes, p); - nni_mtx_unlock(&s->mtx); - - // Start the receiver and the queue reader. - nni_pipe_recv(p->pipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); - - return (0); -} - -static void -pub_pipe_stop(void *arg) -{ - pub_pipe *p = arg; - pub_sock *s = p->pub; - - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - - nni_msgq_close(p->sendq); - - nni_mtx_lock(&s->mtx); - if (nni_list_active(&s->pipes, p)) { - nni_list_remove(&s->pipes, p); - } - nni_mtx_unlock(&s->mtx); -} - -static void -pub_sock_getq_cb(void *arg) -{ - pub_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg, *dup; - - pub_pipe *p; - pub_pipe *last; - int rv; - - if (nni_aio_result(s->aio_getq) != 0) { - return; - } - - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - nni_mtx_lock(&s->mtx); - last = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != last) { - rv = nni_msg_dup(&dup, msg); - if (rv != 0) { - continue; - } - } else { - dup = msg; - } - if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) { - nni_msg_free(dup); - } - } - nni_mtx_unlock(&s->mtx); - - if (last == NULL) { - nni_msg_free(msg); - } - - nni_msgq_aio_get(uwq, s->aio_getq); -} - -static void -pub_pipe_recv_cb(void *arg) -{ - pub_pipe *p = arg; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - nni_msg_free(nni_aio_get_msg(p->aio_recv)); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); -} - -static void -pub_pipe_getq_cb(void *arg) -{ - pub_pipe *p = arg; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->pipe, p->aio_send); -} - -static void -pub_pipe_send_cb(void *arg) -{ - pub_pipe *p = arg; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_aio_set_msg(p->aio_send, NULL); - nni_msgq_aio_get(p->sendq, p->aio_getq); -} - -static int -pub_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - pub_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - pub_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -pub_sock_recv(void *arg, nni_aio *aio) -{ - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static void -pub_sock_send(void *arg, nni_aio *aio) -{ - pub_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); -} - -static nni_proto_pipe_ops pub_pipe_ops = { - .pipe_init = pub_pipe_init, - .pipe_fini = pub_pipe_fini, - .pipe_start = pub_pipe_start, - .pipe_stop = pub_pipe_stop, -}; - -static nni_proto_sock_option pub_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = pub_sock_getopt_raw, - .pso_setopt = pub_sock_setopt_raw, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops pub_sock_ops = { - .sock_init = pub_sock_init, - .sock_fini = pub_sock_fini, - .sock_open = pub_sock_open, - .sock_close = pub_sock_close, - .sock_send = pub_sock_send, - .sock_recv = pub_sock_recv, - .sock_options = pub_sock_options, -}; - -static nni_proto pub_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PUB_V0, "pub" }, - .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, - .proto_flags = NNI_PROTO_FLAG_SND, - .proto_sock_ops = &pub_sock_ops, - .proto_pipe_ops = &pub_pipe_ops, -}; - -int -nng_pub0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &pub_proto)); -} diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c deleted file mode 100644 index 555d528e..00000000 --- a/src/protocol/pubsub/sub.c +++ /dev/null @@ -1,398 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; -const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; - -// Subscriber protocol. The SUB protocol receives messages sent to -// it from publishers, and filters out those it is not interested in, -// only passing up ones that match known subscriptions. - -typedef struct sub_pipe sub_pipe; -typedef struct sub_sock sub_sock; -typedef struct sub_topic sub_topic; - -static void sub_recv_cb(void *); -static void sub_putq_cb(void *); -static void sub_pipe_fini(void *); - -struct sub_topic { - nni_list_node node; - size_t len; - void * buf; -}; - -// An nni_rep_sock is our per-socket protocol private structure. -struct sub_sock { - nni_list topics; - nni_msgq *urq; - int raw; - nni_mtx lk; -}; - -// An nni_rep_pipe is our per-pipe protocol private structure. -struct sub_pipe { - nni_pipe *pipe; - sub_sock *sub; - nni_aio * aio_recv; - nni_aio * aio_putq; -}; - -static int -sub_sock_init(void **sp, nni_sock *sock) -{ - sub_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->lk); - NNI_LIST_INIT(&s->topics, sub_topic, node); - s->raw = 0; - - s->urq = nni_sock_recvq(sock); - *sp = s; - return (0); -} - -static void -sub_sock_fini(void *arg) -{ - sub_sock * s = arg; - sub_topic *topic; - - while ((topic = nni_list_first(&s->topics)) != NULL) { - nni_list_remove(&s->topics, topic); - nni_free(topic->buf, topic->len); - NNI_FREE_STRUCT(topic); - } - nni_mtx_fini(&s->lk); - NNI_FREE_STRUCT(s); -} - -static void -sub_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -sub_sock_close(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -sub_pipe_fini(void *arg) -{ - sub_pipe *p = arg; - - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_recv); - NNI_FREE_STRUCT(p); -} - -static int -sub_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - sub_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { - sub_pipe_fini(p); - return (rv); - } - - p->pipe = pipe; - p->sub = s; - *pp = p; - return (0); -} - -static int -sub_pipe_start(void *arg) -{ - sub_pipe *p = arg; - - nni_pipe_recv(p->pipe, p->aio_recv); - return (0); -} - -static void -sub_pipe_stop(void *arg) -{ - sub_pipe *p = arg; - - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_recv); -} - -static void -sub_recv_cb(void *arg) -{ - sub_pipe *p = arg; - sub_sock *s = p->sub; - nni_msgq *urq = s->urq; - nni_msg * msg; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); -} - -static void -sub_putq_cb(void *arg) -{ - sub_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_pipe_recv(p->pipe, p->aio_recv); -} - -// For now we maintain subscriptions on a sorted linked list. As we do not -// expect to have huge numbers of subscriptions, and as the operation is -// really O(n), we think this is acceptable. In the future we might decide -// to replace this with a patricia trie, like old nanomsg had. - -static int -sub_subscribe(void *arg, const void *buf, size_t sz) -{ - sub_sock * s = arg; - sub_topic *topic; - sub_topic *newtopic; - - nni_mtx_lock(&s->lk); - NNI_LIST_FOREACH (&s->topics, topic) { - int rv; - - if (topic->len >= sz) { - rv = memcmp(topic->buf, buf, sz); - } else { - rv = memcmp(topic->buf, buf, topic->len); - } - if (rv == 0) { - if (topic->len == sz) { - // Already inserted. - nni_mtx_unlock(&s->lk); - return (0); - } - if (topic->len > sz) { - break; - } - } else if (rv > 0) { - break; - } - } - - if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { - nni_mtx_unlock(&s->lk); - return (NNG_ENOMEM); - } - if ((newtopic->buf = nni_alloc(sz)) == NULL) { - nni_mtx_unlock(&s->lk); - return (NNG_ENOMEM); - } - NNI_LIST_NODE_INIT(&newtopic->node); - newtopic->len = sz; - memcpy(newtopic->buf, buf, sz); - if (topic != NULL) { - nni_list_insert_before(&s->topics, newtopic, topic); - } else { - nni_list_append(&s->topics, newtopic); - } - nni_mtx_unlock(&s->lk); - return (0); -} - -static int -sub_unsubscribe(void *arg, const void *buf, size_t sz) -{ - sub_sock * s = arg; - sub_topic *topic; - int rv; - - nni_mtx_lock(&s->lk); - NNI_LIST_FOREACH (&s->topics, topic) { - if (topic->len >= sz) { - rv = memcmp(topic->buf, buf, sz); - } else { - rv = memcmp(topic->buf, buf, topic->len); - } - if (rv == 0) { - if (topic->len == sz) { - nni_list_remove(&s->topics, topic); - nni_mtx_unlock(&s->lk); - nni_free(topic->buf, topic->len); - NNI_FREE_STRUCT(topic); - return (0); - } - if (topic->len > sz) { - nni_mtx_unlock(&s->lk); - return (NNG_ENOENT); - } - } - if (rv > 0) { - nni_mtx_unlock(&s->lk); - return (NNG_ENOENT); - } - } - nni_mtx_unlock(&s->lk); - return (NNG_ENOENT); -} - -static int -sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - sub_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - sub_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static void -sub_sock_send(void *arg, nni_aio *aio) -{ - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static void -sub_sock_recv(void *arg, nni_aio *aio) -{ - sub_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_msg * -sub_sock_filter(void *arg, nni_msg *msg) -{ - sub_sock * s = arg; - sub_topic *topic; - char * body; - size_t len; - int match; - - nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } - - body = nni_msg_body(msg); - len = nni_msg_len(msg); - - match = 0; - // Check to see if the message matches one of our subscriptions. - NNI_LIST_FOREACH (&s->topics, topic) { - if (len >= topic->len) { - int rv = memcmp(topic->buf, body, topic->len); - if (rv == 0) { - // Matched! - match = 1; - break; - } - if (rv > 0) { - match = 0; - break; - } - } else if (memcmp(topic->buf, body, len) >= 0) { - match = 0; - break; - } - } - nni_mtx_unlock(&s->lk); - if (!match) { - nni_msg_free(msg); - return (NULL); - } - return (msg); -} - -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub_pipe_ops = { - .pipe_init = sub_pipe_init, - .pipe_fini = sub_pipe_fini, - .pipe_start = sub_pipe_start, - .pipe_stop = sub_pipe_stop, -}; - -static nni_proto_sock_option sub_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = sub_sock_getopt_raw, - .pso_setopt = sub_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_SUB_SUBSCRIBE, - .pso_getopt = NULL, - .pso_setopt = sub_subscribe, - }, - { - .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, - .pso_getopt = NULL, - .pso_setopt = sub_unsubscribe, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops sub_sock_ops = { - .sock_init = sub_sock_init, - .sock_fini = sub_sock_fini, - .sock_open = sub_sock_open, - .sock_close = sub_sock_close, - .sock_send = sub_sock_send, - .sock_recv = sub_sock_recv, - .sock_filter = sub_sock_filter, - .sock_options = sub_sock_options, -}; - -static nni_proto sub_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SUB_V0, "sub" }, - .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, - .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub_sock_ops, - .proto_pipe_ops = &sub_pipe_ops, -}; - -int -nng_sub0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &sub_proto)); -} diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..4edcbfae --- /dev/null +++ b/src/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUB0) + set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h) + install(FILES pub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +if (NNG_PROTO_SUB0) + set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h) + install(FILES sub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c new file mode 100644 index 00000000..f4a33b77 --- /dev/null +++ b/src/protocol/pubsub0/pub.c @@ -0,0 +1,344 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/pubsub0/pub.h" + +// Publish protocol. The PUB protocol simply sends messages out, as +// a broadcast. It has nothing more sophisticated because it does not +// perform sender-side filtering. Its best effort delivery, so anything +// that can't receive the message won't get one. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_pipe_getq_cb(void *); +static void pub0_sock_getq_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { + nni_msgq *uwq; + int raw; + nni_aio * aio_getq; + nni_list pipes; + nni_mtx mtx; +}; + +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { + nni_pipe * pipe; + pub0_sock * pub; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node node; +}; + +static void +pub0_sock_fini(void *arg) +{ + pub0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +pub0_sock_init(void **sp, nni_sock *sock) +{ + pub0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) { + pub0_sock_fini(s); + return (rv); + } + + s->raw = 0; + NNI_LIST_INIT(&s->pipes, pub0_pipe, node); + + s->uwq = nni_sock_sendq(sock); + + *sp = s; + return (0); +} + +static void +pub0_sock_open(void *arg) +{ + pub0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +pub0_sock_close(void *arg) +{ + pub0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +pub0_pipe_fini(void *arg) +{ + pub0_pipe *p = arg; + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + pub0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + + // XXX: consider making this depth tunable + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { + + pub0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->pub = s; + *pp = p; + return (0); +} + +static int +pub0_pipe_start(void *arg) +{ + pub0_pipe *p = arg; + pub0_sock *s = p->pub; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + // Start the receiver and the queue reader. + nni_pipe_recv(p->pipe, p->aio_recv); + nni_msgq_aio_get(p->sendq, p->aio_getq); + + return (0); +} + +static void +pub0_pipe_stop(void *arg) +{ + pub0_pipe *p = arg; + pub0_sock *s = p->pub; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +pub0_sock_getq_cb(void *arg) +{ + pub0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg, *dup; + + pub0_pipe *p; + pub0_pipe *last; + int rv; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + last = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != last) { + rv = nni_msg_dup(&dup, msg); + if (rv != 0) { + continue; + } + } else { + dup = msg; + } + if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(&s->mtx); + + if (last == NULL) { + nni_msg_free(msg); + } + + nni_msgq_aio_get(uwq, s->aio_getq); +} + +static void +pub0_pipe_recv_cb(void *arg) +{ + pub0_pipe *p = arg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_msg_free(nni_aio_get_msg(p->aio_recv)); + nni_aio_set_msg(p->aio_recv, NULL); + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +pub0_pipe_getq_cb(void *arg) +{ + pub0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static void +pub0_pipe_send_cb(void *arg) +{ + pub0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, NULL); + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static int +pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + pub0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + pub0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +pub0_sock_recv(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +pub0_sock_send(void *arg, nni_aio *aio) +{ + pub0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_stop = pub0_pipe_stop, +}; + +static nni_proto_sock_option pub0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = pub0_sock_getopt_raw, + .pso_setopt = pub0_sock_setopt_raw, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops pub0_sock_ops = { + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, +}; + +static nni_proto pub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + +int +nng_pub0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto)); +} diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h new file mode 100644 index 00000000..2388a292 --- /dev/null +++ b/src/protocol/pubsub0/pub.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H +#define NNG_PROTOCOL_PUBSUB0_PUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pub0_open(nng_socket *); + +#ifndef nng_pub_open +#define nng_pub_open nng_pub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_PUB_H diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c new file mode 100644 index 00000000..6c504d75 --- /dev/null +++ b/src/protocol/pubsub0/sub.c @@ -0,0 +1,404 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/pubsub0/sub.h" + +// Subscriber protocol. The SUB protocol receives messages sent to +// it from publishers, and filters out those it is not interested in, +// only passing up ones that match known subscriptions. + +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_topic sub0_topic; + +static void sub0_recv_cb(void *); +static void sub0_putq_cb(void *); +static void sub0_pipe_fini(void *); + +struct sub0_topic { + nni_list_node node; + size_t len; + void * buf; +}; + +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { + nni_list topics; + nni_msgq *urq; + int raw; + nni_mtx lk; +}; + +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; +}; + +static int +sub0_sock_init(void **sp, nni_sock *sock) +{ + sub0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->lk); + NNI_LIST_INIT(&s->topics, sub0_topic, node); + s->raw = 0; + + s->urq = nni_sock_recvq(sock); + *sp = s; + return (0); +} + +static void +sub0_sock_fini(void *arg) +{ + sub0_sock * s = arg; + sub0_topic *topic; + + while ((topic = nni_list_first(&s->topics)) != NULL) { + nni_list_remove(&s->topics, topic); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + } + nni_mtx_fini(&s->lk); + NNI_FREE_STRUCT(s); +} + +static void +sub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +sub0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +sub0_pipe_fini(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + NNI_FREE_STRUCT(p); +} + +static int +sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + sub0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + sub0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->sub = s; + *pp = p; + return (0); +} + +static int +sub0_pipe_start(void *arg) +{ + sub0_pipe *p = arg; + + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +sub0_pipe_stop(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); +} + +static void +sub0_recv_cb(void *arg) +{ + sub0_pipe *p = arg; + sub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(urq, p->aio_putq); +} + +static void +sub0_putq_cb(void *arg) +{ + sub0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +// For now we maintain subscriptions on a sorted linked list. As we do not +// expect to have huge numbers of subscriptions, and as the operation is +// really O(n), we think this is acceptable. In the future we might decide +// to replace this with a patricia trie, like old nanomsg had. + +static int +sub0_subscribe(void *arg, const void *buf, size_t sz) +{ + sub0_sock * s = arg; + sub0_topic *topic; + sub0_topic *newtopic; + + nni_mtx_lock(&s->lk); + NNI_LIST_FOREACH (&s->topics, topic) { + int rv; + + if (topic->len >= sz) { + rv = memcmp(topic->buf, buf, sz); + } else { + rv = memcmp(topic->buf, buf, topic->len); + } + if (rv == 0) { + if (topic->len == sz) { + // Already inserted. + nni_mtx_unlock(&s->lk); + return (0); + } + if (topic->len > sz) { + break; + } + } else if (rv > 0) { + break; + } + } + + if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + nni_mtx_unlock(&s->lk); + return (NNG_ENOMEM); + } + if ((newtopic->buf = nni_alloc(sz)) == NULL) { + nni_mtx_unlock(&s->lk); + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&newtopic->node); + newtopic->len = sz; + memcpy(newtopic->buf, buf, sz); + if (topic != NULL) { + nni_list_insert_before(&s->topics, newtopic, topic); + } else { + nni_list_append(&s->topics, newtopic); + } + nni_mtx_unlock(&s->lk); + return (0); +} + +static int +sub0_unsubscribe(void *arg, const void *buf, size_t sz) +{ + sub0_sock * s = arg; + sub0_topic *topic; + int rv; + + nni_mtx_lock(&s->lk); + NNI_LIST_FOREACH (&s->topics, topic) { + if (topic->len >= sz) { + rv = memcmp(topic->buf, buf, sz); + } else { + rv = memcmp(topic->buf, buf, topic->len); + } + if (rv == 0) { + if (topic->len == sz) { + nni_list_remove(&s->topics, topic); + nni_mtx_unlock(&s->lk); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + return (0); + } + if (topic->len > sz) { + nni_mtx_unlock(&s->lk); + return (NNG_ENOENT); + } + } + if (rv > 0) { + nni_mtx_unlock(&s->lk); + return (NNG_ENOENT); + } + } + nni_mtx_unlock(&s->lk); + return (NNG_ENOENT); +} + +static int +sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + sub0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + sub0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static void +sub0_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +sub0_sock_recv(void *arg, nni_aio *aio) +{ + sub0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_msg * +sub0_sock_filter(void *arg, nni_msg *msg) +{ + sub0_sock * s = arg; + sub0_topic *topic; + char * body; + size_t len; + int match; + + nni_mtx_lock(&s->lk); + if (s->raw) { + nni_mtx_unlock(&s->lk); + return (msg); + } + + body = nni_msg_body(msg); + len = nni_msg_len(msg); + + match = 0; + // Check to see if the message matches one of our subscriptions. + NNI_LIST_FOREACH (&s->topics, topic) { + if (len >= topic->len) { + int rv = memcmp(topic->buf, body, topic->len); + if (rv == 0) { + // Matched! + match = 1; + break; + } + if (rv > 0) { + match = 0; + break; + } + } else if (memcmp(topic->buf, body, len) >= 0) { + match = 0; + break; + } + } + nni_mtx_unlock(&s->lk); + if (!match) { + nni_msg_free(msg); + return (NULL); + } + return (msg); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_stop = sub0_pipe_stop, +}; + +static nni_proto_sock_option sub0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = sub0_sock_getopt_raw, + .pso_setopt = sub0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_SUB_SUBSCRIBE, + .pso_getopt = NULL, + .pso_setopt = sub0_subscribe, + }, + { + .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, + .pso_getopt = NULL, + .pso_setopt = sub0_unsubscribe, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops sub0_sock_ops = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = sub0_sock_filter, + .sock_options = sub0_sock_options, +}; + +static nni_proto sub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, +}; + +int +nng_sub0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &sub0_proto)); +} diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h new file mode 100644 index 00000000..1a09145d --- /dev/null +++ b/src/protocol/pubsub0/sub.h @@ -0,0 +1,31 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H +#define NNG_PROTOCOL_PUBSUB0_SUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_sub0_open(nng_socket *); + +#ifndef nng_sub_open +#define nng_sub_open nng_sub0_open +#endif + +#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" +#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_SUB_H diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c deleted file mode 100644 index 100e739d..00000000 --- a/src/protocol/reqrep/rep.c +++ /dev/null @@ -1,506 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Response protocol. The REP protocol is the "reply" side of a -// request-reply pair. This is useful for building RPC servers, for -// example. - -typedef struct rep_pipe rep_pipe; -typedef struct rep_sock rep_sock; - -static void rep_sock_getq_cb(void *); -static void rep_pipe_getq_cb(void *); -static void rep_pipe_putq_cb(void *); -static void rep_pipe_send_cb(void *); -static void rep_pipe_recv_cb(void *); -static void rep_pipe_fini(void *); - -// A rep_sock is our per-socket protocol private structure. -struct rep_sock { - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx lk; - int raw; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; -}; - -// A rep_pipe is our per-pipe protocol private structure. -struct rep_pipe { - nni_pipe *pipe; - rep_sock *rep; - nni_msgq *sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; -}; - -static void -rep_sock_fini(void *arg) -{ - rep_sock *s = arg; - - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - } - nni_mtx_fini(&s->lk); - NNI_FREE_STRUCT(s); -} - -static int -rep_sock_init(void **sp, nni_sock *sock) -{ - rep_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->lk); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { - rep_sock_fini(s); - return (rv); - } - - s->ttl = 8; // Per RFC - s->raw = 0; - s->btrace = NULL; - s->btrace_len = 0; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - - *sp = s; - - return (0); -} - -static void -rep_sock_open(void *arg) -{ - rep_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -rep_sock_close(void *arg) -{ - rep_sock *s = arg; - - nni_aio_cancel(s->aio_getq, NNG_ECLOSED); -} - -static void -rep_pipe_fini(void *arg) -{ - rep_pipe *p = arg; - - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - -static int -rep_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - rep_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) { - rep_pipe_fini(p); - return (rv); - } - - p->pipe = pipe; - p->rep = s; - *pp = p; - return (0); -} - -static int -rep_pipe_start(void *arg) -{ - rep_pipe *p = arg; - rep_sock *s = p->rep; - int rv; - - if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { - return (rv); - } - - nni_msgq_aio_get(p->sendq, p->aio_getq); - nni_pipe_recv(p->pipe, p->aio_recv); - return (0); -} - -static void -rep_pipe_stop(void *arg) -{ - rep_pipe *p = arg; - rep_sock *s = p->rep; - - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); -} - -static void -rep_sock_getq_cb(void *arg) -{ - rep_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg; - uint32_t id; - rep_pipe *p; - int rv; - - // This watches for messages from the upper write queue, - // extracts the destination pipe, and forwards it to the appropriate - // destination pipe via a separate queue. This prevents a single bad - // or slow pipe from gumming up the works for the entire socket. - - if (nni_aio_result(s->aio_getq) != 0) { - // Closed socket? - return; - } - - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - - // Look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); - return; - } - - id = nni_msg_header_trim_u32(msg); - - // Look for the pipe, and attempt to put the message there - // (nonblocking) if we can. If we can't for any reason, then we - // free the message. - // XXX: LOCKING?!?! - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { - nni_msg_free(msg); - } - - // Now look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); -} - -static void -rep_pipe_getq_cb(void *arg) -{ - rep_pipe *p = arg; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->pipe, p->aio_send); -} - -static void -rep_pipe_send_cb(void *arg) -{ - rep_pipe *p = arg; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_msgq_aio_get(p->sendq, p->aio_getq); -} - -static void -rep_pipe_recv_cb(void *arg) -{ - rep_pipe *p = arg; - rep_sock *s = p->rep; - nni_msg * msg; - int rv; - uint8_t * body; - int hops; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { - // Failure here causes us to drop the message. - goto drop; - } - - // Move backtrace from body to header - hops = 1; - for (;;) { - int end = 0; - if (hops >= s->ttl) { - // This isn't malformed, but it has gone through - // too many hops. Do not disconnect, because we - // can legitimately receive messages with too many - // hops from devices, etc. - goto drop; - } - if (nni_msg_len(msg) < 4) { - // Peer is speaking garbage. Kick it. - nni_msg_free(msg); - nni_pipe_stop(p->pipe); - return; - } - body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { - // Presumably this is due to out of memory. - // We could just discard and try again, but we - // just toss the connection for now. Given the - // out of memory situation, this is not unreasonable. - goto drop; - } - nni_msg_trim(msg, 4); - if (end) { - break; - } - } - - // Go ahead and send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(s->urq, p->aio_putq); - return; - -drop: - nni_msg_free(msg); - nni_pipe_recv(p->pipe, p->aio_recv); -} - -static void -rep_pipe_putq_cb(void *arg) -{ - rep_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; - } - - nni_pipe_recv(p->pipe, p->aio_recv); -} - -static int -rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - rep_sock *s = arg; - int rv; - - nni_mtx_lock(&s->lk); - rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - nni_mtx_unlock(&s->lk); - return (rv); -} - -static int -rep_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - rep_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static int -rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) -{ - rep_sock *s = arg; - return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); -} - -static int -rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) -{ - rep_sock *s = arg; - return (nni_getopt_int(s->ttl, buf, szp)); -} - -static nni_msg * -rep_sock_filter(void *arg, nni_msg *msg) -{ - rep_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } - - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_msg_free(msg); - return (NULL); - } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->lk); - return (msg); -} - -static void -rep_sock_send(void *arg, nni_aio *aio) -{ - rep_sock *s = arg; - int rv; - nni_msg * msg; - - nni_mtx_lock(&s->lk); - if (s->raw) { - // Pass thru - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); - return; - } - if (s->btrace == NULL) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - msg = nni_aio_get_msg(aio); - - // drop anything else in the header... (it should already be - // empty, but there can be stale backtrace info there.) - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); -} - -static void -rep_sock_recv(void *arg, nni_aio *aio) -{ - rep_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops rep_pipe_ops = { - .pipe_init = rep_pipe_init, - .pipe_fini = rep_pipe_fini, - .pipe_start = rep_pipe_start, - .pipe_stop = rep_pipe_stop, -}; - -static nni_proto_sock_option rep_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = rep_sock_getopt_raw, - .pso_setopt = rep_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = rep_sock_getopt_maxttl, - .pso_setopt = rep_sock_setopt_maxttl, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops rep_sock_ops = { - .sock_init = rep_sock_init, - .sock_fini = rep_sock_fini, - .sock_open = rep_sock_open, - .sock_close = rep_sock_close, - .sock_options = rep_sock_options, - .sock_filter = rep_sock_filter, - .sock_send = rep_sock_send, - .sock_recv = rep_sock_recv, -}; - -static nni_proto nni_rep_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &rep_sock_ops, - .proto_pipe_ops = &rep_pipe_ops, -}; - -int -nng_rep0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &nni_rep_proto)); -} diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c deleted file mode 100644 index bead1ec4..00000000 --- a/src/protocol/reqrep/req.c +++ /dev/null @@ -1,668 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include -#include - -#include "core/nng_impl.h" - -// Request protocol. The REQ protocol is the "request" side of a -// request-reply pair. This is useful for building RPC clients, for example. - -const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME; - -typedef struct req_pipe req_pipe; -typedef struct req_sock req_sock; - -static void req_resend(req_sock *); -static void req_timeout(void *); -static void req_pipe_fini(void *); - -// A req_sock is our per-socket protocol private structure. -struct req_sock { - nni_msgq * uwq; - nni_msgq * urq; - nni_duration retry; - nni_time resend; - int raw; - int wantw; - int closed; - int ttl; - nni_msg * reqmsg; - - req_pipe *pendpipe; - - nni_list readypipes; - nni_list busypipes; - - nni_timer_node timer; - - uint32_t nextid; // next id - uint8_t reqid[4]; // outstanding request ID (big endian) - nni_mtx mtx; - nni_cv cv; -}; - -// A req_pipe is our per-pipe protocol private structure. -struct req_pipe { - nni_pipe * pipe; - req_sock * req; - nni_list_node node; - nni_aio * aio_getq; // raw mode only - nni_aio * aio_sendraw; // raw mode only - nni_aio * aio_sendcooked; // cooked mode only - nni_aio * aio_recv; - nni_aio * aio_putq; - nni_mtx mtx; -}; - -static void req_resender(void *); -static void req_getq_cb(void *); -static void req_sendraw_cb(void *); -static void req_sendcooked_cb(void *); -static void req_recv_cb(void *); -static void req_putq_cb(void *); - -static int -req_sock_init(void **sp, nni_sock *sock) -{ - req_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->mtx); - nni_cv_init(&s->cv, &s->mtx); - - NNI_LIST_INIT(&s->readypipes, req_pipe, node); - NNI_LIST_INIT(&s->busypipes, req_pipe, node); - nni_timer_init(&s->timer, req_timeout, s); - - // this is "semi random" start for request IDs. - s->nextid = nni_random(); - s->retry = NNI_SECOND * 60; - s->reqmsg = NULL; - s->raw = 0; - s->wantw = 0; - s->resend = NNI_TIME_ZERO; - s->ttl = 8; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - *sp = s; - - return (0); -} - -static void -req_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -req_sock_close(void *arg) -{ - req_sock *s = arg; - - nni_mtx_lock(&s->mtx); - s->closed = 1; - nni_mtx_unlock(&s->mtx); - - nni_timer_cancel(&s->timer); -} - -static void -req_sock_fini(void *arg) -{ - req_sock *s = arg; - - nni_mtx_lock(&s->mtx); - while ((!nni_list_empty(&s->readypipes)) || - (!nni_list_empty(&s->busypipes))) { - nni_cv_wait(&s->cv); - } - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - } - nni_mtx_unlock(&s->mtx); - nni_cv_fini(&s->cv); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static void -req_pipe_fini(void *arg) -{ - req_pipe *p = arg; - - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_sendcooked); - nni_aio_fini(p->aio_sendraw); - nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static int -req_pipe_init(void **pp, nni_pipe *pipe, void *s) -{ - req_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != - 0)) { - req_pipe_fini(p); - return (rv); - } - - NNI_LIST_NODE_INIT(&p->node); - p->pipe = pipe; - p->req = s; - *pp = p; - return (0); -} - -static int -req_pipe_start(void *arg) -{ - req_pipe *p = arg; - req_sock *s = p->req; - - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { - return (NNG_EPROTO); - } - - nni_mtx_lock(&s->mtx); - if (s->closed) { - nni_mtx_unlock(&s->mtx); - return (NNG_ECLOSED); - } - nni_list_append(&s->readypipes, p); - // If sock was waiting for somewhere to send data, go ahead and - // send it to this pipe. - if (s->wantw) { - req_resend(s); - } - nni_mtx_unlock(&s->mtx); - - nni_msgq_aio_get(s->uwq, p->aio_getq); - nni_pipe_recv(p->pipe, p->aio_recv); - return (0); -} - -static void -req_pipe_stop(void *arg) -{ - req_pipe *p = arg; - req_sock *s = p->req; - - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_sendcooked); - nni_aio_stop(p->aio_sendraw); - - // At this point there should not be any further AIOs running. - // Further, any completion tasks have completed. - - nni_mtx_lock(&s->mtx); - // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. - if (nni_list_node_active(&p->node)) { - nni_list_node_remove(&p->node); - if (s->closed) { - nni_cv_wake(&s->cv); - } - } - - if ((p == s->pendpipe) && (s->reqmsg != NULL)) { - // removing the pipe we sent the last request on... - // schedule immediate resend. - s->pendpipe = NULL; - s->resend = NNI_TIME_ZERO; - s->wantw = 1; - req_resend(s); - } - nni_mtx_unlock(&s->mtx); -} - -static int -req_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - req_sock *s = arg; - return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); -} - -static int -req_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - req_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static int -req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) -{ - req_sock *s = arg; - return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); -} - -static int -req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) -{ - req_sock *s = arg; - return (nni_getopt_int(s->ttl, buf, szp)); -} - -static int -req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) -{ - req_sock *s = arg; - return (nni_setopt_ms(&s->retry, buf, sz)); -} - -static int -req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) -{ - req_sock *s = arg; - return (nni_getopt_ms(s->retry, buf, szp)); -} - -// Raw and cooked mode differ in the way they send messages out. -// -// For cooked mdes, we have a getq callback on the upper write queue, which -// when it finds a message, cancels any current processing, and saves a copy -// of the message, and then tries to "resend" the message, looking for a -// suitable available outgoing pipe. If no suitable pipe is available, -// a flag is set, so that as soon as such a pipe is available we trigger -// a resend attempt. We also trigger the attempt on either timeout, or if -// the underlying pipe we chose disconnects. -// -// For raw mode we can just let the pipes "contend" via getq to get a -// message from the upper write queue. The msgqueue implementation -// actually provides ordering, so load will be spread automatically. -// (NB: We may have to revise this in the future if we want to provide some -// kind of priority.) - -static void -req_getq_cb(void *arg) -{ - req_pipe *p = arg; - req_sock *s = p->req; - - // We should be in RAW mode. Cooked mode traffic bypasses - // the upper write queue entirely, and should never end up here. - // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). (One minor - // exception: we wind up here in error state when the uwq is closed.) - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - // Send the message, but use the raw mode aio. - nni_pipe_send(p->pipe, p->aio_sendraw); -} - -static void -req_sendraw_cb(void *arg) -{ - req_pipe *p = arg; - - if (nni_aio_result(p->aio_sendraw) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); - nni_aio_set_msg(p->aio_sendraw, NULL); - nni_pipe_stop(p->pipe); - return; - } - - // Sent a message so we just need to look for another one. - nni_msgq_aio_get(p->req->uwq, p->aio_getq); -} - -static void -req_sendcooked_cb(void *arg) -{ - req_pipe *p = arg; - req_sock *s = p->req; - - if (nni_aio_result(p->aio_sendcooked) != 0) { - // We failed to send... clean up and deal with it. - // We leave ourselves on the busy list for now, which - // means no new asynchronous traffic can occur here. - nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); - nni_aio_set_msg(p->aio_sendcooked, NULL); - nni_pipe_stop(p->pipe); - return; - } - - // Cooked mode. We completed a cooked send, so we need to - // reinsert ourselves in the ready list, and possibly schedule - // a resend. - - nni_mtx_lock(&s->mtx); - if (nni_list_active(&s->busypipes, p)) { - nni_list_remove(&s->busypipes, p); - nni_list_append(&s->readypipes, p); - req_resend(s); - } else { - // We wind up here if stop was called from the reader - // side while we were waiting to be scheduled to run for the - // writer side. In this case we can't complete the operation, - // and we have to abort. - nni_pipe_stop(p->pipe); - } - nni_mtx_unlock(&s->mtx); -} - -static void -req_putq_cb(void *arg) -{ - req_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; - } - nni_aio_set_msg(p->aio_putq, NULL); - - nni_pipe_recv(p->pipe, p->aio_recv); -} - -static void -req_recv_cb(void *arg) -{ - req_pipe *p = arg; - nni_msg * msg; - - if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); - return; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // We yank 4 bytes of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Malformed message. - goto malformed; - } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { - // Arguably we could just discard and carry on. But - // dropping the connection is probably more helpful since - // it lets the other side see that a problem occurred. - // Plus it gives us a chance to reclaim some memory. - goto malformed; - } - (void) nni_msg_trim(msg, 4); // Cannot fail - - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->req->urq, p->aio_putq); - return; - -malformed: - nni_msg_free(msg); - nni_pipe_stop(p->pipe); -} - -static void -req_timeout(void *arg) -{ - req_sock *s = arg; - - nni_mtx_lock(&s->mtx); - if (s->reqmsg != NULL) { - s->wantw = 1; - req_resend(s); - } - nni_mtx_unlock(&s->mtx); -} - -static void -req_resend(req_sock *s) -{ - req_pipe *p; - nni_msg * msg; - - // Note: This routine should be called with the socket lock held. - // Also, this should only be called while handling cooked mode - // requests. - if ((msg = s->reqmsg) == NULL) { - return; - } - - if (s->closed) { - s->reqmsg = NULL; - nni_msg_free(msg); - } - - if (s->wantw) { - s->wantw = 0; - - if (nni_msg_dup(&msg, s->reqmsg) != 0) { - // Failed to alloc message, reschedule it. Also, - // mark that we have a message we want to resend, - // in case something comes available. - s->wantw = 1; - nni_timer_schedule(&s->timer, nni_clock() + s->retry); - return; - } - - // Now we iterate across all possible outpipes, until - // one accepts it. - if ((p = nni_list_first(&s->readypipes)) == NULL) { - // No pipes ready to process us. Note that we have - // something to send, and schedule it. - nni_msg_free(msg); - s->wantw = 1; - return; - } - - nni_list_remove(&s->readypipes, p); - nni_list_append(&s->busypipes, p); - - s->pendpipe = p; - s->resend = nni_clock() + s->retry; - nni_aio_set_msg(p->aio_sendcooked, msg); - - // Note that because we were ready rather than busy, we - // should not have any I/O oustanding and hence the aio - // object will be available for our use. - nni_pipe_send(p->pipe, p->aio_sendcooked); - nni_timer_schedule(&s->timer, s->resend); - } -} - -static void -req_sock_send(void *arg, nni_aio *aio) -{ - req_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } - - msg = nni_aio_get_msg(aio); - len = nni_msg_len(msg); - - // In cooked mode, because we need to manage our own resend logic, - // we bypass the upper writeq entirely. - - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - id = (s->nextid++) | 0x80000000u; - // Request ID is in big endian format. - NNI_PUT32(s->reqid, id); - - if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - // If another message is there, this cancels it. - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - s->reqmsg = NULL; - } - - nni_aio_set_msg(aio, NULL); - - // Make a duplicate message... for retries. - s->reqmsg = msg; - // Schedule for immediate send - s->resend = NNI_TIME_ZERO; - s->wantw = 1; - - req_resend(s); - - nni_mtx_unlock(&s->mtx); - - nni_aio_finish(aio, 0, len); -} - -static nni_msg * -req_sock_filter(void *arg, nni_msg *msg) -{ - req_sock *s = arg; - nni_msg * rmsg; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } - - if (nni_msg_header_len(msg) < 4) { - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - - if ((rmsg = s->reqmsg) == NULL) { - // We had no outstanding request. (Perhaps canceled, - // or duplicate response.) - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - - if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { - // Wrong request id. - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - - s->reqmsg = NULL; - s->pendpipe = NULL; - nni_mtx_unlock(&s->mtx); - - nni_msg_free(rmsg); - - return (msg); -} - -static void -req_sock_recv(void *arg, nni_aio *aio) -{ - req_sock *s = arg; - - nni_mtx_lock(&s->mtx); - if (!s->raw) { - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops req_pipe_ops = { - .pipe_init = req_pipe_init, - .pipe_fini = req_pipe_fini, - .pipe_start = req_pipe_start, - .pipe_stop = req_pipe_stop, -}; - -static nni_proto_sock_option req_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = req_sock_getopt_raw, - .pso_setopt = req_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = req_sock_getopt_maxttl, - .pso_setopt = req_sock_setopt_maxttl, - }, - { - .pso_name = NNG_OPT_REQ_RESENDTIME, - .pso_getopt = req_sock_getopt_resendtime, - .pso_setopt = req_sock_setopt_resendtime, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops req_sock_ops = { - .sock_init = req_sock_init, - .sock_fini = req_sock_fini, - .sock_open = req_sock_open, - .sock_close = req_sock_close, - .sock_options = req_sock_options, - .sock_filter = req_sock_filter, - .sock_send = req_sock_send, - .sock_recv = req_sock_recv, -}; - -static nni_proto req_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &req_sock_ops, - .proto_pipe_ops = &req_pipe_ops, -}; - -int -nng_req0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &req_proto)); -} diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c new file mode 100644 index 00000000..ee8e4277 --- /dev/null +++ b/src/protocol/reqrep0/rep.c @@ -0,0 +1,515 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" + +// Response protocol. The REP protocol is the "reply" side of a +// request-reply pair. This is useful for building RPC servers, for +// example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx lk; + int raw; + int ttl; + nni_idhash *pipes; + char * btrace; + size_t btrace_len; + nni_aio * aio_getq; +}; + +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; +}; + +static void +rep0_sock_fini(void *arg) +{ + rep0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + } + nni_mtx_fini(&s->lk); + NNI_FREE_STRUCT(s); +} + +static int +rep0_sock_init(void **sp, nni_sock *sock) +{ + rep0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->lk); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->raw = 0; + s->btrace = NULL; + s->btrace_len = 0; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + + *sp = s; + + return (0); +} + +static void +rep0_sock_open(void *arg) +{ + rep0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +rep0_sock_close(void *arg) +{ + rep0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +rep0_pipe_fini(void *arg) +{ + rep0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + rep0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->rep = s; + *pp = p; + return (0); +} + +static int +rep0_pipe_start(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; + + if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + return (rv); + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +rep0_pipe_stop(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); +} + +static void +rep0_sock_getq_cb(void *arg) +{ + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; + + // This watches for messages from the upper write queue, + // extracts the destination pipe, and forwards it to the appropriate + // destination pipe via a separate queue. This prevents a single bad + // or slow pipe from gumming up the works for the entire socket. + + if (nni_aio_result(s->aio_getq) != 0) { + // Closed socket? + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + + // Look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); + return; + } + + id = nni_msg_header_trim_u32(msg); + + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + // XXX: LOCKING?!?! + if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { + rv = nni_msgq_tryput(p->sendq, msg); + } + if (rv != 0) { + nni_msg_free(msg); + } + + // Now look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); +} + +static void +rep0_pipe_getq_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static void +rep0_pipe_send_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +rep0_pipe_recv_cb(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // Store the pipe id in the header, first thing. + rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); + if (rv != 0) { + // Failure here causes us to drop the message. + goto drop; + } + + // Move backtrace from body to header + hops = 1; + for (;;) { + int end = 0; + if (hops >= s->ttl) { + // This isn't malformed, but it has gone through + // too many hops. Do not disconnect, because we + // can legitimately receive messages with too many + // hops from devices, etc. + goto drop; + } + if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage. Kick it. + nni_msg_free(msg); + nni_pipe_stop(p->pipe); + return; + } + body = nni_msg_body(msg); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_header_append(msg, body, 4); + if (rv != 0) { + // Presumably this is due to out of memory. + // We could just discard and try again, but we + // just toss the connection for now. Given the + // out of memory situation, this is not unreasonable. + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Go ahead and send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +rep0_pipe_putq_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static int +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + rep0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->lk); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->lk); + return (rv); +} + +static int +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + rep0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + rep0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + rep0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static nni_msg * +rep0_sock_filter(void *arg, nni_msg *msg) +{ + rep0_sock *s = arg; + char * header; + size_t len; + + nni_mtx_lock(&s->lk); + if (s->raw) { + nni_mtx_unlock(&s->lk); + return (msg); + } + + len = nni_msg_header_len(msg); + header = nni_msg_header(msg); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + } + if ((s->btrace = nni_alloc(len)) == NULL) { + nni_msg_free(msg); + return (NULL); + } + s->btrace_len = len; + memcpy(s->btrace, header, len); + nni_msg_header_clear(msg); + nni_mtx_unlock(&s->lk); + return (msg); +} + +static void +rep0_sock_send(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + int rv; + nni_msg * msg; + + nni_mtx_lock(&s->lk); + if (s->raw) { + // Pass thru + nni_mtx_unlock(&s->lk); + nni_msgq_aio_put(s->uwq, aio); + return; + } + if (s->btrace == NULL) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + msg = nni_aio_get_msg(aio); + + // drop anything else in the header... (it should already be + // empty, but there can be stale backtrace info there.) + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->lk); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +rep0_sock_recv(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, +}; + +static nni_proto_sock_option rep0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, +}; + +static nni_proto rep0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, +}; + +int +nng_rep0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &rep0_proto)); +} diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c new file mode 100644 index 00000000..94c7f1a0 --- /dev/null +++ b/src/protocol/reqrep0/req.c @@ -0,0 +1,675 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include +#include + +#include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" + +// Request protocol. The REQ protocol is the "request" side of a +// request-reply pair. This is useful for building RPC clients, for example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; + +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { + nni_msgq * uwq; + nni_msgq * urq; + nni_duration retry; + nni_time resend; + int raw; + int wantw; + int closed; + int ttl; + nni_msg * reqmsg; + + req0_pipe *pendpipe; + + nni_list readypipes; + nni_list busypipes; + + nni_timer_node timer; + + uint32_t nextid; // next id + uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; + nni_cv cv; +}; + +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { + nni_pipe * pipe; + req0_sock * req; + nni_list_node node; + nni_aio * aio_getq; // raw mode only + nni_aio * aio_sendraw; // raw mode only + nni_aio * aio_sendcooked; // cooked mode only + nni_aio * aio_recv; + nni_aio * aio_putq; + nni_mtx mtx; +}; + +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); + +static int +req0_sock_init(void **sp, nni_sock *sock) +{ + req0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + nni_cv_init(&s->cv, &s->mtx); + + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); + + // this is "semi random" start for request IDs. + s->nextid = nni_random(); + s->retry = NNI_SECOND * 60; + s->reqmsg = NULL; + s->raw = 0; + s->wantw = 0; + s->resend = NNI_TIME_ZERO; + s->ttl = 8; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + *sp = s; + + return (0); +} + +static void +req0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +req0_sock_close(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + s->closed = 1; + nni_mtx_unlock(&s->mtx); + + nni_timer_cancel(&s->timer); +} + +static void +req0_sock_fini(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + while ((!nni_list_empty(&s->readypipes)) || + (!nni_list_empty(&s->busypipes))) { + nni_cv_wait(&s->cv); + } + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); + } + nni_mtx_unlock(&s->mtx); + nni_cv_fini(&s->cv); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static void +req0_pipe_fini(void *arg) +{ + req0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_sendcooked); + nni_aio_fini(p->aio_sendraw); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static int +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + req0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != + 0)) { + req0_pipe_fini(p); + return (rv); + } + + NNI_LIST_NODE_INIT(&p->node); + p->pipe = pipe; + p->req = s; + *pp = p; + return (0); +} + +static int +req0_pipe_start(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + if (s->closed) { + nni_mtx_unlock(&s->mtx); + return (NNG_ECLOSED); + } + nni_list_append(&s->readypipes, p); + // If sock was waiting for somewhere to send data, go ahead and + // send it to this pipe. + if (s->wantw) { + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +req0_pipe_stop(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_sendcooked); + nni_aio_stop(p->aio_sendraw); + + // At this point there should not be any further AIOs running. + // Further, any completion tasks have completed. + + nni_mtx_lock(&s->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); + if (s->closed) { + nni_cv_wake(&s->cv); + } + } + + if ((p == s->pendpipe) && (s->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + s->pendpipe = NULL; + s->resend = NNI_TIME_ZERO; + s->wantw = 1; + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); +} + +static int +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static int +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_ms(&s->retry, buf, sz)); +} + +static int +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_ms(s->retry, buf, szp)); +} + +// Raw and cooked mode differ in the way they send messages out. +// +// For cooked mdes, we have a getq callback on the upper write queue, which +// when it finds a message, cancels any current processing, and saves a copy +// of the message, and then tries to "resend" the message, looking for a +// suitable available outgoing pipe. If no suitable pipe is available, +// a flag is set, so that as soon as such a pipe is available we trigger +// a resend attempt. We also trigger the attempt on either timeout, or if +// the underlying pipe we chose disconnects. +// +// For raw mode we can just let the pipes "contend" via getq to get a +// message from the upper write queue. The msgqueue implementation +// actually provides ordering, so load will be spread automatically. +// (NB: We may have to revise this in the future if we want to provide some +// kind of priority.) + +static void +req0_getq_cb(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + // We should be in RAW mode. Cooked mode traffic bypasses + // the upper write queue entirely, and should never end up here. + // If the mode changes, we may briefly deliver a message, but + // that's ok (there's an inherent race anyway). (One minor + // exception: we wind up here in error state when the uwq is closed.) + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + // Send the message, but use the raw mode aio. + nni_pipe_send(p->pipe, p->aio_sendraw); +} + +static void +req0_sendraw_cb(void *arg) +{ + req0_pipe *p = arg; + + if (nni_aio_result(p->aio_sendraw) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); + nni_aio_set_msg(p->aio_sendraw, NULL); + nni_pipe_stop(p->pipe); + return; + } + + // Sent a message so we just need to look for another one. + nni_msgq_aio_get(p->req->uwq, p->aio_getq); +} + +static void +req0_sendcooked_cb(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + if (nni_aio_result(p->aio_sendcooked) != 0) { + // We failed to send... clean up and deal with it. + // We leave ourselves on the busy list for now, which + // means no new asynchronous traffic can occur here. + nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); + nni_aio_set_msg(p->aio_sendcooked, NULL); + nni_pipe_stop(p->pipe); + return; + } + + // Cooked mode. We completed a cooked send, so we need to + // reinsert ourselves in the ready list, and possibly schedule + // a resend. + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->busypipes, p)) { + nni_list_remove(&s->busypipes, p); + nni_list_append(&s->readypipes, p); + req0_resend(s); + } else { + // We wind up here if stop was called from the reader + // side while we were waiting to be scheduled to run for the + // writer side. In this case we can't complete the operation, + // and we have to abort. + nni_pipe_stop(p->pipe); + } + nni_mtx_unlock(&s->mtx); +} + +static void +req0_putq_cb(void *arg) +{ + req0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + nni_aio_set_msg(p->aio_putq, NULL); + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +req0_recv_cb(void *arg) +{ + req0_pipe *p = arg; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Malformed message. + goto malformed; + } + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + // Arguably we could just discard and carry on. But + // dropping the connection is probably more helpful since + // it lets the other side see that a problem occurred. + // Plus it gives us a chance to reclaim some memory. + goto malformed; + } + (void) nni_msg_trim(msg, 4); // Cannot fail + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->req->urq, p->aio_putq); + return; + +malformed: + nni_msg_free(msg); + nni_pipe_stop(p->pipe); +} + +static void +req0_timeout(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->reqmsg != NULL) { + s->wantw = 1; + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); +} + +static void +req0_resend(req0_sock *s) +{ + req0_pipe *p; + nni_msg * msg; + + // Note: This routine should be called with the socket lock held. + // Also, this should only be called while handling cooked mode + // requests. + if ((msg = s->reqmsg) == NULL) { + return; + } + + if (s->closed) { + s->reqmsg = NULL; + nni_msg_free(msg); + } + + if (s->wantw) { + s->wantw = 0; + + if (nni_msg_dup(&msg, s->reqmsg) != 0) { + // Failed to alloc message, reschedule it. Also, + // mark that we have a message we want to resend, + // in case something comes available. + s->wantw = 1; + nni_timer_schedule(&s->timer, nni_clock() + s->retry); + return; + } + + // Now we iterate across all possible outpipes, until + // one accepts it. + if ((p = nni_list_first(&s->readypipes)) == NULL) { + // No pipes ready to process us. Note that we have + // something to send, and schedule it. + nni_msg_free(msg); + s->wantw = 1; + return; + } + + nni_list_remove(&s->readypipes, p); + nni_list_append(&s->busypipes, p); + + s->pendpipe = p; + s->resend = nni_clock() + s->retry; + nni_aio_set_msg(p->aio_sendcooked, msg); + + // Note that because we were ready rather than busy, we + // should not have any I/O oustanding and hence the aio + // object will be available for our use. + nni_pipe_send(p->pipe, p->aio_sendcooked); + nni_timer_schedule(&s->timer, s->resend); + } +} + +static void +req0_sock_send(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + + // In cooked mode, because we need to manage our own resend logic, + // we bypass the upper writeq entirely. + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + id = (s->nextid++) | 0x80000000u; + // Request ID is in big endian format. + NNI_PUT32(s->reqid, id); + + if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // If another message is there, this cancels it. + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); + s->reqmsg = NULL; + } + + nni_aio_set_msg(aio, NULL); + + // Make a duplicate message... for retries. + s->reqmsg = msg; + // Schedule for immediate send + s->resend = NNI_TIME_ZERO; + s->wantw = 1; + + req0_resend(s); + + nni_mtx_unlock(&s->mtx); + + nni_aio_finish(aio, 0, len); +} + +static nni_msg * +req0_sock_filter(void *arg, nni_msg *msg) +{ + req0_sock *s = arg; + nni_msg * rmsg; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // Pass it unmolested + nni_mtx_unlock(&s->mtx); + return (msg); + } + + if (nni_msg_header_len(msg) < 4) { + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + if ((rmsg = s->reqmsg) == NULL) { + // We had no outstanding request. (Perhaps canceled, + // or duplicate response.) + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { + // Wrong request id. + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + s->reqmsg = NULL; + s->pendpipe = NULL; + nni_mtx_unlock(&s->mtx); + + nni_msg_free(rmsg); + + return (msg); +} + +static void +req0_sock_recv(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (!s->raw) { + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, +}; + +static nni_proto_sock_option req0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, + }, + { + .pso_name = NNG_OPT_REQ_RESENDTIME, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, +}; + +static nni_proto req0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, +}; + +int +nng_req0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &req0_proto)); +} diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c deleted file mode 100644 index 94730be6..00000000 --- a/src/protocol/survey/respond.c +++ /dev/null @@ -1,500 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Respondent protocol. The RESPONDENT protocol is the "replier" side of -// the surveyor pattern. This is useful for building service discovery, or -// voting algorithsm, for example. - -typedef struct resp_pipe resp_pipe; -typedef struct resp_sock resp_sock; - -static void resp_recv_cb(void *); -static void resp_putq_cb(void *); -static void resp_getq_cb(void *); -static void resp_send_cb(void *); -static void resp_sock_getq_cb(void *); -static void resp_pipe_fini(void *); - -// A resp_sock is our per-socket protocol private structure. -struct resp_sock { - nni_msgq * urq; - nni_msgq * uwq; - int raw; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; - nni_mtx mtx; -}; - -// A resp_pipe is our per-pipe protocol private structure. -struct resp_pipe { - nni_pipe * npipe; - resp_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; -}; - -static void -resp_sock_fini(void *arg) -{ - resp_sock *s = arg; - - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - } - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static int -resp_sock_init(void **sp, nni_sock *nsock) -{ - resp_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->mtx); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { - resp_sock_fini(s); - return (rv); - } - - s->ttl = 8; // Per RFC - s->raw = 0; - s->btrace = NULL; - s->btrace_len = 0; - s->urq = nni_sock_recvq(nsock); - s->uwq = nni_sock_sendq(nsock); - - *sp = s; - return (0); -} - -static void -resp_sock_open(void *arg) -{ - resp_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -resp_sock_close(void *arg) -{ - resp_sock *s = arg; - - nni_aio_cancel(s->aio_getq, NNG_ECLOSED); -} - -static void -resp_pipe_fini(void *arg) -{ - resp_pipe *p = arg; - - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - -static int -resp_pipe_init(void **pp, nni_pipe *npipe, void *s) -{ - resp_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) { - resp_pipe_fini(p); - return (rv); - } - - p->npipe = npipe; - p->psock = s; - *pp = p; - return (0); -} - -static int -resp_pipe_start(void *arg) -{ - resp_pipe *p = arg; - resp_sock *s = p->psock; - int rv; - - p->id = nni_pipe_id(p->npipe); - - nni_mtx_lock(&s->mtx); - rv = nni_idhash_insert(s->pipes, p->id, p); - nni_mtx_unlock(&s->mtx); - if (rv != 0) { - return (rv); - } - - nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); - - return (rv); -} - -static void -resp_pipe_stop(void *arg) -{ - resp_pipe *p = arg; - resp_sock *s = p->psock; - - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - - if (p->id != 0) { - nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, p->id); - nni_mtx_unlock(&s->mtx); - p->id = 0; - } -} - -// resp_sock_send watches for messages from the upper write queue, -// extracts the destination pipe, and forwards it to the appropriate -// destination pipe via a separate queue. This prevents a single bad -// or slow pipe from gumming up the works for the entire socket.s - -void -resp_sock_getq_cb(void *arg) -{ - resp_sock *s = arg; - nni_msg * msg; - uint32_t id; - resp_pipe *p; - int rv; - - if (nni_aio_result(s->aio_getq) != 0) { - return; - } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - // We can't really close down the socket, so just keep going. - nni_msgq_aio_get(s->uwq, s->aio_getq); - return; - } - id = nni_msg_header_trim_u32(msg); - - nni_mtx_lock(&s->mtx); - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) { - // Destination pipe not present. - nni_msg_free(msg); - } else { - // Non-blocking put. - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); - } - } - nni_msgq_aio_get(s->uwq, s->aio_getq); - nni_mtx_unlock(&s->mtx); -} - -void -resp_getq_cb(void *arg) -{ - resp_pipe *p = arg; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->npipe, p->aio_send); -} - -void -resp_send_cb(void *arg) -{ - resp_pipe *p = arg; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_msgq_aio_get(p->sendq, p->aio_getq); -} - -static void -resp_recv_cb(void *arg) -{ - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq = s->urq; - nni_msg * msg; - int hops; - int rv; - - if (nni_aio_result(p->aio_recv) != 0) { - goto error; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, p->id); - - // Store the pipe id in the header, first thing. - if (nni_msg_header_append_u32(msg, p->id) != 0) { - nni_msg_free(msg); - goto error; - } - - // Move backtrace from body to header - hops = 0; - for (;;) { - int end = 0; - uint8_t *body; - - if (hops >= s->ttl) { - nni_msg_free(msg); - goto error; - } - if (nni_msg_len(msg) < 4) { - nni_msg_free(msg); - goto error; - } - body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { - nni_msg_free(msg); - goto error; - } - nni_msg_trim(msg, 4); - if (end) { - break; - } - } - - // Now send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); - return; - -error: - nni_pipe_stop(p->npipe); -} - -static void -resp_putq_cb(void *arg) -{ - resp_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - } - - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static int -resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - resp_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -resp_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - resp_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static int -resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) -{ - resp_sock *s = arg; - return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); -} - -static int -resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) -{ - resp_sock *s = arg; - return (nni_getopt_int(s->ttl, buf, szp)); -} - -static void -resp_sock_send(void *arg, nni_aio *aio) -{ - resp_sock *s = arg; - nni_msg * msg; - int rv; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } - - msg = nni_aio_get_msg(aio); - - // If we have a stored backtrace, append it to the header... - // if we don't have a backtrace, discard the message. - if (s->btrace == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - // drop anything else in the header... - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); -} - -static nni_msg * -resp_sock_filter(void *arg, nni_msg *msg) -{ - resp_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - return (msg); - } - - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->mtx); - return (msg); -} - -static void -resp_sock_recv(void *arg, nni_aio *aio) -{ - resp_sock *s = arg; - - nni_msgq_aio_get(s->urq, aio); -} - -static nni_proto_pipe_ops resp_pipe_ops = { - .pipe_init = resp_pipe_init, - .pipe_fini = resp_pipe_fini, - .pipe_start = resp_pipe_start, - .pipe_stop = resp_pipe_stop, -}; - -static nni_proto_sock_option resp_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = resp_sock_getopt_raw, - .pso_setopt = resp_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = resp_sock_getopt_maxttl, - .pso_setopt = resp_sock_setopt_maxttl, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops resp_sock_ops = { - .sock_init = resp_sock_init, - .sock_fini = resp_sock_fini, - .sock_open = resp_sock_open, - .sock_close = resp_sock_close, - .sock_filter = resp_sock_filter, - .sock_send = resp_sock_send, - .sock_recv = resp_sock_recv, - .sock_options = resp_sock_options, -}; - -static nni_proto resp_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &resp_sock_ops, - .proto_pipe_ops = &resp_pipe_ops, -}; - -int -nng_respondent0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &resp_proto)); -} diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c deleted file mode 100644 index 9dcd6664..00000000 --- a/src/protocol/survey/survey.c +++ /dev/null @@ -1,475 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include - -#include "core/nng_impl.h" - -// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the -// survey pattern. This is useful for building service discovery, voting, etc. - -typedef struct surv_pipe surv_pipe; -typedef struct surv_sock surv_sock; - -static void surv_sock_getq_cb(void *); -static void surv_getq_cb(void *); -static void surv_putq_cb(void *); -static void surv_send_cb(void *); -static void surv_recv_cb(void *); -static void surv_timeout(void *); - -// A surv_sock is our per-socket protocol private structure. -struct surv_sock { - nni_duration survtime; - nni_time expire; - int raw; - int closing; - uint32_t nextid; // next id - uint32_t survid; // outstanding request ID (big endian) - nni_list pipes; - nni_aio * aio_getq; - nni_timer_node timer; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; -}; - -// A surv_pipe is our per-pipe protocol private structure. -struct surv_pipe { - nni_pipe * npipe; - surv_sock * psock; - nni_msgq * sendq; - nni_list_node node; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; -}; - -static void -surv_sock_fini(void *arg) -{ - surv_sock *s = arg; - - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); -} - -static int -surv_sock_init(void **sp, nni_sock *nsock) -{ - surv_sock *s; - int rv; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { - surv_sock_fini(s); - return (rv); - } - NNI_LIST_INIT(&s->pipes, surv_pipe, node); - nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv_timeout, s); - - s->nextid = nni_random(); - s->raw = 0; - s->survtime = NNI_SECOND * 60; - s->expire = NNI_TIME_ZERO; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - - *sp = s; - return (0); -} - -static void -surv_sock_open(void *arg) -{ - surv_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); -} - -static void -surv_sock_close(void *arg) -{ - surv_sock *s = arg; - - nni_timer_cancel(&s->timer); - nni_aio_cancel(s->aio_getq, NNG_ECLOSED); -} - -static void -surv_pipe_fini(void *arg) -{ - surv_pipe *p = arg; - - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - -static int -surv_pipe_init(void **pp, nni_pipe *npipe, void *s) -{ - surv_pipe *p; - int rv; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - // This depth could be tunable. - if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { - surv_pipe_fini(p); - return (rv); - } - - p->npipe = npipe; - p->psock = s; - *pp = p; - return (0); -} - -static int -surv_pipe_start(void *arg) -{ - surv_pipe *p = arg; - surv_sock *s = p->psock; - - nni_mtx_lock(&s->mtx); - nni_list_append(&s->pipes, p); - nni_mtx_unlock(&s->mtx); - - nni_msgq_aio_get(p->sendq, p->aio_getq); - nni_pipe_recv(p->npipe, p->aio_recv); - return (0); -} - -static void -surv_pipe_stop(void *arg) -{ - surv_pipe *p = arg; - surv_sock *s = p->psock; - - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - - nni_msgq_close(p->sendq); - - nni_mtx_lock(&s->mtx); - if (nni_list_active(&s->pipes, p)) { - nni_list_remove(&s->pipes, p); - } - nni_mtx_unlock(&s->mtx); -} - -static void -surv_getq_cb(void *arg) -{ - surv_pipe *p = arg; - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; - } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->npipe, p->aio_send); -} - -static void -surv_send_cb(void *arg) -{ - surv_pipe *p = arg; - - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_msgq_aio_get(p->psock->uwq, p->aio_getq); -} - -static void -surv_putq_cb(void *arg) -{ - surv_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static void -surv_recv_cb(void *arg) -{ - surv_pipe *p = arg; - nni_msg * msg; - - if (nni_aio_result(p->aio_recv) != 0) { - goto failed; - } - - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - - // We yank 4 bytes of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Not enough data, just toss it. - nni_msg_free(msg); - goto failed; - } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { - // Should be NNG_ENOMEM - nni_msg_free(msg); - goto failed; - } - (void) nni_msg_trim(msg, 4); - - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->psock->urq, p->aio_putq); - return; - -failed: - nni_pipe_stop(p->npipe); -} - -static int -surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) -{ - surv_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { - s->survid = 0; - nni_timer_cancel(&s->timer); - } - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -surv_sock_getopt_raw(void *arg, void *buf, size_t *szp) -{ - surv_sock *s = arg; - return (nni_getopt_int(s->raw, buf, szp)); -} - -static int -surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) -{ - surv_sock *s = arg; - return (nni_setopt_ms(&s->survtime, buf, sz)); -} - -static int -surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) -{ - surv_sock *s = arg; - return (nni_getopt_ms(s->survtime, buf, szp)); -} - -static void -surv_sock_getq_cb(void *arg) -{ - surv_sock *s = arg; - surv_pipe *p; - surv_pipe *last; - nni_msg * msg, *dup; - - if (nni_aio_result(s->aio_getq) != 0) { - // Should be NNG_ECLOSED. - return; - } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - nni_mtx_lock(&s->mtx); - last = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != last) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); - } - } - nni_mtx_unlock(&s->mtx); - - if (last == NULL) { - // If there were no pipes to send on, just toss the message. - nni_msg_free(msg); - } -} - -static void -surv_timeout(void *arg) -{ - surv_sock *s = arg; - - nni_mtx_lock(&s->mtx); - s->survid = 0; - nni_mtx_unlock(&s->mtx); - nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); -} - -static void -surv_sock_recv(void *arg, nni_aio *aio) -{ - surv_sock *s = arg; - - nni_mtx_lock(&s->mtx); - if (s->survid == 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); -} - -static void -surv_sock_send(void *arg, nni_aio *aio) -{ - surv_sock *s = arg; - nni_msg * msg; - int rv; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - // No automatic retry, and the request ID must - // be in the header coming down. - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } - - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - s->survid = (s->nextid++) | 0x80000000u; - - msg = nni_aio_get_msg(aio); - nni_msg_header_clear(msg); - if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - // If another message is there, this cancels it. We move the - // survey expiration out. The timeout thread will wake up in - // the wake below, and reschedule itself appropriately. - s->expire = nni_clock() + s->survtime; - nni_timer_schedule(&s->timer, s->expire); - - nni_mtx_unlock(&s->mtx); - - nni_msgq_aio_put(s->uwq, aio); -} - -static nni_msg * -surv_sock_filter(void *arg, nni_msg *msg) -{ - surv_sock *s = arg; - - nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } - - if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || - (nni_msg_header_trim_u32(msg) != s->survid)) { - // Wrong request id - nni_msg_free(msg); - return (NULL); - } - nni_mtx_unlock(&s->mtx); - - return (msg); -} - -static nni_proto_pipe_ops surv_pipe_ops = { - .pipe_init = surv_pipe_init, - .pipe_fini = surv_pipe_fini, - .pipe_start = surv_pipe_start, - .pipe_stop = surv_pipe_stop, -}; - -static nni_proto_sock_option surv_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_getopt = surv_sock_getopt_raw, - .pso_setopt = surv_sock_setopt_raw, - }, - { - .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, - .pso_getopt = surv_sock_getopt_surveytime, - .pso_setopt = surv_sock_setopt_surveytime, - }, - // terminate list - { NULL, NULL, NULL }, -}; - -static nni_proto_sock_ops surv_sock_ops = { - .sock_init = surv_sock_init, - .sock_fini = surv_sock_fini, - .sock_open = surv_sock_open, - .sock_close = surv_sock_close, - .sock_send = surv_sock_send, - .sock_recv = surv_sock_recv, - .sock_filter = surv_sock_filter, - .sock_options = surv_sock_options, -}; - -static nni_proto surv_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &surv_sock_ops, - .proto_pipe_ops = &surv_pipe_ops, -}; - -int -nng_surveyor0_open(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &surv_proto)); -} diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore +# Copyright 2017 Capitar IT Group BV +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c new file mode 100644 index 00000000..73e919c3 --- /dev/null +++ b/src/protocol/survey0/respond.c @@ -0,0 +1,509 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { + nni_msgq * urq; + nni_msgq * uwq; + int raw; + int ttl; + nni_idhash *pipes; + char * btrace; + size_t btrace_len; + nni_aio * aio_getq; + nni_mtx mtx; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +resp0_sock_fini(void *arg) +{ + resp0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + } + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +resp0_sock_init(void **sp, nni_sock *nsock) +{ + resp0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->raw = 0; + s->btrace = NULL; + s->btrace_len = 0; + s->urq = nni_sock_recvq(nsock); + s->uwq = nni_sock_sendq(nsock); + + *sp = s; + return (0); +} + +static void +resp0_sock_open(void *arg) +{ + resp0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +resp0_sock_close(void *arg) +{ + resp0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +resp0_pipe_fini(void *arg) +{ + resp0_pipe *p = arg; + + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + resp0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +resp0_pipe_start(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; + + p->id = nni_pipe_id(p->npipe); + + nni_mtx_lock(&s->mtx); + rv = nni_idhash_insert(s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, p->aio_recv); + nni_msgq_aio_get(p->sendq, p->aio_getq); + + return (rv); +} + +static void +resp0_pipe_stop(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + + if (p->id != 0) { + nni_mtx_lock(&s->mtx); + nni_idhash_remove(s->pipes, p->id); + nni_mtx_unlock(&s->mtx); + p->id = 0; + } +} + +// resp0_sock_send watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket.s + +void +resp0_sock_getq_cb(void *arg) +{ + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + // We can't really close down the socket, so just keep going. + nni_msgq_aio_get(s->uwq, s->aio_getq); + return; + } + id = nni_msg_header_trim_u32(msg); + + nni_mtx_lock(&s->mtx); + if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) { + // Destination pipe not present. + nni_msg_free(msg); + } else { + // Non-blocking put. + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); + } + } + nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_mtx_unlock(&s->mtx); +} + +void +resp0_getq_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +void +resp0_send_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +resp0_recv_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; + + if (nni_aio_result(p->aio_recv) != 0) { + goto error; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); + + // Store the pipe id in the header, first thing. + if (nni_msg_header_append_u32(msg, p->id) != 0) { + nni_msg_free(msg); + goto error; + } + + // Move backtrace from body to header + hops = 0; + for (;;) { + int end = 0; + uint8_t *body; + + if (hops >= s->ttl) { + nni_msg_free(msg); + goto error; + } + if (nni_msg_len(msg) < 4) { + nni_msg_free(msg); + goto error; + } + body = nni_msg_body(msg); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_header_append(msg, body, 4); + if (rv != 0) { + nni_msg_free(msg); + goto error; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(urq, p->aio_putq); + return; + +error: + nni_pipe_stop(p->npipe); +} + +static void +resp0_putq_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + resp0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->mtx); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + resp0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + resp0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + resp0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static void +resp0_sock_send(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + msg = nni_aio_get_msg(aio); + + // If we have a stored backtrace, append it to the header... + // if we don't have a backtrace, discard the message. + if (s->btrace == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + // drop anything else in the header... + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_msg * +resp0_sock_filter(void *arg, nni_msg *msg) +{ + resp0_sock *s = arg; + char * header; + size_t len; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + return (msg); + } + + len = nni_msg_header_len(msg); + header = nni_msg_header(msg); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + } + if ((s->btrace = nni_alloc(len)) == NULL) { + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + s->btrace_len = len; + memcpy(s->btrace, header, len); + nni_msg_header_clear(msg); + nni_mtx_unlock(&s->mtx); + return (msg); +} + +static void +resp0_sock_recv(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, +}; + +static nni_proto_sock_option resp0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + +static nni_proto resp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, +}; + +int +nng_respondent0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto)); +} diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c new file mode 100644 index 00000000..02283436 --- /dev/null +++ b/src/protocol/survey0/survey.c @@ -0,0 +1,484 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { + nni_duration survtime; + nni_time expire; + int raw; + int closing; + uint32_t nextid; // next id + uint32_t survid; // outstanding request ID (big endian) + nni_list pipes; + nni_aio * aio_getq; + nni_timer_node timer; + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx mtx; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { + nni_pipe * npipe; + surv0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +surv0_sock_fini(void *arg) +{ + surv0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +surv0_sock_init(void **sp, nni_sock *nsock) +{ + surv0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); + return (rv); + } + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); + nni_mtx_init(&s->mtx); + nni_timer_init(&s->timer, surv0_timeout, s); + + s->nextid = nni_random(); + s->raw = 0; + s->survtime = NNI_SECOND * 60; + s->expire = NNI_TIME_ZERO; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + + *sp = s; + return (0); +} + +static void +surv0_sock_open(void *arg) +{ + surv0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +surv0_sock_close(void *arg) +{ + surv0_sock *s = arg; + + nni_timer_cancel(&s->timer); + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +surv0_pipe_fini(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + surv0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + // This depth could be tunable. + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +surv0_pipe_start(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); + return (0); +} + +static void +surv0_pipe_stop(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->psock; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +surv0_getq_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +static void +surv0_send_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->psock->uwq, p->aio_getq); +} + +static void +surv0_putq_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +surv0_recv_cb(void *arg) +{ + surv0_pipe *p = arg; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + goto failed; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Not enough data, just toss it. + nni_msg_free(msg); + goto failed; + } + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + // Should be NNG_ENOMEM + nni_msg_free(msg); + goto failed; + } + (void) nni_msg_trim(msg, 4); + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, p->aio_putq); + return; + +failed: + nni_pipe_stop(p->npipe); +} + +static int +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + surv0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->mtx); + if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { + s->survid = 0; + nni_timer_cancel(&s->timer); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + surv0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +{ + surv0_sock *s = arg; + return (nni_setopt_ms(&s->survtime, buf, sz)); +} + +static int +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +{ + surv0_sock *s = arg; + return (nni_getopt_ms(s->survtime, buf, szp)); +} + +static void +surv0_sock_getq_cb(void *arg) +{ + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; + + if (nni_aio_result(s->aio_getq) != 0) { + // Should be NNG_ECLOSED. + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + last = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != last) { + if (nni_msg_dup(&dup, msg) != 0) { + continue; + } + } else { + dup = msg; + } + if (nni_msgq_tryput(p->sendq, dup) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(&s->mtx); + + if (last == NULL) { + // If there were no pipes to send on, just toss the message. + nni_msg_free(msg); + } +} + +static void +surv0_timeout(void *arg) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + s->survid = 0; + nni_mtx_unlock(&s->mtx); + nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); +} + +static void +surv0_sock_recv(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->survid == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->urq, aio); +} + +static void +surv0_sock_send(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // No automatic retry, and the request ID must + // be in the header coming down. + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + s->survid = (s->nextid++) | 0x80000000u; + + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // If another message is there, this cancels it. We move the + // survey expiration out. The timeout thread will wake up in + // the wake below, and reschedule itself appropriately. + s->expire = nni_clock() + s->survtime; + nni_timer_schedule(&s->timer, s->expire); + + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_msg * +surv0_sock_filter(void *arg, nni_msg *msg) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // Pass it unmolested + nni_mtx_unlock(&s->mtx); + return (msg); + } + + if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || + (nni_msg_header_trim_u32(msg) != s->survid)) { + // Wrong request id + nni_msg_free(msg); + return (NULL); + } + nni_mtx_unlock(&s->mtx); + + return (msg); +} + +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, +}; + +static nni_proto_sock_option surv0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, +}; + +static nni_proto surv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, +}; + +int +nng_surveyor0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &surv0_proto)); +} diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H -- cgit v1.2.3-70-g09d2