aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-09 13:01:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-09 13:01:22 -0800
commit848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3 (patch)
tree24a31a22803355a4a85293cb35c066b32c651b58
parent02231e715c4cb67d3394ee363a758f6b3e6b53b8 (diff)
downloadnng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.gz
nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.bz2
nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.zip
Add survey test (and fix survey pattern).
As part of this, we've added a way to unblock callers in a message queue with an error, even without a signal channel. This was necessary to interrupt blockers upon survey timeout. They will get NNG_ETIMEDOUT, but afterwards callers get NNG_ESTATE.
-rw-r--r--src/core/msgqueue.c50
-rw-r--r--src/core/msgqueue.h18
-rw-r--r--src/nng.h2
-rw-r--r--src/protocol/reqrep/req.c2
-rw-r--r--src/protocol/survey/respond.c1
-rw-r--r--src/protocol/survey/survey.c13
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/survey.c126
8 files changed, 203 insertions, 10 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 931fda68..9ef48f2d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -25,6 +25,8 @@ struct nni_msgq {
int mq_get;
int mq_put;
int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
int mq_rwait; // readers waiting (unbuffered)
int mq_wwait;
nni_msg ** mq_msgs;
@@ -120,6 +122,44 @@ nni_msgq_fini(nni_msgq *mq)
}
+void
+nni_msgq_set_put_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_puterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_writeable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_set_get_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_geterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_readable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_set_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_geterr = error;
+ mq->mq_puterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
// nni_msgq_signal raises a signal on the signal object. This allows a
// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
// Note that the signal object must be *zero* if no signal is raised.
@@ -150,6 +190,11 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
return (NNG_ECLOSED);
}
+ if ((rv = mq->mq_puterr) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+
// room in the queue?
if (mq->mq_len < mq->mq_cap) {
break;
@@ -185,7 +230,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
}
// Writeable! Yay!!
-
mq->mq_msgs[mq->mq_put] = msg;
mq->mq_put++;
if (mq->mq_put == mq->mq_alloc) {
@@ -251,6 +295,10 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_mtx_unlock(&mq->mq_lock);
return (NNG_ECLOSED);
}
+ if ((rv = mq->mq_geterr) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
if (expire == NNI_TIME_ZERO) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 09d7fa0c..450c55a4 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -87,6 +87,24 @@ extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *);
// It modifies the turnstile's value under the lock to a non-zero value.
extern void nni_msgq_signal(nni_msgq *, nni_signal *);
+// nni_msgq_set_error sets an error condition on the message queue,
+// which causes all current and future readers/writes to return the
+// given error condition (if non-zero). Threads waiting to put or get
+// are woken as well, if non-zero. If zero, then any present error
+// condition is cleared, and waiters are not woken (there shouldn't be
+// any waiters unless it was already zero.)
+extern void nni_msgq_set_error(nni_msgq *, int);
+
+// nni_msgq_set_put_error sets an error condition on the put side of the
+// message queue, and for that side behaves like nni_msgq_set_error.
+// Readers (nni_msgq_get*) are unaffected.
+extern void nni_msgq_set_put_error(nni_msgq *, int);
+
+// nni_msgq_set_get_error sets an error condition on the get side of the
+// message queue, and for that side behaves like nni_msgq_set_error.
+// Readers (nni_msgq_put*) are unaffected.
+extern void nni_msgq_set_get_error(nni_msgq *, int);
+
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
diff --git a/src/nng.h b/src/nng.h
index e535c9d4..932c9a59 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -254,7 +254,7 @@ NNG_DECL int nng_pipe_close(nng_pipe *);
// <type> - zero (socket), or transport (8 bits)
// <code> - specific value (16 bits)
#define NNG_OPT_SOCKET(c) (c)
-#define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((p) << 16) | (c))
+#define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((t) << 16) | (c))
#define NNG_OPT_RAW NNG_OPT_SOCKET(0)
#define NNG_OPT_LINGER NNG_OPT_SOCKET(1)
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 5b9671f8..73ef4969 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -79,8 +79,6 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
- // Shut down the resender. We request it to exit by clearing
- // its old value, then kick it.
req->closing = 1;
nni_cv_wake(&req->cv);
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 2891edc1..2359fef1 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -228,7 +228,6 @@ again:
if (rv != 0) {
break;
}
-
// Store the pipe id in the header, first thing.
rv = nni_msg_append_header(msg, idbuf, 4);
if (rv != 0) {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 9def9292..3870a9ef 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -36,8 +36,8 @@ struct nni_surv_pipe {
nni_pipe * npipe;
nni_surv_sock * psock;
nni_msgq * sendq;
- int sigclose;
nni_list_node node;
+ int sigclose;
};
static int
@@ -71,8 +71,7 @@ nni_surv_sock_close(void *arg)
{
nni_surv_sock *psock = arg;
- // Shut down the resender. We request it to exit by clearing
- // its old value, then kick it.
+ // Shut down the resender.
psock->closing = 1;
nni_cv_wake(&psock->cv);
}
@@ -103,8 +102,8 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
ppipe->npipe = npipe;
- ppipe->sigclose = 0;
ppipe->psock = psock;
+ ppipe->sigclose = 0;
*pp = ppipe;
return (0);
}
@@ -146,7 +145,7 @@ nni_surv_pipe_sender(void *arg)
nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
nni_pipe *npipe = ppipe->npipe;
- nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_msgq *uwq = ppipe->sendq;
nni_msgq *urq = nni_sock_recvq(psock->nsock);
nni_mtx *mx = nni_sock_mtx(psock->nsock);
nni_msg *msg;
@@ -206,6 +205,7 @@ nni_surv_pipe_receiver(void *arg)
}
}
nni_msgq_signal(uwq, &ppipe->sigclose);
+ nni_msgq_set_error(ppipe->sendq, NNG_ECLOSED);
nni_pipe_close(npipe);
}
@@ -308,6 +308,7 @@ nni_surv_sock_timeout(void *arg)
{
nni_surv_sock *psock = arg;
nni_mtx *mx = nni_sock_mtx(psock->nsock);
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
nni_mtx_lock(mx);
for (;;) {
@@ -322,6 +323,7 @@ nni_surv_sock_timeout(void *arg)
// so zeroing means that nothing can match.
memset(psock->survid, 0, sizeof (psock->survid));
nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ nni_msgq_set_get_error(urq, NNG_ETIMEDOUT);
}
nni_cv_until(&psock->cv, psock->expire);
}
@@ -362,6 +364,7 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(psock->nsock, 0);
+ nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 83f7cbc1..c5bd75dd 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -60,4 +60,5 @@ add_nng_test(reqrep 5)
add_nng_test(pipeline 5)
add_nng_test(pubsub 5)
add_nng_test(sock 5)
+add_nng_test(survey 5)
add_nng_test(tcp 5)
diff --git a/tests/survey.c b/tests/survey.c
new file mode 100644
index 00000000..138b61e0
--- /dev/null
+++ b/tests/survey.c
@@ -0,0 +1,126 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+
+#include <string.h>
+
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+
+Main({
+ int rv;
+ const char *addr = "inproc://test";
+
+ Test("SURVEY pattern", {
+ Convey("We can create a SURVEYOR socket", {
+ nng_socket *surv;
+
+ So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0);
+ So(surv != NULL);
+
+ Reset({
+ nng_close(surv);
+ })
+
+ Convey("Protocols match", {
+ So(nng_protocol(surv) == NNG_PROTO_SURVEYOR);
+ So(nng_peer(surv) == NNG_PROTO_RESPONDENT);
+ })
+
+ Convey("Recv with no survey fails", {
+ nng_msg *msg;
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE);
+ })
+
+ Convey("Survey without responder times out", {
+ uint64_t expire = 1000;
+ nng_msg *msg;
+
+ So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ })
+ })
+
+ Convey("We can create a RESPONDENT socket", {
+ nng_socket *resp;
+ So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0);
+ So(resp != NULL);
+
+ Reset({
+ nng_close(resp);
+ })
+
+ Convey("Protocols match", {
+ So(nng_protocol(resp) == NNG_PROTO_RESPONDENT);
+ So(nng_peer(resp) == NNG_PROTO_SURVEYOR);
+ })
+
+ Convey("Send fails with no suvey", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(resp, msg, 0) == NNG_ESTATE);
+ nng_msg_free(msg);
+ })
+ })
+
+ Convey("We can create a linked survey pair", {
+ nng_socket *surv;
+ nng_socket *resp;
+ uint64_t expire;
+
+ So((rv = nng_open(&surv, NNG_PROTO_SURVEYOR)) == 0);
+ So(surv != NULL);
+
+ So((rv = nng_open(&resp, NNG_PROTO_RESPONDENT)) == 0);
+ So(resp != NULL);
+
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ })
+
+ expire = 10000;
+ So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0);
+
+ So(nng_listen(surv, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ So(nng_dial(resp, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ Convey("Survey works", {
+ nng_msg *msg;
+ uint64_t rtimeo;
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "abc");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ CHECKSTR(msg, "abc");
+ nng_msg_trunc(msg, 3);
+ APPENDSTR(msg, "def");
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(surv, &msg, 0) == 0);
+ CHECKSTR(msg, "def");
+ nng_msg_free(msg);
+
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+
+ Convey("And goes to non-survey state", {
+ rv = nng_recvmsg(surv, &msg, 0);
+ So(rv== NNG_ESTATE);
+ })
+ })
+ })
+ })
+})