aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0/respond.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-15 22:16:09 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-16 20:22:15 -0800
commit1cf3205d805be83e17ce096305d6eaef94e3a4d2 (patch)
treecc726655772c9ba3682228889504adb562a5406a /src/protocol/survey0/respond.c
parent49ea4dd5ec717303caeaef7f6f4efd10c90c94e1 (diff)
downloadnng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.tar.gz
nng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.tar.bz2
nng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.zip
RESPOND needs to use atomic for TTL.
Also, add a test to cover the RESPOND protocol. This gets about 95% of the coverage.
Diffstat (limited to 'src/protocol/survey0/respond.c')
-rw-r--r--src/protocol/survey0/respond.c114
1 files changed, 58 insertions, 56 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 06010d99..201811de 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -48,14 +48,14 @@ struct resp0_ctx {
// resp0_sock is our per-socket protocol private structure.
struct resp0_sock {
- nni_mtx mtx;
- int ttl;
- nni_idhash * pipes;
- resp0_ctx ctx;
- nni_list recvpipes;
- nni_list recvq;
- nni_pollable readable;
- nni_pollable writable;
+ nni_mtx mtx;
+ nni_atomic_int ttl;
+ nni_idhash * pipes;
+ resp0_ctx ctx;
+ nni_list recvpipes;
+ nni_list recvq;
+ nni_pollable readable;
+ nni_pollable writable;
};
// resp0_pipe is our per-pipe protocol private structure.
@@ -65,8 +65,8 @@ struct resp0_pipe {
bool busy;
uint32_t id;
nni_list sendq; // contexts waiting to send
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node rnode; // receivable linkage
};
@@ -159,6 +159,11 @@ resp0_ctx_send(void *arg, nni_aio *aio)
}
nni_mtx_lock(&s->mtx);
+ if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if ((len = ctx->btrace_len) == 0) {
nni_mtx_unlock(&s->mtx);
@@ -187,8 +192,8 @@ resp0_ctx_send(void *arg, nni_aio *aio)
if (!p->busy) {
p->busy = true;
len = nni_msg_len(msg);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->npipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->npipe, &p->aio_send);
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
@@ -196,12 +201,6 @@ resp0_ctx_send(void *arg, nni_aio *aio)
return;
}
- if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
ctx->saio = aio;
ctx->spipe = p;
nni_list_append(&p->sendq, ctx);
@@ -237,7 +236,8 @@ resp0_sock_init(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode);
NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode);
- s->ttl = 8; // Per RFC
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8); // Per RFC
(void) resp0_ctx_init(&s->ctx, s);
@@ -267,8 +267,8 @@ resp0_pipe_stop(void *arg)
{
resp0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -277,25 +277,21 @@ resp0_pipe_fini(void *arg)
resp0_pipe *p = arg;
nng_msg * msg;
- if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
- nni_aio_set_msg(p->aio_recv, NULL);
+ if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
}
static int
resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
resp0_pipe *p = arg;
- int rv;
- if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
- resp0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p);
+ nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p);
NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode);
@@ -325,7 +321,7 @@ resp0_pipe_start(void *arg)
return (rv);
}
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
return (rv);
}
@@ -336,8 +332,8 @@ resp0_pipe_close(void *arg)
resp0_sock *s = p->psock;
resp0_ctx * ctx;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->mtx);
while ((ctx = nni_list_first(&p->sendq)) != NULL) {
@@ -370,9 +366,9 @@ resp0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->npipe);
return;
}
@@ -396,8 +392,8 @@ resp0_pipe_send_cb(void *arg)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->npipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->npipe, &p->aio_send);
nni_mtx_unlock(&s->mtx);
@@ -452,13 +448,13 @@ resp0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->mtx);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_list_remove(&s->recvpipes, p);
if (nni_list_empty(&s->recvpipes)) {
nni_pollable_clear(&s->readable);
}
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
len = nni_msg_header_len(msg);
memcpy(ctx->btrace, nni_msg_header(msg), len);
@@ -485,12 +481,12 @@ resp0_pipe_recv_cb(void *arg)
int hops;
size_t len;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
+ msg = nni_aio_get_msg(&p->aio_recv);
nni_msg_set_pipe(msg, p->id);
// Move backtrace from body to header
@@ -499,14 +495,14 @@ resp0_pipe_recv_cb(void *arg)
bool end = 0;
uint8_t *body;
- if (hops > s->ttl) {
+ if (hops > nni_atomic_get(&s->ttl)) {
goto drop;
}
hops++;
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage, kick it.
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_close(p->npipe);
return;
}
@@ -535,10 +531,10 @@ resp0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
// Start the next receive.
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
ctx->btrace_len = len;
memcpy(ctx->btrace, nni_msg_header(msg), len);
@@ -556,22 +552,28 @@ resp0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
}
static int
-resp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
resp0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t));
+ int ttl;
+ int rv;
+
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
}
static int
-resp0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
resp0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static int
@@ -636,8 +638,8 @@ static nni_proto_ctx_ops resp0_ctx_ops = {
static nni_option resp0_sock_options[] = {
{
.o_name = NNG_OPT_MAXTTL,
- .o_get = resp0_sock_get_maxttl,
- .o_set = resp0_sock_set_maxttl,
+ .o_get = resp0_sock_get_max_ttl,
+ .o_set = resp0_sock_set_max_ttl,
},
{
.o_name = NNG_OPT_RECVFD,