diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-08-15 14:09:17 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-08-16 23:07:35 -0700 |
| commit | 4f5e11c391c4a8f1b2731aee5ad47bc0c925042a (patch) | |
| tree | 640aef66eb7e0030a2833bc9bba3246edb29d074 /src/protocol | |
| parent | 750662d4aab305d8a3d48bfa6edfc4dac4018881 (diff) | |
| download | nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.gz nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.tar.bz2 nng-4f5e11c391c4a8f1b2731aee5ad47bc0c925042a.zip | |
fixes #1289 zerotier should have it's own copy of the id hashing code
fixes #1288 id allocation can overallocate
fixes #1126 consider removing lock from idhash
This substantially refactors the id hash code, giving a cleaner API,
and eliminating a extra locking as well as some wasteful allocations.
The ZeroTier code has it's own copy, that is 64-bit friendly, as the
rest of the consumers need only a simpler 32-bit API.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pair1/pair.c | 15 | ||||
| -rw-r--r-- | src/protocol/pair1/pair1_poly.c | 14 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 20 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 24 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 20 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 16 | ||||
| -rw-r--r-- | src/protocol/survey0/respond_test.c | 3 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 75 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond.c | 124 |
9 files changed, 140 insertions, 171 deletions
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 00959a4c..0492119a 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -34,7 +34,7 @@ struct pair1_sock { bool raw; nni_atomic_int ttl; nni_mtx mtx; - nni_idhash * pipes; + nni_id_map pipes; nni_list plist; bool started; nni_stat_item stat_poly; @@ -66,7 +66,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -75,9 +75,7 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) { pair1_sock *s = arg; - if (nni_idhash_init(&s->pipes) != 0) { - return (NNG_ENOMEM); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. @@ -199,12 +197,12 @@ pair1_pipe_start(void *arg) } id = nni_pipe_id(p->pipe); - if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { + if ((rv = nni_id_set(&s->pipes, id, p)) != 0) { nni_mtx_unlock(&s->mtx); return (rv); } if (!nni_list_empty(&s->plist)) { - nni_idhash_remove(s->pipes, id); + nni_id_remove(&s->pipes, id); nni_mtx_unlock(&s->mtx); BUMP_STAT(&s->stat_reject_already); return (NNG_EBUSY); @@ -234,7 +232,7 @@ pair1_pipe_close(void *arg) nni_aio_close(&p->aio_get); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); } @@ -400,7 +398,6 @@ pair1_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } - #ifdef NNG_TEST_LIB static int pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t) diff --git a/src/protocol/pair1/pair1_poly.c b/src/protocol/pair1/pair1_poly.c index 950c60f7..fc1bbf6a 100644 --- a/src/protocol/pair1/pair1_poly.c +++ b/src/protocol/pair1/pair1_poly.c @@ -40,7 +40,7 @@ struct pair1poly_sock { nni_sock * sock; nni_atomic_int ttl; nni_mtx mtx; - nni_idhash * pipes; + nni_id_map pipes; nni_list plist; bool started; nni_aio aio_get; @@ -72,7 +72,7 @@ pair1poly_sock_fini(void *arg) pair1poly_sock *s = arg; nni_aio_fini(&s->aio_get); - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -81,9 +81,7 @@ pair1poly_sock_init(void *arg, nni_sock *sock) { pair1poly_sock *s = arg; - if (nni_idhash_init(&s->pipes) != 0) { - return (NNG_ENOMEM); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->plist, pair1poly_pipe, node); // Raw mode uses this. @@ -196,7 +194,7 @@ pair1poly_pipe_start(void *arg) } id = nni_pipe_id(p->pipe); - if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { + if ((rv = nni_id_set(&s->pipes, id, p)) != 0) { nni_mtx_unlock(&s->mtx); return (rv); } @@ -231,7 +229,7 @@ pair1poly_pipe_close(void *arg) nni_aio_close(&p->aio_get); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); @@ -311,7 +309,7 @@ pair1poly_sock_get_cb(void *arg) (!nni_list_empty(&s->plist))) { p = nni_list_first(&s->plist); } else { - nni_idhash_find(s->pipes, id, (void **) &p); + p = nni_id_get(&s->pipes, id); } // Try a non-blocking send. If this fails we just discard the diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index e750ef56..6f859ee6 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -41,7 +41,7 @@ struct rep0_ctx { struct rep0_sock { nni_mtx lk; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; nni_list recvpipes; // list of pipes with data to receive nni_list recvq; rep0_ctx ctx; @@ -177,7 +177,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) { + if ((p = nni_id_get(&s->pipes, p_id)) == NULL) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer // lost interest in our reply. @@ -210,7 +210,7 @@ rep0_sock_fini(void *arg) { rep0_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); rep0_ctx_fini(&s->ctx); nni_pollable_fini(&s->writable); nni_pollable_fini(&s->readable); @@ -221,16 +221,11 @@ static int rep0_sock_init(void *arg, nni_sock *sock) { rep0_sock *s = arg; - int rv; NNI_ARG_UNUSED(sock); nni_mtx_init(&s->lk); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - rep0_sock_fini(s); - return (rv); - } - + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode); nni_atomic_init(&s->ttl); @@ -312,7 +307,10 @@ rep0_pipe_start(void *arg) return (NNG_EPROTO); } - if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + nni_mtx_lock(&s->lk); + rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p); + nni_mtx_unlock(&s->lk); + if (rv != 0) { return (rv); } // By definition, we have not received a request yet on this pipe, @@ -355,7 +353,7 @@ rep0_pipe_close(void *arg) // accept a message and discard it.) nni_pollable_raise(&s->writable); } - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 0112f835..c63359d5 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -55,7 +55,7 @@ struct req0_sock { nni_list stop_pipes; nni_list contexts; nni_list send_queue; // contexts waiting to send. - nni_idhash * requests; // contexts by request ID + nni_id_map requests; // contexts by request ID nni_pollable readable; nni_pollable writable; nni_mtx mtx; @@ -80,19 +80,13 @@ static int req0_sock_init(void *arg, nni_sock *sock) { req0_sock *s = arg; - int rv; NNI_ARG_UNUSED(sock); - if ((rv = nni_idhash_init(&s->requests)) != 0) { - return (rv); - } - // Request IDs are 32 bits, with the high order bit set. // We start at a random point, to minimize likelihood of // accidental collision across restarts. - nni_idhash_set_limits( - s->requests, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + nni_id_map_init(&s->requests, 0x80000000u, 0xffffffffu, true); nni_mtx_init(&s->mtx); @@ -145,7 +139,7 @@ req0_sock_fini(void *arg) req0_ctx_fini(&s->master); nni_pollable_fini(&s->readable); nni_pollable_fini(&s->writable); - nni_idhash_fini(s->requests); + nni_id_map_fini(&s->requests); nni_mtx_fini(&s->mtx); } @@ -316,7 +310,7 @@ req0_recv_cb(void *arg) nni_pipe_recv(p->pipe, &p->aio_recv); // Look for a context to receive it. - if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) || + if (((ctx = nni_id_get(&s->requests, id)) == NULL) || (ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) { nni_mtx_unlock(&s->mtx); // No waiting context, we have not sent the request out to @@ -328,7 +322,7 @@ req0_recv_cb(void *arg) // We have our match, so we can remove this. nni_list_node_remove(&ctx->send_node); - nni_idhash_remove(s->requests, id); + nni_id_remove(&s->requests, id); ctx->request_id = 0; if (ctx->req_msg != NULL) { nni_msg_free(ctx->req_msg); @@ -512,7 +506,7 @@ req0_ctx_reset(req0_ctx *ctx) nni_list_node_remove(&ctx->pipe_node); nni_list_node_remove(&ctx->send_node); if (ctx->request_id != 0) { - nni_idhash_remove(s->requests, ctx->request_id); + nni_id_remove(&s->requests, ctx->request_id); ctx->request_id = 0; } if (ctx->req_msg != NULL) { @@ -631,7 +625,6 @@ req0_ctx_send(void *arg, nni_aio *aio) req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nng_msg * msg = nni_aio_get_msg(aio); - uint64_t id; int rv; if (nni_aio_begin(aio) != 0) { @@ -662,12 +655,11 @@ req0_ctx_send(void *arg, nni_aio *aio) req0_ctx_reset(ctx); // Insert us on the per ID hash list, so that receives can find us. - if ((rv = nni_idhash_alloc(s->requests, &id, ctx)) != 0) { + if ((rv = nni_id_alloc(&s->requests, &ctx->request_id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->request_id = (uint32_t) id; nni_msg_header_clear(msg); nni_msg_header_append_u32(msg, ctx->request_id); @@ -675,7 +667,7 @@ req0_ctx_send(void *arg, nni_aio *aio) // schedule), then fail it. Should be NNG_ETIMEDOUT. rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) { - nni_idhash_remove(s->requests, id); + nni_id_remove(&s->requests, ctx->request_id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 0bce27ba..9737c600 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -33,7 +33,7 @@ struct xrep0_sock { nni_msgq * urq; nni_mtx lk; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; nni_aio aio_getq; }; @@ -54,7 +54,7 @@ xrep0_sock_fini(void *arg) xrep0_sock *s = arg; nni_aio_fini(&s->aio_getq); - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->lk); } @@ -62,7 +62,6 @@ static int xrep0_sock_init(void *arg, nni_sock *sock) { xrep0_sock *s = arg; - int rv; nni_mtx_init(&s->lk); nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s); @@ -71,11 +70,7 @@ xrep0_sock_init(void *arg, nni_sock *sock) s->uwq = nni_sock_sendq(sock); s->urq = nni_sock_recvq(sock); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - xrep0_sock_fini(s); - return (rv); - } - + nni_id_map_init(&s->pipes, 0, 0, false); return (0); } @@ -164,7 +159,10 @@ xrep0_pipe_start(void *arg) return (NNG_EPROTO); } - if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + nni_mtx_lock(&s->lk); + rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p); + nni_mtx_unlock(&s->lk); + if (rv != 0) { return (rv); } @@ -186,7 +184,7 @@ xrep0_pipe_close(void *arg) nni_msgq_close(p->sendq); nni_mtx_lock(&s->lk); - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_id_remove(&s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); } @@ -227,7 +225,7 @@ xrep0_sock_getq_cb(void *arg) // (non-blocking) if we can. If we can't for any reason, then we // free the message. nni_mtx_lock(&s->lk); - if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + if (((p = nni_id_get(&s->pipes, id)) == NULL) || (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index b414c189..7583c4d8 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -50,7 +50,7 @@ struct resp0_ctx { struct resp0_sock { nni_mtx mtx; nni_atomic_int ttl; - nni_idhash * pipes; + nni_id_map pipes; resp0_ctx ctx; nni_list recvpipes; nni_list recvq; @@ -181,7 +181,7 @@ resp0_ctx_send(void *arg, nni_aio *aio) return; } - if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) { + if ((p = nni_id_get(&s->pipes, pid)) == NULL) { // Surveyor has left the building. Just discard the reply. nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, NULL); @@ -213,7 +213,7 @@ resp0_sock_fini(void *arg) { resp0_sock *s = arg; - nni_idhash_fini(s->pipes); + nni_id_map_fini(&s->pipes); resp0_ctx_fini(&s->ctx); nni_pollable_fini(&s->writable); nni_pollable_fini(&s->readable); @@ -224,15 +224,11 @@ static int resp0_sock_init(void *arg, nni_sock *nsock) { resp0_sock *s = arg; - int rv; NNI_ARG_UNUSED(nsock); nni_mtx_init(&s->mtx); - if ((rv = nni_idhash_init(&s->pipes)) != 0) { - resp0_sock_fini(s); - return (rv); - } + nni_id_map_init(&s->pipes, 0, 0, false); NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); @@ -316,7 +312,7 @@ resp0_pipe_start(void *arg) } nni_mtx_lock(&s->mtx); - rv = nni_idhash_insert(s->pipes, p->id, p); + rv = nni_id_set(&s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); if (rv != 0) { return (rv); @@ -354,7 +350,7 @@ resp0_pipe_close(void *arg) // which we will happily discard. nni_pollable_raise(&s->writable); } - nni_idhash_remove(s->pipes, p->id); + nni_id_remove(&s->pipes, p->id); nni_mtx_unlock(&s->mtx); } diff --git a/src/protocol/survey0/respond_test.c b/src/protocol/survey0/respond_test.c index 2801222c..efda181b 100644 --- a/src/protocol/survey0/respond_test.c +++ b/src/protocol/survey0/respond_test.c @@ -13,9 +13,6 @@ #include <nng/protocol/survey0/respond.h> #include <nng/protocol/survey0/survey.h> -#include <nng/protocol/reqrep0/rep.h> -#include <nng/protocol/reqrep0/req.h> - #include <acutest.h> #include <testutil.h> diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index e4cdca2c..f2cc8aa8 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -29,7 +29,7 @@ static void surv0_ctx_timeout(void *); struct surv0_ctx { surv0_sock * sock; - uint64_t survey_id; // survey id + uint32_t survey_id; // survey id nni_timer_node timer; nni_time expire; nni_lmq recv_lmq; @@ -45,7 +45,7 @@ struct surv0_sock { nni_list pipes; nni_mtx mtx; surv0_ctx ctx; - nni_idhash * surveys; + nni_id_map surveys; nni_pollable writable; nni_pollable readable; nni_atomic_int send_buf; @@ -57,8 +57,8 @@ struct surv0_pipe { surv0_sock * sock; nni_lmq send_queue; nni_list_node node; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; bool busy; bool closed; }; @@ -75,7 +75,7 @@ surv0_ctx_abort(surv0_ctx *ctx, int err) } nni_lmq_flush(&ctx->recv_lmq); if (ctx->survey_id != 0) { - nni_idhash_remove(sock->surveys, ctx->survey_id); + nni_id_remove(&sock->surveys, ctx->survey_id); ctx->survey_id = 0; } if (ctx == &sock->ctx) { @@ -148,7 +148,7 @@ surv0_ctx_cancel(nni_aio *aio, void *arg, int rv) nni_aio_finish_error(aio, rv); } if (ctx->survey_id != 0) { - nni_idhash_remove(sock->surveys, ctx->survey_id); + nni_id_remove(&sock->surveys, ctx->survey_id); ctx->survey_id = 0; } nni_mtx_unlock(&sock->mtx); @@ -237,8 +237,7 @@ surv0_ctx_send(void *arg, nni_aio *aio) nni_timer_cancel(&ctx->timer); // Allocate the new ID. - if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survey_id, ctx)) != - 0) { + if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { nni_mtx_unlock(&sock->mtx); nni_aio_finish_error(aio, rv); return; @@ -256,8 +255,8 @@ surv0_ctx_send(void *arg, nni_aio *aio) if (!pipe->busy) { pipe->busy = true; nni_msg_clone(msg); - nni_aio_set_msg(pipe->aio_send, msg); - nni_pipe_send(pipe->pipe, pipe->aio_send); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); } else if (!nni_lmq_full(&pipe->send_queue)) { nni_msg_clone(msg); nni_lmq_putq(&pipe->send_queue, msg); @@ -279,7 +278,7 @@ surv0_sock_fini(void *arg) surv0_sock *sock = arg; surv0_ctx_fini(&sock->ctx); - nni_idhash_fini(sock->surveys); + nni_id_map_fini(&sock->surveys); nni_pollable_fini(&sock->writable); nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->mtx); @@ -307,17 +306,15 @@ surv0_sock_init(void *arg, nni_sock *s) nni_atomic_init(&sock->send_buf); nni_atomic_set(&sock->send_buf, 8); - if (((rv = nni_idhash_init(&sock->surveys)) != 0) || - ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) { - surv0_sock_fini(sock); - return (rv); - } - // Survey IDs are 32 bits, with the high order bit set. // We start at a random point, to minimize likelihood of // accidental collision across restarts. - nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu, - nni_random() | 0x80000000u); + nni_id_map_init(&sock->surveys, 0x80000000u, 0xffffffffu, true); + + if ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0) { + surv0_sock_fini(sock); + return (rv); + } sock->ttl = 8; @@ -343,8 +340,8 @@ surv0_pipe_stop(void *arg) { surv0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -352,8 +349,8 @@ surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_lmq_fini(&p->send_queue); } @@ -366,13 +363,13 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) int len; len = nni_atomic_get(&sock->send_buf); + nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p); // This depth could be tunable. The deeper the queue, the more // concurrent surveys that can be delivered (multiple contexts). // Note that surveys can be *outstanding*, but not yet put on the wire. - if (((rv = nni_lmq_init(&p->send_queue, len)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { + if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) { surv0_pipe_fini(p); return (rv); } @@ -396,7 +393,7 @@ surv0_pipe_start(void *arg) nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -406,8 +403,8 @@ surv0_pipe_close(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->mtx); p->closed = true; @@ -425,9 +422,9 @@ surv0_pipe_send_cb(void *arg) surv0_sock *sock = p->sock; nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -438,8 +435,8 @@ surv0_pipe_send_cb(void *arg) return; } if (nni_lmq_getq(&p->send_queue, &msg) == 0) { - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } else { p->busy = false; } @@ -456,13 +453,13 @@ surv0_pipe_recv_cb(void *arg) uint32_t id; nni_aio * aio; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // We yank 4 bytes of body, and move them to the header. @@ -478,7 +475,7 @@ surv0_pipe_recv_cb(void *arg) nni_mtx_lock(&sock->mtx); // Best effort at delivery. Discard if no context or context is // unable to receive it. - if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) || + if (((ctx = nni_id_get(&sock->surveys, id)) == NULL) || (nni_lmq_full(&ctx->recv_lmq))) { nni_msg_free(msg); } else if ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { @@ -492,7 +489,7 @@ surv0_pipe_recv_cb(void *arg) } nni_mtx_unlock(&sock->mtx); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 25aacc2c..b2f203c3 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -40,8 +40,8 @@ struct xresp0_sock { nni_msgq * urq; nni_msgq * uwq; nni_atomic_int ttl; - nni_idhash * pipes; - nni_aio * aio_getq; + nni_id_map pipes; + nni_aio aio_getq; nni_mtx mtx; }; @@ -51,10 +51,10 @@ struct xresp0_pipe { xresp0_sock *psock; uint32_t id; nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_getq; + nni_aio aio_putq; + nni_aio aio_send; + nni_aio aio_recv; }; static void @@ -62,8 +62,8 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_free(s->aio_getq); - nni_idhash_fini(s->pipes); + nni_aio_fini(&s->aio_getq); + nni_id_map_fini(&s->pipes); nni_mtx_fini(&s->mtx); } @@ -71,17 +71,12 @@ static int xresp0_sock_init(void *arg, nni_sock *nsock) { xresp0_sock *s = arg; - int rv; nni_mtx_init(&s->mtx); nni_atomic_init(&s->ttl); nni_atomic_set(&s->ttl, 8); // Per RFC - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != - 0)) { - xresp0_sock_fini(s); - return (rv); - } + nni_id_map_init(&s->pipes, 0, 0, false); + nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s); s->urq = nni_sock_recvq(nsock); s->uwq = nni_sock_sendq(nsock); @@ -94,7 +89,7 @@ xresp0_sock_open(void *arg) { xresp0_sock *s = arg; - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); } static void @@ -102,7 +97,7 @@ xresp0_sock_close(void *arg) { xresp0_sock *s = arg; - nni_aio_close(s->aio_getq); + nni_aio_close(&s->aio_getq); } static void @@ -110,10 +105,10 @@ xresp0_pipe_stop(void *arg) { xresp0_pipe *p = arg; - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_putq); + nni_aio_stop(&p->aio_getq); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -121,10 +116,10 @@ xresp0_pipe_fini(void *arg) { xresp0_pipe *p = arg; - nni_aio_free(p->aio_putq); - nni_aio_free(p->aio_getq); - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_putq); + nni_aio_fini(&p->aio_getq); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_msgq_fini(p->sendq); } @@ -134,11 +129,12 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) xresp0_pipe *p = arg; int rv; - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) { + nni_aio_init(&p->aio_putq, xresp0_putq_cb, p); + nni_aio_init(&p->aio_recv, xresp0_recv_cb, p); + nni_aio_init(&p->aio_getq, xresp0_getq_cb, p); + nni_aio_init(&p->aio_send, xresp0_send_cb, p); + + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { xresp0_pipe_fini(p); return (rv); } @@ -162,14 +158,14 @@ xresp0_pipe_start(void *arg) p->id = nni_pipe_id(p->npipe); nni_mtx_lock(&s->mtx); - rv = nni_idhash_insert(s->pipes, p->id, p); + rv = nni_id_set(&s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); if (rv != 0) { return (rv); } - nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(p->sendq, &p->aio_getq); return (rv); } @@ -180,15 +176,15 @@ xresp0_pipe_close(void *arg) xresp0_pipe *p = arg; xresp0_sock *s = p->psock; - nni_aio_close(p->aio_putq); - nni_aio_close(p->aio_getq); - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_putq); + nni_aio_close(&p->aio_getq); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_msgq_close(p->sendq); nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, p->id); + nni_id_remove(&s->pipes, p->id); nni_mtx_unlock(&s->mtx); } @@ -205,17 +201,17 @@ xresp0_sock_getq_cb(void *arg) uint32_t id; xresp0_pipe *p; - if (nni_aio_result(s->aio_getq) != 0) { + if (nni_aio_result(&s->aio_getq) != 0) { return; } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); + msg = nni_aio_get_msg(&s->aio_getq); + nni_aio_set_msg(&s->aio_getq, NULL); // We yank the outgoing pipe id from the header if (nni_msg_header_len(msg) < 4) { nni_msg_free(msg); // We can't really close down the socket, so just keep going. - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); return; } id = nni_msg_header_trim_u32(msg); @@ -224,12 +220,12 @@ xresp0_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + if (((p = nni_id_get(&s->pipes, id)) == NULL) || (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_msgq_aio_get(s->uwq, &s->aio_getq); } void @@ -237,15 +233,15 @@ xresp0_getq_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_getq) != 0) { + if (nni_aio_result(&p->aio_getq) != 0) { nni_pipe_close(p->npipe); return; } - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); + nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); + nni_aio_set_msg(&p->aio_getq, NULL); - nni_pipe_send(p->npipe, p->aio_send); + nni_pipe_send(p->npipe, &p->aio_send); } void @@ -253,14 +249,14 @@ xresp0_send_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->npipe); return; } - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_msgq_aio_get(p->sendq, &p->aio_getq); } static void @@ -273,14 +269,14 @@ xresp0_recv_cb(void *arg) int hops; int ttl; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); return; } ttl = nni_atomic_get(&s->ttl); - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); // Store the pipe id in the header, first thing. @@ -314,13 +310,13 @@ xresp0_recv_cb(void *arg) } // Now send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); + nni_aio_set_msg(&p->aio_putq, msg); + nni_msgq_aio_put(urq, &p->aio_putq); return; drop: nni_msg_free(msg); - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); } static void @@ -328,22 +324,22 @@ xresp0_putq_cb(void *arg) { xresp0_pipe *p = arg; - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); + if (nni_aio_result(&p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_putq)); + nni_aio_set_msg(&p->aio_putq, NULL); nni_pipe_close(p->npipe); return; } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); } static int xresp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { xresp0_sock *s = arg; - int ttl; - int rv; + int ttl; + int rv; if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) { nni_atomic_set(&s->ttl, ttl); } |
