aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-08-15 14:09:17 -0700
committerGarrett D'Amore <garrett@damore.org>2020-08-16 23:07:35 -0700
commit4f5e11c391c4a8f1b2731aee5ad47bc0c925042a (patch)
tree640aef66eb7e0030a2833bc9bba3246edb29d074 /src/protocol/survey0
parent750662d4aab305d8a3d48bfa6edfc4dac4018881 (diff)
downloadnng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.gz
nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.bz2
nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.zip
fixes #1289 zerotier should have it's own copy of the id hashing code
fixes #1288 id allocation can overallocate fixes #1126 consider removing lock from idhash This substantially refactors the id hash code, giving a cleaner API, and eliminating a extra locking as well as some wasteful allocations. The ZeroTier code has it's own copy, that is 64-bit friendly, as the rest of the consumers need only a simpler 32-bit API.
Diffstat (limited to 'src/protocol/survey0')
-rw-r--r--src/protocol/survey0/respond.c16
-rw-r--r--src/protocol/survey0/respond_test.c3
-rw-r--r--src/protocol/survey0/survey.c75
-rw-r--r--src/protocol/survey0/xrespond.c124
4 files changed, 102 insertions, 116 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index b414c189..7583c4d8 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -50,7 +50,7 @@ struct resp0_ctx {
struct resp0_sock {
nni_mtx mtx;
nni_atomic_int ttl;
- nni_idhash * pipes;
+ nni_id_map pipes;
resp0_ctx ctx;
nni_list recvpipes;
nni_list recvq;
@@ -181,7 +181,7 @@ resp0_ctx_send(void *arg, nni_aio *aio)
return;
}
- if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) {
+ if ((p = nni_id_get(&s->pipes, pid)) == NULL) {
// Surveyor has left the building. Just discard the reply.
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
@@ -213,7 +213,7 @@ resp0_sock_fini(void *arg)
{
resp0_sock *s = arg;
- nni_idhash_fini(s->pipes);
+ nni_id_map_fini(&s->pipes);
resp0_ctx_fini(&s->ctx);
nni_pollable_fini(&s->writable);
nni_pollable_fini(&s->readable);
@@ -224,15 +224,11 @@ static int
resp0_sock_init(void *arg, nni_sock *nsock)
{
resp0_sock *s = arg;
- int rv;
NNI_ARG_UNUSED(nsock);
nni_mtx_init(&s->mtx);
- if ((rv = nni_idhash_init(&s->pipes)) != 0) {
- resp0_sock_fini(s);
- return (rv);
- }
+ nni_id_map_init(&s->pipes, 0, 0, false);
NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode);
NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode);
@@ -316,7 +312,7 @@ resp0_pipe_start(void *arg)
}
nni_mtx_lock(&s->mtx);
- rv = nni_idhash_insert(s->pipes, p->id, p);
+ rv = nni_id_set(&s->pipes, p->id, p);
nni_mtx_unlock(&s->mtx);
if (rv != 0) {
return (rv);
@@ -354,7 +350,7 @@ resp0_pipe_close(void *arg)
// which we will happily discard.
nni_pollable_raise(&s->writable);
}
- nni_idhash_remove(s->pipes, p->id);
+ nni_id_remove(&s->pipes, p->id);
nni_mtx_unlock(&s->mtx);
}
diff --git a/src/protocol/survey0/respond_test.c b/src/protocol/survey0/respond_test.c
index 2801222c..efda181b 100644
--- a/src/protocol/survey0/respond_test.c
+++ b/src/protocol/survey0/respond_test.c
@@ -13,9 +13,6 @@
#include <nng/protocol/survey0/respond.h>
#include <nng/protocol/survey0/survey.h>
-#include <nng/protocol/reqrep0/rep.h>
-#include <nng/protocol/reqrep0/req.h>
-
#include <acutest.h>
#include <testutil.h>
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index e4cdca2c..f2cc8aa8 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -29,7 +29,7 @@ static void surv0_ctx_timeout(void *);
struct surv0_ctx {
surv0_sock * sock;
- uint64_t survey_id; // survey id
+ uint32_t survey_id; // survey id
nni_timer_node timer;
nni_time expire;
nni_lmq recv_lmq;
@@ -45,7 +45,7 @@ struct surv0_sock {
nni_list pipes;
nni_mtx mtx;
surv0_ctx ctx;
- nni_idhash * surveys;
+ nni_id_map surveys;
nni_pollable writable;
nni_pollable readable;
nni_atomic_int send_buf;
@@ -57,8 +57,8 @@ struct surv0_pipe {
surv0_sock * sock;
nni_lmq send_queue;
nni_list_node node;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
bool busy;
bool closed;
};
@@ -75,7 +75,7 @@ surv0_ctx_abort(surv0_ctx *ctx, int err)
}
nni_lmq_flush(&ctx->recv_lmq);
if (ctx->survey_id != 0) {
- nni_idhash_remove(sock->surveys, ctx->survey_id);
+ nni_id_remove(&sock->surveys, ctx->survey_id);
ctx->survey_id = 0;
}
if (ctx == &sock->ctx) {
@@ -148,7 +148,7 @@ surv0_ctx_cancel(nni_aio *aio, void *arg, int rv)
nni_aio_finish_error(aio, rv);
}
if (ctx->survey_id != 0) {
- nni_idhash_remove(sock->surveys, ctx->survey_id);
+ nni_id_remove(&sock->surveys, ctx->survey_id);
ctx->survey_id = 0;
}
nni_mtx_unlock(&sock->mtx);
@@ -237,8 +237,7 @@ surv0_ctx_send(void *arg, nni_aio *aio)
nni_timer_cancel(&ctx->timer);
// Allocate the new ID.
- if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survey_id, ctx)) !=
- 0) {
+ if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) {
nni_mtx_unlock(&sock->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -256,8 +255,8 @@ surv0_ctx_send(void *arg, nni_aio *aio)
if (!pipe->busy) {
pipe->busy = true;
nni_msg_clone(msg);
- nni_aio_set_msg(pipe->aio_send, msg);
- nni_pipe_send(pipe->pipe, pipe->aio_send);
+ nni_aio_set_msg(&pipe->aio_send, msg);
+ nni_pipe_send(pipe->pipe, &pipe->aio_send);
} else if (!nni_lmq_full(&pipe->send_queue)) {
nni_msg_clone(msg);
nni_lmq_putq(&pipe->send_queue, msg);
@@ -279,7 +278,7 @@ surv0_sock_fini(void *arg)
surv0_sock *sock = arg;
surv0_ctx_fini(&sock->ctx);
- nni_idhash_fini(sock->surveys);
+ nni_id_map_fini(&sock->surveys);
nni_pollable_fini(&sock->writable);
nni_pollable_fini(&sock->readable);
nni_mtx_fini(&sock->mtx);
@@ -307,17 +306,15 @@ surv0_sock_init(void *arg, nni_sock *s)
nni_atomic_init(&sock->send_buf);
nni_atomic_set(&sock->send_buf, 8);
- if (((rv = nni_idhash_init(&sock->surveys)) != 0) ||
- ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) {
- surv0_sock_fini(sock);
- return (rv);
- }
-
// Survey IDs are 32 bits, with the high order bit set.
// We start at a random point, to minimize likelihood of
// accidental collision across restarts.
- nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu,
- nni_random() | 0x80000000u);
+ nni_id_map_init(&sock->surveys, 0x80000000u, 0xffffffffu, true);
+
+ if ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0) {
+ surv0_sock_fini(sock);
+ return (rv);
+ }
sock->ttl = 8;
@@ -343,8 +340,8 @@ surv0_pipe_stop(void *arg)
{
surv0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -352,8 +349,8 @@ surv0_pipe_fini(void *arg)
{
surv0_pipe *p = arg;
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
nni_lmq_fini(&p->send_queue);
}
@@ -366,13 +363,13 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s)
int len;
len = nni_atomic_get(&sock->send_buf);
+ nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p);
// This depth could be tunable. The deeper the queue, the more
// concurrent surveys that can be delivered (multiple contexts).
// Note that surveys can be *outstanding*, but not yet put on the wire.
- if (((rv = nni_lmq_init(&p->send_queue, len)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
+ if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) {
surv0_pipe_fini(p);
return (rv);
}
@@ -396,7 +393,7 @@ surv0_pipe_start(void *arg)
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -406,8 +403,8 @@ surv0_pipe_close(void *arg)
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->mtx);
p->closed = true;
@@ -425,9 +422,9 @@ surv0_pipe_send_cb(void *arg)
surv0_sock *sock = p->sock;
nni_msg * msg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -438,8 +435,8 @@ surv0_pipe_send_cb(void *arg)
return;
}
if (nni_lmq_getq(&p->send_queue, &msg) == 0) {
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
} else {
p->busy = false;
}
@@ -456,13 +453,13 @@ surv0_pipe_recv_cb(void *arg)
uint32_t id;
nni_aio * aio;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// We yank 4 bytes of body, and move them to the header.
@@ -478,7 +475,7 @@ surv0_pipe_recv_cb(void *arg)
nni_mtx_lock(&sock->mtx);
// Best effort at delivery. Discard if no context or context is
// unable to receive it.
- if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) ||
+ if (((ctx = nni_id_get(&sock->surveys, id)) == NULL) ||
(nni_lmq_full(&ctx->recv_lmq))) {
nni_msg_free(msg);
} else if ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
@@ -492,7 +489,7 @@ surv0_pipe_recv_cb(void *arg)
}
nni_mtx_unlock(&sock->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static int
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 25aacc2c..b2f203c3 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -40,8 +40,8 @@ struct xresp0_sock {
nni_msgq * urq;
nni_msgq * uwq;
nni_atomic_int ttl;
- nni_idhash * pipes;
- nni_aio * aio_getq;
+ nni_id_map pipes;
+ nni_aio aio_getq;
nni_mtx mtx;
};
@@ -51,10 +51,10 @@ struct xresp0_pipe {
xresp0_sock *psock;
uint32_t id;
nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_putq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
static void
@@ -62,8 +62,8 @@ xresp0_sock_fini(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_free(s->aio_getq);
- nni_idhash_fini(s->pipes);
+ nni_aio_fini(&s->aio_getq);
+ nni_id_map_fini(&s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -71,17 +71,12 @@ static int
xresp0_sock_init(void *arg, nni_sock *nsock)
{
xresp0_sock *s = arg;
- int rv;
nni_mtx_init(&s->mtx);
nni_atomic_init(&s->ttl);
nni_atomic_set(&s->ttl, 8); // Per RFC
- if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) !=
- 0)) {
- xresp0_sock_fini(s);
- return (rv);
- }
+ nni_id_map_init(&s->pipes, 0, 0, false);
+ nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s);
s->urq = nni_sock_recvq(nsock);
s->uwq = nni_sock_sendq(nsock);
@@ -94,7 +89,7 @@ xresp0_sock_open(void *arg)
{
xresp0_sock *s = arg;
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
}
static void
@@ -102,7 +97,7 @@ xresp0_sock_close(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_close(s->aio_getq);
+ nni_aio_close(&s->aio_getq);
}
static void
@@ -110,10 +105,10 @@ xresp0_pipe_stop(void *arg)
{
xresp0_pipe *p = arg;
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_putq);
+ nni_aio_stop(&p->aio_getq);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -121,10 +116,10 @@ xresp0_pipe_fini(void *arg)
{
xresp0_pipe *p = arg;
- nni_aio_free(p->aio_putq);
- nni_aio_free(p->aio_getq);
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_putq);
+ nni_aio_fini(&p->aio_getq);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
nni_msgq_fini(p->sendq);
}
@@ -134,11 +129,12 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
xresp0_pipe *p = arg;
int rv;
- if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) {
+ nni_aio_init(&p->aio_putq, xresp0_putq_cb, p);
+ nni_aio_init(&p->aio_recv, xresp0_recv_cb, p);
+ nni_aio_init(&p->aio_getq, xresp0_getq_cb, p);
+ nni_aio_init(&p->aio_send, xresp0_send_cb, p);
+
+ if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
xresp0_pipe_fini(p);
return (rv);
}
@@ -162,14 +158,14 @@ xresp0_pipe_start(void *arg)
p->id = nni_pipe_id(p->npipe);
nni_mtx_lock(&s->mtx);
- rv = nni_idhash_insert(s->pipes, p->id, p);
+ rv = nni_id_set(&s->pipes, p->id, p);
nni_mtx_unlock(&s->mtx);
if (rv != 0) {
return (rv);
}
- nni_pipe_recv(p->npipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
return (rv);
}
@@ -180,15 +176,15 @@ xresp0_pipe_close(void *arg)
xresp0_pipe *p = arg;
xresp0_sock *s = p->psock;
- nni_aio_close(p->aio_putq);
- nni_aio_close(p->aio_getq);
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_putq);
+ nni_aio_close(&p->aio_getq);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_msgq_close(p->sendq);
nni_mtx_lock(&s->mtx);
- nni_idhash_remove(s->pipes, p->id);
+ nni_id_remove(&s->pipes, p->id);
nni_mtx_unlock(&s->mtx);
}
@@ -205,17 +201,17 @@ xresp0_sock_getq_cb(void *arg)
uint32_t id;
xresp0_pipe *p;
- if (nni_aio_result(s->aio_getq) != 0) {
+ if (nni_aio_result(&s->aio_getq) != 0) {
return;
}
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
+ msg = nni_aio_get_msg(&s->aio_getq);
+ nni_aio_set_msg(&s->aio_getq, NULL);
// We yank the outgoing pipe id from the header
if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
// We can't really close down the socket, so just keep going.
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
return;
}
id = nni_msg_header_trim_u32(msg);
@@ -224,12 +220,12 @@ xresp0_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ if (((p = nni_id_get(&s->pipes, id)) == NULL) ||
(nni_msgq_tryput(p->sendq, msg) != 0)) {
nni_msg_free(msg);
}
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
}
void
@@ -237,15 +233,15 @@ xresp0_getq_cb(void *arg)
{
xresp0_pipe *p = arg;
- if (nni_aio_result(p->aio_getq) != 0) {
+ if (nni_aio_result(&p->aio_getq) != 0) {
nni_pipe_close(p->npipe);
return;
}
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
+ nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
+ nni_aio_set_msg(&p->aio_getq, NULL);
- nni_pipe_send(p->npipe, p->aio_send);
+ nni_pipe_send(p->npipe, &p->aio_send);
}
void
@@ -253,14 +249,14 @@ xresp0_send_cb(void *arg)
{
xresp0_pipe *p = arg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->npipe);
return;
}
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
}
static void
@@ -273,14 +269,14 @@ xresp0_recv_cb(void *arg)
int hops;
int ttl;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
return;
}
ttl = nni_atomic_get(&s->ttl);
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, p->id);
// Store the pipe id in the header, first thing.
@@ -314,13 +310,13 @@ xresp0_recv_cb(void *arg)
}
// Now send it up.
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(urq, p->aio_putq);
+ nni_aio_set_msg(&p->aio_putq, msg);
+ nni_msgq_aio_put(urq, &p->aio_putq);
return;
drop:
nni_msg_free(msg);
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
}
static void
@@ -328,22 +324,22 @@ xresp0_putq_cb(void *arg)
{
xresp0_pipe *p = arg;
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_putq));
+ nni_aio_set_msg(&p->aio_putq, NULL);
nni_pipe_close(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
}
static int
xresp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
xresp0_sock *s = arg;
- int ttl;
- int rv;
+ int ttl;
+ int rv;
if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
nni_atomic_set(&s->ttl, ttl);
}