diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-12-25 17:38:14 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-12-25 17:38:14 -0800 |
| commit | 6237d268514e1f8aec562052954db22c4540eec3 (patch) | |
| tree | 32213ea0016ae10faee2817f414308c91d881c42 /src/sp/protocol/pubsub0 | |
| parent | 7a54bcd6fe345f35dd51eede6c5d66e8516c16ab (diff) | |
| download | nng-6237d268514e1f8aec562052954db22c4540eec3.tar.gz nng-6237d268514e1f8aec562052954db22c4540eec3.tar.bz2 nng-6237d268514e1f8aec562052954db22c4540eec3.zip | |
Provide a tiny buf for lmq buffer by default.
This allows us to make nni_lmq_init() non-failing. (Although
the buffer size requested at initialization might not be granted.)
Diffstat (limited to 'src/sp/protocol/pubsub0')
| -rw-r--r-- | src/sp/protocol/pubsub0/pub.c | 86 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 19 |
2 files changed, 48 insertions, 57 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c index cfc3ed6d..3911127a 100644 --- a/src/sp/protocol/pubsub0/pub.c +++ b/src/sp/protocol/pubsub0/pub.c @@ -36,22 +36,22 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { - nni_list pipes; - nni_mtx mtx; - bool closed; - size_t sendbuf; - nni_pollable sendable; + nni_list pipes; + nni_mtx mtx; + bool closed; + size_t sendbuf; + nni_pollable sendable; }; // pub0_pipe is our per-pipe protocol private structure. struct pub0_pipe { - nni_pipe * pipe; - pub0_sock * pub; + nni_pipe *pipe; + pub0_sock *pub; nni_lmq sendq; bool closed; bool busy; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node node; }; @@ -65,10 +65,10 @@ pub0_sock_fini(void *arg) } static int -pub0_sock_init(void *arg, nni_sock *nsock) +pub0_sock_init(void *arg, nni_sock *ns) { pub0_sock *sock = arg; - NNI_ARG_UNUSED(nsock); + NNI_ARG_UNUSED(ns); nni_pollable_init(&sock->sendable); nni_mtx_init(&sock->mtx); @@ -94,8 +94,8 @@ pub0_pipe_stop(void *arg) { pub0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -103,8 +103,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -113,21 +113,15 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { pub0_pipe *p = arg; pub0_sock *sock = s; - int rv; size_t len; nni_mtx_lock(&sock->mtx); len = sock->sendbuf; nni_mtx_unlock(&sock->mtx); - // XXX: consider making this depth tunable - if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - - pub0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->sendq, len); + nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p); p->busy = false; p->pipe = pipe; @@ -149,7 +143,7 @@ pub0_pipe_start(void *arg) nni_mtx_unlock(&sock->mtx); // Start the receiver. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -160,8 +154,8 @@ pub0_pipe_close(void *arg) pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&sock->mtx); p->closed = true; @@ -180,8 +174,8 @@ pub0_pipe_recv_cb(void *arg) // We should never receive a message -- the only valid reason for us to // be here is on pipe close. - if (nni_aio_result(p->aio_recv) == 0) { - nni_msg_free(nni_aio_get_msg(p->aio_recv)); + if (nni_aio_result(&p->aio_recv) == 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_recv)); } nni_pipe_close(p->pipe); } @@ -191,11 +185,11 @@ pub0_pipe_send_cb(void *arg) { pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_msg * msg; + nni_msg *msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -205,9 +199,9 @@ pub0_pipe_send_cb(void *arg) nni_mtx_unlock(&sock->mtx); return; } - if (nni_lmq_getq(&p->sendq, &msg) == 0) { - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + if (nni_lmq_get(&p->sendq, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } else { p->busy = false; } @@ -228,7 +222,7 @@ pub0_sock_send(void *arg, nni_aio *aio) { pub0_sock *sock = arg; pub0_pipe *p; - nng_msg * msg; + nng_msg *msg; size_t len; msg = nni_aio_get_msg(aio); @@ -241,14 +235,14 @@ pub0_sock_send(void *arg, nni_aio *aio) if (nni_lmq_full(&p->sendq)) { // Make space for the new message. nni_msg *old; - (void) nni_lmq_getq(&p->sendq, &old); + (void) nni_lmq_get(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, msg); + nni_lmq_put(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } } nni_mtx_unlock(&sock->mtx); @@ -289,7 +283,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t) nni_mtx_lock(&sock->mtx); sock->sendbuf = (size_t) val; NNI_LIST_FOREACH (&sock->pipes, p) { - // If we fail part way thru (should only be ENOMEM), we + // If we fail part way through (should only be ENOMEM), we // stop short. The others would likely fail for ENOMEM as // well anyway. There is a weird effect here where the // buffers may have been set for *some* of the pipes, but @@ -368,13 +362,13 @@ static nni_proto pub0_proto_raw = { }; int -nng_pub0_open(nng_socket *sidp) +nng_pub0_open(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto)); + return (nni_proto_open(id, &pub0_proto)); } int -nng_pub0_open_raw(nng_socket *sidp) +nng_pub0_open_raw(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto_raw)); + return (nni_proto_open(id, &pub0_proto_raw)); } diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index a40ee073..35b32af4 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Nathan Kent <nate@nkent.net> // @@ -115,7 +115,7 @@ again: return; } - (void) nni_lmq_getq(&ctx->lmq, &msg); + (void) nni_lmq_get(&ctx->lmq, &msg); if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); @@ -182,15 +182,12 @@ sub0_ctx_init(void *ctx_arg, void *sock_arg) sub0_ctx * ctx = ctx_arg; size_t len; bool prefer_new; - int rv; nni_mtx_lock(&sock->lk); len = sock->recv_buf_len; prefer_new = sock->prefer_new; - if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { - return (rv); - } + nni_lmq_init(&ctx->lmq, len); ctx->prefer_new = prefer_new; nni_aio_list_init(&ctx->recv_queue); @@ -385,14 +382,14 @@ sub0_recv_cb(void *arg) } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; - (void) nni_lmq_getq(&ctx->lmq, &old); + (void) nni_lmq_get(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup_msg); + (void) nni_lmq_put(&ctx->lmq, dup_msg); queued = true; } else { - (void) nni_lmq_putq(&ctx->lmq, dup_msg); + (void) nni_lmq_put(&ctx->lmq, dup_msg); queued = true; } if (queued && ctx == &sock->master) { @@ -534,9 +531,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) for (size_t i = 0; i < len; i++) { nni_msg *msg; - (void) nni_lmq_getq(&ctx->lmq, &msg); + (void) nni_lmq_get(&ctx->lmq, &msg); if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { - (void) nni_lmq_putq(&ctx->lmq, msg); + (void) nni_lmq_put(&ctx->lmq, msg); } else { nni_msg_free(msg); } |
