diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-09 13:01:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-09 13:01:22 -0800 |
| commit | 848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3 (patch) | |
| tree | 24a31a22803355a4a85293cb35c066b32c651b58 | |
| parent | 02231e715c4cb67d3394ee363a758f6b3e6b53b8 (diff) | |
| download | nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.gz nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.bz2 nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.zip | |
Add survey test (and fix survey pattern).
As part of this, we've added a way to unblock callers in a message
queue with an error, even without a signal channel. This was necessary
to interrupt blockers upon survey timeout. They will get NNG_ETIMEDOUT,
but afterwards callers get NNG_ESTATE.
| -rw-r--r-- | src/core/msgqueue.c | 50 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 18 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 2 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 1 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 13 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/survey.c | 126 |
8 files changed, 203 insertions, 10 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 931fda68..9ef48f2d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -25,6 +25,8 @@ struct nni_msgq { int mq_get; int mq_put; int mq_closed; + int mq_puterr; + int mq_geterr; int mq_rwait; // readers waiting (unbuffered) int mq_wwait; nni_msg ** mq_msgs; @@ -120,6 +122,44 @@ nni_msgq_fini(nni_msgq *mq) } +void +nni_msgq_set_put_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_puterr = error; + if (error) { + nni_cv_wake(&mq->mq_writeable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_set_get_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_geterr = error; + if (error) { + nni_cv_wake(&mq->mq_readable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_set_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_geterr = error; + mq->mq_puterr = error; + if (error) { + nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + // nni_msgq_signal raises a signal on the signal object. This allows a // waiter to be signaled, so that it can be woken e.g. due to a pipe closing. // Note that the signal object must be *zero* if no signal is raised. @@ -150,6 +190,11 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) return (NNG_ECLOSED); } + if ((rv = mq->mq_puterr) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + // room in the queue? if (mq->mq_len < mq->mq_cap) { break; @@ -185,7 +230,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) } // Writeable! Yay!! - mq->mq_msgs[mq->mq_put] = msg; mq->mq_put++; if (mq->mq_put == mq->mq_alloc) { @@ -251,6 +295,10 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_mtx_unlock(&mq->mq_lock); return (NNG_ECLOSED); } + if ((rv = mq->mq_geterr) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } if (expire == NNI_TIME_ZERO) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 09d7fa0c..450c55a4 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -87,6 +87,24 @@ extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *); // It modifies the turnstile's value under the lock to a non-zero value. extern void nni_msgq_signal(nni_msgq *, nni_signal *); +// nni_msgq_set_error sets an error condition on the message queue, +// which causes all current and future readers/writes to return the +// given error condition (if non-zero). Threads waiting to put or get +// are woken as well, if non-zero. If zero, then any present error +// condition is cleared, and waiters are not woken (there shouldn't be +// any waiters unless it was already zero.) +extern void nni_msgq_set_error(nni_msgq *, int); + +// nni_msgq_set_put_error sets an error condition on the put side of the +// message queue, and for that side behaves like nni_msgq_set_error. +// Readers (nni_msgq_get*) are unaffected. +extern void nni_msgq_set_put_error(nni_msgq *, int); + +// nni_msgq_set_get_error sets an error condition on the get side of the +// message queue, and for that side behaves like nni_msgq_set_error. +// Readers (nni_msgq_put*) are unaffected. +extern void nni_msgq_set_get_error(nni_msgq *, int); + // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. @@ -254,7 +254,7 @@ NNG_DECL int nng_pipe_close(nng_pipe *); // <type> - zero (socket), or transport (8 bits) // <code> - specific value (16 bits) #define NNG_OPT_SOCKET(c) (c) -#define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((p) << 16) | (c)) +#define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((t) << 16) | (c)) #define NNG_OPT_RAW NNG_OPT_SOCKET(0) #define NNG_OPT_LINGER NNG_OPT_SOCKET(1) diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 5b9671f8..73ef4969 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -79,8 +79,6 @@ nni_req_sock_close(void *arg) { nni_req_sock *req = arg; - // Shut down the resender. We request it to exit by clearing - // its old value, then kick it. req->closing = 1; nni_cv_wake(&req->cv); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 2891edc1..2359fef1 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -228,7 +228,6 @@ again: if (rv != 0) { break; } - // Store the pipe id in the header, first thing. rv = nni_msg_append_header(msg, idbuf, 4); if (rv != 0) { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 9def9292..3870a9ef 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -36,8 +36,8 @@ struct nni_surv_pipe { nni_pipe * npipe; nni_surv_sock * psock; nni_msgq * sendq; - int sigclose; nni_list_node node; + int sigclose; }; static int @@ -71,8 +71,7 @@ nni_surv_sock_close(void *arg) { nni_surv_sock *psock = arg; - // Shut down the resender. We request it to exit by clearing - // its old value, then kick it. + // Shut down the resender. psock->closing = 1; nni_cv_wake(&psock->cv); } @@ -103,8 +102,8 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (rv); } ppipe->npipe = npipe; - ppipe->sigclose = 0; ppipe->psock = psock; + ppipe->sigclose = 0; *pp = ppipe; return (0); } @@ -146,7 +145,7 @@ nni_surv_pipe_sender(void *arg) nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; nni_pipe *npipe = ppipe->npipe; - nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_msgq *uwq = ppipe->sendq; nni_msgq *urq = nni_sock_recvq(psock->nsock); nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg; @@ -206,6 +205,7 @@ nni_surv_pipe_receiver(void *arg) } } nni_msgq_signal(uwq, &ppipe->sigclose); + nni_msgq_set_error(ppipe->sendq, NNG_ECLOSED); nni_pipe_close(npipe); } @@ -308,6 +308,7 @@ nni_surv_sock_timeout(void *arg) { nni_surv_sock *psock = arg; nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msgq *urq = nni_sock_recvq(psock->nsock); nni_mtx_lock(mx); for (;;) { @@ -322,6 +323,7 @@ nni_surv_sock_timeout(void *arg) // so zeroing means that nothing can match. memset(psock->survid, 0, sizeof (psock->survid)); nni_sock_recverr(psock->nsock, NNG_ESTATE); + nni_msgq_set_get_error(urq, NNG_ETIMEDOUT); } nni_cv_until(&psock->cv, psock->expire); } @@ -362,6 +364,7 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg) // Clear the error condition. nni_sock_recverr(psock->nsock, 0); + nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); return (msg); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 83f7cbc1..c5bd75dd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -60,4 +60,5 @@ add_nng_test(reqrep 5) add_nng_test(pipeline 5) add_nng_test(pubsub 5) add_nng_test(sock 5) +add_nng_test(survey 5) add_nng_test(tcp 5) diff --git a/tests/survey.c b/tests/survey.c new file mode 100644 index 00000000..138b61e0 --- /dev/null +++ b/tests/survey.c @@ -0,0 +1,126 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" + +#include <string.h> + +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) + +Main({ + int rv; + const char *addr = "inproc://test"; + + Test("SURVEY pattern", { + Convey("We can create a SURVEYOR socket", { + nng_socket *surv; + + So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0); + So(surv != NULL); + + Reset({ + nng_close(surv); + }) + + Convey("Protocols match", { + So(nng_protocol(surv) == NNG_PROTO_SURVEYOR); + So(nng_peer(surv) == NNG_PROTO_RESPONDENT); + }) + + Convey("Recv with no survey fails", { + nng_msg *msg; + So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); + }) + + Convey("Survey without responder times out", { + uint64_t expire = 1000; + nng_msg *msg; + + So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + }) + }) + + Convey("We can create a RESPONDENT socket", { + nng_socket *resp; + So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0); + So(resp != NULL); + + Reset({ + nng_close(resp); + }) + + Convey("Protocols match", { + So(nng_protocol(resp) == NNG_PROTO_RESPONDENT); + So(nng_peer(resp) == NNG_PROTO_SURVEYOR); + }) + + Convey("Send fails with no suvey", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(resp, msg, 0) == NNG_ESTATE); + nng_msg_free(msg); + }) + }) + + Convey("We can create a linked survey pair", { + nng_socket *surv; + nng_socket *resp; + uint64_t expire; + + So((rv = nng_open(&surv, NNG_PROTO_SURVEYOR)) == 0); + So(surv != NULL); + + So((rv = nng_open(&resp, NNG_PROTO_RESPONDENT)) == 0); + So(resp != NULL); + + Reset({ + nng_close(surv); + nng_close(resp); + }) + + expire = 10000; + So(nng_setopt(surv, NNG_OPT_SURVEYTIME, &expire, sizeof (expire)) == 0); + + So(nng_listen(surv, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(resp, addr, NULL, NNG_FLAG_SYNCH) == 0); + + Convey("Survey works", { + nng_msg *msg; + uint64_t rtimeo; + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "abc"); + So(nng_sendmsg(surv, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(resp, &msg, 0) == 0); + CHECKSTR(msg, "abc"); + nng_msg_trunc(msg, 3); + APPENDSTR(msg, "def"); + So(nng_sendmsg(resp, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(surv, &msg, 0) == 0); + CHECKSTR(msg, "def"); + nng_msg_free(msg); + + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + + Convey("And goes to non-survey state", { + rv = nng_recvmsg(surv, &msg, 0); + So(rv== NNG_ESTATE); + }) + }) + }) + }) +}) |
