aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/req.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/req.c')
-rw-r--r--src/protocol/reqrep0/req.c92
1 files changed, 43 insertions, 49 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 14da7143..796bd71e 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -9,7 +9,6 @@
//
#include <stdio.h>
-#include <stdlib.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -56,19 +55,19 @@ struct req0_ctx {
// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
- nni_duration retry;
- bool closed;
- int ttl;
- req0_ctx master; // base socket master
- nni_list ready_pipes;
- nni_list busy_pipes;
- nni_list stop_pipes;
- nni_list contexts;
- nni_list send_queue; // contexts waiting to send.
- nni_idhash * requests; // contexts by request ID
- nni_pollable readable;
- nni_pollable writable;
- nni_mtx mtx;
+ nni_duration retry;
+ bool closed;
+ nni_atomic_int ttl;
+ req0_ctx master; // base socket master
+ nni_list ready_pipes;
+ nni_list busy_pipes;
+ nni_list stop_pipes;
+ nni_list contexts;
+ nni_list send_queue; // contexts waiting to send.
+ nni_idhash * requests; // contexts by request ID
+ nni_pollable readable;
+ nni_pollable writable;
+ nni_mtx mtx;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -78,8 +77,8 @@ struct req0_pipe {
nni_list_node node;
nni_list contexts; // contexts with pending traffic
bool closed;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
static void req0_sock_fini(void *);
@@ -120,7 +119,8 @@ req0_sock_init(void *arg, nni_sock *sock)
nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);
- s->ttl = 8;
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
return (0);
}
@@ -172,8 +172,8 @@ req0_pipe_stop(void *arg)
req0_pipe *p = arg;
req0_sock *s = p->req;
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_send);
nni_mtx_lock(&s->mtx);
nni_list_node_remove(&p->node);
nni_mtx_unlock(&s->mtx);
@@ -184,22 +184,17 @@ req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
- nni_aio_free(p->aio_recv);
- nni_aio_free(p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_send);
}
static int
req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
req0_pipe *p = arg;
- int rv;
-
- if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) {
- req0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_recv, req0_recv_cb, p);
+ nni_aio_init(&p->aio_send, req0_send_cb, p);
NNI_LIST_NODE_INIT(&p->node);
NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node);
p->pipe = pipe;
@@ -227,7 +222,7 @@ req0_pipe_start(void *arg)
req0_run_send_queue(s, NULL);
nni_mtx_unlock(&s->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -238,8 +233,8 @@ req0_pipe_close(void *arg)
req0_sock *s = p->req;
req0_ctx * ctx;
- nni_aio_close(p->aio_recv);
- nni_aio_close(p->aio_send);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_send);
nni_mtx_lock(&s->mtx);
// This removes the node from either busy_pipes or ready_pipes.
@@ -277,10 +272,10 @@ req0_send_cb(void *arg)
nni_list send_list;
nni_aio_list_init(&send_list);
- if (nni_aio_result(p->aio_send) != 0) {
+ if (nni_aio_result(&p->aio_send) != 0) {
// We failed to send... clean up and deal with it.
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -319,13 +314,13 @@ req0_recv_cb(void *arg)
nni_aio * aio;
uint32_t id;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// We yank 4 bytes from front of body, and move them to the header.
@@ -334,17 +329,11 @@ req0_recv_cb(void *arg)
goto malformed;
}
id = nni_msg_trim_u32(msg);
- if (nni_msg_header_append_u32(msg, id) != 0) {
- // Arguably we could just discard and carry on. But
- // dropping the connection is probably more helpful since
- // it lets the other side see that a problem occurred.
- // Plus it gives us a chance to reclaim some memory.
- goto malformed;
- }
+ nni_msg_header_must_append_u32(msg, id);
// Schedule another receive while we are processing this.
nni_mtx_lock(&s->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
// Look for a context to receive it.
if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) ||
@@ -526,8 +515,8 @@ req0_run_send_queue(req0_sock *s, nni_list *send_list)
}
}
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
}
}
@@ -763,14 +752,19 @@ static int
req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
req0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t));
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
}
static int
req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
req0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static int