aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-02-26 00:22:11 -0800
committerGarrett D'Amore <garrett@damore.org>2020-02-26 20:48:22 -0800
commitabab1392c11395d387e3072c4f5788d284846093 (patch)
treedbb1ecb2b166338496981c7d8e3e9e16b977b995 /src
parent6a59b15fba1085432c4c18952680e14d80dd134a (diff)
downloadnng-abab1392c11395d387e3072c4f5788d284846093.tar.gz
nng-abab1392c11395d387e3072c4f5788d284846093.tar.bz2
nng-abab1392c11395d387e3072c4f5788d284846093.zip
fixes #1171 message header could be inlined in the message structure
This uses a maximum 64-byte header and should avoid allocations and cache misses, leading to a small performance boost overall.
Diffstat (limited to 'src')
-rw-r--r--src/core/message.c250
-rw-r--r--src/core/message.h34
-rw-r--r--src/nng.c331
-rw-r--r--src/protocol/bus0/bus.c12
-rw-r--r--src/protocol/pair1/pair.c4
-rw-r--r--src/protocol/pair1/pair1_poly.c4
-rw-r--r--src/protocol/reqrep0/req.c2
-rw-r--r--src/protocol/reqrep0/xrep.c2
-rw-r--r--src/protocol/survey0/survey.c4
-rw-r--r--src/protocol/survey0/xrespond.c2
10 files changed, 372 insertions, 273 deletions
diff --git a/src/core/message.c b/src/core/message.c
index f046feb4..9241ac65 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -24,7 +24,8 @@ typedef struct {
// Underlying message structure.
struct nng_msg {
- nni_chunk m_header;
+ uint32_t m_header_buf[(NNI_MAX_MAX_TTL + 1)];
+ size_t m_header_len;
nni_chunk m_body;
uint32_t m_pipe; // set on receive
nni_atomic_int m_refcnt;
@@ -74,7 +75,7 @@ nni_msg_dump(const char *banner, const nni_msg *msg)
(void) snprintf(buf, sizeof(buf), "--- %s BEGIN ---", banner);
nni_println(buf);
- nni_chunk_dump(&msg->m_header, "HEADER");
+ // TODO: dump the header
nni_chunk_dump(&msg->m_body, "BODY");
nni_println("--- END ---");
}
@@ -289,58 +290,6 @@ nni_chunk_insert(nni_chunk *ch, const void *data, size_t len)
return (0);
}
-static int
-nni_chunk_insert_u16(nni_chunk *ch, uint16_t val)
-{
- unsigned char buf[sizeof(uint16_t)];
- NNI_PUT16(buf, val);
- return (nni_chunk_insert(ch, buf, sizeof(buf)));
-}
-
-static int
-nni_chunk_append_u16(nni_chunk *ch, uint16_t val)
-{
- unsigned char buf[sizeof(uint16_t)];
- NNI_PUT16(buf, val);
- return (nni_chunk_append(ch, buf, sizeof(buf)));
-}
-
-static uint16_t
-nni_chunk_trim_u16(nni_chunk *ch)
-{
- uint16_t v;
- NNI_ASSERT(ch->ch_len >= sizeof(v));
- NNI_GET16(ch->ch_ptr, v);
- nni_chunk_trim(ch, sizeof(v));
- return (v);
-}
-
-static uint16_t
-nni_chunk_chop_u16(nni_chunk *ch)
-{
- uint16_t v;
- NNI_ASSERT(ch->ch_len >= sizeof(v));
- NNI_GET16(ch->ch_ptr + ch->ch_len - sizeof(v), v);
- nni_chunk_chop(ch, sizeof(v));
- return (v);
-}
-
-static int
-nni_chunk_insert_u32(nni_chunk *ch, uint32_t val)
-{
- unsigned char buf[sizeof(uint32_t)];
- NNI_PUT32(buf, val);
- return (nni_chunk_insert(ch, buf, sizeof(buf)));
-}
-
-static int
-nni_chunk_append_u32(nni_chunk *ch, uint32_t val)
-{
- unsigned char buf[sizeof(uint32_t)];
- NNI_PUT32(buf, val);
- return (nni_chunk_append(ch, buf, sizeof(buf)));
-}
-
static uint32_t
nni_chunk_trim_u32(nni_chunk *ch)
{
@@ -351,52 +300,6 @@ nni_chunk_trim_u32(nni_chunk *ch)
return (v);
}
-static uint32_t
-nni_chunk_chop_u32(nni_chunk *ch)
-{
- uint32_t v;
- NNI_ASSERT(ch->ch_len >= sizeof(v));
- NNI_GET32(ch->ch_ptr + ch->ch_len - sizeof(v), v);
- nni_chunk_chop(ch, sizeof(v));
- return (v);
-}
-
-static int
-nni_chunk_insert_u64(nni_chunk *ch, uint64_t val)
-{
- unsigned char buf[sizeof(uint64_t)];
- NNI_PUT64(buf, val);
- return (nni_chunk_insert(ch, buf, sizeof(buf)));
-}
-
-static int
-nni_chunk_append_u64(nni_chunk *ch, uint64_t val)
-{
- unsigned char buf[sizeof(uint64_t)];
- NNI_PUT64(buf, val);
- return (nni_chunk_append(ch, buf, sizeof(buf)));
-}
-
-static uint64_t
-nni_chunk_trim_u64(nni_chunk *ch)
-{
- uint64_t v;
- NNI_ASSERT(ch->ch_len >= sizeof(v));
- NNI_GET64(ch->ch_ptr, v);
- nni_chunk_trim(ch, sizeof(v));
- return (v);
-}
-
-static uint64_t
-nni_chunk_chop_u64(nni_chunk *ch)
-{
- uint64_t v;
- NNI_ASSERT(ch->ch_len >= sizeof(v));
- NNI_GET64(ch->ch_ptr + ch->ch_len - sizeof(v), v);
- nni_chunk_chop(ch, sizeof(v));
- return (v);
-}
-
void
nni_msg_clone(nni_msg *m)
{
@@ -470,13 +373,6 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
return (NNG_ENOMEM);
}
- // Header size is strictly limited. We need max hops plus one for
- // the request or survey ID. TODO: Inline the header (no chunk).
- if ((rv = nni_chunk_grow(&m->m_header, NNI_MAX_HEADER_SIZE, 0)) != 0) {
- NNI_FREE_STRUCT(m);
- return (rv);
- }
-
// If the message is less than 1024 bytes, or is not power
// of two aligned, then we insert a 32 bytes of headroom
// to allow for inlining backtraces, etc. We also allow the
@@ -488,11 +384,10 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
rv = nni_chunk_grow(&m->m_body, sz, 0);
}
if (rv != 0) {
- nni_chunk_free(&m->m_header);
NNI_FREE_STRUCT(m);
return (rv);
}
- if ((rv = nni_chunk_append(&m->m_body, NULL, sz)) != 0) {
+ if (nni_chunk_append(&m->m_body, NULL, sz) != 0) {
// Should not happen since we just grew it to fit.
nni_panic("chunk_append failed");
}
@@ -514,12 +409,10 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
return (NNG_ENOMEM);
}
- if ((rv = nni_chunk_dup(&m->m_header, &src->m_header)) != 0) {
- NNI_FREE_STRUCT(m);
- return (rv);
- }
+ memcpy(m->m_header_buf, src->m_header_buf, src->m_header_len);
+ m->m_header_len = src->m_header_len;
+
if ((rv = nni_chunk_dup(&m->m_body, &src->m_body)) != 0) {
- nni_chunk_free(&m->m_header);
NNI_FREE_STRUCT(m);
return (rv);
}
@@ -536,7 +429,6 @@ void
nni_msg_free(nni_msg *m)
{
if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) {
- nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
NNI_FREE_STRUCT(m);
}
@@ -561,13 +453,13 @@ nni_msg_realloc(nni_msg *m, size_t sz)
void *
nni_msg_header(nni_msg *m)
{
- return (m->m_header.ch_ptr);
+ return (m->m_header_buf);
}
size_t
nni_msg_header_len(const nni_msg *m)
{
- return (m->m_header.ch_len);
+ return (m->m_header_len);
}
void *
@@ -600,6 +492,12 @@ nni_msg_trim(nni_msg *m, size_t len)
return (nni_chunk_trim(&m->m_body, len));
}
+uint32_t
+nni_msg_trim_u32(nni_msg *m)
+{
+ return (nni_chunk_trim_u32(&m->m_body));
+}
+
int
nni_msg_chop(nni_msg *m, size_t len)
{
@@ -609,95 +507,85 @@ nni_msg_chop(nni_msg *m, size_t len)
int
nni_msg_header_append(nni_msg *m, const void *data, size_t len)
{
- return (nni_chunk_append(&m->m_header, data, len));
+ if ((len + m->m_header_len) > sizeof(m->m_header_buf)) {
+ return (NNG_EINVAL);
+ }
+ memcpy(((uint8_t *) m->m_header_buf) + m->m_header_len, data, len);
+ m->m_header_len += len;
+ return (0);
}
int
nni_msg_header_insert(nni_msg *m, const void *data, size_t len)
{
- return (nni_chunk_insert(&m->m_header, data, len));
+ if ((len + m->m_header_len) > sizeof(m->m_header_buf)) {
+ return (NNG_EINVAL);
+ }
+ memmove(((uint8_t *) m->m_header_buf) + len, m->m_header_buf,
+ m->m_header_len);
+ memcpy(m->m_header_buf, data, len);
+ m->m_header_len += len;
+ return (0);
}
int
nni_msg_header_trim(nni_msg *m, size_t len)
{
- return (nni_chunk_trim(&m->m_header, len));
+ if (len > m->m_header_len) {
+ return (NNG_EINVAL);
+ }
+ memmove(m->m_header_buf, ((uint8_t *) m->m_header_buf) + len,
+ m->m_header_len - len);
+ m->m_header_len -= len;
+ return (0);
}
int
nni_msg_header_chop(nni_msg *m, size_t len)
{
- return (nni_chunk_chop(&m->m_header, len));
-}
-
-#define DEF_MSG_ADD_N(z, x) \
- int nni_msg_##z##_u##x(nni_msg *m, uint##x##_t v) \
- { \
- return (nni_chunk_##z##_u##x(&m->m_body, v)); \
- } \
- int nni_msg_header_##z##_u##x(nni_msg *m, uint##x##_t v) \
- { \
- return (nni_chunk_##z##_u##x(&m->m_header, v)); \
- }
-
-#define DEF_MSG_MUST_ADD_N(z, x) \
- void nni_msg_must_##z##_u##x(nni_msg *m, uint##x##_t v) \
- { \
- int rv; \
- if ((rv = nni_msg_##z##_u##x(m, v)) != 0) { \
- nni_panic("nni_msg_%s_u%s failed: %d", #z, #x, rv); \
- } \
- } \
- void nni_msg_header_must_##z##_u##x(nni_msg *m, uint##x##_t v) \
- { \
- int rv; \
- if ((rv = nni_msg_header_##z##_u##x(m, v)) != 0) { \
- nni_panic( \
- "nni_msg_header_%s_u%s failed: %d", #z, #x, rv); \
- } \
- }
-
-#define DEF_MSG_REM_N(z, x) \
- uint##x##_t nni_msg_##z##_u##x(nni_msg *m) \
- { \
- return (nni_chunk_##z##_u##x(&m->m_body)); \
- } \
- uint##x##_t nni_msg_header_##z##_u##x(nni_msg *m) \
- { \
- return (nni_chunk_##z##_u##x(&m->m_header)); \
- }
-
-#define DEF_MSG_ADD(op) \
- DEF_MSG_ADD_N(op, 16) DEF_MSG_ADD_N(op, 32) DEF_MSG_ADD_N(op, 64)
-
-#define DEF_MSG_MUST_ADD(op) DEF_MSG_MUST_ADD_N(op, 32)
-
-#define DEF_MSG_REM(op) \
- DEF_MSG_REM_N(op, 16) DEF_MSG_REM_N(op, 32) DEF_MSG_REM_N(op, 64)
-
-DEF_MSG_ADD(append)
-DEF_MSG_MUST_ADD(append)
-DEF_MSG_ADD(insert)
-DEF_MSG_REM(chop)
-DEF_MSG_REM(trim)
-
-#undef DEF_MSG_ADD_N
-#undef DEF_MUST_ADD_N
-#undef DEF_MSG_REM_N
-#undef DEF_MSG_ADD
-#undef DEF_MSG_MUST_ADD
-#undef DEF_MSG_REM
+ if (len > m->m_header_len) {
+ return (NNG_EINVAL);
+ }
+ m->m_header_len -= len;
+ return (0);
+}
+
+uint32_t
+nni_msg_header_trim_u32(nni_msg *m)
+{
+ uint32_t val;
+ uint8_t *dst;
+ dst = (void *) m->m_header_buf;
+ NNI_GET32(dst, val);
+ m->m_header_len -= sizeof(val);
+ memmove(m->m_header_buf, &m->m_header_buf[1], m->m_header_len);
+ return (val);
+}
+
+void
+nni_msg_header_append_u32(nni_msg *m, uint32_t val)
+{
+ uint8_t *dst;
+ if ((m->m_header_len + sizeof(val)) >= (sizeof(m->m_header_buf))) {
+ nni_panic("impossible header over-run");
+ }
+ dst = (void *) m->m_header_buf;
+ dst += m->m_header_len;
+ NNI_PUT32(dst, val);
+ m->m_header_len += sizeof(val);
+}
void
nni_msg_clear(nni_msg *m)
{
+ m->m_header_len = 0;
nni_chunk_clear(&m->m_body);
}
void
nni_msg_header_clear(nni_msg *m)
{
- nni_chunk_clear(&m->m_header);
+ m->m_header_len = 0;
}
void
diff --git a/src/core/message.h b/src/core/message.h
index 53b644c4..03991166 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -33,30 +33,9 @@ extern void nni_msg_header_clear(nni_msg *);
extern int nni_msg_header_trim(nni_msg *, size_t);
extern int nni_msg_header_chop(nni_msg *, size_t);
extern void nni_msg_dump(const char *, const nni_msg *);
-extern int nni_msg_append_u16(nni_msg *, uint16_t);
-extern int nni_msg_append_u32(nni_msg *, uint32_t);
-extern int nni_msg_append_u64(nni_msg *, uint64_t);
-extern int nni_msg_insert_u16(nni_msg *, uint16_t);
-extern int nni_msg_insert_u32(nni_msg *, uint32_t);
-extern int nni_msg_insert_u64(nni_msg *, uint64_t);
-extern int nni_msg_header_append_u16(nni_msg *, uint16_t);
-extern int nni_msg_header_append_u32(nni_msg *, uint32_t);
-extern int nni_msg_header_append_u64(nni_msg *, uint64_t);
-extern int nni_msg_header_insert_u16(nni_msg *, uint16_t);
-extern int nni_msg_header_insert_u32(nni_msg *, uint32_t);
-extern int nni_msg_header_insert_u64(nni_msg *, uint64_t);
-extern uint16_t nni_msg_trim_u16(nni_msg *);
-extern uint32_t nni_msg_trim_u32(nni_msg *);
-extern uint64_t nni_msg_trim_u64(nni_msg *);
-extern uint16_t nni_msg_chop_u16(nni_msg *);
-extern uint32_t nni_msg_chop_u32(nni_msg *);
-extern uint64_t nni_msg_chop_u64(nni_msg *);
-extern uint16_t nni_msg_header_trim_u16(nni_msg *);
+extern void nni_msg_header_append_u32(nni_msg *, uint32_t);
extern uint32_t nni_msg_header_trim_u32(nni_msg *);
-extern uint64_t nni_msg_header_trim_u64(nni_msg *);
-extern uint16_t nni_msg_header_chop_u16(nni_msg *);
-extern uint32_t nni_msg_header_chop_u32(nni_msg *);
-extern uint64_t nni_msg_header_chop_u64(nni_msg *);
+extern uint32_t nni_msg_trim_u32(nni_msg *);
extern void nni_msg_set_pipe(nni_msg *, uint32_t);
extern uint32_t nni_msg_get_pipe(const nni_msg *);
@@ -65,7 +44,7 @@ extern uint32_t nni_msg_get_pipe(const nni_msg *);
// this functionality MUST be certain to use nni_msg_unique() before
// passing a message out of their control (e.g. to user programs.)
// Failure to do so will likely result in corruption.
-extern void nni_msg_clone(nni_msg *);
+extern void nni_msg_clone(nni_msg *);
extern nni_msg *nni_msg_unique(nni_msg *);
// nni_msg_pull_up ensures that the message is unique, and that any
// header present is "pulled up" into the message body. If the function
@@ -74,11 +53,4 @@ extern nni_msg *nni_msg_unique(nni_msg *);
// original message in that case (same semantics as realloc).
extern nni_msg *nni_msg_pull_up(nni_msg *);
-// These should only be used when the transport or protocol is absolutely
-// certain that there is adequate room. There is about 32 bytes of
-// header and trailer space for a newly allocated message, and transports
-// should generally not be burning more than half that.
-extern void nni_msg_must_append_u32(nni_msg *, uint32_t);
-extern void nni_msg_header_must_append_u32(nni_msg *, uint32_t);
-
#endif // CORE_SOCKET_H
diff --git a/src/nng.c b/src/nng.c
index dd82bcee..28876e07 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -115,8 +115,8 @@ nng_recv(nng_socket s, void *buf, size_t *szp, int flags)
memcpy(nbuf, nni_msg_body(msg), nni_msg_len(msg));
*szp = nng_msg_len(msg);
} else {
- *(void **)buf = NULL;
- *szp = 0;
+ *(void **) buf = NULL;
+ *szp = 0;
}
}
nni_msg_free(msg);
@@ -997,53 +997,59 @@ nng_msg_insert(nng_msg *msg, const void *data, size_t sz)
}
int
+nng_msg_append_u16(nng_msg *msg, uint16_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT16(buf, v);
+ return (nni_msg_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_append_u32(nng_msg *msg, uint32_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT32(buf, v);
+ return (nni_msg_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_append_u64(nng_msg *msg, uint64_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT64(buf, v);
+ return (nni_msg_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_insert_u16(nng_msg *msg, uint16_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT16(buf, v);
+ return (nni_msg_insert(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_insert_u32(nng_msg *msg, uint32_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT32(buf, v);
+ return (nni_msg_insert(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_insert_u64(nng_msg *msg, uint64_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT64(buf, v);
+ return (nni_msg_insert(msg, buf, sizeof(v)));
+}
+
+int
nng_msg_header_append(nng_msg *msg, const void *data, size_t sz)
{
return (nni_msg_header_append(msg, data, sz));
}
-#define DEF_MSG_ADD_N(op, n) \
- int nng_msg_header_##op##_u##n(nng_msg *m, uint##n##_t v) \
- { \
- return (nni_msg_header_##op##_u##n(m, v)); \
- } \
- int nng_msg_##op##_u##n(nng_msg *m, uint##n##_t v) \
- { \
- return (nni_msg_##op##_u##n(m, v)); \
- }
-#define DEF_MSG_REM_N(op, n) \
- int nng_msg_header_##op##_u##n(nng_msg *m, uint##n##_t *vp) \
- { \
- if (nni_msg_header_len(m) < sizeof(*vp)) { \
- return (NNG_EINVAL); \
- } \
- *vp = nni_msg_header_##op##_u##n(m); \
- return (0); \
- } \
- int nng_msg_##op##_u##n(nng_msg *m, uint##n##_t *vp) \
- { \
- if (nni_msg_len(m) < sizeof(*vp)) { \
- return (NNG_EINVAL); \
- } \
- *vp = nni_msg_##op##_u##n(m); \
- return (0); \
- }
-
-#define DEF_MSG_ADD(op) \
- DEF_MSG_ADD_N(op, 16) DEF_MSG_ADD_N(op, 32) DEF_MSG_ADD_N(op, 64)
-#define DEF_MSG_REM(op) \
- DEF_MSG_REM_N(op, 16) DEF_MSG_REM_N(op, 32) DEF_MSG_REM_N(op, 64)
-
-DEF_MSG_ADD(append)
-DEF_MSG_ADD(insert)
-DEF_MSG_REM(chop)
-DEF_MSG_REM(trim)
-
-#undef DEF_MSG_ADD_N
-#undef DEF_MSG_REM_N
-#undef DEF_MSG_ADD
-#undef DEF_MSG_REM
-
int
nng_msg_header_insert(nng_msg *msg, const void *data, size_t sz)
{
@@ -1051,6 +1057,54 @@ nng_msg_header_insert(nng_msg *msg, const void *data, size_t sz)
}
int
+nng_msg_header_append_u16(nng_msg *msg, uint16_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT16(buf, v);
+ return (nni_msg_header_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_header_append_u32(nng_msg *msg, uint32_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT32(buf, v);
+ return (nni_msg_header_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_header_append_u64(nng_msg *msg, uint64_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT64(buf, v);
+ return (nni_msg_header_append(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_header_insert_u16(nng_msg *msg, uint16_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT16(buf, v);
+ return (nni_msg_header_insert(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_header_insert_u32(nng_msg *msg, uint32_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT32(buf, v);
+ return (nni_msg_header_insert(msg, buf, sizeof(v)));
+}
+
+int
+nng_msg_header_insert_u64(nng_msg *msg, uint64_t v)
+{
+ uint8_t buf[sizeof(v)];
+ NNI_PUT64(buf, v);
+ return (nni_msg_header_insert(msg, buf, sizeof(v)));
+}
+
+int
nng_msg_trim(nng_msg *msg, size_t sz)
{
return (nni_msg_trim(msg, sz));
@@ -1074,6 +1128,195 @@ nng_msg_header_chop(nng_msg *msg, size_t sz)
return (nni_msg_header_chop(msg, sz));
}
+int
+nng_msg_chop_u16(nng_msg *m, uint16_t *vp)
+{
+ uint8_t *body;
+ uint16_t v;
+ if (nni_msg_len(m) < sizeof(*vp)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ body += nni_msg_len(m);
+ body -= sizeof(v);
+ NNI_GET16(body, v);
+ (void) nni_msg_chop(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_chop_u32(nng_msg *m, uint32_t *vp)
+{
+ uint8_t *body;
+ uint32_t v;
+ if (nni_msg_len(m) < sizeof(*vp)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ body += nni_msg_len(m);
+ body -= sizeof(v);
+ NNI_GET32(body, v);
+ (void) nni_msg_chop(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_chop_u64(nng_msg *m, uint64_t *vp)
+{
+ uint8_t *body;
+ uint64_t v;
+ if (nni_msg_len(m) < sizeof(*vp)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ body += nni_msg_len(m);
+ body -= sizeof(v);
+ NNI_GET64(body, v);
+ (void) nni_msg_chop(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_trim_u16(nng_msg *m, uint16_t *vp)
+{
+ uint8_t *body;
+ uint16_t v;
+ if (nni_msg_len(m) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ NNI_GET16(body, v);
+ (void) nni_msg_trim(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_trim_u32(nng_msg *m, uint32_t *vp)
+{
+ uint8_t *body;
+ uint32_t v;
+ if (nni_msg_len(m) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ NNI_GET32(body, v);
+ (void) nni_msg_trim(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_trim_u64(nng_msg *m, uint64_t *vp)
+{
+ uint8_t *body;
+ uint64_t v;
+ if (nni_msg_len(m) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ body = nni_msg_body(m);
+ NNI_GET64(body, v);
+ (void) nni_msg_trim(m, sizeof(v));
+ *vp = v;
+ return (0);
+}
+
+int
+nng_msg_header_chop_u16(nng_msg *msg, uint16_t *val)
+{
+ uint8_t *header;
+ uint16_t v;
+ if (nng_msg_header_len(msg) < sizeof(*val)) {
+ return (NNG_EINVAL);
+ }
+ header = nng_msg_header(msg);
+ header += nng_msg_header_len(msg);
+ header -= sizeof(v);
+ NNI_GET16(header, v);
+ *val = v;
+ nni_msg_header_chop(msg, sizeof(v));
+ return (0);
+}
+
+int
+nng_msg_header_chop_u32(nng_msg *msg, uint32_t *val)
+{
+ uint8_t *header;
+ uint32_t v;
+ if (nng_msg_header_len(msg) < sizeof(*val)) {
+ return (NNG_EINVAL);
+ }
+ header = nng_msg_header(msg);
+ header += nng_msg_header_len(msg);
+ header -= sizeof(v);
+ NNI_GET32(header, v);
+ *val = v;
+ nni_msg_header_chop(msg, sizeof(v));
+ return (0);
+}
+
+int
+nng_msg_header_chop_u64(nng_msg *msg, uint64_t *val)
+{
+ uint8_t *header;
+ uint64_t v;
+ if (nng_msg_header_len(msg) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ header = nng_msg_header(msg);
+ header += nng_msg_header_len(msg);
+ header -= sizeof(v);
+ NNI_GET64(header, v);
+ *val = v;
+ nni_msg_header_chop(msg, sizeof(*val));
+ return (0);
+}
+
+int
+nng_msg_header_trim_u16(nng_msg *msg, uint16_t *val)
+{
+ uint8_t *header = nni_msg_header(msg);
+ uint16_t v;
+ if (nng_msg_header_len(msg) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ NNI_GET16(header, v);
+ *val = v;
+ nni_msg_header_trim(msg, sizeof(v));
+ return (0);
+}
+
+int
+nng_msg_header_trim_u32(nng_msg *msg, uint32_t *val)
+{
+ uint8_t *header = nni_msg_header(msg);
+ uint32_t v;
+ if (nng_msg_header_len(msg) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ NNI_GET32(header, v);
+ *val = v;
+ nni_msg_header_trim(msg, sizeof(v));
+ return (0);
+}
+
+int
+nng_msg_header_trim_u64(nng_msg *msg, uint64_t *val)
+{
+ uint8_t *header = nni_msg_header(msg);
+ uint64_t v;
+ if (nng_msg_header_len(msg) < sizeof(v)) {
+ return (NNG_EINVAL);
+ }
+ NNI_GET64(header, v);
+ *val = v;
+ nni_msg_header_trim(msg, sizeof(v));
+ return (0);
+}
+
void
nng_msg_clear(nng_msg *msg)
{
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index c409292e..9a610ac6 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -99,7 +99,8 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) !=
+ 0) {
bus0_sock_fini(s);
return (rv);
}
@@ -257,13 +258,8 @@ bus0_pipe_recv_cb(void *arg)
}
msg = nni_aio_get_msg(p->aio_recv);
- if (s->raw &&
- (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0)) {
- // XXX: bump a nomemory stat
- nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_close(p->npipe);
- return;
+ if (s->raw) {
+ nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe));
}
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index b98975c3..00959a4c 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -281,7 +281,7 @@ pair1_pipe_recv_cb(void *arg)
}
// Store the hop count in the header.
- nni_msg_header_must_append_u32(msg, hdr);
+ nni_msg_header_append_u32(msg, hdr);
// Send the message up.
nni_aio_set_msg(&p->aio_put, msg);
@@ -345,7 +345,7 @@ pair1_pipe_get_cb(void *arg)
hops++;
// Insert the hops header.
- nni_msg_header_must_append_u32(msg, hops);
+ nni_msg_header_append_u32(msg, hops);
nni_aio_set_msg(&p->aio_send, msg);
nni_pipe_send(p->pipe, &p->aio_send);
diff --git a/src/protocol/pair1/pair1_poly.c b/src/protocol/pair1/pair1_poly.c
index 183c10da..950c60f7 100644
--- a/src/protocol/pair1/pair1_poly.c
+++ b/src/protocol/pair1/pair1_poly.c
@@ -280,7 +280,7 @@ pair1poly_pipe_recv_cb(void *arg)
}
// Store the hop count in the header.
- nni_msg_header_must_append_u32(msg, hdr);
+ nni_msg_header_append_u32(msg, hdr);
// Send the message up.
nni_aio_set_msg(&p->aio_put, msg);
@@ -361,7 +361,7 @@ pair1poly_pipe_get_cb(void *arg)
nni_msg_header_clear(msg);
// Insert the hops header.
- nni_msg_header_must_append_u32(msg, 1);
+ nni_msg_header_append_u32(msg, 1);
nni_aio_set_msg(&p->aio_send, msg);
nni_pipe_send(p->pipe, &p->aio_send);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index fea95725..cb716941 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -667,7 +667,7 @@ req0_ctx_send(void *arg, nni_aio *aio)
}
ctx->request_id = (uint32_t) id;
nni_msg_header_clear(msg);
- nni_msg_header_must_append_u32(msg, ctx->request_id);
+ nni_msg_header_append_u32(msg, ctx->request_id);
// If no pipes are ready, and the request was a poll (no background
// schedule), then fail it. Should be NNG_ETIMEDOUT.
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 901cecc4..0bce27ba 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -290,7 +290,7 @@ xrep0_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// Store the pipe id in the header, first thing.
- nni_msg_header_must_append_u32(msg, nni_pipe_id(p->pipe));
+ nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
// Move backtrace from body to header
hops = 1;
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 22677a17..e4cdca2c 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -244,7 +244,7 @@ surv0_ctx_send(void *arg, nni_aio *aio)
return;
}
nni_msg_header_clear(msg);
- nni_msg_header_must_append_u32(msg, (uint32_t) ctx->survey_id);
+ nni_msg_header_append_u32(msg, (uint32_t) ctx->survey_id);
// From this point, we're committed to success. Note that we send
// regardless of whether there are any pipes or not. If no pipes,
@@ -473,7 +473,7 @@ surv0_pipe_recv_cb(void *arg)
return;
}
id = nni_msg_trim_u32(msg);
- nni_msg_header_must_append_u32(msg, id);
+ nni_msg_header_append_u32(msg, id);
nni_mtx_lock(&sock->mtx);
// Best effort at delivery. Discard if no context or context is
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index c664f009..25aacc2c 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -284,7 +284,7 @@ xresp0_recv_cb(void *arg)
nni_msg_set_pipe(msg, p->id);
// Store the pipe id in the header, first thing.
- nni_msg_header_must_append_u32(msg, p->id);
+ nni_msg_header_append_u32(msg, p->id);
// Move backtrace from body to header
hops = 1;