aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/rep.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-04 10:24:05 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-04 10:56:40 -0800
commit382b4cff3abd5ccb282ba420ef1f7c7d171ec91a (patch)
tree6860e1cceb52a7dab2763001eb27edf95a0e7246 /src/protocol/reqrep0/rep.c
parentbcc3814b58e9b198344bdaf6e7a916a354841275 (diff)
downloadnng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.gz
nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.bz2
nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.zip
fixes #1105 pollable can be inlined, and use atomics
This also introduces an nni_atomic_cas64 to help with lock-free designs. Some mechanical renaming was done in some of the protocols for spelling.
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
-rw-r--r--src/protocol/reqrep0/rep.c39
1 files changed, 18 insertions, 21 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 328babbc..a715ab59 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -36,13 +36,13 @@ static void rep0_pipe_fini(void *);
struct rep0_ctx {
rep0_sock * sock;
- size_t btrace_len;
uint32_t pipe_id;
rep0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
nni_aio * raio; // recv aio
nni_list_node sqnode;
nni_list_node rqnode;
+ size_t btrace_len;
uint32_t btrace[256]; // backtrace buffer
};
@@ -54,8 +54,8 @@ struct rep0_sock {
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
rep0_ctx ctx;
- nni_pollable *recvable;
- nni_pollable *sendable;
+ nni_pollable readable;
+ nni_pollable writable;
};
// rep0_pipe is our per-pipe protocol private structure.
@@ -167,7 +167,7 @@ rep0_ctx_send(void *arg, nni_aio *aio)
// to send on the socket (root context). That's because
// we will have finished (successfully or otherwise) the
// reply for the single request we got.
- nni_pollable_clear(s->sendable);
+ nni_pollable_clear(&s->writable);
}
if (len == 0) {
@@ -220,8 +220,8 @@ rep0_sock_fini(void *arg)
nni_idhash_fini(s->pipes);
rep0_ctx_fini(&s->ctx);
- nni_pollable_free(s->sendable);
- nni_pollable_free(s->recvable);
+ nni_pollable_fini(&s->writable);
+ nni_pollable_fini(&s->readable);
nni_mtx_fini(&s->lk);
}
@@ -246,13 +246,10 @@ rep0_sock_init(void *arg, nni_sock *sock)
(void) rep0_ctx_init(&s->ctx, s);
- // We start off without being either readable or pollable.
+ // We start off without being either readable or writable.
// Readability comes when there is something on the socket.
- if (((rv = nni_pollable_alloc(&s->sendable)) != 0) ||
- ((rv = nni_pollable_alloc(&s->recvable)) != 0)) {
- rep0_sock_fini(s);
- return (rv);
- }
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
return (0);
}
@@ -331,7 +328,7 @@ rep0_pipe_start(void *arg)
return (rv);
}
// By definition, we have not received a request yet on this pipe,
- // so it cannot cause us to become sendable.
+ // so it cannot cause us to become writable.
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
@@ -367,7 +364,7 @@ rep0_pipe_close(void *arg)
if (p->id == s->ctx.pipe_id) {
// We "can" send. (Well, not really, but we will happily
// accept a message and discard it.)
- nni_pollable_raise(s->sendable);
+ nni_pollable_raise(&s->writable);
}
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
@@ -395,7 +392,7 @@ rep0_pipe_send_cb(void *arg)
// Nothing else to send.
if (p->id == s->ctx.pipe_id) {
// Mark us ready for the other side to send!
- nni_pollable_raise(s->sendable);
+ nni_pollable_raise(&s->writable);
}
nni_mtx_unlock(&s->lk);
return;
@@ -469,11 +466,11 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_aio_set_msg(p->aio_recv, NULL);
nni_list_remove(&s->recvpipes, p);
if (nni_list_empty(&s->recvpipes)) {
- nni_pollable_clear(s->recvable);
+ nni_pollable_clear(&s->readable);
}
nni_pipe_recv(p->pipe, p->aio_recv);
if ((ctx == &s->ctx) && !p->busy) {
- nni_pollable_raise(s->sendable);
+ nni_pollable_raise(&s->writable);
}
len = nni_msg_header_len(msg);
@@ -547,7 +544,7 @@ rep0_pipe_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recvq)) == NULL) {
// No one waiting to receive yet, holding pattern.
nni_list_append(&s->recvpipes, p);
- nni_pollable_raise(s->recvable);
+ nni_pollable_raise(&s->readable);
nni_mtx_unlock(&s->lk);
return;
}
@@ -557,7 +554,7 @@ rep0_pipe_recv_cb(void *arg)
ctx->raio = NULL;
nni_aio_set_msg(p->aio_recv, NULL);
if ((ctx == &s->ctx) && !p->busy) {
- nni_pollable_raise(s->sendable);
+ nni_pollable_raise(&s->writable);
}
// schedule another receive
@@ -603,7 +600,7 @@ rep0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
int rv;
int fd;
- if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) {
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
@@ -616,7 +613,7 @@ rep0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
int rv;
int fd;
- if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) {
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
return (rv);
}