aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-08 12:22:42 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-08 12:22:42 -0800
commit5eb72cbdea39728a67a09fdd6f6d1084dadced67 (patch)
treeaf284318658c05785d321dc726cc1bba51942665 /src
parentec2574b09a746709f15d2a3f5de135e29f4bcb52 (diff)
downloadnng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.gz
nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.bz2
nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.zip
Add surveyor protocol (no tests yet).
This adds the surveyor protocol, and updates the respondent somewhat. I've switched to using generic names for per-pipe and per-socket protocol data. Hopefully this will make 'cut-n-paste' from other protocol implementations easier.
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/core/protocol.c2
-rw-r--r--src/protocol/survey/respond.c189
-rw-r--r--src/protocol/survey/survey.c427
4 files changed, 531 insertions, 88 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 67ef1833..ac029bc9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -77,6 +77,7 @@ set (NNG_SOURCES
protocol/reqrep/req.c
protocol/survey/respond.c
+ protocol/survey/survey.c
transport/inproc/inproc.c
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 0faebe95..667a6f91 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -23,6 +23,7 @@ extern nni_proto nni_pub_proto;
extern nni_proto nni_sub_proto;
extern nni_proto nni_push_proto;
extern nni_proto nni_pull_proto;
+extern nni_proto nni_surveyor_proto;
extern nni_proto nni_respondent_proto;
static nni_proto *protocols[] = {
@@ -33,6 +34,7 @@ static nni_proto *protocols[] = {
&nni_sub_proto,
&nni_push_proto,
&nni_pull_proto,
+ &nni_surveyor_proto,
&nni_respondent_proto,
NULL
};
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 3b23dff8..2891edc1 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -21,7 +21,7 @@ typedef struct nni_resp_sock nni_resp_sock;
// An nni_resp_sock is our per-socket protocol private structure.
struct nni_resp_sock {
- nni_sock * sock;
+ nni_sock * nsock;
int raw;
int ttl;
nni_idhash * pipes;
@@ -31,33 +31,33 @@ struct nni_resp_sock {
// An nni_resp_pipe is our per-pipe protocol private structure.
struct nni_resp_pipe {
- nni_pipe * pipe;
- nni_resp_sock * resp;
+ nni_pipe * npipe;
+ nni_resp_sock * psock;
nni_msgq * sendq;
int sigclose;
};
static int
-nni_resp_sock_init(void **respp, nni_sock *sock)
+nni_resp_sock_init(void **pp, nni_sock *nsock)
{
- nni_resp_sock *resp;
+ nni_resp_sock *psock;
int rv;
- if ((resp = NNI_ALLOC_STRUCT(resp)) == NULL) {
+ if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- resp->ttl = 8; // Per RFC
- resp->sock = sock;
- resp->raw = 0;
- resp->btrace = NULL;
- resp->btrace_len = 0;
- if ((rv = nni_idhash_create(&resp->pipes)) != 0) {
- NNI_FREE_STRUCT(resp);
+ psock->ttl = 8; // Per RFC
+ psock->nsock = nsock;
+ psock->raw = 0;
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
+ if ((rv = nni_idhash_create(&psock->pipes)) != 0) {
+ NNI_FREE_STRUCT(psock);
return (rv);
}
- *respp = resp;
- nni_sock_senderr(sock, NNG_ESTATE);
+ *pp = psock;
+ nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
@@ -65,33 +65,33 @@ nni_resp_sock_init(void **respp, nni_sock *sock)
static void
nni_resp_sock_fini(void *arg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
- nni_idhash_destroy(resp->pipes);
- if (resp->btrace != NULL) {
- nni_free(resp->btrace, resp->btrace_len);
+ nni_idhash_destroy(psock->pipes);
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
}
- NNI_FREE_STRUCT(resp);
+ NNI_FREE_STRUCT(psock);
}
static int
-nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
- nni_resp_pipe *rp;
+ nni_resp_pipe *ppipe;
int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(rp);
+ if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(ppipe);
return (rv);
}
- rp->pipe = pipe;
- rp->resp = rsock;
- rp->sigclose = 0;
- *rpp = rp;
+ ppipe->npipe = npipe;
+ ppipe->psock = psock;
+ ppipe->sigclose = 0;
+ *pp = ppipe;
return (0);
}
@@ -99,30 +99,32 @@ nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
static void
nni_resp_pipe_fini(void *arg)
{
- nni_resp_pipe *rp = arg;
+ nni_resp_pipe *ppipe = arg;
- nni_msgq_fini(rp->sendq);
- NNI_FREE_STRUCT(rp);
+ nni_msgq_fini(ppipe->sendq);
+ NNI_FREE_STRUCT(ppipe);
}
static int
nni_resp_pipe_add(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ int rv;
- return (nni_idhash_insert(resp->pipes, nni_pipe_id(rp->pipe), rp));
+ rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe);
+ return (rv);
}
static void
nni_resp_pipe_rem(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
- nni_idhash_remove(resp->pipes, nni_pipe_id(rp->pipe));
+ nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe));
}
@@ -133,15 +135,15 @@ nni_resp_pipe_rem(void *arg)
static void
nni_resp_sock_send(void *arg)
{
- nni_resp_sock *resp = arg;
- nni_msgq *uwq = nni_sock_sendq(resp->sock);
- nni_mtx *mx = nni_sock_mtx(resp->sock);
+ nni_resp_sock *psock = arg;
+ nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_mtx *mx = nni_sock_mtx(psock->nsock);
nni_msg *msg;
for (;;) {
uint8_t *header;
uint32_t id;
- nni_resp_pipe *rp;
+ nni_resp_pipe *ppipe;
int rv;
if ((rv = nni_msgq_get(uwq, &msg)) != 0) {
@@ -157,13 +159,13 @@ nni_resp_sock_send(void *arg)
nni_msg_trim_header(msg, 4);
nni_mtx_lock(mx);
- if (nni_idhash_find(resp->pipes, id, (void **) &rp) != 0) {
+ if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) {
nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;
}
// Try a non-blocking put to the lower writer.
- rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO);
+ rv = nni_msgq_put_until(ppipe->sendq, msg, NNI_TIME_ZERO);
if (rv != 0) {
// message queue is full, we have no choice but
// to drop it. This should not happen under normal
@@ -178,39 +180,41 @@ nni_resp_sock_send(void *arg)
static void
nni_resp_pipe_send(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
- nni_msgq *sendq = rp->sendq;
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ nni_pipe *npipe = ppipe->npipe;
+ nni_msgq *sendq = ppipe->sendq;
nni_msg *msg;
int rv;
for (;;) {
- rv = nni_msgq_get_sig(sendq, &msg, &rp->sigclose);
+ rv = nni_msgq_get_sig(sendq, &msg, &ppipe->sigclose);
if (rv != 0) {
break;
}
- rv = nni_pipe_send(rp->pipe, msg);
+ rv = nni_pipe_send(npipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(nni_sock_recvq(resp->sock), &rp->sigclose);
- nni_pipe_close(rp->pipe);
+ nni_msgq_signal(nni_sock_recvq(psock->nsock), &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
static void
nni_resp_pipe_recv(void *arg)
{
- nni_resp_pipe *rp = arg;
- nni_resp_sock *resp = rp->resp;
- nni_msgq *urq = nni_sock_recvq(resp->sock);
+ nni_resp_pipe *ppipe = arg;
+ nni_resp_sock *psock = ppipe->psock;
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
+ nni_pipe *npipe = ppipe->npipe;
nni_msg *msg;
int rv;
uint8_t idbuf[4];
- uint32_t id = nni_pipe_id(rp->pipe);
+ uint32_t id = nni_pipe_id(npipe);
NNI_PUT32(idbuf, id);
@@ -220,7 +224,7 @@ nni_resp_pipe_recv(void *arg)
int hops;
again:
- rv = nni_pipe_recv(rp->pipe, &msg);
+ rv = nni_pipe_recv(npipe, &msg);
if (rv != 0) {
break;
}
@@ -236,7 +240,7 @@ again:
hops = 0;
for (;;) {
int end = 0;
- if (hops >= resp->ttl) {
+ if (hops >= psock->ttl) {
nni_msg_free(msg);
goto again;
}
@@ -258,30 +262,39 @@ again:
}
// Now send it up.
- rv = nni_msgq_put_sig(urq, msg, &rp->sigclose);
+ rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(nni_sock_sendq(resp->sock), &rp->sigclose);
- nni_msgq_signal(rp->sendq, &rp->sigclose);
- nni_pipe_close(rp->pipe);
+ nni_msgq_signal(nni_sock_sendq(psock->nsock), &ppipe->sigclose);
+ nni_msgq_signal(ppipe->sendq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
static int
nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
int rv;
+ int oldraw;
switch (opt) {
case NNG_OPT_MAXTTL:
- rv = nni_setopt_int(&resp->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255);
break;
case NNG_OPT_RAW:
- rv = nni_setopt_int(&resp->raw, buf, sz, 0, 1);
+ oldraw = psock->raw;
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ if (oldraw != psock->raw) {
+ if (!psock->raw) {
+ nni_sock_senderr(psock->nsock, 0);
+ } else {
+ nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ }
+ }
break;
default:
rv = NNG_ENOTSUP;
@@ -293,15 +306,15 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- rv = nni_getopt_int(&resp->ttl, buf, szp);
+ rv = nni_getopt_int(&psock->ttl, buf, szp);
break;
case NNG_OPT_RAW:
- rv = nni_getopt_int(&resp->raw, buf, szp);
+ rv = nni_getopt_int(&psock->raw, buf, szp);
break;
default:
rv = NNG_ENOTSUP;
@@ -313,19 +326,19 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_msg *
nni_resp_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
size_t len;
- if (resp->raw) {
+ if (psock->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(resp->sock, NNG_ESTATE);
+ nni_sock_senderr(psock->nsock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (resp->btrace == NULL) {
+ if (psock->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -333,17 +346,17 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_trunc_header(msg, nni_msg_header_len(msg));
- if (nni_msg_append_header(msg, resp->btrace, resp->btrace_len) != 0) {
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) {
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
return (msg);
}
@@ -351,28 +364,28 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
static nni_msg *
nni_resp_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *resp = arg;
+ nni_resp_sock *psock = arg;
char *header;
size_t len;
- if (resp->raw) {
+ if (psock->raw) {
return (msg);
}
- nni_sock_senderr(resp->sock, 0);
+ nni_sock_senderr(psock->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (resp->btrace != NULL) {
- nni_free(resp->btrace, resp->btrace_len);
- resp->btrace = NULL;
- resp->btrace_len = 0;
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
+ psock->btrace = NULL;
+ psock->btrace_len = 0;
}
- if ((resp->btrace = nni_alloc(len)) == NULL) {
+ if ((psock->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- resp->btrace_len = len;
- memcpy(resp->btrace, header, len);
+ psock->btrace_len = len;
+ memcpy(psock->btrace, header, len);
nni_msg_trunc_header(msg, len);
return (msg);
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
new file mode 100644
index 00000000..cb600dc3
--- /dev/null
+++ b/src/protocol/survey/survey.c
@@ -0,0 +1,427 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
+// survey pattern. This is useful for building service discovery, voting, etc.
+
+typedef struct nni_surv_pipe nni_surv_pipe;
+typedef struct nni_surv_sock nni_surv_sock;
+
+// An nni_surv_sock is our per-socket protocol private structure.
+struct nni_surv_sock {
+ nni_sock * nsock;
+ nni_cv cv;
+ nni_duration survtime;
+ nni_time expire;
+ int raw;
+ int closing;
+ uint32_t nextid; // next id
+ uint8_t survid[4]; // outstanding request ID (big endian)
+ nni_list pipes;
+};
+
+// An nni_surv_pipe is our per-pipe protocol private structure.
+struct nni_surv_pipe {
+ nni_pipe * npipe;
+ nni_surv_sock * psock;
+ nni_msgq * sendq;
+ int sigclose;
+ nni_list_node node;
+};
+
+static int
+nni_surv_sock_init(void **sp, nni_sock *nsock)
+{
+ nni_surv_sock *psock;
+ int rv;
+
+ if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_cv_init(&psock->cv, nni_sock_mtx(nsock))) != 0) {
+ NNI_FREE_STRUCT(psock);
+ return (rv);
+ }
+ NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
+ // this is "semi random" start for request IDs.
+ psock->nextid = (nni_clock() >> 32) ^ (nni_clock() & 0xffffffff);
+ psock->nsock = nsock;
+ psock->raw = 0;
+ psock->survtime = NNI_SECOND * 60;
+ psock->expire = NNI_TIME_ZERO;
+
+ *sp = psock;
+ nni_sock_recverr(nsock, NNG_ESTATE);
+ return (0);
+}
+
+
+static void
+nni_surv_sock_close(void *arg)
+{
+ nni_surv_sock *psock = arg;
+
+ // Shut down the resender. We request it to exit by clearing
+ // its old value, then kick it.
+ psock->closing = 1;
+ nni_cv_wake(&psock->cv);
+}
+
+
+static void
+nni_surv_sock_fini(void *arg)
+{
+ nni_surv_sock *psock = arg;
+
+ nni_cv_fini(&psock->cv);
+ NNI_FREE_STRUCT(psock);
+}
+
+
+static int
+nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+{
+ nni_surv_pipe *ppipe;
+ int rv;
+
+ if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ // This depth could be tunable.
+ if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
+ }
+ ppipe->npipe = npipe;
+ ppipe->sigclose = 0;
+ ppipe->psock = psock;
+ *pp = ppipe;
+ return (0);
+}
+
+
+static void
+nni_surv_pipe_fini(void *arg)
+{
+ nni_surv_pipe *sp = arg;
+
+ NNI_FREE_STRUCT(sp);
+}
+
+
+static int
+nni_surv_pipe_add(void *arg)
+{
+ nni_surv_pipe *ppipe = arg;
+ nni_surv_sock *psock = ppipe->psock;
+
+ nni_list_append(&psock->pipes, ppipe);
+ return (0);
+}
+
+
+static void
+nni_surv_pipe_rem(void *arg)
+{
+ nni_surv_pipe *ppipe = arg;
+ nni_surv_sock *psock = ppipe->psock;
+
+ nni_list_remove(&psock->pipes, ppipe);
+}
+
+
+static void
+nni_surv_pipe_sender(void *arg)
+{
+ nni_surv_pipe *ppipe = arg;
+ nni_surv_sock *psock = ppipe->psock;
+ nni_pipe *npipe = ppipe->npipe;
+ nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
+ nni_mtx *mx = nni_sock_mtx(psock->nsock);
+ nni_msg *msg;
+ int rv;
+
+ for (;;) {
+ rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose);
+ if (rv != 0) {
+ break;
+ }
+ rv = nni_pipe_send(npipe, msg);
+ if (rv != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_msgq_signal(urq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
+}
+
+
+static void
+nni_surv_pipe_receiver(void *arg)
+{
+ nni_surv_pipe *ppipe = arg;
+ nni_surv_sock *psock = ppipe->psock;
+ nni_msgq *urq = nni_sock_recvq(psock->nsock);
+ nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_pipe *npipe = ppipe->npipe;
+ nni_msg *msg;
+ int rv;
+
+ for (;;) {
+ rv = nni_pipe_recv(npipe, &msg);
+ if (rv != 0) {
+ break;
+ }
+ // We yank 4 bytes of body, and move them to the header.
+ if (nni_msg_len(msg) < 4) {
+ // Not enough data, just toss it.
+ nni_msg_free(msg);
+ continue;
+ }
+ if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
+ // Should be NNG_ENOMEM
+ nni_msg_free(msg);
+ continue;
+ }
+ if (nni_msg_trim(msg, 4) != 0) {
+ // This should never happen - could be an assert.
+ nni_panic("Failed to trim SURV header from body");
+ }
+ rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose);
+ if (rv != 0) {
+ nni_msg_free(msg);
+ break;
+ }
+ }
+ nni_msgq_signal(uwq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
+}
+
+
+static int
+nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ nni_surv_sock *psock = arg;
+ int rv;
+ int oldraw;
+
+ switch (opt) {
+ case NNG_OPT_SURVEYTIME:
+ rv = nni_setopt_duration(&psock->survtime, buf, sz);
+ break;
+ case NNG_OPT_RAW:
+ oldraw = psock->raw;
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ if (oldraw != psock->raw) {
+ if (psock->raw) {
+ nni_sock_recverr(psock->nsock, 0);
+ } else {
+ nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ }
+ memset(psock->survid, 0, sizeof (psock->survid));
+ psock->expire = NNI_TIME_NEVER;
+ nni_cv_wake(&psock->cv);
+ }
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static int
+nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ nni_surv_sock *psock = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_SURVEYTIME:
+ rv = nni_getopt_duration(&psock->survtime, buf, szp);
+ break;
+ case NNG_OPT_RAW:
+ rv = nni_getopt_int(&psock->raw, buf, szp);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+
+static void
+nni_surv_sock_sender(void *arg)
+{
+ nni_surv_sock *psock = arg;
+ nni_msgq *uwq = nni_sock_sendq(psock->nsock);
+ nni_mtx *mx = nni_sock_mtx(psock->nsock);
+ nni_msg *msg, *dup;
+
+ for (;;) {
+ nni_surv_pipe *ppipe;
+ nni_surv_pipe *last;
+ int rv;
+
+ if ((rv = nni_msgq_get(uwq, &msg)) != 0) {
+ break;
+ }
+
+ nni_mtx_lock(mx);
+ last = nni_list_last(&psock->pipes);
+ NNI_LIST_FOREACH (&psock->pipes, ppipe) {
+ if (ppipe != last) {
+ rv = nni_msg_dup(&dup, msg);
+ if (rv != 0) {
+ continue;
+ }
+ } else {
+ dup = msg;
+ }
+ if ((rv = nni_msgq_tryput(ppipe->sendq, dup)) != 0) {
+ nni_msg_free(dup);
+ }
+ }
+ nni_mtx_unlock(mx);
+
+ if (last == NULL) {
+ nni_msg_free(msg);
+ }
+ }
+}
+
+
+static void
+nni_surv_sock_timeout(void *arg)
+{
+ nni_surv_sock *psock = arg;
+ nni_mtx *mx = nni_sock_mtx(psock->nsock);
+
+ nni_mtx_lock(mx);
+ for (;;) {
+ if (psock->closing) {
+ nni_mtx_unlock(mx);
+ return;
+ }
+ if (nni_clock() > psock->expire) {
+ // Set the expiration ~forever
+ psock->expire = NNI_TIME_NEVER;
+ // Survey IDs *always* have the high order bit set,
+ // so zeroing means that nothing can match.
+ memset(psock->survid, 0, sizeof (psock->survid));
+ nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ }
+ nni_cv_until(&psock->cv, psock->expire);
+ }
+}
+
+
+static nni_msg *
+nni_surv_sock_sfilter(void *arg, nni_msg *msg)
+{
+ nni_surv_sock *psock = arg;
+ uint32_t id;
+
+ if (psock->raw) {
+ // No automatic retry, and the request ID must
+ // be in the header coming down.
+ return (msg);
+ }
+
+ // Generate a new request ID. We always set the high
+ // order bit so that the peer can locate the end of the
+ // backtrace. (Pipe IDs have the high order bit clear.)
+ id = (psock->nextid++) | 0x80000000u;
+
+ // Survey ID is in big endian format.
+ NNI_PUT32(psock->survid, id);
+
+ if (nni_msg_append_header(msg, psock->survid, 4) != 0) {
+ // Should be ENOMEM.
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ // If another message is there, this cancels it. We move the
+ // survey expiration out. The timeout thread will wake up in
+ // the wake below, and reschedule itself appropriately.
+ psock->expire = nni_clock() + psock->survtime;
+ nni_cv_wake(&psock->cv);
+
+ // Clear the error condition.
+ nni_sock_recverr(psock->nsock, 0);
+
+ return (msg);
+}
+
+
+static nni_msg *
+nni_surv_sock_rfilter(void *arg, nni_msg *msg)
+{
+ nni_surv_sock *ssock = arg;
+
+ if (ssock->raw) {
+ // Pass it unmolested
+ return (msg);
+ }
+
+ if (nni_msg_header_len(msg) < 4) {
+ nni_msg_free(msg);
+ return (NULL);
+ }
+
+ if (memcmp(nni_msg_header(msg), ssock->survid, 4) != 0) {
+ // Wrong request id
+ nni_msg_free(msg);
+ return (NULL);
+ }
+ // Prune the survey ID.
+ nni_msg_trim_header(msg, 4);
+
+ return (msg);
+}
+
+
+static nni_proto_pipe_ops nni_surv_pipe_ops = {
+ .pipe_init = nni_surv_pipe_init,
+ .pipe_fini = nni_surv_pipe_fini,
+ .pipe_add = nni_surv_pipe_add,
+ .pipe_rem = nni_surv_pipe_rem,
+ .pipe_worker = { nni_surv_pipe_sender,
+ nni_surv_pipe_receiver }
+};
+
+static nni_proto_sock_ops nni_surv_sock_ops = {
+ .sock_init = nni_surv_sock_init,
+ .sock_fini = nni_surv_sock_fini,
+ .sock_close = nni_surv_sock_close,
+ .sock_setopt = nni_surv_sock_setopt,
+ .sock_getopt = nni_surv_sock_getopt,
+ .sock_rfilter = nni_surv_sock_rfilter,
+ .sock_sfilter = nni_surv_sock_sfilter,
+ .sock_worker = { nni_surv_sock_sender,
+ nni_surv_sock_timeout }
+};
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+nni_proto nni_surveyor_proto = {
+ .proto_self = NNG_PROTO_SURVEYOR,
+ .proto_peer = NNG_PROTO_RESPONDENT,
+ .proto_name = "surveyor",
+ .proto_sock_ops = &nni_surv_sock_ops,
+ .proto_pipe_ops = &nni_surv_pipe_ops,
+};