aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-03 18:03:57 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-03 18:09:08 -0800
commitbcc3814b58e9b198344bdaf6e7a916a354841275 (patch)
tree795ce060fa8b4356bb4d17457abccdaf6fed8883 /src/protocol/pubsub0
parentd4cb4abccaa8a3bf319d19f97345c04ebd755053 (diff)
downloadnng-bcc3814b58e9b198344bdaf6e7a916a354841275.tar.gz
nng-bcc3814b58e9b198344bdaf6e7a916a354841275.tar.bz2
nng-bcc3814b58e9b198344bdaf6e7a916a354841275.zip
fixes #1104 move allocation of protocol objects to common core
fixes #1103 respondent could inline backtrace
Diffstat (limited to 'src/protocol/pubsub0')
-rw-r--r--src/protocol/pubsub0/pub.c24
-rw-r--r--src/protocol/pubsub0/sub.c99
-rw-r--r--src/protocol/pubsub0/xsub.c25
3 files changed, 52 insertions, 96 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c54274a6..22195c52 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -63,27 +63,21 @@ pub0_sock_fini(void *arg)
nni_pollable_free(s->sendable);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-pub0_sock_init(void **sp, nni_sock *nsock)
+pub0_sock_init(void *arg, nni_sock *nsock)
{
- pub0_sock *sock;
+ pub0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
- NNI_FREE_STRUCT(sock);
return (rv);
}
nni_mtx_init(&sock->mtx);
NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
sock->sendbuf = 16; // fairly arbitrary
- *sp = sock;
return (0);
}
@@ -120,21 +114,16 @@ pub0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_lmq_fini(&p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_lock(&sock->mtx);
len = sock->sendbuf;
nni_mtx_unlock(&sock->mtx);
@@ -151,7 +140,6 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->busy = false;
p->pipe = pipe;
p->pub = s;
- *pp = p;
return (0);
}
@@ -352,6 +340,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
+ .pipe_size = sizeof (pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -376,6 +365,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
+ .sock_size = sizeof (pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index aca28f0c..0a367216 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -9,9 +9,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
-#include <stdbool.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -58,20 +58,12 @@ struct sub0_ctx {
bool closed;
nni_lmq lmq;
bool prefnew;
-
-#if 0
- nni_msg **recvq;
- size_t recvqcap;
- size_t recvqlen;
- size_t recvqget;
- size_t recvqput;
-#endif
};
// sub0_sock is our per-socket protocol private structure.
struct sub0_sock {
nni_pollable *recvable;
- sub0_ctx * ctx; // default context
+ sub0_ctx ctx; // default context
nni_list ctxs; // all contexts
size_t recvbuflen;
bool prefnew;
@@ -131,7 +123,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
(void) nni_lmq_getq(&ctx->lmq, &msg);
- if (nni_lmq_empty(&ctx->lmq) && (ctx == sock->ctx)) {
+ if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) {
nni_pollable_clear(sock->recvable);
}
nni_aio_set_msg(aio, msg);
@@ -184,24 +176,19 @@ sub0_ctx_fini(void *arg)
}
nni_lmq_fini(&ctx->lmq);
- NNI_FREE_STRUCT(ctx);
}
static int
-sub0_ctx_init(void **ctxp, void *sarg)
+sub0_ctx_init(void *carg, void *sarg)
{
sub0_sock *sock = sarg;
- sub0_ctx * ctx;
+ sub0_ctx * ctx = carg;
size_t len;
bool prefnew;
int rv;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_lock(&sock->lk);
- len = sock->recvbuflen;
+ len = sock->recvbuflen;
prefnew = sock->prefnew;
if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
@@ -213,7 +200,6 @@ sub0_ctx_init(void **ctxp, void *sarg)
NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
ctx->sock = sock;
- *ctxp = ctx;
nni_list_append(&sock->ctxs, ctx);
nni_mtx_unlock(&sock->lk);
@@ -226,39 +212,32 @@ sub0_sock_fini(void *arg)
{
sub0_sock *sock = arg;
- if (sock->ctx != NULL) {
- sub0_ctx_fini(sock->ctx);
- }
+ sub0_ctx_fini(&sock->ctx);
if (sock->recvable != NULL) {
nni_pollable_free(sock->recvable);
}
nni_mtx_fini(&sock->lk);
- NNI_FREE_STRUCT(sock);
}
static int
-sub0_sock_init(void **sp, nni_sock *nsock)
+sub0_sock_init(void *arg, nni_sock *nsock)
{
- sub0_sock *sock;
+ sub0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
nni_mtx_init(&sock->lk);
sock->recvbuflen = SUB0_DEFAULT_QLEN;
- sock->prefnew = SUB0_DEFAULT_PREFNEW;
+ sock->prefnew = SUB0_DEFAULT_PREFNEW;
- if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
+ if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) ||
((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
sub0_sock_fini(sock);
return (rv);
}
- *sp = sock;
return (0);
}
@@ -272,7 +251,7 @@ static void
sub0_sock_close(void *arg)
{
sub0_sock *sock = arg;
- sub0_ctx_close(sock->ctx);
+ sub0_ctx_close(&sock->ctx);
}
static void
@@ -289,18 +268,14 @@ sub0_pipe_fini(void *arg)
sub0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- sub0_pipe *p;
+ sub0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
@@ -308,7 +283,6 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->sub = s;
- *pp = p;
return (0);
}
@@ -404,12 +378,12 @@ sub0_recv_cb(void *arg)
continue; // TODO: Bump a stat!
}
- // If we got to this point, we are capable of receiving this message
- // and it is intended for us.
+ // If we got to this point, we are capable of receiving this
+ // message and it is intended for us.
submatch = true;
if (!nni_list_empty(&ctx->raios)) {
- nni_aio *aio = nni_list_first(&ctx->raios);
+ aio = nni_list_first(&ctx->raios);
nni_list_remove(&ctx->raios, aio);
nni_aio_set_msg(aio, dup);
@@ -417,7 +391,7 @@ sub0_recv_cb(void *arg)
nni_list_append(&finish, aio);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
@@ -478,7 +452,7 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
// If we change the socket, then this will change the queue for
// any new contexts. (Previously constructed contexts are unaffected.)
- if (sock->ctx == ctx) {
+ if (&sock->ctx == ctx) {
sock->recvbuflen = (size_t) val;
}
nni_mtx_unlock(&sock->lk);
@@ -575,9 +549,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- bool val;
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ bool val;
nni_mtx_lock(&sock->lk);
val = ctx->prefnew;
@@ -589,10 +563,10 @@ sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- bool val;
- int rv;
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) {
return (rv);
@@ -600,7 +574,7 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
nni_mtx_lock(&sock->lk);
ctx->prefnew = val;
- if (sock->ctx == ctx) {
+ if (&sock->ctx == ctx) {
sock->prefnew = val;
}
nni_mtx_unlock(&sock->lk);
@@ -646,7 +620,7 @@ sub0_sock_recv(void *arg, nni_aio *aio)
{
sub0_sock *sock = arg;
- sub0_ctx_recv(sock->ctx, aio);
+ sub0_ctx_recv(&sock->ctx, aio);
}
static int
@@ -666,47 +640,48 @@ static int
sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_recvbuf(sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t));
}
static int
sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_recvbuf(sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(sock->ctx, buf, sz, t));
+ return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
+ return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t));
}
static int
sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t));
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops sub0_pipe_ops = {
+ .pipe_size = sizeof(sub0_pipe),
.pipe_init = sub0_pipe_init,
.pipe_fini = sub0_pipe_fini,
.pipe_start = sub0_pipe_start,
@@ -715,6 +690,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
};
static nni_proto_ctx_ops sub0_ctx_ops = {
+ .ctx_size = sizeof(sub0_ctx),
.ctx_init = sub0_ctx_init,
.ctx_fini = sub0_ctx_fini,
.ctx_send = sub0_ctx_send,
@@ -752,6 +728,7 @@ static nni_option sub0_sock_options[] = {
};
static nni_proto_sock_ops sub0_sock_ops = {
+ .sock_size = sizeof(sub0_sock),
.sock_init = sub0_sock_init,
.sock_fini = sub0_sock_fini,
.sock_open = sub0_sock_open,
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
index b334bf87..be300df4 100644
--- a/src/protocol/pubsub0/xsub.c
+++ b/src/protocol/pubsub0/xsub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -46,17 +45,11 @@ struct xsub0_pipe {
};
static int
-xsub0_sock_init(void **sp, nni_sock *sock)
+xsub0_sock_init(void *arg, nni_sock *sock)
{
- xsub0_sock *s;
-
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&s->lk);
+ xsub0_sock *s = arg;
s->urq = nni_sock_recvq(sock);
- *sp = s;
return (0);
}
@@ -65,7 +58,6 @@ xsub0_sock_fini(void *arg)
{
xsub0_sock *s = arg;
nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
}
static void
@@ -94,18 +86,14 @@ xsub0_pipe_fini(void *arg)
xsub0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- xsub0_pipe *p;
+ xsub0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
xsub0_pipe_fini(p);
return (rv);
@@ -113,7 +101,6 @@ xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->sub = s;
- *pp = p;
return (0);
}
@@ -190,6 +177,7 @@ xsub0_sock_recv(void *arg, nni_aio *aio)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops xsub0_pipe_ops = {
+ .pipe_size = sizeof(xsub0_pipe),
.pipe_init = xsub0_pipe_init,
.pipe_fini = xsub0_pipe_fini,
.pipe_start = xsub0_pipe_start,
@@ -205,6 +193,7 @@ static nni_option xsub0_sock_options[] = {
};
static nni_proto_sock_ops xsub0_sock_ops = {
+ .sock_size = sizeof(xsub0_sock),
.sock_init = xsub0_sock_init,
.sock_fini = xsub0_sock_fini,
.sock_open = xsub0_sock_open,