aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-08-15 14:09:17 -0700
committerGarrett D'Amore <garrett@damore.org>2020-08-16 23:07:35 -0700
commit4f5e11c391c4a8f1b2731aee5ad47bc0c925042a (patch)
tree640aef66eb7e0030a2833bc9bba3246edb29d074 /src/protocol/reqrep0
parent750662d4aab305d8a3d48bfa6edfc4dac4018881 (diff)
downloadnng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.gz
nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.bz2
nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.zip
fixes #1289 zerotier should have it's own copy of the id hashing code
fixes #1288 id allocation can overallocate fixes #1126 consider removing lock from idhash This substantially refactors the id hash code, giving a cleaner API, and eliminating a extra locking as well as some wasteful allocations. The ZeroTier code has it's own copy, that is 64-bit friendly, as the rest of the consumers need only a simpler 32-bit API.
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/rep.c20
-rw-r--r--src/protocol/reqrep0/req.c24
-rw-r--r--src/protocol/reqrep0/xrep.c20
3 files changed, 26 insertions, 38 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index e750ef56..6f859ee6 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -41,7 +41,7 @@ struct rep0_ctx {
struct rep0_sock {
nni_mtx lk;
nni_atomic_int ttl;
- nni_idhash * pipes;
+ nni_id_map pipes;
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
rep0_ctx ctx;
@@ -177,7 +177,7 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) {
+ if ((p = nni_id_get(&s->pipes, p_id)) == NULL) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
// lost interest in our reply.
@@ -210,7 +210,7 @@ rep0_sock_fini(void *arg)
{
rep0_sock *s = arg;
- nni_idhash_fini(s->pipes);
+ nni_id_map_fini(&s->pipes);
rep0_ctx_fini(&s->ctx);
nni_pollable_fini(&s->writable);
nni_pollable_fini(&s->readable);
@@ -221,16 +221,11 @@ static int
rep0_sock_init(void *arg, nni_sock *sock)
{
rep0_sock *s = arg;
- int rv;
NNI_ARG_UNUSED(sock);
nni_mtx_init(&s->lk);
- if ((rv = nni_idhash_init(&s->pipes)) != 0) {
- rep0_sock_fini(s);
- return (rv);
- }
-
+ nni_id_map_init(&s->pipes, 0, 0, false);
NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode);
NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode);
nni_atomic_init(&s->ttl);
@@ -312,7 +307,10 @@ rep0_pipe_start(void *arg)
return (NNG_EPROTO);
}
- if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
+ nni_mtx_lock(&s->lk);
+ rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p);
+ nni_mtx_unlock(&s->lk);
+ if (rv != 0) {
return (rv);
}
// By definition, we have not received a request yet on this pipe,
@@ -355,7 +353,7 @@ rep0_pipe_close(void *arg)
// accept a message and discard it.)
nni_pollable_raise(&s->writable);
}
- nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+ nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
}
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 0112f835..c63359d5 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -55,7 +55,7 @@ struct req0_sock {
nni_list stop_pipes;
nni_list contexts;
nni_list send_queue; // contexts waiting to send.
- nni_idhash * requests; // contexts by request ID
+ nni_id_map requests; // contexts by request ID
nni_pollable readable;
nni_pollable writable;
nni_mtx mtx;
@@ -80,19 +80,13 @@ static int
req0_sock_init(void *arg, nni_sock *sock)
{
req0_sock *s = arg;
- int rv;
NNI_ARG_UNUSED(sock);
- if ((rv = nni_idhash_init(&s->requests)) != 0) {
- return (rv);
- }
-
// Request IDs are 32 bits, with the high order bit set.
// We start at a random point, to minimize likelihood of
// accidental collision across restarts.
- nni_idhash_set_limits(
- s->requests, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u);
+ nni_id_map_init(&s->requests, 0x80000000u, 0xffffffffu, true);
nni_mtx_init(&s->mtx);
@@ -145,7 +139,7 @@ req0_sock_fini(void *arg)
req0_ctx_fini(&s->master);
nni_pollable_fini(&s->readable);
nni_pollable_fini(&s->writable);
- nni_idhash_fini(s->requests);
+ nni_id_map_fini(&s->requests);
nni_mtx_fini(&s->mtx);
}
@@ -316,7 +310,7 @@ req0_recv_cb(void *arg)
nni_pipe_recv(p->pipe, &p->aio_recv);
// Look for a context to receive it.
- if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) ||
+ if (((ctx = nni_id_get(&s->requests, id)) == NULL) ||
(ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) {
nni_mtx_unlock(&s->mtx);
// No waiting context, we have not sent the request out to
@@ -328,7 +322,7 @@ req0_recv_cb(void *arg)
// We have our match, so we can remove this.
nni_list_node_remove(&ctx->send_node);
- nni_idhash_remove(s->requests, id);
+ nni_id_remove(&s->requests, id);
ctx->request_id = 0;
if (ctx->req_msg != NULL) {
nni_msg_free(ctx->req_msg);
@@ -512,7 +506,7 @@ req0_ctx_reset(req0_ctx *ctx)
nni_list_node_remove(&ctx->pipe_node);
nni_list_node_remove(&ctx->send_node);
if (ctx->request_id != 0) {
- nni_idhash_remove(s->requests, ctx->request_id);
+ nni_id_remove(&s->requests, ctx->request_id);
ctx->request_id = 0;
}
if (ctx->req_msg != NULL) {
@@ -631,7 +625,6 @@ req0_ctx_send(void *arg, nni_aio *aio)
req0_ctx * ctx = arg;
req0_sock *s = ctx->sock;
nng_msg * msg = nni_aio_get_msg(aio);
- uint64_t id;
int rv;
if (nni_aio_begin(aio) != 0) {
@@ -662,12 +655,11 @@ req0_ctx_send(void *arg, nni_aio *aio)
req0_ctx_reset(ctx);
// Insert us on the per ID hash list, so that receives can find us.
- if ((rv = nni_idhash_alloc(s->requests, &id, ctx)) != 0) {
+ if ((rv = nni_id_alloc(&s->requests, &ctx->request_id, ctx)) != 0) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- ctx->request_id = (uint32_t) id;
nni_msg_header_clear(msg);
nni_msg_header_append_u32(msg, ctx->request_id);
@@ -675,7 +667,7 @@ req0_ctx_send(void *arg, nni_aio *aio)
// schedule), then fail it. Should be NNG_ETIMEDOUT.
rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx);
if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) {
- nni_idhash_remove(s->requests, id);
+ nni_id_remove(&s->requests, ctx->request_id);
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 0bce27ba..9737c600 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -33,7 +33,7 @@ struct xrep0_sock {
nni_msgq * urq;
nni_mtx lk;
nni_atomic_int ttl;
- nni_idhash * pipes;
+ nni_id_map pipes;
nni_aio aio_getq;
};
@@ -54,7 +54,7 @@ xrep0_sock_fini(void *arg)
xrep0_sock *s = arg;
nni_aio_fini(&s->aio_getq);
- nni_idhash_fini(s->pipes);
+ nni_id_map_fini(&s->pipes);
nni_mtx_fini(&s->lk);
}
@@ -62,7 +62,6 @@ static int
xrep0_sock_init(void *arg, nni_sock *sock)
{
xrep0_sock *s = arg;
- int rv;
nni_mtx_init(&s->lk);
nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s);
@@ -71,11 +70,7 @@ xrep0_sock_init(void *arg, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
- if ((rv = nni_idhash_init(&s->pipes)) != 0) {
- xrep0_sock_fini(s);
- return (rv);
- }
-
+ nni_id_map_init(&s->pipes, 0, 0, false);
return (0);
}
@@ -164,7 +159,10 @@ xrep0_pipe_start(void *arg)
return (NNG_EPROTO);
}
- if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
+ nni_mtx_lock(&s->lk);
+ rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p);
+ nni_mtx_unlock(&s->lk);
+ if (rv != 0) {
return (rv);
}
@@ -186,7 +184,7 @@ xrep0_pipe_close(void *arg)
nni_msgq_close(p->sendq);
nni_mtx_lock(&s->lk);
- nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+ nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
}
@@ -227,7 +225,7 @@ xrep0_sock_getq_cb(void *arg)
// (non-blocking) if we can. If we can't for any reason, then we
// free the message.
nni_mtx_lock(&s->lk);
- if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ if (((p = nni_id_get(&s->pipes, id)) == NULL) ||
(nni_msgq_tryput(p->sendq, msg) != 0)) {
nni_msg_free(msg);
}