diff options
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 24 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 99 | ||||
| -rw-r--r-- | src/protocol/pubsub0/xsub.c | 25 |
3 files changed, 52 insertions, 96 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c54274a6..22195c52 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -63,27 +63,21 @@ pub0_sock_fini(void *arg) nni_pollable_free(s->sendable); nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); } static int -pub0_sock_init(void **sp, nni_sock *nsock) +pub0_sock_init(void *arg, nni_sock *nsock) { - pub0_sock *sock; + pub0_sock *sock = arg; int rv; NNI_ARG_UNUSED(nsock); - if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { - return (NNG_ENOMEM); - } if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { - NNI_FREE_STRUCT(sock); return (rv); } nni_mtx_init(&sock->mtx); NNI_LIST_INIT(&sock->pipes, pub0_pipe, node); sock->sendbuf = 16; // fairly arbitrary - *sp = sock; return (0); } @@ -120,21 +114,16 @@ pub0_pipe_fini(void *arg) nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); nni_lmq_fini(&p->sendq); - NNI_FREE_STRUCT(p); } static int -pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) +pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_lock(&sock->mtx); len = sock->sendbuf; nni_mtx_unlock(&sock->mtx); @@ -151,7 +140,6 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) p->busy = false; p->pipe = pipe; p->pub = s; - *pp = p; return (0); } @@ -352,6 +340,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_size = sizeof (pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -376,6 +365,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { + .sock_size = sizeof (pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index aca28f0c..0a367216 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Nathan Kent <nate@nkent.net> // @@ -9,9 +9,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include <stdbool.h> #include <stdlib.h> #include <string.h> -#include <stdbool.h> #include "core/nng_impl.h" #include "nng/protocol/pubsub0/sub.h" @@ -58,20 +58,12 @@ struct sub0_ctx { bool closed; nni_lmq lmq; bool prefnew; - -#if 0 - nni_msg **recvq; - size_t recvqcap; - size_t recvqlen; - size_t recvqget; - size_t recvqput; -#endif }; // sub0_sock is our per-socket protocol private structure. struct sub0_sock { nni_pollable *recvable; - sub0_ctx * ctx; // default context + sub0_ctx ctx; // default context nni_list ctxs; // all contexts size_t recvbuflen; bool prefnew; @@ -131,7 +123,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) (void) nni_lmq_getq(&ctx->lmq, &msg); - if (nni_lmq_empty(&ctx->lmq) && (ctx == sock->ctx)) { + if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) { nni_pollable_clear(sock->recvable); } nni_aio_set_msg(aio, msg); @@ -184,24 +176,19 @@ sub0_ctx_fini(void *arg) } nni_lmq_fini(&ctx->lmq); - NNI_FREE_STRUCT(ctx); } static int -sub0_ctx_init(void **ctxp, void *sarg) +sub0_ctx_init(void *carg, void *sarg) { sub0_sock *sock = sarg; - sub0_ctx * ctx; + sub0_ctx * ctx = carg; size_t len; bool prefnew; int rv; - if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_lock(&sock->lk); - len = sock->recvbuflen; + len = sock->recvbuflen; prefnew = sock->prefnew; if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { @@ -213,7 +200,6 @@ sub0_ctx_init(void **ctxp, void *sarg) NNI_LIST_INIT(&ctx->topics, sub0_topic, node); ctx->sock = sock; - *ctxp = ctx; nni_list_append(&sock->ctxs, ctx); nni_mtx_unlock(&sock->lk); @@ -226,39 +212,32 @@ sub0_sock_fini(void *arg) { sub0_sock *sock = arg; - if (sock->ctx != NULL) { - sub0_ctx_fini(sock->ctx); - } + sub0_ctx_fini(&sock->ctx); if (sock->recvable != NULL) { nni_pollable_free(sock->recvable); } nni_mtx_fini(&sock->lk); - NNI_FREE_STRUCT(sock); } static int -sub0_sock_init(void **sp, nni_sock *nsock) +sub0_sock_init(void *arg, nni_sock *nsock) { - sub0_sock *sock; + sub0_sock *sock = arg; int rv; NNI_ARG_UNUSED(nsock); - if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { - return (NNG_ENOMEM); - } NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node); nni_mtx_init(&sock->lk); sock->recvbuflen = SUB0_DEFAULT_QLEN; - sock->prefnew = SUB0_DEFAULT_PREFNEW; + sock->prefnew = SUB0_DEFAULT_PREFNEW; - if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) || + if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) || ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) { sub0_sock_fini(sock); return (rv); } - *sp = sock; return (0); } @@ -272,7 +251,7 @@ static void sub0_sock_close(void *arg) { sub0_sock *sock = arg; - sub0_ctx_close(sock->ctx); + sub0_ctx_close(&sock->ctx); } static void @@ -289,18 +268,14 @@ sub0_pipe_fini(void *arg) sub0_pipe *p = arg; nni_aio_fini(p->aio_recv); - NNI_FREE_STRUCT(p); } static int -sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) +sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - sub0_pipe *p; + sub0_pipe *p = arg; int rv; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) { sub0_pipe_fini(p); return (rv); @@ -308,7 +283,6 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) p->pipe = pipe; p->sub = s; - *pp = p; return (0); } @@ -404,12 +378,12 @@ sub0_recv_cb(void *arg) continue; // TODO: Bump a stat! } - // If we got to this point, we are capable of receiving this message - // and it is intended for us. + // If we got to this point, we are capable of receiving this + // message and it is intended for us. submatch = true; if (!nni_list_empty(&ctx->raios)) { - nni_aio *aio = nni_list_first(&ctx->raios); + aio = nni_list_first(&ctx->raios); nni_list_remove(&ctx->raios, aio); nni_aio_set_msg(aio, dup); @@ -417,7 +391,7 @@ sub0_recv_cb(void *arg) nni_list_append(&finish, aio); } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. - nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); @@ -478,7 +452,7 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) // If we change the socket, then this will change the queue for // any new contexts. (Previously constructed contexts are unaffected.) - if (sock->ctx == ctx) { + if (&sock->ctx == ctx) { sock->recvbuflen = (size_t) val; } nni_mtx_unlock(&sock->lk); @@ -575,9 +549,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; - bool val; + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; nni_mtx_lock(&sock->lk); val = ctx->prefnew; @@ -589,10 +563,10 @@ sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; - bool val; - int rv; + sub0_ctx * ctx = arg; + sub0_sock *sock = ctx->sock; + bool val; + int rv; if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { return (rv); @@ -600,7 +574,7 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) nni_mtx_lock(&sock->lk); ctx->prefnew = val; - if (sock->ctx == ctx) { + if (&sock->ctx == ctx) { sock->prefnew = val; } nni_mtx_unlock(&sock->lk); @@ -646,7 +620,7 @@ sub0_sock_recv(void *arg, nni_aio *aio) { sub0_sock *sock = arg; - sub0_ctx_recv(sock->ctx, aio); + sub0_ctx_recv(&sock->ctx, aio); } static int @@ -666,47 +640,48 @@ static int sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_recvbuf(sock->ctx, buf, szp, t)); + return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t)); } static int sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_recvbuf(sock->ctx, buf, sz, t)); + return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t)); } static int sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_subscribe(sock->ctx, buf, sz, t)); + return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t)); } static int sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t)); + return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t)); } static int sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t)); + return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t)); } static int sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t)); + return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t)); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_size = sizeof(sub0_pipe), .pipe_init = sub0_pipe_init, .pipe_fini = sub0_pipe_fini, .pipe_start = sub0_pipe_start, @@ -715,6 +690,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = { }; static nni_proto_ctx_ops sub0_ctx_ops = { + .ctx_size = sizeof(sub0_ctx), .ctx_init = sub0_ctx_init, .ctx_fini = sub0_ctx_fini, .ctx_send = sub0_ctx_send, @@ -752,6 +728,7 @@ static nni_option sub0_sock_options[] = { }; static nni_proto_sock_ops sub0_sock_ops = { + .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, .sock_fini = sub0_sock_fini, .sock_open = sub0_sock_open, diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c index b334bf87..be300df4 100644 --- a/src/protocol/pubsub0/xsub.c +++ b/src/protocol/pubsub0/xsub.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -9,7 +9,6 @@ // #include <stdlib.h> -#include <string.h> #include "core/nng_impl.h" #include "nng/protocol/pubsub0/sub.h" @@ -46,17 +45,11 @@ struct xsub0_pipe { }; static int -xsub0_sock_init(void **sp, nni_sock *sock) +xsub0_sock_init(void *arg, nni_sock *sock) { - xsub0_sock *s; - - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&s->lk); + xsub0_sock *s = arg; s->urq = nni_sock_recvq(sock); - *sp = s; return (0); } @@ -65,7 +58,6 @@ xsub0_sock_fini(void *arg) { xsub0_sock *s = arg; nni_mtx_fini(&s->lk); - NNI_FREE_STRUCT(s); } static void @@ -94,18 +86,14 @@ xsub0_pipe_fini(void *arg) xsub0_pipe *p = arg; nni_aio_fini(p->aio_recv); - NNI_FREE_STRUCT(p); } static int -xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s) +xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - xsub0_pipe *p; + xsub0_pipe *p = arg; int rv; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) { xsub0_pipe_fini(p); return (rv); @@ -113,7 +101,6 @@ xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s) p->pipe = pipe; p->sub = s; - *pp = p; return (0); } @@ -190,6 +177,7 @@ xsub0_sock_recv(void *arg, nni_aio *aio) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops xsub0_pipe_ops = { + .pipe_size = sizeof(xsub0_pipe), .pipe_init = xsub0_pipe_init, .pipe_fini = xsub0_pipe_fini, .pipe_start = xsub0_pipe_start, @@ -205,6 +193,7 @@ static nni_option xsub0_sock_options[] = { }; static nni_proto_sock_ops xsub0_sock_ops = { + .sock_size = sizeof(xsub0_sock), .sock_init = xsub0_sock_init, .sock_fini = xsub0_sock_fini, .sock_open = xsub0_sock_open, |
