aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-23 22:49:57 -0800
commit7df0822d3ed58ee73918cac576c0b07363e84425 (patch)
tree5cae69dc0dc3e609260e0bd99bb8743c1c1a28cc
parent91a0b46b6a63f1c2345279b831a02c972e7b1781 (diff)
downloadnng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.gz
nng-7df0822d3ed58ee73918cac576c0b07363e84425.tar.bz2
nng-7df0822d3ed58ee73918cac576c0b07363e84425.zip
Added a bunch more compatibility stuff.
I implemented the reqrep compatibility test, which uncovered a few semantic issues I had in the REQ/REP protocol, which I've fixed. There are still missing things. and at least one portion of the req/rep test suite cannot be enabled until I add tuning of the reconnect timeout, which is currently way too long (1 sec) for the test suite to work.
-rw-r--r--src/core/list.c2
-rw-r--r--src/core/options.h6
-rw-r--r--src/nng.c1
-rw-r--r--src/nng_compat.c138
-rw-r--r--src/nng_compat.h7
-rw-r--r--src/protocol/reqrep/req.c187
-rw-r--r--tests/CMakeLists.txt18
-rw-r--r--tests/compat_reqrep.c194
-rw-r--r--tests/compat_testutil.h224
9 files changed, 731 insertions, 46 deletions
diff --git a/src/core/list.c b/src/core/list.c
index 18858582..821e48bd 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -1,5 +1,5 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017up Garrett D'Amore <garrett@damore.org>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
diff --git a/src/core/options.h b/src/core/options.h
index 6c46173f..2d843a4c 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -10,12 +10,6 @@
#ifndef CORE_OPTIONS_H
#define CORE_OPTIONS_H
-struct nni_notifyfd {
- int sn_wfd; // written to in order to flag an event
- int sn_rfd; // read from in order to clear an event
- int sn_init;
-};
-
// Option helpers. These can be called from protocols or transports
// in their own option handling, centralizing the logic for dealing with
// variable sized options.
diff --git a/src/nng.c b/src/nng.c
index 28da1f20..e21a84e6 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -266,6 +266,7 @@ nng_endpoint_close(nng_endpoint eid)
return (NNG_ENOTSUP);
}
+
int
nng_setopt(nng_socket sid, int opt, const void *val, size_t sz)
{
diff --git a/src/nng_compat.c b/src/nng_compat.c
index c9a36178..fd43af29 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -17,8 +17,8 @@
// avoid using these if at all possible, and instead use the new style APIs.
static struct {
- int perr;
int nerr;
+ int perr;
}
nn_errnos[] = {
{ NNG_EINTR, EINTR },
@@ -82,6 +82,13 @@ nn_seterror(int err)
int
+nn_errno(void)
+{
+ return (errno);
+}
+
+
+int
nn_socket(int domain, int protocol)
{
nng_socket sock;
@@ -498,8 +505,8 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
// because we have to copy the data, but we should
// only free this message on success. So we save the
// message now.
- cdata = *(void **)cdata;
- cmsg = *(nng_msg **)(cdata - sizeof (cmsg));
+ cdata = *(void **) cdata;
+ cmsg = *(nng_msg **) (cdata - sizeof (cmsg));
clen = nng_msg_len(cmsg);
}
if ((rv = nng_msg_append_header(msg, cdata, clen)) != 0) {
@@ -512,7 +519,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
}
sz = nng_msg_len(msg);
- if ((rv = nng_sendmsg((nng_socket)s, msg, flags)) != 0) {
+ if ((rv = nng_sendmsg((nng_socket) s, msg, flags)) != 0) {
if (!keep) {
nng_msg_free(msg);
}
@@ -524,5 +531,126 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
// We sent successfully, so free up the control message.
nng_msg_free(cmsg);
}
- return ((int)sz);
+ return ((int) sz);
+}
+
+
+int
+nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
+{
+ int opt = 0;
+ int mscvt = 0;
+ uint64_t usec;
+ int rv;
+
+ switch (nnlevel) {
+ case NN_SOL_SOCKET:
+ switch (nnopt) {
+ case NN_LINGER:
+ opt = NNG_OPT_LINGER;
+ break;
+ case NN_SNDBUF:
+ opt = NNG_OPT_SNDBUF;
+ break;
+ case NN_RCVBUF:
+ opt = NNG_OPT_RCVBUF;
+ break;
+ case NN_RECONNECT_IVL:
+ opt = NNG_OPT_RECONN_TIME;
+ mscvt = 1;
+ break;
+ case NN_RECONNECT_IVL_MAX:
+ opt = NNG_OPT_RECONN_MAXTIME;
+ mscvt = 1;
+ break;
+ case NN_SNDFD:
+ opt = NNG_OPT_SNDFD;
+ break;
+ case NN_RCVFD:
+ opt = NNG_OPT_RCVFD;
+ break;
+ case NN_RCVMAXSIZE:
+ opt = NNG_OPT_RCVMAXSZ;
+ break;
+ case NN_MAXTTL:
+ opt = NNG_OPT_MAXTTL;
+ break;
+ case NN_RCVTIMEO:
+ opt = NNG_OPT_RCVTIMEO;
+ mscvt = 1;
+ break;
+ case NN_SNDTIMEO:
+ opt = NNG_OPT_SNDTIMEO;
+ mscvt = 1;
+ break;
+ case NN_DOMAIN:
+ case NN_PROTOCOL:
+ case NN_IPV4ONLY:
+ case NN_SOCKET_NAME:
+ case NN_SNDPRIO:
+ case NN_RCVPRIO:
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+
+ break;
+ }
+ break;
+ case NN_REQ:
+ switch (nnopt) {
+ case NN_REQ_RESEND_IVL:
+ opt = NNG_OPT_RESENDTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SUB:
+ switch (nnopt) {
+ case NN_SUB_SUBSCRIBE:
+ opt = NNG_OPT_SUBSCRIBE;
+ break;
+ case NN_SUB_UNSUBSCRIBE:
+ opt = NNG_OPT_UNSUBSCRIBE;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SURVEYOR:
+ switch (nnopt) {
+ case NN_SURVEY_DEADLINE:
+ opt = NNG_OPT_SURVEYTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+
+ if (mscvt) {
+ // We have to convert value to ms...
+
+ if (sz != sizeof (int)) {
+ errno = EINVAL;
+ return (-1);
+ }
+ usec = *(int *) valp;
+ usec *= 1000;
+ valp = &usec;
+ sz = sizeof (usec);
+ }
+
+ if ((rv = nng_setopt((nng_socket) s, opt, valp, sz)) != 0) {
+ nn_seterror(rv);
+ return (-1);
+ }
+ return (0);
}
diff --git a/src/nng_compat.h b/src/nng_compat.h
index c49a33db..64ace069 100644
--- a/src/nng_compat.h
+++ b/src/nng_compat.h
@@ -193,7 +193,7 @@ extern "C" {
#define NN_LINGER 1
#define NN_SNDBUF 2
#define NN_RCVBUF 3
-#define NN_SNDTIMEO 5
+#define NN_SNDTIMEO 4
#define NN_RCVTIMEO 5
#define NN_RECONNECT_IVL 6
#define NN_RECONNECT_IVL_MAX 7
@@ -210,7 +210,8 @@ extern "C" {
// Protocol-specific options. To simplify thins we encode the protocol
// level in the option.
-#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 1)
+#define NN_SUB_SUBSCRIBE (NN_SUB * 16 + 1)
+#define NN_SUB_UNSUBSCRIBE (NN_SUB * 16 + 2)
#define NN_REQ_RESEND_IVL (NN_REQ * 16 + 1)
#define NN_SURVEY_DEADLINE (NN_SURVEYOR * 16 + 1)
@@ -294,6 +295,8 @@ NN_DECL uint64_t nn_get_statistic(int, int);
NN_DECL void *nn_allocmsg(size_t, int);
NN_DECL void *nn_reallocmsg(void *, size_t);
NN_DECL int nn_freemsg(void *);
+NN_DECL int nn_errno(void);
+NN_DECL const char *nn_strerror(int);
#ifdef __cplusplus
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index c200fdc9..fe61584c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -9,6 +9,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
#include "core/nng_impl.h"
@@ -29,8 +30,15 @@ struct nni_req_sock {
nni_time resend;
int raw;
int closing;
+ int wantw;
nni_msg * reqmsg;
nni_msg * retrymsg;
+
+ nni_list pipes;
+ nni_req_pipe * nextpipe;
+ nni_req_pipe * pendpipe;
+ int npipes;
+
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
};
@@ -39,7 +47,9 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
+ nni_msgq * mq;
int sigclose;
+ nni_list_node node;
};
static void nni_req_resender(void *);
@@ -58,12 +68,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
return (rv);
}
// this is "semi random" start for request IDs.
+ NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
+ req->nextpipe = NULL;
+ req->npipes = 0;
req->nextid = nni_random();
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
req->retrymsg = NULL;
req->raw = 0;
+ req->wantw = 0;
req->resend = NNI_TIME_ZERO;
req->uwq = nni_sock_sendq(sock);
@@ -106,10 +120,16 @@ static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
rp->sigclose = 0;
rp->req = rsock;
@@ -124,6 +144,7 @@ nni_req_pipe_fini(void *arg)
nni_req_pipe *rp = arg;
if (rp != NULL) {
+ nni_msgq_fini(rp->mq);
NNI_FREE_STRUCT(rp);
}
}
@@ -133,10 +154,14 @@ static int
nni_req_pipe_add(void *arg)
{
nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
+ nni_list_append(&req->pipes, rp);
+ req->npipes++;
+ nni_cv_wake(&req->cv); // Wake the top sender, new job candidate!
return (0);
}
@@ -144,8 +169,20 @@ nni_req_pipe_add(void *arg)
static void
nni_req_pipe_rem(void *arg)
{
- // As with add, nothing to do here.
- NNI_ARG_UNUSED(arg);
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+
+ if (rp == req->nextpipe) {
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ }
+ if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ // we are removing the pipe we sent the last request on...
+ // schedule immediate resend.
+ req->resend = NNI_TIME_ZERO;
+ nni_cv_wake(&req->cv);
+ }
+ req->npipes--;
+ nni_list_remove(&req->pipes, rp);
}
@@ -154,33 +191,27 @@ nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_msgq *uwq = req->uwq;
- nni_msgq *urq = req->urq;
- nni_pipe *pipe = rp->pipe;
nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int rv;
for (;;) {
nni_mtx_lock(mx);
- if ((msg = req->retrymsg) != NULL) {
- req->retrymsg = NULL;
+ if (req->wantw) {
+ nni_cv_wake(&req->cv);
}
nni_mtx_unlock(mx);
- if (msg == NULL) {
- rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
- if (rv != 0) {
- break;
- }
+ if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) {
+ break;
}
- rv = nni_pipe_send(pipe, msg);
+ rv = nni_pipe_send(rp->pipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(urq, &rp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_signal(req->urq, &rp->sigclose);
+ nni_pipe_close(rp->pipe);
}
@@ -221,7 +252,7 @@ nni_req_pipe_recv(void *arg)
break;
}
}
- nni_msgq_signal(uwq, &rp->sigclose);
+ nni_msgq_signal(rp->mq, &rp->sigclose);
nni_pipe_close(pipe);
}
@@ -267,11 +298,76 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
+nni_req_sock_send(void *arg)
+{
+ nni_req_sock *req = arg;
+ nni_req_pipe *rp;
+ nni_msgq *uwq = req->uwq;
+ nni_msg *msg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
+ int i;
+
+ msg = NULL;
+
+ for (;;) {
+ if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
+ // ECLOSED? Should be!
+ return;
+ }
+
+ nni_mtx_lock(mx);
+ if (!req->raw) {
+ nni_mtx_unlock(mx);
+ // Cooked messages come another path... just toss
+ // this (shouldn't happen actually!)
+ if (msg != NULL) {
+ nni_msg_free(msg);
+ msg = NULL;
+ }
+ continue;
+ }
+
+ if (req->closing) {
+ if (msg != NULL) {
+ nni_mtx_unlock(mx);
+ nni_msg_free(msg);
+ return;
+ }
+ }
+ req->wantw = 0;
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ msg = NULL;
+ break;
+ }
+ }
+ // We weren't able to deliver it. We have two choices:
+ // 1) drop the message and let the originator resend, or
+ // 2) apply pushback. There is value in pushback, since it
+ // will cause senders to slow down, or redistribute the work.
+ // So, let's try that.
+ if (msg != NULL) {
+ req->wantw = 1;
+ nni_cv_wait(&req->cv);
+ }
+ nni_mtx_unlock(mx);
+ }
+}
+
+
+static void
nni_req_sock_resend(void *arg)
{
nni_req_sock *req = arg;
+ nni_req_pipe *rp;
nni_mtx *mx = nni_sock_mtx(req->sock);
- int rv;
+ nni_msg *msg;
+ int i;
for (;;) {
nni_mtx_lock(mx);
@@ -284,14 +380,43 @@ nni_req_sock_resend(void *arg)
nni_mtx_unlock(mx);
continue;
}
- rv = nni_cv_until(&req->cv, req->resend);
- if ((rv == NNG_ETIMEDOUT) && (req->reqmsg != NULL)) {
- // XXX: check for final timeout on this?
- if (req->retrymsg == NULL) {
- nni_msg_dup(&req->retrymsg, req->reqmsg);
+
+ if ((req->wantw) || (nni_clock() >= req->resend)) {
+ req->wantw = 0;
+
+ if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ // Failed to alloc message, just wait for next
+ // retry.
+ req->resend = nni_clock() + req->retry;
+ nni_mtx_unlock(mx);
+ continue;
+ }
+
+ // Now we iterate across all possible outpipes, until
+ // one accepts it.
+ for (i = 0; i < req->npipes; i++) {
+ rp = req->nextpipe;
+ if (rp == NULL) {
+ rp = nni_list_first(&req->pipes);
+ }
+ req->nextpipe = nni_list_next(&req->pipes, rp);
+ if (nni_msgq_tryput(rp->mq, msg) == 0) {
+ req->pendpipe = rp;
+ msg = NULL;
+ break;
+ }
+ }
+ if (msg == NULL) {
+ // Message was published, update the timeout.
+ req->resend = nni_clock() + req->retry;
+ } else {
+ // No suitable outbound destination found,
+ // so alert us when we get one.
+ req->wantw = 1;
+ nni_msg_free(msg);
}
- req->resend = nni_clock() + req->retry;
}
+ nni_cv_until(&req->cv, req->resend);
nni_mtx_unlock(mx);
}
}
@@ -330,19 +455,15 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
}
// Make a duplicate message... for retries.
- if (nni_msg_dup(&req->reqmsg, msg) != 0) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- // Schedule the next retry
- req->resend = nni_clock() + req->retry;
+ req->reqmsg = msg;
+ // Schedule for immediate send
+ req->resend = NNI_TIME_ZERO;
nni_cv_wake(&req->cv);
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
- return (msg);
+ return (NULL);
}
@@ -375,6 +496,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
nni_sock_recverr(req->sock, NNG_ESTATE);
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
+ req->pendpipe = NULL;
nni_cv_wake(&req->cv);
return (msg);
}
@@ -399,7 +521,8 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_worker = { nni_req_sock_resend },
+ .sock_worker = { nni_req_sock_send,
+ nni_req_sock_resend },
};
nni_proto nni_req_proto = {
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2b7a816e..52c9fdba 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -50,6 +50,21 @@ if (NNG_TESTS)
math (EXPR TEST_PORT "${TEST_PORT}+10")
endmacro (add_nng_test)
+ macro (add_nng_compat_test NAME TIMEOUT)
+ list (APPEND all_tests ${NAME})
+ add_executable (${NAME} ${NAME}.c)
+ target_link_libraries (${NAME} ${PROJECT_NAME}_static)
+ target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES})
+ target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB)
+ if (CMAKE_THREAD_LIBS_INIT)
+ target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}")
+ endif()
+
+ add_test (NAME ${NAME} COMMAND ${NAME} -v ${TEST_PORT})
+ set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT})
+ math (EXPR TEST_PORT "${TEST_PORT}+10")
+ endmacro (add_nng_compat_test)
+
else ()
macro (add_nng_test NAME TIMEOUT)
endmacro (add_nng_test)
@@ -69,3 +84,6 @@ add_nng_test(pubsub 5)
add_nng_test(sock 5)
add_nng_test(survey 5)
add_nng_test(tcp 5)
+
+# compatbility tests
+add_nng_compat_test(compat_reqrep 5)
diff --git a/tests/compat_reqrep.c b/tests/compat_reqrep.c
new file mode 100644
index 00000000..a418b8d9
--- /dev/null
+++ b/tests/compat_reqrep.c
@@ -0,0 +1,194 @@
+/*
+ Copyright (c) 2012 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "nng_compat.h"
+
+#include "compat_testutil.h"
+
+#define SOCKET_ADDRESS "inproc://test"
+
+extern void nng_usleep(uint64_t);
+
+int main ()
+{
+ int rc;
+ int rep1;
+ int rep2;
+ int req1;
+ int req2;
+ int resend_ivl;
+ char buf [7];
+ int timeo;
+
+ /* Test req/rep with full socket types. */
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_bind (rep1, SOCKET_ADDRESS);
+ req1 = test_socket (AF_SP, NN_REQ);
+ test_connect (req1, SOCKET_ADDRESS);
+ req2 = test_socket (AF_SP, NN_REQ);
+ test_connect (req2, SOCKET_ADDRESS);
+
+ /* Check invalid sequence of sends and recvs. */
+ rc = nn_send (rep1, "ABC", 3, 0);
+ nn_assert (rc == -1 && nn_errno () == EFSM);
+ rc = nn_recv (req1, buf, sizeof (buf), 0);
+ nn_assert (rc == -1 && nn_errno () == EFSM);
+
+ /* Check fair queueing the requests. */
+ test_send (req2, "ABC");
+ test_recv (rep1, "ABC");
+ test_send (rep1, "ABC");
+ test_recv (req2, "ABC");
+
+ test_send (req1, "ABC");
+ test_recv (rep1, "ABC");
+ test_send (rep1, "ABC");
+ test_recv (req1, "ABC");
+
+ test_close (rep1);
+ test_close (req1);
+ test_close (req2);
+
+ /* Check load-balancing of requests. */
+ req1 = test_socket (AF_SP, NN_REQ);
+ test_bind (req1, SOCKET_ADDRESS);
+
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_connect (rep1, SOCKET_ADDRESS);
+
+ nng_usleep(10000); // ensure rep1 binds before rep2
+
+ rep2 = test_socket (AF_SP, NN_REP);
+ test_connect (rep2, SOCKET_ADDRESS);
+
+ timeo = 500;
+ test_setsockopt (req1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ test_setsockopt (rep1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ test_setsockopt (rep2, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+
+ // Give time for connections to settle -- required for LB stuff.
+ nng_usleep(100000);
+
+ test_send (req1, "ABC");
+ test_recv (rep1, "ABC");
+ test_send (rep1, "ABC");
+ test_recv (req1, "ABC");
+
+ test_send (req1, "ABC");
+ test_recv (rep2, "ABC");
+ test_send (rep2, "ABC");
+ test_recv (req1, "ABC");
+
+ test_close (rep2);
+ test_close (rep1);
+ test_close (req1);
+
+ /* Test re-sending of the request. */
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_bind (rep1, SOCKET_ADDRESS);
+ req1 = test_socket (AF_SP, NN_REQ);
+ test_connect (req1, SOCKET_ADDRESS);
+ resend_ivl = 100;
+ rc = nn_setsockopt (req1, NN_REQ, NN_REQ_RESEND_IVL,
+ &resend_ivl, sizeof (resend_ivl));
+ errno_assert (rc == 0);
+
+ test_send (req1, "ABC");
+ test_recv (rep1, "ABC");
+ /* The following waits for request to be resent */
+ test_recv (rep1, "ABC");
+
+ test_close (req1);
+ test_close (rep1);
+
+
+#if 0 // The default reconnect interval is waaay to large for this to pass.
+ /* Check sending a request when the peer is not available. (It should
+ be sent immediatelly when the peer comes online rather than relying
+ on the resend algorithm. */
+ req1 = test_socket (AF_SP, NN_REQ);
+ timeo = 10;
+ rc = nn_setsockopt (req1, NN_SOL_SOCKET, NN_RECONNECT_IVL,
+ &timeo, sizeof (timeo));
+ test_send (req1, "ABC");
+ test_connect (req1, SOCKET_ADDRESS);
+
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_bind (rep1, SOCKET_ADDRESS);
+ timeo = 500;
+ rc = nn_setsockopt (rep1, NN_SOL_SOCKET, NN_RCVTIMEO,
+ &timeo, sizeof (timeo));
+ printf("RC = %d errno %d (%s)\n", rc, errno, strerror(errno));
+ errno_assert (rc == 0);
+ test_recv (rep1, "ABC");
+
+ test_close (req1);
+ test_close (rep1);
+#endif
+
+ /* Check removing socket request sent to (It should
+ be sent immediatelly to other peer rather than relying
+ on the resend algorithm). */
+ req1 = test_socket (AF_SP, NN_REQ);
+ test_bind (req1, SOCKET_ADDRESS);
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_connect (rep1, SOCKET_ADDRESS);
+ rep2 = test_socket (AF_SP, NN_REP);
+ nng_usleep(10000); // give time for rep1 to connect
+ test_connect (rep2, SOCKET_ADDRESS);
+
+ timeo = 500; // Was 200, but Windows occasionally fails at that rate.
+ test_setsockopt (rep1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ test_setsockopt (rep2, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+
+ test_send (req1, "ABC");
+ /* We got request through rep1 */
+ test_recv (rep1, "ABC");
+ /* But instead replying we simulate crash */
+ test_close (rep1);
+ /* The rep2 should get request immediately */
+ test_recv (rep2, "ABC");
+ /* Let's check it's delivered well */
+ test_send (rep2, "REPLY");
+ test_recv (req1, "REPLY");
+
+ test_close (req1);
+ test_close (rep2);
+
+ /* Test cancelling delayed request */
+
+ req1 = test_socket (AF_SP, NN_REQ);
+ test_connect (req1, SOCKET_ADDRESS);
+ test_send (req1, "ABC");
+ test_send (req1, "DEF");
+
+ rep1 = test_socket (AF_SP, NN_REP);
+ test_bind (rep1, SOCKET_ADDRESS);
+ timeo = 100;
+ test_recv (rep1, "DEF");
+
+ test_close (req1);
+ test_close (rep1);
+
+ return 0;
+}
+
diff --git a/tests/compat_testutil.h b/tests/compat_testutil.h
new file mode 100644
index 00000000..58020331
--- /dev/null
+++ b/tests/compat_testutil.h
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2013 Insollo Entertainment, LLC. All rights reserved.
+ Copyright 2015 Garrett D'Amore <garrett@damore.org>
+ Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+// Note: This file started life in nanomsg. We have copied it, and adjusted
+// it for validating the compatibility features of nanomsg. As much as
+// possible we want to run tests from the nanomsg test suite unmodified.
+
+#ifndef TESTUTIL_H_INCLUDED
+#define TESTUTIL_H_INCLUDED
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#define nn_err_strerror nn_strerror
+#define nn_err_abort abort
+#define nn_assert assert
+#define errno_assert assert
+#define alloc_assert(x) assert(x != NULL)
+
+static int test_socket_impl (char *file, int line, int family, int protocol);
+static int test_connect_impl (char *file, int line, int sock, char *address);
+static int test_bind_impl (char *file, int line, int sock, char *address);
+static void test_close_impl (char *file, int line, int sock);
+static void test_send_impl (char *file, int line, int sock, char *data);
+static void test_recv_impl (char *file, int line, int sock, char *data);
+static void test_drop_impl (char *file, int line, int sock, int err);
+static int test_setsockopt_impl (char *file, int line, int sock, int level,
+ int option, const void *optval, size_t optlen);
+
+#define test_socket(f, p) test_socket_impl (__FILE__, __LINE__, (f), (p))
+#define test_connect(s, a) test_connect_impl (__FILE__, __LINE__, (s), (a))
+#define test_bind(s, a) test_bind_impl (__FILE__, __LINE__, (s), (a))
+#define test_send(s, d) test_send_impl (__FILE__, __LINE__, (s), (d))
+#define test_recv(s, d) test_recv_impl (__FILE__, __LINE__, (s), (d))
+#define test_drop(s, e) test_drop_impl (__FILE__, __LINE__, (s), (e))
+#define test_close(s) test_close_impl (__FILE__, __LINE__, (s))
+#define test_setsockopt(s, l, o, v, z) test_setsockopt_impl (__FILE__, \
+ __LINE__, (s), (l), (o), (v), (z))
+
+static int test_socket_impl (char *file, int line, int family,
+ int protocol)
+{
+ int sock;
+
+ sock = nn_socket (family, protocol);
+ if (sock == -1) {
+ fprintf (stderr, "Failed create socket: %s [%d] (%s:%d)\n",
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+
+ return sock;
+}
+
+static int test_connect_impl (char *file, int line, int sock, char *address)
+{
+ int rc;
+
+ rc = nn_connect (sock, address);
+ if(rc < 0) {
+ fprintf (stderr, "Failed connect to \"%s\": %s [%d] (%s:%d)\n",
+ address,
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+ return rc;
+}
+
+static int test_bind_impl (char *file, int line, int sock, char *address)
+{
+ int rc;
+
+ rc = nn_bind (sock, address);
+ if(rc < 0) {
+ fprintf (stderr, "Failed bind to \"%s\": %s [%d] (%s:%d)\n",
+ address,
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+ return rc;
+}
+
+static int test_setsockopt_impl (char *file, int line,
+ int sock, int level, int option, const void *optval, size_t optlen)
+{
+ int rc;
+
+ rc = nn_setsockopt (sock, level, option, optval, optlen);
+ if(rc < 0) {
+ fprintf (stderr, "Failed set option \"%d\": %s [%d] (%s:%d)\n",
+ option,
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+ return rc;
+}
+
+static void test_close_impl (char *file, int line, int sock)
+{
+ int rc;
+
+ rc = nn_close (sock);
+ if ((rc != 0) && (errno != EBADF && errno != ETERM)) {
+ fprintf (stderr, "Failed to close socket: %s [%d] (%s:%d)\n",
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+}
+
+static void test_send_impl (char *file, int line,
+ int sock, char *data)
+{
+ size_t data_len;
+ int rc;
+
+ data_len = strlen (data);
+
+ rc = nn_send (sock, data, data_len, 0);
+ if (rc < 0) {
+ fprintf (stderr, "Failed to send: %s [%d] (%s:%d)\n",
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+ if (rc != (int)data_len) {
+ fprintf (stderr, "Data to send is truncated: %d != %d (%s:%d)\n",
+ rc, (int) data_len,
+ file, line);
+ nn_err_abort ();
+ }
+}
+
+static void test_recv_impl (char *file, int line, int sock, char *data)
+{
+ size_t data_len;
+ int rc;
+ char *buf;
+
+ data_len = strlen (data);
+ /* We allocate plus one byte so that we are sure that message received
+ has correct length and not truncated */
+ buf = malloc (data_len+1);
+ alloc_assert (buf);
+
+ rc = nn_recv (sock, buf, data_len+1, 0);
+ if (rc < 0) {
+ fprintf (stderr, "Failed to recv: %s [%d] (%s:%d)\n",
+ nn_err_strerror (errno),
+ (int) errno, file, line);
+ nn_err_abort ();
+ }
+ if (rc != (int)data_len) {
+ fprintf (stderr, "Received data has wrong length: %d != %d (%s:%d)\n",
+ rc, (int) data_len,
+ file, line);
+ nn_err_abort ();
+ }
+ if (memcmp (data, buf, data_len) != 0) {
+ /* We don't print the data as it may have binary garbage */
+ fprintf (stderr, "Received data is wrong (%s:%d)\n", file, line);
+ nn_err_abort ();
+ }
+
+ free (buf);
+}
+
+static void test_drop_impl (char *file, int line, int sock, int err)
+{
+ int rc;
+ char buf[1024];
+
+ rc = nn_recv (sock, buf, sizeof (buf), 0);
+ if (rc < 0 && err != errno) {
+ fprintf (stderr, "Got wrong err to recv: %s [%d != %d] (%s:%d)\n",
+ nn_err_strerror (errno),
+ (int) errno, err, file, line);
+ nn_err_abort ();
+ } else if (rc >= 0) {
+ fprintf (stderr, "Did not drop message: [%d bytes] (%s:%d)\n",
+ rc, file, line);
+ nn_err_abort ();
+ }
+}
+
+static int get_test_port (int argc, const char *argv[])
+{
+ return atoi(argc < 2 ? "5555" : argv[1]);
+}
+
+static void test_addr_from (char *out, const char *proto,
+ const char *ip, int port)
+{
+ sprintf(out, "%s://%s:%d", proto, ip, port);
+}
+
+#endif