diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/sp/protocol/survey0 | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/protocol/survey0')
| -rw-r--r-- | src/sp/protocol/survey0/CMakeLists.txt | 25 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/respond.c | 693 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/respond_test.c | 586 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/survey.c | 663 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/survey_test.c | 626 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xrespond.c | 417 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xrespond_test.c | 436 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xsurvey.c | 379 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/xsurvey_test.c | 399 |
9 files changed, 4224 insertions, 0 deletions
diff --git a/src/sp/protocol/survey0/CMakeLists.txt b/src/sp/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..b5daca41 --- /dev/null +++ b/src/sp/protocol/survey0/CMakeLists.txt @@ -0,0 +1,25 @@ +# +# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Surveyor/Respondent protocol +nng_directory(survey0) + +nng_sources_if(NNG_PROTO_SURVEYOR0 survey.c xsurvey.c) +nng_headers_if(NNG_PROTO_SURVEYOR0 nng/protocol/survey0/survey.h) +nng_defines_if(NNG_PROTO_SURVEYOR0 NNG_HAVE_SURVEYOR0) + +nng_sources_if(NNG_PROTO_RESPONDENT0 respond.c xrespond.c) +nng_headers_if(NNG_PROTO_RESPONDENT0 nng/protocol/survey0/respond.h) +nng_defines_if(NNG_PROTO_RESPONDENT0 NNG_HAVE_RESPONDENT0) + +nng_test(respond_test) +nng_test(survey_test) +nng_test(xrespond_test) +nng_test(xsurvey_test)
\ No newline at end of file diff --git a/src/sp/protocol/survey0/respond.c b/src/sp/protocol/survey0/respond.c new file mode 100644 index 00000000..ad551c8f --- /dev/null +++ b/src/sp/protocol/survey0/respond.c @@ -0,0 +1,693 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; +typedef struct resp0_ctx resp0_ctx; + +static void resp0_pipe_send_cb(void *); +static void resp0_pipe_recv_cb(void *); +static void resp0_pipe_fini(void *); + +struct resp0_ctx { + resp0_sock * sock; + uint32_t pipe_id; + resp0_pipe * spipe; // send pipe + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; + size_t btrace_len; + uint32_t btrace[NNI_MAX_MAX_TTL + 1]; +}; + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { + nni_mtx mtx; + nni_atomic_int ttl; + nni_id_map pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock * psock; + bool busy; + bool closed; + uint32_t id; + nni_list sendq; // contexts waiting to send + nni_aio aio_send; + nni_aio aio_recv; + nni_list_node rnode; // receivable linkage +}; + +static void +resp0_ctx_close(void *arg) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + nni_aio * aio; + + // complete any outstanding operations here, cancellation, etc. + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->saio) != NULL) { + resp0_pipe *p = ctx->spipe; + ctx->saio = NULL; + ctx->spipe = NULL; + nni_list_remove(&p->sendq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_list_remove(&s->recvq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_fini(void *arg) +{ + resp0_ctx *ctx = arg; + + resp0_ctx_close(ctx); +} + +static int +resp0_ctx_init(void *carg, void *sarg) +{ + resp0_sock *s = sarg; + resp0_ctx * ctx = carg; + + NNI_LIST_NODE_INIT(&ctx->sqnode); + NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + + return (0); +} + +static void +resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->mtx); + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +resp0_ctx_send(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + nni_msg * msg; + size_t len; + uint32_t pid; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + if (ctx == &s->ctx) { + // We can't send anymore, because only one send per request. + nni_pollable_clear(&s->writable); + } + + nni_mtx_lock(&s->mtx); + if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if ((len = ctx->btrace_len) == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + pid = ctx->pipe_id; + ctx->pipe_id = 0; + ctx->btrace_len = 0; + + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if ((p = nni_id_get(&s->pipes, pid)) == NULL) { + // Surveyor has left the building. Just discard the reply. + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + return; + } + + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_sock_fini(void *arg) +{ + resp0_sock *s = arg; + + nni_id_map_fini(&s->pipes); + resp0_ctx_fini(&s->ctx); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); + nni_mtx_fini(&s->mtx); +} + +static int +resp0_sock_init(void *arg, nni_sock *nsock) +{ + resp0_sock *s = arg; + + NNI_ARG_UNUSED(nsock); + + nni_mtx_init(&s->mtx); + nni_id_map_init(&s->pipes, 0, 0, false); + + NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); + + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC + + (void) resp0_ctx_init(&s->ctx, s); + + // We start off without being either readable or writable. + // Readability comes when there is something on the socket. + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + return (0); +} + +static void +resp0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +resp0_sock_close(void *arg) +{ + resp0_sock *s = arg; + + resp0_ctx_close(&s->ctx); +} + +static void +resp0_pipe_stop(void *arg) +{ + resp0_pipe *p = arg; + + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +resp0_pipe_fini(void *arg) +{ + resp0_pipe *p = arg; + nng_msg * msg; + + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_free(msg); + } + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); +} + +static int +resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + resp0_pipe *p = arg; + + nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p); + nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p); + + NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); + + p->npipe = npipe; + p->psock = s; + p->busy = false; + p->id = nni_pipe_id(npipe); + + return (0); +} + +static int +resp0_pipe_start(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; + + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + rv = nni_id_set(&s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, &p->aio_recv); + return (rv); +} + +static void +resp0_pipe_close(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_mtx_lock(&s->mtx); + p->closed = true; + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx.pipe_id) { + // Make sure user space knows they can send a message to us, + // which we will happily discard. + nni_pollable_raise(&s->writable); + } + nni_id_remove(&s->pipes, p->id); + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_pipe_send_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_aio * aio; + nni_msg * msg; + size_t len; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + nni_mtx_lock(&s->mtx); + p->busy = false; + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx.pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + ctx->spipe = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); + + nni_mtx_unlock(&s->mtx); + + nni_aio_finish_sync(aio, 0, len); +} + +static void +resp0_cancel_recv(nni_aio *aio, void *arg, int rv) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_recv(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + size_t len; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->mtx); + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + int rv; + rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + // We cannot have two concurrent receive requests on the same + // context... + if (ctx->raio != NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->mtx); + return; + } + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(&s->readable); + } + nni_pipe_recv(p->npipe, &p->aio_recv); + + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = p->id; + if (ctx == &s->ctx) { + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +resp0_pipe_recv_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_msg * msg; + nni_aio * aio; + int hops; + size_t len; + int ttl; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + ttl = nni_atomic_get(&s->ttl); + msg = nni_aio_get_msg(&p->aio_recv); + nni_msg_set_pipe(msg, p->id); + + // Move backtrace from body to header + hops = 1; + for (;;) { + bool end = 0; + uint8_t *body; + + if (hops > ttl) { + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage, kick it. + nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + len = nni_msg_header_len(msg); + + nni_mtx_lock(&s->mtx); + + if (p->closed) { + // If pipe was closed, we just abandon the data from it. + nni_aio_set_msg(&p->aio_recv, NULL); + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return; + } + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one blocked in recv, stall. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(&p->aio_recv, NULL); + + // Start the next receive. + nni_pipe_recv(p->npipe, &p->aio_recv); + + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; + + if ((ctx == &s->ctx) && (!p->busy)) { + nni_pollable_raise(&s->writable); + } + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_sync(aio, 0, nni_msg_len(msg)); + return; + +drop: + nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static int +resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + resp0_sock *s = arg; + int ttl; + int rv; + + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static int +resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + resp0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static void +resp0_sock_send(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + resp0_ctx_send(&s->ctx, aio); +} + +static void +resp0_sock_recv(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + resp0_ctx_recv(&s->ctx, aio); +} + +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_size = sizeof(resp0_pipe), + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_close = resp0_pipe_close, + .pipe_stop = resp0_pipe_stop, +}; + +static nni_proto_ctx_ops resp0_ctx_ops = { + .ctx_size = sizeof(resp0_ctx), + .ctx_init = resp0_ctx_init, + .ctx_fini = resp0_ctx_fini, + .ctx_send = resp0_ctx_send, + .ctx_recv = resp0_ctx_recv, +}; + +static nni_option resp0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = resp0_sock_get_max_ttl, + .o_set = resp0_sock_set_max_ttl, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = resp0_sock_get_recvfd, + .o_set = NULL, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = resp0_sock_get_sendfd, + .o_set = NULL, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops resp0_sock_ops = { + .sock_size = sizeof(resp0_sock), + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + +static nni_proto resp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, + .proto_ctx_ops = &resp0_ctx_ops, +}; + +int +nng_respondent0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto)); +} diff --git a/src/sp/protocol/survey0/respond_test.c b/src/sp/protocol/survey0/respond_test.c new file mode 100644 index 00000000..51844c76 --- /dev/null +++ b/src/sp/protocol/survey0/respond_test.c @@ -0,0 +1,586 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +void +test_resp_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_respondent0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_RESPONDENT0_SELF); + NUTS_TRUE(nng_socket_get_int(s, NNG_OPT_PEER, &p) == 0); + NUTS_TRUE(p == NNG_RESPONDENT0_PEER); + NUTS_TRUE(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n) == 0); + NUTS_MATCH(n, NNG_RESPONDENT0_SELF_NAME); + nng_strfree(n); + NUTS_TRUE(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n) == 0); + NUTS_MATCH(n, NNG_RESPONDENT0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +void +test_resp_send_bad_state(void) +{ + nng_socket resp; + nng_msg * msg = NULL; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_FAIL(nng_sendmsg(resp, msg, 0), NNG_ESTATE); + nng_msg_free(msg); + NUTS_CLOSE(resp); +} + +void +test_resp_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not writable before connect. + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_MARRY(surv, resp); + + // Still not writable. + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // If we get a job, *then* we become writable + NUTS_SEND(surv, "abc"); + NUTS_RECV(resp, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // And is no longer writable once we send a message + NUTS_SEND(resp, "def"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + // Even after receiving it + NUTS_RECV(surv, "def"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(surv, "abc"); + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // TODO verify unsolicited response + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_context_no_poll(void) +{ + int fd; + nng_socket resp; + nng_ctx ctx; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(resp); +} + +void +test_resp_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_respondent0_open(&s1)); + NUTS_PASS(nng_respondent0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +void +test_resp_double_recv(void) +{ + nng_socket s1; + nng_aio * aio1; + nng_aio * aio2; + + NUTS_PASS(nng_respondent0_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + + nng_recv_aio(s1, aio1); + nng_recv_aio(s1, aio2); + + nng_aio_wait(aio2); + NUTS_FAIL(nng_aio_result(aio2), NNG_ESTATE); + NUTS_CLOSE(s1); + NUTS_FAIL(nng_aio_result(aio1), NNG_ECLOSED); + nng_aio_free(aio1); + nng_aio_free(aio2); +} + +void +test_resp_close_pipe_before_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_aio * aio1; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + + NUTS_MARRY(surv, resp); + NUTS_SEND(surv, "test"); + + nng_recv_aio(resp, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL); + p = nng_msg_get_pipe(m); + NUTS_PASS(nng_pipe_close(p)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio1); +} + +void +test_resp_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 20)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + + NUTS_MARRY(surv, resp); + + for (int i = 0; i < 100; i++) { + int rv; + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + p = nng_msg_get_pipe(m); + rv = nng_sendmsg(resp, m, 0); + if (rv == NNG_ETIMEDOUT) { + // Queue is backed up, senders are busy. + nng_msg_free(m); + break; + } + NUTS_PASS(rv); + } + NUTS_PASS(nng_pipe_close(p)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_ctx_recv_aio_stopped(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +void +test_resp_close_pipe_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_ctx_open(&ctx[i], resp)); + NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + NUTS_MARRY(surv, resp); + + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + NUTS_PASS(nng_aio_result(aio[i])); + NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL); + p = nng_msg_get_pipe(m); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that SURVEYOR socket is not reading the results. + NUTS_PASS(nng_pipe_close(p)); + + for (i = 0; i < 10; i++) { + int rv; + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + NUTS_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + NUTS_PASS(nng_ctx_close(ctx[i])); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_resp_close_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_ctx_open(&ctx[i], resp)); + NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + NUTS_MARRY(surv, resp); + + for (i = 0; i < 10; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + NUTS_PASS(nng_aio_result(aio[i])); + NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that REQ socket is not reading the results. + for (i = 0; i < 10; i++) { + int rv; + NUTS_PASS(nng_ctx_close(ctx[i])); + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + NUTS_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_resp_ctx_recv_nonblock(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_resp_ctx_send_nonblock(void) +{ + nng_socket resp; + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_ctx_open(&ctx, resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "SEND"); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + // message carries over + msg = nng_aio_get_msg(aio); + nng_aio_set_msg(aio, msg); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_send(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); + nng_msg_free(msg); +} + +void +test_resp_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_resp_ttl_option(void) +{ + nng_socket resp; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_respondent0_open(&resp)); + + NUTS_PASS(nng_setopt_int(resp, opt, 1)); + NUTS_FAIL(nng_setopt_int(resp, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(resp, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(resp, opt, 3)); + NUTS_PASS(nng_socket_get_int(resp, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(resp, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_setopt(resp, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(resp, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(resp, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(resp, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(resp); +} + +static void +test_resp_ttl_drop(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_setopt_int(resp, NNG_OPT_MAXTTL, 3)); + NUTS_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + // Send messages. Note that xrep implicitly adds a hop on receive. + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 2 hops + NUTS_PASS(nng_msg_append_u32(m, 0x80000001u)); + NUTS_PASS(nng_msg_append(m, "PASS1", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "FAIL2", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "PASS3", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "FAIL4", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_RECV(resp, "PASS1"); + NUTS_RECV(resp, "PASS3"); + + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(resp); + NUTS_CLOSE(surv); +} + +TEST_LIST = { + { "respond identity", test_resp_identity }, + { "respond send bad state", test_resp_send_bad_state }, + { "respond poll readable", test_resp_poll_readable }, + { "respond poll writable", test_resp_poll_writeable }, + { "respond context does not poll", test_resp_context_no_poll }, + { "respond validate peer", test_resp_validate_peer }, + { "respond double recv", test_resp_double_recv }, + { "respond close pipe before send", test_resp_close_pipe_before_send }, + { "respond close pipe during send", test_resp_close_pipe_during_send }, + { "respond recv aio ctx stopped", test_resp_ctx_recv_aio_stopped }, + { "respond close pipe context send", + test_resp_close_pipe_context_send }, + { "respond close context send", test_resp_close_context_send }, + { "respond context send nonblock", test_resp_ctx_send_nonblock }, + { "respond context recv nonblock", test_resp_ctx_recv_nonblock }, + { "respond recv garbage", test_resp_recv_garbage }, + { "respond ttl option", test_resp_ttl_option }, + { "respond ttl drop", test_resp_ttl_drop }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c new file mode 100644 index 00000000..ce1ed601 --- /dev/null +++ b/src/sp/protocol/survey0/survey.c @@ -0,0 +1,663 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. +// Note that this pattern is not optimized for extreme low latency, as it makes +// multiple use of queues for simplicity. Typically this is used in cases +// where a few dozen extra microseconds does not matter. + +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; +typedef struct surv0_ctx surv0_ctx; + +static void surv0_pipe_send_cb(void *); +static void surv0_pipe_recv_cb(void *); +static void surv0_ctx_timeout(void *); + +struct surv0_ctx { + surv0_sock * sock; + uint32_t survey_id; // survey id + nni_timer_node timer; + nni_time expire; + nni_lmq recv_lmq; + nni_list recv_queue; + nni_atomic_int recv_buf; + nni_atomic_int survey_time; + int err; +}; + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { + int ttl; + nni_list pipes; + nni_mtx mtx; + surv0_ctx ctx; + nni_id_map surveys; + nni_pollable writable; + nni_pollable readable; + nni_atomic_int send_buf; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { + nni_pipe * pipe; + surv0_sock * sock; + nni_lmq send_queue; + nni_list_node node; + nni_aio aio_send; + nni_aio aio_recv; + bool busy; + bool closed; +}; + +static void +surv0_ctx_abort(surv0_ctx *ctx, int err) +{ + nni_aio * aio; + surv0_sock *sock = ctx->sock; + + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, err); + } + nni_lmq_flush(&ctx->recv_lmq); + if (ctx->survey_id != 0) { + nni_id_remove(&sock->surveys, ctx->survey_id); + ctx->survey_id = 0; + } + if (ctx == &sock->ctx) { + nni_pollable_clear(&sock->readable); + } +} + +static void +surv0_ctx_close(surv0_ctx *ctx) +{ + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + surv0_ctx_abort(ctx, NNG_ECLOSED); + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_fini(void *arg) +{ + surv0_ctx *ctx = arg; + + surv0_ctx_close(ctx); + nni_timer_cancel(&ctx->timer); + nni_lmq_fini(&ctx->recv_lmq); +} + +static int +surv0_ctx_init(void *c, void *s) +{ + surv0_ctx * ctx = c; + surv0_sock * sock = s; + int rv; + int len; + nng_duration tmo; + + nni_aio_list_init(&ctx->recv_queue); + nni_atomic_init(&ctx->recv_buf); + nni_atomic_init(&ctx->survey_time); + + if (ctx == &sock->ctx) { + len = 128; + tmo = NNI_SECOND; // survey timeout + } else { + len = nni_atomic_get(&sock->ctx.recv_buf); + tmo = nni_atomic_get(&sock->ctx.survey_time); + } + + nni_atomic_set(&ctx->recv_buf, len); + nni_atomic_set(&ctx->survey_time, tmo); + + ctx->sock = sock; + + if ((rv = nni_lmq_init(&ctx->recv_lmq, len)) != 0) { + surv0_ctx_fini(ctx); + return (rv); + } + nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); + return (0); +} + +static void +surv0_ctx_cancel(nni_aio *aio, void *arg, int rv) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + nni_mtx_lock(&sock->mtx); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_error(aio, rv); + } + if (ctx->survey_id != 0) { + nni_id_remove(&sock->surveys, ctx->survey_id); + ctx->survey_id = 0; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_recv(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + nni_msg * msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->mtx); + if (ctx->survey_id == 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } +again: + if (nni_lmq_getq(&ctx->recv_lmq, &msg) != 0) { + int rv; + if ((rv = nni_aio_schedule(aio, &surv0_ctx_cancel, ctx)) != + 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ctx->recv_queue, aio); + nni_mtx_unlock(&sock->mtx); + return; + } + if (nni_lmq_empty(&ctx->recv_lmq) && (ctx == &sock->ctx)) { + nni_pollable_clear(&sock->readable); + } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_msg(aio, msg); +} + +void +surv0_ctx_timeout(void *arg) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (nni_clock() < ctx->expire) { + nni_mtx_unlock(&sock->mtx); + return; + } + + // Abort any pending receives. + surv0_ctx_abort(ctx, NNG_ETIMEDOUT); + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_send(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock * sock = ctx->sock; + surv0_pipe * pipe; + nni_msg * msg = nni_aio_get_msg(aio); + size_t len = nni_msg_len(msg); + nni_time now = nni_clock(); + nng_duration survey_time; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + survey_time = nni_atomic_get(&ctx->survey_time); + + nni_mtx_lock(&sock->mtx); + + // Abort everything outstanding. + surv0_ctx_abort(ctx, NNG_ECANCELED); + nni_timer_cancel(&ctx->timer); + + // Allocate the new ID. + if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_msg_header_clear(msg); + nni_msg_header_append_u32(msg, (uint32_t) ctx->survey_id); + + // From this point, we're committed to success. Note that we send + // regardless of whether there are any pipes or not. If no pipes, + // then it just gets discarded. + nni_aio_set_msg(aio, NULL); + NNI_LIST_FOREACH (&sock->pipes, pipe) { + + // if the pipe isn't busy, then send this message direct. + if (!pipe->busy) { + pipe->busy = true; + nni_msg_clone(msg); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); + } else if (!nni_lmq_full(&pipe->send_queue)) { + nni_msg_clone(msg); + nni_lmq_putq(&pipe->send_queue, msg); + } + } + + ctx->expire = now + survey_time; + nni_timer_schedule(&ctx->timer, ctx->expire); + + nni_mtx_unlock(&sock->mtx); + nni_msg_free(msg); + + nni_aio_finish(aio, 0, len); +} + +static void +surv0_sock_fini(void *arg) +{ + surv0_sock *sock = arg; + + surv0_ctx_fini(&sock->ctx); + nni_id_map_fini(&sock->surveys); + nni_pollable_fini(&sock->writable); + nni_pollable_fini(&sock->readable); + nni_mtx_fini(&sock->mtx); +} + +static int +surv0_sock_init(void *arg, nni_sock *s) +{ + surv0_sock *sock = arg; + int rv; + + NNI_ARG_UNUSED(s); + + NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); + nni_mtx_init(&sock->mtx); + nni_pollable_init(&sock->readable); + nni_pollable_init(&sock->writable); + // We are always writable. + nni_pollable_raise(&sock->writable); + + // We allow for some buffering on a per pipe basis, to allow for + // multiple contexts to have surveys outstanding. It is recommended + // to increase this if many contexts will want to publish + // at nearly the same time. + nni_atomic_init(&sock->send_buf); + nni_atomic_set(&sock->send_buf, 8); + + // Survey IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. + nni_id_map_init(&sock->surveys, 0x80000000u, 0xffffffffu, true); + + if ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0) { + surv0_sock_fini(sock); + return (rv); + } + + sock->ttl = 8; + + return (0); +} + +static void +surv0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +surv0_sock_close(void *arg) +{ + surv0_sock *s = arg; + + surv0_ctx_close(&s->ctx); +} + +static void +surv0_pipe_stop(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +surv0_pipe_fini(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_lmq_fini(&p->send_queue); +} + +static int +surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) +{ + surv0_pipe *p = arg; + surv0_sock *sock = s; + int rv; + int len; + + len = nni_atomic_get(&sock->send_buf); + nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p); + + // This depth could be tunable. The deeper the queue, the more + // concurrent surveys that can be delivered (multiple contexts). + // Note that surveys can be *outstanding*, but not yet put on the wire. + if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) { + surv0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->sock = sock; + return (0); +} + +static int +surv0_pipe_start(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->sock; + + if (nni_pipe_peer(p->pipe) != NNG_SURVEYOR0_PEER) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_pipe_recv(p->pipe, &p->aio_recv); + return (0); +} + +static void +surv0_pipe_close(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->sock; + + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_mtx_lock(&s->mtx); + p->closed = true; + nni_lmq_flush(&p->send_queue); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +surv0_pipe_send_cb(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + nni_msg * msg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->pipe); + return; + } + + nni_mtx_lock(&sock->mtx); + if (p->closed) { + nni_mtx_unlock(&sock->mtx); + return; + } + if (nni_lmq_getq(&p->send_queue, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_pipe_recv_cb(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + surv0_ctx * ctx; + nni_msg * msg; + uint32_t id; + nni_aio * aio; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Peer sent us garbage. Kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + id = nni_msg_trim_u32(msg); + nni_msg_header_append_u32(msg, id); + + nni_mtx_lock(&sock->mtx); + // Best effort at delivery. Discard if no context or context is + // unable to receive it. + if (((ctx = nni_id_get(&sock->surveys, id)) == NULL) || + (nni_lmq_full(&ctx->recv_lmq))) { + nni_msg_free(msg); + } else if ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); + nni_aio_finish_msg(aio, msg); + } else { + nni_lmq_putq(&ctx->recv_lmq, msg); + if (ctx == &sock->ctx) { + nni_pollable_raise(&sock->readable); + } + } + nni_mtx_unlock(&sock->mtx); + + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static int +surv0_ctx_set_survey_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_ctx * ctx = arg; + nng_duration expire; + int rv; + if ((rv = nni_copyin_ms(&expire, buf, sz, t)) == 0) { + nni_atomic_set(&ctx->survey_time, expire); + } + return (rv); +} + +static int +surv0_ctx_get_survey_time(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_ctx *ctx = arg; + return ( + nni_copyout_ms(nni_atomic_get(&ctx->survey_time), buf, szp, t)); +} + +static int +surv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)); +} + +static int +surv0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, t)); +} + +static int +surv0_sock_set_survey_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + surv0_sock *s = arg; + return (surv0_ctx_set_survey_time(&s->ctx, buf, sz, t)); +} + +static int +surv0_sock_get_survey_time(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *s = arg; + return (surv0_ctx_get_survey_time(&s->ctx, buf, szp, t)); +} + +static int +surv0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +surv0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + surv0_sock *sock = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static void +surv0_sock_recv(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + surv0_ctx_recv(&s->ctx, aio); +} + +static void +surv0_sock_send(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + surv0_ctx_send(&s->ctx, aio); +} + +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_size = sizeof(surv0_pipe), + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_close = surv0_pipe_close, + .pipe_stop = surv0_pipe_stop, +}; + +static nni_option surv0_ctx_options[] = { + { + .o_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .o_get = surv0_ctx_get_survey_time, + .o_set = surv0_ctx_set_survey_time, + }, + { + .o_name = NULL, + } +}; +static nni_proto_ctx_ops surv0_ctx_ops = { + .ctx_size = sizeof(surv0_ctx), + .ctx_init = surv0_ctx_init, + .ctx_fini = surv0_ctx_fini, + .ctx_send = surv0_ctx_send, + .ctx_recv = surv0_ctx_recv, + .ctx_options = surv0_ctx_options, +}; + +static nni_option surv0_sock_options[] = { + { + .o_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .o_get = surv0_sock_get_survey_time, + .o_set = surv0_sock_set_survey_time, + }, + { + .o_name = NNG_OPT_MAXTTL, + .o_get = surv0_sock_get_max_ttl, + .o_set = surv0_sock_set_max_ttl, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = surv0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = surv0_sock_get_send_fd, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops surv0_sock_ops = { + .sock_size = sizeof(surv0_sock), + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_options = surv0_sock_options, +}; + +static nni_proto surv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, + .proto_ctx_ops = &surv0_ctx_ops, +}; + +int +nng_surveyor0_open(nng_socket *sock) +{ + return (nni_proto_open(sock, &surv0_proto)); +} diff --git a/src/sp/protocol/survey0/survey_test.c b/src/sp/protocol/survey0/survey_test.c new file mode 100644 index 00000000..95d27adf --- /dev/null +++ b/src/sp/protocol/survey0/survey_test.c @@ -0,0 +1,626 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_surv_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_surveyor0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_SELF); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_PEER); // 49 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_surv_ttl_option(void) +{ + nng_socket surv; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_int(surv, opt, 1)); + NUTS_FAIL(nng_socket_set_int(surv, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(surv, opt, -1), NNG_EINVAL); + // This test will fail if the NNI_MAX_MAX_TTL is changed from the + // builtin default of 15. + NUTS_FAIL(nng_socket_set_int(surv, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(surv, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(surv, opt, 3)); + NUTS_PASS(nng_socket_get_int(surv, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(surv, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(surv, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(surv, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(surv, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(surv, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(surv); +} + +static void +test_surv_survey_time_option(void) +{ + nng_socket surv; + nng_duration d; + bool b; + size_t sz = sizeof(b); + const char * opt = NNG_OPT_SURVEYOR_SURVEYTIME; + + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_ms(surv, opt, 10)); + NUTS_FAIL(nng_socket_set(surv, opt, "", 1), NNG_EINVAL); + NUTS_FAIL(nng_socket_get(surv, opt, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(surv, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(surv, opt, &b), NNG_EBADTYPE); + + NUTS_PASS(nng_socket_get_ms(surv, opt, &d)); + NUTS_TRUE(d == 10); + NUTS_CLOSE(surv); +} + +void +test_surv_recv_bad_state(void) +{ + nng_socket surv; + nng_msg * msg = NULL; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_FAIL(nng_recvmsg(surv, &msg, 0), NNG_ESTATE); + NUTS_TRUE(msg == NULL); + NUTS_CLOSE(surv); +} + +static void +test_surv_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + uint32_t surv_id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + + // The message will have a header that contains the 32-bit pipe ID, + // followed by the 32-bit request ID. We will discard the request + // ID before sending it out. + NUTS_TRUE(nng_msg_header_len(m) == 8); + NUTS_PASS(nng_msg_header_chop_u32(m, &surv_id)); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(surv, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +#define SECOND 1000 + +void +test_surv_resp_exchange(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, SECOND)); + + NUTS_MARRY(resp, surv); + + NUTS_SEND(surv, "ping"); + NUTS_RECV(resp, "ping"); + NUTS_SEND(resp, "pong"); + NUTS_RECV(surv, "pong"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_cancel(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 16)); + + NUTS_MARRY(resp, surv); + + // Send req #1 (abc). + NUTS_SEND(surv, "abc"); + + // Sleep a bit. This is so that we ensure that our request gets + // to the far side. (If we cancel too fast, then our outgoing send + // will be canceled before it gets to the peer.) + NUTS_SLEEP(100); + + // Send the next next request ("def"). Note that + // the RESP side server will have already buffered the receive + // request, and should simply be waiting for us to reply to abc. + NUTS_SEND(surv, "def"); + + // Receive the first request (should be abc) on the REP server. + NUTS_RECV(resp, "abc"); + + // RESP sends the reply to first command. This will be discarded + // by the SURV socket. + NUTS_SEND(resp, "abc"); + + // Now get the next command from the REP; should be "def". + NUTS_RECV(resp, "def"); + + // And send it back to REQ. + NUTS_SEND(resp, "def"); + + // Try a req command. This should give back "def" + NUTS_RECV(surv, "def"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_cancel_abort_recv(void) +{ + nng_aio * aio; + nng_duration time = SECOND * 10; // 10s (kind of never) + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, time)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 16)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 5 * SECOND)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 5 * SECOND)); + + NUTS_MARRY(resp, surv); + + // Send survey #1 (abc). + NUTS_SEND(surv, "abc"); + + // Wait for it to get ot the other side. + NUTS_SLEEP(100); + + nng_aio_set_timeout(aio, 5 * SECOND); + nng_recv_aio(surv, aio); + + // Give time for this recv to post properly. + NUTS_SLEEP(100); + + // Send the next next request ("def"). Note that + // the respondent side server will have already buffered the receive + // request, and should simply be waiting for us to reply to + // abc. + NUTS_SEND(surv, "def"); + + // Our pending I/O should have been canceled. + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + // Receive the first request (should be abc) on the respondent. + NUTS_RECV(resp, "abc"); + + // Respondent sends the reply to first survey. This will be + // discarded by the SURV socket. + NUTS_SEND(resp, "abc"); + + // Now get the next survey from the RESP; should be "def". + NUTS_RECV(resp, "def"); + + // And send it back to REQ. + NUTS_SEND(resp, "def"); + + // Try a req command. This should give back "def" + NUTS_RECV(surv, "def"); + + nng_aio_free(aio); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_cancel_post_recv(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "ONE"); + NUTS_RECV(resp, "ONE"); + NUTS_SEND(resp, "one"); + NUTS_SLEEP(100); // Make sure reply arrives! + NUTS_SEND(surv, "TWO"); + NUTS_RECV(resp, "TWO"); + NUTS_SEND(resp, "two"); + NUTS_RECV(surv, "two"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Survey is broadcast, so we can always write. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +void +test_surv_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_msg_append(msg, "xyz", 3)); + NUTS_PASS(nng_sendmsg(surv, msg, 0)); + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); // recv on rep + NUTS_PASS(nng_sendmsg(resp, msg, 0)); // echo it back + NUTS_SLEEP(200); // give time for message to arrive + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(surv, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // TODO verify unsolicited response + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_ctx_no_poll(void) +{ + int fd; + nng_socket surv; + nng_ctx ctx; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_PASS(nng_ctx_close(ctx)); + NUTS_CLOSE(surv); +} + +static void +test_surv_ctx_recv_nonblock(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + + NUTS_MARRY(surv, resp); + + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_surv_ctx_send_nonblock(void) +{ + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + + nng_aio_set_msg(aio, msg); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); // We never block + NUTS_CLOSE(surv); + nng_aio_free(aio); +} + +static void +test_surv_send_best_effort(void) +{ + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_MARRY(surv, resp); + + for (int i = 0; i < 200; i++) { + NUTS_SEND(surv, "junk"); + } + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_survey_timeout(void) +{ + nng_socket surv; + nng_socket resp; + char buf[16]; + size_t sz; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, 50)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + + NUTS_MARRY(surv, resp); + + NUTS_SEND(surv, "hello"); + NUTS_RECV(resp, "hello"); + + sz = sizeof(buf); + NUTS_FAIL(nng_recv(surv, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_SEND(resp, "world"); + NUTS_FAIL(nng_recv(surv, buf, &sz, 0), NNG_ESTATE); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_surv_ctx_recv_close_socket(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + nng_msg * m; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_ctx_open(&ctx, surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_MARRY(surv, resp); + NUTS_PASS(nng_msg_alloc(&m, 0)); + nng_aio_set_msg(aio, m); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + + nng_ctx_recv(ctx, aio); + nng_close(surv); + + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); + NUTS_CLOSE(resp); +} + +static void +test_surv_context_multi(void) +{ + nng_socket surv; + nng_socket resp; + nng_ctx c[5]; + nng_aio * aio; + nng_msg * m; + int cnt = sizeof(c) / sizeof(c[0]); + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_MARRY(surv, resp); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SURVEYOR_SURVEYTIME, 200)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_ctx_open(&c[i], surv)); + } + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, i)); + nng_aio_set_msg(aio, m); + nng_ctx_send(c[i], aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + } + + for (int i = 0; i < cnt; i++) { + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + + for (int i = cnt - 1; i >= 0; i--) { + uint32_t x; + nng_ctx_recv(c[i], aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + m = nng_aio_get_msg(aio); + TEST_ASSERT(m != NULL); + NUTS_PASS(nng_msg_trim_u32(m, &x)); + NUTS_TRUE(x == (uint32_t) i); + nng_msg_free(m); + } + + for (int i = 0; i < cnt; i++) { + nng_ctx_recv(c[i], aio); + nng_aio_wait(aio); + NUTS_TRUE(nng_aio_result(aio) != 0); + } + for (int i = 0; i < cnt; i++) { + nng_ctx_close(c[i]); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_surv_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + NUTS_PASS(nng_surveyor0_open(&s1)); + NUTS_PASS(nng_surveyor0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_PASS(nng_close(s1)); + NUTS_PASS(nng_close(s2)); + nng_stats_free(stats); +} + +TEST_LIST = { + { "survey identity", test_surv_identity }, + { "survey ttl option", test_surv_ttl_option }, + { "survey survey time option", test_surv_survey_time_option }, + { "survey recv bad state", test_surv_recv_bad_state }, + { "survey recv garbage", test_surv_recv_garbage }, + { "survey respondent exchange", test_surv_resp_exchange }, + { "survey cancel", test_surv_cancel }, + { "survey cancel abort recv", test_surv_cancel_abort_recv }, + { "survey cancel post recv", test_surv_cancel_post_recv }, + { "survey poll writable", test_surv_poll_writeable }, + { "survey poll readable", test_surv_poll_readable }, + { "survey context does not poll", test_surv_ctx_no_poll }, + { "survey context recv close socket", + test_surv_ctx_recv_close_socket }, + { "survey context recv nonblock", test_surv_ctx_recv_nonblock }, + { "survey context send nonblock", test_surv_ctx_send_nonblock }, + { "survey timeout", test_surv_survey_timeout }, + { "survey send best effort", test_surv_send_best_effort }, + { "survey context multi", test_surv_context_multi }, + { "survey validate peer", test_surv_validate_peer }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/xrespond.c b/src/sp/protocol/survey0/xrespond.c new file mode 100644 index 00000000..b2f203c3 --- /dev/null +++ b/src/sp/protocol/survey0/xrespond.c @@ -0,0 +1,417 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct xresp0_pipe xresp0_pipe; +typedef struct xresp0_sock xresp0_sock; + +static void xresp0_recv_cb(void *); +static void xresp0_putq_cb(void *); +static void xresp0_getq_cb(void *); +static void xresp0_send_cb(void *); +static void xresp0_sock_getq_cb(void *); +static void xresp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct xresp0_sock { + nni_msgq * urq; + nni_msgq * uwq; + nni_atomic_int ttl; + nni_id_map pipes; + nni_aio aio_getq; + nni_mtx mtx; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct xresp0_pipe { + nni_pipe * npipe; + xresp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; +}; + +static void +xresp0_sock_fini(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_fini(&s->aio_getq); + nni_id_map_fini(&s->pipes); + nni_mtx_fini(&s->mtx); +} + +static int +xresp0_sock_init(void *arg, nni_sock *nsock) +{ + xresp0_sock *s = arg; + + nni_mtx_init(&s->mtx); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC + nni_id_map_init(&s->pipes, 0, 0, false); + nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s); + + s->urq = nni_sock_recvq(nsock); + s->uwq = nni_sock_sendq(nsock); + + return (0); +} + +static void +xresp0_sock_open(void *arg) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +static void +xresp0_sock_close(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_close(&s->aio_getq); +} + +static void +xresp0_pipe_stop(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_stop(&p->aio_putq); + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +xresp0_pipe_fini(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_fini(&p->aio_putq); + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_msgq_fini(p->sendq); +} + +static int +xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + xresp0_pipe *p = arg; + int rv; + + nni_aio_init(&p->aio_putq, xresp0_putq_cb, p); + nni_aio_init(&p->aio_recv, xresp0_recv_cb, p); + nni_aio_init(&p->aio_getq, xresp0_getq_cb, p); + nni_aio_init(&p->aio_send, xresp0_send_cb, p); + + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { + xresp0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + return (0); +} + +static int +xresp0_pipe_start(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + int rv; + + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + + p->id = nni_pipe_id(p->npipe); + + nni_mtx_lock(&s->mtx); + rv = nni_id_set(&s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(p->sendq, &p->aio_getq); + + return (rv); +} + +static void +xresp0_pipe_close(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + + nni_aio_close(&p->aio_putq); + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + nni_id_remove(&s->pipes, p->id); + nni_mtx_unlock(&s->mtx); +} + +// resp0_sock_send watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket.s + +void +xresp0_sock_getq_cb(void *arg) +{ + xresp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + xresp0_pipe *p; + + if (nni_aio_result(&s->aio_getq) != 0) { + return; + } + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + // We can't really close down the socket, so just keep going. + nni_msgq_aio_get(s->uwq, &s->aio_getq); + return; + } + id = nni_msg_header_trim_u32(msg); + + nni_mtx_lock(&s->mtx); + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + if (((p = nni_id_get(&s->pipes, id)) == NULL) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { + nni_msg_free(msg); + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +void +xresp0_getq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_getq) != 0) { + nni_pipe_close(p->npipe); + return; + } + + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); + + nni_pipe_send(p->npipe, &p->aio_send); +} + +void +xresp0_send_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, &p->aio_getq); +} + +static void +xresp0_recv_cb(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int ttl; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + ttl = nni_atomic_get(&s->ttl); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); + + // Store the pipe id in the header, first thing. + nni_msg_header_append_u32(msg, p->id); + + // Move backtrace from body to header + hops = 1; + for (;;) { + bool end; + uint8_t *body; + + if (hops > ttl) { + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer sent us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(urq, &p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static void +xresp0_putq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static int +xresp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + xresp0_sock *s = arg; + int ttl; + int rv; + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +xresp0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + xresp0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static void +xresp0_sock_send(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +xresp0_sock_recv(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops xresp0_pipe_ops = { + .pipe_size = sizeof(xresp0_pipe), + .pipe_init = xresp0_pipe_init, + .pipe_fini = xresp0_pipe_fini, + .pipe_start = xresp0_pipe_start, + .pipe_close = xresp0_pipe_close, + .pipe_stop = xresp0_pipe_stop, +}; + +static nni_option xresp0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = xresp0_sock_get_maxttl, + .o_set = xresp0_sock_set_maxttl, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops xresp0_sock_ops = { + .sock_size = sizeof(xresp0_sock), + .sock_init = xresp0_sock_init, + .sock_fini = xresp0_sock_fini, + .sock_open = xresp0_sock_open, + .sock_close = xresp0_sock_close, + .sock_send = xresp0_sock_send, + .sock_recv = xresp0_sock_recv, + .sock_options = xresp0_sock_options, +}; + +static nni_proto xresp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xresp0_sock_ops, + .proto_pipe_ops = &xresp0_pipe_ops, +}; + +int +nng_respondent0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xresp0_proto)); +} diff --git a/src/sp/protocol/survey0/xrespond_test.c b/src/sp/protocol/survey0/xrespond_test.c new file mode 100644 index 00000000..ec5e99a3 --- /dev/null +++ b/src/sp/protocol/survey0/xrespond_test.c @@ -0,0 +1,436 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_xresp_identity(void) +{ + nng_socket s; + int p1, p2; + char * n1; + char * n2; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p1)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p2)); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n1)); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n2)); + NUTS_CLOSE(s); + NUTS_TRUE(p1 == NNG_RESPONDENT0_SELF); + NUTS_TRUE(p2 == NNG_RESPONDENT0_PEER); + NUTS_MATCH(n1, NNG_RESPONDENT0_SELF_NAME); + NUTS_MATCH(n2, NNG_RESPONDENT0_PEER_NAME); + nng_strfree(n1); + nng_strfree(n2); +} + +static void +test_xresp_raw(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_xresp_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_respondent0_open_raw(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_xresp_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // We are always writeable, even before connect. This is so that + // back-pressure from a bad peer can't trash others. We assume + // that peers won't send us requests faster than they can consume + // the answers. If they do, they will lose their answers. + NUTS_TRUE(nuts_poll_fd(fd) == true); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd) == true); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_socket_get_int(resp, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(surv, "abc"); + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) == true); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_respondent0_open_raw(&s1)); + NUTS_PASS(nng_respondent0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_xresp_close_pipe_before_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_aio * aio1; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + + NUTS_MARRY(surv, resp); + NUTS_SEND(surv, "test"); + + nng_recv_aio(resp, aio1); + nng_aio_wait(aio1); + NUTS_PASS(nng_aio_result(aio1)); + NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL); + p = nng_msg_get_pipe(m); + NUTS_PASS(nng_pipe_close(p)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); + nng_aio_free(aio1); +} + +static void +test_xresp_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_msg * m; + + NUTS_PASS(nng_respondent_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 20)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_RECVBUF, 1)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, (unsigned) 0x81000000u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + p = nng_msg_get_pipe(m); + nng_msg_free(m); + + for (int i = 0; i < 100; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); + NUTS_PASS( + nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u)); + // protocol does not exert back-pressure + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + NUTS_PASS(nng_pipe_close(p)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_close_during_recv(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY(surv, resp); + + for (unsigned i = 0; i < 100; i++) { + int rv; + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + rv = nng_sendmsg(surv, m, 0); + if (rv == NNG_ETIMEDOUT) { + nng_msg_free(m); + break; + } + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_recv_aio_stopped(void) +{ + nng_socket resp; + nng_aio * aio; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_stop(aio); + nng_recv_aio(resp, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(resp); + nng_aio_free(aio); +} + +static void +test_xresp_send_no_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xresp_ttl_option(void) +{ + nng_socket resp; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + + NUTS_PASS(nng_socket_set_int(resp, opt, 1)); + NUTS_FAIL(nng_socket_set_int(resp, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(resp, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(resp, opt, 3)); + NUTS_PASS(nng_socket_get_int(resp, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(resp, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(resp, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(resp, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(resp, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(resp, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(resp); +} + +static void +test_xresp_ttl_drop(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_MAXTTL, 3)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 200)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + // Send messages. Note that xresp implicitly adds a hop on receive. + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 2 hops + NUTS_PASS(nng_msg_append_u32(m, 0x80000001u)); + NUTS_PASS(nng_msg_append(m, "PASS1", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "FAIL2", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "PASS3", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + NUTS_PASS(nng_msg_append_u32(m, 2u)); + NUTS_PASS(nng_msg_append_u32(m, 3u)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000003u)); + NUTS_PASS(nng_msg_append(m, "FAIL4", 6)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + // So on receive we should see 80000001 and 80000003. + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 12); + NUTS_TRUE(nng_msg_len(m) == 6); + NUTS_MATCH(nng_msg_body(m), "PASS1"); + nng_msg_free(m); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 16); // 3 hops + ID + NUTS_TRUE(nng_msg_len(m) == 6); + NUTS_MATCH(nng_msg_body(m), "PASS3"); + nng_msg_free(m); + + NUTS_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +NUTS_TESTS = { + { "xrespond identity", test_xresp_identity }, + { "xrespond raw", test_xresp_raw }, + { "xrespond no context", test_xresp_no_context }, + { "xrespond poll readable", test_xresp_poll_readable }, + { "xrespond poll writable", test_xresp_poll_writeable }, + { "xrespond validate peer", test_xresp_validate_peer }, + { "xrespond close pipe before send", + test_xresp_close_pipe_before_send }, + { "xrespond close pipe during send", + test_xresp_close_pipe_during_send }, + { "xrespond close during recv", test_xresp_close_during_recv }, + { "xrespond recv aio stopped", test_xresp_recv_aio_stopped }, + { "xrespond send no header", test_xresp_send_no_header }, + { "xrespond recv garbage", test_xresp_recv_garbage }, + { "xrespond ttl option", test_xresp_ttl_option }, + { "xrespond ttl drop", test_xresp_ttl_drop }, + { NULL, NULL }, +}; diff --git a/src/sp/protocol/survey0/xsurvey.c b/src/sp/protocol/survey0/xsurvey.c new file mode 100644 index 00000000..2a198662 --- /dev/null +++ b/src/sp/protocol/survey0/xsurvey.c @@ -0,0 +1,379 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" +#include "nng/protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +typedef struct xsurv0_pipe xsurv0_pipe; +typedef struct xsurv0_sock xsurv0_sock; + +static void xsurv0_sock_getq_cb(void *); +static void xsurv0_getq_cb(void *); +static void xsurv0_putq_cb(void *); +static void xsurv0_send_cb(void *); +static void xsurv0_recv_cb(void *); + +// surv0_sock is our per-socket protocol private structure. +struct xsurv0_sock { + nni_list pipes; + nni_aio aio_getq; + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx mtx; + nni_atomic_int ttl; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct xsurv0_pipe { + nni_pipe * npipe; + xsurv0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; +}; + +static void +xsurv0_sock_fini(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_fini(&s->aio_getq); + nni_mtx_fini(&s->mtx); +} + +static int +xsurv0_sock_init(void *arg, nni_sock *nsock) +{ + xsurv0_sock *s = arg; + + nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s); + NNI_LIST_INIT(&s->pipes, xsurv0_pipe, node); + nni_mtx_init(&s->mtx); + + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); + + return (0); +} + +static void +xsurv0_sock_open(void *arg) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, &s->aio_getq); +} + +static void +xsurv0_sock_close(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_close(&s->aio_getq); +} + +static void +xsurv0_pipe_stop(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_putq); +} + +static void +xsurv0_pipe_fini(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_putq); + nni_msgq_fini(p->sendq); +} + +static int +xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s) +{ + xsurv0_pipe *p = arg; + int rv; + + nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p); + nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p); + nni_aio_init(&p->aio_send, xsurv0_send_cb, p); + nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p); + + // This depth could be tunable. The queue exists so that if we + // have multiple requests coming in faster than we can deliver them, + // we try to avoid dropping them. We don't really have a solution + // for applying back pressure. It would be nice if surveys carried + // an expiration with them, so that we could discard any that are + // not delivered before their expiration date. + if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) { + xsurv0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + return (0); +} + +static int +xsurv0_pipe_start(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(p->sendq, &p->aio_getq); + nni_pipe_recv(p->npipe, &p->aio_recv); + return (0); +} + +static void +xsurv0_pipe_close(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_putq); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +xsurv0_getq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_getq) != 0) { + nni_pipe_close(p->npipe); + return; + } + + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); + + nni_pipe_send(p->npipe, &p->aio_send); +} + +static void +xsurv0_send_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, &p->aio_getq); +} + +static void +xsurv0_putq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); + nni_pipe_close(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, &p->aio_recv); +} + +static void +xsurv0_recv_cb(void *arg) +{ + xsurv0_pipe *p = arg; + nni_msg * msg; + bool end; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->npipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->npipe); + return; + } + nni_msg_trim(msg, sizeof(uint32_t)); + } + + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, &p->aio_putq); +} + +static int +xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + xsurv0_sock *s = arg; + int ttl; + int rv; + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); +} + +static int +xsurv0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + xsurv0_sock *s = arg; + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); +} + +static void +xsurv0_sock_getq_cb(void *arg) +{ + xsurv0_sock *s = arg; + xsurv0_pipe *p; + nni_msg * msg; + + if (nni_aio_result(&s->aio_getq) != 0) { + // Should be NNG_ECLOSED. + return; + } + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + NNI_LIST_FOREACH (&s->pipes, p) { + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); + } + } + + nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_mtx_unlock(&s->mtx); + + // If there were no pipes to send on, just toss the message. + nni_msg_free(msg); +} + +static void +xsurv0_sock_recv(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static void +xsurv0_sock_send(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_proto_pipe_ops xsurv0_pipe_ops = { + .pipe_size = sizeof(xsurv0_pipe), + .pipe_init = xsurv0_pipe_init, + .pipe_fini = xsurv0_pipe_fini, + .pipe_start = xsurv0_pipe_start, + .pipe_close = xsurv0_pipe_close, + .pipe_stop = xsurv0_pipe_stop, +}; + +static nni_option xsurv0_sock_options[] = { + { + .o_name = NNG_OPT_MAXTTL, + .o_get = xsurv0_sock_get_max_ttl, + .o_set = xsurv0_sock_set_max_ttl, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops xsurv0_sock_ops = { + .sock_size = sizeof(xsurv0_sock), + .sock_init = xsurv0_sock_init, + .sock_fini = xsurv0_sock_fini, + .sock_open = xsurv0_sock_open, + .sock_close = xsurv0_sock_close, + .sock_send = xsurv0_sock_send, + .sock_recv = xsurv0_sock_recv, + .sock_options = xsurv0_sock_options, +}; + +static nni_proto xsurv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xsurv0_sock_ops, + .proto_pipe_ops = &xsurv0_pipe_ops, +}; + +int +nng_surveyor0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xsurv0_proto)); +} diff --git a/src/sp/protocol/survey0/xsurvey_test.c b/src/sp/protocol/survey0/xsurvey_test.c new file mode 100644 index 00000000..f8e9d401 --- /dev/null +++ b/src/sp/protocol/survey0/xsurvey_test.c @@ -0,0 +1,399 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +static void +test_xsurveyor_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_SELF); // 0x62 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_SURVEYOR0_PEER); // 0x62 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_SURVEYOR0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_xsurveyor_raw(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_xsurvey_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_xsurvey_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Survey is broadcast, so we can always write. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_MARRY(surv, resp); + + // Now it's writable. + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_respondent0_open(&resp)); + NUTS_PASS(nng_socket_get_int(surv, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(surv, resp); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_PASS(nng_msg_alloc(&msg, 0)); + // Request ID + NUTS_PASS(nng_msg_append_u32(msg, 0x80000000)); + NUTS_PASS(nng_sendmsg(surv, msg, 0)); + + NUTS_PASS(nng_recvmsg(resp, &msg, 0)); + NUTS_PASS(nng_sendmsg(resp, msg, 0)); + + NUTS_SLEEP(100); + + NUTS_TRUE(nuts_poll_fd(fd) ); + + // and receiving makes it no longer ready + NUTS_PASS(nng_recvmsg(surv, &msg, 0)); + nng_msg_free(msg); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char *addr; + + NUTS_ADDR(addr, "inproc"); + + NUTS_PASS(nng_surveyor0_open_raw(&s1)); + NUTS_PASS(nng_surveyor0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_xsurvey_recv_aio_stopped(void) +{ + nng_socket surv; + nng_aio * aio; + + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_stop(aio); + nng_recv_aio(surv, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(surv); + nng_aio_free(aio); +} + +static void +test_xsurvey_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + uint32_t req_id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY(surv, resp); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_append_u32(m, 0x80000000)); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + + NUTS_PASS(nng_recvmsg(resp, &m, 0)); + + // The message will have a header that contains the 32-bit pipe ID, + // followed by the 32-bit request ID. We will discard the request + // ID before sending it out. + NUTS_TRUE(nng_msg_header_len(m) == 8); + NUTS_PASS(nng_msg_header_chop_u32(m, &req_id)); + NUTS_TRUE(req_id == 0x80000000); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + NUTS_FAIL(nng_recvmsg(surv, &m, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_recv_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p; + uint32_t id; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + NUTS_MARRY_EX(surv, resp, NULL, NULL, &p); + + // Simulate a few hops. + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); + NUTS_PASS(nng_msg_header_append_u32(m, 0x2)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x1)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + NUTS_PASS(nng_sendmsg(resp, m, 0)); + + NUTS_PASS(nng_recvmsg(surv, &m, 0)); + NUTS_TRUE(nng_msg_header_len(m) == 12); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x2); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x1); + NUTS_PASS(nng_msg_header_trim_u32(m, &id)); + NUTS_TRUE(id == 0x80000123u); + + nng_msg_free(m); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_close_during_recv(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1; + nng_pipe p2; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY_EX(surv, resp, NULL, &p1, &p2); + NUTS_TRUE(nng_pipe_id(p1) > 0); + NUTS_TRUE(nng_pipe_id(p2) > 0); + + for (unsigned i = 0; i < 20; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + NUTS_SLEEP(10); + NUTS_PASS(nng_sendmsg(resp, m, 0)); + } + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1; + nng_pipe p2; + + NUTS_PASS(nng_respondent0_open_raw(&resp)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_socket_set_int(resp, NNG_OPT_RECVBUF, 5)); + NUTS_PASS(nng_socket_set_int(surv, NNG_OPT_SENDBUF, 20)); + + NUTS_MARRY_EX(surv, resp, NULL, &p1, &p2); + NUTS_TRUE(nng_pipe_id(p1) > 0); + NUTS_TRUE(nng_pipe_id(p2) > 0); + + for (unsigned i = 0; i < 20; i++) { + NUTS_PASS(nng_msg_alloc(&m, 4)); + NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u)); + NUTS_SLEEP(10); + NUTS_PASS(nng_sendmsg(surv, m, 0)); + } + + NUTS_PASS(nng_pipe_close(p1)); + NUTS_CLOSE(surv); + NUTS_CLOSE(resp); +} + +static void +test_xsurvey_ttl_option(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + NUTS_PASS(nng_surveyor0_open_raw(&s)); + + NUTS_PASS(nng_socket_set_int(s, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 16), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 256), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s, opt, 3)); + NUTS_PASS(nng_socket_get_int(s, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s, opt, "", 1) , NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s, opt, &v, &sz) , NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s, opt, true) , NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s, opt, &b) , NNG_EBADTYPE); + + NUTS_CLOSE(s); +} + +static void +test_xsurvey_broadcast(void) +{ + nng_socket resp1; + nng_socket resp2; + nng_socket surv; + nng_msg * m; + + NUTS_PASS(nng_respondent0_open(&resp1)); + NUTS_PASS(nng_respondent0_open(&resp2)); + NUTS_PASS(nng_surveyor0_open_raw(&surv)); + NUTS_PASS(nng_socket_set_ms(resp1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(resp2, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(surv, NNG_OPT_SENDTIMEO, 100)); + + NUTS_MARRY(surv, resp1); + NUTS_MARRY(surv, resp2); + + NUTS_PASS(nng_msg_alloc(&m, 0)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x80000002u)); + NUTS_PASS(nng_msg_append(m, "hello", 6)); + + NUTS_PASS(nng_sendmsg(surv, m, 0)); + NUTS_RECV(resp1, "hello"); + NUTS_RECV(resp2, "hello"); + + NUTS_CLOSE(surv); + NUTS_CLOSE(resp1); + NUTS_CLOSE(resp2); +} + +TEST_LIST = { + { "xsurvey identity", test_xsurveyor_identity }, + { "xsurvey raw", test_xsurveyor_raw }, + { "xsurvey no context", test_xsurvey_no_context }, + { "xsurvey poll readable", test_xsurvey_poll_readable }, + { "xsurvey poll writable", test_xsurvey_poll_writeable }, + { "xsurvey validate peer", test_xsurvey_validate_peer }, + { "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped }, + { "xsurvey recv garbage", test_xsurvey_recv_garbage }, + { "xsurvey recv header", test_xsurvey_recv_header }, + { "xsurvey close during recv", test_xsurvey_close_during_recv }, + { "xsurvey close pipe during send", + test_xsurvey_close_pipe_during_send }, + { "xsurvey ttl option", test_xsurvey_ttl_option }, + { "xsurvey broadcast", test_xsurvey_broadcast }, + { NULL, NULL }, +}; |
