diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:24:05 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:56:40 -0800 |
| commit | 382b4cff3abd5ccb282ba420ef1f7c7d171ec91a (patch) | |
| tree | 6860e1cceb52a7dab2763001eb27edf95a0e7246 /src/protocol/reqrep0/rep.c | |
| parent | bcc3814b58e9b198344bdaf6e7a916a354841275 (diff) | |
| download | nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.gz nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.bz2 nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.zip | |
fixes #1105 pollable can be inlined, and use atomics
This also introduces an nni_atomic_cas64 to help with lock-free designs.
Some mechanical renaming was done in some of the protocols for spelling.
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 39 |
1 files changed, 18 insertions, 21 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 328babbc..a715ab59 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -36,13 +36,13 @@ static void rep0_pipe_fini(void *); struct rep0_ctx { rep0_sock * sock; - size_t btrace_len; uint32_t pipe_id; rep0_pipe * spipe; // send pipe nni_aio * saio; // send aio nni_aio * raio; // recv aio nni_list_node sqnode; nni_list_node rqnode; + size_t btrace_len; uint32_t btrace[256]; // backtrace buffer }; @@ -54,8 +54,8 @@ struct rep0_sock { nni_list recvpipes; // list of pipes with data to receive nni_list recvq; rep0_ctx ctx; - nni_pollable *recvable; - nni_pollable *sendable; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -167,7 +167,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) // to send on the socket (root context). That's because // we will have finished (successfully or otherwise) the // reply for the single request we got. - nni_pollable_clear(s->sendable); + nni_pollable_clear(&s->writable); } if (len == 0) { @@ -220,8 +220,8 @@ rep0_sock_fini(void *arg) nni_idhash_fini(s->pipes); rep0_ctx_fini(&s->ctx); - nni_pollable_free(s->sendable); - nni_pollable_free(s->recvable); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->lk); } @@ -246,13 +246,10 @@ rep0_sock_init(void *arg, nni_sock *sock) (void) rep0_ctx_init(&s->ctx, s); - // We start off without being either readable or pollable. + // We start off without being either readable or writable. // Readability comes when there is something on the socket. - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - rep0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); return (0); } @@ -331,7 +328,7 @@ rep0_pipe_start(void *arg) return (rv); } // By definition, we have not received a request yet on this pipe, - // so it cannot cause us to become sendable. + // so it cannot cause us to become writable. nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -367,7 +364,7 @@ rep0_pipe_close(void *arg) if (p->id == s->ctx.pipe_id) { // We "can" send. (Well, not really, but we will happily // accept a message and discard it.) - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); @@ -395,7 +392,7 @@ rep0_pipe_send_cb(void *arg) // Nothing else to send. if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->lk); return; @@ -469,11 +466,11 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_aio_set_msg(p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { - nni_pollable_clear(s->recvable); + nni_pollable_clear(&s->readable); } nni_pipe_recv(p->pipe, p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } len = nni_msg_header_len(msg); @@ -547,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) if ((ctx = nni_list_first(&s->recvq)) == NULL) { // No one waiting to receive yet, holding pattern. nni_list_append(&s->recvpipes, p); - nni_pollable_raise(s->recvable); + nni_pollable_raise(&s->readable); nni_mtx_unlock(&s->lk); return; } @@ -557,7 +554,7 @@ rep0_pipe_recv_cb(void *arg) ctx->raio = NULL; nni_aio_set_msg(p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } // schedule another receive @@ -603,7 +600,7 @@ rep0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -616,7 +613,7 @@ rep0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } |
