aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
commit7df0822d3ed58ee73918cac576c0b07363e84425 (patch)
tree5cae69dc0dc3e609260e0bd99bb8743c1c1a28cc /src
parent91a0b46b6a63f1c2345279b831a02c972e7b1781 (diff)
downloadnng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.gz
nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.bz2
nng-7df0822d3ed58ee73918cac576c0b07363e84425.zip
Added a bunch more compatibility stuff.
I implemented the reqrep compatibility test, which uncovered a few semantic issues I had in the REQ/REP protocol, which I've fixed. There are still missing things. and at least one portion of the req/rep test suite cannot be enabled until I add tuning of the reconnect timeout, which is currently way too long (1 sec) for the test suite to work.
Diffstat (limited to 'src')
-rw-r--r--src/core/list.c2
-rw-r--r--src/core/options.h6
-rw-r--r--src/nng.c1
-rw-r--r--src/nng_compat.c138
-rw-r--r--src/nng_compat.h7
-rw-r--r--src/protocol/reqrep/req.c187
6 files changed, 295 insertions, 46 deletions
diff --git a/src/core/list.c b/src/core/list.c
index 18858582..821e48bd 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -1,5 +1,5 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017up Garrett D'Amore <garrett@damore.org>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
diff --git a/src/core/options.h b/src/core/options.h
index 6c46173f..2d843a4c 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -10,12 +10,6 @@
#ifndef CORE_OPTIONS_H
#define CORE_OPTIONS_H
-struct nni_notifyfd {
- int sn_wfd; // written to in order to flag an event
- int sn_rfd; // read from in order to clear an event
- int sn_init;
-};
-
// Option helpers. These can be called from protocols or transports
// in their own option handling, centralizing the logic for dealing with
// variable sized options.
diff --git a/src/nng.c b/src/nng.c
index 28da1f20..e21a84e6 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -266,6 +266,7 @@ nng_endpoint_close(nng_endpoint eid)
return (NNG_ENOTSUP);
}
+
int
nng_setopt(nng_socket sid, int opt, const void *val, size_t sz)
{
diff --git a/src/nng_compat.c b/src/nng_compat.c
index c9a36178..fd43af29 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -17,8 +17,8 @@
// avoid using these if at all possible, and instead use the new style APIs.
static struct {
- int perr;
int nerr;
+ int perr;
}
nn_errnos[] = {
{ NNG_EINTR, EINTR },
@@ -82,6 +82,13 @@ nn_seterror(int err)
int
+nn_errno(void)
+{
+ return (errno);
+}
+
+
+int
nn_socket(int domain, int protocol)
{
nng_socket sock;
@@ -498,8 +505,8 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
// because we have to copy the data, but we should
// only free this message on success. So we save the
// message now.
- cdata = *(void **)cdata;
- cmsg = *(nng_msg **)(cdata - sizeof (cmsg));
+ cdata = *(void **) cdata;
+ cmsg = *(nng_msg **) (cdata - sizeof (cmsg));
clen = nng_msg_len(cmsg);
}
if ((rv = nng_msg_append_header(msg, cdata, clen)) != 0) {
@@ -512,7 +519,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
}
sz = nng_msg_len(msg);
- if ((rv = nng_sendmsg((nng_socket)s, msg, flags)) != 0) {
+ if ((rv = nng_sendmsg((nng_socket) s, msg, flags)) != 0) {
if (!keep) {
nng_msg_free(msg);
}
@@ -524,5 +531,126 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
// We sent successfully, so free up the control message.
nng_msg_free(cmsg);
}
- return ((int)sz);
+ return ((int) sz);
+}
+
+
+int
+nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
+{
+ int opt = 0;
+ int mscvt = 0;
+ uint64_t usec;
+ int rv;
+
+ switch (nnlevel) {
+ case NN_SOL_SOCKET:
+ switch (nnopt) {
+ case NN_LINGER:
+ opt = NNG_OPT_LINGER;
+ break;
+ case NN_SNDBUF:
+ opt = NNG_OPT_SNDBUF;
+ break;
+ case NN_RCVBUF:
+ opt = NNG_OPT_RCVBUF;
+ break;
+ case NN_RECONNECT_IVL:
+ opt = NNG_OPT_RECONN_TIME;
+ mscvt = 1;
+ break;
+ case NN_RECONNECT_IVL_MAX:
+ opt = NNG_OPT_RECONN_MAXTIME;
+ mscvt = 1;
+ break;
+ case NN_SNDFD:
+ opt = NNG_OPT_SNDFD;
+ break;
+ case NN_RCVFD:
+ opt = NNG_OPT_RCVFD;
+ break;
+ case NN_RCVMAXSIZE:
+ opt = NNG_OPT_RCVMAXSZ;
+ break;
+ case NN_MAXTTL:
+ opt = NNG_OPT_MAXTTL;
+ break;
+ case NN_RCVTIMEO:
+ opt = NNG_OPT_RCVTIMEO;
+ mscvt = 1;
+ break;
+ case NN_SNDTIMEO:
+ opt = NNG_OPT_SNDTIMEO;
+ mscvt = 1;
+ break;
+ case NN_DOMAIN:
+ case NN_PROTOCOL:
+ case NN_IPV4ONLY:
+ case NN_SOCKET_NAME:
+ case NN_SNDPRIO:
+ case NN_RCVPRIO:
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+
+ break;
+ }
+ break;
+ case NN_REQ:
+ switch (nnopt) {
+ case NN_REQ_RESEND_IVL:
+ opt = NNG_OPT_RESENDTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SUB:
+ switch (nnopt) {
+ case NN_SUB_SUBSCRIBE:
+ opt = NNG_OPT_SUBSCRIBE;
+ break;
+ case NN_SUB_UNSUBSCRIBE:
+ opt = NNG_OPT_UNSUBSCRIBE;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SURVEYOR:
+ switch (nnopt) {
+ case NN_SURVEY_DEADLINE:
+ opt = NNG_OPT_SURVEYTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+
+ if (mscvt) {
+ // We have to convert value to ms...
+
+ if (sz != sizeof (int)) {
+ errno = EINVAL;
+ return (-1);
+ }
+ usec = *(int *) valp;
+ usec *= 1000;
+ valp = &usec;
+ sz = sizeof (usec);
+ }
+
+ if ((rv = nng_setopt((nng_socket) s, opt, valp, sz)) != 0) {
+ nn_seterror(rv);
+ return (-1);
+ }
+ return (0);
}
diff --git a/src/nng_compat.h b/src/nng_compat.h
index c49a33db..64ace069 100644
--- a/src/nng_compat.h
+++ b/src/nng_compat.h
@@ -193,7 +193,7 @@ extern "C" {
#define NN_LINGER 1
#define NN_SNDBUF 2
#define NN_RCVBUF 3
-#define NN_SNDTIMEO 5
+#define NN_SNDTIMEO 4
#define NN_RCVTIMEO 5
#define NN_RECONNECT_IVL 6
#define NN_RECONNECT_IVL_MAX 7
@@ -210,7 +210,8 @@ extern "C" {
// Protocol-specific options. To simplify thins we encode the protocol
// level in the option.
-#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 1)
+#define NN_SUB_SUBSCRIBE (NN_SUB * 16 + 1)
+#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 2)
#define NN_REQ_RESEND_IVL (NN_REQ * 16 + 1)
#define NN_SURVEY_DEADLINE (NN_SURVEYOR * 16 + 1)
@@ -294,6 +295,8 @@ NN_DECL uint64_t nn_get_statistic(int, int);
NN_DECL void *nn_allocmsg(size_t, int);
NN_DECL void *nn_reallocmsg(void *, size_t);
NN_DECL int nn_freemsg(void *);
+NN_DECL int nn_errno(void);
+NN_DECL const char *nn_strerror(int);
#ifdef __cplusplus
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index c200fdc9..fe61584c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -9,6 +9,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
#include "core/nng_impl.h"
@@ -29,8 +30,15 @@ struct nni_req_sock {
nni_time resend;
int raw;
int closing;
+ int wantw;
nni_msg * reqmsg;
nni_msg * retrymsg;
+
+ nni_list pipes;
+ nni_req_pipe * nextpipe;
+ nni_req_pipe * pendpipe;
+ int npipes;
+
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -39,7 +47,9 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
+ nni_msgq * mq;
int sigclose;
+ nni_list_node node;
};
static void nni_req_resender(void *);
@@ -58,12 +68,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
return (rv);
}
// this is "semi random" start for request IDs.
+ NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
+ req->nextpipe = NULL;
+ req->npipes = 0;
req->nextid = nni_random();
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
req->retrymsg = NULL;
req->raw = 0;
+ req->wantw = 0;
req->resend = NNI_TIME_ZERO;
req->uwq = nni_sock_sendq(sock);
@@ -106,10 +120,16 @@ static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
rp->sigclose = 0;
rp->req = rsock;
@@ -124,6 +144,7 @@ nni_req_pipe_fini(void *arg)
nni_req_pipe *rp = arg;
if (rp != NULL) {
+ nni_msgq_fini(rp->mq);
NNI_FREE_STRUCT(rp);
}
}
@@ -133,10 +154,14 @@ static int
nni_req_pipe_add(void *arg)
{
nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
+ nni_list_append(&req->pipes, rp);
+ req->npipes++;
+ nni_cv_wake(&req->cv); // Wake the top sender, new job candidate!
return (0);
}
@@ -144,8 +169,20 @@ nni_req_pipe_add(void *arg)
static void
nni_req_pipe_rem(void *arg)
{
- // As with add, nothing to do here.
- NNI_ARG_UNUSED(arg);
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+
+ if (rp == req->nextpipe) {
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ }
+ if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ // we are removing the pipe we sent the last request on...
+ // schedule immediate resend.
+ req->resend = NNI_TIME_ZERO;
+ nni_cv_wake(&req->cv);
+ }
+ req->npipes--;
+ nni_list_remove(&req->pipes, rp);
}
@@ -154,33 +191,27 @@ nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_msgq *uwq = req->uwq;
- nni_msgq *urq = req->urq;
- nni_pipe *pipe = rp->pipe;
nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int rv;
for (;;) {
nni_mtx_lock(mx);
- if ((msg = req->retrymsg) != NULL) {
- req->retrymsg = NULL;
+ if (req->wantw) {
+ nni_cv_wake(&req->cv);
}
nni_mtx_unlock(mx);
- if (msg == NULL) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
- }
+ if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) {
+ break;
}
- rv = nni_pipe_send(pipe, msg);
+ rv = nni_pipe_send(rp->pipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(urq, &rp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_signal(req->urq, &rp->sigclose);
+ nni_pipe_close(rp->pipe);
}
@@ -221,7 +252,7 @@ nni_req_pipe_recv(void *arg)
break;
}
}
- nni_msgq_signal(uwq, &rp->sigclose);
+ nni_msgq_signal(rp->mq, &rp->sigclose);
nni_pipe_close(pipe);
}
@@ -267,11 +298,76 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
+nni_req_sock_send(void *arg)
+{
+ nni_req_sock *req = arg;
+ nni_req_pipe *rp;
+ nni_msgq *uwq = req->uwq;
+ nni_msg *msg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
+ int i;
+
+ msg = NULL;
+
+ for (;;) {
+ if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
+ // ECLOSED? Should be!
+ return;
+ }
+
+ nni_mtx_lock(mx);
+ if (!req->raw) {
+ nni_mtx_unlock(mx);
+ // Cooked messages come another path... just toss
+ // this (shouldn't happen actually!)
+ if (msg != NULL) {
+ nni_msg_free(msg);
+ msg = NULL;
+ }
+ continue;
+ }
+
+ if (req->closing) {
+ if (msg != NULL) {
+ nni_mtx_unlock(mx);
+ nni_msg_free(msg);
+ return;
+ }
+ }
+ req->wantw = 0;
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ msg = NULL;
+ break;
+ }
+ }
+ // We weren't able to deliver it. We have two choices:
+ // 1) drop the message and let the originator resend, or
+ // 2) apply pushback. There is value in pushback, since it
+ // will cause senders to slow down, or redistribute the work.
+ // So, let's try that.
+ if (msg != NULL) {
+ req->wantw = 1;
+ nni_cv_wait(&req->cv);
+ }
+ nni_mtx_unlock(mx);
+ }
+}
+
+
+static void
nni_req_sock_resend(void *arg)
{
nni_req_sock *req = arg;
+ nni_req_pipe *rp;
nni_mtx *mx = nni_sock_mtx(req->sock);
- int rv;
+ nni_msg *msg;
+ int i;
for (;;) {
nni_mtx_lock(mx);
@@ -284,14 +380,43 @@ nni_req_sock_resend(void *arg)
nni_mtx_unlock(mx);
continue;
}
- rv = nni_cv_until(&req->cv, req->resend);
- if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- // XXX: check for final timeout on this?
- if (req->retrymsg == NULL) {
- nni_msg_dup(&req->retrymsg, req->reqmsg);
+
+ if ((req->wantw) || (nni_clock() >= req->resend)) {
+ req->wantw = 0;
+
+ if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ // Failed to alloc message, just wait for next
+ // retry.
+ req->resend = nni_clock() + req->retry;
+ nni_mtx_unlock(mx);
+ continue;
+ }
+
+ // Now we iterate across all possible outpipes, until
+ // one accepts it.
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ req->pendpipe = rp;
+ msg = NULL;
+ break;
+ }
+ }
+ if (msg == NULL) {
+ // Message was published, update the timeout.
+ req->resend = nni_clock() + req->retry;
+ } else {
+ // No suitable outbound destination found,
+ // so alert us when we get one.
+ req->wantw = 1;
+ nni_msg_free(msg);
}
- req->resend = nni_clock() + req->retry;
}
+ nni_cv_until(&req->cv, req->resend);
nni_mtx_unlock(mx);
}
}
@@ -330,19 +455,15 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
}
// Make a duplicate message... for retries.
- if (nni_msg_dup(&req->reqmsg, msg) != 0) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- // Schedule the next retry
- req->resend = nni_clock() + req->retry;
+ req->reqmsg = msg;
+ // Schedule for immediate send
+ req->resend = NNI_TIME_ZERO;
nni_cv_wake(&req->cv);
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
- return (msg);
+ return (NULL);
}
@@ -375,6 +496,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
nni_sock_recverr(req->sock, NNG_ESTATE);
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
+ req->pendpipe = NULL;
nni_cv_wake(&req->cv);
return (msg);
}
@@ -399,7 +521,8 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_worker = { nni_req_sock_resend },
+ .sock_worker = { nni_req_sock_send,
+ nni_req_sock_resend },
};
nni_proto nni_req_proto = {