diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-15 22:16:09 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-16 20:22:15 -0800 |
| commit | 1cf3205d805be83e17ce096305d6eaef94e3a4d2 (patch) | |
| tree | cc726655772c9ba3682228889504adb562a5406a /src/protocol/survey0/respond.c | |
| parent | 49ea4dd5ec717303caeaef7f6f4efd10c90c94e1 (diff) | |
| download | nng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.tar.gz nng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.tar.bz2 nng-1cf3205d805be83e17ce096305d6eaef94e3a4d2.zip | |
RESPOND needs to use atomic for TTL.
Also, add a test to cover the RESPOND protocol. This gets about
95% of the coverage.
Diffstat (limited to 'src/protocol/survey0/respond.c')
| -rw-r--r-- | src/protocol/survey0/respond.c | 114 |
1 files changed, 58 insertions, 56 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 06010d99..201811de 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -48,14 +48,14 @@ struct resp0_ctx { // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_mtx mtx; - int ttl; - nni_idhash * pipes; - resp0_ctx ctx; - nni_list recvpipes; - nni_list recvq; - nni_pollable readable; - nni_pollable writable; + nni_mtx mtx; + nni_atomic_int ttl; + nni_idhash * pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; }; // resp0_pipe is our per-pipe protocol private structure. @@ -65,8 +65,8 @@ struct resp0_pipe { bool busy; uint32_t id; nni_list sendq; // contexts waiting to send - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable linkage }; @@ -159,6 +159,11 @@ resp0_ctx_send(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } if ((len = ctx->btrace_len) == 0) { nni_mtx_unlock(&s->mtx); @@ -187,8 +192,8 @@ resp0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->npipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, NULL); @@ -196,12 +201,6 @@ resp0_ctx_send(void *arg, nni_aio *aio) return; } - if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); @@ -237,7 +236,8 @@ resp0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); - s->ttl = 8; // Per RFC + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC (void) resp0_ctx_init(&s->ctx, s); @@ -267,8 +267,8 @@ resp0_pipe_stop(void *arg) { resp0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -277,25 +277,21 @@ resp0_pipe_fini(void *arg) resp0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) { resp0_pipe *p = arg; - int rv; - if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { - resp0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p); + nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p); NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); @@ -325,7 +321,7 @@ resp0_pipe_start(void *arg) return (rv); } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); return (rv); } @@ -336,8 +332,8 @@ resp0_pipe_close(void *arg) resp0_sock *s = p->psock; resp0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->mtx); while ((ctx = nni_list_first(&p->sendq)) != NULL) { @@ -370,9 +366,9 @@ resp0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->npipe); return; } @@ -396,8 +392,8 @@ resp0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->npipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); nni_mtx_unlock(&s->mtx); @@ -452,13 +448,13 @@ resp0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); len = nni_msg_header_len(msg); memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -485,12 +481,12 @@ resp0_pipe_recv_cb(void *arg) int hops; size_t len; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); // Move backtrace from body to header @@ -499,14 +495,14 @@ resp0_pipe_recv_cb(void *arg) bool end = 0; uint8_t *body; - if (hops > s->ttl) { + if (hops > nni_atomic_get(&s->ttl)) { goto drop; } hops++; if (nni_msg_len(msg) < 4) { // Peer is speaking garbage, kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->npipe); return; } @@ -535,10 +531,10 @@ resp0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); // Start the next receive. - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -556,22 +552,28 @@ resp0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->npipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->npipe, &p->aio_recv); } static int -resp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { resp0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t)); + int ttl; + int rv; + + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); } static int -resp0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) { resp0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, t)); + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } static int @@ -636,8 +638,8 @@ static nni_proto_ctx_ops resp0_ctx_ops = { static nni_option resp0_sock_options[] = { { .o_name = NNG_OPT_MAXTTL, - .o_get = resp0_sock_get_maxttl, - .o_set = resp0_sock_set_maxttl, + .o_get = resp0_sock_get_max_ttl, + .o_set = resp0_sock_set_max_ttl, }, { .o_name = NNG_OPT_RECVFD, |
