diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-12-26 11:54:40 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-12-27 17:26:19 -0800 |
| commit | 3d535b638667ad0fcfff4246fce61c0176a056c4 (patch) | |
| tree | ec9b5054885e3898ac0f5f3ed203ab49534d11bb /src/protocol/pipeline0/push.c | |
| parent | c3bfec2b38caaf34379a891e0ea30c7e48147c6f (diff) | |
| download | nng-3d535b638667ad0fcfff4246fce61c0176a056c4.tar.gz nng-3d535b638667ad0fcfff4246fce61c0176a056c4.tar.bz2 nng-3d535b638667ad0fcfff4246fce61c0176a056c4.zip | |
fixes #972 Very slow pull/push performance compared to ZMQ
This refactors the pipeline protocol to use lightweight mq
instead of the more expensive message queue structure. It
also provides nicer backpressure and buffering support.
The test suite was updated and converted to NUTS as well.
This won't completely close the gap, but it should help quite
a bit.
Diffstat (limited to 'src/protocol/pipeline0/push.c')
| -rw-r--r-- | src/protocol/pipeline0/push.c | 298 |
1 files changed, 238 insertions, 60 deletions
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 90c94af9..8aa5311c 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -30,11 +30,15 @@ typedef struct push0_sock push0_sock; static void push0_send_cb(void *); static void push0_recv_cb(void *); -static void push0_getq_cb(void *); +static void push0_pipe_ready(push0_pipe *); // push0_sock is our per-socket protocol private structure. struct push0_sock { - nni_msgq *uwq; + nni_lmq wq; // list of messages queued + nni_list aq; // list of aio senders waiting + nni_list pl; // list of pipes ready to send + nni_pollable writable; + nni_mtx m; }; // push0_pipe is our per-pipe protocol private structure. @@ -43,23 +47,32 @@ struct push0_pipe { push0_sock * push; nni_list_node node; - nni_aio *aio_recv; - nni_aio *aio_send; - nni_aio *aio_getq; + nni_aio aio_recv; + nni_aio aio_send; }; static int push0_sock_init(void *arg, nni_sock *sock) { push0_sock *s = arg; - s->uwq = nni_sock_sendq(sock); + NNI_ARG_UNUSED(sock); + + nni_mtx_init(&s->m); + nni_aio_list_init(&s->aq); + NNI_LIST_INIT(&s->pl, push0_pipe, node); + nni_lmq_init(&s->wq, 0); // initially we start unbuffered. + nni_pollable_init(&s->writable); + return (0); } static void push0_sock_fini(void *arg) { - NNI_ARG_UNUSED(arg); + push0_sock *s = arg; + nni_pollable_fini(&s->writable); + nni_lmq_fini(&s->wq); + nni_mtx_fini(&s->m); } static void @@ -71,7 +84,14 @@ push0_sock_open(void *arg) static void push0_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + push0_sock *s = arg; + nni_aio * a; + nni_mtx_lock(&s->m); + while ((a = nni_list_first(&s->aq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + nni_mtx_unlock(&s->m); } static void @@ -79,9 +99,8 @@ push0_pipe_stop(void *arg) { push0_pipe *p = arg; - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_getq); + nni_aio_stop(&p->aio_recv); + nni_aio_stop(&p->aio_send); } static void @@ -89,23 +108,17 @@ push0_pipe_fini(void *arg) { push0_pipe *p = arg; - nni_aio_free(p->aio_recv); - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_getq); + nni_aio_fini(&p->aio_recv); + nni_aio_fini(&p->aio_send); } static int push0_pipe_init(void *arg, nni_pipe *pipe, void *s) { push0_pipe *p = arg; - int rv; - if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) { - push0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, push0_recv_cb, p); + nni_aio_init(&p->aio_send, push0_send_cb, p); NNI_LIST_NODE_INIT(&p->node); p->pipe = pipe; p->push = s; @@ -116,7 +129,6 @@ static int push0_pipe_start(void *arg) { push0_pipe *p = arg; - push0_sock *s = p->push; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { return (NNG_EPROTO); @@ -124,10 +136,8 @@ push0_pipe_start(void *arg) // Schedule a receiver. This is mostly so that we can detect // a closed transport pipe. - nni_pipe_recv(p->pipe, p->aio_recv); - - // Schedule a sender. - nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->pipe, &p->aio_recv); + push0_pipe_ready(p); return (0); } @@ -136,10 +146,20 @@ static void push0_pipe_close(void *arg) { push0_pipe *p = arg; + push0_sock *s = p->push; + + nni_aio_close(&p->aio_recv); + nni_aio_close(&p->aio_send); + + nni_mtx_lock(&s->m); + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); - nni_aio_close(p->aio_recv); - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_getq); + if (nni_list_empty(&s->pl) && nni_lmq_full(&s->wq)) { + nni_pollable_clear(&s->writable); + } + } + nni_mtx_unlock(&s->m); } static void @@ -149,55 +169,155 @@ push0_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - nni_msg_free(nni_aio_get_msg(p->aio_recv)); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_msg_free(nni_aio_get_msg(&p->aio_recv)); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); } static void -push0_send_cb(void *arg) +push0_pipe_ready(push0_pipe *p) { - push0_pipe *p = arg; push0_sock *s = p->push; + nni_msg * m; + nni_aio * a = NULL; + size_t l; + bool blocked; + + nni_mtx_lock(&s->m); + + blocked = nni_lmq_full(&s->wq) && nni_list_empty(&s->pl); + + // if message is waiting in the buffered queue + // then we prefer that. + if (nni_lmq_getq(&s->wq, &m) == 0) { + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + + if ((a = nni_list_first(&s->aq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wq, m); + } + + } else if ((a = nni_list_first(&s->aq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + // We had nothing to send. Just put us in the ready list. + nni_list_append(&s->pl, p); + } - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_close(p->pipe); - return; + if (blocked) { + // if we blocked, then toggle the status. + if ((!nni_lmq_full(&s->wq)) || (!nni_list_empty(&s->pl))) { + nni_pollable_raise(&s->writable); + } } - nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_mtx_unlock(&s->m); + + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } } static void -push0_getq_cb(void *arg) +push0_send_cb(void *arg) { - push0_pipe *p = arg; - nni_aio * aio = p->aio_getq; + push0_pipe *p = arg; - if (nni_aio_result(aio) != 0) { - // If the socket is closing, nothing else we can do. + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio)); - nni_aio_set_msg(aio, NULL); + push0_pipe_ready(p); +} - nni_pipe_send(p->pipe, p->aio_send); +static void +push0_cancel(nni_aio *aio, void *arg, int rv) +{ + push0_sock *s = arg; + + nni_mtx_lock(&s->m); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->m); } static void push0_sock_send(void *arg, nni_aio *aio) { push0_sock *s = arg; + push0_pipe *p; + nni_msg * m; + size_t l; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + m = nni_aio_get_msg(aio); + l = nni_msg_len(m); + + nni_mtx_lock(&s->m); + + // First we want to see if we can send it right now. + // Note that we don't block the sender until the read is complete, + // only until we have committed to send it. + if ((p = nni_list_first(&s->pl)) != NULL) { + nni_list_remove(&s->pl, p); + // NB: We won't have had any waiters in the message queue + // or the aio queue, because we would not put the pipe + // in the ready list in that case. Note though that the + // wq may be "full" if we are unbuffered. + if (nni_list_empty(&s->pl) && (nni_lmq_full(&s->wq))) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, l); + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + nni_mtx_unlock(&s->m); + return; + } - nni_msgq_aio_put(s->uwq, aio); + // Can we maybe queue it. + if (nni_lmq_putq(&s->wq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, l); + if (nni_lmq_full(&s->wq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->m); + return; + } + + if ((rv = nni_aio_schedule(aio, push0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->m); + return; + } + nni_aio_list_append(&s->aq, aio); + nni_mtx_unlock(&s->m); } static void @@ -207,6 +327,54 @@ push0_sock_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ENOTSUP); } +static int +push0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + push0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->m); + rv = nni_lmq_resize(&s->wq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wq)) { + nni_pollable_raise(&s->writable); + } else if (nni_list_empty(&s->pl)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->m); + return (rv); +} + +static int +push0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + push0_sock *s = arg; + int val; + + nni_mtx_lock(&s->m); + val = nni_lmq_cap(&s->wq); + nni_mtx_unlock(&s->m); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +push0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + push0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + static nni_proto_pipe_ops push0_pipe_ops = { .pipe_size = sizeof(push0_pipe), .pipe_init = push0_pipe_init, @@ -217,7 +385,16 @@ static nni_proto_pipe_ops push0_pipe_ops = { }; static nni_option push0_sock_options[] = { - // terminate list + { + .o_name = NNG_OPT_SENDFD, + .o_get = push0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = push0_get_send_buf_len, + .o_set = push0_set_send_buf_len, + }, + // terminate list { .o_name = NULL, }, @@ -238,28 +415,29 @@ static nni_proto push0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUSH_V0, "push" }, .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, - .proto_flags = NNI_PROTO_FLAG_SND, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_NOMSGQ, .proto_pipe_ops = &push0_pipe_ops, .proto_sock_ops = &push0_sock_ops, }; static nni_proto push0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PUSH_V0, "push" }, - .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, - .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = + NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW | NNI_PROTO_FLAG_NOMSGQ, .proto_pipe_ops = &push0_pipe_ops, .proto_sock_ops = &push0_sock_ops, }; int -nng_push0_open(nng_socket *sidp) +nng_push0_open(nng_socket *s) { - return (nni_proto_open(sidp, &push0_proto)); + return (nni_proto_open(s, &push0_proto)); } int -nng_push0_open_raw(nng_socket *sidp) +nng_push0_open_raw(nng_socket *s) { - return (nni_proto_open(sidp, &push0_proto_raw)); + return (nni_proto_open(s, &push0_proto_raw)); } |
