diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-23 22:49:57 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-23 22:49:57 -0800 |
| commit | 7df0822d3ed58ee73918cac576c0b07363e84425 (patch) | |
| tree | 5cae69dc0dc3e609260e0bd99bb8743c1c1a28cc /src/protocol | |
| parent | 91a0b46b6a63f1c2345279b831a02c972e7b1781 (diff) | |
| download | nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.gz nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.bz2 nng-7df0822d3ed58ee73918cac576c0b07363e84425.zip | |
Added a bunch more compatibility stuff.
I implemented the reqrep compatibility test, which uncovered a few
semantic issues I had in the REQ/REP protocol, which I've fixed.
There are still missing things. and at least one portion of the req/rep
test suite cannot be enabled until I add tuning of the reconnect timeout,
which is currently way too long (1 sec) for the test suite to work.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/reqrep/req.c | 187 |
1 files changed, 155 insertions, 32 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index c200fdc9..fe61584c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -9,6 +9,7 @@ #include <stdlib.h> #include <string.h> +#include <stdio.h> #include "core/nng_impl.h" @@ -29,8 +30,15 @@ struct nni_req_sock { nni_time resend; int raw; int closing; + int wantw; nni_msg * reqmsg; nni_msg * retrymsg; + + nni_list pipes; + nni_req_pipe * nextpipe; + nni_req_pipe * pendpipe; + int npipes; + uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) }; @@ -39,7 +47,9 @@ struct nni_req_sock { struct nni_req_pipe { nni_pipe * pipe; nni_req_sock * req; + nni_msgq * mq; int sigclose; + nni_list_node node; }; static void nni_req_resender(void *); @@ -58,12 +68,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock) return (rv); } // this is "semi random" start for request IDs. + NNI_LIST_INIT(&req->pipes, nni_req_pipe, node); + req->nextpipe = NULL; + req->npipes = 0; req->nextid = nni_random(); req->retry = NNI_SECOND * 60; req->sock = sock; req->reqmsg = NULL; req->retrymsg = NULL; req->raw = 0; + req->wantw = 0; req->resend = NNI_TIME_ZERO; req->uwq = nni_sock_sendq(sock); @@ -106,10 +120,16 @@ static int nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_req_pipe *rp; + int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) { + NNI_FREE_STRUCT(rp); + return (rv); + } + NNI_LIST_NODE_INIT(&rp->node); rp->pipe = pipe; rp->sigclose = 0; rp->req = rsock; @@ -124,6 +144,7 @@ nni_req_pipe_fini(void *arg) nni_req_pipe *rp = arg; if (rp != NULL) { + nni_msgq_fini(rp->mq); NNI_FREE_STRUCT(rp); } } @@ -133,10 +154,14 @@ static int nni_req_pipe_add(void *arg) { nni_req_pipe *rp = arg; + nni_req_sock *req = rp->req; if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + nni_list_append(&req->pipes, rp); + req->npipes++; + nni_cv_wake(&req->cv); // Wake the top sender, new job candidate! return (0); } @@ -144,8 +169,20 @@ nni_req_pipe_add(void *arg) static void nni_req_pipe_rem(void *arg) { - // As with add, nothing to do here. - NNI_ARG_UNUSED(arg); + nni_req_pipe *rp = arg; + nni_req_sock *req = rp->req; + + if (rp == req->nextpipe) { + req->nextpipe = nni_list_next(&req->pipes, rp); + } + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // we are removing the pipe we sent the last request on... + // schedule immediate resend. + req->resend = NNI_TIME_ZERO; + nni_cv_wake(&req->cv); + } + req->npipes--; + nni_list_remove(&req->pipes, rp); } @@ -154,33 +191,27 @@ nni_req_pipe_send(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_msgq *uwq = req->uwq; - nni_msgq *urq = req->urq; - nni_pipe *pipe = rp->pipe; nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int rv; for (;;) { nni_mtx_lock(mx); - if ((msg = req->retrymsg) != NULL) { - req->retrymsg = NULL; + if (req->wantw) { + nni_cv_wake(&req->cv); } nni_mtx_unlock(mx); - if (msg == NULL) { - rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); - if (rv != 0) { - break; - } + if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) { + break; } - rv = nni_pipe_send(pipe, msg); + rv = nni_pipe_send(rp->pipe, msg); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(urq, &rp->sigclose); - nni_pipe_close(pipe); + nni_msgq_signal(req->urq, &rp->sigclose); + nni_pipe_close(rp->pipe); } @@ -221,7 +252,7 @@ nni_req_pipe_recv(void *arg) break; } } - nni_msgq_signal(uwq, &rp->sigclose); + nni_msgq_signal(rp->mq, &rp->sigclose); nni_pipe_close(pipe); } @@ -267,11 +298,76 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void +nni_req_sock_send(void *arg) +{ + nni_req_sock *req = arg; + nni_req_pipe *rp; + nni_msgq *uwq = req->uwq; + nni_msg *msg; + nni_mtx *mx = nni_sock_mtx(req->sock); + int i; + + msg = NULL; + + for (;;) { + if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) { + // ECLOSED? Should be! + return; + } + + nni_mtx_lock(mx); + if (!req->raw) { + nni_mtx_unlock(mx); + // Cooked messages come another path... just toss + // this (shouldn't happen actually!) + if (msg != NULL) { + nni_msg_free(msg); + msg = NULL; + } + continue; + } + + if (req->closing) { + if (msg != NULL) { + nni_mtx_unlock(mx); + nni_msg_free(msg); + return; + } + } + req->wantw = 0; + for (i = 0; i < req->npipes; i++) { + rp = req->nextpipe; + if (rp == NULL) { + rp = nni_list_first(&req->pipes); + } + req->nextpipe = nni_list_next(&req->pipes, rp); + if (nni_msgq_tryput(rp->mq, msg) == 0) { + msg = NULL; + break; + } + } + // We weren't able to deliver it. We have two choices: + // 1) drop the message and let the originator resend, or + // 2) apply pushback. There is value in pushback, since it + // will cause senders to slow down, or redistribute the work. + // So, let's try that. + if (msg != NULL) { + req->wantw = 1; + nni_cv_wait(&req->cv); + } + nni_mtx_unlock(mx); + } +} + + +static void nni_req_sock_resend(void *arg) { nni_req_sock *req = arg; + nni_req_pipe *rp; nni_mtx *mx = nni_sock_mtx(req->sock); - int rv; + nni_msg *msg; + int i; for (;;) { nni_mtx_lock(mx); @@ -284,14 +380,43 @@ nni_req_sock_resend(void *arg) nni_mtx_unlock(mx); continue; } - rv = nni_cv_until(&req->cv, req->resend); - if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) { - // XXX: check for final timeout on this? - if (req->retrymsg == NULL) { - nni_msg_dup(&req->retrymsg, req->reqmsg); + + if ((req->wantw) || (nni_clock() >= req->resend)) { + req->wantw = 0; + + if (nni_msg_dup(&msg, req->reqmsg) != 0) { + // Failed to alloc message, just wait for next + // retry. + req->resend = nni_clock() + req->retry; + nni_mtx_unlock(mx); + continue; + } + + // Now we iterate across all possible outpipes, until + // one accepts it. + for (i = 0; i < req->npipes; i++) { + rp = req->nextpipe; + if (rp == NULL) { + rp = nni_list_first(&req->pipes); + } + req->nextpipe = nni_list_next(&req->pipes, rp); + if (nni_msgq_tryput(rp->mq, msg) == 0) { + req->pendpipe = rp; + msg = NULL; + break; + } + } + if (msg == NULL) { + // Message was published, update the timeout. + req->resend = nni_clock() + req->retry; + } else { + // No suitable outbound destination found, + // so alert us when we get one. + req->wantw = 1; + nni_msg_free(msg); } - req->resend = nni_clock() + req->retry; } + nni_cv_until(&req->cv, req->resend); nni_mtx_unlock(mx); } } @@ -330,19 +455,15 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) } // Make a duplicate message... for retries. - if (nni_msg_dup(&req->reqmsg, msg) != 0) { - nni_msg_free(msg); - return (NULL); - } - - // Schedule the next retry - req->resend = nni_clock() + req->retry; + req->reqmsg = msg; + // Schedule for immediate send + req->resend = NNI_TIME_ZERO; nni_cv_wake(&req->cv); // Clear the error condition. nni_sock_recverr(req->sock, 0); - return (msg); + return (NULL); } @@ -375,6 +496,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) nni_sock_recverr(req->sock, NNG_ESTATE); nni_msg_free(req->reqmsg); req->reqmsg = NULL; + req->pendpipe = NULL; nni_cv_wake(&req->cv); return (msg); } @@ -399,7 +521,8 @@ static nni_proto_sock_ops nni_req_sock_ops = { .sock_getopt = nni_req_sock_getopt, .sock_rfilter = nni_req_sock_rfilter, .sock_sfilter = nni_req_sock_sfilter, - .sock_worker = { nni_req_sock_resend }, + .sock_worker = { nni_req_sock_send, + nni_req_sock_resend }, }; nni_proto nni_req_proto = { |
