summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-20 20:52:32 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-24 15:06:33 -0700
commitfdefff742662ed4eb476bf19b9dda245f86bc406 (patch)
treea4e132716debd64e434478f8814f368db052cbc6
parente0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff)
downloadnng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz
nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2
nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error fixes #361 nng_send_aio should check for NULL message fixes #362 nni_msgq does not signal pollable on certain events This adds support for contexts for both sides of the surveyor pattern. Prior to this commit, the raw mode was completely broken, and there were numerous other bugs found and fixed. This integration includes *much* deeper validation of this pattern. Some changes to the core and other patterns have been made, where it was obvioius that we could make such improvements. (The obviousness stemming from the fact that RESPONDENT in particular is very closely derived from REP.)
-rw-r--r--docs/man/nng_respondent.7.adoc17
-rw-r--r--src/core/msgqueue.c25
-rw-r--r--src/core/msgqueue.h4
-rw-r--r--src/nng.c28
-rw-r--r--src/protocol/reqrep0/rep.c73
-rw-r--r--src/protocol/reqrep0/xrep.c25
-rw-r--r--src/protocol/reqrep0/xreq.c20
-rw-r--r--src/protocol/survey0/CMakeLists.txt12
-rw-r--r--src/protocol/survey0/respond.c644
-rw-r--r--src/protocol/survey0/survey.c546
-rw-r--r--src/protocol/survey0/xrespond.c408
-rw-r--r--src/protocol/survey0/xsurvey.c380
-rw-r--r--tests/CMakeLists.txt4
-rw-r--r--tests/compat_surveyttl.c144
-rw-r--r--tests/reqpoll.c6
-rw-r--r--tests/respondpoll.c109
-rw-r--r--tests/stubs.h31
-rw-r--r--tests/survey.c217
-rw-r--r--tests/surveyctx.c298
-rw-r--r--tests/surveypoll.c126
20 files changed, 2578 insertions, 539 deletions
diff --git a/docs/man/nng_respondent.7.adoc b/docs/man/nng_respondent.7.adoc
index 2db78866..b2060d8e 100644
--- a/docs/man/nng_respondent.7.adoc
+++ b/docs/man/nng_respondent.7.adoc
@@ -16,9 +16,9 @@ nng_respondent - respondent protocol
== SYNOPSIS
[source,c]
-----------
+----
#include <nng/protocol/survey0/respond.h>
-----------
+----
== DESCRIPTION
@@ -50,6 +50,19 @@ Respondents may discard a survey by simply not replying to it.
Raw mode sockets (set with <<nng_options.5#NNG_OPT_RAW,`NNG_OPT_RAW`>>)
ignore all these restrictions.
+=== Context Operations
+
+This protocol supports the creation of <<nng_ctx.5#,contexts>> for concurrent
+use cases using <<nng_ctx_open.3#,`nng_ctx_open()`>>.
+
+Incoming surveys will be routed to and received by only one context.
+Additional surveys may be received by other contexts in parallel.
+Replies made using a context will be returned to the the surveyor that
+issued the survey most recently received by that context.
+The restrictions for order of operations with sockets apply equally
+well for contexts, except that each context will be treated as if it were
+a separate socket.
+
=== Protocol Versions
Only version 0 of this protocol is supported.
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 1bb5a762..7c33b256 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -40,6 +40,8 @@ struct nni_msgq {
void * mq_filter_arg;
};
+static void nni_msgq_run_notify(nni_msgq *);
+
int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
@@ -128,6 +130,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
}
}
mq->mq_geterr = error;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -149,6 +152,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
}
}
mq->mq_puterr = error;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -172,6 +176,24 @@ nni_msgq_set_error(nni_msgq *mq, int error)
}
mq->mq_puterr = error;
mq->mq_geterr = error;
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+void
+nni_msgq_flush(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ while (mq->mq_len > 0) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get >= mq->mq_alloc) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -331,6 +353,7 @@ nni_msgq_cancel(nni_aio *aio, int rv)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -413,6 +436,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
nni_aio_finish_msg(raio, msg);
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
@@ -424,6 +448,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
mq->mq_put = 0;
}
mq->mq_len++;
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2f1a46eb..65215bd0 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -33,6 +33,10 @@ extern int nni_msgq_init(nni_msgq **, unsigned);
// messages that may be in the queue.
extern void nni_msgq_fini(nni_msgq *);
+// nni_msgq_flush discards any messages that are sitting in the queue.
+// It does not wake any writers that might be waiting.
+extern void nni_msgq_flush(nni_msgq *);
+
extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
diff --git a/src/nng.c b/src/nng.c
index 9eb33f50..fb374dde 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -198,7 +198,9 @@ nng_recv_aio(nng_socket sid, nng_aio *aio)
int rv;
if ((rv = nni_sock_find(&sock, sid)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
return;
}
nni_sock_recv(sock, aio);
@@ -211,8 +213,16 @@ nng_send_aio(nng_socket sid, nng_aio *aio)
nni_sock *sock;
int rv;
+ if (nni_aio_get_msg(aio) == NULL) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ }
+ return;
+ }
if ((rv = nni_sock_find(&sock, sid)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
return;
}
nni_sock_send(sock, aio);
@@ -260,7 +270,9 @@ nng_ctx_recv(nng_ctx cid, nng_aio *aio)
nni_ctx *ctx;
if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
return;
}
nni_ctx_recv(ctx, aio);
@@ -273,8 +285,16 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio)
int rv;
nni_ctx *ctx;
+ if (nni_aio_get_msg(aio) == NULL) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ }
+ return;
+ }
if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) {
- nni_aio_finish_error(aio, rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
return;
}
nni_ctx_send(ctx, aio);
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 4e20466b..385860cd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -36,11 +36,9 @@ static void rep0_pipe_fini(void *);
struct rep0_ctx {
rep0_sock * sock;
- bool closed;
char * btrace;
size_t btrace_len;
size_t btrace_size;
- int ttl;
uint32_t pipe_id;
rep0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
@@ -56,7 +54,6 @@ struct rep0_sock {
nni_idhash * pipes;
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
- bool closed;
rep0_ctx * ctx;
nni_pollable *recvable;
nni_pollable *sendable;
@@ -82,15 +79,11 @@ rep0_ctx_close(void *arg)
nni_aio * aio;
nni_mtx_lock(&s->lk);
- ctx->closed = true;
if ((aio = ctx->saio) != NULL) {
- nni_msg * msg;
rep0_pipe *pipe = ctx->spipe;
ctx->saio = NULL;
ctx->spipe = NULL;
nni_list_remove(&pipe->sendq, ctx);
- msg = nni_aio_get_msg(aio);
- nni_msg_free(msg);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if ((aio = ctx->raio) != NULL) {
@@ -191,53 +184,48 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_pollable_clear(s->sendable);
}
- if (ctx->closed) {
+ if (len == 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if (len == 0) {
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
+ if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
// lost interest in our reply.
- nni_aio_set_msg(aio, NULL);
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
return;
}
- if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ if (!p->busy) {
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
+
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
return;
}
- if (p->busy) {
- rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
- if (rv != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
- ctx->saio = aio;
- ctx->spipe = p;
- nni_list_append(&p->sendq, ctx);
+
+ rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
+ if (rv != 0) {
nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
return;
}
-
- p->busy = true;
- len = nni_msg_len(msg);
- nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ ctx->saio = aio;
+ ctx->spipe = p;
+ nni_list_append(&p->sendq, ctx);
nni_mtx_unlock(&s->lk);
-
- nni_aio_finish(aio, 0, len);
}
static void
@@ -376,6 +364,7 @@ rep0_pipe_stop(void *arg)
aio = ctx->saio;
ctx->saio = NULL;
msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
@@ -384,12 +373,11 @@ rep0_pipe_stop(void *arg)
// accept a message and discard it.)
nni_pollable_raise(s->sendable);
}
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
-
- nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
@@ -465,11 +453,6 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&s->lk);
- if (ctx->closed) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
int rv;
rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
@@ -509,7 +492,6 @@ rep0_pipe_recv_cb(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
nni_msg * msg;
- int rv;
uint8_t * body;
nni_aio * aio;
size_t len;
@@ -527,7 +509,7 @@ rep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = false;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
@@ -544,9 +526,8 @@ rep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory, so drop it.
goto drop;
}
@@ -571,7 +552,6 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(aio, msg);
nni_aio_set_msg(p->aio_recv, NULL);
// schedule another receive
@@ -591,6 +571,7 @@ rep0_pipe_recv_cb(void *arg)
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, msg);
nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
return;
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index f7189453..4773677e 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg)
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_putq);
+ nni_mtx_lock(&s->lk);
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+ nni_mtx_unlock(&s->lk);
}
static void
@@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
xrep0_pipe *p;
- int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
- rv = nni_msgq_tryput(p->sendq, msg);
- }
- if (rv != 0) {
+ nni_mtx_lock(&s->lk);
+ if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ (nni_msgq_tryput(p->sendq, msg) != 0)) {
nni_msg_free(msg);
}
+ nni_mtx_unlock(&s->lk);
// Now look for another message on the upper write queue.
nni_msgq_aio_get(uwq, s->aio_getq);
@@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg)
xrep0_pipe *p = arg;
xrep0_sock *s = p->rep;
nni_msg * msg;
- int rv;
- uint8_t * body;
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
@@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
- if (rv != 0) {
+ if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) {
// Failure here causes us to drop the message.
goto drop;
}
@@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = 0;
+ uint8_t *body;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
@@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory most likely, but keep going to
// avoid breaking things.
goto drop;
@@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = {
.sock_open = xrep0_sock_open,
.sock_close = xrep0_sock_close,
.sock_options = xrep0_sock_options,
- .sock_filter = NULL, // No filtering for raw mode
.sock_send = xrep0_sock_send,
.sock_recv = xrep0_sock_recv,
};
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 5c1841b2..13ae7418 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -226,25 +226,21 @@ xreq0_recv_cb(void *arg)
// We yank 4 bytes from front of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
- // Malformed message.
- goto malformed;
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_stop(p->pipe);
+ return;
}
id = nni_msg_trim_u32(msg);
if (nni_msg_header_append_u32(msg, id) != 0) {
- // Arguably we could just discard and carry on. But
- // dropping the connection is probably more helpful since
- // it lets the other side see that a problem occurred.
- // Plus it gives us a chance to reclaim some memory.
- goto malformed;
+ // Probably ENOMEM, discard and carry on.
+ nni_msg_free(msg);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return;
}
nni_aio_set_msg(p->aio_putq, msg);
nni_msgq_aio_put(sock->urq, p->aio_putq);
- return;
-
-malformed:
- nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
}
static void
diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt
index 479c031c..0a82463c 100644
--- a/src/protocol/survey0/CMakeLists.txt
+++ b/src/protocol/survey0/CMakeLists.txt
@@ -1,6 +1,6 @@
#
-# Copyright 2017 Garrett D'Amore <garrett@damore.org>
-# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -8,15 +8,17 @@
# found online at https://opensource.org/licenses/MIT.
#
-# Req/Rep protocol
+# Surveyor/Respondent protocol
if (NNG_PROTO_SURVEYOR0)
- set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h)
+ set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/xsurvey.c
+ protocol/survey0/survey.h)
set(SURV0_HEADERS protocol/survey0/survey.h)
endif()
if (NNG_PROTO_RESPONDENT0)
- set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h)
+ set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/xrespond.c
+ protocol/survey0/respond.h)
set(RESP0_HEADERS protocol/survey0/respond.h)
endif()
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 1605d9e6..60cf188a 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -28,49 +28,212 @@
typedef struct resp0_pipe resp0_pipe;
typedef struct resp0_sock resp0_sock;
+typedef struct resp0_ctx resp0_ctx;
-static void resp0_recv_cb(void *);
-static void resp0_putq_cb(void *);
-static void resp0_getq_cb(void *);
-static void resp0_send_cb(void *);
-static void resp0_sock_getq_cb(void *);
+static void resp0_pipe_send_cb(void *);
+static void resp0_pipe_recv_cb(void *);
static void resp0_pipe_fini(void *);
+struct resp0_ctx {
+ resp0_sock * sock;
+ char * btrace;
+ size_t btrace_len;
+ size_t btrace_size;
+ uint32_t pipe_id;
+ resp0_pipe * spipe; // send pipe
+ nni_aio * saio; // send aio
+ nni_aio * raio; // recv aio
+ nni_list_node sqnode;
+ nni_list_node rqnode;
+};
+
// resp0_sock is our per-socket protocol private structure.
struct resp0_sock {
- nni_msgq * urq;
- nni_msgq * uwq;
- int ttl;
- nni_idhash *pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio * aio_getq;
- nni_mtx mtx;
+ nni_mtx mtx;
+ int ttl;
+ nni_idhash * pipes;
+ resp0_ctx * ctx;
+ nni_list recvpipes;
+ nni_list recvq;
+ nni_pollable *recvable;
+ nni_pollable *sendable;
};
// resp0_pipe is our per-pipe protocol private structure.
struct resp0_pipe {
- nni_pipe * npipe;
- resp0_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_putq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_pipe * npipe;
+ resp0_sock * psock;
+ bool busy;
+ uint32_t id;
+ nni_list sendq; // contexts waiting to send
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_list_node rnode; // receivable linkage
};
static void
+resp0_ctx_close(void *arg)
+{
+ resp0_ctx * ctx = arg;
+ resp0_sock *s = ctx->sock;
+ nni_aio * aio;
+
+ // complete any outstanding operations here, cancellation, etc.
+
+ nni_mtx_lock(&s->mtx);
+ if ((aio = ctx->saio) != NULL) {
+ resp0_pipe *p = ctx->spipe;
+ ctx->saio = NULL;
+ ctx->spipe = NULL;
+ nni_list_remove(&p->sendq, ctx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if ((aio = ctx->raio) != NULL) {
+ ctx->raio = NULL;
+ nni_list_remove(&s->recvq, ctx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+resp0_ctx_fini(void *arg)
+{
+ resp0_ctx *ctx = arg;
+
+ resp0_ctx_close(ctx);
+ nni_free(ctx->btrace, ctx->btrace_size);
+ NNI_FREE_STRUCT(ctx);
+}
+
+static int
+resp0_ctx_init(void **ctxp, void *sarg)
+{
+ resp0_sock *s = sarg;
+ resp0_ctx * ctx;
+
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // this is 1kB, which covers the worst case.
+ ctx->btrace_size = 256 * sizeof(uint32_t);
+ if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) {
+ NNI_FREE_STRUCT(ctx);
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_NODE_INIT(&ctx->sqnode);
+ // XXX: NNI_LIST_NODE_INIT(&ctx->rqnode);
+ ctx->btrace_len = 0;
+ ctx->sock = s;
+ ctx->pipe_id = 0;
+ *ctxp = ctx;
+
+ return (0);
+}
+
+static void
+resp0_ctx_cancel_send(nni_aio *aio, int rv)
+{
+ resp0_ctx * ctx = nni_aio_get_prov_data(aio);
+ resp0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->mtx);
+ if (ctx->saio != aio) {
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_node_remove(&ctx->sqnode);
+ ctx->saio = NULL;
+ nni_mtx_unlock(&s->mtx);
+ nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+resp0_ctx_send(void *arg, nni_aio *aio)
+{
+ resp0_ctx * ctx = arg;
+ resp0_sock *s = ctx->sock;
+ resp0_pipe *p;
+ nni_msg * msg;
+ size_t len;
+ uint32_t pid;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ msg = nni_aio_get_msg(aio);
+ nni_msg_header_clear(msg);
+
+ if (ctx == s->ctx) {
+ // We can't send anymore, because only one send per request.
+ nni_pollable_clear(s->sendable);
+ }
+
+ nni_mtx_lock(&s->mtx);
+
+ if ((len = ctx->btrace_len) == 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ pid = ctx->pipe_id;
+ ctx->pipe_id = 0;
+ ctx->btrace_len = 0;
+
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) {
+ // Surveyor has left the building. Just discard the reply.
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ return;
+ }
+
+ if (!p->busy) {
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule_verify(aio, resp0_ctx_cancel_send, ctx)) !=
+ 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ ctx->saio = aio;
+ ctx->spipe = p;
+ nni_list_append(&p->sendq, ctx);
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
resp0_sock_fini(void *arg)
{
resp0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
- nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
- if (s->btrace != NULL) {
- nni_free(s->btrace, s->btrace_len);
+ if (s->ctx != NULL) {
+ resp0_ctx_fini(s->ctx);
}
+ nni_pollable_free(s->sendable);
+ nni_pollable_free(s->recvable);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
@@ -81,22 +244,34 @@ resp0_sock_init(void **sp, nni_sock *nsock)
resp0_sock *s;
int rv;
+ NNI_ARG_UNUSED(nsock);
+
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) {
+ if ((rv = nni_idhash_init(&s->pipes)) != 0) {
resp0_sock_fini(s);
return (rv);
}
- s->ttl = 8; // Per RFC
- s->btrace = NULL;
- s->btrace_len = 0;
- s->urq = nni_sock_recvq(nsock);
- s->uwq = nni_sock_sendq(nsock);
+ NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode);
+ NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode);
+
+ s->ttl = 8; // Per RFC
+
+ if ((rv = resp0_ctx_init((void **) &s->ctx, s)) != 0) {
+ resp0_ctx_fini(s);
+ return (rv);
+ }
+ // We start off without being either readable or pollable.
+ // Readability comes when there is something on the socket.
+ if (((rv = nni_pollable_alloc(&s->sendable)) != 0) ||
+ ((rv = nni_pollable_alloc(&s->recvable)) != 0)) {
+ resp0_sock_fini(s);
+ return (rv);
+ }
*sp = s;
return (0);
}
@@ -104,9 +279,7 @@ resp0_sock_init(void **sp, nni_sock *nsock)
static void
resp0_sock_open(void *arg)
{
- resp0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -114,7 +287,7 @@ resp0_sock_close(void *arg)
{
resp0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ resp0_ctx_close(s->ctx);
}
static void
@@ -122,11 +295,8 @@ resp0_pipe_fini(void *arg)
{
resp0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_msgq_fini(p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -139,18 +309,20 @@ resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
resp0_pipe_fini(p);
return (rv);
}
+ NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode);
+
p->npipe = npipe;
p->psock = s;
- *pp = p;
+ p->busy = false;
+ p->id = nni_pipe_id(npipe);
+
+ *pp = p;
return (0);
}
@@ -161,8 +333,6 @@ resp0_pipe_start(void *arg)
resp0_sock *s = p->psock;
int rv;
- p->id = nni_pipe_id(p->npipe);
-
nni_mtx_lock(&s->mtx);
rv = nni_idhash_insert(s->pipes, p->id, p);
nni_mtx_unlock(&s->mtx);
@@ -171,8 +341,6 @@ resp0_pipe_start(void *arg)
}
nni_pipe_recv(p->npipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
-
return (rv);
}
@@ -181,139 +349,179 @@ resp0_pipe_stop(void *arg)
{
resp0_pipe *p = arg;
resp0_sock *s = p->psock;
+ resp0_ctx * ctx;
+
+ nni_mtx_lock(&s->mtx);
+ while ((ctx = nni_list_first(&p->sendq)) != NULL) {
+ nni_aio *aio;
+ nni_msg *msg;
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
+ if (p->id == s->ctx->pipe_id) {
+ // Make sure user space knows they can send a message to us,
+ // which we will happily discard.
+ nni_pollable_raise(s->sendable);
+ }
+ nni_idhash_remove(s->pipes, p->id);
+ nni_mtx_unlock(&s->mtx);
- nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
-
- if (p->id != 0) {
- nni_mtx_lock(&s->mtx);
- nni_idhash_remove(s->pipes, p->id);
- nni_mtx_unlock(&s->mtx);
- p->id = 0;
- }
}
-// resp0_sock_send watches for messages from the upper write queue,
-// extracts the destination pipe, and forwards it to the appropriate
-// destination pipe via a separate queue. This prevents a single bad
-// or slow pipe from gumming up the works for the entire socket.s
-
-void
-resp0_sock_getq_cb(void *arg)
+static void
+resp0_pipe_send_cb(void *arg)
{
- resp0_sock *s = arg;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ resp0_ctx * ctx;
+ nni_aio * aio;
nni_msg * msg;
- uint32_t id;
- resp0_pipe *p;
- int rv;
+ size_t len;
- if (nni_aio_result(s->aio_getq) != 0) {
+ nni_mtx_lock(&s->mtx);
+ p->busy = false;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
+ nni_mtx_unlock(&s->mtx);
return;
}
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
-
- // We yank the outgoing pipe id from the header
- if (nni_msg_header_len(msg) < 4) {
- nni_msg_free(msg);
- // We can't really close down the socket, so just keep going.
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ if ((ctx = nni_list_first(&p->sendq)) == NULL) {
+ // Nothing else to send.
+ if (p->id == s->ctx->pipe_id) {
+ // Mark us ready for the other side to send!
+ nni_pollable_raise(s->sendable);
+ }
+ nni_mtx_unlock(&s->mtx);
return;
}
- id = nni_msg_header_trim_u32(msg);
- nni_mtx_lock(&s->mtx);
- if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) {
- // Destination pipe not present.
- nni_msg_free(msg);
- } else {
- // Non-blocking put.
- if (nni_msgq_tryput(p->sendq, msg) != 0) {
- nni_msg_free(msg);
- }
- }
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ ctx->spipe = NULL;
+ p->busy = true;
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
+
nni_mtx_unlock(&s->mtx);
+
+ nni_aio_finish_synch(aio, 0, len);
}
-void
-resp0_getq_cb(void *arg)
+static void
+resp0_cancel_recv(nni_aio *aio, int rv)
{
- resp0_pipe *p = arg;
+ resp0_ctx * ctx = nni_aio_get_prov_data(aio);
+ resp0_sock *s = ctx->sock;
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
- return;
+ nni_mtx_lock(&s->mtx);
+ if (ctx->raio == aio) {
+ nni_list_remove(&s->recvq, ctx);
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, rv);
}
-
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- nni_pipe_send(p->npipe, p->aio_send);
+ nni_mtx_unlock(&s->mtx);
}
-void
-resp0_send_cb(void *arg)
+static void
+resp0_ctx_recv(void *arg, nni_aio *aio)
{
- resp0_pipe *p = arg;
+ resp0_ctx * ctx = arg;
+ resp0_sock *s = ctx->sock;
+ resp0_pipe *p;
+ size_t len;
+ nni_msg * msg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&s->mtx);
+ if ((p = nni_list_first(&s->recvpipes)) == NULL) {
+ int rv;
+ rv = nni_aio_schedule_verify(aio, resp0_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ ctx->raio = aio;
+ nni_list_append(&s->recvq, ctx);
+ nni_mtx_unlock(&s->mtx);
return;
}
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_list_remove(&s->recvpipes, p);
+ if (nni_list_empty(&s->recvpipes)) {
+ nni_pollable_clear(s->recvable);
+ }
+ nni_pipe_recv(p->npipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ len = nni_msg_header_len(msg);
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ ctx->btrace_len = len;
+ ctx->pipe_id = p->id;
+ if (ctx == s->ctx) {
+ nni_pollable_raise(s->sendable);
+ }
+ nni_mtx_unlock(&s->mtx);
+
+ nni_msg_header_clear(msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
}
static void
-resp0_recv_cb(void *arg)
+resp0_pipe_recv_cb(void *arg)
{
- resp0_pipe *p = arg;
- resp0_sock *s = p->psock;
- nni_msgq * urq = s->urq;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ resp0_ctx * ctx;
nni_msg * msg;
+ nni_aio * aio;
int hops;
- int rv;
+ size_t len;
if (nni_aio_result(p->aio_recv) != 0) {
- goto error;
+ nni_pipe_stop(p->npipe);
+ return;
}
msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, p->id);
- // Store the pipe id in the header, first thing.
- if (nni_msg_header_append_u32(msg, p->id) != 0) {
- nni_msg_free(msg);
- goto error;
- }
-
// Move backtrace from body to header
- hops = 0;
+ hops = 1;
for (;;) {
- int end = 0;
+ bool end = 0;
uint8_t *body;
- if (hops >= s->ttl) {
- nni_msg_free(msg);
- goto error;
+ if (hops > s->ttl) {
+ goto drop;
}
+ hops++;
if (nni_msg_len(msg) < 4) {
+ // Peer is speaking garbage, kick it.
nni_msg_free(msg);
- goto error;
+ nni_pipe_stop(p->npipe);
+ return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
- nni_msg_free(msg);
- goto error;
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
+ goto drop;
}
nni_msg_trim(msg, 4);
if (end) {
@@ -321,26 +529,41 @@ resp0_recv_cb(void *arg)
}
}
- // Now send it up.
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(urq, p->aio_putq);
- return;
+ len = nni_msg_header_len(msg);
-error:
- nni_pipe_stop(p->npipe);
-}
+ nni_mtx_lock(&s->mtx);
+ if ((ctx = nni_list_first(&s->recvq)) == NULL) {
+ // No one blocked in recv, stall.
+ nni_list_append(&s->recvpipes, p);
+ nni_pollable_raise(s->recvable);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
-static void
-resp0_putq_cb(void *arg)
-{
- resp0_pipe *p = arg;
+ nni_list_remove(&s->recvq, ctx);
+ aio = ctx->raio;
+ ctx->raio = NULL;
+ nni_aio_set_msg(p->aio_recv, NULL);
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ // Start the next receive.
+ nni_pipe_recv(p->npipe, p->aio_recv);
+
+ ctx->btrace_len = len;
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ nni_msg_header_clear(msg);
+ ctx->pipe_id = p->id;
+
+ if ((ctx == s->ctx) && (!p->busy)) {
+ nni_pollable_raise(s->sendable);
}
+ nni_mtx_unlock(&s->mtx);
+
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ return;
+drop:
+ nni_msg_free(msg);
nni_pipe_recv(p->npipe, p->aio_recv);
}
@@ -358,76 +581,38 @@ resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
return (nni_copyout_int(s->ttl, buf, szp, typ));
}
-static void
-resp0_sock_send_raw(void *arg, nni_aio *aio)
+static int
+resp0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ)
{
resp0_sock *s = arg;
+ int rv;
+ int fd;
- nni_msgq_aio_put(s->uwq, aio);
+ if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static void
-resp0_sock_send(void *arg, nni_aio *aio)
+static int
+resp0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ)
{
resp0_sock *s = arg;
- nni_msg * msg;
int rv;
+ int fd;
- nni_mtx_lock(&s->mtx);
-
- msg = nni_aio_get_msg(aio);
-
- // If we have a stored backtrace, append it to the header...
- // if we don't have a backtrace, discard the message.
- if (s->btrace == NULL) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
-
- // drop anything else in the header...
- nni_msg_header_clear(msg);
-
- if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
- return;
+ if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) {
+ return (rv);
}
-
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
-
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_put(s->uwq, aio);
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static nni_msg *
-resp0_sock_filter(void *arg, nni_msg *msg)
+static void
+resp0_sock_send(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- char * header;
- size_t len;
-
- nni_mtx_lock(&s->mtx);
- len = nni_msg_header_len(msg);
- header = nni_msg_header(msg);
- if (s->btrace != NULL) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- }
- if ((s->btrace = nni_alloc(len)) == NULL) {
- nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
- }
- s->btrace_len = len;
- memcpy(s->btrace, header, len);
- nni_msg_header_clear(msg);
- nni_mtx_unlock(&s->mtx);
- return (msg);
+ resp0_ctx_send(s->ctx, aio);
}
static void
@@ -435,7 +620,7 @@ resp0_sock_recv(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- nni_msgq_aio_get(s->urq, aio);
+ resp0_ctx_recv(s->ctx, aio);
}
static nni_proto_pipe_ops resp0_pipe_ops = {
@@ -445,6 +630,13 @@ static nni_proto_pipe_ops resp0_pipe_ops = {
.pipe_stop = resp0_pipe_stop,
};
+static nni_proto_ctx_ops resp0_ctx_ops = {
+ .ctx_init = resp0_ctx_init,
+ .ctx_fini = resp0_ctx_fini,
+ .ctx_send = resp0_ctx_send,
+ .ctx_recv = resp0_ctx_recv,
+};
+
static nni_proto_sock_option resp0_sock_options[] = {
{
.pso_name = NNG_OPT_MAXTTL,
@@ -452,6 +644,18 @@ static nni_proto_sock_option resp0_sock_options[] = {
.pso_getopt = resp0_sock_getopt_maxttl,
.pso_setopt = resp0_sock_setopt_maxttl,
},
+ {
+ .pso_name = NNG_OPT_RECVFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = resp0_sock_getopt_recvfd,
+ .pso_setopt = NULL,
+ },
+ {
+ .pso_name = NNG_OPT_SENDFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = resp0_sock_getopt_sendfd,
+ .pso_setopt = NULL,
+ },
// terminate list
{
.pso_name = NULL,
@@ -463,39 +667,19 @@ static nni_proto_sock_ops resp0_sock_ops = {
.sock_fini = resp0_sock_fini,
.sock_open = resp0_sock_open,
.sock_close = resp0_sock_close,
- .sock_filter = resp0_sock_filter,
.sock_send = resp0_sock_send,
.sock_recv = resp0_sock_recv,
.sock_options = resp0_sock_options,
};
-static nni_proto_sock_ops resp0_sock_ops_raw = {
- .sock_init = resp0_sock_init,
- .sock_fini = resp0_sock_fini,
- .sock_open = resp0_sock_open,
- .sock_close = resp0_sock_close,
- .sock_filter = NULL, // no filter for raw
- .sock_send = resp0_sock_send_raw,
- .sock_recv = resp0_sock_recv,
- .sock_options = resp0_sock_options,
-};
-
static nni_proto resp0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &resp0_sock_ops,
.proto_pipe_ops = &resp0_pipe_ops,
-};
-
-static nni_proto resp0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
- .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &resp0_sock_ops_raw,
- .proto_pipe_ops = &resp0_pipe_ops,
+ .proto_ctx_ops = &resp0_ctx_ops,
};
int
@@ -503,9 +687,3 @@ nng_respondent0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &resp0_proto));
}
-
-int
-nng_respondent0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &resp0_proto_raw));
-}
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index b7158464..e725d2b3 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -16,6 +16,9 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
+// Note that this pattern is not optimized for extreme low latency, as it makes
+// multiple use of queues for simplicity. Typically this is used in cases
+// where a few dozen extra microseconds does not matter.
#ifndef NNI_PROTO_SURVEYOR_V0
#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
@@ -27,86 +30,249 @@
typedef struct surv0_pipe surv0_pipe;
typedef struct surv0_sock surv0_sock;
+typedef struct surv0_ctx surv0_ctx;
-static void surv0_sock_getq_cb(void *);
-static void surv0_getq_cb(void *);
-static void surv0_putq_cb(void *);
-static void surv0_send_cb(void *);
-static void surv0_recv_cb(void *);
-static void surv0_timeout(void *);
+static void surv0_pipe_getq_cb(void *);
+static void surv0_pipe_send_cb(void *);
+static void surv0_pipe_recv_cb(void *);
+static void surv0_ctx_timeout(void *);
+
+struct surv0_ctx {
+ surv0_sock * sock;
+ uint64_t survid; // survey id
+ nni_timer_node timer;
+ nni_time expire;
+ nni_duration survtime;
+ nni_msgq * rq; // recv message queue
+};
// surv0_sock is our per-socket protocol private structure.
struct surv0_sock {
- nni_duration survtime;
- nni_time expire;
- int ttl;
- uint32_t nextid; // next id
- uint32_t survid; // outstanding request ID (big endian)
- nni_list pipes;
- nni_aio * aio_getq;
- nni_timer_node timer;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx mtx;
+ int ttl;
+ nni_list pipes;
+ nni_mtx mtx;
+ surv0_ctx * ctx;
+ nni_idhash * surveys;
+ nni_pollable *sendable;
};
// surv0_pipe is our per-pipe protocol private structure.
struct surv0_pipe {
nni_pipe * npipe;
- surv0_sock * psock;
+ surv0_sock * sock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
- nni_aio * aio_putq;
nni_aio * aio_send;
nni_aio * aio_recv;
};
static void
+surv0_ctx_fini(void *arg)
+{
+ surv0_ctx *ctx = arg;
+
+ if (ctx->rq != NULL) {
+ nni_msgq_close(ctx->rq);
+ nni_msgq_fini(ctx->rq);
+ }
+ nni_timer_cancel(&ctx->timer);
+ NNI_FREE_STRUCT(ctx);
+}
+
+static int
+surv0_ctx_init(void **ctxp, void *sarg)
+{
+ surv0_ctx * ctx;
+ surv0_sock *sock = sarg;
+ int rv;
+
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_lock(&sock->mtx);
+ if (sock->ctx != NULL) {
+ ctx->survtime = sock->ctx->survtime;
+ }
+ nni_mtx_unlock(&sock->mtx);
+ ctx->sock = sock;
+ // 126 is a deep enough queue, and leaves 2 extra cells for the
+ // pushback bit in msgqs. This can result in up to 1kB of allocation
+ // for the message queue.
+ if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) {
+ surv0_ctx_fini(ctx);
+ return (rv);
+ }
+
+ nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
+ *ctxp = ctx;
+ return (0);
+}
+
+static void
+surv0_ctx_recv(void *arg, nni_aio *aio)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+
+ nni_mtx_lock(&sock->mtx);
+ if (ctx->survid == 0) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ nni_msgq_aio_get(ctx->rq, aio);
+ nni_mtx_unlock(&sock->mtx);
+}
+
+void
+surv0_ctx_timeout(void *arg)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+
+ nni_mtx_lock(&sock->mtx);
+ if (nni_clock() < ctx->expire) {
+ nni_mtx_unlock(&sock->mtx);
+ return;
+ }
+ // Abort any pending receives.
+ nni_msgq_set_get_error(ctx->rq, NNG_ETIMEDOUT);
+ if (ctx->survid != 0) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ }
+ nni_mtx_unlock(&sock->mtx);
+}
+
+static void
+surv0_ctx_send(void *arg, nni_aio *aio)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+ surv0_pipe *pipe;
+ nni_msg * msg = nni_aio_get_msg(aio);
+ size_t len = nni_msg_len(msg);
+ nni_time now = nni_clock();
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&sock->mtx);
+
+ // Abort any pending receives -- this is the same as cancellation.
+ nni_msgq_set_get_error(ctx->rq, NNG_ECANCELED);
+ nni_msgq_flush(ctx->rq);
+
+ // New survey id will be generated, so unregister the old one.
+ if (ctx->survid) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ }
+ // Allocate the new ID.
+ if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survid, ctx)) != 0) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ // Insert it into the message. We report an error if one occurs,
+ // although arguably at this point we could just discard silently.
+ if ((rv = nni_msg_header_append_u32(msg, (uint32_t) ctx->survid)) !=
+ 0) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ // From this point, we're committed to success. Note that we send
+ // regardless of whether there are any pipes or not. If no pipes,
+ // then it just gets discarded.
+ nni_aio_set_msg(aio, NULL);
+ NNI_LIST_FOREACH (&sock->pipes, pipe) {
+ nni_msg *dmsg;
+
+ if (nni_list_next(&sock->pipes, pipe) != NULL) {
+ if (nni_msg_dup(&dmsg, msg) != 0) {
+ continue;
+ }
+ } else {
+ dmsg = msg;
+ msg = NULL;
+ }
+ if (nni_msgq_tryput(pipe->sendq, dmsg) != 0) {
+ nni_msg_free(dmsg);
+ }
+ }
+
+ ctx->expire = now + ctx->survtime;
+ nni_timer_schedule(&ctx->timer, ctx->expire);
+
+ // Allow recv to run.
+ nni_msgq_set_get_error(ctx->rq, 0);
+
+ nni_mtx_unlock(&sock->mtx);
+ if (msg != NULL) {
+ nni_msg_free(msg);
+ }
+
+ nni_aio_finish(aio, 0, len);
+}
+
+static void
surv0_sock_fini(void *arg)
{
- surv0_sock *s = arg;
+ surv0_sock *sock = arg;
- nni_aio_stop(s->aio_getq);
- nni_aio_fini(s->aio_getq);
- nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
+ if (sock->ctx != NULL) {
+ surv0_ctx_fini(sock->ctx);
+ }
+ nni_idhash_fini(sock->surveys);
+ nni_pollable_free(sock->sendable);
+ nni_mtx_fini(&sock->mtx);
+ NNI_FREE_STRUCT(sock);
}
static int
surv0_sock_init(void **sp, nni_sock *nsock)
{
- surv0_sock *s;
+ surv0_sock *sock;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ NNI_ARG_UNUSED(nsock);
+
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) {
- surv0_sock_fini(s);
+ NNI_LIST_INIT(&sock->pipes, surv0_pipe, node);
+ nni_mtx_init(&sock->mtx);
+
+ if (((rv = nni_idhash_init(&sock->surveys)) != 0) ||
+ ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) {
+ surv0_sock_fini(sock);
return (rv);
}
- NNI_LIST_INIT(&s->pipes, surv0_pipe, node);
- nni_mtx_init(&s->mtx);
- nni_timer_init(&s->timer, surv0_timeout, s);
-
- s->nextid = nni_random();
- s->survtime = NNI_SECOND;
- s->expire = NNI_TIME_ZERO;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
-
- *sp = s;
+
+ // Survey IDs are 32 bits, with the high order bit set.
+ // We start at a random point, to minimize likelihood of
+ // accidental collision across restarts.
+ nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu,
+ nni_random() | 0x80000000u);
+
+ sock->ctx->survtime = NNI_SECOND;
+ sock->ttl = 8;
+
+ *sp = sock;
return (0);
}
static void
surv0_sock_open(void *arg)
{
- surv0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -114,8 +280,7 @@ surv0_sock_close(void *arg)
{
surv0_sock *s = arg;
- nni_timer_cancel(&s->timer);
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_msgq_close(s->ctx->rq);
}
static void
@@ -126,7 +291,6 @@ surv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -140,18 +304,20 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- // This depth could be tunable.
+ // This depth could be tunable. The deeper the queue, the more
+ // concurrent surveys that can be delivered. Having said that, this
+ // is best effort, and a deep queue doesn't really do much for us.
+ // Note that surveys can be *outstanding*, but not yet put on the wire.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
surv0_pipe_fini(p);
return (rv);
}
p->npipe = npipe;
- p->psock = s;
+ p->sock = s;
*pp = p;
return (0);
}
@@ -160,7 +326,7 @@ static int
surv0_pipe_start(void *arg)
{
surv0_pipe *p = arg;
- surv0_sock *s = p->psock;
+ surv0_sock *s = p->sock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
@@ -175,12 +341,11 @@ static void
surv0_pipe_stop(void *arg)
{
surv0_pipe *p = arg;
- surv0_sock *s = p->psock;
+ surv0_sock *s = p->sock;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
nni_msgq_close(p->sendq);
@@ -192,7 +357,7 @@ surv0_pipe_stop(void *arg)
}
static void
-surv0_getq_cb(void *arg)
+surv0_pipe_getq_cb(void *arg)
{
surv0_pipe *p = arg;
@@ -208,7 +373,7 @@ surv0_getq_cb(void *arg)
}
static void
-surv0_send_cb(void *arg)
+surv0_pipe_send_cb(void *arg)
{
surv0_pipe *p = arg;
@@ -223,28 +388,17 @@ surv0_send_cb(void *arg)
}
static void
-surv0_putq_cb(void *arg)
+surv0_pipe_recv_cb(void *arg)
{
- surv0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
- return;
- }
-
- nni_pipe_recv(p->npipe, p->aio_recv);
-}
-
-static void
-surv0_recv_cb(void *arg)
-{
- surv0_pipe *p = arg;
+ surv0_pipe *p = arg;
+ surv0_sock *sock = p->sock;
+ surv0_ctx * ctx;
nni_msg * msg;
+ uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- goto failed;
+ nni_pipe_stop(p->npipe);
+ return;
}
msg = nni_aio_get_msg(p->aio_recv);
@@ -253,23 +407,45 @@ surv0_recv_cb(void *arg)
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
- // Not enough data, just toss it.
+ // Peer sent us garbage. Kick it.
nni_msg_free(msg);
- goto failed;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ id = nni_msg_trim_u32(msg);
+ if (nni_msg_header_append_u32(msg, id) != 0) {
+ // Should be NNG_ENOMEM - discard and try again.
+ nni_msg_free(msg);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ return;
}
- if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
- // Should be NNG_ENOMEM
+
+ nni_mtx_lock(&sock->mtx);
+
+ // Best effort at delivery. Discard if no context or context is
+ // unable to receive it.
+ if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) ||
+ (nni_msgq_tryput(ctx->rq, msg) != 0)) {
nni_msg_free(msg);
- goto failed;
}
- (void) nni_msg_trim(msg, 4);
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(p->psock->urq, p->aio_putq);
- return;
+ nni_mtx_unlock(&sock->mtx);
-failed:
- nni_pipe_stop(p->npipe);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+}
+
+static int
+surv0_ctx_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ)
+{
+ surv0_ctx *ctx = arg;
+ return (nni_copyin_ms(&ctx->survtime, buf, sz, typ));
+}
+
+static int
+surv0_ctx_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ)
+{
+ surv0_ctx *ctx = arg;
+ return (nni_copyout_ms(ctx->survtime, buf, szp, typ));
}
static int
@@ -290,141 +466,66 @@ static int
surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ)
{
surv0_sock *s = arg;
- return (nni_copyin_ms(&s->survtime, buf, sz, typ));
+ return (surv0_ctx_setopt_surveytime(s->ctx, buf, sz, typ));
}
static int
surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ)
{
surv0_sock *s = arg;
- return (nni_copyout_ms(s->survtime, buf, szp, typ));
+ return (surv0_ctx_getopt_surveytime(s->ctx, buf, szp, typ));
}
-static void
-surv0_sock_getq_cb(void *arg)
+static int
+surv0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ)
{
- surv0_sock *s = arg;
- surv0_pipe *p;
- surv0_pipe *last;
- nni_msg * msg, *dup;
-
- if (nni_aio_result(s->aio_getq) != 0) {
- // Should be NNG_ECLOSED.
- return;
- }
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
+ surv0_sock *sock = arg;
+ int rv;
+ int fd;
- nni_mtx_lock(&s->mtx);
- last = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != last) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->sendable == NULL) {
+ if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+ nni_mtx_unlock(&sock->mtx);
+ return (rv);
}
+ // We are always sendable.
+ nni_pollable_raise(sock->sendable);
}
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
- nni_mtx_unlock(&s->mtx);
-
- if (last == NULL) {
- // If there were no pipes to send on, just toss the message.
- nni_msg_free(msg);
+ nni_mtx_unlock(&sock->mtx);
+ if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) {
+ return (rv);
}
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static void
-surv0_timeout(void *arg)
-{
- surv0_sock *s = arg;
-
- nni_mtx_lock(&s->mtx);
- s->survid = 0;
- nni_mtx_unlock(&s->mtx);
-
- nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
-}
-
-static void
-surv0_sock_recv(void *arg, nni_aio *aio)
+static int
+surv0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ)
{
- surv0_sock *s = arg;
+ surv0_sock * sock = arg;
+ nni_pollable *recvable;
+ int rv;
+ int fd;
- nni_mtx_lock(&s->mtx);
- if (s->survid == 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
+ if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) ||
+ ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) {
+ return (rv);
}
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->urq, aio);
+ return (nni_copyout_int(fd, buf, szp, typ));
}
static void
-surv0_sock_send_raw(void *arg, nni_aio *aio)
+surv0_sock_recv(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
-
- nni_msgq_aio_put(s->uwq, aio);
+ surv0_ctx_recv(s->ctx, aio);
}
static void
surv0_sock_send(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- nni_msg * msg;
- int rv;
-
- nni_mtx_lock(&s->mtx);
-
- // Generate a new request ID. We always set the high
- // order bit so that the peer can locate the end of the
- // backtrace. (Pipe IDs have the high order bit clear.)
- s->survid = (s->nextid++) | 0x80000000u;
-
- msg = nni_aio_get_msg(aio);
- nni_msg_header_clear(msg);
- if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- // If another message is there, this cancels it. We move the
- // survey expiration out. The timeout thread will wake up in
- // the wake below, and reschedule itself appropriately.
- nni_msgq_set_get_error(s->urq, 0);
- s->expire = nni_clock() + s->survtime;
- nni_timer_schedule(&s->timer, s->expire);
-
- nni_mtx_unlock(&s->mtx);
-
- nni_msgq_aio_put(s->uwq, aio);
-}
-
-static nni_msg *
-surv0_sock_filter(void *arg, nni_msg *msg)
-{
- surv0_sock *s = arg;
-
- nni_mtx_lock(&s->mtx);
-
- if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
- (nni_msg_header_trim_u32(msg) != s->survid)) {
- // Wrong request id
- nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
- }
- nni_mtx_unlock(&s->mtx);
-
- return (msg);
+ surv0_ctx_send(s->ctx, aio);
}
static nni_proto_pipe_ops surv0_pipe_ops = {
@@ -434,6 +535,25 @@ static nni_proto_pipe_ops surv0_pipe_ops = {
.pipe_stop = surv0_pipe_stop,
};
+static nni_proto_ctx_option surv0_ctx_options[] = {
+ {
+ .co_name = NNG_OPT_SURVEYOR_SURVEYTIME,
+ .co_type = NNI_TYPE_DURATION,
+ .co_getopt = surv0_ctx_getopt_surveytime,
+ .co_setopt = surv0_ctx_setopt_surveytime,
+ },
+ {
+ .co_name = NULL,
+ }
+};
+static nni_proto_ctx_ops surv0_ctx_ops = {
+ .ctx_init = surv0_ctx_init,
+ .ctx_fini = surv0_ctx_fini,
+ .ctx_send = surv0_ctx_send,
+ .ctx_recv = surv0_ctx_recv,
+ .ctx_options = surv0_ctx_options,
+};
+
static nni_proto_sock_option surv0_sock_options[] = {
{
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
@@ -447,6 +567,18 @@ static nni_proto_sock_option surv0_sock_options[] = {
.pso_getopt = surv0_sock_getopt_maxttl,
.pso_setopt = surv0_sock_setopt_maxttl,
},
+ {
+ .pso_name = NNG_OPT_RECVFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = surv0_sock_getopt_recvfd,
+ .pso_setopt = NULL,
+ },
+ {
+ .pso_name = NNG_OPT_SENDFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = surv0_sock_getopt_sendfd,
+ .pso_setopt = NULL,
+ },
// terminate list
{
.pso_name = NULL,
@@ -460,18 +592,6 @@ static nni_proto_sock_ops surv0_sock_ops = {
.sock_close = surv0_sock_close,
.sock_send = surv0_sock_send,
.sock_recv = surv0_sock_recv,
- .sock_filter = surv0_sock_filter,
- .sock_options = surv0_sock_options,
-};
-
-static nni_proto_sock_ops surv0_sock_ops_raw = {
- .sock_init = surv0_sock_init,
- .sock_fini = surv0_sock_fini,
- .sock_open = surv0_sock_open,
- .sock_close = surv0_sock_close,
- .sock_send = surv0_sock_send_raw,
- .sock_recv = surv0_sock_recv,
- .sock_filter = surv0_sock_filter,
.sock_options = surv0_sock_options,
};
@@ -479,18 +599,10 @@ static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &surv0_sock_ops,
.proto_pipe_ops = &surv0_pipe_ops,
-};
-
-static nni_proto surv0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &surv0_sock_ops_raw,
- .proto_pipe_ops = &surv0_pipe_ops,
+ .proto_ctx_ops = &surv0_ctx_ops,
};
int
@@ -498,9 +610,3 @@ nng_surveyor0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &surv0_proto));
}
-
-int
-nng_surveyor0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &surv0_proto_raw));
-}
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
new file mode 100644
index 00000000..7aaed6da
--- /dev/null
+++ b/src/protocol/survey0/xrespond.c
@@ -0,0 +1,408 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "protocol/survey0/respond.h"
+
+// Respondent protocol. The RESPONDENT protocol is the "replier" side of
+// the surveyor pattern. This is useful for building service discovery, or
+// voting algorithms, for example.
+
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
+
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
+
+typedef struct xresp0_pipe xresp0_pipe;
+typedef struct xresp0_sock xresp0_sock;
+
+static void xresp0_recv_cb(void *);
+static void xresp0_putq_cb(void *);
+static void xresp0_getq_cb(void *);
+static void xresp0_send_cb(void *);
+static void xresp0_sock_getq_cb(void *);
+static void xresp0_pipe_fini(void *);
+
+// resp0_sock is our per-socket protocol private structure.
+struct xresp0_sock {
+ nni_msgq * urq;
+ nni_msgq * uwq;
+ int ttl;
+ nni_idhash *pipes;
+ nni_aio * aio_getq;
+ nni_mtx mtx;
+};
+
+// resp0_pipe is our per-pipe protocol private structure.
+struct xresp0_pipe {
+ nni_pipe * npipe;
+ xresp0_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+};
+
+static void
+xresp0_sock_fini(void *arg)
+{
+ xresp0_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+static int
+xresp0_sock_init(void **sp, nni_sock *nsock)
+{
+ xresp0_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->mtx);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
+ xresp0_sock_fini(s);
+ return (rv);
+ }
+
+ s->ttl = 8; // Per RFC
+ s->urq = nni_sock_recvq(nsock);
+ s->uwq = nni_sock_sendq(nsock);
+
+ *sp = s;
+ return (0);
+}
+
+static void
+xresp0_sock_open(void *arg)
+{
+ xresp0_sock *s = arg;
+
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+}
+
+static void
+xresp0_sock_close(void *arg)
+{
+ xresp0_sock *s = arg;
+
+ nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+}
+
+static void
+xresp0_pipe_fini(void *arg)
+{
+ xresp0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+{
+ xresp0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) {
+ xresp0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xresp0_pipe_start(void *arg)
+{
+ xresp0_pipe *p = arg;
+ xresp0_sock *s = p->psock;
+ int rv;
+
+ p->id = nni_pipe_id(p->npipe);
+
+ nni_mtx_lock(&s->mtx);
+ rv = nni_idhash_insert(s->pipes, p->id, p);
+ nni_mtx_unlock(&s->mtx);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+
+ return (rv);
+}
+
+static void
+xresp0_pipe_stop(void *arg)
+{
+ xresp0_pipe *p = arg;
+ xresp0_sock *s = p->psock;
+
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+
+ nni_mtx_lock(&s->mtx);
+ nni_idhash_remove(s->pipes, p->id);
+ nni_mtx_unlock(&s->mtx);
+}
+
+// resp0_sock_send watches for messages from the upper write queue,
+// extracts the destination pipe, and forwards it to the appropriate
+// destination pipe via a separate queue. This prevents a single bad
+// or slow pipe from gumming up the works for the entire socket.s
+
+void
+xresp0_sock_getq_cb(void *arg)
+{
+ xresp0_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ xresp0_pipe *p;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
+ return;
+ }
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
+
+ // We yank the outgoing pipe id from the header
+ if (nni_msg_header_len(msg) < 4) {
+ nni_msg_free(msg);
+ // We can't really close down the socket, so just keep going.
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+ return;
+ }
+ id = nni_msg_header_trim_u32(msg);
+
+ nni_mtx_lock(&s->mtx);
+ // Look for the pipe, and attempt to put the message there
+ // (nonblocking) if we can. If we can't for any reason, then we
+ // free the message.
+ if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ (nni_msgq_tryput(p->sendq, msg) != 0)) {
+ nni_msg_free(msg);
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+}
+
+void
+xresp0_getq_cb(void *arg)
+{
+ xresp0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+
+ nni_pipe_send(p->npipe, p->aio_send);
+}
+
+void
+xresp0_send_cb(void *arg)
+{
+ xresp0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+}
+
+static void
+xresp0_recv_cb(void *arg)
+{
+ xresp0_pipe *p = arg;
+ xresp0_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+ int hops;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, p->id);
+
+ // Store the pipe id in the header, first thing.
+ if (nni_msg_header_append_u32(msg, p->id) != 0) {
+ goto drop;
+ }
+
+ // Move backtrace from body to header
+ hops = 1;
+ for (;;) {
+ bool end = false;
+ uint8_t *body;
+
+ if (hops > s->ttl) {
+ goto drop;
+ }
+ hops++;
+ if (nni_msg_len(msg) < 4) {
+ // Peer sent us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
+ goto drop;
+ }
+ nni_msg_trim(msg, 4);
+ if (end) {
+ break;
+ }
+ }
+
+ // Now send it up.
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(urq, p->aio_putq);
+ return;
+
+drop:
+ nni_msg_free(msg);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+}
+
+static void
+xresp0_putq_cb(void *arg)
+{
+ xresp0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_pipe_recv(p->npipe, p->aio_recv);
+}
+
+static int
+xresp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+{
+ xresp0_sock *s = arg;
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
+}
+
+static int
+xresp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
+{
+ xresp0_sock *s = arg;
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
+}
+
+static void
+xresp0_sock_send(void *arg, nni_aio *aio)
+{
+ xresp0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+xresp0_sock_recv(void *arg, nni_aio *aio)
+{
+ xresp0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static nni_proto_pipe_ops xresp0_pipe_ops = {
+ .pipe_init = xresp0_pipe_init,
+ .pipe_fini = xresp0_pipe_fini,
+ .pipe_start = xresp0_pipe_start,
+ .pipe_stop = xresp0_pipe_stop,
+};
+
+static nni_proto_sock_option xresp0_sock_options[] = {
+ {
+ .pso_name = NNG_OPT_MAXTTL,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = xresp0_sock_getopt_maxttl,
+ .pso_setopt = xresp0_sock_setopt_maxttl,
+ },
+ // terminate list
+ {
+ .pso_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xresp0_sock_ops = {
+ .sock_init = xresp0_sock_init,
+ .sock_fini = xresp0_sock_fini,
+ .sock_open = xresp0_sock_open,
+ .sock_close = xresp0_sock_close,
+ .sock_send = xresp0_sock_send,
+ .sock_recv = xresp0_sock_recv,
+ .sock_options = xresp0_sock_options,
+};
+
+static nni_proto xresp0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xresp0_sock_ops,
+ .proto_pipe_ops = &xresp0_pipe_ops,
+};
+
+int
+nng_respondent0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xresp0_proto));
+}
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
new file mode 100644
index 00000000..cf311b15
--- /dev/null
+++ b/src/protocol/survey0/xsurvey.c
@@ -0,0 +1,380 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "protocol/survey0/survey.h"
+
+// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
+// survey pattern. This is useful for building service discovery, voting, etc.
+
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
+
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
+
+typedef struct xsurv0_pipe xsurv0_pipe;
+typedef struct xsurv0_sock xsurv0_sock;
+
+static void xsurv0_sock_getq_cb(void *);
+static void xsurv0_getq_cb(void *);
+static void xsurv0_putq_cb(void *);
+static void xsurv0_send_cb(void *);
+static void xsurv0_recv_cb(void *);
+
+// surv0_sock is our per-socket protocol private structure.
+struct xsurv0_sock {
+ int ttl;
+ nni_list pipes;
+ nni_aio * aio_getq;
+ nni_msgq *uwq;
+ nni_msgq *urq;
+ nni_mtx mtx;
+};
+
+// surv0_pipe is our per-pipe protocol private structure.
+struct xsurv0_pipe {
+ nni_pipe * npipe;
+ xsurv0_sock * psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+};
+
+static void
+xsurv0_sock_fini(void *arg)
+{
+ xsurv0_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+static int
+xsurv0_sock_init(void **sp, nni_sock *nsock)
+{
+ xsurv0_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
+ xsurv0_sock_fini(s);
+ return (rv);
+ }
+ NNI_LIST_INIT(&s->pipes, xsurv0_pipe, node);
+ nni_mtx_init(&s->mtx);
+
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+
+ *sp = s;
+ return (0);
+}
+
+static void
+xsurv0_sock_open(void *arg)
+{
+ xsurv0_sock *s = arg;
+
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+}
+
+static void
+xsurv0_sock_close(void *arg)
+{
+ xsurv0_sock *s = arg;
+
+ nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+}
+
+static void
+xsurv0_pipe_fini(void *arg)
+{
+ xsurv0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+{
+ xsurv0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ // This depth could be tunable. The queue exists so that if we
+ // have multiple requests coming in faster than we can deliver them,
+ // we try to avoid dropping them. We don't really have a solution
+ // for applying backpressure. It would be nice if surveys carried
+ // an expiration with them, so that we could discard any that are
+ // not delivered before their expiration date.
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) {
+ xsurv0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xsurv0_pipe_start(void *arg)
+{
+ xsurv0_pipe *p = arg;
+ xsurv0_sock *s = p->psock;
+
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ return (0);
+}
+
+static void
+xsurv0_pipe_stop(void *arg)
+{
+ xsurv0_pipe *p = arg;
+ xsurv0_sock *s = p->psock;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
+
+ nni_msgq_close(p->sendq);
+
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+xsurv0_getq_cb(void *arg)
+{
+ xsurv0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+
+ nni_pipe_send(p->npipe, p->aio_send);
+}
+
+static void
+xsurv0_send_cb(void *arg)
+{
+ xsurv0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+}
+
+static void
+xsurv0_putq_cb(void *arg)
+{
+ xsurv0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_pipe_recv(p->npipe, p->aio_recv);
+}
+
+static void
+xsurv0_recv_cb(void *arg)
+{
+ xsurv0_pipe *p = arg;
+ nni_msg * msg;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
+
+ // We yank 4 bytes of body, and move them to the header.
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
+ // Probably ENOMEM, discard and keep going.
+ nni_msg_free(msg);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ return;
+ }
+ (void) nni_msg_trim(msg, 4);
+
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(p->psock->urq, p->aio_putq);
+}
+
+static int
+xsurv0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+{
+ xsurv0_sock *s = arg;
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
+}
+
+static int
+xsurv0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
+{
+ xsurv0_sock *s = arg;
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
+}
+
+static void
+xsurv0_sock_getq_cb(void *arg)
+{
+ xsurv0_sock *s = arg;
+ xsurv0_pipe *p;
+ xsurv0_pipe *last;
+ nni_msg * msg, *dup;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
+ // Should be NNG_ECLOSED.
+ return;
+ }
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
+
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
+ if (nni_msg_dup(&dup, msg) != 0) {
+ continue;
+ }
+ } else {
+ dup = msg;
+ }
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
+ nni_msg_free(dup);
+ }
+ }
+
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_mtx_unlock(&s->mtx);
+
+ if (last == NULL) {
+ // If there were no pipes to send on, just toss the message.
+ nni_msg_free(msg);
+ }
+}
+
+static void
+xsurv0_sock_recv(void *arg, nni_aio *aio)
+{
+ xsurv0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static void
+xsurv0_sock_send(void *arg, nni_aio *aio)
+{
+ xsurv0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static nni_proto_pipe_ops xsurv0_pipe_ops = {
+ .pipe_init = xsurv0_pipe_init,
+ .pipe_fini = xsurv0_pipe_fini,
+ .pipe_start = xsurv0_pipe_start,
+ .pipe_stop = xsurv0_pipe_stop,
+};
+
+static nni_proto_sock_option xsurv0_sock_options[] = {
+ {
+ .pso_name = NNG_OPT_MAXTTL,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = xsurv0_sock_getopt_maxttl,
+ .pso_setopt = xsurv0_sock_setopt_maxttl,
+ },
+ // terminate list
+ {
+ .pso_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xsurv0_sock_ops = {
+ .sock_init = xsurv0_sock_init,
+ .sock_fini = xsurv0_sock_fini,
+ .sock_open = xsurv0_sock_open,
+ .sock_close = xsurv0_sock_close,
+ .sock_send = xsurv0_sock_send,
+ .sock_recv = xsurv0_sock_recv,
+ .sock_options = xsurv0_sock_options,
+};
+
+static nni_proto xsurv0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
+ .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xsurv0_sock_ops,
+ .proto_pipe_ops = &xsurv0_pipe_ops,
+};
+
+int
+nng_surveyor0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xsurv0_proto));
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 08806679..af570daa 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -166,7 +166,10 @@ add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqstress 60 NNG_PROTO_REQ0 NNG_PROTO_REP0)
+add_nng_proto_test(respondpoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
+add_nng_proto_test(surveyctx 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
+add_nng_proto_test(surveypoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
# compatbility tests
# We only support these if ALL the legacy protocols are supported. This
@@ -186,6 +189,7 @@ add_nng_compat_test(compat_reqrep 5)
add_nng_compat_test(compat_survey 5)
add_nng_compat_test(compat_reqttl 5)
add_nng_compat_test(compat_shutdown 5)
+add_nng_compat_test(compat_surveyttl 5)
add_nng_compat_test(compat_poll 5)
# These are special tests for compat mode, not inherited from the
diff --git a/tests/compat_surveyttl.c b/tests/compat_surveyttl.c
new file mode 100644
index 00000000..1c6f66be
--- /dev/null
+++ b/tests/compat_surveyttl.c
@@ -0,0 +1,144 @@
+/*
+ Copyright (c) 2012 Martin Sustrik All rights reserved.
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include <nanomsg/nn.h>
+
+#include "compat_testutil.h"
+
+static char socket_address_a[128];
+static char socket_address_b[128];
+int dev0;
+int dev1;
+
+void device (NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Run the device. */
+ rc = nn_device (dev0, dev1);
+ nn_assert (rc < 0 && nn_errno () == EBADF);
+
+ /* Clean up. */
+ test_close (dev0);
+ test_close (dev1);
+}
+
+int main (int argc, const char *argv[])
+{
+ int end0;
+ int end1;
+ struct nn_thread thread1;
+ int timeo;
+ int maxttl;
+ size_t sz;
+ int rc;
+
+ int port = get_test_port(argc, argv);
+
+ test_addr_from(socket_address_a, "tcp", "127.0.0.1", port);
+ test_addr_from(socket_address_b, "tcp", "127.0.0.1", port + 1);
+
+ /* Intialise the device sockets. */
+ dev0 = test_socket (AF_SP_RAW, NN_RESPONDENT);
+ dev1 = test_socket (AF_SP_RAW, NN_SURVEYOR);
+
+ test_bind (dev0, socket_address_a);
+ test_bind (dev1, socket_address_b);
+
+ /* Start the device. */
+ nn_thread_init (&thread1, device, NULL);
+
+ end0 = test_socket (AF_SP, NN_SURVEYOR);
+ end1 = test_socket (AF_SP, NN_RESPONDENT);
+
+ /* Test the bi-directional device TTL */
+ test_connect (end0, socket_address_a);
+ test_connect (end1, socket_address_b);
+
+ /* Wait for TCP to establish. */
+ nn_sleep (100);
+
+ /* Set up max receive timeout. */
+ timeo = 100;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ timeo = 100;
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+
+ /* Test default TTL is 8. */
+ sz = sizeof (maxttl);
+ maxttl = -1;
+ rc = nn_getsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, &sz);
+ nn_assert (rc == 0);
+ nn_assert (sz == sizeof (maxttl));
+ nn_assert (maxttl == 8);
+
+ /* Test to make sure option TTL cannot be set below 1. */
+ maxttl = -1;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == -1);
+ maxttl = 0;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 0);
+
+ /* Test to set non-integer size */
+ maxttl = 8;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, 1);
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 8);
+
+ /* Pass a message between endpoints. */
+ test_send (end0, "SURVEY");
+ test_recv (end1, "SURVEY");
+
+ /* Now send a reply. */
+ test_send (end1, "REPLYXYZ");
+ test_recv (end0, "REPLYXYZ");
+
+ /* Now set the max TTL. */
+ maxttl = 1;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+
+ test_send (end0, "DROPTHIS");
+ test_drop (end1, ETIMEDOUT);
+
+ maxttl = 2;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_send (end0, "DONTDROP");
+ test_recv (end1, "DONTDROP");
+
+ /* Clean up. */
+ test_close (end0);
+ test_close (end1);
+
+ /* Shut down the devices. */
+ nn_term ();
+ nn_thread_term (&thread1);
+
+ return 0;
+}
diff --git a/tests/reqpoll.c b/tests/reqpoll.c
index 64c8df66..aeee7d0b 100644
--- a/tests/reqpoll.c
+++ b/tests/reqpoll.c
@@ -55,7 +55,7 @@ TestMain("REQ pollable", {
atexit(nng_fini);
- Convey("Given a connected REQ/REP pair", {
+ Convey("Given a REQ/REP pair", {
nng_socket req;
nng_socket rep;
nng_ctx ctx;
@@ -74,7 +74,7 @@ TestMain("REQ pollable", {
Convey("REQ ctx not pollable", {
int fd;
So(nng_ctx_open(&ctx, req) == 0);
- Reset({ nng_ctx_close(req); });
+ Reset({ nng_ctx_close(ctx); });
So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
NNG_ENOTSUP);
So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
@@ -87,7 +87,7 @@ TestMain("REQ pollable", {
So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0);
So(isready(fd) == false);
- Convey("And becomes readable on connect", {
+ Convey("And becomes writable on connect", {
So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
0);
nng_msleep(100);
diff --git a/tests/respondpoll.c b/tests/respondpoll.c
new file mode 100644
index 00000000..2e24b5b2
--- /dev/null
+++ b/tests/respondpoll.c
@@ -0,0 +1,109 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+TestMain("Respondent pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected survey pair", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_ctx ctx;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ So(nng_ctx_open(&ctx, resp) == 0);
+
+ So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("Respondent ctx not pollable", {
+ int fd;
+
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("Respondent starts not writable", {
+ int fd;
+
+ So(nng_getopt_int(resp, NNG_OPT_SENDFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And remains unwritable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+
+ Convey("Becomes writable after recv", {
+ nng_msg *m;
+ So(nng_msg_alloc(&m, 0) == 0);
+ So(nng_sendmsg(surv, m, 0) == 0);
+ So(nng_recvmsg(resp, &m, 0) == 0);
+ nng_msg_free(m);
+ So(fdready(fd) == true);
+ });
+ });
+ });
+
+ Convey("Respondent starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(resp, NNG_OPT_RECVFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+ });
+
+ Convey("And becomes readable on data", {
+ nng_msg *msg;
+
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(200);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(fdready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ nng_msleep(300); // give time for msg to arrive
+ So(fdready(fd) == true);
+ Convey("Is no longer readable after recv", {
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(fdready(fd) == false);
+ });
+ });
+ });
+ });
+});
diff --git a/tests/stubs.h b/tests/stubs.h
index da3a6d5f..c34bed7a 100644
--- a/tests/stubs.h
+++ b/tests/stubs.h
@@ -12,11 +12,23 @@
#define STUBS_H
#ifdef _WIN32
+
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
#include <windows.h>
+#include <winsock2.h>
+// order counts
+#include <mswsock.h>
+#define PLATFD SOCKET
+#define poll WSAPoll
#else
+#include <poll.h>
#include <stdint.h>
#include <sys/time.h>
#include <time.h>
+#define PLATFD int
#endif
// Stub handlers for some common things.
@@ -46,6 +58,25 @@ getms(void)
#endif
}
+bool
+fdready(int fd)
+{
+ struct pollfd pfd;
+ pfd.fd = (PLATFD) fd;
+ pfd.events = POLLRDNORM;
+ pfd.revents = 0;
+
+ switch (poll(&pfd, 1, 0)) {
+ case 0:
+ return (false);
+ case 1:
+ return (true);
+ default:
+ ConveyError("BAD POLL RETURN!");
+ return (false);
+ }
+}
+
int
nosocket(nng_socket *s)
{
diff --git a/tests/survey.c b/tests/survey.c
index a3a7ba1d..ed73b3dd 100644
--- a/tests/survey.c
+++ b/tests/survey.c
@@ -14,6 +14,7 @@
#include "protocol/survey0/respond.h"
#include "protocol/survey0/survey.h"
#include "stubs.h"
+#include "supplemental/util/platform.h"
#include <string.h>
@@ -114,5 +115,221 @@ TestMain("SURVEY pattern", {
So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE);
});
});
+
+ Convey("Second send cancels pending recv", {
+ nng_msg *msg;
+ nng_aio *aio;
+
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "one");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ msg = NULL;
+ nng_recv_aio(surv, aio);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "two");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECANCELED);
+ });
+
+ Convey("Sending a NULL message does not panic", {
+ nng_aio *aio;
+
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ Reset({ nng_aio_free(aio); });
+ So(nng_sendmsg(surv, NULL, 0) == NNG_EINVAL);
+ nng_send_aio(surv, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_EINVAL);
+ });
+
+ Convey("Disconnecting before getting response", {
+ nng_msg *msg;
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_close(surv);
+ nng_msleep(100);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ });
+ });
+
+ Convey("Bad backtrace survey is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 1) ==
+ 0); // high order bit not set!
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Bad backtrace survey is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 1) ==
+ 0); // high order bit not set!
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace survey is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace survey is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Bad backtrace response is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ nng_msg_header_append_u32(msg, 1);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
});
+
+ Convey("Bad backtrace response is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 0x80000000) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ nng_msg_header_append_u32(msg, 1);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace response is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace response is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 0x80000000) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
});
diff --git a/tests/surveyctx.c b/tests/surveyctx.c
new file mode 100644
index 00000000..9ab2de40
--- /dev/null
+++ b/tests/surveyctx.c
@@ -0,0 +1,298 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+#include <string.h>
+
+static struct {
+ nng_aio *aio;
+ enum { START, SEND, RECV } state;
+ nng_socket s;
+ nng_msg * msg;
+ int cnt;
+} resp_state;
+
+void
+resp_cb(void)
+{
+ int rv;
+
+ if (resp_state.state == START) {
+ resp_state.state = RECV;
+ nng_recv_aio(resp_state.s, resp_state.aio);
+ return;
+ }
+ if ((rv = nng_aio_result(resp_state.aio)) != 0) {
+ if (resp_state.msg != NULL) {
+ nng_msg_free(resp_state.msg);
+ resp_state.msg = NULL;
+ }
+ return;
+ }
+ switch (resp_state.state) {
+ case START:
+ break;
+ case RECV:
+ resp_state.msg = nng_aio_get_msg(resp_state.aio);
+ resp_state.state = SEND;
+ nng_aio_set_msg(resp_state.aio, resp_state.msg);
+ nng_send_aio(resp_state.s, resp_state.aio);
+ break;
+ case SEND:
+ resp_state.msg = NULL;
+ resp_state.state = RECV;
+ nng_aio_set_msg(resp_state.aio, NULL);
+ nng_recv_aio(resp_state.s, resp_state.aio);
+ resp_state.cnt++;
+ break;
+ }
+}
+
+#define NCTX 10
+
+void
+markr(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+static void
+marks(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+nng_ctx ctxs[NCTX];
+uint32_t recv_order[NCTX];
+nng_aio *saios[NCTX];
+nng_aio *raios[NCTX];
+bool recd[NCTX];
+bool sent[NCTX];
+
+TestMain("Surveyor concurrent contexts", {
+ int rv;
+ const char *addr = "inproc://test";
+ int i;
+
+ memset(recv_order, 0, NCTX * sizeof(int));
+
+ atexit(nng_fini);
+
+ Convey("We can use Surveyor contexts concurrently", {
+ nng_socket surv;
+
+ So(nng_aio_alloc(&resp_state.aio, (void *) resp_cb, NULL) ==
+ 0);
+ So(nng_respondent0_open(&resp_state.s) == 0);
+ So(nng_surveyor0_open(&surv) == 0);
+
+ for (i = 0; i < NCTX; i++) {
+ sent[i] = recd[i] = false;
+ recv_order[i] = (uint32_t) i;
+ if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(raios[i], 5000);
+ if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(saios[i], 5000);
+ }
+
+ // So(nng_setopt_int(resp_state.s, NNG_OPT_SENDBUF, NCTX) ==
+ // 0);
+ So(i == NCTX);
+ for (i = 0; i < NCTX; i++) {
+ uint32_t tmp;
+ int ni = rand() % NCTX; // recv index
+
+ tmp = recv_order[i];
+ recv_order[i] = recv_order[ni];
+ recv_order[ni] = tmp;
+ }
+ Reset({
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_free(saios[i]);
+ nng_aio_free(raios[i]);
+ }
+ nng_close(surv);
+ nng_close(resp_state.s);
+ nng_aio_free(resp_state.aio);
+ });
+
+ So(nng_listen(resp_state.s, addr, NULL, 0) == 0);
+ So(nng_dial(surv, addr, NULL, 0) == 0);
+
+ nng_msleep(100); // let things establish.
+
+ // Start the rep state machine going.
+ resp_cb();
+
+ for (i = 0; i < NCTX; i++) {
+ if ((rv = nng_ctx_open(&ctxs[i], surv)) != 0) {
+ break;
+ }
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ // Send messages
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *m;
+ if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) {
+ Fail("msg alloc failed: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_msg_append_u32(m, i)) != 0) {
+ Fail("append failed: %s", nng_strerror(rv));
+ }
+ nng_aio_set_msg(saios[i], m);
+ nng_ctx_send(ctxs[i], saios[i]);
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_wait(saios[i]);
+ if ((rv = nng_aio_result(saios[i])) != 0) {
+ Fail("send failed: %s", nng_strerror(rv));
+ So(false);
+ break;
+ }
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!sent[i]) {
+ Fail("Index %d (%d) not sent", i, i);
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ // Receive answers
+ for (i = 0; i < NCTX; i++) {
+ int ri = recv_order[i];
+ nng_ctx_recv(ctxs[ri], raios[ri]);
+ }
+
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *msg;
+ uint32_t x;
+
+ nng_aio_wait(raios[i]);
+ if ((rv = nng_aio_result(raios[i])) != 0) {
+ Fail("recv %d (%d) %d failed: %s", i,
+ recv_order[i], resp_state.cnt,
+ nng_strerror(rv));
+ continue;
+ }
+ msg = nng_aio_get_msg(raios[i]);
+ if ((rv = nng_msg_chop_u32(msg, &x)) != 0) {
+ Fail("recv msg trim: %s", nng_strerror(rv));
+ break;
+ }
+ if (x != (uint32_t) i) {
+ Fail("message body mismatch: %x %x\n", x,
+ (uint32_t) i);
+ break;
+ }
+
+ nng_msg_free(msg);
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!recd[i]) {
+ Fail("Index %d (%d) not received", i,
+ recv_order[i]);
+ break;
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ });
+
+ Convey("Given a socket and a context", {
+ nng_socket surv;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_ctx_open(&ctx, surv) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ nng_aio_set_timeout(aio, 1000);
+
+ Reset({ nng_aio_free(aio); });
+
+ Convey("Recv on the context is ESTATE", {
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ESTATE);
+ });
+
+ Convey("Closing the socket aborts a context recv", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ nng_ctx_recv(ctx, aio);
+ nng_close(surv);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ });
+
+ Convey("Sending a null message fails", {
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_EINVAL);
+ });
+
+ Convey("Closing the context aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ nng_recv_aio(ctx, aio);
+ nng_ctx_close(ctx);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_close(surv);
+ });
+
+ Convey("We can set separate survey times", {
+ nng_duration ms;
+ So(nng_setopt_ms(
+ surv, NNG_OPT_SURVEYOR_SURVEYTIME, 100) == 0);
+ So(nng_ctx_setopt_ms(
+ ctx, NNG_OPT_SURVEYOR_SURVEYTIME, 200) == 0);
+ So(nng_getopt_ms(
+ surv, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0);
+ So(ms == 100);
+ So(nng_ctx_getopt_ms(
+ ctx, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0);
+ So(ms == 200);
+ });
+ });
+
+ Convey("Raw mode does not support contexts", {
+ nng_socket surv;
+ nng_ctx ctx;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_ctx_open(&ctx, surv) == NNG_ENOTSUP);
+ nng_close(surv);
+ });
+});
diff --git a/tests/surveypoll.c b/tests/surveypoll.c
new file mode 100644
index 00000000..f97e1c22
--- /dev/null
+++ b/tests/surveypoll.c
@@ -0,0 +1,126 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+TestMain("Survey pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected survey pair", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_ctx ctx;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ So(nng_ctx_open(&ctx, surv) == 0);
+
+ So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("Surveyor ctx not pollable", {
+ int fd;
+
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("Suveyor starts writable", {
+ int fd;
+
+ So(nng_getopt_int(surv, NNG_OPT_SENDFD, &fd) == 0);
+ So(fdready(fd) == true);
+
+ Convey("And becomes readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == true);
+
+ Convey("And stays writable", {
+ // 500 messages should force all
+ // the way to send depth.
+ int i;
+ for (i = 0; i < 500; i++) {
+ nng_msg *m;
+ if (nng_msg_alloc(&m, 0) !=
+ 0) {
+ break;
+ }
+ // Fill intermediate queues.
+ if (nng_sendmsg(surv, m,
+ NNG_FLAG_NONBLOCK) !=
+ 0) {
+ nng_msg_free(m);
+ }
+ }
+ So(i == 500);
+ So(fdready(fd) == true);
+ });
+ });
+ });
+
+ Convey("Surveyor starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(surv, NNG_OPT_RECVFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+ });
+
+ Convey("And becomes readable on data", {
+ nng_msg *msg;
+
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(200);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(fdready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) ==
+ 0); // recv on rep
+ So(nng_sendmsg(resp, msg, 0) ==
+ 0); // echo it back
+ nng_msleep(
+ 300); // give time for message to arrive
+ So(fdready(fd) == true);
+ Convey("Is no longer readable after recv", {
+ So(nng_recvmsg(surv, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(fdready(fd) == false);
+ });
+ });
+ });
+ });
+});