aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-03 18:03:57 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-03 18:09:08 -0800
commitbcc3814b58e9b198344bdaf6e7a916a354841275 (patch)
tree795ce060fa8b4356bb4d17457abccdaf6fed8883 /src/protocol/survey0
parentd4cb4abccaa8a3bf319d19f97345c04ebd755053 (diff)
downloadnng-bcc3814b58e9b198344bdaf6e7a916a354841275.tar.gz
nng-bcc3814b58e9b198344bdaf6e7a916a354841275.tar.bz2
nng-bcc3814b58e9b198344bdaf6e7a916a354841275.zip
fixes #1104 move allocation of protocol objects to common core
fixes #1103 respondent could inline backtrace
Diffstat (limited to 'src/protocol/survey0')
-rw-r--r--src/protocol/survey0/respond.c76
-rw-r--r--src/protocol/survey0/survey.c61
-rw-r--r--src/protocol/survey0/xrespond.c27
-rw-r--r--src/protocol/survey0/xsurvey.c25
4 files changed, 64 insertions, 125 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index bb457bdb..ccd25242 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -36,15 +36,14 @@ static void resp0_pipe_fini(void *);
struct resp0_ctx {
resp0_sock * sock;
- char * btrace;
- size_t btrace_len;
- size_t btrace_size;
uint32_t pipe_id;
resp0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
nni_aio * raio; // recv aio
nni_list_node sqnode;
nni_list_node rqnode;
+ size_t btrace_len;
+ uint32_t btrace[256];
};
// resp0_sock is our per-socket protocol private structure.
@@ -52,7 +51,7 @@ struct resp0_sock {
nni_mtx mtx;
int ttl;
nni_idhash * pipes;
- resp0_ctx * ctx;
+ resp0_ctx ctx;
nni_list recvpipes;
nni_list recvq;
nni_pollable *recvable;
@@ -102,32 +101,19 @@ resp0_ctx_fini(void *arg)
resp0_ctx *ctx = arg;
resp0_ctx_close(ctx);
- nni_free(ctx->btrace, ctx->btrace_size);
- NNI_FREE_STRUCT(ctx);
}
static int
-resp0_ctx_init(void **ctxp, void *sarg)
+resp0_ctx_init(void *carg, void *sarg)
{
- resp0_sock *s = sarg;
- resp0_ctx * ctx;
+ resp0_sock *s = sarg;
+ resp0_ctx * ctx = carg;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // this is 1kB, which covers the worst case.
- ctx->btrace_size = 256 * sizeof(uint32_t);
- if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) {
- NNI_FREE_STRUCT(ctx);
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&ctx->sqnode);
NNI_LIST_NODE_INIT(&ctx->rqnode);
ctx->btrace_len = 0;
ctx->sock = s;
ctx->pipe_id = 0;
- *ctxp = ctx;
return (0);
}
@@ -167,7 +153,7 @@ resp0_ctx_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
nni_msg_header_clear(msg);
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
// We can't send anymore, because only one send per request.
nni_pollable_clear(s->sendable);
}
@@ -228,26 +214,20 @@ resp0_sock_fini(void *arg)
resp0_sock *s = arg;
nni_idhash_fini(s->pipes);
- if (s->ctx != NULL) {
- resp0_ctx_fini(s->ctx);
- }
+ resp0_ctx_fini(&s->ctx);
nni_pollable_free(s->sendable);
nni_pollable_free(s->recvable);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-resp0_sock_init(void **sp, nni_sock *nsock)
+resp0_sock_init(void *arg, nni_sock *nsock)
{
- resp0_sock *s;
+ resp0_sock *s = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->mtx);
if ((rv = nni_idhash_init(&s->pipes)) != 0) {
resp0_sock_fini(s);
@@ -259,10 +239,7 @@ resp0_sock_init(void **sp, nni_sock *nsock)
s->ttl = 8; // Per RFC
- if ((rv = resp0_ctx_init((void **) &s->ctx, s)) != 0) {
- resp0_ctx_fini(s);
- return (rv);
- }
+ (void) resp0_ctx_init(&s->ctx, s);
// We start off without being either readable or pollable.
// Readability comes when there is something on the socket.
@@ -271,7 +248,6 @@ resp0_sock_init(void **sp, nni_sock *nsock)
resp0_sock_fini(s);
return (rv);
}
- *sp = s;
return (0);
}
@@ -286,7 +262,7 @@ resp0_sock_close(void *arg)
{
resp0_sock *s = arg;
- resp0_ctx_close(s->ctx);
+ resp0_ctx_close(&s->ctx);
}
static void
@@ -310,18 +286,14 @@ resp0_pipe_fini(void *arg)
}
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- NNI_FREE_STRUCT(p);
}
static int
-resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- resp0_pipe *p;
+ resp0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
resp0_pipe_fini(p);
@@ -335,7 +307,6 @@ resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->busy = false;
p->id = nni_pipe_id(npipe);
- *pp = p;
return (0);
}
@@ -383,7 +354,7 @@ resp0_pipe_close(void *arg)
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// Make sure user space knows they can send a message to us,
// which we will happily discard.
nni_pollable_raise(s->sendable);
@@ -412,7 +383,7 @@ resp0_pipe_send_cb(void *arg)
p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
- if (p->id == s->ctx->pipe_id) {
+ if (p->id == s->ctx.pipe_id) {
// Mark us ready for the other side to send!
nni_pollable_raise(s->sendable);
}
@@ -496,7 +467,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio)
memcpy(ctx->btrace, nni_msg_header(msg), len);
ctx->btrace_len = len;
ctx->pipe_id = p->id;
- if (ctx == s->ctx) {
+ if (ctx == &s->ctx) {
nni_pollable_raise(s->sendable);
}
nni_mtx_unlock(&s->mtx);
@@ -543,7 +514,7 @@ resp0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
goto drop;
}
@@ -577,7 +548,7 @@ resp0_pipe_recv_cb(void *arg)
nni_msg_header_clear(msg);
ctx->pipe_id = p->id;
- if ((ctx == s->ctx) && (!p->busy)) {
+ if ((ctx == &s->ctx) && (!p->busy)) {
nni_pollable_raise(s->sendable);
}
nni_mtx_unlock(&s->mtx);
@@ -637,7 +608,7 @@ resp0_sock_send(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- resp0_ctx_send(s->ctx, aio);
+ resp0_ctx_send(&s->ctx, aio);
}
static void
@@ -645,10 +616,11 @@ resp0_sock_recv(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- resp0_ctx_recv(s->ctx, aio);
+ resp0_ctx_recv(&s->ctx, aio);
}
static nni_proto_pipe_ops resp0_pipe_ops = {
+ .pipe_size = sizeof(resp0_pipe),
.pipe_init = resp0_pipe_init,
.pipe_fini = resp0_pipe_fini,
.pipe_start = resp0_pipe_start,
@@ -657,6 +629,7 @@ static nni_proto_pipe_ops resp0_pipe_ops = {
};
static nni_proto_ctx_ops resp0_ctx_ops = {
+ .ctx_size = sizeof(resp0_ctx),
.ctx_init = resp0_ctx_init,
.ctx_fini = resp0_ctx_fini,
.ctx_send = resp0_ctx_send,
@@ -686,6 +659,7 @@ static nni_option resp0_sock_options[] = {
};
static nni_proto_sock_ops resp0_sock_ops = {
+ .sock_size = sizeof(resp0_sock),
.sock_init = resp0_sock_init,
.sock_fini = resp0_sock_fini,
.sock_open = resp0_sock_open,
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 3ecc1457..be0ee55e 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
@@ -51,7 +50,7 @@ struct surv0_sock {
int ttl;
nni_list pipes;
nni_mtx mtx;
- surv0_ctx * ctx;
+ surv0_ctx ctx;
nni_idhash * surveys;
nni_pollable *sendable;
};
@@ -77,27 +76,21 @@ surv0_ctx_fini(void *arg)
nni_msgq_fini(ctx->rq);
}
nni_timer_cancel(&ctx->timer);
- NNI_FREE_STRUCT(ctx);
}
static int
-surv0_ctx_init(void **ctxp, void *sarg)
+surv0_ctx_init(void *carg, void *sarg)
{
- surv0_ctx * ctx;
+ surv0_ctx * ctx = carg;
surv0_sock *sock = sarg;
int rv;
- if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_lock(&sock->mtx);
- if (sock->ctx != NULL) {
- ctx->survtime = sock->ctx->survtime;
- }
+ ctx->survtime = sock->ctx.survtime;
nni_mtx_unlock(&sock->mtx);
ctx->sock = sock;
// 126 is a deep enough queue, and leaves 2 extra cells for the
- // pushback bit in msgqs. This can result in up to 1kB of allocation
+ // push back bit. This can result in up to 1kB of allocation
// for the message queue.
if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) {
surv0_ctx_fini(ctx);
@@ -105,7 +98,6 @@ surv0_ctx_init(void **ctxp, void *sarg)
}
nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
- *ctxp = ctx;
return (0);
}
@@ -227,31 +219,25 @@ surv0_sock_fini(void *arg)
{
surv0_sock *sock = arg;
- if (sock->ctx != NULL) {
- surv0_ctx_fini(sock->ctx);
- }
+ surv0_ctx_fini(&sock->ctx);
nni_idhash_fini(sock->surveys);
nni_pollable_free(sock->sendable);
nni_mtx_fini(&sock->mtx);
- NNI_FREE_STRUCT(sock);
}
static int
-surv0_sock_init(void **sp, nni_sock *nsock)
+surv0_sock_init(void *arg, nni_sock *nsock)
{
- surv0_sock *sock;
+ surv0_sock *sock = arg;
int rv;
NNI_ARG_UNUSED(nsock);
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_INIT(&sock->pipes, surv0_pipe, node);
nni_mtx_init(&sock->mtx);
if (((rv = nni_idhash_init(&sock->surveys)) != 0) ||
- ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) {
+ ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) {
surv0_sock_fini(sock);
return (rv);
}
@@ -262,10 +248,9 @@ surv0_sock_init(void **sp, nni_sock *nsock)
nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu,
nni_random() | 0x80000000u);
- sock->ctx->survtime = NNI_SECOND;
+ sock->ctx.survtime = NNI_SECOND;
sock->ttl = 8;
- *sp = sock;
return (0);
}
@@ -280,7 +265,7 @@ surv0_sock_close(void *arg)
{
surv0_sock *s = arg;
- nni_msgq_close(s->ctx->rq);
+ nni_msgq_close(s->ctx.rq);
}
static void
@@ -302,18 +287,14 @@ surv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+surv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- surv0_pipe *p;
+ surv0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
// This depth could be tunable. The deeper the queue, the more
// concurrent surveys that can be delivered. Having said that, this
// is best effort, and a deep queue doesn't really do much for us.
@@ -328,7 +309,6 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->sock = s;
- *pp = p;
return (0);
}
@@ -481,14 +461,14 @@ surv0_sock_set_surveytime(
void *arg, const void *buf, size_t sz, nni_opt_type t)
{
surv0_sock *s = arg;
- return (surv0_ctx_set_surveytime(s->ctx, buf, sz, t));
+ return (surv0_ctx_set_surveytime(&s->ctx, buf, sz, t));
}
static int
surv0_sock_get_surveytime(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
surv0_sock *s = arg;
- return (surv0_ctx_get_surveytime(s->ctx, buf, szp, t));
+ return (surv0_ctx_get_surveytime(&s->ctx, buf, szp, t));
}
static int
@@ -522,7 +502,7 @@ surv0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
int rv;
int fd;
- if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) ||
+ if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &recvable)) != 0) ||
((rv = nni_pollable_getfd(recvable, &fd)) != 0)) {
return (rv);
}
@@ -533,17 +513,18 @@ static void
surv0_sock_recv(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- surv0_ctx_recv(s->ctx, aio);
+ surv0_ctx_recv(&s->ctx, aio);
}
static void
surv0_sock_send(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- surv0_ctx_send(s->ctx, aio);
+ surv0_ctx_send(&s->ctx, aio);
}
static nni_proto_pipe_ops surv0_pipe_ops = {
+ .pipe_size = sizeof(surv0_pipe),
.pipe_init = surv0_pipe_init,
.pipe_fini = surv0_pipe_fini,
.pipe_start = surv0_pipe_start,
@@ -562,6 +543,7 @@ static nni_option surv0_ctx_options[] = {
}
};
static nni_proto_ctx_ops surv0_ctx_ops = {
+ .ctx_size = sizeof(surv0_ctx),
.ctx_init = surv0_ctx_init,
.ctx_fini = surv0_ctx_fini,
.ctx_send = surv0_ctx_send,
@@ -595,6 +577,7 @@ static nni_option surv0_sock_options[] = {
};
static nni_proto_sock_ops surv0_sock_ops = {
+ .sock_size = sizeof(surv0_sock),
.sock_init = surv0_sock_init,
.sock_fini = surv0_sock_fini,
.sock_open = surv0_sock_open,
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 865a94f3..66b340ee 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/respond.h"
@@ -66,18 +65,14 @@ xresp0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-xresp0_sock_init(void **sp, nni_sock *nsock)
+xresp0_sock_init(void *arg, nni_sock *nsock)
{
- xresp0_sock *s;
+ xresp0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
@@ -89,7 +84,6 @@ xresp0_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->uwq = nni_sock_sendq(nsock);
- *sp = s;
return (0);
}
@@ -130,18 +124,14 @@ xresp0_pipe_fini(void *arg)
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- xresp0_pipe *p;
+ xresp0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
@@ -153,7 +143,6 @@ xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->psock = s;
- *pp = p;
return (0);
}
@@ -298,7 +287,7 @@ xresp0_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- bool end = false;
+ bool end;
uint8_t *body;
if (hops > s->ttl) {
@@ -312,7 +301,7 @@ xresp0_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
goto drop;
}
@@ -378,6 +367,7 @@ xresp0_sock_recv(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops xresp0_pipe_ops = {
+ .pipe_size = sizeof(xresp0_pipe),
.pipe_init = xresp0_pipe_init,
.pipe_fini = xresp0_pipe_fini,
.pipe_start = xresp0_pipe_start,
@@ -398,6 +388,7 @@ static nni_option xresp0_sock_options[] = {
};
static nni_proto_sock_ops xresp0_sock_ops = {
+ .sock_size = sizeof(xresp0_sock),
.sock_init = xresp0_sock_init,
.sock_fini = xresp0_sock_fini,
.sock_open = xresp0_sock_open,
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 9cf5af1f..43c83793 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -9,7 +9,6 @@
//
#include <stdlib.h>
-#include <string.h>
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
@@ -63,18 +62,14 @@ xsurv0_sock_fini(void *arg)
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
}
static int
-xsurv0_sock_init(void **sp, nni_sock *nsock)
+xsurv0_sock_init(void *arg, nni_sock *nsock)
{
- xsurv0_sock *s;
+ xsurv0_sock *s = arg;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
- return (NNG_ENOMEM);
- }
if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
xsurv0_sock_fini(s);
return (rv);
@@ -86,7 +81,6 @@ xsurv0_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->ttl = 8;
- *sp = s;
return (0);
}
@@ -127,22 +121,18 @@ xsurv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
}
static int
-xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
+xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
{
- xsurv0_pipe *p;
+ xsurv0_pipe *p = arg;
int rv;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
// This depth could be tunable. The queue exists so that if we
// have multiple requests coming in faster than we can deliver them,
// we try to avoid dropping them. We don't really have a solution
- // for applying backpressure. It would be nice if surveys carried
+ // for applying back pressure. It would be nice if surveys carried
// an expiration with them, so that we could discard any that are
// not delivered before their expiration date.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
@@ -156,7 +146,6 @@ xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
p->npipe = npipe;
p->psock = s;
- *pp = p;
return (0);
}
@@ -349,6 +338,7 @@ xsurv0_sock_send(void *arg, nni_aio *aio)
}
static nni_proto_pipe_ops xsurv0_pipe_ops = {
+ .pipe_size = sizeof(xsurv0_pipe),
.pipe_init = xsurv0_pipe_init,
.pipe_fini = xsurv0_pipe_fini,
.pipe_start = xsurv0_pipe_start,
@@ -369,6 +359,7 @@ static nni_option xsurv0_sock_options[] = {
};
static nni_proto_sock_ops xsurv0_sock_ops = {
+ .sock_size = sizeof(xsurv0_sock),
.sock_init = xsurv0_sock_init,
.sock_fini = xsurv0_sock_fini,
.sock_open = xsurv0_sock_open,