From 3d535b638667ad0fcfff4246fce61c0176a056c4 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 26 Dec 2020 11:54:40 -0800 Subject: fixes #972 Very slow pull/push performance compared to ZMQ This refactors the pipeline protocol to use lightweight mq instead of the more expensive message queue structure. It also provides nicer backpressure and buffering support. The test suite was updated and converted to NUTS as well. This won't completely close the gap, but it should help quite a bit. --- src/protocol/pipeline0/pull.c | 237 +++++++++++++++++++++++++++--------------- 1 file changed, 155 insertions(+), 82 deletions(-) (limited to 'src/protocol/pipeline0/pull.c') diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 8feb08b8..94403431 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -26,37 +26,46 @@ typedef struct pull0_pipe pull0_pipe; typedef struct pull0_sock pull0_sock; -static void pull0_putq_cb(void *); static void pull0_recv_cb(void *); -static void pull0_putq(pull0_pipe *, nni_msg *); // pull0_sock is our per-socket protocol private structure. struct pull0_sock { - nni_msgq *urq; - bool raw; + bool raw; + nni_list pl; // pipe list (pipes with data ready) + nni_list rq; // recv queue (aio list) + nni_mtx m; + nni_pollable readable; }; // pull0_pipe is our per-pipe protocol private structure. struct pull0_pipe { - nni_pipe * pipe; - pull0_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; + nni_pipe * p; + pull0_sock * s; + nni_msg * m; + nni_aio aio; + bool closed; + nni_list_node node; }; static int pull0_sock_init(void *arg, nni_sock *sock) { pull0_sock *s = arg; + NNI_ARG_UNUSED(sock); - s->urq = nni_sock_recvq(sock); + nni_aio_list_init(&s->rq); + NNI_LIST_INIT(&s->pl, pull0_pipe, node); + nni_mtx_init(&s->m); + nni_pollable_init(&s->readable); return (0); } static void pull0_sock_fini(void *arg) { - NNI_ARG_UNUSED(arg); + pull0_sock *s = arg; + nni_mtx_fini(&s->m); + nni_pollable_fini(&s->readable); } static void @@ -64,8 +73,7 @@ pull0_pipe_stop(void *arg) { pull0_pipe *p = arg; - nni_aio_stop(p->putq_aio); - nni_aio_stop(p->recv_aio); + nni_aio_stop(&p->aio); } static void @@ -73,24 +81,20 @@ pull0_pipe_fini(void *arg) { pull0_pipe *p = arg; - nni_aio_free(p->putq_aio); - nni_aio_free(p->recv_aio); + nni_aio_fini(&p->aio); + if (p->m) { + nni_msg_free(p->m); + } } static int pull0_pipe_init(void *arg, nni_pipe *pipe, void *s) { pull0_pipe *p = arg; - int rv; - if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) { - pull0_pipe_fini(p); - return (rv); - } - - p->pipe = pipe; - p->pull = s; + nni_aio_init(&p->aio, pull0_recv_cb, p); + p->p = pipe; + p->s = s; return (0); } @@ -99,13 +103,13 @@ pull0_pipe_start(void *arg) { pull0_pipe *p = arg; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUSH_V0) { + if (nni_pipe_peer(p->p) != NNI_PROTO_PUSH_V0) { // Peer protocol mismatch. return (NNG_EPROTO); } - // Start the pending pull... - nni_pipe_recv(p->pipe, p->recv_aio); + // Start the pending receive... + nni_pipe_recv(p->p, &p->aio); return (0); } @@ -114,58 +118,62 @@ static void pull0_pipe_close(void *arg) { pull0_pipe *p = arg; + pull0_sock *s = p->s; + + nni_mtx_lock(&s->m); + p->closed = true; + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); + if (nni_list_empty(&s->pl)) { + nni_pollable_clear(&s->readable); + } + } + nni_mtx_unlock(&s->m); - nni_aio_close(p->putq_aio); - nni_aio_close(p->recv_aio); + nni_aio_close(&p->aio); } static void pull0_recv_cb(void *arg) { - pull0_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; + pull0_pipe *p = arg; + pull0_sock *s = p->s; + nni_aio * ap = &p->aio; + nni_aio * as; + nni_msg * m; - if (nni_aio_result(aio) != 0) { + if (nni_aio_result(ap) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_close(p->pipe); + nni_pipe_close(p->p); return; } // Got a message... start the put to send it up to the application. - msg = nni_aio_get_msg(aio); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_aio_set_msg(aio, NULL); - pull0_putq(p, msg); -} - -static void -pull0_putq_cb(void *arg) -{ - pull0_pipe *p = arg; - nni_aio * aio = p->putq_aio; - - if (nni_aio_result(aio) != 0) { - // If we failed to put, probably NNG_ECLOSED, nothing else - // we can do. Just close the pipe. - nni_msg_free(nni_aio_get_msg(aio)); - nni_aio_set_msg(aio, NULL); - nni_pipe_close(p->pipe); + m = nni_aio_get_msg(ap); + nni_aio_set_msg(ap, NULL); + nni_msg_set_pipe(m, nni_pipe_id(p->p)); + + nni_mtx_lock(&s->m); + if (p->closed) { + nni_mtx_unlock(&s->m); + nni_msg_free(m); return; } - - nni_pipe_recv(p->pipe, p->recv_aio); -} - -// pull0_putq schedules a put operation to the user socket (sendup). -static void -pull0_putq(pull0_pipe *p, nni_msg *msg) -{ - pull0_sock *s = p->pull; - - nni_aio_set_msg(p->putq_aio, msg); - - nni_msgq_aio_put(s->urq, p->putq_aio); + if (nni_list_empty(&s->rq)) { + nni_list_append(&s->pl, p); + if (nni_list_first(&s->pl) == p) { + nni_pollable_raise(&s->readable); + } + p->m = m; + nni_mtx_unlock(&s->m); + return; + } + nni_pipe_recv(p->p, ap); + as = nni_list_first(&s->rq); + nni_aio_list_remove(as); + nni_mtx_unlock(&s->m); + nni_aio_set_msg(as, m); + nni_aio_finish_sync(as, 0, nni_msg_len(m)); } static void @@ -177,7 +185,15 @@ pull0_sock_open(void *arg) static void pull0_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pull0_sock *s = arg; + nni_aio *a; + nni_mtx_lock(&s->m); + while ((a = nni_list_first(&s->rq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + // NB: The common socket framework closes pipes before this. + nni_mtx_unlock(&s->m); } static void @@ -187,14 +203,77 @@ pull0_sock_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ENOTSUP); } +static void +pull0_cancel(nni_aio *aio, void *arg, int rv) +{ + pull0_sock *s = arg; + nni_mtx_lock(&s->m); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->m); +} + static void pull0_sock_recv(void *arg, nni_aio *aio) { pull0_sock *s = arg; + pull0_pipe *p; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->m); + if ((p = nni_list_first(&s->pl)) == NULL) { + + int rv; + if ((rv = nni_aio_schedule(aio, pull0_cancel, s)) != 0) { + nni_mtx_unlock(&s->m); + nni_aio_finish_error(aio, rv); + return; + } + + nni_aio_list_append(&s->rq, aio); + nni_mtx_unlock(&s->m); + return; + } + + nni_list_remove(&s->pl, p); + if (nni_list_empty(&s->pl)) { + nni_pollable_clear(&s->readable); + } + nni_aio_finish_msg(aio, p->m); + p->m = NULL; + nni_pipe_recv(p->p, &p->aio); + nni_mtx_unlock(&s->m); +} + +static int +pull0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pull0_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); } +static nni_option pull0_sock_options[] = { + { + .o_name = NNG_OPT_RECVFD, + .o_get = pull0_sock_get_recv_fd, + }, + // terminate list + { + .o_name = NULL, + }, +}; + static nni_proto_pipe_ops pull0_pipe_ops = { .pipe_size = sizeof(pull0_pipe), .pipe_init = pull0_pipe_init, @@ -204,13 +283,6 @@ static nni_proto_pipe_ops pull0_pipe_ops = { .pipe_stop = pull0_pipe_stop, }; -static nni_option pull0_sock_options[] = { - // terminate list - { - .o_name = NULL, - }, -}; - static nni_proto_sock_ops pull0_sock_ops = { .sock_size = sizeof(pull0_sock), .sock_init = pull0_sock_init, @@ -226,28 +298,29 @@ static nni_proto pull0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PULL_V0, "pull" }, .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, - .proto_flags = NNI_PROTO_FLAG_RCV, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_NOMSGQ, .proto_pipe_ops = &pull0_pipe_ops, .proto_sock_ops = &pull0_sock_ops, }; static nni_proto pull0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PULL_V0, "pull" }, - .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, - .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PULL_V0, "pull" }, + .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, + .proto_flags = + NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW | NNI_PROTO_FLAG_NOMSGQ, .proto_pipe_ops = &pull0_pipe_ops, .proto_sock_ops = &pull0_sock_ops, }; int -nng_pull0_open(nng_socket *sidp) +nng_pull0_open(nng_socket *s) { - return (nni_proto_open(sidp, &pull0_proto)); + return (nni_proto_open(s, &pull0_proto)); } int -nng_pull0_open_raw(nng_socket *sidp) +nng_pull0_open_raw(nng_socket *s) { - return (nni_proto_open(sidp, &pull0_proto_raw)); + return (nni_proto_open(s, &pull0_proto_raw)); } -- cgit v1.2.3-70-g09d2