diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:24:05 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:56:40 -0800 |
| commit | 382b4cff3abd5ccb282ba420ef1f7c7d171ec91a (patch) | |
| tree | 6860e1cceb52a7dab2763001eb27edf95a0e7246 /src/protocol/reqrep0 | |
| parent | bcc3814b58e9b198344bdaf6e7a916a354841275 (diff) | |
| download | nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.gz nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.bz2 nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.zip | |
fixes #1105 pollable can be inlined, and use atomics
This also introduces an nni_atomic_cas64 to help with lock-free designs.
Some mechanical renaming was done in some of the protocols for spelling.
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 39 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 392 |
3 files changed, 214 insertions, 219 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 328babbc..a715ab59 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -36,13 +36,13 @@ static void rep0_pipe_fini(void *); struct rep0_ctx { rep0_sock * sock; - size_t btrace_len; uint32_t pipe_id; rep0_pipe * spipe; // send pipe nni_aio * saio; // send aio nni_aio * raio; // recv aio nni_list_node sqnode; nni_list_node rqnode; + size_t btrace_len; uint32_t btrace[256]; // backtrace buffer }; @@ -54,8 +54,8 @@ struct rep0_sock { nni_list recvpipes; // list of pipes with data to receive nni_list recvq; rep0_ctx ctx; - nni_pollable *recvable; - nni_pollable *sendable; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -167,7 +167,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) // to send on the socket (root context). That's because // we will have finished (successfully or otherwise) the // reply for the single request we got. - nni_pollable_clear(s->sendable); + nni_pollable_clear(&s->writable); } if (len == 0) { @@ -220,8 +220,8 @@ rep0_sock_fini(void *arg) nni_idhash_fini(s->pipes); rep0_ctx_fini(&s->ctx); - nni_pollable_free(s->sendable); - nni_pollable_free(s->recvable); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->lk); } @@ -246,13 +246,10 @@ rep0_sock_init(void *arg, nni_sock *sock) (void) rep0_ctx_init(&s->ctx, s); - // We start off without being either readable or pollable. + // We start off without being either readable or writable. // Readability comes when there is something on the socket. - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - rep0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); return (0); } @@ -331,7 +328,7 @@ rep0_pipe_start(void *arg) return (rv); } // By definition, we have not received a request yet on this pipe, - // so it cannot cause us to become sendable. + // so it cannot cause us to become writable. nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -367,7 +364,7 @@ rep0_pipe_close(void *arg) if (p->id == s->ctx.pipe_id) { // We "can" send. (Well, not really, but we will happily // accept a message and discard it.) - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); @@ -395,7 +392,7 @@ rep0_pipe_send_cb(void *arg) // Nothing else to send. if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->lk); return; @@ -469,11 +466,11 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_aio_set_msg(p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { - nni_pollable_clear(s->recvable); + nni_pollable_clear(&s->readable); } nni_pipe_recv(p->pipe, p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } len = nni_msg_header_len(msg); @@ -547,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) if ((ctx = nni_list_first(&s->recvq)) == NULL) { // No one waiting to receive yet, holding pattern. nni_list_append(&s->recvpipes, p); - nni_pollable_raise(s->recvable); + nni_pollable_raise(&s->readable); nni_mtx_unlock(&s->lk); return; } @@ -557,7 +554,7 @@ rep0_pipe_recv_cb(void *arg) ctx->raio = NULL; nni_aio_set_msg(p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } // schedule another receive @@ -603,7 +600,7 @@ rep0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -616,7 +613,7 @@ rep0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 5fe9cb17..879d6ae4 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -74,7 +74,7 @@ test_rep_poll_writeable(void) // Still not writable. TEST_CHECK(testutil_pollfd(fd) == false); - // If we get a job, *then* we become writeable + // If we get a job, *then* we become writable TEST_NNG_SEND_STR(req, "abc"); TEST_NNG_RECV_STR(rep, "abc"); TEST_CHECK(testutil_pollfd(fd) == true); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 4326f411..33629abc 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -29,7 +29,7 @@ typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; typedef struct req0_ctx req0_ctx; -static void req0_run_sendq(req0_sock *, nni_list *); +static void req0_run_send_queue(req0_sock *, nni_list *); static void req0_ctx_reset(req0_ctx *); static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); @@ -40,35 +40,35 @@ static int req0_ctx_init(void *, void *); // socket, but keeps track of its own outstanding replays, the request ID, // and so forth. struct req0_ctx { - nni_list_node snode; - nni_list_node sqnode; // node on the sendq - nni_list_node pnode; // node on the pipe list - uint32_t reqid; req0_sock * sock; - nni_aio * raio; // user aio waiting to receive - only one! - nni_aio * saio; - nng_msg * reqmsg; // request message - size_t reqlen; - nng_msg * repmsg; // reply message + nni_list_node sock_node; // node on the socket context list + nni_list_node send_node; // node on the send_queue + nni_list_node pipe_node; // node on the pipe list + uint32_t request_id; // request ID, without high bit set + nni_aio * recv_aio; // user aio waiting to recv - only one! + nni_aio * send_aio; // user aio waiting to send + nng_msg * req_msg; // request message + size_t req_len; // length of request message (for stats) + nng_msg * rep_msg; // reply message nni_timer_node timer; nni_duration retry; }; // A req0_sock is our per-socket protocol private structure. struct req0_sock { - nni_duration retry; - bool closed; - int ttl; - req0_ctx ctx; // base socket ctx - nni_list readypipes; - nni_list busypipes; - nni_list stoppipes; - nni_list ctxs; - nni_list sendq; // contexts waiting to send. - nni_idhash * reqids; // contexts by request ID - nni_pollable *recvable; - nni_pollable *sendable; - nni_mtx mtx; + nni_duration retry; + bool closed; + int ttl; + req0_ctx master; // base socket master + nni_list ready_pipes; + nni_list busy_pipes; + nni_list stop_pipes; + nni_list contexts; + nni_list send_queue; // contexts waiting to send. + nni_idhash * requests; // contexts by request ID + nni_pollable readable; + nni_pollable writable; + nni_mtx mtx; }; // A req0_pipe is our per-pipe protocol private structure. @@ -76,7 +76,7 @@ struct req0_pipe { nni_pipe * pipe; req0_sock * req; nni_list_node node; - nni_list ctxs; // ctxs with pending traffic + nni_list contexts; // contexts with pending traffic bool closed; nni_aio * aio_send; nni_aio * aio_recv; @@ -94,7 +94,7 @@ req0_sock_init(void *arg, nni_sock *sock) NNI_ARG_UNUSED(sock); - if ((rv = nni_idhash_init(&s->reqids)) != 0) { + if ((rv = nni_idhash_init(&s->requests)) != 0) { return (rv); } @@ -102,26 +102,23 @@ req0_sock_init(void *arg, nni_sock *sock) // We start at a random point, to minimize likelihood of // accidental collision across restarts. nni_idhash_set_limits( - s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + s->requests, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); nni_mtx_init(&s->mtx); - NNI_LIST_INIT(&s->readypipes, req0_pipe, node); - NNI_LIST_INIT(&s->busypipes, req0_pipe, node); - NNI_LIST_INIT(&s->stoppipes, req0_pipe, node); - NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode); - NNI_LIST_INIT(&s->ctxs, req0_ctx, snode); + NNI_LIST_INIT(&s->ready_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node); + NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node); // this is "semi random" start for request IDs. s->retry = NNI_SECOND * 60; - (void) req0_ctx_init(&s->ctx, s); + (void) req0_ctx_init(&s->master, s); - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - req0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); s->ttl = 8; return (0); @@ -141,10 +138,10 @@ req0_sock_close(void *arg) nni_mtx_lock(&s->mtx); s->closed = true; - NNI_LIST_FOREACH (&s->ctxs, ctx) { - if (ctx->raio != NULL) { - nni_aio_finish_error(ctx->raio, NNG_ECLOSED); - ctx->raio = NULL; + NNI_LIST_FOREACH (&s->contexts, ctx) { + if (ctx->recv_aio != NULL) { + nni_aio_finish_error(ctx->recv_aio, NNG_ECLOSED); + ctx->recv_aio = NULL; req0_ctx_reset(ctx); } } @@ -157,15 +154,15 @@ req0_sock_fini(void *arg) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - NNI_ASSERT(nni_list_empty(&s->busypipes)); - NNI_ASSERT(nni_list_empty(&s->stoppipes)); - NNI_ASSERT(nni_list_empty(&s->readypipes)); + NNI_ASSERT(nni_list_empty(&s->busy_pipes)); + NNI_ASSERT(nni_list_empty(&s->stop_pipes)); + NNI_ASSERT(nni_list_empty(&s->ready_pipes)); nni_mtx_unlock(&s->mtx); - req0_ctx_fini(&s->ctx); - nni_pollable_free(s->recvable); - nni_pollable_free(s->sendable); - nni_idhash_fini(s->reqids); + req0_ctx_fini(&s->master); + nni_pollable_fini(&s->readable); + nni_pollable_fini(&s->writable); + nni_idhash_fini(s->requests); nni_mtx_fini(&s->mtx); } @@ -204,7 +201,7 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) } NNI_LIST_NODE_INIT(&p->node); - NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode); + NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node); p->pipe = pipe; p->req = s; return (0); @@ -225,9 +222,9 @@ req0_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); return (NNG_ECLOSED); } - nni_list_append(&s->readypipes, p); - nni_pollable_raise(s->sendable); - req0_run_sendq(s, NULL); + nni_list_append(&s->ready_pipes, p); + nni_pollable_raise(&s->writable); + req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); nni_pipe_recv(p->pipe, p->aio_recv); @@ -245,22 +242,22 @@ req0_pipe_close(void *arg) nni_aio_close(p->aio_send); nni_mtx_lock(&s->mtx); - // This removes the node from either busypipes or readypipes. + // This removes the node from either busy_pipes or ready_pipes. // It doesn't much matter which. We stick the pipe on the stop // list, so that we can wait for that to close down safely. p->closed = true; nni_list_node_remove(&p->node); - nni_list_append(&s->stoppipes, p); - if (nni_list_empty(&s->readypipes)) { - nni_pollable_clear(s->sendable); + nni_list_append(&s->stop_pipes, p); + if (nni_list_empty(&s->ready_pipes)) { + nni_pollable_clear(&s->writable); } - while ((ctx = nni_list_first(&p->ctxs)) != NULL) { - nni_list_remove(&p->ctxs, ctx); + while ((ctx = nni_list_first(&p->contexts)) != NULL) { + nni_list_remove(&p->contexts, ctx); // Reset the timer on this so it expires immediately. // This is actually easier than canceling the timer and - // running the sendq separately. (In particular, it avoids - // a potential deadlock on cancelling the timer.) + // running the send_queue separately. (In particular, it + // avoids a potential deadlock on cancelling the timer.) nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); } nni_mtx_unlock(&s->mtx); @@ -268,7 +265,7 @@ req0_pipe_close(void *arg) // For cooked mode, we use a context, and send out that way. This // completely bypasses the upper write queue. Each context keeps one -// message pending; these are "scheduled" via the sendq. The sendq +// message pending; these are "scheduled" via the send_queue. The send_queue // is ordered, so FIFO ordering between contexts is provided for. static void @@ -277,9 +274,9 @@ req0_send_cb(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; nni_aio * aio; - nni_list aios; + nni_list send_list; - nni_aio_list_init(&aios); + nni_aio_list_init(&send_list); if (nni_aio_result(p->aio_send) != 0) { // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -289,7 +286,7 @@ req0_send_cb(void *arg) } // We completed a cooked send, so we need to reinsert ourselves - // in the ready list, and re-run the sendq. + // in the ready list, and re-run the send_queue. nni_mtx_lock(&s->mtx); if (p->closed || s->closed) { @@ -298,16 +295,16 @@ req0_send_cb(void *arg) nni_mtx_unlock(&s->mtx); return; } - nni_list_remove(&s->busypipes, p); - nni_list_append(&s->readypipes, p); - if (nni_list_empty(&s->sendq)) { - nni_pollable_raise(s->sendable); + nni_list_remove(&s->busy_pipes, p); + nni_list_append(&s->ready_pipes, p); + if (nni_list_empty(&s->send_queue)) { + nni_pollable_raise(&s->writable); } - req0_run_sendq(s, &aios); + req0_run_send_queue(s, &send_list); nni_mtx_unlock(&s->mtx); - while ((aio = nni_list_first(&aios)) != NULL) { - nni_list_remove(&aios, aio); + while ((aio = nni_list_first(&send_list)) != NULL) { + nni_list_remove(&send_list, aio); nni_aio_finish_synch(aio, 0, 0); } } @@ -350,8 +347,8 @@ req0_recv_cb(void *arg) nni_pipe_recv(p->pipe, p->aio_recv); // Look for a context to receive it. - if ((nni_idhash_find(s->reqids, id, (void **) &ctx) != 0) || - (ctx->saio != NULL) || (ctx->repmsg != NULL)) { + if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) || + (ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) { nni_mtx_unlock(&s->mtx); // No waiting context, we have not sent the request out to // the wire yet, or context already has a reply ready. @@ -361,25 +358,25 @@ req0_recv_cb(void *arg) } // We have our match, so we can remove this. - nni_list_node_remove(&ctx->sqnode); - nni_idhash_remove(s->reqids, id); - ctx->reqid = 0; - if (ctx->reqmsg != NULL) { - nni_msg_free(ctx->reqmsg); - ctx->reqmsg = NULL; + nni_list_node_remove(&ctx->send_node); + nni_idhash_remove(s->requests, id); + ctx->request_id = 0; + if (ctx->req_msg != NULL) { + nni_msg_free(ctx->req_msg); + ctx->req_msg = NULL; } // Is there an aio waiting for us? - if ((aio = ctx->raio) != NULL) { - ctx->raio = NULL; + if ((aio = ctx->recv_aio) != NULL) { + ctx->recv_aio = NULL; nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); } else { // No AIO, so stash msg. Receive will pick it up later. - ctx->repmsg = msg; - if (ctx == &s->ctx) { - nni_pollable_raise(s->recvable); + ctx->rep_msg = msg; + if (ctx == &s->master) { + nni_pollable_raise(&s->readable); } nni_mtx_unlock(&s->mtx); } @@ -397,28 +394,28 @@ req0_ctx_timeout(void *arg) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if ((ctx->reqmsg != NULL) && (!s->closed)) { - if (!nni_list_node_active(&ctx->sqnode)) { - nni_list_append(&s->sendq, ctx); + if ((ctx->req_msg != NULL) && (!s->closed)) { + if (!nni_list_node_active(&ctx->send_node)) { + nni_list_append(&s->send_queue, ctx); } - req0_run_sendq(s, NULL); + req0_run_send_queue(s, NULL); } nni_mtx_unlock(&s->mtx); } static int -req0_ctx_init(void *carg, void *sarg) +req0_ctx_init(void *arg, void *sock) { - req0_sock *s = sarg; - req0_ctx * ctx = carg; + req0_sock *s = sock; + req0_ctx * ctx = arg; nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); nni_mtx_lock(&s->mtx); - ctx->sock = s; - ctx->raio = NULL; - ctx->retry = s->retry; - nni_list_append(&s->ctxs, ctx); + ctx->sock = s; + ctx->recv_aio = NULL; + ctx->retry = s->retry; + nni_list_append(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); return (0); @@ -432,18 +429,18 @@ req0_ctx_fini(void *arg) nni_aio * aio; nni_mtx_lock(&s->mtx); - if ((aio = ctx->raio) != NULL) { - ctx->raio = NULL; + if ((aio = ctx->recv_aio) != NULL) { + ctx->recv_aio = NULL; nni_aio_finish_error(aio, NNG_ECLOSED); } - if ((aio = ctx->saio) != NULL) { - ctx->saio = NULL; - nni_aio_set_msg(aio, ctx->reqmsg); - ctx->reqmsg = NULL; + if ((aio = ctx->send_aio) != NULL) { + ctx->send_aio = NULL; + nni_aio_set_msg(aio, ctx->req_msg); + ctx->req_msg = NULL; nni_aio_finish_error(aio, NNG_ECLOSED); } req0_ctx_reset(ctx); - nni_list_remove(&s->ctxs, ctx); + nni_list_remove(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); nni_timer_cancel(&ctx->timer); @@ -451,31 +448,31 @@ req0_ctx_fini(void *arg) } static int -req0_ctx_set_resendtime(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_ctx_set_resend_time(void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_ctx *ctx = arg; return (nni_copyin_ms(&ctx->retry, buf, sz, t)); } static int -req0_ctx_get_resendtime(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_ctx_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_ctx *ctx = arg; return (nni_copyout_ms(ctx->retry, buf, szp, t)); } static void -req0_run_sendq(req0_sock *s, nni_list *aiolist) +req0_run_send_queue(req0_sock *s, nni_list *send_list) { req0_ctx *ctx; nni_aio * aio; // Note: This routine should be called with the socket lock held. - while ((ctx = nni_list_first(&s->sendq)) != NULL) { + while ((ctx = nni_list_first(&s->send_queue)) != NULL) { nni_msg * msg; req0_pipe *p; - if ((p = nni_list_first(&s->readypipes)) == NULL) { + if ((p = nni_list_first(&s->ready_pipes)) == NULL) { return; } @@ -484,18 +481,18 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist) // be dropped, we rely on the resend timer to pick it up. // We also notify the completion callback if this is the // first send attempt. - nni_list_remove(&s->sendq, ctx); + nni_list_remove(&s->send_queue, ctx); // Schedule a resubmit timer. We only do this if we got // a pipe to send to. Otherwise, we should get handled - // the next time that the sendq is run. We don't do this + // the next time that the send_queue is run. We don't do this // if the retry is "disabled" with NNG_DURATION_INFINITE. if (ctx->retry > 0) { nni_timer_schedule( &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->reqmsg) != 0) { + if (nni_msg_dup(&msg, ctx->req_msg) != 0) { // Oops. Well, keep trying each context; maybe // one of them will get lucky. continue; @@ -504,27 +501,27 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist) // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. - nni_list_node_remove(&ctx->pnode); - nni_list_append(&p->ctxs, ctx); + nni_list_node_remove(&ctx->pipe_node); + nni_list_append(&p->contexts, ctx); - nni_list_remove(&s->readypipes, p); - nni_list_append(&s->busypipes, p); + nni_list_remove(&s->ready_pipes, p); + nni_list_append(&s->busy_pipes, p); - if ((aio = ctx->saio) != NULL) { - ctx->saio = NULL; - nni_aio_bump_count(aio, ctx->reqlen); + if ((aio = ctx->send_aio) != NULL) { + ctx->send_aio = NULL; + nni_aio_bump_count(aio, ctx->req_len); // If the list was passed in, we want to do a // synchronous completion later. - if (aiolist != NULL) { - nni_list_append(aiolist, aio); + if (send_list != NULL) { + nni_list_append(send_list, aio); } else { nni_aio_finish(aio, 0, 0); } - if (ctx == &s->ctx) { - if (nni_list_empty(&s->readypipes)) { - nni_pollable_clear(s->sendable); + if (ctx == &s->master) { + if (nni_list_empty(&s->ready_pipes)) { + nni_pollable_clear(&s->writable); } else { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } } } @@ -543,26 +540,26 @@ req0_ctx_reset(req0_ctx *ctx) // We cannot safely "wait" using nni_timer_cancel, but this removes // any scheduled timer activation. If the timeout is already running // concurrently, it will still run. It should do nothing, because - // we toss the reqmsg. There is still a very narrow race if the + // we toss the request. There is still a very narrow race if the // timeout fires, but doesn't actually start running before we // both finish this function, *and* manage to reschedule another // request. The consequence of that occurring is that the request // will be emitted on the wire twice. This is not actually tragic. nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); - nni_list_node_remove(&ctx->pnode); - nni_list_node_remove(&ctx->sqnode); - if (ctx->reqid != 0) { - nni_idhash_remove(s->reqids, ctx->reqid); - ctx->reqid = 0; + nni_list_node_remove(&ctx->pipe_node); + nni_list_node_remove(&ctx->send_node); + if (ctx->request_id != 0) { + nni_idhash_remove(s->requests, ctx->request_id); + ctx->request_id = 0; } - if (ctx->reqmsg != NULL) { - nni_msg_free(ctx->reqmsg); - ctx->reqmsg = NULL; + if (ctx->req_msg != NULL) { + nni_msg_free(ctx->req_msg); + ctx->req_msg = NULL; } - if (ctx->repmsg != NULL) { - nni_msg_free(ctx->repmsg); - ctx->repmsg = NULL; + if (ctx->rep_msg != NULL) { + nni_msg_free(ctx->rep_msg); + ctx->rep_msg = NULL; } } @@ -573,12 +570,12 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->raio != aio) { + if (ctx->recv_aio != aio) { // already completed, ignore this. nni_mtx_unlock(&s->mtx); return; } - ctx->raio = NULL; + ctx->recv_aio = NULL; // Cancellation of a pending receive is treated as aborting the // entire state machine. This allows us to preserve the semantic of @@ -608,8 +605,8 @@ req0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((ctx->raio != NULL) || - ((ctx->reqmsg == NULL) && (ctx->repmsg == NULL))) { + if ((ctx->recv_aio != NULL) || + ((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) { // We have already got a pending receive or have not // tried to send a request yet. // Either of these violate our basic state assumptions. @@ -618,7 +615,7 @@ req0_ctx_recv(void *arg, nni_aio *aio) return; } - if ((msg = ctx->repmsg) == NULL) { + if ((msg = ctx->rep_msg) == NULL) { int rv; rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx); if (rv != 0) { @@ -626,17 +623,17 @@ req0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - ctx->raio = aio; + ctx->recv_aio = aio; nni_mtx_unlock(&s->mtx); return; } - ctx->repmsg = NULL; + ctx->rep_msg = NULL; // We have got a message to pass up, yay! nni_aio_set_msg(aio, msg); - if (ctx == &s->ctx) { - nni_pollable_clear(s->recvable); + if (ctx == &s->master) { + nni_pollable_clear(&s->readable); } nni_mtx_unlock(&s->mtx); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -649,7 +646,7 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->saio != aio) { + if (ctx->send_aio != aio) { // already completed, ignore this. nni_mtx_unlock(&s->mtx); return; @@ -657,12 +654,12 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) // There should not be a pending reply, because we canceled // it while we were waiting. - NNI_ASSERT(ctx->raio == NULL); - ctx->saio = NULL; + NNI_ASSERT(ctx->recv_aio == NULL); + ctx->send_aio = NULL; // Restore the message back to the aio. - nni_aio_set_msg(aio, ctx->reqmsg); - nni_msg_header_clear(ctx->reqmsg); - ctx->reqmsg = NULL; + nni_aio_set_msg(aio, ctx->req_msg); + nni_msg_header_clear(ctx->req_msg); + ctx->req_msg = NULL; // Cancellation of a pending receive is treated as aborting the // entire state machine. This allows us to preserve the semantic of @@ -696,55 +693,55 @@ req0_ctx_send(void *arg, nni_aio *aio) } // Sending a new request cancels the old one, including any // outstanding reply. - if (ctx->raio != NULL) { - nni_aio_finish_error(ctx->raio, NNG_ECANCELED); - ctx->raio = NULL; + if (ctx->recv_aio != NULL) { + nni_aio_finish_error(ctx->recv_aio, NNG_ECANCELED); + ctx->recv_aio = NULL; } - if (ctx->saio != NULL) { - nni_aio_set_msg(ctx->saio, ctx->reqmsg); - nni_msg_header_clear(ctx->reqmsg); - ctx->reqmsg = NULL; - nni_aio_finish_error(ctx->saio, NNG_ECANCELED); - ctx->saio = NULL; - nni_list_remove(&s->sendq, ctx); + if (ctx->send_aio != NULL) { + nni_aio_set_msg(ctx->send_aio, ctx->req_msg); + nni_msg_header_clear(ctx->req_msg); + ctx->req_msg = NULL; + nni_aio_finish_error(ctx->send_aio, NNG_ECANCELED); + ctx->send_aio = NULL; + nni_list_remove(&s->send_queue, ctx); } // This resets the entire state machine. req0_ctx_reset(ctx); // Insert us on the per ID hash list, so that receives can find us. - if ((rv = nni_idhash_alloc(s->reqids, &id, ctx)) != 0) { + if ((rv = nni_idhash_alloc(s->requests, &id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->reqid = (uint32_t) id; - if ((rv = nni_msg_header_append_u32(msg, ctx->reqid)) != 0) { - nni_idhash_remove(s->reqids, id); + ctx->request_id = (uint32_t) id; + if ((rv = nni_msg_header_append_u32(msg, ctx->request_id)) != 0) { + nni_idhash_remove(s->requests, id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } // If no pipes are ready, and the request was a poll (no background - // schedule), then fail it. Should be NNG_TIMEDOUT. + // schedule), then fail it. Should be NNG_ETIMEDOUT. rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); - if ((rv != 0) && (nni_list_empty(&s->readypipes))) { - nni_idhash_remove(s->reqids, id); + if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) { + nni_idhash_remove(s->requests, id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->reqlen = nni_msg_len(msg); - ctx->reqmsg = msg; - ctx->saio = aio; + ctx->req_len = nni_msg_len(msg); + ctx->req_msg = msg; + ctx->send_aio = aio; nni_aio_set_msg(aio, NULL); - // Stick us on the sendq list. - nni_list_append(&s->sendq, ctx); + // Stick us on the send_queue list. + nni_list_append(&s->send_queue, ctx); - // Note that this will be synchronous if the readypipes list was + // Note that this will be synchronous if the ready_pipes list was // not empty. - req0_run_sendq(s, NULL); + req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); } @@ -752,68 +749,69 @@ static void req0_sock_send(void *arg, nni_aio *aio) { req0_sock *s = arg; - req0_ctx_send(&s->ctx, aio); + req0_ctx_send(&s->master, aio); } static void req0_sock_recv(void *arg, nni_aio *aio) { req0_sock *s = arg; - req0_ctx_recv(&s->ctx, aio); + req0_ctx_recv(&s->master, aio); } static int -req0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_sock *s = arg; return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t)); } static int -req0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; return (nni_copyout_int(s->ttl, buf, szp, t)); } static int -req0_sock_set_resendtime(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_sock_set_resend_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_sock *s = arg; int rv; - rv = req0_ctx_set_resendtime(&s->ctx, buf, sz, t); - s->retry = s->ctx.retry; + rv = req0_ctx_set_resend_time(&s->master, buf, sz, t); + s->retry = s->master.retry; return (rv); } static int -req0_sock_get_resendtime(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; - return (req0_ctx_get_resendtime(&s->ctx, buf, szp, t)); + return (req0_ctx_get_resend_time(&s->master, buf, szp, t)); } static int -req0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); } static int -req0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } @@ -832,8 +830,8 @@ static nni_proto_pipe_ops req0_pipe_ops = { static nni_option req0_ctx_options[] = { { .o_name = NNG_OPT_REQ_RESENDTIME, - .o_get = req0_ctx_get_resendtime, - .o_set = req0_ctx_set_resendtime, + .o_get = req0_ctx_get_resend_time, + .o_set = req0_ctx_set_resend_time, }, { .o_name = NULL, @@ -852,21 +850,21 @@ static nni_proto_ctx_ops req0_ctx_ops = { static nni_option req0_sock_options[] = { { .o_name = NNG_OPT_MAXTTL, - .o_get = req0_sock_get_maxttl, - .o_set = req0_sock_set_maxttl, + .o_get = req0_sock_get_max_ttl, + .o_set = req0_sock_set_max_ttl, }, { .o_name = NNG_OPT_REQ_RESENDTIME, - .o_get = req0_sock_get_resendtime, - .o_set = req0_sock_set_resendtime, + .o_get = req0_sock_get_resend_time, + .o_set = req0_sock_set_resend_time, }, { .o_name = NNG_OPT_RECVFD, - .o_get = req0_sock_get_recvfd, + .o_get = req0_sock_get_recv_fd, }, { .o_name = NNG_OPT_SENDFD, - .o_get = req0_sock_get_sendfd, + .o_get = req0_sock_get_send_fd, }, // terminate list { @@ -896,7 +894,7 @@ static nni_proto req0_proto = { }; int -nng_req0_open(nng_socket *sidp) +nng_req0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &req0_proto)); + return (nni_proto_open(sock, &req0_proto)); } |
