diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-12-25 17:38:14 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-12-25 17:38:14 -0800 |
| commit | 6237d268514e1f8aec562052954db22c4540eec3 (patch) | |
| tree | 32213ea0016ae10faee2817f414308c91d881c42 /src/sp | |
| parent | 7a54bcd6fe345f35dd51eede6c5d66e8516c16ab (diff) | |
| download | nng-6237d268514e1f8aec562052954db22c4540eec3.tar.gz nng-6237d268514e1f8aec562052954db22c4540eec3.tar.bz2 nng-6237d268514e1f8aec562052954db22c4540eec3.zip | |
Provide a tiny buf for lmq buffer by default.
This allows us to make nni_lmq_init() non-failing. (Although
the buffer size requested at initialization might not be granted.)
Diffstat (limited to 'src/sp')
| -rw-r--r-- | src/sp/protocol/pair0/pair.c | 16 | ||||
| -rw-r--r-- | src/sp/protocol/pair1/pair.c | 16 | ||||
| -rw-r--r-- | src/sp/protocol/pipeline0/push.c | 6 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/pub.c | 86 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 19 | ||||
| -rw-r--r-- | src/sp/protocol/survey0/survey.c | 22 |
6 files changed, 74 insertions, 91 deletions
diff --git a/src/sp/protocol/pair0/pair.c b/src/sp/protocol/pair0/pair.c index c6470b7b..24c88d36 100644 --- a/src/sp/protocol/pair0/pair.c +++ b/src/sp/protocol/pair0/pair.c @@ -217,7 +217,7 @@ pair0_pipe_recv_cb(void *arg) // maybe we have room in the rmq? if (!nni_lmq_full(&s->rmq)) { - nni_lmq_putq(&s->rmq, msg); + nni_lmq_put(&s->rmq, msg); nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_recv(p->pipe, &p->aio_recv); } else { @@ -245,14 +245,14 @@ pair0_send_sched(pair0_sock *s) s->wr_ready = true; // if message waiting in buffered queue, then we prefer that. - if (nni_lmq_getq(&s->wmq, &m) == 0) { + if (nni_lmq_get(&s->wmq, &m) == 0) { pair0_pipe_send(p, m); if ((a = nni_list_first(&s->waq)) != NULL) { nni_aio_list_remove(a); m = nni_aio_get_msg(a); l = nni_msg_len(m); - nni_lmq_putq(&s->wmq, m); + nni_lmq_put(&s->wmq, m); } } else if ((a = nni_list_first(&s->waq)) != NULL) { @@ -311,8 +311,8 @@ pair0_sock_close(void *arg) nni_aio_list_remove(a); nni_aio_finish_error(a, NNG_ECLOSED); } - while ((nni_lmq_getq(&s->rmq, &m) == 0) || - (nni_lmq_getq(&s->wmq, &m) == 0)) { + while ((nni_lmq_get(&s->rmq, &m) == 0) || + (nni_lmq_get(&s->wmq, &m) == 0)) { nni_msg_free(m); } nni_mtx_unlock(&s->mtx); @@ -359,7 +359,7 @@ pair0_sock_send(void *arg, nni_aio *aio) } // Can we maybe queue it. - if (nni_lmq_putq(&s->wmq, m) == 0) { + if (nni_lmq_put(&s->wmq, m) == 0) { // Yay, we can. So we're done. nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, len); @@ -396,14 +396,14 @@ pair0_sock_recv(void *arg, nni_aio *aio) // Buffered read. If there is a message waiting for us, pick // it up. We might need to post another read request as well. - if (nni_lmq_getq(&s->rmq, &m) == 0) { + if (nni_lmq_get(&s->rmq, &m) == 0) { nni_aio_set_msg(aio, m); nni_aio_finish(aio, 0, nni_msg_len(m)); if (s->rd_ready) { s->rd_ready = false; m = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); - nni_lmq_putq(&s->rmq, m); + nni_lmq_put(&s->rmq, m); nni_pipe_recv(p->pipe, &p->aio_recv); } if (nni_lmq_empty(&s->rmq)) { diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c index e6be4628..636e51e9 100644 --- a/src/sp/protocol/pair1/pair.c +++ b/src/sp/protocol/pair1/pair.c @@ -358,7 +358,7 @@ pair1_pipe_recv_cb(void *arg) // maybe we have room in the rmq? if (!nni_lmq_full(&s->rmq)) { - nni_lmq_putq(&s->rmq, msg); + nni_lmq_put(&s->rmq, msg); nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_recv(pipe, &p->aio_recv); } else { @@ -386,14 +386,14 @@ pair1_send_sched(pair1_sock *s) s->wr_ready = true; // if message waiting in buffered queue, then we prefer that. - if (nni_lmq_getq(&s->wmq, &m) == 0) { + if (nni_lmq_get(&s->wmq, &m) == 0) { pair1_pipe_send(p, m); if ((a = nni_list_first(&s->waq)) != NULL) { nni_aio_list_remove(a); m = nni_aio_get_msg(a); l = nni_msg_len(m); - nni_lmq_putq(&s->wmq, m); + nni_lmq_put(&s->wmq, m); } } else if ((a = nni_list_first(&s->waq)) != NULL) { @@ -452,8 +452,8 @@ pair1_sock_close(void *arg) nni_aio_list_remove(a); nni_aio_finish_error(a, NNG_ECLOSED); } - while ((nni_lmq_getq(&s->rmq, &m) == 0) || - (nni_lmq_getq(&s->wmq, &m) == 0)) { + while ((nni_lmq_get(&s->rmq, &m) == 0) || + (nni_lmq_get(&s->wmq, &m) == 0)) { nni_msg_free(m); } nni_mtx_unlock(&s->mtx); @@ -574,7 +574,7 @@ inject: } // Can we maybe queue it. - if (nni_lmq_putq(&s->wmq, m) == 0) { + if (nni_lmq_put(&s->wmq, m) == 0) { // Yay, we can. So we're done. nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, len); @@ -611,14 +611,14 @@ pair1_sock_recv(void *arg, nni_aio *aio) // Buffered read. If there is a message waiting for us, pick // it up. We might need to post another read request as well. - if (nni_lmq_getq(&s->rmq, &m) == 0) { + if (nni_lmq_get(&s->rmq, &m) == 0) { nni_aio_set_msg(aio, m); nni_aio_finish(aio, 0, nni_msg_len(m)); if (s->rd_ready) { s->rd_ready = false; m = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); - nni_lmq_putq(&s->rmq, m); + nni_lmq_put(&s->rmq, m); nni_pipe_recv(p->pipe, &p->aio_recv); } if (nni_lmq_empty(&s->rmq)) { diff --git a/src/sp/protocol/pipeline0/push.c b/src/sp/protocol/pipeline0/push.c index 028104cd..99cb2da4 100644 --- a/src/sp/protocol/pipeline0/push.c +++ b/src/sp/protocol/pipeline0/push.c @@ -193,7 +193,7 @@ push0_pipe_ready(push0_pipe *p) // if message is waiting in the buffered queue // then we prefer that. - if (nni_lmq_getq(&s->wq, &m) == 0) { + if (nni_lmq_get(&s->wq, &m) == 0) { nni_aio_set_msg(&p->aio_send, m); nni_pipe_send(p->pipe, &p->aio_send); @@ -201,7 +201,7 @@ push0_pipe_ready(push0_pipe *p) nni_aio_list_remove(a); m = nni_aio_get_msg(a); l = nni_msg_len(m); - nni_lmq_putq(&s->wq, m); + nni_lmq_put(&s->wq, m); } } else if ((a = nni_list_first(&s->aq)) != NULL) { @@ -300,7 +300,7 @@ push0_sock_send(void *arg, nni_aio *aio) } // Can we maybe queue it. - if (nni_lmq_putq(&s->wq, m) == 0) { + if (nni_lmq_put(&s->wq, m) == 0) { // Yay, we can. So we're done. nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, l); diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c index cfc3ed6d..3911127a 100644 --- a/src/sp/protocol/pubsub0/pub.c +++ b/src/sp/protocol/pubsub0/pub.c @@ -36,22 +36,22 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { - nni_list pipes; - nni_mtx mtx; - bool closed; - size_t sendbuf; - nni_pollable sendable; + nni_list pipes; + nni_mtx mtx; + bool closed; + size_t sendbuf; + nni_pollable sendable; }; // pub0_pipe is our per-pipe protocol private structure. struct pub0_pipe { - nni_pipe * pipe; - pub0_sock * pub; + nni_pipe *pipe; + pub0_sock *pub; nni_lmq sendq; bool closed; bool busy; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node node; }; @@ -65,10 +65,10 @@ pub0_sock_fini(void *arg) } static int -pub0_sock_init(void *arg, nni_sock *nsock) +pub0_sock_init(void *arg, nni_sock *ns) { pub0_sock *sock = arg; - NNI_ARG_UNUSED(nsock); + NNI_ARG_UNUSED(ns); nni_pollable_init(&sock->sendable); nni_mtx_init(&sock->mtx); @@ -94,8 +94,8 @@ pub0_pipe_stop(void *arg) { pub0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -103,8 +103,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -113,21 +113,15 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { pub0_pipe *p = arg; pub0_sock *sock = s; - int rv; size_t len; nni_mtx_lock(&sock->mtx); len = sock->sendbuf; nni_mtx_unlock(&sock->mtx); - // XXX: consider making this depth tunable - if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - - pub0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->sendq, len); + nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p); p->busy = false; p->pipe = pipe; @@ -149,7 +143,7 @@ pub0_pipe_start(void *arg) nni_mtx_unlock(&sock->mtx); // Start the receiver. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -160,8 +154,8 @@ pub0_pipe_close(void *arg) pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&sock->mtx); p->closed = true; @@ -180,8 +174,8 @@ pub0_pipe_recv_cb(void *arg) // We should never receive a message -- the only valid reason for us to // be here is on pipe close. - if (nni_aio_result(p->aio_recv) == 0) { - nni_msg_free(nni_aio_get_msg(p->aio_recv)); + if (nni_aio_result(&p->aio_recv) == 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_recv)); } nni_pipe_close(p->pipe); } @@ -191,11 +185,11 @@ pub0_pipe_send_cb(void *arg) { pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_msg * msg; + nni_msg *msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -205,9 +199,9 @@ pub0_pipe_send_cb(void *arg) nni_mtx_unlock(&sock->mtx); return; } - if (nni_lmq_getq(&p->sendq, &msg) == 0) { - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + if (nni_lmq_get(&p->sendq, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } else { p->busy = false; } @@ -228,7 +222,7 @@ pub0_sock_send(void *arg, nni_aio *aio) { pub0_sock *sock = arg; pub0_pipe *p; - nng_msg * msg; + nng_msg *msg; size_t len; msg = nni_aio_get_msg(aio); @@ -241,14 +235,14 @@ pub0_sock_send(void *arg, nni_aio *aio) if (nni_lmq_full(&p->sendq)) { // Make space for the new message. nni_msg *old; - (void) nni_lmq_getq(&p->sendq, &old); + (void) nni_lmq_get(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, msg); + nni_lmq_put(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } } nni_mtx_unlock(&sock->mtx); @@ -289,7 +283,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t) nni_mtx_lock(&sock->mtx); sock->sendbuf = (size_t) val; NNI_LIST_FOREACH (&sock->pipes, p) { - // If we fail part way thru (should only be ENOMEM), we + // If we fail part way through (should only be ENOMEM), we // stop short. The others would likely fail for ENOMEM as // well anyway. There is a weird effect here where the // buffers may have been set for *some* of the pipes, but @@ -368,13 +362,13 @@ static nni_proto pub0_proto_raw = { }; int -nng_pub0_open(nng_socket *sidp) +nng_pub0_open(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto)); + return (nni_proto_open(id, &pub0_proto)); } int -nng_pub0_open_raw(nng_socket *sidp) +nng_pub0_open_raw(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto_raw)); + return (nni_proto_open(id, &pub0_proto_raw)); } diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index a40ee073..35b32af4 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Nathan Kent <nate@nkent.net> // @@ -115,7 +115,7 @@ again: return; } - (void) nni_lmq_getq(&ctx->lmq, &msg); + (void) nni_lmq_get(&ctx->lmq, &msg); if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); @@ -182,15 +182,12 @@ sub0_ctx_init(void *ctx_arg, void *sock_arg) sub0_ctx * ctx = ctx_arg; size_t len; bool prefer_new; - int rv; nni_mtx_lock(&sock->lk); len = sock->recv_buf_len; prefer_new = sock->prefer_new; - if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { - return (rv); - } + nni_lmq_init(&ctx->lmq, len); ctx->prefer_new = prefer_new; nni_aio_list_init(&ctx->recv_queue); @@ -385,14 +382,14 @@ sub0_recv_cb(void *arg) } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; - (void) nni_lmq_getq(&ctx->lmq, &old); + (void) nni_lmq_get(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup_msg); + (void) nni_lmq_put(&ctx->lmq, dup_msg); queued = true; } else { - (void) nni_lmq_putq(&ctx->lmq, dup_msg); + (void) nni_lmq_put(&ctx->lmq, dup_msg); queued = true; } if (queued && ctx == &sock->master) { @@ -534,9 +531,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) for (size_t i = 0; i < len; i++) { nni_msg *msg; - (void) nni_lmq_getq(&ctx->lmq, &msg); + (void) nni_lmq_get(&ctx->lmq, &msg); if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { - (void) nni_lmq_putq(&ctx->lmq, msg); + (void) nni_lmq_put(&ctx->lmq, msg); } else { nni_msg_free(msg); } diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c index ce1ed601..3287138d 100644 --- a/src/sp/protocol/survey0/survey.c +++ b/src/sp/protocol/survey0/survey.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -108,7 +108,6 @@ surv0_ctx_init(void *c, void *s) { surv0_ctx * ctx = c; surv0_sock * sock = s; - int rv; int len; nng_duration tmo; @@ -129,10 +128,7 @@ surv0_ctx_init(void *c, void *s) ctx->sock = sock; - if ((rv = nni_lmq_init(&ctx->recv_lmq, len)) != 0) { - surv0_ctx_fini(ctx); - return (rv); - } + nni_lmq_init(&ctx->recv_lmq, len); nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); return (0); } @@ -172,7 +168,7 @@ surv0_ctx_recv(void *arg, nni_aio *aio) return; } again: - if (nni_lmq_getq(&ctx->recv_lmq, &msg) != 0) { + if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) { int rv; if ((rv = nni_aio_schedule(aio, &surv0_ctx_cancel, ctx)) != 0) { @@ -259,7 +255,7 @@ surv0_ctx_send(void *arg, nni_aio *aio) nni_pipe_send(pipe->pipe, &pipe->aio_send); } else if (!nni_lmq_full(&pipe->send_queue)) { nni_msg_clone(msg); - nni_lmq_putq(&pipe->send_queue, msg); + nni_lmq_put(&pipe->send_queue, msg); } } @@ -359,7 +355,6 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) { surv0_pipe *p = arg; surv0_sock *sock = s; - int rv; int len; len = nni_atomic_get(&sock->send_buf); @@ -369,10 +364,7 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s) // This depth could be tunable. The deeper the queue, the more // concurrent surveys that can be delivered (multiple contexts). // Note that surveys can be *outstanding*, but not yet put on the wire. - if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) { - surv0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->send_queue, len); p->pipe = pipe; p->sock = sock; @@ -434,7 +426,7 @@ surv0_pipe_send_cb(void *arg) nni_mtx_unlock(&sock->mtx); return; } - if (nni_lmq_getq(&p->send_queue, &msg) == 0) { + if (nni_lmq_get(&p->send_queue, &msg) == 0) { nni_aio_set_msg(&p->aio_send, msg); nni_pipe_send(p->pipe, &p->aio_send); } else { @@ -482,7 +474,7 @@ surv0_pipe_recv_cb(void *arg) nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_msg(aio, msg); } else { - nni_lmq_putq(&ctx->recv_lmq, msg); + nni_lmq_put(&ctx->recv_lmq, msg); if (ctx == &sock->ctx) { nni_pollable_raise(&sock->readable); } |
