aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-28 22:57:38 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-28 22:57:38 -0800
commit6d9175cf737c176799862557dae30df60b289bdf (patch)
tree2abe603a480fb879844496917d34f9397095b667 /src
parent790e4de4453b0f016974aed1cc89a3ac47ba60c2 (diff)
downloadnng-6d9175cf737c176799862557dae30df60b289bdf.tar.gz
nng-6d9175cf737c176799862557dae30df60b289bdf.tar.bz2
nng-6d9175cf737c176799862557dae30df60b289bdf.zip
REQ implementation.
This is an untested implementation for REQ. It is based hugely upon the mangos implementation, so there is an excellent chance it will work.
Diffstat (limited to 'src')
-rw-r--r--src/protocol/reqrep/req.c449
1 files changed, 449 insertions, 0 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
new file mode 100644
index 00000000..3514047b
--- /dev/null
+++ b/src/protocol/reqrep/req.c
@@ -0,0 +1,449 @@
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Request protocol. The REQ protocol is the "request" side of a
+// request-reply pair. This is useful for bulding RPC clients, for
+// example.
+
+typedef struct nni_req_pipe nni_req_pipe;
+typedef struct nni_req_sock nni_req_sock;
+
+// An nni_req_sock is our per-socket protocol private structure.
+struct nni_req_sock {
+ nni_socket * sock;
+ nni_mutex mx;
+ nni_cond cv;
+ nni_msgqueue * uwq;
+ nni_msgqueue * urq;
+ nni_duration retry;
+ nni_time resend;
+ nni_thread * resender;
+ int raw;
+ nni_list pipes;
+ nni_msg * reqmsg;
+ uint32_t nextid; // next id
+ uint8_t reqid[4]; // outstanding request ID (big endian)
+};
+
+// An nni_req_pipe is our per-pipe protocol private structure.
+struct nni_req_pipe {
+ nni_pipe * pipe;
+ nni_req_sock * req;
+ int good;
+ nni_thread * sthr;
+ nni_thread * rthr;
+ int sigclose;
+ nni_list_node node;
+};
+
+static void nni_req_receiver(void *);
+static void nni_req_sender(void *);
+static void nni_req_resender(void *);
+
+static int
+nni_req_create(void **reqp, nni_socket *sock)
+{
+ nni_req_sock *req;
+ int rv;
+
+ if ((req = nni_alloc(sizeof (*req))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mutex_init(&req->mx)) != 0) {
+ nni_free(req, sizeof (*req));
+ return (rv);
+ }
+ if ((rv = nni_cond_init(&req->cv, &req->mx)) != 0) {
+ nni_mutex_fini(&req->mx);
+ nni_free(req, sizeof (*req));
+ return (rv);
+ }
+ // this is "semi random" start for request IDs.
+ req->nextid = (nni_clock() >> 32) ^ (nni_clock() & 0xffffffff);
+ req->retry = NNI_SECOND * 60;
+ req->sock = sock;
+ req->reqmsg = NULL;
+ req->raw = 0;
+ req->resend = NNI_TIME_ZERO;
+ NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
+
+ req->uwq = nni_socket_sendq(sock);
+ req->urq = nni_socket_recvq(sock);
+ *reqp = req;
+ nni_socket_recverr(sock, NNG_ESTATE);
+ rv = nni_thread_create(&req->resender, nni_req_resender, req);
+ if (rv != 0) {
+ nni_cond_fini(&req->cv);
+ nni_mutex_fini(&req->mx);
+ nni_free(req, sizeof (*req));
+ return (rv);
+ }
+ return (0);
+}
+
+
+static void
+nni_req_destroy(void *arg)
+{
+ nni_req_sock *req = arg;
+ nni_thread *resender;
+
+ // Shut down the resender. We request it to exit by clearing
+ // its old value, then kick it.
+ nni_mutex_enter(&req->mx);
+ resender = req->resender;
+ req->resender = NULL;
+ nni_cond_broadcast(&req->cv);
+ nni_mutex_exit(&req->mx);
+
+ nni_thread_reap(resender);
+ nni_cond_fini(&req->cv);
+ nni_mutex_fini(&req->mx);
+ nni_free(req, sizeof (*req));
+}
+
+
+static int
+nni_req_add_pipe(void *arg, nni_pipe *pipe, void **datap)
+{
+ nni_req_sock *req = arg;
+ nni_req_pipe *rp;
+ int rv;
+
+ rp = nni_alloc(sizeof (*rp));
+ rp->pipe = pipe;
+ rp->good = 0;
+ rp->sigclose = 0;
+ rp->sthr = NULL;
+ rp->rthr = NULL;
+ rp->req = req;
+
+ nni_mutex_enter(&req->mx);
+ if ((rv = nni_thread_create(&rp->rthr, nni_req_receiver, rp)) != 0) {
+ nni_mutex_exit(&req->mx);
+ return (rv);
+ }
+ if ((rv = nni_thread_create(&rp->sthr, nni_req_sender, rp)) != 0) {
+ nni_mutex_exit(&req->mx);
+ return (rv);
+ }
+ rp->good = 1;
+ nni_list_append(&req->pipes, rp);
+ *datap = rp;
+ nni_mutex_exit(&req->mx);
+ return (0);
+}
+
+
+static void
+nni_req_rem_pipe(void *arg, void *data)
+{
+ nni_req_sock *req = arg;
+ nni_req_pipe *rp = data;
+
+ nni_mutex_enter(&req->mx);
+ nni_list_remove(&req->pipes, rp);
+ nni_mutex_exit(&req->mx);
+
+ if (rp->sthr != NULL) {
+ (void) nni_thread_reap(rp->sthr);
+ }
+ if (rp->rthr != NULL) {
+ (void) nni_thread_reap(rp->rthr);
+ }
+}
+
+
+static void
+nni_req_sender(void *arg)
+{
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+ nni_msgqueue *uwq = req->uwq;
+ nni_msgqueue *urq = req->urq;
+ nni_pipe *pipe = rp->pipe;
+ nni_msg *msg;
+ int rv;
+
+ nni_mutex_enter(&req->mx);
+ if (!rp->good) {
+ nni_mutex_exit(&req->mx);
+ return;
+ }
+ nni_mutex_exit(&req->mx);
+
+ for (;;) {
+ rv = nni_msgqueue_get_sig(uwq, &msg, &rp->sigclose);
+ if (rv != 0) {
+ break;
+ }
+ rv = nni_pipe_send(pipe, msg);
+ if (rv != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_msgqueue_signal(urq, &rp->sigclose);
+ nni_pipe_close(pipe);
+}
+
+
+static void
+nni_req_receiver(void *arg)
+{
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+ nni_msgqueue *urq = req->urq;
+ nni_msgqueue *uwq = req->uwq;
+ nni_pipe *pipe = rp->pipe;
+ nni_msg *msg;
+ int rv;
+
+ nni_mutex_enter(&req->mx);
+ if (!rp->good) {
+ nni_mutex_exit(&req->mx);
+ return;
+ }
+ nni_mutex_exit(&req->mx);
+ for (;;) {
+ size_t len;
+ char *body;
+ rv = nni_pipe_recv(pipe, &msg);
+ if (rv != 0) {
+ break;
+ }
+ // We yank 4 bytes of body, and move them to the header.
+ body = nni_msg_body(msg, &len);
+ if (len < 4) {
+ // Not enough data, just toss it.
+ nni_msg_free(msg);
+ continue;
+ }
+ if (nni_msg_append_header(msg, body, 4) != 0) {
+ // Should be NNG_ENOMEM
+ nni_msg_free(msg);
+ continue;
+ }
+ if (nni_msg_trim(msg, 4) != 0) {
+ // This should never happen - could be an assert.
+ nni_panic("Failed to trim REQ header from body");
+ }
+ rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose);
+ if (rv != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_msgqueue_signal(uwq, &rp->sigclose);
+ nni_pipe_close(pipe);
+}
+
+
+static int
+nni_req_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_req_sock *req = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RESENDTIME:
+ nni_mutex_enter(&req->mx);
+ rv = nni_setopt_duration(&req->retry, buf, sz);
+ nni_mutex_exit(&req->mx);
+ break;
+ case NNG_OPT_RAW:
+ nni_mutex_enter(&req->mx);
+ rv = nni_setopt_int(&req->raw, buf, sz);
+ nni_mutex_exit(&req->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_req_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_req_sock *req = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RESENDTIME:
+ nni_mutex_enter(&req->mx);
+ rv = nni_getopt_duration(&req->retry, buf, szp);
+ nni_mutex_exit(&req->mx);
+ break;
+ case NNG_OPT_RAW:
+ nni_mutex_enter(&req->mx);
+ rv = nni_getopt_int(&req->raw, buf, szp);
+ nni_mutex_exit(&req->mx);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static void
+nni_req_resender(void *arg)
+{
+ nni_req_sock *req = arg;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&req->mx);
+ if (req->resender == NULL) {
+ nni_mutex_exit(&req->mx);
+ return;
+ }
+ if (req->reqmsg == NULL) {
+ nni_cond_wait(&req->cv);
+ nni_mutex_exit(&req->mx);
+ continue;
+ }
+ rv = nni_cond_waituntil(&req->cv, req->resend);
+ if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
+ nni_msg *dup;
+ // XXX: check for final timeout on this?
+ if (nni_msg_dup(&dup, req->reqmsg) != 0) {
+ if (nni_msgqueue_putback(req->uwq, dup) != 0) {
+ nni_msg_free(dup);
+ }
+ }
+ req->resend = nni_clock() + req->retry;
+ }
+ nni_mutex_exit(&req->mx);
+ }
+}
+
+
+static nni_msg *
+nni_req_sendfilter(void *arg, nni_msg *msg)
+{
+ nni_req_sock *req = arg;
+ uint32_t id;
+ uint8_t buf[4];
+
+ nni_mutex_enter(&req->mx);
+ if (req->raw) {
+ // No automatic retry, and the request ID must
+ // be in the header coming down.
+ nni_mutex_exit(&req->mx);
+ return (msg);
+ }
+
+ // Generate a new request ID. We always set the high
+ // order bit so that the peer can locate the end of the
+ // backtrace. (Pipe IDs have the high order bit clear.)
+ id = (req->nextid++) | 0x80000000u;
+
+ // Request ID is in big endian format.
+ req->reqid[0] = (uint8_t) (id >> 24);
+ req->reqid[1] = (uint8_t) (id >> 16);
+ req->reqid[2] = (uint8_t) (id >> 8);
+ req->reqid[3] = (uint8_t) (id);
+
+ if (nni_msg_append_header(msg, buf, 4) != 0) {
+ // Should be ENOMEM.
+ nni_mutex_exit(&req->mx);
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ // If another message is there, this cancels it.
+ if (req->reqmsg != NULL) {
+ nni_msg_free(req->reqmsg);
+ req->reqmsg = NULL;
+ }
+
+ // Make a duplicate message... for retries.
+ if (nni_msg_dup(&req->reqmsg, msg) != 0) {
+ nni_mutex_exit(&req->mx);
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ // Schedule the next retry
+ req->resend = nni_clock() + req->retry;
+ nni_cond_signal(&req->cv);
+
+ // Clear the error condition.
+ nni_socket_recverr(req->sock, 0);
+ nni_mutex_exit(&req->mx);
+
+ return (msg);
+}
+
+
+static nni_msg *
+nni_req_recvfilter(void *arg, nni_msg *msg)
+{
+ nni_req_sock *req = arg;
+ char *header;
+ size_t len;
+
+ nni_mutex_enter(&req->mx);
+ if (req->raw) {
+ // Pass it unmolested
+ nni_mutex_exit(&req->mx);
+ return (msg);
+ }
+
+ header = nni_msg_header(msg, &len);
+ if (len < 4) {
+ nni_mutex_exit(&req->mx);
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ if (req->reqmsg == NULL) {
+ // We had no outstanding request.
+ nni_mutex_exit(&req->mx);
+ nni_msg_free(msg);
+ return (NULL);
+ }
+ if (memcmp(header, req->reqid, 4) != 0) {
+ // Wrong request id
+ nni_mutex_exit(&req->mx);
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ nni_socket_recverr(req->sock, NNG_ESTATE);
+ nni_msg_free(req->reqmsg);
+ req->reqmsg = NULL;
+ nni_cond_signal(&req->cv);
+ nni_mutex_exit(&req->mx);
+ return (msg);
+}
+
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+struct nni_protocol nni_req_protocol = {
+ .proto_self = NNG_PROTO_REQ,
+ .proto_peer = NNG_PROTO_REP,
+ .proto_name = "req",
+ .proto_create = nni_req_create,
+ .proto_destroy = nni_req_destroy,
+ .proto_add_pipe = nni_req_add_pipe,
+ .proto_rem_pipe = nni_req_rem_pipe,
+ .proto_setopt = nni_req_setopt,
+ .proto_getopt = nni_req_getopt,
+ .proto_recv_filter = nni_req_recvfilter,
+ .proto_send_filter = nni_req_sendfilter,
+};