From 64f4efa81a4bddcd8bdc31b6408b67d6c2e27dd1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 30 Dec 2016 01:10:25 -0800 Subject: REP protocol. Untested beyond compilation. --- src/protocol/reqrep/rep.c | 441 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 src/protocol/reqrep/rep.c (limited to 'src/protocol') diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c new file mode 100644 index 00000000..1b48ca08 --- /dev/null +++ b/src/protocol/reqrep/rep.c @@ -0,0 +1,441 @@ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" + +// Response protocol. The REP protocol is the "reply" side of a +// request-reply pair. This is useful for bulding RPC servers, for +// example. + +typedef struct nni_rep_pipe nni_rep_pipe; +typedef struct nni_rep_sock nni_rep_sock; + +// An nni_rep_sock is our per-socket protocol private structure. +struct nni_rep_sock { + nni_socket * sock; + nni_mutex mx; + nni_msgqueue * uwq; + nni_msgqueue * urq; + int raw; + int ttl; + nni_thread * sender; + nni_idhash * pipes; + char * btrace; + size_t btrace_len; +}; + +// An nni_rep_pipe is our per-pipe protocol private structure. +struct nni_rep_pipe { + nni_pipe * pipe; + nni_rep_sock * rep; + nni_msgqueue * sendq; + int sigclose; +}; + +static void nni_rep_receiver(void *); +static void nni_rep_sender(void *); +static void nni_rep_topsender(void *); + +static int +nni_rep_create(void **repp, nni_socket *sock) +{ + nni_rep_sock *rep; + int rv; + + if ((rep = nni_alloc(sizeof (*rep))) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mutex_init(&rep->mx)) != 0) { + nni_free(rep, sizeof (*rep)); + return (rv); + } + rep->ttl = 8; // Per RFC + rep->sock = sock; + rep->raw = 0; + rep->btrace = NULL; + rep->btrace_len = 0; + if ((rv = nni_idhash_create(&rep->pipes)) != 0) { + nni_mutex_fini(&rep->mx); + nni_free(rep, sizeof (*rep)); + return (rv); + } + + rep->uwq = nni_socket_sendq(sock); + rep->urq = nni_socket_recvq(sock); + + rv = nni_thread_create(&rep->sender, nni_rep_topsender, rep); + if (rv != 0) { + nni_idhash_destroy(rep->pipes); + nni_mutex_fini(&rep->mx); + nni_free(rep, sizeof (*rep)); + return (rv); + } + *repp = rep; + nni_socket_senderr(sock, NNG_ESTATE); + return (0); +} + + +static void +nni_rep_destroy(void *arg) +{ + nni_rep_sock *rep = arg; + + nni_thread_reap(rep->sender); + nni_idhash_destroy(rep->pipes); + nni_mutex_fini(&rep->mx); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + nni_free(rep, sizeof (*rep)); +} + + +static int +nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) +{ + nni_rep_sock *rep = arg; + nni_rep_pipe *rp; + int rv; + + rp->pipe = pipe; + rp->sigclose = 0; + + rv = nni_msgqueue_create(&rp->sendq, 2); + if (rv != 0) { + return (rv); + } + + nni_mutex_enter(&rep->mx); + if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { + nni_msgqueue_destroy(rp->sendq); + nni_mutex_exit(&rep->mx); + return (rv); + } + nni_mutex_exit(&rep->mx); + return (0); +} + + +static void +nni_rep_rem_pipe(void *arg, void *data) +{ + nni_rep_sock *rep = arg; + nni_rep_pipe *rp = data; + + nni_mutex_enter(&rep->mx); + nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); + nni_mutex_exit(&rep->mx); + nni_msgqueue_destroy(rp->sendq); +} + +// nni_rep_topsender watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket. +static void +nni_rep_topsender(void *arg) +{ + nni_rep_sock *rep = arg; + nni_msgqueue *uwq = rep->uwq; + nni_msgqueue *urq = rep->urq; + nni_msg *msg; + + for (;;) { + uint8_t *header; + size_t size; + uint32_t id; + nni_rep_pipe *rp; + int rv; + + if ((rv = nni_msgqueue_get(uwq, &msg)) != 0) { + break; + } + // We yank the outgoing pipe id from the header + header = nni_msg_header(msg, &size); + if (size < 4) { + nni_msg_free(msg); + continue; + } + id = header[0]; + id <<= 8; + id += header[1]; + id <<= 8; + id += header[2]; + id <<= 8; + id += header[3]; + nni_msg_trim_header(msg, 4); + + nni_mutex_enter(&rep->mx); + if (nni_idhash_find(rep->pipes, id, (void **)&rp) != 0) { + nni_mutex_exit(&rep->mx); + nni_msg_free(msg); + continue; + } + // Try a non-blocking put to the lower writer. + rv = nni_msgqueue_put_until(rp->sendq, msg, NNI_TIME_ZERO); + if (rv != 0) { + // message queue is full, we have no choice but + // to drop it. This should not happen under normal + // circumstances. + nni_msg_free(msg); + } + nni_mutex_exit(&rep->mx); + } +} + +static void +nni_rep_sender(void *arg) +{ + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; + nni_msgqueue *urq = rep->urq; + nni_msgqueue *wq = rp->sendq; + nni_pipe *pipe = rp->pipe; + nni_msg *msg; + uint8_t *body; + size_t size; + int rv; + + for (;;) { + rv = nni_msgqueue_get_sig(wq, &msg, &rp->sigclose); + if (rv != 0) { + break; + } + + rv = nni_pipe_send(pipe, msg); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgqueue_signal(urq, &rp->sigclose); + nni_pipe_close(pipe); +} + + +static void +nni_rep_receiver(void *arg) +{ + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; + nni_msgqueue *urq = rep->urq; + nni_msgqueue *uwq = rep->uwq; + nni_pipe *pipe = rp->pipe; + nni_msg *msg; + int rv; + uint8_t idbuf[4]; + uint32_t id = nni_pipe_id(pipe); + + idbuf[0] = (uint8_t) (id >> 24); + idbuf[1] = (uint8_t) (id >> 16); + idbuf[2] = (uint8_t) (id >> 8); + idbuf[3] = (uint8_t) (id); + + for (;;) { + size_t len; + char *body; + int hops; + +again: + rv = nni_pipe_recv(pipe, &msg); + if (rv != 0) { + break; + } + + // Store the pipe id in the header, first thing. + rv = nni_msg_append_header(msg, idbuf, 4); + if (rv != 0) { + nni_msg_free(msg); + continue; + } + + // Move backtrace from body to header + hops = 0; + for (;;) { + int end = 0; + if (hops >= rep->ttl) { + nni_msg_free(msg); + goto again; + } + body = nni_msg_body(msg, &len); + if (len < 4) { + nni_msg_free(msg); + goto again; + } + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_append_header(msg, body, 4); + if (rv != 0) { + nni_msg_free(msg); + goto again; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgqueue_signal(uwq, &rp->sigclose); + nni_pipe_close(pipe); +} + + +static int +nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_rep_sock *rep = arg; + int rv; + + switch (opt) { + case NNG_OPT_MAXTTL: + nni_mutex_enter(&rep->mx); + rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); + nni_mutex_exit(&rep->mx); + break; + case NNG_OPT_RAW: + nni_mutex_enter(&rep->mx); + rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); + nni_mutex_exit(&rep->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_rep_sock *rep = arg; + int rv; + + switch (opt) { + case NNG_OPT_MAXTTL: + nni_mutex_enter(&rep->mx); + rv = nni_getopt_int(&rep->ttl, buf, szp); + nni_mutex_exit(&rep->mx); + break; + case NNG_OPT_RAW: + nni_mutex_enter(&rep->mx); + rv = nni_getopt_int(&rep->raw, buf, szp); + nni_mutex_exit(&rep->mx); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + + +static nni_msg * +nni_rep_sendfilter(void *arg, nni_msg *msg) +{ + nni_rep_sock *rep = arg; + size_t len; + + nni_mutex_enter(&rep->mx); + if (rep->raw) { + nni_mutex_exit(&rep->mx); + return (msg); + } + + // Cannot send again until a receive is done... + nni_socket_senderr(rep->sock, NNG_ESTATE); + + // If we have a stored backtrace, append it to the header... + // if we don't have a backtrace, discard the message. + if (rep->btrace == NULL) { + nni_mutex_exit(&rep->mx); + nni_msg_free(msg); + return (NULL); + } + + // drop anything else in the header... + (void) nni_msg_header(msg, &len); + nni_msg_trim_header(msg, len); + + if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) { + nni_free(rep->btrace, rep->btrace_len); + rep->btrace = NULL; + rep->btrace_len = 0; + nni_mutex_exit(&rep->mx); + nni_msg_free(msg); + return (NULL); + } + + nni_free(rep->btrace, rep->btrace_len); + rep->btrace = NULL; + rep->btrace_len = 0; + return (msg); +} + + +static nni_msg * +nni_rep_recvfilter(void *arg, nni_msg *msg) +{ + nni_rep_sock *rep = arg; + char *header; + size_t len; + + nni_mutex_enter(&rep->mx); + if (rep->raw) { + nni_mutex_exit(&rep->mx); + return (msg); + } + + nni_socket_senderr(rep->sock, 0); + header = nni_msg_header(msg, &len); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + rep->btrace = NULL; + rep->btrace_len = 0; + } + if ((rep->btrace = nni_alloc(len)) == NULL) { + nni_mutex_exit(&rep->mx); + nni_msg_free(msg); + return (NULL); + } + rep->btrace_len = len; + memcpy(rep->btrace, 0, len); + nni_msg_trim_header(msg, len); + nni_mutex_exit(&rep->mx); + return (msg); +} + + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +struct nni_protocol nni_rep_protocol = { + .proto_self = NNG_PROTO_REP, + .proto_peer = NNG_PROTO_REQ, + .proto_name = "rep", + .proto_create = nni_rep_create, + .proto_destroy = nni_rep_destroy, + .proto_add_pipe = nni_rep_add_pipe, + .proto_rem_pipe = nni_rep_rem_pipe, + .proto_pipe_size = sizeof (nni_rep_pipe), + .proto_pipe_send = nni_rep_sender, + .proto_pipe_recv = nni_rep_receiver, + .proto_setopt = nni_rep_setopt, + .proto_getopt = nni_rep_getopt, + .proto_recv_filter = nni_rep_recvfilter, + .proto_send_filter = nni_rep_sendfilter, +}; -- cgit v1.2.3-70-g09d2