aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h6
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/protocol.h44
-rw-r--r--src/core/socket.c45
-rw-r--r--src/core/sockimpl.h3
-rw-r--r--src/protocol/bus0/bus.c32
-rw-r--r--src/protocol/pair0/pair.c22
-rw-r--r--src/protocol/pair1/pair.c34
-rw-r--r--src/protocol/pipeline0/pull.c27
-rw-r--r--src/protocol/pipeline0/push.c29
-rw-r--r--src/protocol/pubsub0/pub.c24
-rw-r--r--src/protocol/pubsub0/sub.c99
-rw-r--r--src/protocol/pubsub0/xsub.c25
-rw-r--r--src/protocol/reqrep0/rep.c61
-rw-r--r--src/protocol/reqrep0/req.c94
-rw-r--r--src/protocol/reqrep0/xrep.c30
-rw-r--r--src/protocol/reqrep0/xreq.c28
-rw-r--r--src/protocol/survey0/respond.c76
-rw-r--r--src/protocol/survey0/survey.c61
-rw-r--r--src/protocol/survey0/xrespond.c27
-rw-r--r--src/protocol/survey0/xsurvey.c25
21 files changed, 308 insertions, 498 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 9b7abafa..0e0deac2 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -134,6 +134,12 @@ typedef struct {
// This increments a pointer a fixed number of byte cells.
#define NNI_INCPTR(ptr, n) ((ptr) = (void *) ((char *) (ptr) + (n)))
+// Alignment -- this is used when allocating adjacent objects to ensure
+// that each object begins on a natural alignment boundary.
+#define NNI_ALIGN_SIZE sizeof(void *)
+#define NNI_ALIGN_MASK (NNI_ALIGN_SIZE - 1)
+#define NNI_ALIGN_UP(sz) (((sz) + NNI_ALIGN_MASK) & ~NNI_ALIGN_MASK)
+
// A few assorted other items.
#define NNI_FLAG_IPV4ONLY 1
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f7269eb4..4076b62c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -95,7 +95,7 @@ pipe_destroy(nni_pipe *p)
}
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
+ nni_free(p, p->p_size);
}
int
@@ -188,17 +188,21 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
nni_pipe_stats * st;
+ size_t sz;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof (*p)) + pops->pipe_size;
+
+ if ((p = nni_zalloc(sz)) == NULL) {
// In this case we just toss the pipe...
tran->tran_pipe->p_fini(tdata);
return (NNG_ENOMEM);
}
+ p->p_size = sz;
+ p->p_proto_data = p + 1;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
- p->p_proto_data = NULL;
p->p_sock = sock;
p->p_closed = false;
p->p_cbs = false;
@@ -242,7 +246,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_stat_add(&st->s_root, &st->s_txbytes);
if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
- ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
+ ((rv = pops->pipe_init(p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
nni_pipe_rele(p);
return (rv);
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 005e34fd..77ccdb08 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -25,9 +25,13 @@
// nni_proto_pipe contains protocol-specific per-pipe operations.
struct nni_proto_pipe_ops {
- // pipe_init creates the protocol-specific per pipe data structure.
+ // pipe_size is the size of a protocol pipe object. The common
+ // code allocates this memory for the protocol private state.
+ size_t pipe_size;
+
+ // pipe_init2 initializes the protocol-specific pipe data structure.
// The last argument is the per-socket protocol private data.
- int (*pipe_init)(void **, nni_pipe *, void *);
+ int (*pipe_init)(void *, nni_pipe *, void *);
// pipe_fini releases any pipe data structures. This is called after
// the pipe has been removed from the protocol, and the generic
@@ -53,9 +57,13 @@ struct nni_proto_pipe_ops {
};
struct nni_proto_ctx_ops {
- // ctx_init creates a new context. The second argument is the
+ // ctx_size is the size of a protocol context object. The common
+ // code allocates this memory for the protocol private state.
+ size_t ctx_size;
+
+ // ctx_init initializes a new context. The second argument is the
// protocol specific socket structure.
- int (*ctx_init)(void **, void *);
+ int (*ctx_init)(void *, void *);
// ctx_fini destroys a context.
void (*ctx_fini)(void *);
@@ -79,10 +87,13 @@ struct nni_proto_ctx_ops {
};
struct nni_proto_sock_ops {
- // sock_init creates the protocol instance, which will be stored on
- // the socket. This is run without the sock lock held, and allocates
- // storage or other resources for the socket.
- int (*sock_init)(void **, nni_sock *);
+ // ctx_size is the size of a protocol socket object. The common
+ // code allocates this memory for the protocol private state.
+ size_t sock_size;
+
+ // sock_init2 initializes the protocol instance, which will be stored
+ // on the socket. This is run without the sock lock held.
+ int (*sock_init)(void *, nni_sock *);
// sock_fini destroys the protocol instance. This is run without the
// socket lock held, and is intended to release resources. It may
@@ -141,8 +152,9 @@ struct nni_proto {
// during the life of the project. If we add a new version, please keep
// the old version around -- it may be possible to automatically convert
// older versions in the future.
-#define NNI_PROTOCOL_V0 0x50520000 // "pr\0\0"
-#define NNI_PROTOCOL_VERSION NNI_PROTOCOL_V0
+#define NNI_PROTOCOL_V0 0x50520000u // "pr\0\0"
+#define NNI_PROTOCOL_V1 0x50520001u // "pr\0\0"
+#define NNI_PROTOCOL_VERSION NNI_PROTOCOL_V1
// These flags determine which operations make sense. We use them so that
// we can reject attempts to create notification fds for operations that make
@@ -150,11 +162,11 @@ struct nni_proto {
// that at the socket layer (NNG_PROTO_FLAG_RAW). Finally, we provide the
// NNI_PROTO_FLAG_NOMSGQ flag for protocols that do not use the upper write
// or upper read queues.
-#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive
-#define NNI_PROTO_FLAG_SND 2 // Protocol can send
-#define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv
-#define NNI_PROTO_FLAG_RAW 4 // Protocol is raw
-#define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues
+#define NNI_PROTO_FLAG_RCV 1u // Protocol can receive
+#define NNI_PROTO_FLAG_SND 2u // Protocol can send
+#define NNI_PROTO_FLAG_SNDRCV 3u // Protocol can both send & recv
+#define NNI_PROTO_FLAG_RAW 4u // Protocol is raw
+#define NNI_PROTO_FLAG_NOMSGQ 8u // Protocol bypasses the upper queues
// nni_proto_open is called by the protocol to create a socket instance
// with its ops vector. The intent is that applications will only see
diff --git a/src/core/socket.c b/src/core/socket.c
index 13d88253..bffe0f67 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,6 +26,7 @@ struct nni_ctx {
nni_sock * c_sock;
nni_proto_ctx_ops c_ops;
void * c_data;
+ size_t c_size;
bool c_closed;
unsigned c_refcnt; // protected by global lock
uint32_t c_id;
@@ -71,6 +72,7 @@ struct nni_socket {
uint32_t s_flags;
unsigned s_refcnt; // protected by global lock
void * s_data; // Protocol private
+ size_t s_size;
nni_msgq *s_uwq; // Upper write queue
nni_msgq *s_urq; // Upper read queue
@@ -496,7 +498,7 @@ sock_destroy(nni_sock *s)
nni_cv_fini(&s->s_cv);
nni_mtx_fini(&s->s_mx);
nni_mtx_fini(&s->s_pipe_cbs_mtx);
- NNI_FREE_STRUCT(s);
+ nni_free(s, s->s_size);
}
static int
@@ -505,10 +507,13 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
int rv;
nni_sock *s;
bool on;
+ size_t sz;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*s)) + proto->proto_sock_ops->sock_size;
+ if ((s = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
+ s->s_data = s + 1;
s->s_sndtimeo = -1;
s->s_rcvtimeo = -1;
s->s_reconn = NNI_SECOND;
@@ -545,7 +550,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
- ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) ||
+ ((rv = s->s_sock_ops.sock_init(s->s_data, s)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
@@ -989,7 +994,7 @@ nni_sock_setopt(
return (rv);
}
- // Prepare a copy of the sockoption.
+ // Prepare a copy of the socket option.
if ((optv = NNI_ALLOC_STRUCT(optv)) == NULL) {
return (NNG_ENOMEM);
}
@@ -1187,7 +1192,7 @@ nni_ctx_destroy(nni_ctx *ctx)
}
// Let the socket go, our hold on it is done.
- NNI_FREE_STRUCT(ctx);
+ nni_free(ctx, ctx->c_size);
}
void
@@ -1221,40 +1226,44 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
{
nni_ctx *ctx;
int rv;
+ size_t sz;
if (sock->s_ctx_ops.ctx_init == NULL) {
return (NNG_ENOTSUP);
}
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+
+ sz = NNI_ALIGN_UP(sizeof(*ctx)) + sock->s_ctx_ops.ctx_size;
+ if ((ctx = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
+ ctx->c_size = sz;
+ ctx->c_data = ctx + 1;
+ ctx->c_closed = false;
+ ctx->c_refcnt = 1; // Caller implicitly gets a reference.
+ ctx->c_sock = sock;
+ ctx->c_ops = sock->s_ctx_ops;
+ ctx->c_rcvtimeo = sock->s_rcvtimeo;
+ ctx->c_sndtimeo = sock->s_sndtimeo;
nni_mtx_lock(&sock_lk);
if (sock->s_closed) {
nni_mtx_unlock(&sock_lk);
- NNI_FREE_STRUCT(ctx);
+ nni_free(ctx, ctx->c_size);
return (NNG_ECLOSED);
}
if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) {
nni_mtx_unlock(&sock_lk);
- NNI_FREE_STRUCT(ctx);
+ nni_free(ctx, ctx->c_size);
return (rv);
}
- if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
+ if ((rv = sock->s_ctx_ops.ctx_init(ctx->c_data, sock->s_data)) != 0) {
nni_idhash_remove(ctx_hash, ctx->c_id);
nni_mtx_unlock(&sock_lk);
- NNI_FREE_STRUCT(ctx);
+ nni_free(ctx, ctx->c_size);
return (rv);
}
- ctx->c_closed = false;
- ctx->c_refcnt = 1; // Caller implicitly gets a reference.
- ctx->c_sock = sock;
- ctx->c_ops = sock->s_ctx_ops;
- ctx->c_rcvtimeo = sock->s_rcvtimeo;
- ctx->c_sndtimeo = sock->s_sndtimeo;
-
nni_list_append(&sock->s_ctxs, ctx);
nni_mtx_unlock(&sock_lk);
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index ffe6c6e8..16596c63 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -113,6 +113,7 @@ struct nni_pipe {
uint32_t p_id;
nni_tran_pipe_ops p_tran_ops;
nni_proto_pipe_ops p_proto_ops;
+ size_t p_size;
void * p_tran_data;
void * p_proto_data;
nni_list_node p_sock_node;
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index aba0a04c..afb12ef6 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -10,7 +10,6 @@
#include <stdbool.h>
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/bus0/bus.h"
@@ -71,18 +70,14 @@ bus0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-bus0_sock_init(void **sp, nni_sock *nsock)
+bus0_sock_init(void *arg, nni_sock *nsock)
{
- bus0_sock *s;
+ bus0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
@@ -93,19 +88,15 @@ bus0_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->raw = false;
- *sp = s;
return (0);
}
static int
-bus0_sock_init_raw(void **sp, nni_sock *nsock)
+bus0_sock_init_raw(void *arg, nni_sock *nsock)
{
- bus0_sock *s;
+ bus0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
@@ -116,7 +107,6 @@ bus0_sock_init_raw(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->raw = true;
- *sp = s;
return (0);
}
@@ -158,18 +148,14 @@ bus0_pipe_fini(void *arg)
nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
nni_mtx_fini(&p->mtx);
- NNI_FREE_STRUCT(p);
}
static int
-bus0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- bus0_pipe *p;
+ bus0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
@@ -183,7 +169,6 @@ bus0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->psock = s;
- *pp = p;
return (0);
}
@@ -432,6 +417,7 @@ bus0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops bus0_pipe_ops = {
+ .pipe_size = sizeof(bus0_pipe),
.pipe_init = bus0_pipe_init,
.pipe_fini = bus0_pipe_fini,
.pipe_start = bus0_pipe_start,
@@ -447,6 +433,7 @@ static nni_option bus0_sock_options[] = {
};
static nni_proto_sock_ops bus0_sock_ops = {
+ .sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
@@ -457,6 +444,7 @@ static nni_proto_sock_ops bus0_sock_ops = {
};
static nni_proto_sock_ops bus0_sock_ops_raw = {
+ .sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init_raw,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 4e708a61..860ac17f 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -53,18 +53,14 @@ struct pair0_pipe {
};
static int
-pair0_sock_init(void **sp, nni_sock *nsock)
+pair0_sock_init(void *arg, nni_sock *nsock)
{
- pair0_sock *s;
+ pair0_sock *s = arg;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->mtx);
s->ppipe = NULL;
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
- *sp = s;
return (0);
}
@@ -74,7 +70,6 @@ pair0_sock_fini(void *arg)
pair0_sock *s = arg;
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static void
@@ -97,18 +92,14 @@ pair0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_getq);
- NNI_FREE_STRUCT(p);
}
static int
-pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock)
{
- pair0_pipe *p;
+ pair0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
@@ -119,7 +110,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
p->npipe = npipe;
p->psock = psock;
- *pp = p;
return (0);
}
@@ -262,6 +252,7 @@ pair0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops pair0_pipe_ops = {
+ .pipe_size = sizeof(pair0_pipe),
.pipe_init = pair0_pipe_init,
.pipe_fini = pair0_pipe_fini,
.pipe_start = pair0_pipe_start,
@@ -277,6 +268,7 @@ static nni_option pair0_sock_options[] = {
};
static nni_proto_sock_ops pair0_sock_ops = {
+ .sock_size = sizeof(pair0_sock),
.sock_init = pair0_sock_init,
.sock_fini = pair0_sock_fini,
.sock_open = pair0_sock_open,
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 051bc8f3..df24d77b 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/pair1/pair.h"
@@ -73,21 +72,15 @@ pair1_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
-
- NNI_FREE_STRUCT(s);
}
static int
-pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw)
+pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw)
{
- pair1_sock *s;
+ pair1_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_idhash_init(&s->pipes)) != 0) {
- NNI_FREE_STRUCT(s);
+ if (nni_idhash_init(&s->pipes) != 0) {
return (NNG_ENOMEM);
}
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
@@ -122,21 +115,20 @@ pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw)
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
s->ttl = 8;
- *sp = s;
return (0);
}
static int
-pair1_sock_init(void **sp, nni_sock *nsock)
+pair1_sock_init(void *arg, nni_sock *nsock)
{
- return (pair1_sock_init_impl(sp, nsock, false));
+ return (pair1_sock_init_impl(arg, nsock, false));
}
static int
-pair1_sock_init_raw(void **sp, nni_sock *nsock)
+pair1_sock_init_raw(void *arg, nni_sock *nsock)
{
- return (pair1_sock_init_impl(sp, nsock, true));
+ return (pair1_sock_init_impl(arg, nsock, true));
}
static void
@@ -160,18 +152,14 @@ pair1_pipe_fini(void *arg)
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_getq);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock)
{
- pair1_pipe *p;
+ pair1_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
@@ -183,7 +171,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
p->npipe = npipe;
p->psock = psock;
- *pp = p;
return (rv);
}
@@ -512,6 +499,7 @@ pair1_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops pair1_pipe_ops = {
+ .pipe_size = sizeof(pair1_pipe),
.pipe_init = pair1_pipe_init,
.pipe_fini = pair1_pipe_fini,
.pipe_start = pair1_pipe_start,
@@ -537,6 +525,7 @@ static nni_option pair1_sock_options[] = {
};
static nni_proto_sock_ops pair1_sock_ops = {
+ .sock_size = sizeof(pair1_sock),
.sock_init = pair1_sock_init,
.sock_fini = pair1_sock_fini,
.sock_open = pair1_sock_open,
@@ -562,6 +551,7 @@ nng_pair1_open(nng_socket *sidp)
}
static nni_proto_sock_ops pair1_sock_ops_raw = {
+ .sock_size = sizeof(pair1_sock),
.sock_init = pair1_sock_init_raw,
.sock_fini = pair1_sock_fini,
.sock_open = pair1_sock_open,
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index cc98c895..64b47cef 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/pipeline0/pull.h"
@@ -46,25 +45,18 @@ struct pull0_pipe {
};
static int
-pull0_sock_init(void **sp, nni_sock *sock)
+pull0_sock_init(void *arg, nni_sock *sock)
{
- pull0_sock *s;
+ pull0_sock *s = arg;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
s->urq = nni_sock_recvq(sock);
-
- *sp = s;
return (0);
}
static void
pull0_sock_fini(void *arg)
{
- pull0_sock *s = arg;
-
- NNI_FREE_STRUCT(s);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -83,18 +75,14 @@ pull0_pipe_fini(void *arg)
nni_aio_fini(p->putq_aio);
nni_aio_fini(p->recv_aio);
- NNI_FREE_STRUCT(p);
}
static int
-pull0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pull0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pull0_pipe *p;
+ pull0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
pull0_pipe_fini(p);
@@ -103,7 +91,6 @@ pull0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->pull = s;
- *pp = p;
return (0);
}
@@ -209,6 +196,7 @@ pull0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops pull0_pipe_ops = {
+ .pipe_size = sizeof(pull0_pipe),
.pipe_init = pull0_pipe_init,
.pipe_fini = pull0_pipe_fini,
.pipe_start = pull0_pipe_start,
@@ -224,6 +212,7 @@ static nni_option pull0_sock_options[] = {
};
static nni_proto_sock_ops pull0_sock_ops = {
+ .sock_size = sizeof(pull0_sock),
.sock_init = pull0_sock_init,
.sock_fini = pull0_sock_fini,
.sock_open = pull0_sock_open,
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 1c0657a8..5a932ece 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/pipeline0/push.h"
@@ -50,24 +49,17 @@ struct push0_pipe {
};
static int
-push0_sock_init(void **sp, nni_sock *sock)
+push0_sock_init(void *arg, nni_sock *sock)
{
- push0_sock *s;
-
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
- s->uwq = nni_sock_sendq(sock);
- *sp = s;
+ push0_sock *s = arg;
+ s->uwq = nni_sock_sendq(sock);
return (0);
}
static void
push0_sock_fini(void *arg)
{
- push0_sock *s = arg;
-
- NNI_FREE_STRUCT(s);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -100,18 +92,14 @@ push0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_getq);
- NNI_FREE_STRUCT(p);
}
static int
-push0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+push0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- push0_pipe *p;
+ push0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
@@ -121,7 +109,6 @@ push0_pipe_init(void **pp, nni_pipe *pipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
p->pipe = pipe;
p->push = s;
- *pp = p;
return (0);
}
@@ -221,6 +208,7 @@ push0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops push0_pipe_ops = {
+ .pipe_size = sizeof(push0_pipe),
.pipe_init = push0_pipe_init,
.pipe_fini = push0_pipe_fini,
.pipe_start = push0_pipe_start,
@@ -236,6 +224,7 @@ static nni_option push0_sock_options[] = {
};
static nni_proto_sock_ops push0_sock_ops = {
+ .sock_size = sizeof(push0_sock),
.sock_init = push0_sock_init,
.sock_fini = push0_sock_fini,
.sock_open = push0_sock_open,
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c54274a6..22195c52 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -63,27 +63,21 @@ pub0_sock_fini(void *arg)
nni_pollable_free(s->sendable);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-pub0_sock_init(void **sp, nni_sock *nsock)
+pub0_sock_init(void *arg, nni_sock *nsock)
{
- pub0_sock *sock;
+ pub0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
- NNI_FREE_STRUCT(sock);
return (rv);
}
nni_mtx_init(&sock->mtx);
NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
sock->sendbuf = 16; // fairly arbitrary
- *sp = sock;
return (0);
}
@@ -120,21 +114,16 @@ pub0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_lmq_fini(&p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_lock(&sock->mtx);
len = sock->sendbuf;
nni_mtx_unlock(&sock->mtx);
@@ -151,7 +140,6 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->busy = false;
p->pipe = pipe;
p->pub = s;
- *pp = p;
return (0);
}
@@ -352,6 +340,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
+ .pipe_size = sizeof (pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -376,6 +365,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
+ .sock_size = sizeof (pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index aca28f0c..0a367216 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -9,9 +9,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
-#include <stdbool.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -58,20 +58,12 @@ struct sub0_ctx {
bool closed;
nni_lmq lmq;
bool prefnew;
-
-#if 0
- nni_msg **recvq;
- size_t recvqcap;
- size_t recvqlen;
- size_t recvqget;
- size_t recvqput;
-#endif
};
// sub0_sock is our per-socket protocol private structure.
struct sub0_sock {
nni_pollable *recvable;
- sub0_ctx * ctx; // default context
+ sub0_ctx ctx; // default context
nni_list ctxs; // all contexts
size_t recvbuflen;
bool prefnew;
@@ -131,7 +123,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
(void) nni_lmq_getq(&ctx->lmq, &msg);
- if (nni_lmq_empty(&ctx->lmq) && (ctx == sock->ctx)) {
+ if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) {
nni_pollable_clear(sock->recvable);
}
nni_aio_set_msg(aio, msg);
@@ -184,24 +176,19 @@ sub0_ctx_fini(void *arg)
}
nni_lmq_fini(&ctx->lmq);
- NNI_FREE_STRUCT(ctx);
}
static int
-sub0_ctx_init(void **ctxp, void *sarg)
+sub0_ctx_init(void *carg, void *sarg)
{
sub0_sock *sock = sarg;
- sub0_ctx * ctx;
+ sub0_ctx * ctx = carg;
size_t len;
bool prefnew;
int rv;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_lock(&sock->lk);
- len = sock->recvbuflen;
+ len = sock->recvbuflen;
prefnew = sock->prefnew;
if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
@@ -213,7 +200,6 @@ sub0_ctx_init(void **ctxp, void *sarg)
NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
ctx->sock = sock;
- *ctxp = ctx;
nni_list_append(&sock->ctxs, ctx);
nni_mtx_unlock(&sock->lk);
@@ -226,39 +212,32 @@ sub0_sock_fini(void *arg)
{
sub0_sock *sock = arg;
- if (sock->ctx != NULL) {
- sub0_ctx_fini(sock->ctx);
- }
+ sub0_ctx_fini(&sock->ctx);
if (sock->recvable != NULL) {
nni_pollable_free(sock->recvable);
}
nni_mtx_fini(&sock->lk);
- NNI_FREE_STRUCT(sock);
}
static int
-sub0_sock_init(void **sp, nni_sock *nsock)
+sub0_sock_init(void *arg, nni_sock *nsock)
{
- sub0_sock *sock;
+ sub0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
nni_mtx_init(&sock->lk);
sock->recvbuflen = SUB0_DEFAULT_QLEN;
- sock->prefnew = SUB0_DEFAULT_PREFNEW;
+ sock->prefnew = SUB0_DEFAULT_PREFNEW;
- if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
+ if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) ||
((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
sub0_sock_fini(sock);
return (rv);
}
- *sp = sock;
return (0);
}
@@ -272,7 +251,7 @@ static void
sub0_sock_close(void *arg)
{
sub0_sock *sock = arg;
- sub0_ctx_close(sock->ctx);
+ sub0_ctx_close(&sock->ctx);
}
static void
@@ -289,18 +268,14 @@ sub0_pipe_fini(void *arg)
sub0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- sub0_pipe *p;
+ sub0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
@@ -308,7 +283,6 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->sub = s;
- *pp = p;
return (0);
}
@@ -404,12 +378,12 @@ sub0_recv_cb(void *arg)
continue; // TODO: Bump a stat!
}
- // If we got to this point, we are capable of receiving this message
- // and it is intended for us.
+ // If we got to this point, we are capable of receiving this
+ // message and it is intended for us.
submatch = true;
if (!nni_list_empty(&ctx->raios)) {
- nni_aio *aio = nni_list_first(&ctx->raios);
+ aio = nni_list_first(&ctx->raios);
nni_list_remove(&ctx->raios, aio);
nni_aio_set_msg(aio, dup);
@@ -417,7 +391,7 @@ sub0_recv_cb(void *arg)
nni_list_append(&finish, aio);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
@@ -478,7 +452,7 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
// If we change the socket, then this will change the queue for
// any new contexts. (Previously constructed contexts are unaffected.)
- if (sock->ctx == ctx) {
+ if (&sock->ctx == ctx) {
sock->recvbuflen = (size_t) val;
}
nni_mtx_unlock(&sock->lk);
@@ -575,9 +549,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- bool val;
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ bool val;
nni_mtx_lock(&sock->lk);
val = ctx->prefnew;
@@ -589,10 +563,10 @@ sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_ctx * ctx = arg;
- sub0_sock * sock = ctx->sock;
- bool val;
- int rv;
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ bool val;
+ int rv;
if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) {
return (rv);
@@ -600,7 +574,7 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
nni_mtx_lock(&sock->lk);
ctx->prefnew = val;
- if (sock->ctx == ctx) {
+ if (&sock->ctx == ctx) {
sock->prefnew = val;
}
nni_mtx_unlock(&sock->lk);
@@ -646,7 +620,7 @@ sub0_sock_recv(void *arg, nni_aio *aio)
{
sub0_sock *sock = arg;
- sub0_ctx_recv(sock->ctx, aio);
+ sub0_ctx_recv(&sock->ctx, aio);
}
static int
@@ -666,47 +640,48 @@ static int
sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_recvbuf(sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t));
}
static int
sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_recvbuf(sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(sock->ctx, buf, sz, t));
+ return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
+ return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t));
}
static int
sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t));
}
static int
sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t));
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops sub0_pipe_ops = {
+ .pipe_size = sizeof(sub0_pipe),
.pipe_init = sub0_pipe_init,
.pipe_fini = sub0_pipe_fini,
.pipe_start = sub0_pipe_start,
@@ -715,6 +690,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
};
static nni_proto_ctx_ops sub0_ctx_ops = {
+ .ctx_size = sizeof(sub0_ctx),
.ctx_init = sub0_ctx_init,
.ctx_fini = sub0_ctx_fini,
.ctx_send = sub0_ctx_send,
@@ -752,6 +728,7 @@ static nni_option sub0_sock_options[] = {
};
static nni_proto_sock_ops sub0_sock_ops = {
+ .sock_size = sizeof(sub0_sock),
.sock_init = sub0_sock_init,
.sock_fini = sub0_sock_fini,
.sock_open = sub0_sock_open,
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
index b334bf87..be300df4 100644
--- a/src/protocol/pubsub0/xsub.c
+++ b/src/protocol/pubsub0/xsub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -46,17 +45,11 @@ struct xsub0_pipe {
};
static int
-xsub0_sock_init(void **sp, nni_sock *sock)
+xsub0_sock_init(void *arg, nni_sock *sock)
{
- xsub0_sock *s;
-
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&s->lk);
+ xsub0_sock *s = arg;
s->urq = nni_sock_recvq(sock);
- *sp = s;
return (0);
}
@@ -65,7 +58,6 @@ xsub0_sock_fini(void *arg)
{
xsub0_sock *s = arg;
nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
}
static void
@@ -94,18 +86,14 @@ xsub0_pipe_fini(void *arg)
xsub0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- xsub0_pipe *p;
+ xsub0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
xsub0_pipe_fini(p);
return (rv);
@@ -113,7 +101,6 @@ xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->sub = s;
- *pp = p;
return (0);
}
@@ -190,6 +177,7 @@ xsub0_sock_recv(void *arg, nni_aio *aio)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops xsub0_pipe_ops = {
+ .pipe_size = sizeof(xsub0_pipe),
.pipe_init = xsub0_pipe_init,
.pipe_fini = xsub0_pipe_fini,
.pipe_start = xsub0_pipe_start,
@@ -205,6 +193,7 @@ static nni_option xsub0_sock_options[] = {
};
static nni_proto_sock_ops xsub0_sock_ops = {
+ .sock_size = sizeof(xsub0_sock),
.sock_init = xsub0_sock_init,
.sock_fini = xsub0_sock_fini,
.sock_open = xsub0_sock_open,
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 142edb9b..328babbc 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -53,7 +53,7 @@ struct rep0_sock {
nni_idhash * pipes;
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
- rep0_ctx * ctx;
+ rep0_ctx ctx;
nni_pollable *recvable;
nni_pollable *sendable;
};
@@ -99,25 +99,19 @@ rep0_ctx_fini(void *arg)
rep0_ctx *ctx = arg;
rep0_ctx_close(ctx);
- NNI_FREE_STRUCT(ctx);
}
static int
-rep0_ctx_init(void **ctxp, void *sarg)
+rep0_ctx_init(void *carg, void *sarg)
{
- rep0_sock *s = sarg;
- rep0_ctx * ctx;
-
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
+ rep0_sock *s = sarg;
+ rep0_ctx * ctx = carg;
NNI_LIST_NODE_INIT(&ctx->sqnode);
NNI_LIST_NODE_INIT(&ctx->rqnode);
ctx->btrace_len = 0;
ctx->sock = s;
ctx->pipe_id = 0;
- *ctxp = ctx;
return (0);
}
@@ -168,7 +162,7 @@ rep0_ctx_send(void *arg, nni_aio *aio)
ctx->btrace_len = 0;
ctx->pipe_id = 0;
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
// No matter how this goes, we will no longer be able
// to send on the socket (root context). That's because
// we will have finished (successfully or otherwise) the
@@ -225,26 +219,20 @@ rep0_sock_fini(void *arg)
rep0_sock *s = arg;
nni_idhash_fini(s->pipes);
- if (s->ctx != NULL) {
- rep0_ctx_fini(s->ctx);
- }
+ rep0_ctx_fini(&s->ctx);
nni_pollable_free(s->sendable);
nni_pollable_free(s->recvable);
nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
}
static int
-rep0_sock_init(void **sp, nni_sock *sock)
+rep0_sock_init(void *arg, nni_sock *sock)
{
- rep0_sock *s;
+ rep0_sock *s = arg;
int rv;
NNI_ARG_UNUSED(sock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->lk);
if ((rv = nni_idhash_init(&s->pipes)) != 0) {
rep0_sock_fini(s);
@@ -256,10 +244,7 @@ rep0_sock_init(void **sp, nni_sock *sock)
s->ttl = 8;
- if ((rv = rep0_ctx_init((void **) &s->ctx, s)) != 0) {
- rep0_sock_fini(s);
- return (rv);
- }
+ (void) rep0_ctx_init(&s->ctx, s);
// We start off without being either readable or pollable.
// Readability comes when there is something on the socket.
@@ -269,8 +254,6 @@ rep0_sock_init(void **sp, nni_sock *sock)
return (rv);
}
- *sp = s;
-
return (0);
}
@@ -285,7 +268,7 @@ rep0_sock_close(void *arg)
{
rep0_sock *s = arg;
- rep0_ctx_close(s->ctx);
+ rep0_ctx_close(&s->ctx);
}
static void
@@ -310,18 +293,14 @@ rep0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+rep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- rep0_pipe *p;
+ rep0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) {
rep0_pipe_fini(p);
@@ -333,7 +312,6 @@ rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->id = nni_pipe_id(pipe);
p->pipe = pipe;
p->rep = s;
- *pp = p;
return (0);
}
@@ -386,7 +364,7 @@ rep0_pipe_close(void *arg)
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// We "can" send. (Well, not really, but we will happily
// accept a message and discard it.)
nni_pollable_raise(s->sendable);
@@ -415,7 +393,7 @@ rep0_pipe_send_cb(void *arg)
p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// Mark us ready for the other side to send!
nni_pollable_raise(s->sendable);
}
@@ -494,7 +472,7 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_pollable_clear(s->recvable);
}
nni_pipe_recv(p->pipe, p->aio_recv);
- if ((ctx == s->ctx) && !p->busy) {
+ if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(s->sendable);
}
@@ -578,7 +556,7 @@ rep0_pipe_recv_cb(void *arg)
aio = ctx->raio;
ctx->raio = NULL;
nni_aio_set_msg(p->aio_recv, NULL);
- if ((ctx == s->ctx) && !p->busy) {
+ if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(s->sendable);
}
@@ -650,7 +628,7 @@ rep0_sock_send(void *arg, nni_aio *aio)
{
rep0_sock *s = arg;
- rep0_ctx_send(s->ctx, aio);
+ rep0_ctx_send(&s->ctx, aio);
}
static void
@@ -658,12 +636,13 @@ rep0_sock_recv(void *arg, nni_aio *aio)
{
rep0_sock *s = arg;
- rep0_ctx_recv(s->ctx, aio);
+ rep0_ctx_recv(&s->ctx, aio);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops rep0_pipe_ops = {
+ .pipe_size = sizeof(rep0_pipe),
.pipe_init = rep0_pipe_init,
.pipe_fini = rep0_pipe_fini,
.pipe_start = rep0_pipe_start,
@@ -672,6 +651,7 @@ static nni_proto_pipe_ops rep0_pipe_ops = {
};
static nni_proto_ctx_ops rep0_ctx_ops = {
+ .ctx_size = sizeof(rep0_ctx),
.ctx_init = rep0_ctx_init,
.ctx_fini = rep0_ctx_fini,
.ctx_send = rep0_ctx_send,
@@ -699,6 +679,7 @@ static nni_option rep0_sock_options[] = {
};
static nni_proto_sock_ops rep0_sock_ops = {
+ .sock_size = sizeof(rep0_sock),
.sock_init = rep0_sock_init,
.sock_fini = rep0_sock_fini,
.sock_open = rep0_sock_open,
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 010ee8a6..4326f411 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -10,7 +10,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -35,7 +34,7 @@ static void req0_ctx_reset(req0_ctx *);
static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
static void req0_ctx_fini(void *);
-static int req0_ctx_init(void **, void *);
+static int req0_ctx_init(void *, void *);
// A req0_ctx is a "context" for the request. It uses most of the
// socket, but keeps track of its own outstanding replays, the request ID,
@@ -57,24 +56,19 @@ struct req0_ctx {
// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
- nni_sock * nsock;
- nni_duration retry;
- bool closed;
- int ttl;
-
- req0_ctx *ctx; // base socket ctx
-
- nni_list readypipes;
- nni_list busypipes;
- nni_list stoppipes;
- nni_list ctxs;
-
+ nni_duration retry;
+ bool closed;
+ int ttl;
+ req0_ctx ctx; // base socket ctx
+ nni_list readypipes;
+ nni_list busypipes;
+ nni_list stoppipes;
+ nni_list ctxs;
nni_list sendq; // contexts waiting to send.
nni_idhash * reqids; // contexts by request ID
nni_pollable *recvable;
nni_pollable *sendable;
-
- nni_mtx mtx;
+ nni_mtx mtx;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -93,16 +87,14 @@ static void req0_send_cb(void *);
static void req0_recv_cb(void *);
static int
-req0_sock_init(void **sp, nni_sock *sock)
+req0_sock_init(void *arg, nni_sock *sock)
{
- req0_sock *s;
+ req0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
+ NNI_ARG_UNUSED(sock);
+
if ((rv = nni_idhash_init(&s->reqids)) != 0) {
- NNI_FREE_STRUCT(s);
return (rv);
}
@@ -121,13 +113,10 @@ req0_sock_init(void **sp, nni_sock *sock)
NNI_LIST_INIT(&s->ctxs, req0_ctx, snode);
// this is "semi random" start for request IDs.
- s->nsock = sock;
s->retry = NNI_SECOND * 60;
- if ((rv = req0_ctx_init((void **) &s->ctx, s)) != 0) {
- req0_sock_fini(s);
- return (rv);
- }
+ (void) req0_ctx_init(&s->ctx, s);
+
if (((rv = nni_pollable_alloc(&s->sendable)) != 0) ||
((rv = nni_pollable_alloc(&s->recvable)) != 0)) {
req0_sock_fini(s);
@@ -135,8 +124,6 @@ req0_sock_init(void **sp, nni_sock *sock)
}
s->ttl = 8;
- *sp = s;
-
return (0);
}
@@ -174,14 +161,12 @@ req0_sock_fini(void *arg)
NNI_ASSERT(nni_list_empty(&s->stoppipes));
NNI_ASSERT(nni_list_empty(&s->readypipes));
nni_mtx_unlock(&s->mtx);
- if (s->ctx) {
- req0_ctx_fini(s->ctx);
- }
+
+ req0_ctx_fini(&s->ctx);
nni_pollable_free(s->recvable);
nni_pollable_free(s->sendable);
nni_idhash_fini(s->reqids);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static void
@@ -204,18 +189,14 @@ req0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
- NNI_FREE_STRUCT(p);
}
static int
-req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- req0_pipe *p;
+ req0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) {
req0_pipe_fini(p);
@@ -226,7 +207,6 @@ req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode);
p->pipe = pipe;
p->req = s;
- *pp = p;
return (0);
}
@@ -398,7 +378,7 @@ req0_recv_cb(void *arg)
} else {
// No AIO, so stash msg. Receive will pick it up later.
ctx->repmsg = msg;
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
nni_pollable_raise(s->recvable);
}
nni_mtx_unlock(&s->mtx);
@@ -427,14 +407,10 @@ req0_ctx_timeout(void *arg)
}
static int
-req0_ctx_init(void **cpp, void *sarg)
+req0_ctx_init(void *carg, void *sarg)
{
- req0_sock *s = sarg;
- req0_ctx * ctx;
-
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
+ req0_sock *s = sarg;
+ req0_ctx * ctx = carg;
nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);
@@ -445,7 +421,6 @@ req0_ctx_init(void **cpp, void *sarg)
nni_list_append(&s->ctxs, ctx);
nni_mtx_unlock(&s->mtx);
- *cpp = ctx;
return (0);
}
@@ -473,8 +448,6 @@ req0_ctx_fini(void *arg)
nni_timer_cancel(&ctx->timer);
nni_timer_fini(&ctx->timer);
-
- NNI_FREE_STRUCT(ctx);
}
static int
@@ -547,7 +520,7 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist)
} else {
nni_aio_finish(aio, 0, 0);
}
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
if (nni_list_empty(&s->readypipes)) {
nni_pollable_clear(s->sendable);
} else {
@@ -662,7 +635,7 @@ req0_ctx_recv(void *arg, nni_aio *aio)
// We have got a message to pass up, yay!
nni_aio_set_msg(aio, msg);
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
nni_pollable_clear(s->recvable);
}
nni_mtx_unlock(&s->mtx);
@@ -779,14 +752,14 @@ static void
req0_sock_send(void *arg, nni_aio *aio)
{
req0_sock *s = arg;
- req0_ctx_send(s->ctx, aio);
+ req0_ctx_send(&s->ctx, aio);
}
static void
req0_sock_recv(void *arg, nni_aio *aio)
{
req0_sock *s = arg;
- req0_ctx_recv(s->ctx, aio);
+ req0_ctx_recv(&s->ctx, aio);
}
static int
@@ -808,8 +781,8 @@ req0_sock_set_resendtime(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
req0_sock *s = arg;
int rv;
- rv = req0_ctx_set_resendtime(s->ctx, buf, sz, t);
- s->retry = s->ctx->retry;
+ rv = req0_ctx_set_resendtime(&s->ctx, buf, sz, t);
+ s->retry = s->ctx.retry;
return (rv);
}
@@ -817,7 +790,7 @@ static int
req0_sock_get_resendtime(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
req0_sock *s = arg;
- return (req0_ctx_get_resendtime(s->ctx, buf, szp, t));
+ return (req0_ctx_get_resendtime(&s->ctx, buf, szp, t));
}
static int
@@ -848,6 +821,7 @@ req0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static nni_proto_pipe_ops req0_pipe_ops = {
+ .pipe_size = sizeof(req0_pipe),
.pipe_init = req0_pipe_init,
.pipe_fini = req0_pipe_fini,
.pipe_start = req0_pipe_start,
@@ -867,6 +841,7 @@ static nni_option req0_ctx_options[] = {
};
static nni_proto_ctx_ops req0_ctx_ops = {
+ .ctx_size = sizeof(req0_ctx),
.ctx_init = req0_ctx_init,
.ctx_fini = req0_ctx_fini,
.ctx_recv = req0_ctx_recv,
@@ -900,6 +875,7 @@ static nni_option req0_sock_options[] = {
};
static nni_proto_sock_ops req0_sock_ops = {
+ .sock_size = sizeof(req0_sock),
.sock_init = req0_sock_init,
.sock_fini = req0_sock_fini,
.sock_open = req0_sock_open,
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 09c11cda..48f74075 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -65,18 +65,14 @@ xrep0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
}
static int
-xrep0_sock_init(void **sp, nni_sock *sock)
+xrep0_sock_init(void *arg, nni_sock *sock)
{
- xrep0_sock *s;
+ xrep0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
@@ -88,8 +84,6 @@ xrep0_sock_init(void **sp, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
- *sp = s;
-
return (0);
}
@@ -131,20 +125,15 @@ xrep0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-xrep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- xrep0_pipe *p;
+ xrep0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We want a pretty deep sendq on pipes. The rationale here is
+ // We want a pretty deep send queue on pipes. The rationale here is
// that the send rate will be mitigated by the receive rate.
// If a slow pipe (req pipe not reading its own responses!?)
// comes up, then we will start discarding its replies eventually,
@@ -152,7 +141,7 @@ xrep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
// smash us with requests, but be unable to handle replies faster
// than we can forward them. If they do that, their replies get
// dropped. (From a DDoS perspective, it might be nice in the
- // future if we had a way to exert backpressure to the send side --
+ // future if we had a way to exert back pressure to the send side --
// essentially don't let peers send requests faster than they are
// willing to receive replies. Something to think about for the
// future.)
@@ -167,7 +156,6 @@ xrep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->rep = s;
- *pp = p;
return (0);
}
@@ -331,7 +319,7 @@ xrep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory most likely, but keep going to
// avoid breaking things.
@@ -401,6 +389,7 @@ xrep0_sock_recv(void *arg, nni_aio *aio)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops xrep0_pipe_ops = {
+ .pipe_size = sizeof(xrep0_pipe),
.pipe_init = xrep0_pipe_init,
.pipe_fini = xrep0_pipe_fini,
.pipe_start = xrep0_pipe_start,
@@ -421,6 +410,7 @@ static nni_option xrep0_sock_options[] = {
};
static nni_proto_sock_ops xrep0_sock_ops = {
+ .sock_size = sizeof(xrep0_sock),
.sock_init = xrep0_sock_init,
.sock_fini = xrep0_sock_fini,
.sock_open = xrep0_sock_open,
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 119b2449..7455c986 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,8 +9,6 @@
//
#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -53,18 +51,13 @@ static void xreq0_recv_cb(void *);
static void xreq0_putq_cb(void *);
static int
-xreq0_sock_init(void **sp, nni_sock *sock)
+xreq0_sock_init(void *arg, nni_sock *sock)
{
- xreq0_sock *s;
-
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
+ xreq0_sock *s = arg;
s->ttl = 8;
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
- *sp = s;
return (0);
}
@@ -84,9 +77,7 @@ xreq0_sock_close(void *arg)
static void
xreq0_sock_fini(void *arg)
{
- xreq0_sock *s = arg;
-
- NNI_FREE_STRUCT(s);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -109,18 +100,14 @@ xreq0_pipe_fini(void *arg)
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
- NNI_FREE_STRUCT(p);
}
static int
-xreq0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- xreq0_pipe *p;
+ xreq0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
@@ -131,7 +118,6 @@ xreq0_pipe_init(void **pp, nni_pipe *pipe, void *s)
p->pipe = pipe;
p->req = s;
- *pp = p;
return (0);
}
@@ -282,6 +268,7 @@ xreq0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static nni_proto_pipe_ops xreq0_pipe_ops = {
+ .pipe_size = sizeof(xreq0_pipe),
.pipe_init = xreq0_pipe_init,
.pipe_fini = xreq0_pipe_fini,
.pipe_start = xreq0_pipe_start,
@@ -302,6 +289,7 @@ static nni_option xreq0_sock_options[] = {
};
static nni_proto_sock_ops xreq0_sock_ops = {
+ .sock_size = sizeof(xreq0_sock),
.sock_init = xreq0_sock_init,
.sock_fini = xreq0_sock_fini,
.sock_open = xreq0_sock_open,
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index bb457bdb..ccd25242 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -36,15 +36,14 @@ static void resp0_pipe_fini(void *);
struct resp0_ctx {
resp0_sock * sock;
- char * btrace;
- size_t btrace_len;
- size_t btrace_size;
uint32_t pipe_id;
resp0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
nni_aio * raio; // recv aio
nni_list_node sqnode;
nni_list_node rqnode;
+ size_t btrace_len;
+ uint32_t btrace[256];
};
// resp0_sock is our per-socket protocol private structure.
@@ -52,7 +51,7 @@ struct resp0_sock {
nni_mtx mtx;
int ttl;
nni_idhash * pipes;
- resp0_ctx * ctx;
+ resp0_ctx ctx;
nni_list recvpipes;
nni_list recvq;
nni_pollable *recvable;
@@ -102,32 +101,19 @@ resp0_ctx_fini(void *arg)
resp0_ctx *ctx = arg;
resp0_ctx_close(ctx);
- nni_free(ctx->btrace, ctx->btrace_size);
- NNI_FREE_STRUCT(ctx);
}
static int
-resp0_ctx_init(void **ctxp, void *sarg)
+resp0_ctx_init(void *carg, void *sarg)
{
- resp0_sock *s = sarg;
- resp0_ctx * ctx;
+ resp0_sock *s = sarg;
+ resp0_ctx * ctx = carg;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // this is 1kB, which covers the worst case.
- ctx->btrace_size = 256 * sizeof(uint32_t);
- if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) {
- NNI_FREE_STRUCT(ctx);
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&ctx->sqnode);
NNI_LIST_NODE_INIT(&ctx->rqnode);
ctx->btrace_len = 0;
ctx->sock = s;
ctx->pipe_id = 0;
- *ctxp = ctx;
return (0);
}
@@ -167,7 +153,7 @@ resp0_ctx_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
nni_msg_header_clear(msg);
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
// We can't send anymore, because only one send per request.
nni_pollable_clear(s->sendable);
}
@@ -228,26 +214,20 @@ resp0_sock_fini(void *arg)
resp0_sock *s = arg;
nni_idhash_fini(s->pipes);
- if (s->ctx != NULL) {
- resp0_ctx_fini(s->ctx);
- }
+ resp0_ctx_fini(&s->ctx);
nni_pollable_free(s->sendable);
nni_pollable_free(s->recvable);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-resp0_sock_init(void **sp, nni_sock *nsock)
+resp0_sock_init(void *arg, nni_sock *nsock)
{
- resp0_sock *s;
+ resp0_sock *s = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->mtx);
if ((rv = nni_idhash_init(&s->pipes)) != 0) {
resp0_sock_fini(s);
@@ -259,10 +239,7 @@ resp0_sock_init(void **sp, nni_sock *nsock)
s->ttl = 8; // Per RFC
- if ((rv = resp0_ctx_init((void **) &s->ctx, s)) != 0) {
- resp0_ctx_fini(s);
- return (rv);
- }
+ (void) resp0_ctx_init(&s->ctx, s);
// We start off without being either readable or pollable.
// Readability comes when there is something on the socket.
@@ -271,7 +248,6 @@ resp0_sock_init(void **sp, nni_sock *nsock)
resp0_sock_fini(s);
return (rv);
}
- *sp = s;
return (0);
}
@@ -286,7 +262,7 @@ resp0_sock_close(void *arg)
{
resp0_sock *s = arg;
- resp0_ctx_close(s->ctx);
+ resp0_ctx_close(&s->ctx);
}
static void
@@ -310,18 +286,14 @@ resp0_pipe_fini(void *arg)
}
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- resp0_pipe *p;
+ resp0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
resp0_pipe_fini(p);
@@ -335,7 +307,6 @@ resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->busy = false;
p->id = nni_pipe_id(npipe);
- *pp = p;
return (0);
}
@@ -383,7 +354,7 @@ resp0_pipe_close(void *arg)
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// Make sure user space knows they can send a message to us,
// which we will happily discard.
nni_pollable_raise(s->sendable);
@@ -412,7 +383,7 @@ resp0_pipe_send_cb(void *arg)
p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// Mark us ready for the other side to send!
nni_pollable_raise(s->sendable);
}
@@ -496,7 +467,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio)
memcpy(ctx->btrace, nni_msg_header(msg), len);
ctx->btrace_len = len;
ctx->pipe_id = p->id;
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
nni_pollable_raise(s->sendable);
}
nni_mtx_unlock(&s->mtx);
@@ -543,7 +514,7 @@ resp0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
goto drop;
}
@@ -577,7 +548,7 @@ resp0_pipe_recv_cb(void *arg)
nni_msg_header_clear(msg);
ctx->pipe_id = p->id;
- if ((ctx == s->ctx) && (!p->busy)) {
+ if ((ctx == &s->ctx) && (!p->busy)) {
nni_pollable_raise(s->sendable);
}
nni_mtx_unlock(&s->mtx);
@@ -637,7 +608,7 @@ resp0_sock_send(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- resp0_ctx_send(s->ctx, aio);
+ resp0_ctx_send(&s->ctx, aio);
}
static void
@@ -645,10 +616,11 @@ resp0_sock_recv(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- resp0_ctx_recv(s->ctx, aio);
+ resp0_ctx_recv(&s->ctx, aio);
}
static nni_proto_pipe_ops resp0_pipe_ops = {
+ .pipe_size = sizeof(resp0_pipe),
.pipe_init = resp0_pipe_init,
.pipe_fini = resp0_pipe_fini,
.pipe_start = resp0_pipe_start,
@@ -657,6 +629,7 @@ static nni_proto_pipe_ops resp0_pipe_ops = {
};
static nni_proto_ctx_ops resp0_ctx_ops = {
+ .ctx_size = sizeof(resp0_ctx),
.ctx_init = resp0_ctx_init,
.ctx_fini = resp0_ctx_fini,
.ctx_send = resp0_ctx_send,
@@ -686,6 +659,7 @@ static nni_option resp0_sock_options[] = {
};
static nni_proto_sock_ops resp0_sock_ops = {
+ .sock_size = sizeof(resp0_sock),
.sock_init = resp0_sock_init,
.sock_fini = resp0_sock_fini,
.sock_open = resp0_sock_open,
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 3ecc1457..be0ee55e 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
@@ -51,7 +50,7 @@ struct surv0_sock {
int ttl;
nni_list pipes;
nni_mtx mtx;
- surv0_ctx * ctx;
+ surv0_ctx ctx;
nni_idhash * surveys;
nni_pollable *sendable;
};
@@ -77,27 +76,21 @@ surv0_ctx_fini(void *arg)
nni_msgq_fini(ctx->rq);
}
nni_timer_cancel(&ctx->timer);
- NNI_FREE_STRUCT(ctx);
}
static int
-surv0_ctx_init(void **ctxp, void *sarg)
+surv0_ctx_init(void *carg, void *sarg)
{
- surv0_ctx * ctx;
+ surv0_ctx * ctx = carg;
surv0_sock *sock = sarg;
int rv;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_lock(&sock->mtx);
- if (sock->ctx != NULL) {
- ctx->survtime = sock->ctx->survtime;
- }
+ ctx->survtime = sock->ctx.survtime;
nni_mtx_unlock(&sock->mtx);
ctx->sock = sock;
// 126 is a deep enough queue, and leaves 2 extra cells for the
- // pushback bit in msgqs. This can result in up to 1kB of allocation
+ // push back bit. This can result in up to 1kB of allocation
// for the message queue.
if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) {
surv0_ctx_fini(ctx);
@@ -105,7 +98,6 @@ surv0_ctx_init(void **ctxp, void *sarg)
}
nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
- *ctxp = ctx;
return (0);
}
@@ -227,31 +219,25 @@ surv0_sock_fini(void *arg)
{
surv0_sock *sock = arg;
- if (sock->ctx != NULL) {
- surv0_ctx_fini(sock->ctx);
- }
+ surv0_ctx_fini(&sock->ctx);
nni_idhash_fini(sock->surveys);
nni_pollable_free(sock->sendable);
nni_mtx_fini(&sock->mtx);
- NNI_FREE_STRUCT(sock);
}
static int
-surv0_sock_init(void **sp, nni_sock *nsock)
+surv0_sock_init(void *arg, nni_sock *nsock)
{
- surv0_sock *sock;
+ surv0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&sock->pipes, surv0_pipe, node);
nni_mtx_init(&sock->mtx);
if (((rv = nni_idhash_init(&sock->surveys)) != 0) ||
- ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) {
+ ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) {
surv0_sock_fini(sock);
return (rv);
}
@@ -262,10 +248,9 @@ surv0_sock_init(void **sp, nni_sock *nsock)
nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu,
nni_random() | 0x80000000u);
- sock->ctx->survtime = NNI_SECOND;
+ sock->ctx.survtime = NNI_SECOND;
sock->ttl = 8;
- *sp = sock;
return (0);
}
@@ -280,7 +265,7 @@ surv0_sock_close(void *arg)
{
surv0_sock *s = arg;
- nni_msgq_close(s->ctx->rq);
+ nni_msgq_close(s->ctx.rq);
}
static void
@@ -302,18 +287,14 @@ surv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+surv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- surv0_pipe *p;
+ surv0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
// This depth could be tunable. The deeper the queue, the more
// concurrent surveys that can be delivered. Having said that, this
// is best effort, and a deep queue doesn't really do much for us.
@@ -328,7 +309,6 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->sock = s;
- *pp = p;
return (0);
}
@@ -481,14 +461,14 @@ surv0_sock_set_surveytime(
void *arg, const void *buf, size_t sz, nni_opt_type t)
{
surv0_sock *s = arg;
- return (surv0_ctx_set_surveytime(s->ctx, buf, sz, t));
+ return (surv0_ctx_set_surveytime(&s->ctx, buf, sz, t));
}
static int
surv0_sock_get_surveytime(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
surv0_sock *s = arg;
- return (surv0_ctx_get_surveytime(s->ctx, buf, szp, t));
+ return (surv0_ctx_get_surveytime(&s->ctx, buf, szp, t));
}
static int
@@ -522,7 +502,7 @@ surv0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
int rv;
int fd;
- if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) ||
+ if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &recvable)) != 0) ||
((rv = nni_pollable_getfd(recvable, &fd)) != 0)) {
return (rv);
}
@@ -533,17 +513,18 @@ static void
surv0_sock_recv(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- surv0_ctx_recv(s->ctx, aio);
+ surv0_ctx_recv(&s->ctx, aio);
}
static void
surv0_sock_send(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- surv0_ctx_send(s->ctx, aio);
+ surv0_ctx_send(&s->ctx, aio);
}
static nni_proto_pipe_ops surv0_pipe_ops = {
+ .pipe_size = sizeof(surv0_pipe),
.pipe_init = surv0_pipe_init,
.pipe_fini = surv0_pipe_fini,
.pipe_start = surv0_pipe_start,
@@ -562,6 +543,7 @@ static nni_option surv0_ctx_options[] = {
}
};
static nni_proto_ctx_ops surv0_ctx_ops = {
+ .ctx_size = sizeof(surv0_ctx),
.ctx_init = surv0_ctx_init,
.ctx_fini = surv0_ctx_fini,
.ctx_send = surv0_ctx_send,
@@ -595,6 +577,7 @@ static nni_option surv0_sock_options[] = {
};
static nni_proto_sock_ops surv0_sock_ops = {
+ .sock_size = sizeof(surv0_sock),
.sock_init = surv0_sock_init,
.sock_fini = surv0_sock_fini,
.sock_open = surv0_sock_open,
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 865a94f3..66b340ee 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/respond.h"
@@ -66,18 +65,14 @@ xresp0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-xresp0_sock_init(void **sp, nni_sock *nsock)
+xresp0_sock_init(void *arg, nni_sock *nsock)
{
- xresp0_sock *s;
+ xresp0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
@@ -89,7 +84,6 @@ xresp0_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->uwq = nni_sock_sendq(nsock);
- *sp = s;
return (0);
}
@@ -130,18 +124,14 @@ xresp0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- xresp0_pipe *p;
+ xresp0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
@@ -153,7 +143,6 @@ xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->psock = s;
- *pp = p;
return (0);
}
@@ -298,7 +287,7 @@ xresp0_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- bool end = false;
+ bool end;
uint8_t *body;
if (hops > s->ttl) {
@@ -312,7 +301,7 @@ xresp0_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
goto drop;
}
@@ -378,6 +367,7 @@ xresp0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops xresp0_pipe_ops = {
+ .pipe_size = sizeof(xresp0_pipe),
.pipe_init = xresp0_pipe_init,
.pipe_fini = xresp0_pipe_fini,
.pipe_start = xresp0_pipe_start,
@@ -398,6 +388,7 @@ static nni_option xresp0_sock_options[] = {
};
static nni_proto_sock_ops xresp0_sock_ops = {
+ .sock_size = sizeof(xresp0_sock),
.sock_init = xresp0_sock_init,
.sock_fini = xresp0_sock_fini,
.sock_open = xresp0_sock_open,
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 9cf5af1f..43c83793 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
@@ -63,18 +62,14 @@ xsurv0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-xsurv0_sock_init(void **sp, nni_sock *nsock)
+xsurv0_sock_init(void *arg, nni_sock *nsock)
{
- xsurv0_sock *s;
+ xsurv0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
xsurv0_sock_fini(s);
return (rv);
@@ -86,7 +81,6 @@ xsurv0_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->ttl = 8;
- *sp = s;
return (0);
}
@@ -127,22 +121,18 @@ xsurv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- xsurv0_pipe *p;
+ xsurv0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
// This depth could be tunable. The queue exists so that if we
// have multiple requests coming in faster than we can deliver them,
// we try to avoid dropping them. We don't really have a solution
- // for applying backpressure. It would be nice if surveys carried
+ // for applying back pressure. It would be nice if surveys carried
// an expiration with them, so that we could discard any that are
// not delivered before their expiration date.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
@@ -156,7 +146,6 @@ xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->psock = s;
- *pp = p;
return (0);
}
@@ -349,6 +338,7 @@ xsurv0_sock_send(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops xsurv0_pipe_ops = {
+ .pipe_size = sizeof(xsurv0_pipe),
.pipe_init = xsurv0_pipe_init,
.pipe_fini = xsurv0_pipe_fini,
.pipe_start = xsurv0_pipe_start,
@@ -369,6 +359,7 @@ static nni_option xsurv0_sock_options[] = {
};
static nni_proto_sock_ops xsurv0_sock_ops = {
+ .sock_size = sizeof(xsurv0_sock),
.sock_init = xsurv0_sock_init,
.sock_fini = xsurv0_sock_fini,
.sock_open = xsurv0_sock_open,