diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-31 13:06:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-02 16:10:26 -0700 |
| commit | 7bf591e20a94b8d926f92ab9b320f3b75d342345 (patch) | |
| tree | d67ce7cab328a004346419047feede7d579dad77 /src/protocol/survey0 | |
| parent | d340af7dc250388f48d36c5078c4857c51bb6121 (diff) | |
| download | nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.gz nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.bz2 nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.zip | |
fixes #143 Protocols and transports should be "configurable"
This makes all the protocols and transports optional. All
of them except ZeroTier are enabled by default, but you can
now disable them (remove from the build) with cmake options.
The test suite is modified so that tests still run as much
as they can, but skip over things caused by missing functionality
from the library (due to configuration).
Further, the constant definitions and prototypes for functions
that are specific to transports or protocols are moved into
appropriate headers, which should be included directly by
applications wishing to use these.
We have also added and improved documentation -- all of the
transports are documented, and several more man pages for
protocols have been added. (Req/Rep and Surveyor are still
missing.)
Diffstat (limited to 'src/protocol/survey0')
| -rw-r--r-- | src/protocol/survey0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 509 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.h | 28 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 484 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.h | 30 |
5 files changed, 1074 insertions, 0 deletions
diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c new file mode 100644 index 00000000..73e919c3 --- /dev/null +++ b/src/protocol/survey0/respond.c @@ -0,0 +1,509 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { + nni_msgq * urq; + nni_msgq * uwq; + int raw; + int ttl; + nni_idhash *pipes; + char * btrace; + size_t btrace_len; + nni_aio * aio_getq; + nni_mtx mtx; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +resp0_sock_fini(void *arg) +{ + resp0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + } + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +resp0_sock_init(void **sp, nni_sock *nsock) +{ + resp0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->raw = 0; + s->btrace = NULL; + s->btrace_len = 0; + s->urq = nni_sock_recvq(nsock); + s->uwq = nni_sock_sendq(nsock); + + *sp = s; + return (0); +} + +static void +resp0_sock_open(void *arg) +{ + resp0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +resp0_sock_close(void *arg) +{ + resp0_sock *s = arg; + + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +resp0_pipe_fini(void *arg) +{ + resp0_pipe *p = arg; + + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + resp0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +resp0_pipe_start(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; + + p->id = nni_pipe_id(p->npipe); + + nni_mtx_lock(&s->mtx); + rv = nni_idhash_insert(s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, p->aio_recv); + nni_msgq_aio_get(p->sendq, p->aio_getq); + + return (rv); +} + +static void +resp0_pipe_stop(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + + if (p->id != 0) { + nni_mtx_lock(&s->mtx); + nni_idhash_remove(s->pipes, p->id); + nni_mtx_unlock(&s->mtx); + p->id = 0; + } +} + +// resp0_sock_send watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket.s + +void +resp0_sock_getq_cb(void *arg) +{ + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + // We can't really close down the socket, so just keep going. + nni_msgq_aio_get(s->uwq, s->aio_getq); + return; + } + id = nni_msg_header_trim_u32(msg); + + nni_mtx_lock(&s->mtx); + if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) { + // Destination pipe not present. + nni_msg_free(msg); + } else { + // Non-blocking put. + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); + } + } + nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_mtx_unlock(&s->mtx); +} + +void +resp0_getq_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +void +resp0_send_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +resp0_recv_cb(void *arg) +{ + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; + + if (nni_aio_result(p->aio_recv) != 0) { + goto error; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); + + // Store the pipe id in the header, first thing. + if (nni_msg_header_append_u32(msg, p->id) != 0) { + nni_msg_free(msg); + goto error; + } + + // Move backtrace from body to header + hops = 0; + for (;;) { + int end = 0; + uint8_t *body; + + if (hops >= s->ttl) { + nni_msg_free(msg); + goto error; + } + if (nni_msg_len(msg) < 4) { + nni_msg_free(msg); + goto error; + } + body = nni_msg_body(msg); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_header_append(msg, body, 4); + if (rv != 0) { + nni_msg_free(msg); + goto error; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(urq, p->aio_putq); + return; + +error: + nni_pipe_stop(p->npipe); +} + +static void +resp0_putq_cb(void *arg) +{ + resp0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + resp0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->mtx); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + resp0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +{ + resp0_sock *s = arg; + return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); +} + +static int +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +{ + resp0_sock *s = arg; + return (nni_getopt_int(s->ttl, buf, szp)); +} + +static void +resp0_sock_send(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + msg = nni_aio_get_msg(aio); + + // If we have a stored backtrace, append it to the header... + // if we don't have a backtrace, discard the message. + if (s->btrace == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + // drop anything else in the header... + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_msg * +resp0_sock_filter(void *arg, nni_msg *msg) +{ + resp0_sock *s = arg; + char * header; + size_t len; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + nni_mtx_unlock(&s->mtx); + return (msg); + } + + len = nni_msg_header_len(msg); + header = nni_msg_header(msg); + if (s->btrace != NULL) { + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + } + if ((s->btrace = nni_alloc(len)) == NULL) { + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + return (NULL); + } + s->btrace_len = len; + memcpy(s->btrace, header, len); + nni_msg_header_clear(msg); + nni_mtx_unlock(&s->mtx); + return (msg); +} + +static void +resp0_sock_recv(void *arg, nni_aio *aio) +{ + resp0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, +}; + +static nni_proto_sock_option resp0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_MAXTTL, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + +static nni_proto resp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, +}; + +int +nng_respondent0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto)); +} diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c new file mode 100644 index 00000000..02283436 --- /dev/null +++ b/src/protocol/survey0/survey.c @@ -0,0 +1,484 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { + nni_duration survtime; + nni_time expire; + int raw; + int closing; + uint32_t nextid; // next id + uint32_t survid; // outstanding request ID (big endian) + nni_list pipes; + nni_aio * aio_getq; + nni_timer_node timer; + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx mtx; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { + nni_pipe * npipe; + surv0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +surv0_sock_fini(void *arg) +{ + surv0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +surv0_sock_init(void **sp, nni_sock *nsock) +{ + surv0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); + return (rv); + } + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); + nni_mtx_init(&s->mtx); + nni_timer_init(&s->timer, surv0_timeout, s); + + s->nextid = nni_random(); + s->raw = 0; + s->survtime = NNI_SECOND * 60; + s->expire = NNI_TIME_ZERO; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + + *sp = s; + return (0); +} + +static void +surv0_sock_open(void *arg) +{ + surv0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +surv0_sock_close(void *arg) +{ + surv0_sock *s = arg; + + nni_timer_cancel(&s->timer); + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); +} + +static void +surv0_pipe_fini(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + surv0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + // This depth could be tunable. + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +surv0_pipe_start(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); + return (0); +} + +static void +surv0_pipe_stop(void *arg) +{ + surv0_pipe *p = arg; + surv0_sock *s = p->psock; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +surv0_getq_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +static void +surv0_send_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->psock->uwq, p->aio_getq); +} + +static void +surv0_putq_cb(void *arg) +{ + surv0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +surv0_recv_cb(void *arg) +{ + surv0_pipe *p = arg; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + goto failed; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Not enough data, just toss it. + nni_msg_free(msg); + goto failed; + } + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + // Should be NNG_ENOMEM + nni_msg_free(msg); + goto failed; + } + (void) nni_msg_trim(msg, 4); + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, p->aio_putq); + return; + +failed: + nni_pipe_stop(p->npipe); +} + +static int +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) +{ + surv0_sock *s = arg; + int rv; + + nni_mtx_lock(&s->mtx); + if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { + s->survid = 0; + nni_timer_cancel(&s->timer); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) +{ + surv0_sock *s = arg; + return (nni_getopt_int(s->raw, buf, szp)); +} + +static int +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +{ + surv0_sock *s = arg; + return (nni_setopt_ms(&s->survtime, buf, sz)); +} + +static int +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +{ + surv0_sock *s = arg; + return (nni_getopt_ms(s->survtime, buf, szp)); +} + +static void +surv0_sock_getq_cb(void *arg) +{ + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; + + if (nni_aio_result(s->aio_getq) != 0) { + // Should be NNG_ECLOSED. + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + last = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != last) { + if (nni_msg_dup(&dup, msg) != 0) { + continue; + } + } else { + dup = msg; + } + if (nni_msgq_tryput(p->sendq, dup) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(&s->mtx); + + if (last == NULL) { + // If there were no pipes to send on, just toss the message. + nni_msg_free(msg); + } +} + +static void +surv0_timeout(void *arg) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + s->survid = 0; + nni_mtx_unlock(&s->mtx); + nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); +} + +static void +surv0_sock_recv(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->survid == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->urq, aio); +} + +static void +surv0_sock_send(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + nni_msg * msg; + int rv; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // No automatic retry, and the request ID must + // be in the header coming down. + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_put(s->uwq, aio); + return; + } + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + s->survid = (s->nextid++) | 0x80000000u; + + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // If another message is there, this cancels it. We move the + // survey expiration out. The timeout thread will wake up in + // the wake below, and reschedule itself appropriately. + s->expire = nni_clock() + s->survtime; + nni_timer_schedule(&s->timer, s->expire); + + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_msg * +surv0_sock_filter(void *arg, nni_msg *msg) +{ + surv0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->raw) { + // Pass it unmolested + nni_mtx_unlock(&s->mtx); + return (msg); + } + + if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || + (nni_msg_header_trim_u32(msg) != s->survid)) { + // Wrong request id + nni_msg_free(msg); + return (NULL); + } + nni_mtx_unlock(&s->mtx); + + return (msg); +} + +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, +}; + +static nni_proto_sock_option surv0_sock_options[] = { + { + .pso_name = NNG_OPT_RAW, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, + }, + { + .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, +}; + +static nni_proto surv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, +}; + +int +nng_surveyor0_open(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &surv0_proto)); +} diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H |
