diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-31 13:06:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-02 16:10:26 -0700 |
| commit | 7bf591e20a94b8d926f92ab9b320f3b75d342345 (patch) | |
| tree | d67ce7cab328a004346419047feede7d579dad77 /src/protocol/reqrep0 | |
| parent | d340af7dc250388f48d36c5078c4857c51bb6121 (diff) | |
| download | nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.gz nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.bz2 nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.zip | |
fixes #143 Protocols and transports should be "configurable"
This makes all the protocols and transports optional. All
of them except ZeroTier are enabled by default, but you can
now disable them (remove from the build) with cmake options.
The test suite is modified so that tests still run as much
as they can, but skip over things caused by missing functionality
from the library (due to configuration).
Further, the constant definitions and prototypes for functions
that are specific to transports or protocols are moved into
appropriate headers, which should be included directly by
applications wishing to use these.
We have also added and improved documentation -- all of the
transports are documented, and several more man pages for
protocols have been added. (Req/Rep and Surveyor are still
missing.)
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 515 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.h | 28 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 675 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.h | 30 |
5 files changed, 1271 insertions, 0 deletions
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c new file mode 100644 index 00000000..ee8e4277 --- /dev/null +++ b/src/protocol/reqrep0/rep.c @@ -0,0 +1,515 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" + +// Response protocol. The REP protocol is the "reply" side of a +// request-reply pair. This is useful for building RPC servers, for +// example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx lk; + int raw; + int ttl; + nni_idhash *pipes; + char * btrace; + size_t btrace_len; + nni_aio * aio_getq; +}; + +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; +}; + +static void +rep0_sock_fini(void *arg) +{ + rep0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + } + nni_mtx_fini(&s->lk); + NNI_FREE_STRUCT(s); +} + +static int +rep0_sock_init(void **sp, nni_sock *sock) +{ + rep0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->lk); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->raw = 0; + s->btrace = NULL; + s->btrace_len = 0; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + + *sp = s; + + return (0); +} + +static void +rep0_sock_open(void *arg) +{ + rep0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +rep0_sock_close(void *arg) +{ + rep0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +rep0_pipe_fini(void *arg) +{ + rep0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + rep0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->rep = s; + *pp = p; + return (0); +} + +static int +rep0_pipe_start(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; + + if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + return (rv); + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +rep0_pipe_stop(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); +} + +static void +rep0_sock_getq_cb(void *arg) +{ + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; + + // This watches for messages from the upper write queue, + // extracts the destination pipe, and forwards it to the appropriate + // destination pipe via a separate queue. This prevents a single bad + // or slow pipe from gumming up the works for the entire socket. + + if (nni_aio_result(s->aio_getq) != 0) { + // Closed socket? + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + + // Look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); + return; + } + + id = nni_msg_header_trim_u32(msg); + + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + // XXX: LOCKING?!?! + if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { + rv = nni_msgq_tryput(p->sendq, msg); + } + if (rv != 0) { + nni_msg_free(msg); + } + + // Now look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); +} + +static void +rep0_pipe_getq_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static void +rep0_pipe_send_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +rep0_pipe_recv_cb(void *arg) +{ + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // Store the pipe id in the header, first thing. + rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); + if (rv != 0) { + // Failure here causes us to drop the message. + goto drop; + } + + // Move backtrace from body to header + hops = 1; + for (;;) { + int end = 0; + if (hops >= s->ttl) { + // This isn't malformed, but it has gone through + // too many hops. Do not disconnect, because we + // can legitimately receive messages with too many + // hops from devices, etc. + goto drop; + } + if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage. Kick it. + nni_msg_free(msg); + nni_pipe_stop(p->pipe); + return; + } + body = nni_msg_body(msg); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_header_append(msg, body, 4); + if (rv != 0) { + // Presumably this is due to out of memory. + // We could just discard and try again, but we + // just toss the connection for now. Given the + // out of memory situation, this is not unreasonable. + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Go ahead and send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +rep0_pipe_putq_cb(void *arg) +{ + rep0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static int +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + rep0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->lk); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->lk); + return (rv); +} + +static int +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + rep0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + rep0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + rep0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static nni_msg * +rep0_sock_filter(void *arg, nni_msg *msg) +{ + rep0_sock *s = arg; + char * header; + size_t len; + + nni_mtx_lock(&s->lk); + if (s->raw) { + nni_mtx_unlock(&s->lk); + return (msg); + } + + len = nni_msg_header_len(msg); + header = nni_msg_header(msg); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + } + if ((s->btrace = nni_alloc(len)) == NULL) { + nni_msg_free(msg); + return (NULL); + } + s->btrace_len = len; + memcpy(s->btrace, header, len); + nni_msg_header_clear(msg); + nni_mtx_unlock(&s->lk); + return (msg); +} + +static void +rep0_sock_send(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + int rv; + nni_msg * msg; + + nni_mtx_lock(&s->lk); + if (s->raw) { + // Pass thru + nni_mtx_unlock(&s->lk); + nni_msgq_aio_put(s->uwq, aio); + return; + } + if (s->btrace == NULL) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + msg = nni_aio_get_msg(aio); + + // drop anything else in the header... (it should already be + // empty, but there can be stale backtrace info there.) + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->lk); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +rep0_sock_recv(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, +}; + +static nni_proto_sock_option rep0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, +}; + +static nni_proto rep0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, +}; + +int +nng_rep0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &rep0_proto)); +} diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c new file mode 100644 index 00000000..94c7f1a0 --- /dev/null +++ b/src/protocol/reqrep0/req.c @@ -0,0 +1,675 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" + +// Request protocol. The REQ protocol is the "request" side of a +// request-reply pair. This is useful for building RPC clients, for example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; + +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { + nni_msgq * uwq; + nni_msgq * urq; + nni_duration retry; + nni_time resend; + int raw; + int wantw; + int closed; + int ttl; + nni_msg * reqmsg; + + req0_pipe *pendpipe; + + nni_list readypipes; + nni_list busypipes; + + nni_timer_node timer; + + uint32_t nextid; // next id + uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; + nni_cv cv; +}; + +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { + nni_pipe * pipe; + req0_sock * req; + nni_list_node node; + nni_aio * aio_getq; // raw mode only + nni_aio * aio_sendraw; // raw mode only + nni_aio * aio_sendcooked; // cooked mode only + nni_aio * aio_recv; + nni_aio * aio_putq; + nni_mtx mtx; +}; + +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); + +static int +req0_sock_init(void **sp, nni_sock *sock) +{ + req0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + nni_cv_init(&s->cv, &s->mtx); + + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); + + // this is "semi random" start for request IDs. + s->nextid = nni_random(); + s->retry = NNI_SECOND * 60; + s->reqmsg = NULL; + s->raw = 0; + s->wantw = 0; + s->resend = NNI_TIME_ZERO; + s->ttl = 8; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + *sp = s; + + return (0); +} + +static void +req0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +req0_sock_close(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + s->closed = 1; + nni_mtx_unlock(&s->mtx); + + nni_timer_cancel(&s->timer); +} + +static void +req0_sock_fini(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + while ((!nni_list_empty(&s->readypipes)) || + (!nni_list_empty(&s->busypipes))) { + nni_cv_wait(&s->cv); + } + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); + } + nni_mtx_unlock(&s->mtx); + nni_cv_fini(&s->cv); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static void +req0_pipe_fini(void *arg) +{ + req0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_sendcooked); + nni_aio_fini(p->aio_sendraw); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static int +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + req0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != + 0)) { + req0_pipe_fini(p); + return (rv); + } + + NNI_LIST_NODE_INIT(&p->node); + p->pipe = pipe; + p->req = s; + *pp = p; + return (0); +} + +static int +req0_pipe_start(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + if (s->closed) { + nni_mtx_unlock(&s->mtx); + return (NNG_ECLOSED); + } + nni_list_append(&s->readypipes, p); + // If sock was waiting for somewhere to send data, go ahead and + // send it to this pipe. + if (s->wantw) { + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +req0_pipe_stop(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_sendcooked); + nni_aio_stop(p->aio_sendraw); + + // At this point there should not be any further AIOs running. + // Further, any completion tasks have completed. + + nni_mtx_lock(&s->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); + if (s->closed) { + nni_cv_wake(&s->cv); + } + } + + if ((p == s->pendpipe) && (s->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + s->pendpipe = NULL; + s->resend = NNI_TIME_ZERO; + s->wantw = 1; + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); +} + +static int +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); +} + +static int +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static int +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +{ + req0_sock *s = arg; + return (nni_setopt_ms(&s->retry, buf, sz)); +} + +static int +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +{ + req0_sock *s = arg; + return (nni_getopt_ms(s->retry, buf, szp)); +} + +// Raw and cooked mode differ in the way they send messages out. +// +// For cooked mdes, we have a getq callback on the upper write queue, which +// when it finds a message, cancels any current processing, and saves a copy +// of the message, and then tries to "resend" the message, looking for a +// suitable available outgoing pipe. If no suitable pipe is available, +// a flag is set, so that as soon as such a pipe is available we trigger +// a resend attempt. We also trigger the attempt on either timeout, or if +// the underlying pipe we chose disconnects. +// +// For raw mode we can just let the pipes "contend" via getq to get a +// message from the upper write queue. The msgqueue implementation +// actually provides ordering, so load will be spread automatically. +// (NB: We may have to revise this in the future if we want to provide some +// kind of priority.) + +static void +req0_getq_cb(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + // We should be in RAW mode. Cooked mode traffic bypasses + // the upper write queue entirely, and should never end up here. + // If the mode changes, we may briefly deliver a message, but + // that's ok (there's an inherent race anyway). (One minor + // exception: we wind up here in error state when the uwq is closed.) + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + // Send the message, but use the raw mode aio. + nni_pipe_send(p->pipe, p->aio_sendraw); +} + +static void +req0_sendraw_cb(void *arg) +{ + req0_pipe *p = arg; + + if (nni_aio_result(p->aio_sendraw) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); + nni_aio_set_msg(p->aio_sendraw, NULL); + nni_pipe_stop(p->pipe); + return; + } + + // Sent a message so we just need to look for another one. + nni_msgq_aio_get(p->req->uwq, p->aio_getq); +} + +static void +req0_sendcooked_cb(void *arg) +{ + req0_pipe *p = arg; + req0_sock *s = p->req; + + if (nni_aio_result(p->aio_sendcooked) != 0) { + // We failed to send... clean up and deal with it. + // We leave ourselves on the busy list for now, which + // means no new asynchronous traffic can occur here. + nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); + nni_aio_set_msg(p->aio_sendcooked, NULL); + nni_pipe_stop(p->pipe); + return; + } + + // Cooked mode. We completed a cooked send, so we need to + // reinsert ourselves in the ready list, and possibly schedule + // a resend. + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->busypipes, p)) { + nni_list_remove(&s->busypipes, p); + nni_list_append(&s->readypipes, p); + req0_resend(s); + } else { + // We wind up here if stop was called from the reader + // side while we were waiting to be scheduled to run for the + // writer side. In this case we can't complete the operation, + // and we have to abort. + nni_pipe_stop(p->pipe); + } + nni_mtx_unlock(&s->mtx); +} + +static void +req0_putq_cb(void *arg) +{ + req0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + nni_aio_set_msg(p->aio_putq, NULL); + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +req0_recv_cb(void *arg) +{ + req0_pipe *p = arg; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Malformed message. + goto malformed; + } + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + // Arguably we could just discard and carry on. But + // dropping the connection is probably more helpful since + // it lets the other side see that a problem occurred. + // Plus it gives us a chance to reclaim some memory. + goto malformed; + } + (void) nni_msg_trim(msg, 4); // Cannot fail + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->req->urq, p->aio_putq); + return; + +malformed: + nni_msg_free(msg); + nni_pipe_stop(p->pipe); +} + +static void +req0_timeout(void *arg) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->reqmsg != NULL) { + s->wantw = 1; + req0_resend(s); + } + nni_mtx_unlock(&s->mtx); +} + +static void +req0_resend(req0_sock *s) +{ + req0_pipe *p; + nni_msg * msg; + + // Note: This routine should be called with the socket lock held. + // Also, this should only be called while handling cooked mode + // requests. + if ((msg = s->reqmsg) == NULL) { + return; + } + + if (s->closed) { + s->reqmsg = NULL; + nni_msg_free(msg); + } + + if (s->wantw) { + s->wantw = 0; + + if (nni_msg_dup(&msg, s->reqmsg) != 0) { + // Failed to alloc message, reschedule it. Also, + // mark that we have a message we want to resend, + // in case something comes available. + s->wantw = 1; + nni_timer_schedule(&s->timer, nni_clock() + s->retry); + return; + } + + // Now we iterate across all possible outpipes, until + // one accepts it. + if ((p = nni_list_first(&s->readypipes)) == NULL) { + // No pipes ready to process us. Note that we have + // something to send, and schedule it. + nni_msg_free(msg); + s->wantw = 1; + return; + } + + nni_list_remove(&s->readypipes, p); + nni_list_append(&s->busypipes, p); + + s->pendpipe = p; + s->resend = nni_clock() + s->retry; + nni_aio_set_msg(p->aio_sendcooked, msg); + + // Note that because we were ready rather than busy, we + // should not have any I/O oustanding and hence the aio + // object will be available for our use. + nni_pipe_send(p->pipe, p->aio_sendcooked); + nni_timer_schedule(&s->timer, s->resend); + } +} + +static void +req0_sock_send(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + + // In cooked mode, because we need to manage our own resend logic, + // we bypass the upper writeq entirely. + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + id = (s->nextid++) | 0x80000000u; + // Request ID is in big endian format. + NNI_PUT32(s->reqid, id); + + if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // If another message is there, this cancels it. + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); + s->reqmsg = NULL; + } + + nni_aio_set_msg(aio, NULL); + + // Make a duplicate message... for retries. + s->reqmsg = msg; + // Schedule for immediate send + s->resend = NNI_TIME_ZERO; + s->wantw = 1; + + req0_resend(s); + + nni_mtx_unlock(&s->mtx); + + nni_aio_finish(aio, 0, len); +} + +static nni_msg * +req0_sock_filter(void *arg, nni_msg *msg) +{ + req0_sock *s = arg; + nni_msg * rmsg; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // Pass it unmolested + nni_mtx_unlock(&s->mtx); + return (msg); + } + + if (nni_msg_header_len(msg) < 4) { + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + if ((rmsg = s->reqmsg) == NULL) { + // We had no outstanding request. (Perhaps canceled, + // or duplicate response.) + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { + // Wrong request id. + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + + s->reqmsg = NULL; + s->pendpipe = NULL; + nni_mtx_unlock(&s->mtx); + + nni_msg_free(rmsg); + + return (msg); +} + +static void +req0_sock_recv(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (!s->raw) { + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, +}; + +static nni_proto_sock_option req0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, + }, + { + .pso_name = NNG_OPT_REQ_RESENDTIME, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, +}; + +static nni_proto req0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, +}; + +int +nng_req0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &req0_proto)); +} diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H |
