diff options
Diffstat (limited to 'src/sp/protocol/survey0')
| -rw-r--r-- | src/sp/protocol/survey0/CMakeLists.txt | 25 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/respond.c | 693 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/respond_test.c | 586 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/survey.c | 663 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/survey_test.c | 626 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xrespond.c | 417 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xrespond_test.c | 436 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xsurvey.c | 379 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xsurvey_test.c | 399 |
9 files changed, 4224 insertions, 0 deletions
diff --git a/src/sp/protocol/survey0/CMakeLists.txt b/src/sp/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..b5daca41 --- /dev/null +++ b/src/sp/protocol/survey0/CMakeLists.txt @@ -0,0 +1,25 @@ +# +# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Surveyor/Respondent protocol +nng_directory(survey0) + +nng_sources_if(NNG_PROTO_SURVEYOR0 survey.c xsurvey.c) +nng_headers_if(NNG_PROTO_SURVEYOR0 nng/protocol/survey0/survey.h) +nng_defines_if(NNG_PROTO_SURVEYOR0 NNG_HAVE_SURVEYOR0) + +nng_sources_if(NNG_PROTO_RESPONDENT0 respond.c xrespond.c) +nng_headers_if(NNG_PROTO_RESPONDENT0 nng/protocol/survey0/respond.h) +nng_defines_if(NNG_PROTO_RESPONDENT0 NNG_HAVE_RESPONDENT0) + +nng_test(respond_test) +nng_test(survey_test) +nng_test(xrespond_test) +nng_test(xsurvey_test)
\ No newline at end of file diff --git a/src/sp/protocol/survey0/respond.c b/src/sp/protocol/survey0/respond.c new file mode 100644 index 00000000..ad551c8f --- /dev/null +++ b/src/sp/protocol/survey0/respond.c @@ -0,0 +1,693 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; +typedef struct resp0_ctx resp0_ctx; + +static void resp0_pipe_send_cb(void *); +static void resp0_pipe_recv_cb(void *); +static void resp0_pipe_fini(void *); + +struct resp0_ctx { + resp0_sock * sock; + uint32_t pipe_id; + resp0_pipe * spipe; // send pipe + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; + size_t btrace_len; + uint32_t btrace[NNI_MAX_MAX_TTL + 1]; +}; + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { + nni_mtx mtx; + nni_atomic_int ttl; + nni_id_map pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock * psock; + bool busy; + bool closed; + uint32_t id; + nni_list sendq; // contexts waiting to send + nni_aio aio_send; + nni_aio aio_recv; + nni_list_node rnode; // receivable linkage +}; + +static void +resp0_ctx_close(void *arg) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + nni_aio * aio; + + // complete any outstanding operations here, cancellation, etc. + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->saio) != NULL) { + resp0_pipe *p = ctx->spipe; + ctx->saio = NULL; + ctx->spipe = NULL; + nni_list_remove(&p->sendq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_list_remove(&s->recvq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_fini(void *arg) +{ + resp0_ctx *ctx = arg; + + resp0_ctx_close(ctx); +} + +static int +resp0_ctx_init(void *carg, void *sarg) +{ + resp0_sock *s = sarg; + resp0_ctx * ctx = carg; + + NNI_LIST_NODE_INIT(&ctx->sqnode); + NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + + return (0); +} + +static void +resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->mtx); + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +resp0_ctx_send(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + nni_msg * msg; + size_t len; + uint32_t pid; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + if (ctx == &s->ctx) { + // We can't send anymore, because only one send per request. + nni_pollable_clear(&s->writable); + } + + nni_mtx_lock(&s->mtx); + if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if ((len = ctx->btrace_len) == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + pid = ctx->pipe_id; + ctx->pipe_id = 0; + ctx->btrace_len = 0; + + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if ((p = nni_id_get(&s->pipes, pid)) == NULL) { + // Surveyor has left the building. Just discard the reply. + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + return; + } + + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_sock_fini(void *arg) +{ + resp0_sock *s = arg; + + nni_id_map_fini(&s->pipes); + resp0_ctx_fini(&s->ctx); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); + nni_mtx_fini(&s->mtx); +} + +static int +resp0_sock_init(void *arg, nni_sock *nsock) +{ + resp0_sock *s = arg; + + NNI_ARG_UNUSED(nsock); + + nni_mtx_init(&s->mtx); + nni_id_map_init(&s->pipes, 0, 0, false); + + NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); + + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC + + (void) resp0_ctx_init(&s->ctx, s); + + // We start off without being either readable or writable. + // Readability comes when there is something on the socket. + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + return (0); +} + +static void +resp0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +resp0_sock_close(void *arg) +{ + resp0_sock *s = arg; + + resp0_ctx_close(&s->ctx); +} + +static void +resp0_pipe_stop(void *arg) +{ + resp0_pipe *p = arg; + + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +resp0_pipe_fini(void *arg) +{ + resp0_pipe *p = arg; + nng_msg * msg; + + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_free(msg); + } + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); +} + +static int +resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + resp0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p); + nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p); + + NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); + + p->npipe = npipe; + p->psock = s; + p->busy = false; + p->id = nni_pipe_id(npipe); + + return (0); +} + +static int +resp0_pipe_start(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; + + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + rv = nni_id_set(&s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, &p->aio_recv); + return (rv); +} + +static void +resp0_pipe_close(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_mtx_lock(&s->mtx); + p->closed = true; + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx.pipe_id) { + // Make sure user space knows they can send a message to us, + // which we will happily discard. + nni_pollable_raise(&s->writable); + } + nni_id_remove(&s->pipes, p->id); + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_pipe_send_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_aio * aio; + nni_msg * msg; + size_t len; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + nni_mtx_lock(&s->mtx); + p->busy = false; + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx.pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + ctx->spipe = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); + + nni_mtx_unlock(&s->mtx); + + nni_aio_finish_sync(aio, 0, len); +} + +static void +resp0_cancel_recv(nni_aio *aio, void *arg, int rv) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_recv(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + size_t len; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->mtx); + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + int rv; + rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + // We cannot have two concurrent receive requests on the same + // context... + if (ctx->raio != NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->mtx); + return; + } + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(&s->readable); + } + nni_pipe_recv(p->npipe, &p->aio_recv); + + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = p->id; + if (ctx == &s->ctx) { + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +resp0_pipe_recv_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_msg * msg; + nni_aio * aio; + int hops; + size_t len; + int ttl; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + ttl = nni_atomic_get(&s->ttl); + msg = nni_aio_get_msg(&p->aio_recv); + nni_msg_set_pipe(msg, p->id); + + // Move backtrace from body to header + hops = 1; + for (;;) { + bool end = 0; + uint8_t *body; + + if (hops > ttl) { + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage, kick it. + nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + len = nni_msg_header_len(msg); + + nni_mtx_lock(&s->mtx); + + if (p->closed) { + // If pipe was closed, we just abandon the data from it. + nni_aio_set_msg(&p->aio_recv, NULL); + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return; + } + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one blocked in recv, stall. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(&p->aio_recv, NULL); + + // Start the next receive. + nni_pipe_recv(p->npipe, &p->aio_recv); + + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; + + if ((ctx == &s->ctx) && (!p->busy)) { + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_sync(aio, 0, nni_msg_len(msg)); + return; + +drop: + nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static int +resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + resp0_sock *s = arg; + int ttl; + int rv; + + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static int +resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static void +resp0_sock_send(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + resp0_ctx_send(&s->ctx, aio); +} + +static void +resp0_sock_recv(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + resp0_ctx_recv(&s->ctx, aio); +} + +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_size = sizeof(resp0_pipe), + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_close = resp0_pipe_close, + .pipe_stop = resp0_pipe_stop, +}; + +static nni_proto_ctx_ops resp0_ctx_ops = { + .ctx_size = sizeof(resp0_ctx), + .ctx_init = resp0_ctx_init, + .ctx_fini = resp0_ctx_fini, + .ctx_send = resp0_ctx_send, + .ctx_recv = resp0_ctx_recv, +}; + +static nni_option resp0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = resp0_sock_get_max_ttl, + .o_set = resp0_sock_set_max_ttl, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = resp0_sock_get_recvfd, + .o_set = NULL, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = resp0_sock_get_sendfd, + .o_set = NULL, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops resp0_sock_ops = { + .sock_size = sizeof(resp0_sock), + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + +static nni_proto resp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, + .proto_ctx_ops = &resp0_ctx_ops, +}; + +int +nng_respondent0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto)); +} diff --git a/src/sp/protocol/survey0/respond_test.c b/src/sp/protocol/survey0/respond_test.c new file mode 100644 index 00000000..51844c76 --- /dev/null +++ b/src/sp/protocol/survey0/respond_test.c @@ -0,0 +1,586 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +void +test_resp_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_respondent0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_RESPONDENT0_SELF); + NUTS_TRUE(nng_socket_get_int(s, NNG_OPT_PEER, &p) == 0); + NUTS_TRUE(p == NNG_RESPONDENT0_PEER); + NUTS_TRUE(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n) == 0); + NUTS_MATCH(n, NNG_RESPONDENT0_SELF_NAME); + nng_strfree(n); + NUTS_TRUE(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n) == 0); + NUTS_MATCH(n, NNG_RESPONDENT0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +void +test_resp_send_bad_state(void) +{ + nng_socket resp; + nng_msg * msg = NULL; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_FAIL(nng_sendmsg(resp, msg, 0), NNG_ESTATE); + nng_msg_free(msg); + NUTS_CLOSE(resp); +} + +void +test_resp_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not writable before connect. + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_MARRY(surv, resp); + + // Still not writable. + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // If we get a job, *then* we become writable + NUTS_SEND(surv, "abc"); + NUTS_RECV(resp, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // And is no longer writable once we send a message + NUTS_SEND(resp, "def"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + // Even after receiving it + NUTS_RECV(surv, "def"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(surv, "abc"); + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // TODO verify unsolicited response + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_context_no_poll(void) +{ + int fd; + nng_socket resp; + nng_ctx ctx; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(resp); +} + +void +test_resp_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_respondent0_open(&s1)); + NUTS_PASS(nng_respondent0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +void +test_resp_double_recv(void) +{ + nng_socket s1; + nng_aio * aio1; + nng_aio * aio2; + + NUTS_PASS(nng_respondent0_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + + nng_recv_aio(s1, aio1); + nng_recv_aio(s1, aio2); + + nng_aio_wait(aio2); + NUTS_FAIL(nng_aio_result(aio2), NNG_ESTATE); + NUTS_CLOSE(s1); + NUTS_FAIL(nng_aio_result(aio1), NNG_ECLOSED); + nng_aio_free(aio1); + nng_aio_free(aio2); +} + +void +test_resp_close_pipe_before_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_aio * aio1; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + + NUTS_MARRY(surv, resp); + NUTS_SEND(surv, "test"); + + nng_recv_aio(resp, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL); + p = nng_msg_get_pipe(m); + NUTS_PASS(nng_pipe_close(p)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio1); +} + +void +test_resp_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 20)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + + NUTS_MARRY(surv, resp); + + for (int i = 0; i < 100; i++) { + int rv; + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + p = nng_msg_get_pipe(m); + rv = nng_sendmsg(resp, m, 0); + if (rv == NNG_ETIMEDOUT) { + // Queue is backed up, senders are busy. + nng_msg_free(m); + break; + } + NUTS_PASS(rv); + } + NUTS_PASS(nng_pipe_close(p)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_ctx_recv_aio_stopped(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +void +test_resp_close_pipe_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_ctx_open(&ctx[i], resp)); + NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + NUTS_MARRY(surv, resp); + + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + NUTS_PASS(nng_aio_result(aio[i])); + NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL); + p = nng_msg_get_pipe(m); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that SURVEYOR socket is not reading the results. + NUTS_PASS(nng_pipe_close(p)); + + for (i = 0; i < 10; i++) { + int rv; + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + NUTS_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + NUTS_PASS(nng_ctx_close(ctx[i])); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_close_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_ctx_open(&ctx[i], resp)); + NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + NUTS_MARRY(surv, resp); + + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + NUTS_PASS(nng_aio_result(aio[i])); + NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that REQ socket is not reading the results. + for (i = 0; i < 10; i++) { + int rv; + NUTS_PASS(nng_ctx_close(ctx[i])); + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + NUTS_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_resp_ctx_recv_nonblock(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_resp_ctx_send_nonblock(void) +{ + nng_socket resp; + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "SEND"); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + // message carries over + msg = nng_aio_get_msg(aio); + nng_aio_set_msg(aio, msg); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_send(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); + nng_msg_free(msg); +} + +void +test_resp_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_resp_ttl_option(void) +{ + nng_socket resp; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_respondent0_open(&resp)); + + NUTS_PASS(nng_setopt_int(resp, opt, 1)); + NUTS_FAIL(nng_setopt_int(resp, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(resp, opt, 3)); + NUTS_PASS(nng_socket_get_int(resp, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(resp, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_setopt(resp, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(resp, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(resp, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(resp, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(resp); +} + +static void +test_resp_ttl_drop(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_MAXTTL, 3)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + // Send messages. Note that xrep implicitly adds a hop on receive. + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 2 hops + NUTS_PASS(nng_msg_append_u32(m, 0x80000001u)); + NUTS_PASS(nng_msg_append(m, "PASS1", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "FAIL2", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "PASS3", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "FAIL4", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_RECV(resp, "PASS1"); + NUTS_RECV(resp, "PASS3"); + + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(resp); + NUTS_CLOSE(surv); +} + +TEST_LIST = { + { "respond identity", test_resp_identity }, + { "respond send bad state", test_resp_send_bad_state }, + { "respond poll readable", test_resp_poll_readable }, + { "respond poll writable", test_resp_poll_writeable }, + { "respond context does not poll", test_resp_context_no_poll }, + { "respond validate peer", test_resp_validate_peer }, + { "respond double recv", test_resp_double_recv }, + { "respond close pipe before send", test_resp_close_pipe_before_send }, + { "respond close pipe during send", test_resp_close_pipe_during_send }, + { "respond recv aio ctx stopped", test_resp_ctx_recv_aio_stopped }, + { "respond close pipe context send", + test_resp_close_pipe_context_send }, + { "respond close context send", test_resp_close_context_send }, + { "respond context send nonblock", test_resp_ctx_send_nonblock }, + { "respond context recv nonblock", test_resp_ctx_recv_nonblock }, + { "respond recv garbage", test_resp_recv_garbage }, + { "respond ttl option", test_resp_ttl_option }, + { "respond ttl drop", test_resp_ttl_drop }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c new file mode 100644 index 00000000..ce1ed601 --- /dev/null +++ b/src/sp/protocol/survey0/survey.c @@ -0,0 +1,663 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. +// Note that this pattern is not optimized for extreme low latency, as it makes +// multiple use of queues for simplicity. Typically this is used in cases +// where a few dozen extra microseconds does not matter. + +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; +typedef struct surv0_ctx surv0_ctx; + +static void surv0_pipe_send_cb(void *); +static void surv0_pipe_recv_cb(void *); +static void surv0_ctx_timeout(void *); + +struct surv0_ctx { + surv0_sock * sock; + uint32_t survey_id; // survey id + nni_timer_node timer; + nni_time expire; + nni_lmq recv_lmq; + nni_list recv_queue; + nni_atomic_int recv_buf; + nni_atomic_int survey_time; + int err; +}; + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { + int ttl; + nni_list pipes; + nni_mtx mtx; + surv0_ctx ctx; + nni_id_map surveys; + nni_pollable writable; + nni_pollable readable; + nni_atomic_int send_buf; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { + nni_pipe * pipe; + surv0_sock * sock; + nni_lmq send_queue; + nni_list_node node; + nni_aio aio_send; + nni_aio aio_recv; + bool busy; + bool closed; +}; + +static void +surv0_ctx_abort(surv0_ctx *ctx, int err) +{ + nni_aio * aio; + surv0_sock *sock = ctx->sock; + + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, err); + } + nni_lmq_flush(&ctx->recv_lmq); + if (ctx->survey_id != 0) { + nni_id_remove(&sock->surveys, ctx->survey_id); + ctx->survey_id = 0; + } + if (ctx == &sock->ctx) { + nni_pollable_clear(&sock->readable); + } +} + +static void +surv0_ctx_close(surv0_ctx *ctx) +{ + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + surv0_ctx_abort(ctx, NNG_ECLOSED); + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_fini(void *arg) +{ + surv0_ctx *ctx = arg; + + surv0_ctx_close(ctx); + nni_timer_cancel(&ctx->timer); + nni_lmq_fini(&ctx->recv_lmq); +} + +static int +surv0_ctx_init(void *c, void *s) +{ + surv0_ctx * ctx = c; + surv0_sock * sock = s; + int rv; + int len; + nng_duration tmo; + + nni_aio_list_init(&ctx->recv_queue); + nni_atomic_init(&ctx->recv_buf); + nni_atomic_init(&ctx->survey_time); + + if (ctx == &sock->ctx) { + len = 128; + tmo = NNI_SECOND; // survey timeout + } else { + len = nni_atomic_get(&sock->ctx.recv_buf); + tmo = nni_atomic_get(&sock->ctx.survey_time); + } + + nni_atomic_set(&ctx->recv_buf, len); + nni_atomic_set(&ctx->survey_time, tmo); + + ctx->sock = sock; + + if ((rv = nni_lmq_init(&ctx->recv_lmq, len)) != 0) { + surv0_ctx_fini(ctx); + return (rv); + } + nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); + return (0); +} + +static void +surv0_ctx_cancel(nni_aio *aio, void *arg, int rv) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + nni_mtx_lock(&sock->mtx); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, rv); + } + if (ctx->survey_id != 0) { + nni_id_remove(&sock->surveys, ctx->survey_id); + ctx->survey_id = 0; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_recv(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->mtx); + if (ctx->survey_id == 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } +again: + if (nni_lmq_getq(&ctx->recv_lmq, &msg) != 0) { + int rv; + if ((rv = nni_aio_schedule(aio, &surv0_ctx_cancel, ctx)) != + 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ctx->recv_queue, aio); + nni_mtx_unlock(&sock->mtx); + return; + } + if (nni_lmq_empty(&ctx->recv_lmq) && (ctx == &sock->ctx)) { + nni_pollable_clear(&sock->readable); + } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_msg(aio, msg); +} + +void +surv0_ctx_timeout(void *arg) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (nni_clock() < ctx->expire) { + nni_mtx_unlock(&sock->mtx); + return; + } + + // Abort any pending receives. + surv0_ctx_abort(ctx, NNG_ETIMEDOUT); + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_send(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock * sock = ctx->sock; + surv0_pipe * pipe; + nni_msg * msg = nni_aio_get_msg(aio); + size_t len = nni_msg_len(msg); + nni_time now = nni_clock(); + nng_duration survey_time; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + survey_time = nni_atomic_get(&ctx->survey_time); + + nni_mtx_lock(&sock->mtx); + + // Abort everything outstanding. + surv0_ctx_abort(ctx, NNG_ECANCELED); + nni_timer_cancel(&ctx->timer); + + // Allocate the new ID. + if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_msg_header_clear(msg); + nni_msg_header_append_u32(msg, (uint32_t) ctx->survey_id); + + // From this point, we're committed to success. Note that we send + // regardless of whether there are any pipes or not. If no pipes, + // then it just gets discarded. + nni_aio_set_msg(aio, NULL); + NNI_LIST_FOREACH (&sock->pipes, pipe) { + + // if the pipe isn't busy, then send this message direct. + if (!pipe->busy) { + pipe->busy = true; + nni_msg_clone(msg); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); + } else if (!nni_lmq_full(&pipe->send_queue)) { + nni_msg_clone(msg); + nni_lmq_putq(&pipe->send_queue, msg); + } + } + + ctx->expire = now + survey_time; + nni_timer_schedule(&ctx->timer, ctx->expire); + + nni_mtx_unlock(&sock->mtx); + nni_msg_free(msg); + + nni_aio_finish(aio, 0, len); +} + +static void +surv0_sock_fini(void *arg) +{ + surv0_sock *sock = arg; + + surv0_ctx_fini(&sock->ctx); + nni_id_map_fini(&sock->surveys); + nni_pollable_fini(&sock->writable); + nni_pollable_fini(&sock->readable); + nni_mtx_fini(&sock->mtx); +} + +static int +surv0_sock_init(void *arg, nni_sock *s) +{ + surv0_sock *sock = arg; + int rv; + + NNI_ARG_UNUSED(s); + + NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); + nni_mtx_init(&sock->mtx); + nni_pollable_init(&sock->readable); + nni_pollable_init(&sock->writable); + // We are always writable. + nni_pollable_raise(&sock->writable); + + // We allow for some buffering on a per pipe basis, to allow for + // multiple contexts to have surveys outstanding. It is recommended + // to increase this if many contexts will want to publish + // at nearly the same time. + nni_atomic_init(&sock->send_buf); + nni_atomic_set(&sock->send_buf, 8); + + // Survey IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. + nni_id_map_init(&sock->surveys, 0x80000000u, 0xffffffffu, true); + + if ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0) { + surv0_sock_fini(sock); + return (rv); + } + + sock->ttl = 8; + + return (0); +} + +static void +surv0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +surv0_sock_close(void *arg) +{ + surv0_sock *s = arg; + + surv0_ctx_close(&s->ctx); +} + +static void +surv0_pipe_stop(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +surv0_pipe_fini(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_lmq_fini(&p->send_queue); +} + +static int +surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + surv0_pipe *p = arg; + surv0_sock *sock = s; + int rv; + int len; + + len = nni_atomic_get(&sock->send_buf); + nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p); + + // This depth could be tunable. The deeper the queue, the more + // concurrent surveys that can be delivered (multiple contexts). + // Note that surveys can be *outstanding*, but not yet put on the wire. + if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) { + surv0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->sock = sock; + return (0); +} + +static int +surv0_pipe_start(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->sock; + + if (nni_pipe_peer(p->pipe) != NNG_SURVEYOR0_PEER) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_pipe_recv(p->pipe, &p->aio_recv); + return (0); +} + +static void +surv0_pipe_close(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->sock; + + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_mtx_lock(&s->mtx); + p->closed = true; + nni_lmq_flush(&p->send_queue); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +surv0_pipe_send_cb(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + nni_msg * msg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->pipe); + return; + } + + nni_mtx_lock(&sock->mtx); + if (p->closed) { + nni_mtx_unlock(&sock->mtx); + return; + } + if (nni_lmq_getq(&p->send_queue, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_pipe_recv_cb(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + surv0_ctx * ctx; + nni_msg * msg; + uint32_t id; + nni_aio * aio; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Peer sent us garbage. Kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + id = nni_msg_trim_u32(msg); + nni_msg_header_append_u32(msg, id); + + nni_mtx_lock(&sock->mtx); + // Best effort at delivery. Discard if no context or context is + // unable to receive it. + if (((ctx = nni_id_get(&sock->surveys, id)) == NULL) || + (nni_lmq_full(&ctx->recv_lmq))) { + nni_msg_free(msg); + } else if ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_msg(aio, msg); + } else { + nni_lmq_putq(&ctx->recv_lmq, msg); + if (ctx == &sock->ctx) { + nni_pollable_raise(&sock->readable); + } + } + nni_mtx_unlock(&sock->mtx); + + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static int +surv0_ctx_set_survey_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_ctx * ctx = arg; + nng_duration expire; + int rv; + if ((rv = nni_copyin_ms(&expire, buf, sz, t)) == 0) { + nni_atomic_set(&ctx->survey_time, expire); + } + return (rv); +} + +static int +surv0_ctx_get_survey_time(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_ctx *ctx = arg; + return ( + nni_copyout_ms(nni_atomic_get(&ctx->survey_time), buf, szp, t)); +} + +static int +surv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)); +} + +static int +surv0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, t)); +} + +static int +surv0_sock_set_survey_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_sock *s = arg; + return (surv0_ctx_set_survey_time(&s->ctx, buf, sz, t)); +} + +static int +surv0_sock_get_survey_time(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *s = arg; + return (surv0_ctx_get_survey_time(&s->ctx, buf, szp, t)); +} + +static int +surv0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +surv0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static void +surv0_sock_recv(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + surv0_ctx_recv(&s->ctx, aio); +} + +static void +surv0_sock_send(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + surv0_ctx_send(&s->ctx, aio); +} + +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_size = sizeof(surv0_pipe), + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_close = surv0_pipe_close, + .pipe_stop = surv0_pipe_stop, +}; + +static nni_option surv0_ctx_options[] = { + { + .o_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .o_get = surv0_ctx_get_survey_time, + .o_set = surv0_ctx_set_survey_time, + }, + { + .o_name = NULL, + } +}; +static nni_proto_ctx_ops surv0_ctx_ops = { + .ctx_size = sizeof(surv0_ctx), + .ctx_init = surv0_ctx_init, + .ctx_fini = surv0_ctx_fini, + .ctx_send = surv0_ctx_send, + .ctx_recv = surv0_ctx_recv, + .ctx_options = surv0_ctx_options, +}; + +static nni_option surv0_sock_options[] = { + { + .o_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .o_get = surv0_sock_get_survey_time, + .o_set = surv0_sock_set_survey_time, + }, + { + .o_name = NNG_OPT_MAXTTL, + .o_get = surv0_sock_get_max_ttl, + .o_set = surv0_sock_set_max_ttl, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = surv0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = surv0_sock_get_send_fd, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops surv0_sock_ops = { + .sock_size = sizeof(surv0_sock), + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_options = surv0_sock_options, +}; + +static nni_proto surv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, + .proto_ctx_ops = &surv0_ctx_ops, +}; + +int +nng_surveyor0_open(nng_socket *sock) +{ + return (nni_proto_open(sock, &surv0_proto)); +} diff --git a/src/sp/protocol/survey0/survey_test.c b/src/sp/protocol/survey0/survey_test.c new file mode 100644 index 00000000..95d27adf --- /dev/null +++ b/src/sp/protocol/survey0/survey_test.c @@ -0,0 +1,626 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_surv_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_surveyor0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_SELF); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_PEER); // 49 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_surv_ttl_option(void) +{ + nng_socket surv; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_int(surv, opt, 1)); + NUTS_FAIL(nng_socket_set_int(surv, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(surv, opt, -1), NNG_EINVAL); + // This test will fail if the NNI_MAX_MAX_TTL is changed from the + // builtin default of 15. + NUTS_FAIL(nng_socket_set_int(surv, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(surv, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(surv, opt, 3)); + NUTS_PASS(nng_socket_get_int(surv, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(surv, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(surv, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(surv, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(surv, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(surv, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(surv); +} + +static void +test_surv_survey_time_option(void) +{ + nng_socket surv; + nng_duration d; + bool b; + size_t sz = sizeof(b); + const char * opt = NNG_OPT_SURVEYOR_SURVEYTIME; + + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_ms(surv, opt, 10)); + NUTS_FAIL(nng_socket_set(surv, opt, "", 1), NNG_EINVAL); + NUTS_FAIL(nng_socket_get(surv, opt, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(surv, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(surv, opt, &b), NNG_EBADTYPE); + + NUTS_PASS(nng_socket_get_ms(surv, opt, &d)); + NUTS_TRUE(d == 10); + NUTS_CLOSE(surv); +} + +void +test_surv_recv_bad_state(void) +{ + nng_socket surv; + nng_msg * msg = NULL; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_FAIL(nng_recvmsg(surv, &msg, 0), NNG_ESTATE); + NUTS_TRUE(msg == NULL); + NUTS_CLOSE(surv); +} + +static void +test_surv_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + uint32_t surv_id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + + // The message will have a header that contains the 32-bit pipe ID, + // followed by the 32-bit request ID. We will discard the request + // ID before sending it out. + NUTS_TRUE(nng_msg_header_len(m) == 8); + NUTS_PASS(nng_msg_header_chop_u32(m, &surv_id)); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(surv, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +#define SECOND 1000 + +void +test_surv_resp_exchange(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, SECOND)); + + NUTS_MARRY(resp, surv); + + NUTS_SEND(surv, "ping"); + NUTS_RECV(resp, "ping"); + NUTS_SEND(resp, "pong"); + NUTS_RECV(surv, "pong"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_cancel(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 16)); + + NUTS_MARRY(resp, surv); + + // Send req #1 (abc). + NUTS_SEND(surv, "abc"); + + // Sleep a bit. This is so that we ensure that our request gets + // to the far side. (If we cancel too fast, then our outgoing send + // will be canceled before it gets to the peer.) + NUTS_SLEEP(100); + + // Send the next next request ("def"). Note that + // the RESP side server will have already buffered the receive + // request, and should simply be waiting for us to reply to abc. + NUTS_SEND(surv, "def"); + + // Receive the first request (should be abc) on the REP server. + NUTS_RECV(resp, "abc"); + + // RESP sends the reply to first command. This will be discarded + // by the SURV socket. + NUTS_SEND(resp, "abc"); + + // Now get the next command from the REP; should be "def". + NUTS_RECV(resp, "def"); + + // And send it back to REQ. + NUTS_SEND(resp, "def"); + + // Try a req command. This should give back "def" + NUTS_RECV(surv, "def"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_cancel_abort_recv(void) +{ + nng_aio * aio; + nng_duration time = SECOND * 10; // 10s (kind of never) + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, time)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 16)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 5 * SECOND)); + + NUTS_MARRY(resp, surv); + + // Send survey #1 (abc). + NUTS_SEND(surv, "abc"); + + // Wait for it to get ot the other side. + NUTS_SLEEP(100); + + nng_aio_set_timeout(aio, 5 * SECOND); + nng_recv_aio(surv, aio); + + // Give time for this recv to post properly. + NUTS_SLEEP(100); + + // Send the next next request ("def"). Note that + // the respondent side server will have already buffered the receive + // request, and should simply be waiting for us to reply to + // abc. + NUTS_SEND(surv, "def"); + + // Our pending I/O should have been canceled. + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + // Receive the first request (should be abc) on the respondent. + NUTS_RECV(resp, "abc"); + + // Respondent sends the reply to first survey. This will be + // discarded by the SURV socket. + NUTS_SEND(resp, "abc"); + + // Now get the next survey from the RESP; should be "def". + NUTS_RECV(resp, "def"); + + // And send it back to REQ. + NUTS_SEND(resp, "def"); + + // Try a req command. This should give back "def" + NUTS_RECV(surv, "def"); + + nng_aio_free(aio); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_cancel_post_recv(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "ONE"); + NUTS_RECV(resp, "ONE"); + NUTS_SEND(resp, "one"); + NUTS_SLEEP(100); // Make sure reply arrives! + NUTS_SEND(surv, "TWO"); + NUTS_RECV(resp, "TWO"); + NUTS_SEND(resp, "two"); + NUTS_RECV(surv, "two"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Survey is broadcast, so we can always write. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_msg_append(msg, "xyz", 3)); + NUTS_PASS(nng_sendmsg(surv, msg, 0)); + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); // recv on rep + NUTS_PASS(nng_sendmsg(resp, msg, 0)); // echo it back + NUTS_SLEEP(200); // give time for message to arrive + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(surv, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // TODO verify unsolicited response + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_ctx_no_poll(void) +{ + int fd; + nng_socket surv; + nng_ctx ctx; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(surv); +} + +static void +test_surv_ctx_recv_nonblock(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + + NUTS_MARRY(surv, resp); + + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_surv_ctx_send_nonblock(void) +{ + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + + nng_aio_set_msg(aio, msg); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); // We never block + NUTS_CLOSE(surv); + nng_aio_free(aio); +} + +static void +test_surv_send_best_effort(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_MARRY(surv, resp); + + for (int i = 0; i < 200; i++) { + NUTS_SEND(surv, "junk"); + } + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_survey_timeout(void) +{ + nng_socket surv; + nng_socket resp; + char buf[16]; + size_t sz; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, 50)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "hello"); + NUTS_RECV(resp, "hello"); + + sz = sizeof(buf); + NUTS_FAIL(nng_recv(surv, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_SEND(resp, "world"); + NUTS_FAIL(nng_recv(surv, buf, &sz, 0), NNG_ESTATE); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_ctx_recv_close_socket(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + nng_msg * m; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_MARRY(surv, resp); + NUTS_PASS(nng_msg_alloc(&m, 0)); + nng_aio_set_msg(aio, m); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + + nng_ctx_recv(ctx, aio); + nng_close(surv); + + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); + NUTS_CLOSE(resp); +} + +static void +test_surv_context_multi(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx c[5]; + nng_aio * aio; + nng_msg * m; + int cnt = sizeof(c) / sizeof(c[0]); + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_MARRY(surv, resp); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, 200)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_ctx_open(&c[i], surv)); + } + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, i)); + nng_aio_set_msg(aio, m); + nng_ctx_send(c[i], aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + } + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + + for (int i = cnt - 1; i >= 0; i--) { + uint32_t x; + nng_ctx_recv(c[i], aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + m = nng_aio_get_msg(aio); + TEST_ASSERT(m != NULL); + NUTS_PASS(nng_msg_trim_u32(m, &x)); + NUTS_TRUE(x == (uint32_t) i); + nng_msg_free(m); + } + + for (int i = 0; i < cnt; i++) { + nng_ctx_recv(c[i], aio); + nng_aio_wait(aio); + NUTS_TRUE(nng_aio_result(aio) != 0); + } + for (int i = 0; i < cnt; i++) { + nng_ctx_close(c[i]); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_surv_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + NUTS_PASS(nng_surveyor0_open(&s1)); + NUTS_PASS(nng_surveyor0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_PASS(nng_close(s1)); + NUTS_PASS(nng_close(s2)); + nng_stats_free(stats); +} + +TEST_LIST = { + { "survey identity", test_surv_identity }, + { "survey ttl option", test_surv_ttl_option }, + { "survey survey time option", test_surv_survey_time_option }, + { "survey recv bad state", test_surv_recv_bad_state }, + { "survey recv garbage", test_surv_recv_garbage }, + { "survey respondent exchange", test_surv_resp_exchange }, + { "survey cancel", test_surv_cancel }, + { "survey cancel abort recv", test_surv_cancel_abort_recv }, + { "survey cancel post recv", test_surv_cancel_post_recv }, + { "survey poll writable", test_surv_poll_writeable }, + { "survey poll readable", test_surv_poll_readable }, + { "survey context does not poll", test_surv_ctx_no_poll }, + { "survey context recv close socket", + test_surv_ctx_recv_close_socket }, + { "survey context recv nonblock", test_surv_ctx_recv_nonblock }, + { "survey context send nonblock", test_surv_ctx_send_nonblock }, + { "survey timeout", test_surv_survey_timeout }, + { "survey send best effort", test_surv_send_best_effort }, + { "survey context multi", test_surv_context_multi }, + { "survey validate peer", test_surv_validate_peer }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/xrespond.c b/src/sp/protocol/survey0/xrespond.c new file mode 100644 index 00000000..b2f203c3 --- /dev/null +++ b/src/sp/protocol/survey0/xrespond.c @@ -0,0 +1,417 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct xresp0_pipe xresp0_pipe; +typedef struct xresp0_sock xresp0_sock; + +static void xresp0_recv_cb(void *); +static void xresp0_putq_cb(void *); +static void xresp0_getq_cb(void *); +static void xresp0_send_cb(void *); +static void xresp0_sock_getq_cb(void *); +static void xresp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct xresp0_sock { + nni_msgq * urq; + nni_msgq * uwq; + nni_atomic_int ttl; + nni_id_map pipes; + nni_aio aio_getq; + nni_mtx mtx; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct xresp0_pipe { + nni_pipe * npipe; + xresp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; +}; + +static void +xresp0_sock_fini(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_fini(&s->aio_getq); + nni_id_map_fini(&s->pipes); + nni_mtx_fini(&s->mtx); +} + +static int +xresp0_sock_init(void *arg, nni_sock *nsock) +{ + xresp0_sock *s = arg; + + nni_mtx_init(&s->mtx); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC + nni_id_map_init(&s->pipes, 0, 0, false); + nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s); + + s->urq = nni_sock_recvq(nsock); + s->uwq = nni_sock_sendq(nsock); + + return (0); +} + +static void +xresp0_sock_open(void *arg) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +static void +xresp0_sock_close(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_close(&s->aio_getq); +} + +static void +xresp0_pipe_stop(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_stop(&p->aio_putq); + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +xresp0_pipe_fini(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_fini(&p->aio_putq); + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_msgq_fini(p->sendq); +} + +static int +xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + xresp0_pipe *p = arg; + int rv; + + nni_aio_init(&p->aio_putq, xresp0_putq_cb, p); + nni_aio_init(&p->aio_recv, xresp0_recv_cb, p); + nni_aio_init(&p->aio_getq, xresp0_getq_cb, p); + nni_aio_init(&p->aio_send, xresp0_send_cb, p); + + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { + xresp0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + return (0); +} + +static int +xresp0_pipe_start(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + int rv; + + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + + p->id = nni_pipe_id(p->npipe); + + nni_mtx_lock(&s->mtx); + rv = nni_id_set(&s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(p->sendq, &p->aio_getq); + + return (rv); +} + +static void +xresp0_pipe_close(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + + nni_aio_close(&p->aio_putq); + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + nni_id_remove(&s->pipes, p->id); + nni_mtx_unlock(&s->mtx); +} + +// resp0_sock_send watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket.s + +void +xresp0_sock_getq_cb(void *arg) +{ + xresp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + xresp0_pipe *p; + + if (nni_aio_result(&s->aio_getq) != 0) { + return; + } + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + // We can't really close down the socket, so just keep going. + nni_msgq_aio_get(s->uwq, &s->aio_getq); + return; + } + id = nni_msg_header_trim_u32(msg); + + nni_mtx_lock(&s->mtx); + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + if (((p = nni_id_get(&s->pipes, id)) == NULL) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { + nni_msg_free(msg); + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +void +xresp0_getq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_getq) != 0) { + nni_pipe_close(p->npipe); + return; + } + + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); + + nni_pipe_send(p->npipe, &p->aio_send); +} + +void +xresp0_send_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, &p->aio_getq); +} + +static void +xresp0_recv_cb(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int ttl; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + ttl = nni_atomic_get(&s->ttl); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); + + // Store the pipe id in the header, first thing. + nni_msg_header_append_u32(msg, p->id); + + // Move backtrace from body to header + hops = 1; + for (;;) { + bool end; + uint8_t *body; + + if (hops > ttl) { + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer sent us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(urq, &p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static void +xresp0_putq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static int +xresp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + xresp0_sock *s = arg; + int ttl; + int rv; + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +xresp0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + xresp0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static void +xresp0_sock_send(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +xresp0_sock_recv(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops xresp0_pipe_ops = { + .pipe_size = sizeof(xresp0_pipe), + .pipe_init = xresp0_pipe_init, + .pipe_fini = xresp0_pipe_fini, + .pipe_start = xresp0_pipe_start, + .pipe_close = xresp0_pipe_close, + .pipe_stop = xresp0_pipe_stop, +}; + +static nni_option xresp0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = xresp0_sock_get_maxttl, + .o_set = xresp0_sock_set_maxttl, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops xresp0_sock_ops = { + .sock_size = sizeof(xresp0_sock), + .sock_init = xresp0_sock_init, + .sock_fini = xresp0_sock_fini, + .sock_open = xresp0_sock_open, + .sock_close = xresp0_sock_close, + .sock_send = xresp0_sock_send, + .sock_recv = xresp0_sock_recv, + .sock_options = xresp0_sock_options, +}; + +static nni_proto xresp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xresp0_sock_ops, + .proto_pipe_ops = &xresp0_pipe_ops, +}; + +int +nng_respondent0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xresp0_proto)); +} diff --git a/src/sp/protocol/survey0/xrespond_test.c b/src/sp/protocol/survey0/xrespond_test.c new file mode 100644 index 00000000..ec5e99a3 --- /dev/null +++ b/src/sp/protocol/survey0/xrespond_test.c @@ -0,0 +1,436 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_xresp_identity(void) +{ + nng_socket s; + int p1, p2; + char * n1; + char * n2; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p1)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p2)); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n1)); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n2)); + NUTS_CLOSE(s); + NUTS_TRUE(p1 == NNG_RESPONDENT0_SELF); + NUTS_TRUE(p2 == NNG_RESPONDENT0_PEER); + NUTS_MATCH(n1, NNG_RESPONDENT0_SELF_NAME); + NUTS_MATCH(n2, NNG_RESPONDENT0_PEER_NAME); + nng_strfree(n1); + nng_strfree(n2); +} + +static void +test_xresp_raw(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_xresp_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_xresp_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // We are always writeable, even before connect. This is so that + // back-pressure from a bad peer can't trash others. We assume + // that peers won't send us requests faster than they can consume + // the answers. If they do, they will lose their answers. + NUTS_TRUE(nuts_poll_fd(fd) == true); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd) == true); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(surv, "abc"); + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_respondent0_open_raw(&s1)); + NUTS_PASS(nng_respondent0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_xresp_close_pipe_before_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_aio * aio1; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + + NUTS_MARRY(surv, resp); + NUTS_SEND(surv, "test"); + + nng_recv_aio(resp, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL); + p = nng_msg_get_pipe(m); + NUTS_PASS(nng_pipe_close(p)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio1); +} + +static void +test_xresp_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_msg * m; + + NUTS_PASS(nng_respondent_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 20)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_RECVBUF, 1)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) 0x81000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + p = nng_msg_get_pipe(m); + nng_msg_free(m); + + for (int i = 0; i < 100; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); + NUTS_PASS( + nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u)); + // protocol does not exert back-pressure + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + NUTS_PASS(nng_pipe_close(p)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_close_during_recv(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY(surv, resp); + + for (unsigned i = 0; i < 100; i++) { + int rv; + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + rv = nng_sendmsg(surv, m, 0); + if (rv == NNG_ETIMEDOUT) { + nng_msg_free(m); + break; + } + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_recv_aio_stopped(void) +{ + nng_socket resp; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_stop(aio); + nng_recv_aio(resp, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_xresp_send_no_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_ttl_option(void) +{ + nng_socket resp; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + + NUTS_PASS(nng_socket_set_int(resp, opt, 1)); + NUTS_FAIL(nng_socket_set_int(resp, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(resp, opt, 3)); + NUTS_PASS(nng_socket_get_int(resp, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(resp, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(resp, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(resp, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(resp, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(resp, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(resp); +} + +static void +test_xresp_ttl_drop(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_MAXTTL, 3)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + // Send messages. Note that xresp implicitly adds a hop on receive. + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 2 hops + NUTS_PASS(nng_msg_append_u32(m, 0x80000001u)); + NUTS_PASS(nng_msg_append(m, "PASS1", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "FAIL2", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "PASS3", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "FAIL4", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + // So on receive we should see 80000001 and 80000003. + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 12); + NUTS_TRUE(nng_msg_len(m) == 6); + NUTS_MATCH(nng_msg_body(m), "PASS1"); + nng_msg_free(m); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 16); // 3 hops + ID + NUTS_TRUE(nng_msg_len(m) == 6); + NUTS_MATCH(nng_msg_body(m), "PASS3"); + nng_msg_free(m); + + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +NUTS_TESTS = { + { "xrespond identity", test_xresp_identity }, + { "xrespond raw", test_xresp_raw }, + { "xrespond no context", test_xresp_no_context }, + { "xrespond poll readable", test_xresp_poll_readable }, + { "xrespond poll writable", test_xresp_poll_writeable }, + { "xrespond validate peer", test_xresp_validate_peer }, + { "xrespond close pipe before send", + test_xresp_close_pipe_before_send }, + { "xrespond close pipe during send", + test_xresp_close_pipe_during_send }, + { "xrespond close during recv", test_xresp_close_during_recv }, + { "xrespond recv aio stopped", test_xresp_recv_aio_stopped }, + { "xrespond send no header", test_xresp_send_no_header }, + { "xrespond recv garbage", test_xresp_recv_garbage }, + { "xrespond ttl option", test_xresp_ttl_option }, + { "xrespond ttl drop", test_xresp_ttl_drop }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/xsurvey.c b/src/sp/protocol/survey0/xsurvey.c new file mode 100644 index 00000000..2a198662 --- /dev/null +++ b/src/sp/protocol/survey0/xsurvey.c @@ -0,0 +1,379 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +typedef struct xsurv0_pipe xsurv0_pipe; +typedef struct xsurv0_sock xsurv0_sock; + +static void xsurv0_sock_getq_cb(void *); +static void xsurv0_getq_cb(void *); +static void xsurv0_putq_cb(void *); +static void xsurv0_send_cb(void *); +static void xsurv0_recv_cb(void *); + +// surv0_sock is our per-socket protocol private structure. +struct xsurv0_sock { + nni_list pipes; + nni_aio aio_getq; + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx mtx; + nni_atomic_int ttl; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct xsurv0_pipe { + nni_pipe * npipe; + xsurv0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; +}; + +static void +xsurv0_sock_fini(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_fini(&s->aio_getq); + nni_mtx_fini(&s->mtx); +} + +static int +xsurv0_sock_init(void *arg, nni_sock *nsock) +{ + xsurv0_sock *s = arg; + + nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s); + NNI_LIST_INIT(&s->pipes, xsurv0_pipe, node); + nni_mtx_init(&s->mtx); + + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); + + return (0); +} + +static void +xsurv0_sock_open(void *arg) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +static void +xsurv0_sock_close(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_close(&s->aio_getq); +} + +static void +xsurv0_pipe_stop(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_putq); +} + +static void +xsurv0_pipe_fini(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_putq); + nni_msgq_fini(p->sendq); +} + +static int +xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + xsurv0_pipe *p = arg; + int rv; + + nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p); + nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p); + nni_aio_init(&p->aio_send, xsurv0_send_cb, p); + nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p); + + // This depth could be tunable. The queue exists so that if we + // have multiple requests coming in faster than we can deliver them, + // we try to avoid dropping them. We don't really have a solution + // for applying back pressure. It would be nice if surveys carried + // an expiration with them, so that we could discard any that are + // not delivered before their expiration date. + if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) { + xsurv0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + return (0); +} + +static int +xsurv0_pipe_start(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(p->sendq, &p->aio_getq); + nni_pipe_recv(p->npipe, &p->aio_recv); + return (0); +} + +static void +xsurv0_pipe_close(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_putq); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +xsurv0_getq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_getq) != 0) { + nni_pipe_close(p->npipe); + return; + } + + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); + + nni_pipe_send(p->npipe, &p->aio_send); +} + +static void +xsurv0_send_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, &p->aio_getq); +} + +static void +xsurv0_putq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static void +xsurv0_recv_cb(void *arg) +{ + xsurv0_pipe *p = arg; + nni_msg * msg; + bool end; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->npipe); + return; + } + nni_msg_trim(msg, sizeof(uint32_t)); + } + + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, &p->aio_putq); +} + +static int +xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + xsurv0_sock *s = arg; + int ttl; + int rv; + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +xsurv0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + xsurv0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static void +xsurv0_sock_getq_cb(void *arg) +{ + xsurv0_sock *s = arg; + xsurv0_pipe *p; + nni_msg * msg; + + if (nni_aio_result(&s->aio_getq) != 0) { + // Should be NNG_ECLOSED. + return; + } + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + NNI_LIST_FOREACH (&s->pipes, p) { + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); + } + } + + nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_mtx_unlock(&s->mtx); + + // If there were no pipes to send on, just toss the message. + nni_msg_free(msg); +} + +static void +xsurv0_sock_recv(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static void +xsurv0_sock_send(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_proto_pipe_ops xsurv0_pipe_ops = { + .pipe_size = sizeof(xsurv0_pipe), + .pipe_init = xsurv0_pipe_init, + .pipe_fini = xsurv0_pipe_fini, + .pipe_start = xsurv0_pipe_start, + .pipe_close = xsurv0_pipe_close, + .pipe_stop = xsurv0_pipe_stop, +}; + +static nni_option xsurv0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = xsurv0_sock_get_max_ttl, + .o_set = xsurv0_sock_set_max_ttl, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops xsurv0_sock_ops = { + .sock_size = sizeof(xsurv0_sock), + .sock_init = xsurv0_sock_init, + .sock_fini = xsurv0_sock_fini, + .sock_open = xsurv0_sock_open, + .sock_close = xsurv0_sock_close, + .sock_send = xsurv0_sock_send, + .sock_recv = xsurv0_sock_recv, + .sock_options = xsurv0_sock_options, +}; + +static nni_proto xsurv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xsurv0_sock_ops, + .proto_pipe_ops = &xsurv0_pipe_ops, +}; + +int +nng_surveyor0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xsurv0_proto)); +} diff --git a/src/sp/protocol/survey0/xsurvey_test.c b/src/sp/protocol/survey0/xsurvey_test.c new file mode 100644 index 00000000..f8e9d401 --- /dev/null +++ b/src/sp/protocol/survey0/xsurvey_test.c @@ -0,0 +1,399 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_xsurveyor_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_SELF); // 0x62 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_PEER); // 0x62 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_xsurveyor_raw(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_xsurvey_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_xsurvey_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Survey is broadcast, so we can always write. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_PASS(nng_msg_alloc(&msg, 0)); + // Request ID + NUTS_PASS(nng_msg_append_u32(msg, 0x80000000)); + NUTS_PASS(nng_sendmsg(surv, msg, 0)); + + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + NUTS_PASS(nng_sendmsg(resp, msg, 0)); + + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) ); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(surv, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char *addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_surveyor0_open_raw(&s1)); + NUTS_PASS(nng_surveyor0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_xsurvey_recv_aio_stopped(void) +{ + nng_socket surv; + nng_aio * aio; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_stop(aio); + nng_recv_aio(surv, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(surv); + nng_aio_free(aio); +} + +static void +test_xsurvey_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + uint32_t req_id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000000)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + + // The message will have a header that contains the 32-bit pipe ID, + // followed by the 32-bit request ID. We will discard the request + // ID before sending it out. + NUTS_TRUE(nng_msg_header_len(m) == 8); + NUTS_PASS(nng_msg_header_chop_u32(m, &req_id)); + NUTS_TRUE(req_id == 0x80000000); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(surv, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_recv_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p; + uint32_t id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY_EX(surv, resp, NULL, NULL, &p); + + // Simulate a few hops. + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); + NUTS_PASS(nng_msg_header_append_u32(m, 0x2)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x1)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_PASS(nng_recvmsg(surv, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 12); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x2); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x1); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x80000123u); + + nng_msg_free(m); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_close_during_recv(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1; + nng_pipe p2; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY_EX(surv, resp, NULL, &p1, &p2); + NUTS_TRUE(nng_pipe_id(p1) > 0); + NUTS_TRUE(nng_pipe_id(p2) > 0); + + for (unsigned i = 0; i < 20; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + NUTS_SLEEP(10); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1; + nng_pipe p2; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY_EX(surv, resp, NULL, &p1, &p2); + NUTS_TRUE(nng_pipe_id(p1) > 0); + NUTS_TRUE(nng_pipe_id(p2) > 0); + + for (unsigned i = 0; i < 20; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + NUTS_SLEEP(10); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + } + + NUTS_PASS(nng_pipe_close(p1)); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_ttl_option(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + + NUTS_PASS(nng_socket_set_int(s, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s, opt, 3)); + NUTS_PASS(nng_socket_get_int(s, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s, opt, "", 1) , NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s, opt, &v, &sz) , NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s, opt, true) , NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s, opt, &b) , NNG_EBADTYPE); + + NUTS_CLOSE(s); +} + +static void +test_xsurvey_broadcast(void) +{ + nng_socket resp1; + nng_socket resp2; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp1)); + NUTS_PASS(nng_respondent0_open(&resp2)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp2, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + + NUTS_MARRY(surv, resp1); + NUTS_MARRY(surv, resp2); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "hello", 6)); + + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_RECV(resp1, "hello"); + NUTS_RECV(resp2, "hello"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp1); + NUTS_CLOSE(resp2); +} + +TEST_LIST = { + { "xsurvey identity", test_xsurveyor_identity }, + { "xsurvey raw", test_xsurveyor_raw }, + { "xsurvey no context", test_xsurvey_no_context }, + { "xsurvey poll readable", test_xsurvey_poll_readable }, + { "xsurvey poll writable", test_xsurvey_poll_writeable }, + { "xsurvey validate peer", test_xsurvey_validate_peer }, + { "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped }, + { "xsurvey recv garbage", test_xsurvey_recv_garbage }, + { "xsurvey recv header", test_xsurvey_recv_header }, + { "xsurvey close during recv", test_xsurvey_close_during_recv }, + { "xsurvey close pipe during send", + test_xsurvey_close_pipe_during_send }, + { "xsurvey ttl option", test_xsurvey_ttl_option }, + { "xsurvey broadcast", test_xsurvey_broadcast }, + { NULL, NULL }, +}; |
