aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
commit7df0822d3ed58ee73918cac576c0b07363e84425 (patch)
tree5cae69dc0dc3e609260e0bd99bb8743c1c1a28cc /src/protocol/reqrep
parent91a0b46b6a63f1c2345279b831a02c972e7b1781 (diff)
downloadnng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.gz
nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.bz2
nng-7df0822d3ed58ee73918cac576c0b07363e84425.zip
Added a bunch more compatibility stuff.
I implemented the reqrep compatibility test, which uncovered a few semantic issues I had in the REQ/REP protocol, which I've fixed. There are still missing things. and at least one portion of the req/rep test suite cannot be enabled until I add tuning of the reconnect timeout, which is currently way too long (1 sec) for the test suite to work.
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/req.c187
1 files changed, 155 insertions, 32 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index c200fdc9..fe61584c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -9,6 +9,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
#include "core/nng_impl.h"
@@ -29,8 +30,15 @@ struct nni_req_sock {
nni_time resend;
int raw;
int closing;
+ int wantw;
nni_msg * reqmsg;
nni_msg * retrymsg;
+
+ nni_list pipes;
+ nni_req_pipe * nextpipe;
+ nni_req_pipe * pendpipe;
+ int npipes;
+
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -39,7 +47,9 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
+ nni_msgq * mq;
int sigclose;
+ nni_list_node node;
};
static void nni_req_resender(void *);
@@ -58,12 +68,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
return (rv);
}
// this is "semi random" start for request IDs.
+ NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
+ req->nextpipe = NULL;
+ req->npipes = 0;
req->nextid = nni_random();
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
req->retrymsg = NULL;
req->raw = 0;
+ req->wantw = 0;
req->resend = NNI_TIME_ZERO;
req->uwq = nni_sock_sendq(sock);
@@ -106,10 +120,16 @@ static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
rp->sigclose = 0;
rp->req = rsock;
@@ -124,6 +144,7 @@ nni_req_pipe_fini(void *arg)
nni_req_pipe *rp = arg;
if (rp != NULL) {
+ nni_msgq_fini(rp->mq);
NNI_FREE_STRUCT(rp);
}
}
@@ -133,10 +154,14 @@ static int
nni_req_pipe_add(void *arg)
{
nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
+ nni_list_append(&req->pipes, rp);
+ req->npipes++;
+ nni_cv_wake(&req->cv); // Wake the top sender, new job candidate!
return (0);
}
@@ -144,8 +169,20 @@ nni_req_pipe_add(void *arg)
static void
nni_req_pipe_rem(void *arg)
{
- // As with add, nothing to do here.
- NNI_ARG_UNUSED(arg);
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+
+ if (rp == req->nextpipe) {
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ }
+ if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ // we are removing the pipe we sent the last request on...
+ // schedule immediate resend.
+ req->resend = NNI_TIME_ZERO;
+ nni_cv_wake(&req->cv);
+ }
+ req->npipes--;
+ nni_list_remove(&req->pipes, rp);
}
@@ -154,33 +191,27 @@ nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_msgq *uwq = req->uwq;
- nni_msgq *urq = req->urq;
- nni_pipe *pipe = rp->pipe;
nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int rv;
for (;;) {
nni_mtx_lock(mx);
- if ((msg = req->retrymsg) != NULL) {
- req->retrymsg = NULL;
+ if (req->wantw) {
+ nni_cv_wake(&req->cv);
}
nni_mtx_unlock(mx);
- if (msg == NULL) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
- }
+ if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) {
+ break;
}
- rv = nni_pipe_send(pipe, msg);
+ rv = nni_pipe_send(rp->pipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(urq, &rp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_signal(req->urq, &rp->sigclose);
+ nni_pipe_close(rp->pipe);
}
@@ -221,7 +252,7 @@ nni_req_pipe_recv(void *arg)
break;
}
}
- nni_msgq_signal(uwq, &rp->sigclose);
+ nni_msgq_signal(rp->mq, &rp->sigclose);
nni_pipe_close(pipe);
}
@@ -267,11 +298,76 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
+nni_req_sock_send(void *arg)
+{
+ nni_req_sock *req = arg;
+ nni_req_pipe *rp;
+ nni_msgq *uwq = req->uwq;
+ nni_msg *msg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
+ int i;
+
+ msg = NULL;
+
+ for (;;) {
+ if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
+ // ECLOSED? Should be!
+ return;
+ }
+
+ nni_mtx_lock(mx);
+ if (!req->raw) {
+ nni_mtx_unlock(mx);
+ // Cooked messages come another path... just toss
+ // this (shouldn't happen actually!)
+ if (msg != NULL) {
+ nni_msg_free(msg);
+ msg = NULL;
+ }
+ continue;
+ }
+
+ if (req->closing) {
+ if (msg != NULL) {
+ nni_mtx_unlock(mx);
+ nni_msg_free(msg);
+ return;
+ }
+ }
+ req->wantw = 0;
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ msg = NULL;
+ break;
+ }
+ }
+ // We weren't able to deliver it. We have two choices:
+ // 1) drop the message and let the originator resend, or
+ // 2) apply pushback. There is value in pushback, since it
+ // will cause senders to slow down, or redistribute the work.
+ // So, let's try that.
+ if (msg != NULL) {
+ req->wantw = 1;
+ nni_cv_wait(&req->cv);
+ }
+ nni_mtx_unlock(mx);
+ }
+}
+
+
+static void
nni_req_sock_resend(void *arg)
{
nni_req_sock *req = arg;
+ nni_req_pipe *rp;
nni_mtx *mx = nni_sock_mtx(req->sock);
- int rv;
+ nni_msg *msg;
+ int i;
for (;;) {
nni_mtx_lock(mx);
@@ -284,14 +380,43 @@ nni_req_sock_resend(void *arg)
nni_mtx_unlock(mx);
continue;
}
- rv = nni_cv_until(&req->cv, req->resend);
- if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- // XXX: check for final timeout on this?
- if (req->retrymsg == NULL) {
- nni_msg_dup(&req->retrymsg, req->reqmsg);
+
+ if ((req->wantw) || (nni_clock() >= req->resend)) {
+ req->wantw = 0;
+
+ if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ // Failed to alloc message, just wait for next
+ // retry.
+ req->resend = nni_clock() + req->retry;
+ nni_mtx_unlock(mx);
+ continue;
+ }
+
+ // Now we iterate across all possible outpipes, until
+ // one accepts it.
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ req->pendpipe = rp;
+ msg = NULL;
+ break;
+ }
+ }
+ if (msg == NULL) {
+ // Message was published, update the timeout.
+ req->resend = nni_clock() + req->retry;
+ } else {
+ // No suitable outbound destination found,
+ // so alert us when we get one.
+ req->wantw = 1;
+ nni_msg_free(msg);
}
- req->resend = nni_clock() + req->retry;
}
+ nni_cv_until(&req->cv, req->resend);
nni_mtx_unlock(mx);
}
}
@@ -330,19 +455,15 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
}
// Make a duplicate message... for retries.
- if (nni_msg_dup(&req->reqmsg, msg) != 0) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- // Schedule the next retry
- req->resend = nni_clock() + req->retry;
+ req->reqmsg = msg;
+ // Schedule for immediate send
+ req->resend = NNI_TIME_ZERO;
nni_cv_wake(&req->cv);
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
- return (msg);
+ return (NULL);
}
@@ -375,6 +496,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
nni_sock_recverr(req->sock, NNG_ESTATE);
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
+ req->pendpipe = NULL;
nni_cv_wake(&req->cv);
return (msg);
}
@@ -399,7 +521,8 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_worker = { nni_req_sock_resend },
+ .sock_worker = { nni_req_sock_send,
+ nni_req_sock_resend },
};
nni_proto nni_req_proto = {