diff options
Diffstat (limited to 'src/protocol/survey0')
| -rw-r--r-- | src/protocol/survey0/respond.c | 47 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 27 |
2 files changed, 32 insertions, 42 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index ccd25242..b4ffc917 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -48,14 +48,14 @@ struct resp0_ctx { // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_mtx mtx; - int ttl; - nni_idhash * pipes; - resp0_ctx ctx; - nni_list recvpipes; - nni_list recvq; - nni_pollable *recvable; - nni_pollable *sendable; + nni_mtx mtx; + int ttl; + nni_idhash * pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; }; // resp0_pipe is our per-pipe protocol private structure. @@ -155,7 +155,7 @@ resp0_ctx_send(void *arg, nni_aio *aio) if (ctx == &s->ctx) { // We can't send anymore, because only one send per request. - nni_pollable_clear(s->sendable); + nni_pollable_clear(&s->writable); } nni_mtx_lock(&s->mtx); @@ -215,8 +215,8 @@ resp0_sock_fini(void *arg) nni_idhash_fini(s->pipes); resp0_ctx_fini(&s->ctx); - nni_pollable_free(s->sendable); - nni_pollable_free(s->recvable); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->mtx); } @@ -241,13 +241,10 @@ resp0_sock_init(void *arg, nni_sock *nsock) (void) resp0_ctx_init(&s->ctx, s); - // We start off without being either readable or pollable. + // We start off without being either readable or writable. // Readability comes when there is something on the socket. - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - resp0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); return (0); } @@ -357,7 +354,7 @@ resp0_pipe_close(void *arg) if (p->id == s->ctx.pipe_id) { // Make sure user space knows they can send a message to us, // which we will happily discard. - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_idhash_remove(s->pipes, p->id); nni_mtx_unlock(&s->mtx); @@ -385,7 +382,7 @@ resp0_pipe_send_cb(void *arg) // Nothing else to send. if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); return; @@ -459,7 +456,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio) nni_aio_set_msg(p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { - nni_pollable_clear(s->recvable); + nni_pollable_clear(&s->readable); } nni_pipe_recv(p->npipe, p->aio_recv); @@ -468,7 +465,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio) ctx->btrace_len = len; ctx->pipe_id = p->id; if (ctx == &s->ctx) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); @@ -530,7 +527,7 @@ resp0_pipe_recv_cb(void *arg) if ((ctx = nni_list_first(&s->recvq)) == NULL) { // No one blocked in recv, stall. nni_list_append(&s->recvpipes, p); - nni_pollable_raise(s->recvable); + nni_pollable_raise(&s->readable); nni_mtx_unlock(&s->mtx); return; } @@ -549,7 +546,7 @@ resp0_pipe_recv_cb(void *arg) ctx->pipe_id = p->id; if ((ctx == &s->ctx) && (!p->busy)) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); @@ -584,7 +581,7 @@ resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -597,7 +594,7 @@ resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index be0ee55e..8aa05dd4 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -52,7 +52,7 @@ struct surv0_sock { nni_mtx mtx; surv0_ctx ctx; nni_idhash * surveys; - nni_pollable *sendable; + nni_pollable writable; }; // surv0_pipe is our per-pipe protocol private structure. @@ -221,7 +221,7 @@ surv0_sock_fini(void *arg) surv0_ctx_fini(&sock->ctx); nni_idhash_fini(sock->surveys); - nni_pollable_free(sock->sendable); + nni_pollable_fini(&sock->writable); nni_mtx_fini(&sock->mtx); } @@ -235,6 +235,9 @@ surv0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); nni_mtx_init(&sock->mtx); + nni_pollable_init(&sock->writable); + // We are always writable. + nni_pollable_raise(&sock->writable); if (((rv = nni_idhash_init(&sock->surveys)) != 0) || ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) { @@ -249,7 +252,7 @@ surv0_sock_init(void *arg, nni_sock *nsock) nni_random() | 0x80000000u); sock->ctx.survtime = NNI_SECOND; - sock->ttl = 8; + sock->ttl = 8; return (0); } @@ -478,17 +481,7 @@ surv0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - nni_mtx_lock(&sock->mtx); - if (sock->sendable == NULL) { - if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { - nni_mtx_unlock(&sock->mtx); - return (rv); - } - // We are always sendable. - nni_pollable_raise(sock->sendable); - } - nni_mtx_unlock(&sock->mtx); - if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&sock->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -498,12 +491,12 @@ static int surv0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) { surv0_sock * sock = arg; - nni_pollable *recvable; + nni_pollable *readable; int rv; int fd; - if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &recvable)) != 0) || - ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) { + if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &readable)) != 0) || + ((rv = nni_pollable_getfd(readable, &fd)) != 0)) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); |
