diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-23 22:49:57 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-23 22:49:57 -0800 |
| commit | 7df0822d3ed58ee73918cac576c0b07363e84425 (patch) | |
| tree | 5cae69dc0dc3e609260e0bd99bb8743c1c1a28cc /src | |
| parent | 91a0b46b6a63f1c2345279b831a02c972e7b1781 (diff) | |
| download | nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.gz nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.bz2 nng-7df0822d3ed58ee73918cac576c0b07363e84425.zip | |
Added a bunch more compatibility stuff.
I implemented the reqrep compatibility test, which uncovered a few
semantic issues I had in the REQ/REP protocol, which I've fixed.
There are still missing things. and at least one portion of the req/rep
test suite cannot be enabled until I add tuning of the reconnect timeout,
which is currently way too long (1 sec) for the test suite to work.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/list.c | 2 | ||||
| -rw-r--r-- | src/core/options.h | 6 | ||||
| -rw-r--r-- | src/nng.c | 1 | ||||
| -rw-r--r-- | src/nng_compat.c | 138 | ||||
| -rw-r--r-- | src/nng_compat.h | 7 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 187 |
6 files changed, 295 insertions, 46 deletions
diff --git a/src/core/list.c b/src/core/list.c index 18858582..821e48bd 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -1,5 +1,5 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017up Garrett D'Amore <garrett@damore.org> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this diff --git a/src/core/options.h b/src/core/options.h index 6c46173f..2d843a4c 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -10,12 +10,6 @@ #ifndef CORE_OPTIONS_H #define CORE_OPTIONS_H -struct nni_notifyfd { - int sn_wfd; // written to in order to flag an event - int sn_rfd; // read from in order to clear an event - int sn_init; -}; - // Option helpers. These can be called from protocols or transports // in their own option handling, centralizing the logic for dealing with // variable sized options. @@ -266,6 +266,7 @@ nng_endpoint_close(nng_endpoint eid) return (NNG_ENOTSUP); } + int nng_setopt(nng_socket sid, int opt, const void *val, size_t sz) { diff --git a/src/nng_compat.c b/src/nng_compat.c index c9a36178..fd43af29 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -17,8 +17,8 @@ // avoid using these if at all possible, and instead use the new style APIs. static struct { - int perr; int nerr; + int perr; } nn_errnos[] = { { NNG_EINTR, EINTR }, @@ -82,6 +82,13 @@ nn_seterror(int err) int +nn_errno(void) +{ + return (errno); +} + + +int nn_socket(int domain, int protocol) { nng_socket sock; @@ -498,8 +505,8 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) // because we have to copy the data, but we should // only free this message on success. So we save the // message now. - cdata = *(void **)cdata; - cmsg = *(nng_msg **)(cdata - sizeof (cmsg)); + cdata = *(void **) cdata; + cmsg = *(nng_msg **) (cdata - sizeof (cmsg)); clen = nng_msg_len(cmsg); } if ((rv = nng_msg_append_header(msg, cdata, clen)) != 0) { @@ -512,7 +519,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) } sz = nng_msg_len(msg); - if ((rv = nng_sendmsg((nng_socket)s, msg, flags)) != 0) { + if ((rv = nng_sendmsg((nng_socket) s, msg, flags)) != 0) { if (!keep) { nng_msg_free(msg); } @@ -524,5 +531,126 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) // We sent successfully, so free up the control message. nng_msg_free(cmsg); } - return ((int)sz); + return ((int) sz); +} + + +int +nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz) +{ + int opt = 0; + int mscvt = 0; + uint64_t usec; + int rv; + + switch (nnlevel) { + case NN_SOL_SOCKET: + switch (nnopt) { + case NN_LINGER: + opt = NNG_OPT_LINGER; + break; + case NN_SNDBUF: + opt = NNG_OPT_SNDBUF; + break; + case NN_RCVBUF: + opt = NNG_OPT_RCVBUF; + break; + case NN_RECONNECT_IVL: + opt = NNG_OPT_RECONN_TIME; + mscvt = 1; + break; + case NN_RECONNECT_IVL_MAX: + opt = NNG_OPT_RECONN_MAXTIME; + mscvt = 1; + break; + case NN_SNDFD: + opt = NNG_OPT_SNDFD; + break; + case NN_RCVFD: + opt = NNG_OPT_RCVFD; + break; + case NN_RCVMAXSIZE: + opt = NNG_OPT_RCVMAXSZ; + break; + case NN_MAXTTL: + opt = NNG_OPT_MAXTTL; + break; + case NN_RCVTIMEO: + opt = NNG_OPT_RCVTIMEO; + mscvt = 1; + break; + case NN_SNDTIMEO: + opt = NNG_OPT_SNDTIMEO; + mscvt = 1; + break; + case NN_DOMAIN: + case NN_PROTOCOL: + case NN_IPV4ONLY: + case NN_SOCKET_NAME: + case NN_SNDPRIO: + case NN_RCVPRIO: + default: + errno = ENOPROTOOPT; + return (-1); + + break; + } + break; + case NN_REQ: + switch (nnopt) { + case NN_REQ_RESEND_IVL: + opt = NNG_OPT_RESENDTIME; + mscvt = 1; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + break; + case NN_SUB: + switch (nnopt) { + case NN_SUB_SUBSCRIBE: + opt = NNG_OPT_SUBSCRIBE; + break; + case NN_SUB_UNSUBSCRIBE: + opt = NNG_OPT_UNSUBSCRIBE; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + break; + case NN_SURVEYOR: + switch (nnopt) { + case NN_SURVEY_DEADLINE: + opt = NNG_OPT_SURVEYTIME; + mscvt = 1; + break; + default: + errno = ENOPROTOOPT; + return (-1); + } + default: + errno = ENOPROTOOPT; + return (-1); + } + + if (mscvt) { + // We have to convert value to ms... + + if (sz != sizeof (int)) { + errno = EINVAL; + return (-1); + } + usec = *(int *) valp; + usec *= 1000; + valp = &usec; + sz = sizeof (usec); + } + + if ((rv = nng_setopt((nng_socket) s, opt, valp, sz)) != 0) { + nn_seterror(rv); + return (-1); + } + return (0); } diff --git a/src/nng_compat.h b/src/nng_compat.h index c49a33db..64ace069 100644 --- a/src/nng_compat.h +++ b/src/nng_compat.h @@ -193,7 +193,7 @@ extern "C" { #define NN_LINGER 1 #define NN_SNDBUF 2 #define NN_RCVBUF 3 -#define NN_SNDTIMEO 5 +#define NN_SNDTIMEO 4 #define NN_RCVTIMEO 5 #define NN_RECONNECT_IVL 6 #define NN_RECONNECT_IVL_MAX 7 @@ -210,7 +210,8 @@ extern "C" { // Protocol-specific options. To simplify thins we encode the protocol // level in the option. -#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 1) +#define NN_SUB_SUBSCRIBE (NN_SUB * 16 + 1) +#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 2) #define NN_REQ_RESEND_IVL (NN_REQ * 16 + 1) #define NN_SURVEY_DEADLINE (NN_SURVEYOR * 16 + 1) @@ -294,6 +295,8 @@ NN_DECL uint64_t nn_get_statistic(int, int); NN_DECL void *nn_allocmsg(size_t, int); NN_DECL void *nn_reallocmsg(void *, size_t); NN_DECL int nn_freemsg(void *); +NN_DECL int nn_errno(void); +NN_DECL const char *nn_strerror(int); #ifdef __cplusplus } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index c200fdc9..fe61584c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -9,6 +9,7 @@ #include <stdlib.h> #include <string.h> +#include <stdio.h> #include "core/nng_impl.h" @@ -29,8 +30,15 @@ struct nni_req_sock { nni_time resend; int raw; int closing; + int wantw; nni_msg * reqmsg; nni_msg * retrymsg; + + nni_list pipes; + nni_req_pipe * nextpipe; + nni_req_pipe * pendpipe; + int npipes; + uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) }; @@ -39,7 +47,9 @@ struct nni_req_sock { struct nni_req_pipe { nni_pipe * pipe; nni_req_sock * req; + nni_msgq * mq; int sigclose; + nni_list_node node; }; static void nni_req_resender(void *); @@ -58,12 +68,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock) return (rv); } // this is "semi random" start for request IDs. + NNI_LIST_INIT(&req->pipes, nni_req_pipe, node); + req->nextpipe = NULL; + req->npipes = 0; req->nextid = nni_random(); req->retry = NNI_SECOND * 60; req->sock = sock; req->reqmsg = NULL; req->retrymsg = NULL; req->raw = 0; + req->wantw = 0; req->resend = NNI_TIME_ZERO; req->uwq = nni_sock_sendq(sock); @@ -106,10 +120,16 @@ static int nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_req_pipe *rp; + int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) { + NNI_FREE_STRUCT(rp); + return (rv); + } + NNI_LIST_NODE_INIT(&rp->node); rp->pipe = pipe; rp->sigclose = 0; rp->req = rsock; @@ -124,6 +144,7 @@ nni_req_pipe_fini(void *arg) nni_req_pipe *rp = arg; if (rp != NULL) { + nni_msgq_fini(rp->mq); NNI_FREE_STRUCT(rp); } } @@ -133,10 +154,14 @@ static int nni_req_pipe_add(void *arg) { nni_req_pipe *rp = arg; + nni_req_sock *req = rp->req; if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + nni_list_append(&req->pipes, rp); + req->npipes++; + nni_cv_wake(&req->cv); // Wake the top sender, new job candidate! return (0); } @@ -144,8 +169,20 @@ nni_req_pipe_add(void *arg) static void nni_req_pipe_rem(void *arg) { - // As with add, nothing to do here. - NNI_ARG_UNUSED(arg); + nni_req_pipe *rp = arg; + nni_req_sock *req = rp->req; + + if (rp == req->nextpipe) { + req->nextpipe = nni_list_next(&req->pipes, rp); + } + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // we are removing the pipe we sent the last request on... + // schedule immediate resend. + req->resend = NNI_TIME_ZERO; + nni_cv_wake(&req->cv); + } + req->npipes--; + nni_list_remove(&req->pipes, rp); } @@ -154,33 +191,27 @@ nni_req_pipe_send(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_msgq *uwq = req->uwq; - nni_msgq *urq = req->urq; - nni_pipe *pipe = rp->pipe; nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int rv; for (;;) { nni_mtx_lock(mx); - if ((msg = req->retrymsg) != NULL) { - req->retrymsg = NULL; + if (req->wantw) { + nni_cv_wake(&req->cv); } nni_mtx_unlock(mx); - if (msg == NULL) { - rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); - if (rv != 0) { - break; - } + if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) { + break; } - rv = nni_pipe_send(pipe, msg); + rv = nni_pipe_send(rp->pipe, msg); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(urq, &rp->sigclose); - nni_pipe_close(pipe); + nni_msgq_signal(req->urq, &rp->sigclose); + nni_pipe_close(rp->pipe); } @@ -221,7 +252,7 @@ nni_req_pipe_recv(void *arg) break; } } - nni_msgq_signal(uwq, &rp->sigclose); + nni_msgq_signal(rp->mq, &rp->sigclose); nni_pipe_close(pipe); } @@ -267,11 +298,76 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void +nni_req_sock_send(void *arg) +{ + nni_req_sock *req = arg; + nni_req_pipe *rp; + nni_msgq *uwq = req->uwq; + nni_msg *msg; + nni_mtx *mx = nni_sock_mtx(req->sock); + int i; + + msg = NULL; + + for (;;) { + if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) { + // ECLOSED? Should be! + return; + } + + nni_mtx_lock(mx); + if (!req->raw) { + nni_mtx_unlock(mx); + // Cooked messages come another path... just toss + // this (shouldn't happen actually!) + if (msg != NULL) { + nni_msg_free(msg); + msg = NULL; + } + continue; + } + + if (req->closing) { + if (msg != NULL) { + nni_mtx_unlock(mx); + nni_msg_free(msg); + return; + } + } + req->wantw = 0; + for (i = 0; i < req->npipes; i++) { + rp = req->nextpipe; + if (rp == NULL) { + rp = nni_list_first(&req->pipes); + } + req->nextpipe = nni_list_next(&req->pipes, rp); + if (nni_msgq_tryput(rp->mq, msg) == 0) { + msg = NULL; + break; + } + } + // We weren't able to deliver it. We have two choices: + // 1) drop the message and let the originator resend, or + // 2) apply pushback. There is value in pushback, since it + // will cause senders to slow down, or redistribute the work. + // So, let's try that. + if (msg != NULL) { + req->wantw = 1; + nni_cv_wait(&req->cv); + } + nni_mtx_unlock(mx); + } +} + + +static void nni_req_sock_resend(void *arg) { nni_req_sock *req = arg; + nni_req_pipe *rp; nni_mtx *mx = nni_sock_mtx(req->sock); - int rv; + nni_msg *msg; + int i; for (;;) { nni_mtx_lock(mx); @@ -284,14 +380,43 @@ nni_req_sock_resend(void *arg) nni_mtx_unlock(mx); continue; } - rv = nni_cv_until(&req->cv, req->resend); - if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) { - // XXX: check for final timeout on this? - if (req->retrymsg == NULL) { - nni_msg_dup(&req->retrymsg, req->reqmsg); + + if ((req->wantw) || (nni_clock() >= req->resend)) { + req->wantw = 0; + + if (nni_msg_dup(&msg, req->reqmsg) != 0) { + // Failed to alloc message, just wait for next + // retry. + req->resend = nni_clock() + req->retry; + nni_mtx_unlock(mx); + continue; + } + + // Now we iterate across all possible outpipes, until + // one accepts it. + for (i = 0; i < req->npipes; i++) { + rp = req->nextpipe; + if (rp == NULL) { + rp = nni_list_first(&req->pipes); + } + req->nextpipe = nni_list_next(&req->pipes, rp); + if (nni_msgq_tryput(rp->mq, msg) == 0) { + req->pendpipe = rp; + msg = NULL; + break; + } + } + if (msg == NULL) { + // Message was published, update the timeout. + req->resend = nni_clock() + req->retry; + } else { + // No suitable outbound destination found, + // so alert us when we get one. + req->wantw = 1; + nni_msg_free(msg); } - req->resend = nni_clock() + req->retry; } + nni_cv_until(&req->cv, req->resend); nni_mtx_unlock(mx); } } @@ -330,19 +455,15 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) } // Make a duplicate message... for retries. - if (nni_msg_dup(&req->reqmsg, msg) != 0) { - nni_msg_free(msg); - return (NULL); - } - - // Schedule the next retry - req->resend = nni_clock() + req->retry; + req->reqmsg = msg; + // Schedule for immediate send + req->resend = NNI_TIME_ZERO; nni_cv_wake(&req->cv); // Clear the error condition. nni_sock_recverr(req->sock, 0); - return (msg); + return (NULL); } @@ -375,6 +496,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) nni_sock_recverr(req->sock, NNG_ESTATE); nni_msg_free(req->reqmsg); req->reqmsg = NULL; + req->pendpipe = NULL; nni_cv_wake(&req->cv); return (msg); } @@ -399,7 +521,8 @@ static nni_proto_sock_ops nni_req_sock_ops = { .sock_getopt = nni_req_sock_getopt, .sock_rfilter = nni_req_sock_rfilter, .sock_sfilter = nni_req_sock_sfilter, - .sock_worker = { nni_req_sock_resend }, + .sock_worker = { nni_req_sock_send, + nni_req_sock_resend }, }; nni_proto nni_req_proto = { |
