diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
| commit | 5eb72cbdea39728a67a09fdd6f6d1084dadced67 (patch) | |
| tree | af284318658c05785d321dc726cc1bba51942665 /src/protocol/survey/survey.c | |
| parent | ec2574b09a746709f15d2a3f5de135e29f4bcb52 (diff) | |
| download | nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.gz nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.bz2 nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.zip | |
Add surveyor protocol (no tests yet).
This adds the surveyor protocol, and updates the respondent somewhat.
I've switched to using generic names for per-pipe and per-socket protocol
data. Hopefully this will make 'cut-n-paste' from other protocol
implementations easier.
Diffstat (limited to 'src/protocol/survey/survey.c')
| -rw-r--r-- | src/protocol/survey/survey.c | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c new file mode 100644 index 00000000..cb600dc3 --- /dev/null +++ b/src/protocol/survey/survey.c @@ -0,0 +1,427 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +typedef struct nni_surv_pipe nni_surv_pipe; +typedef struct nni_surv_sock nni_surv_sock; + +// An nni_surv_sock is our per-socket protocol private structure. +struct nni_surv_sock { + nni_sock * nsock; + nni_cv cv; + nni_duration survtime; + nni_time expire; + int raw; + int closing; + uint32_t nextid; // next id + uint8_t survid[4]; // outstanding request ID (big endian) + nni_list pipes; +}; + +// An nni_surv_pipe is our per-pipe protocol private structure. +struct nni_surv_pipe { + nni_pipe * npipe; + nni_surv_sock * psock; + nni_msgq * sendq; + int sigclose; + nni_list_node node; +}; + +static int +nni_surv_sock_init(void **sp, nni_sock *nsock) +{ + nni_surv_sock *psock; + int rv; + + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_cv_init(&psock->cv, nni_sock_mtx(nsock))) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } + NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); + // this is "semi random" start for request IDs. + psock->nextid = (nni_clock() >> 32) ^ (nni_clock() & 0xffffffff); + psock->nsock = nsock; + psock->raw = 0; + psock->survtime = NNI_SECOND * 60; + psock->expire = NNI_TIME_ZERO; + + *sp = psock; + nni_sock_recverr(nsock, NNG_ESTATE); + return (0); +} + + +static void +nni_surv_sock_close(void *arg) +{ + nni_surv_sock *psock = arg; + + // Shut down the resender. We request it to exit by clearing + // its old value, then kick it. + psock->closing = 1; + nni_cv_wake(&psock->cv); +} + + +static void +nni_surv_sock_fini(void *arg) +{ + nni_surv_sock *psock = arg; + + nni_cv_fini(&psock->cv); + NNI_FREE_STRUCT(psock); +} + + +static int +nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) +{ + nni_surv_pipe *ppipe; + int rv; + + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { + return (NNG_ENOMEM); + } + // This depth could be tunable. + if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + NNI_FREE_STRUCT(ppipe); + return (rv); + } + ppipe->npipe = npipe; + ppipe->sigclose = 0; + ppipe->psock = psock; + *pp = ppipe; + return (0); +} + + +static void +nni_surv_pipe_fini(void *arg) +{ + nni_surv_pipe *sp = arg; + + NNI_FREE_STRUCT(sp); +} + + +static int +nni_surv_pipe_add(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + + nni_list_append(&psock->pipes, ppipe); + return (0); +} + + +static void +nni_surv_pipe_rem(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + + nni_list_remove(&psock->pipes, ppipe); +} + + +static void +nni_surv_pipe_sender(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + nni_pipe *npipe = ppipe->npipe; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose); + if (rv != 0) { + break; + } + rv = nni_pipe_send(npipe, msg); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(urq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static void +nni_surv_pipe_receiver(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_pipe *npipe = ppipe->npipe; + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_pipe_recv(npipe, &msg); + if (rv != 0) { + break; + } + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Not enough data, just toss it. + nni_msg_free(msg); + continue; + } + if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) { + // Should be NNG_ENOMEM + nni_msg_free(msg); + continue; + } + if (nni_msg_trim(msg, 4) != 0) { + // This should never happen - could be an assert. + nni_panic("Failed to trim SURV header from body"); + } + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(uwq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static int +nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_surv_sock *psock = arg; + int rv; + int oldraw; + + switch (opt) { + case NNG_OPT_SURVEYTIME: + rv = nni_setopt_duration(&psock->survtime, buf, sz); + break; + case NNG_OPT_RAW: + oldraw = psock->raw; + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + if (oldraw != psock->raw) { + if (psock->raw) { + nni_sock_recverr(psock->nsock, 0); + } else { + nni_sock_recverr(psock->nsock, NNG_ESTATE); + } + memset(psock->survid, 0, sizeof (psock->survid)); + psock->expire = NNI_TIME_NEVER; + nni_cv_wake(&psock->cv); + } + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_surv_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_SURVEYTIME: + rv = nni_getopt_duration(&psock->survtime, buf, szp); + break; + case NNG_OPT_RAW: + rv = nni_getopt_int(&psock->raw, buf, szp); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static void +nni_surv_sock_sender(void *arg) +{ + nni_surv_sock *psock = arg; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msg *msg, *dup; + + for (;;) { + nni_surv_pipe *ppipe; + nni_surv_pipe *last; + int rv; + + if ((rv = nni_msgq_get(uwq, &msg)) != 0) { + break; + } + + nni_mtx_lock(mx); + last = nni_list_last(&psock->pipes); + NNI_LIST_FOREACH (&psock->pipes, ppipe) { + if (ppipe != last) { + rv = nni_msg_dup(&dup, msg); + if (rv != 0) { + continue; + } + } else { + dup = msg; + } + if ((rv = nni_msgq_tryput(ppipe->sendq, dup)) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(mx); + + if (last == NULL) { + nni_msg_free(msg); + } + } +} + + +static void +nni_surv_sock_timeout(void *arg) +{ + nni_surv_sock *psock = arg; + nni_mtx *mx = nni_sock_mtx(psock->nsock); + + nni_mtx_lock(mx); + for (;;) { + if (psock->closing) { + nni_mtx_unlock(mx); + return; + } + if (nni_clock() > psock->expire) { + // Set the expiration ~forever + psock->expire = NNI_TIME_NEVER; + // Survey IDs *always* have the high order bit set, + // so zeroing means that nothing can match. + memset(psock->survid, 0, sizeof (psock->survid)); + nni_sock_recverr(psock->nsock, NNG_ESTATE); + } + nni_cv_until(&psock->cv, psock->expire); + } +} + + +static nni_msg * +nni_surv_sock_sfilter(void *arg, nni_msg *msg) +{ + nni_surv_sock *psock = arg; + uint32_t id; + + if (psock->raw) { + // No automatic retry, and the request ID must + // be in the header coming down. + return (msg); + } + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + id = (psock->nextid++) | 0x80000000u; + + // Survey ID is in big endian format. + NNI_PUT32(psock->survid, id); + + if (nni_msg_append_header(msg, psock->survid, 4) != 0) { + // Should be ENOMEM. + nni_msg_free(msg); + return (NULL); + } + + // If another message is there, this cancels it. We move the + // survey expiration out. The timeout thread will wake up in + // the wake below, and reschedule itself appropriately. + psock->expire = nni_clock() + psock->survtime; + nni_cv_wake(&psock->cv); + + // Clear the error condition. + nni_sock_recverr(psock->nsock, 0); + + return (msg); +} + + +static nni_msg * +nni_surv_sock_rfilter(void *arg, nni_msg *msg) +{ + nni_surv_sock *ssock = arg; + + if (ssock->raw) { + // Pass it unmolested + return (msg); + } + + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + return (NULL); + } + + if (memcmp(nni_msg_header(msg), ssock->survid, 4) != 0) { + // Wrong request id + nni_msg_free(msg); + return (NULL); + } + // Prune the survey ID. + nni_msg_trim_header(msg, 4); + + return (msg); +} + + +static nni_proto_pipe_ops nni_surv_pipe_ops = { + .pipe_init = nni_surv_pipe_init, + .pipe_fini = nni_surv_pipe_fini, + .pipe_add = nni_surv_pipe_add, + .pipe_rem = nni_surv_pipe_rem, + .pipe_worker = { nni_surv_pipe_sender, + nni_surv_pipe_receiver } +}; + +static nni_proto_sock_ops nni_surv_sock_ops = { + .sock_init = nni_surv_sock_init, + .sock_fini = nni_surv_sock_fini, + .sock_close = nni_surv_sock_close, + .sock_setopt = nni_surv_sock_setopt, + .sock_getopt = nni_surv_sock_getopt, + .sock_rfilter = nni_surv_sock_rfilter, + .sock_sfilter = nni_surv_sock_sfilter, + .sock_worker = { nni_surv_sock_sender, + nni_surv_sock_timeout } +}; + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +nni_proto nni_surveyor_proto = { + .proto_self = NNG_PROTO_SURVEYOR, + .proto_peer = NNG_PROTO_RESPONDENT, + .proto_name = "surveyor", + .proto_sock_ops = &nni_surv_sock_ops, + .proto_pipe_ops = &nni_surv_pipe_ops, +}; |
