| field | value | date |
|---|---|---|
| author | Garrett D'Amore <garrett@damore.org> | 2019-02-24 22:04:16 -0800 |
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-26 21:09:54 -0800 |
| commit | 5803db08e55ed9287dc59b3adc281b89c52c530f (patch) | |
| tree | 9d2d65ed86be5c7b976fc3bdfc5ed5b375143641 /src/protocol/pubsub0/pub.c | |
| parent | 9cf967e9d7fdab6ccf38f80d83e4bf3d1a5e1a67 (diff) | |
| download | nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.gz, nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.bz2, nng-5803db08e55ed9287dc59b3adc281b89c52c530f.zip | |
fixes #461 Context support for SUB
fixes #762 Pub/Sub very slow compared with nanomsg
This introduces contexts for SUB, and converts both the cooked SUB
and PUB protocols to use a new lightweight message queue, which has
significant performance benefits over the heavyweight message queue.
We've also added a test program, pubdrop, in the perf directory,
which can be used for measuring pub/sub message rates and drop rates.
Note that it is still quite easy to overwhelm a subscriber.
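For readers unfamiliar with the new queue: the internals of nni_lmq are not shown on this page, but the shape of the idea is a fixed-capacity ring buffer of message pointers with no locking, blocking, or aio machinery of its own (callers such as pub0 serialize access under their own mutex). The sketch below is illustrative only; the names and signatures are not NNG's actual nni_lmq API:

```c
#include <stdlib.h>
#include <stddef.h>

// Illustrative fixed-capacity ring buffer of message pointers.
// The caller holds its own mutex, so the queue itself needs no
// locks, condition variables, timeouts, or aio completions --
// which is where the "lightweight" win over nni_msgq comes from.
typedef struct {
	void **ring; // slot storage, cap entries
	size_t cap;  // capacity in messages
	size_t get;  // read index
	size_t put;  // write index
	size_t len;  // messages currently queued
} lmq_sketch;

static int
lmq_init(lmq_sketch *q, size_t cap)
{
	if ((q->ring = calloc(cap, sizeof(void *))) == NULL) {
		return (-1); // NNG would return NNG_ENOMEM here
	}
	q->cap = cap;
	q->get = q->put = q->len = 0;
	return (0);
}

static int
lmq_putq(lmq_sketch *q, void *msg)
{
	if (q->len == q->cap) {
		return (-1); // full: PUB drops, or the pipe stays busy
	}
	q->ring[q->put] = msg;
	q->put          = (q->put + 1) % q->cap;
	q->len++;
	return (0);
}

static int
lmq_getq(lmq_sketch *q, void **msgp)
{
	if (q->len == 0) {
		return (-1); // empty: the pipe clears its busy flag
	}
	*msgp  = q->ring[q->get];
	q->get = (q->get + 1) % q->cap;
	q->len--;
	return (0);
}
```

Every operation is O(1), and since the caller already holds the socket lock, no further synchronization is needed.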
The SUB socket performance is still not completely where it needs to be.
There are two remaining things to improve. First, we need to replace
the naive linked list of topics with a proper PATRICIA trie. Second, we
need to work on the low-level POSIX poller code. (The Windows code is
already quite good, and we outperform nanomsg on Windows.)
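To illustrate the first point: a linear topic list costs one prefix comparison per subscription on every delivered message, so the match cost grows with the subscription count, whereas a PATRICIA trie walks the message prefix once, branching on bits. The following is a hypothetical sketch of the naive approach, not NNG's actual SUB implementation:

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

// Hypothetical sketch: every received message is compared against
// every subscribed topic in turn, so matching costs
// O(subscriptions * topic length) per message.
struct topic {
	struct topic *next;
	size_t        len;
	unsigned char buf[64];
};

static bool
sub_matches(const struct topic *list, const unsigned char *body, size_t len)
{
	for (const struct topic *t = list; t != NULL; t = t->next) {
		if (t->len <= len && memcmp(t->buf, body, t->len) == 0) {
			return (true); // SUB matches on message prefix
		}
	}
	return (false);
}
```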
Diffstat (limited to 'src/protocol/pubsub0/pub.c')
| mode | path | changes |
|---|---|---|
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 282 |

1 file changed, 170 insertions, 112 deletions
```diff
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 2567a5b6..bec0763f 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
 //
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
 //
 // This software is supplied under the terms of the MIT License, a
@@ -32,25 +32,25 @@ typedef struct pub0_sock pub0_sock;
 
 static void pub0_pipe_recv_cb(void *);
 static void pub0_pipe_send_cb(void *);
-static void pub0_pipe_getq_cb(void *);
-static void pub0_sock_getq_cb(void *);
 static void pub0_sock_fini(void *);
 static void pub0_pipe_fini(void *);
 
 // pub0_sock is our per-socket protocol private structure.
 struct pub0_sock {
-    nni_msgq *uwq;
-    nni_aio * aio_getq;
-    nni_list  pipes;
-    nni_mtx   mtx;
+    nni_list      pipes;
+    nni_mtx       mtx;
+    bool          closed;
+    size_t        sendbuf;
+    nni_pollable *sendable;
 };
 
 // pub0_pipe is our per-pipe protocol private structure.
 struct pub0_pipe {
     nni_pipe *    pipe;
     pub0_sock *   pub;
-    nni_msgq *    sendq;
-    nni_aio *     aio_getq;
+    nni_lmq       sendq;
+    bool          closed;
+    bool          busy;
     nni_aio *     aio_send;
     nni_aio *     aio_recv;
     nni_list_node node;
@@ -61,48 +61,46 @@ pub0_sock_fini(void *arg)
 {
     pub0_sock *s = arg;
 
-    nni_aio_fini(s->aio_getq);
+    nni_pollable_free(s->sendable);
     nni_mtx_fini(&s->mtx);
     NNI_FREE_STRUCT(s);
 }
 
 static int
-pub0_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *nsock)
 {
-    pub0_sock *s;
+    pub0_sock *sock;
     int        rv;
+    NNI_ARG_UNUSED(nsock);
 
-    if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+    if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
         return (NNG_ENOMEM);
     }
-    nni_mtx_init(&s->mtx);
-    if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
-        pub0_sock_fini(s);
+    if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+        NNI_FREE_STRUCT(sock);
         return (rv);
     }
-
-    NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
-
-    s->uwq = nni_sock_sendq(sock);
-
-    *sp = s;
+    nni_mtx_init(&sock->mtx);
+    NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
+    sock->sendbuf = 16; // fairly arbitrary
+    *sp = sock;
     return (0);
 }
 
 static void
 pub0_sock_open(void *arg)
 {
-    pub0_sock *s = arg;
-
-    nni_msgq_aio_get(s->uwq, s->aio_getq);
+    NNI_ARG_UNUSED(arg);
 }
 
 static void
 pub0_sock_close(void *arg)
 {
-    pub0_sock *s = arg;
+    pub0_sock *sock = arg;
 
-    nni_aio_close(s->aio_getq);
+    nni_mtx_lock(&sock->mtx);
+    sock->closed = true;
+    nni_mtx_unlock(&sock->mtx);
 }
 
 static void
@@ -110,7 +108,6 @@ pub0_pipe_stop(void *arg)
 {
     pub0_pipe *p = arg;
 
-    nni_aio_stop(p->aio_getq);
     nni_aio_stop(p->aio_send);
     nni_aio_stop(p->aio_recv);
 }
@@ -120,10 +117,9 @@ pub0_pipe_fini(void *arg)
 {
     pub0_pipe *p = arg;
 
-    nni_aio_fini(p->aio_getq);
     nni_aio_fini(p->aio_send);
     nni_aio_fini(p->aio_recv);
-    nni_msgq_fini(p->sendq);
+    nni_lmq_fini(&p->sendq);
     NNI_FREE_STRUCT(p);
 }
 
@@ -131,15 +127,20 @@ static int
 pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
 {
     pub0_pipe *p;
+    pub0_sock *sock = s;
     int        rv;
+    size_t     len;
 
     if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
         return (NNG_ENOMEM);
     }
+    nni_mtx_lock(&sock->mtx);
+    len = sock->sendbuf;
+    nni_mtx_unlock(&sock->mtx);
+
     // XXX: consider making this depth tunable
-    if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
-        ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+    if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
         ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
         ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
@@ -147,6 +148,7 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
         return (rv);
     }
 
+    p->busy = false;
     p->pipe = pipe;
     p->pub  = s;
     *pp     = p;
@@ -156,19 +158,18 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
 static int
 pub0_pipe_start(void *arg)
 {
-    pub0_pipe *p = arg;
-    pub0_sock *s = p->pub;
+    pub0_pipe *p    = arg;
+    pub0_sock *sock = p->pub;
 
     if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
         return (NNG_EPROTO);
     }
-    nni_mtx_lock(&s->mtx);
-    nni_list_append(&s->pipes, p);
-    nni_mtx_unlock(&s->mtx);
+    nni_mtx_lock(&sock->mtx);
+    nni_list_append(&sock->pipes, p);
+    nni_mtx_unlock(&sock->mtx);
 
-    // Start the receiver and the queue reader.
+    // Start the receiver.
     nni_pipe_recv(p->pipe, p->aio_recv);
-    nni_msgq_aio_get(p->sendq, p->aio_getq);
 
     return (0);
 }
@@ -176,62 +177,20 @@ pub0_pipe_start(void *arg)
 static void
 pub0_pipe_close(void *arg)
 {
-    pub0_pipe *p = arg;
-    pub0_sock *s = p->pub;
+    pub0_pipe *p    = arg;
+    pub0_sock *sock = p->pub;
 
-    nni_aio_close(p->aio_getq);
     nni_aio_close(p->aio_send);
     nni_aio_close(p->aio_recv);
-    nni_msgq_close(p->sendq);
 
+    nni_mtx_lock(&sock->mtx);
+    p->closed = true;
+    nni_lmq_flush(&p->sendq);
-    nni_mtx_lock(&s->mtx);
-    if (nni_list_active(&s->pipes, p)) {
-        nni_list_remove(&s->pipes, p);
+    if (nni_list_active(&sock->pipes, p)) {
+        nni_list_remove(&sock->pipes, p);
     }
-    nni_mtx_unlock(&s->mtx);
-}
-
-static void
-pub0_sock_getq_cb(void *arg)
-{
-    pub0_sock *s   = arg;
-    nni_msgq * uwq = s->uwq;
-    nni_msg *  msg, *dup;
-
-    pub0_pipe *p;
-    pub0_pipe *last;
-    int        rv;
-
-    if (nni_aio_result(s->aio_getq) != 0) {
-        return;
-    }
-
-    msg = nni_aio_get_msg(s->aio_getq);
-    nni_aio_set_msg(s->aio_getq, NULL);
-
-    nni_mtx_lock(&s->mtx);
-    last = nni_list_last(&s->pipes);
-    NNI_LIST_FOREACH (&s->pipes, p) {
-        if (p != last) {
-            rv = nni_msg_dup(&dup, msg);
-            if (rv != 0) {
-                continue;
-            }
-        } else {
-            dup = msg;
-        }
-        if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
-            nni_msg_free(dup);
-        }
-    }
-    nni_mtx_unlock(&s->mtx);
-
-    if (last == NULL) {
-        nni_msg_free(msg);
-    }
-
-    nni_msgq_aio_get(uwq, s->aio_getq);
+    nni_mtx_unlock(&sock->mtx);
 }
 
 static void
@@ -244,31 +203,18 @@ pub0_pipe_recv_cb(void *arg)
         return;
     }
 
+    // We should never get any messages.  If we do, we just discard them.
     nni_msg_free(nni_aio_get_msg(p->aio_recv));
     nni_aio_set_msg(p->aio_recv, NULL);
     nni_pipe_recv(p->pipe, p->aio_recv);
 }
 
 static void
-pub0_pipe_getq_cb(void *arg)
-{
-    pub0_pipe *p = arg;
-
-    if (nni_aio_result(p->aio_getq) != 0) {
-        nni_pipe_close(p->pipe);
-        return;
-    }
-
-    nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
-    nni_aio_set_msg(p->aio_getq, NULL);
-
-    nni_pipe_send(p->pipe, p->aio_send);
-}
-
-static void
 pub0_pipe_send_cb(void *arg)
 {
-    pub0_pipe *p = arg;
+    pub0_pipe *p    = arg;
+    pub0_sock *sock = p->pub;
+    nni_msg *  msg;
 
     if (nni_aio_result(p->aio_send) != 0) {
         nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -277,23 +223,126 @@ pub0_pipe_send_cb(void *arg)
         return;
     }
 
-    nni_aio_set_msg(p->aio_send, NULL);
-    nni_msgq_aio_get(p->sendq, p->aio_getq);
+    nni_mtx_lock(&sock->mtx);
+    if (sock->closed || p->closed) {
+        nni_mtx_unlock(&sock->mtx);
+        return;
+    }
+    if (nni_lmq_getq(&p->sendq, &msg) == 0) {
+        nni_aio_set_msg(p->aio_send, msg);
+        nni_pipe_send(p->pipe, p->aio_send);
+    } else {
+        p->busy = false;
+    }
+    nni_mtx_unlock(&sock->mtx);
 }
 
 static void
 pub0_sock_recv(void *arg, nni_aio *aio)
 {
     NNI_ARG_UNUSED(arg);
-    nni_aio_finish_error(aio, NNG_ENOTSUP);
+    if (nni_aio_begin(aio) == 0) {
+        nni_aio_finish_error(aio, NNG_ENOTSUP);
+    }
 }
 
 static void
 pub0_sock_send(void *arg, nni_aio *aio)
 {
-    pub0_sock *s = arg;
+    pub0_sock *sock = arg;
+    pub0_pipe *p;
+    nng_msg *  msg;
+    nng_msg *  dup;
+    size_t     len;
+
+    msg = nni_aio_get_msg(aio);
+    len = nni_msg_len(msg);
+    nni_mtx_lock(&sock->mtx);
+    if (sock->closed) {
+        nni_mtx_unlock(&sock->mtx);
+        nni_aio_finish_error(aio, NNG_ECLOSED);
+        return;
+    }
+    NNI_LIST_FOREACH (&sock->pipes, p) {
+        if (p->closed) {
+            continue;
+        }
+        if (nni_lmq_full(&p->sendq)) {
+            continue;
+        }
+        if (p == nni_list_last(&sock->pipes)) {
+            dup = msg;
+            msg = NULL;
+        } else if (nni_msg_dup(&dup, msg) != 0) {
+            continue;
+        }
+        if (p->busy) {
+            nni_lmq_putq(&p->sendq, dup);
+        } else {
+            p->busy = true;
+            nni_aio_set_msg(p->aio_send, dup);
+            nni_pipe_send(p->pipe, p->aio_send);
+        }
+    }
+    nni_mtx_unlock(&sock->mtx);
+    nni_aio_finish(aio, 0, len);
+}
+
+static int
+pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+    pub0_sock *sock = arg;
+    int        fd;
+    int        rv;
+    nni_mtx_lock(&sock->mtx);
+    // PUB sockets are *always* sendable.
+    nni_pollable_raise(sock->sendable);
+    rv = nni_pollable_getfd(sock->sendable, &fd);
+    nni_mtx_unlock(&sock->mtx);
+
+    if (rv == 0) {
+        rv = nni_copyout_int(fd, buf, szp, t);
+    }
+    return (rv);
+}
+
+static int
+pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+    pub0_sock *sock = arg;
+    pub0_pipe *p;
+    size_t     val;
+    int        rv;
+
+    if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+        return (rv);
+    }
 
-    nni_msgq_aio_put(s->uwq, aio);
+    nni_mtx_lock(&sock->mtx);
+    sock->sendbuf = val;
+    NNI_LIST_FOREACH (&sock->pipes, p) {
+        // If we fail part way thru (should only be ENOMEM), we
+        // stop short.  The others would likely fail for ENOMEM as
+        // well anyway.  There is a weird effect here where the
+        // buffers may have been set for *some* of the pipes, but
+        // we have no way to correct, or even report, partial failure.
+        if ((rv = nni_lmq_resize(&p->sendq, val)) != 0) {
+            break;
+        }
+    }
+    nni_mtx_unlock(&sock->mtx);
+    return (rv);
+}
+
+static int
+pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+    pub0_sock *sock = arg;
+    size_t     val;
+    nni_mtx_lock(&sock->mtx);
+    val = sock->sendbuf;
+    nni_mtx_unlock(&sock->mtx);
+    return (nni_copyout_size(val, buf, szp, t));
 }
 
 static nni_proto_pipe_ops pub0_pipe_ops = {
@@ -307,6 +356,15 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
 
 static nni_option pub0_sock_options[] = {
     // terminate list
     {
+        .o_name = NNG_OPT_SENDFD,
+        .o_get  = pub0_sock_get_sendfd,
+    },
+    {
+        .o_name = NNG_OPT_SENDBUF,
+        .o_get  = pub0_sock_get_sendbuf,
+        .o_set  = pub0_sock_set_sendbuf,
+    },
+    {
        .o_name = NULL,
     },
 };
@@ -325,7 +383,7 @@ static nni_proto pub0_proto = {
     .proto_version  = NNI_PROTOCOL_VERSION,
     .proto_self     = { NNI_PROTO_PUB_V0, "pub" },
     .proto_peer     = { NNI_PROTO_SUB_V0, "sub" },
-    .proto_flags    = NNI_PROTO_FLAG_SND,
+    .proto_flags    = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_NOMSGQ,
     .proto_sock_ops = &pub0_sock_ops,
     .proto_pipe_ops = &pub0_pipe_ops,
 };
```
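From the application side, the two options wired up above are reachable through the public API. The following is a rough usage sketch with minimal error handling, following the semantics in the diff: NNG_OPT_SENDBUF resizes each pipe's queue (1 to 8192 messages, default 16), and the NNG_OPT_SENDFD descriptor is always signalled, since PUB never blocks on send.

```c
#include <stdio.h>

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>

int
main(void)
{
	nng_socket s;
	int        fd;

	if (nng_pub0_open(&s) != 0) {
		return (1);
	}
	// Deepen each pipe's send queue from the default of 16 messages.
	if (nng_setopt_size(s, NNG_OPT_SENDBUF, 64) != 0) {
		return (1);
	}
	// Obtain a pollable "sendable" descriptor (always ready for PUB).
	if (nng_getopt_int(s, NNG_OPT_SENDFD, &fd) != 0) {
		return (1);
	}
	printf("send fd: %d\n", fd);
	nng_close(s);
	return (0);
}
```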
