diff options
Diffstat (limited to 'src/protocol/reqrep0/req.c')
| -rw-r--r-- | src/protocol/reqrep0/req.c | 92 |
1 files changed, 43 insertions, 49 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 14da7143..796bd71e 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -9,7 +9,6 @@ // #include <stdio.h> -#include <stdlib.h> #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" @@ -56,19 +55,19 @@ struct req0_ctx { // A req0_sock is our per-socket protocol private structure. struct req0_sock { - nni_duration retry; - bool closed; - int ttl; - req0_ctx master; // base socket master - nni_list ready_pipes; - nni_list busy_pipes; - nni_list stop_pipes; - nni_list contexts; - nni_list send_queue; // contexts waiting to send. - nni_idhash * requests; // contexts by request ID - nni_pollable readable; - nni_pollable writable; - nni_mtx mtx; + nni_duration retry; + bool closed; + nni_atomic_int ttl; + req0_ctx master; // base socket master + nni_list ready_pipes; + nni_list busy_pipes; + nni_list stop_pipes; + nni_list contexts; + nni_list send_queue; // contexts waiting to send. + nni_idhash * requests; // contexts by request ID + nni_pollable readable; + nni_pollable writable; + nni_mtx mtx; }; // A req0_pipe is our per-pipe protocol private structure. @@ -78,8 +77,8 @@ struct req0_pipe { nni_list_node node; nni_list contexts; // contexts with pending traffic bool closed; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; }; static void req0_sock_fini(void *); @@ -120,7 +119,8 @@ req0_sock_init(void *arg, nni_sock *sock) nni_pollable_init(&s->writable); nni_pollable_init(&s->readable); - s->ttl = 8; + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); return (0); } @@ -172,8 +172,8 @@ req0_pipe_stop(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_send); nni_mtx_lock(&s->mtx); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); @@ -184,22 +184,17 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_free(p->aio_recv); - nni_aio_free(p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_send); } static int req0_pipe_init(void *arg, nni_pipe *pipe, void *s) { req0_pipe *p = arg; - int rv; - - if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) { - req0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, req0_recv_cb, p); + nni_aio_init(&p->aio_send, req0_send_cb, p); NNI_LIST_NODE_INIT(&p->node); NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node); p->pipe = pipe; @@ -227,7 +222,7 @@ req0_pipe_start(void *arg) req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -238,8 +233,8 @@ req0_pipe_close(void *arg) req0_sock *s = p->req; req0_ctx * ctx; - nni_aio_close(p->aio_recv); - nni_aio_close(p->aio_send); + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_send); nni_mtx_lock(&s->mtx); // This removes the node from either busy_pipes or ready_pipes. @@ -277,10 +272,10 @@ req0_send_cb(void *arg) nni_list send_list; nni_aio_list_init(&send_list); - if (nni_aio_result(p->aio_send) != 0) { + if (nni_aio_result(&p->aio_send) != 0) { // We failed to send... clean up and deal with it. - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -319,13 +314,13 @@ req0_recv_cb(void *arg) nni_aio * aio; uint32_t id; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // We yank 4 bytes from front of body, and move them to the header. @@ -334,17 +329,11 @@ req0_recv_cb(void *arg) goto malformed; } id = nni_msg_trim_u32(msg); - if (nni_msg_header_append_u32(msg, id) != 0) { - // Arguably we could just discard and carry on. But - // dropping the connection is probably more helpful since - // it lets the other side see that a problem occurred. - // Plus it gives us a chance to reclaim some memory. - goto malformed; - } + nni_msg_header_must_append_u32(msg, id); // Schedule another receive while we are processing this. nni_mtx_lock(&s->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); // Look for a context to receive it. if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) || @@ -526,8 +515,8 @@ req0_run_send_queue(req0_sock *s, nni_list *send_list) } } - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } } @@ -763,14 +752,19 @@ static int req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t)); + int ttl; + int rv; + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); } static int req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, t)); + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } static int |
