diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
| commit | 5eb72cbdea39728a67a09fdd6f6d1084dadced67 (patch) | |
| tree | af284318658c05785d321dc726cc1bba51942665 | |
| parent | ec2574b09a746709f15d2a3f5de135e29f4bcb52 (diff) | |
| download | nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.gz nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.bz2 nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.zip | |
Add surveyor protocol (no tests yet).
This adds the surveyor protocol, and updates the respondent somewhat.
I've switched to using generic names for per-pipe and per-socket protocol
data. Hopefully this will make 'cut-n-paste' from other protocol
implementations easier.
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/core/protocol.c | 2 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 189 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 427 |
4 files changed, 531 insertions, 88 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 67ef1833..ac029bc9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -77,6 +77,7 @@ set (NNG_SOURCES protocol/reqrep/req.c protocol/survey/respond.c + protocol/survey/survey.c transport/inproc/inproc.c diff --git a/src/core/protocol.c b/src/core/protocol.c index 0faebe95..667a6f91 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -23,6 +23,7 @@ extern nni_proto nni_pub_proto; extern nni_proto nni_sub_proto; extern nni_proto nni_push_proto; extern nni_proto nni_pull_proto; +extern nni_proto nni_surveyor_proto; extern nni_proto nni_respondent_proto; static nni_proto *protocols[] = { @@ -33,6 +34,7 @@ static nni_proto *protocols[] = { &nni_sub_proto, &nni_push_proto, &nni_pull_proto, + &nni_surveyor_proto, &nni_respondent_proto, NULL }; diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 3b23dff8..2891edc1 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -21,7 +21,7 @@ typedef struct nni_resp_sock nni_resp_sock; // An nni_resp_sock is our per-socket protocol private structure. struct nni_resp_sock { - nni_sock * sock; + nni_sock * nsock; int raw; int ttl; nni_idhash * pipes; @@ -31,33 +31,33 @@ struct nni_resp_sock { // An nni_resp_pipe is our per-pipe protocol private structure. struct nni_resp_pipe { - nni_pipe * pipe; - nni_resp_sock * resp; + nni_pipe * npipe; + nni_resp_sock * psock; nni_msgq * sendq; int sigclose; }; static int -nni_resp_sock_init(void **respp, nni_sock *sock) +nni_resp_sock_init(void **pp, nni_sock *nsock) { - nni_resp_sock *resp; + nni_resp_sock *psock; int rv; - if ((resp = NNI_ALLOC_STRUCT(resp)) == NULL) { + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - resp->ttl = 8; // Per RFC - resp->sock = sock; - resp->raw = 0; - resp->btrace = NULL; - resp->btrace_len = 0; - if ((rv = nni_idhash_create(&resp->pipes)) != 0) { - NNI_FREE_STRUCT(resp); + psock->ttl = 8; // Per RFC + psock->nsock = nsock; + psock->raw = 0; + psock->btrace = NULL; + psock->btrace_len = 0; + if ((rv = nni_idhash_create(&psock->pipes)) != 0) { + NNI_FREE_STRUCT(psock); return (rv); } - *respp = resp; - nni_sock_senderr(sock, NNG_ESTATE); + *pp = psock; + nni_sock_senderr(nsock, NNG_ESTATE); return (0); } @@ -65,33 +65,33 @@ nni_resp_sock_init(void **respp, nni_sock *sock) static void nni_resp_sock_fini(void *arg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; - nni_idhash_destroy(resp->pipes); - if (resp->btrace != NULL) { - nni_free(resp->btrace, resp->btrace_len); + nni_idhash_destroy(psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); } - NNI_FREE_STRUCT(resp); + NNI_FREE_STRUCT(psock); } static int -nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) +nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) { - nni_resp_pipe *rp; + nni_resp_pipe *ppipe; int rv; - if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { - NNI_FREE_STRUCT(rp); + if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { + NNI_FREE_STRUCT(ppipe); return (rv); } - rp->pipe = pipe; - rp->resp = rsock; - rp->sigclose = 0; - *rpp = rp; + ppipe->npipe = npipe; + ppipe->psock = psock; + ppipe->sigclose = 0; + *pp = ppipe; return (0); } @@ -99,30 +99,32 @@ nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) static void nni_resp_pipe_fini(void *arg) { - nni_resp_pipe *rp = arg; + nni_resp_pipe *ppipe = arg; - nni_msgq_fini(rp->sendq); - NNI_FREE_STRUCT(rp); + nni_msgq_fini(ppipe->sendq); + NNI_FREE_STRUCT(ppipe); } static int nni_resp_pipe_add(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + int rv; - return (nni_idhash_insert(resp->pipes, nni_pipe_id(rp->pipe), rp)); + rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe); + return (rv); } static void nni_resp_pipe_rem(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; - nni_idhash_remove(resp->pipes, nni_pipe_id(rp->pipe)); + nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe)); } @@ -133,15 +135,15 @@ nni_resp_pipe_rem(void *arg) static void nni_resp_sock_send(void *arg) { - nni_resp_sock *resp = arg; - nni_msgq *uwq = nni_sock_sendq(resp->sock); - nni_mtx *mx = nni_sock_mtx(resp->sock); + nni_resp_sock *psock = arg; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg; for (;;) { uint8_t *header; uint32_t id; - nni_resp_pipe *rp; + nni_resp_pipe *ppipe; int rv; if ((rv = nni_msgq_get(uwq, &msg)) != 0) { @@ -157,13 +159,13 @@ nni_resp_sock_send(void *arg) nni_msg_trim_header(msg, 4); nni_mtx_lock(mx); - if (nni_idhash_find(resp->pipes, id, (void **) &rp) != 0) { + if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) { nni_mtx_unlock(mx); nni_msg_free(msg); continue; } // Try a non-blocking put to the lower writer. - rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO); + rv = nni_msgq_put_until(ppipe->sendq, msg, NNI_TIME_ZERO); if (rv != 0) { // message queue is full, we have no choice but // to drop it. This should not happen under normal @@ -178,39 +180,41 @@ nni_resp_sock_send(void *arg) static void nni_resp_pipe_send(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; - nni_msgq *sendq = rp->sendq; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + nni_pipe *npipe = ppipe->npipe; + nni_msgq *sendq = ppipe->sendq; nni_msg *msg; int rv; for (;;) { - rv = nni_msgq_get_sig(sendq, &msg, &rp->sigclose); + rv = nni_msgq_get_sig(sendq, &msg, &ppipe->sigclose); if (rv != 0) { break; } - rv = nni_pipe_send(rp->pipe, msg); + rv = nni_pipe_send(npipe, msg); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(nni_sock_recvq(resp->sock), &rp->sigclose); - nni_pipe_close(rp->pipe); + nni_msgq_signal(nni_sock_recvq(psock->nsock), &ppipe->sigclose); + nni_pipe_close(npipe); } static void nni_resp_pipe_recv(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; - nni_msgq *urq = nni_sock_recvq(resp->sock); + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_pipe *npipe = ppipe->npipe; nni_msg *msg; int rv; uint8_t idbuf[4]; - uint32_t id = nni_pipe_id(rp->pipe); + uint32_t id = nni_pipe_id(npipe); NNI_PUT32(idbuf, id); @@ -220,7 +224,7 @@ nni_resp_pipe_recv(void *arg) int hops; again: - rv = nni_pipe_recv(rp->pipe, &msg); + rv = nni_pipe_recv(npipe, &msg); if (rv != 0) { break; } @@ -236,7 +240,7 @@ again: hops = 0; for (;;) { int end = 0; - if (hops >= resp->ttl) { + if (hops >= psock->ttl) { nni_msg_free(msg); goto again; } @@ -258,30 +262,39 @@ again: } // Now send it up. - rv = nni_msgq_put_sig(urq, msg, &rp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(nni_sock_sendq(resp->sock), &rp->sigclose); - nni_msgq_signal(rp->sendq, &rp->sigclose); - nni_pipe_close(rp->pipe); + nni_msgq_signal(nni_sock_sendq(psock->nsock), &ppipe->sigclose); + nni_msgq_signal(ppipe->sendq, &ppipe->sigclose); + nni_pipe_close(npipe); } static int nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; int rv; + int oldraw; switch (opt) { case NNG_OPT_MAXTTL: - rv = nni_setopt_int(&resp->ttl, buf, sz, 1, 255); + rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255); break; case NNG_OPT_RAW: - rv = nni_setopt_int(&resp->raw, buf, sz, 0, 1); + oldraw = psock->raw; + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + if (oldraw != psock->raw) { + if (!psock->raw) { + nni_sock_senderr(psock->nsock, 0); + } else { + nni_sock_senderr(psock->nsock, NNG_ESTATE); + } + } break; default: rv = NNG_ENOTSUP; @@ -293,15 +306,15 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) static int nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - rv = nni_getopt_int(&resp->ttl, buf, szp); + rv = nni_getopt_int(&psock->ttl, buf, szp); break; case NNG_OPT_RAW: - rv = nni_getopt_int(&resp->raw, buf, szp); + rv = nni_getopt_int(&psock->raw, buf, szp); break; default: rv = NNG_ENOTSUP; @@ -313,19 +326,19 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * nni_resp_sock_sfilter(void *arg, nni_msg *msg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; size_t len; - if (resp->raw) { + if (psock->raw) { return (msg); } // Cannot send again until a receive is done... - nni_sock_senderr(resp->sock, NNG_ESTATE); + nni_sock_senderr(psock->nsock, NNG_ESTATE); // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. - if (resp->btrace == NULL) { + if (psock->btrace == NULL) { nni_msg_free(msg); return (NULL); } @@ -333,17 +346,17 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) // drop anything else in the header... nni_msg_trunc_header(msg, nni_msg_header_len(msg)); - if (nni_msg_append_header(msg, resp->btrace, resp->btrace_len) != 0) { - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) { + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; nni_msg_free(msg); return (NULL); } - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; return (msg); } @@ -351,28 +364,28 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) static nni_msg * nni_resp_sock_rfilter(void *arg, nni_msg *msg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; char *header; size_t len; - if (resp->raw) { + if (psock->raw) { return (msg); } - nni_sock_senderr(resp->sock, 0); + nni_sock_senderr(psock->nsock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); - if (resp->btrace != NULL) { - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; } - if ((resp->btrace = nni_alloc(len)) == NULL) { + if ((psock->btrace = nni_alloc(len)) == NULL) { nni_msg_free(msg); return (NULL); } - resp->btrace_len = len; - memcpy(resp->btrace, header, len); + psock->btrace_len = len; + memcpy(psock->btrace, header, len); nni_msg_trunc_header(msg, len); return (msg); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c new file mode 100644 index 00000000..cb600dc3 --- /dev/null +++ b/src/protocol/survey/survey.c @@ -0,0 +1,427 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +typedef struct nni_surv_pipe nni_surv_pipe; +typedef struct nni_surv_sock nni_surv_sock; + +// An nni_surv_sock is our per-socket protocol private structure. +struct nni_surv_sock { + nni_sock * nsock; + nni_cv cv; + nni_duration survtime; + nni_time expire; + int raw; + int closing; + uint32_t nextid; // next id + uint8_t survid[4]; // outstanding request ID (big endian) + nni_list pipes; +}; + +// An nni_surv_pipe is our per-pipe protocol private structure. +struct nni_surv_pipe { + nni_pipe * npipe; + nni_surv_sock * psock; + nni_msgq * sendq; + int sigclose; + nni_list_node node; +}; + +static int +nni_surv_sock_init(void **sp, nni_sock *nsock) +{ + nni_surv_sock *psock; + int rv; + + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_cv_init(&psock->cv, nni_sock_mtx(nsock))) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } + NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); + // this is "semi random" start for request IDs. + psock->nextid = (nni_clock() >> 32) ^ (nni_clock() & 0xffffffff); + psock->nsock = nsock; + psock->raw = 0; + psock->survtime = NNI_SECOND * 60; + psock->expire = NNI_TIME_ZERO; + + *sp = psock; + nni_sock_recverr(nsock, NNG_ESTATE); + return (0); +} + + +static void +nni_surv_sock_close(void *arg) +{ + nni_surv_sock *psock = arg; + + // Shut down the resender. We request it to exit by clearing + // its old value, then kick it. + psock->closing = 1; + nni_cv_wake(&psock->cv); +} + + +static void +nni_surv_sock_fini(void *arg) +{ + nni_surv_sock *psock = arg; + + nni_cv_fini(&psock->cv); + NNI_FREE_STRUCT(psock); +} + + +static int +nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) +{ + nni_surv_pipe *ppipe; + int rv; + + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { + return (NNG_ENOMEM); + } + // This depth could be tunable. + if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + NNI_FREE_STRUCT(ppipe); + return (rv); + } + ppipe->npipe = npipe; + ppipe->sigclose = 0; + ppipe->psock = psock; + *pp = ppipe; + return (0); +} + + +static void +nni_surv_pipe_fini(void *arg) +{ + nni_surv_pipe *sp = arg; + + NNI_FREE_STRUCT(sp); +} + + +static int +nni_surv_pipe_add(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + + nni_list_append(&psock->pipes, ppipe); + return (0); +} + + +static void +nni_surv_pipe_rem(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + + nni_list_remove(&psock->pipes, ppipe); +} + + +static void +nni_surv_pipe_sender(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + nni_pipe *npipe = ppipe->npipe; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose); + if (rv != 0) { + break; + } + rv = nni_pipe_send(npipe, msg); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(urq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static void +nni_surv_pipe_receiver(void *arg) +{ + nni_surv_pipe *ppipe = arg; + nni_surv_sock *psock = ppipe->psock; + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_pipe *npipe = ppipe->npipe; + nni_msg *msg; + int rv; + + for (;;) { + rv = nni_pipe_recv(npipe, &msg); + if (rv != 0) { + break; + } + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Not enough data, just toss it. + nni_msg_free(msg); + continue; + } + if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) { + // Should be NNG_ENOMEM + nni_msg_free(msg); + continue; + } + if (nni_msg_trim(msg, 4) != 0) { + // This should never happen - could be an assert. + nni_panic("Failed to trim SURV header from body"); + } + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); + if (rv != 0) { + nni_msg_free(msg); + break; + } + } + nni_msgq_signal(uwq, &ppipe->sigclose); + nni_pipe_close(npipe); +} + + +static int +nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +{ + nni_surv_sock *psock = arg; + int rv; + int oldraw; + + switch (opt) { + case NNG_OPT_SURVEYTIME: + rv = nni_setopt_duration(&psock->survtime, buf, sz); + break; + case NNG_OPT_RAW: + oldraw = psock->raw; + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + if (oldraw != psock->raw) { + if (psock->raw) { + nni_sock_recverr(psock->nsock, 0); + } else { + nni_sock_recverr(psock->nsock, NNG_ESTATE); + } + memset(psock->survid, 0, sizeof (psock->survid)); + psock->expire = NNI_TIME_NEVER; + nni_cv_wake(&psock->cv); + } + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static int +nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +{ + nni_surv_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_SURVEYTIME: + rv = nni_getopt_duration(&psock->survtime, buf, szp); + break; + case NNG_OPT_RAW: + rv = nni_getopt_int(&psock->raw, buf, szp); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); +} + + +static void +nni_surv_sock_sender(void *arg) +{ + nni_surv_sock *psock = arg; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); + nni_msg *msg, *dup; + + for (;;) { + nni_surv_pipe *ppipe; + nni_surv_pipe *last; + int rv; + + if ((rv = nni_msgq_get(uwq, &msg)) != 0) { + break; + } + + nni_mtx_lock(mx); + last = nni_list_last(&psock->pipes); + NNI_LIST_FOREACH (&psock->pipes, ppipe) { + if (ppipe != last) { + rv = nni_msg_dup(&dup, msg); + if (rv != 0) { + continue; + } + } else { + dup = msg; + } + if ((rv = nni_msgq_tryput(ppipe->sendq, dup)) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(mx); + + if (last == NULL) { + nni_msg_free(msg); + } + } +} + + +static void +nni_surv_sock_timeout(void *arg) +{ + nni_surv_sock *psock = arg; + nni_mtx *mx = nni_sock_mtx(psock->nsock); + + nni_mtx_lock(mx); + for (;;) { + if (psock->closing) { + nni_mtx_unlock(mx); + return; + } + if (nni_clock() > psock->expire) { + // Set the expiration ~forever + psock->expire = NNI_TIME_NEVER; + // Survey IDs *always* have the high order bit set, + // so zeroing means that nothing can match. + memset(psock->survid, 0, sizeof (psock->survid)); + nni_sock_recverr(psock->nsock, NNG_ESTATE); + } + nni_cv_until(&psock->cv, psock->expire); + } +} + + +static nni_msg * +nni_surv_sock_sfilter(void *arg, nni_msg *msg) +{ + nni_surv_sock *psock = arg; + uint32_t id; + + if (psock->raw) { + // No automatic retry, and the request ID must + // be in the header coming down. + return (msg); + } + + // Generate a new request ID. We always set the high + // order bit so that the peer can locate the end of the + // backtrace. (Pipe IDs have the high order bit clear.) + id = (psock->nextid++) | 0x80000000u; + + // Survey ID is in big endian format. + NNI_PUT32(psock->survid, id); + + if (nni_msg_append_header(msg, psock->survid, 4) != 0) { + // Should be ENOMEM. + nni_msg_free(msg); + return (NULL); + } + + // If another message is there, this cancels it. We move the + // survey expiration out. The timeout thread will wake up in + // the wake below, and reschedule itself appropriately. + psock->expire = nni_clock() + psock->survtime; + nni_cv_wake(&psock->cv); + + // Clear the error condition. + nni_sock_recverr(psock->nsock, 0); + + return (msg); +} + + +static nni_msg * +nni_surv_sock_rfilter(void *arg, nni_msg *msg) +{ + nni_surv_sock *ssock = arg; + + if (ssock->raw) { + // Pass it unmolested + return (msg); + } + + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + return (NULL); + } + + if (memcmp(nni_msg_header(msg), ssock->survid, 4) != 0) { + // Wrong request id + nni_msg_free(msg); + return (NULL); + } + // Prune the survey ID. + nni_msg_trim_header(msg, 4); + + return (msg); +} + + +static nni_proto_pipe_ops nni_surv_pipe_ops = { + .pipe_init = nni_surv_pipe_init, + .pipe_fini = nni_surv_pipe_fini, + .pipe_add = nni_surv_pipe_add, + .pipe_rem = nni_surv_pipe_rem, + .pipe_worker = { nni_surv_pipe_sender, + nni_surv_pipe_receiver } +}; + +static nni_proto_sock_ops nni_surv_sock_ops = { + .sock_init = nni_surv_sock_init, + .sock_fini = nni_surv_sock_fini, + .sock_close = nni_surv_sock_close, + .sock_setopt = nni_surv_sock_setopt, + .sock_getopt = nni_surv_sock_getopt, + .sock_rfilter = nni_surv_sock_rfilter, + .sock_sfilter = nni_surv_sock_sfilter, + .sock_worker = { nni_surv_sock_sender, + nni_surv_sock_timeout } +}; + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +nni_proto nni_surveyor_proto = { + .proto_self = NNG_PROTO_SURVEYOR, + .proto_peer = NNG_PROTO_RESPONDENT, + .proto_name = "surveyor", + .proto_sock_ops = &nni_surv_sock_ops, + .proto_pipe_ops = &nni_surv_pipe_ops, +}; |
