aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/message.c99
-rw-r--r--src/core/message.h29
-rw-r--r--src/core/socket.c2
-rw-r--r--src/nng.c58
-rw-r--r--src/nng.h23
-rw-r--r--src/nng_compat.c4
-rw-r--r--src/protocol/bus/bus.c7
-rw-r--r--src/protocol/pair/pair_v1.c3
-rw-r--r--src/protocol/reqrep/rep.c18
-rw-r--r--src/protocol/reqrep/req.c9
-rw-r--r--src/protocol/survey/respond.c18
-rw-r--r--src/protocol/survey/survey.c35
-rw-r--r--src/transport/inproc/inproc.c4
-rw-r--r--tests/CMakeLists.txt2
-rw-r--r--tests/message.c186
-rw-r--r--tests/survey.c2
16 files changed, 361 insertions, 138 deletions
diff --git a/src/core/message.c b/src/core/message.c
index 316bc035..10c42a25 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -1,5 +1,5 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -93,7 +93,7 @@ nni_msg_dump(const char *banner, const nni_msg *msg)
// and headroom (excluding the length) are available. It also copies
// any extant referenced data. Note that the capacity will increase,
// but not the length. To increase the length of the referenced data,
-// use either chunk_append or chunk_prepend.
+// use either chunk_append or chunk_insert.
//
// Note that having some headroom is useful when data must be prepended
// to a message - it avoids having to perform extra data copies, so we
@@ -113,6 +113,11 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted)
// The test below also covers the case where the pointers are both
// NULL, or the capacity is zero.
+ // No shrinking (violets)
+ if (newsz < ch->ch_len) {
+ newsz = ch->ch_len;
+ }
+
if ((ch->ch_ptr >= ch->ch_buf) &&
(ch->ch_ptr < (ch->ch_buf + ch->ch_cap))) {
headroom = (size_t)(ch->ch_ptr - ch->ch_buf);
@@ -124,6 +129,13 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted)
// We have enough space at the ends already.
return (0);
}
+ // Make sure we allocate at least as much tail room as we
+ // previously had.
+
+ if (newsz < (ch->ch_cap - headroom)) {
+ newsz = ch->ch_cap - headroom;
+ }
+
if ((newbuf = nni_alloc(newsz + headwanted)) == NULL) {
return (NNG_ENOMEM);
}
@@ -164,9 +176,16 @@ nni_chunk_free(nni_chunk *ch)
ch->ch_cap = 0;
}
-// nni_chunk_trunc truncates bytes from the end of the chunk.
+// nni_chunk_clear just resets the length to zero.
+static void
+nni_chunk_clear(nni_chunk *ch)
+{
+ ch->ch_len = 0;
+}
+
+// nni_chunk_chop truncates bytes from the end of the chunk.
static int
-nni_chunk_trunc(nni_chunk *ch, size_t len)
+nni_chunk_chop(nni_chunk *ch, size_t len)
{
if (ch->ch_len < len) {
return (NNG_EINVAL);
@@ -230,11 +249,11 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len)
return (0);
}
-// nni_chunk_prepend prepends data to the chunk, as efficiently as possible.
+// nni_chunk_insert prepends data to the chunk, as efficiently as possible.
// If the data pointer is NULL, then no data is actually copied, but the
// data region will have "grown" in the beginning, with uninitialized data.
static int
-nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len)
+nni_chunk_insert(nni_chunk *ch, const void *data, size_t len)
{
int rv;
@@ -267,11 +286,11 @@ nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len)
}
static int
-nni_chunk_prepend_u32(nni_chunk *ch, uint32_t val)
+nni_chunk_insert_u32(nni_chunk *ch, uint32_t val)
{
unsigned char buf[sizeof(uint32_t)];
NNI_PUT32(buf, val);
- return (nni_chunk_prepend(ch, buf, sizeof(buf)));
+ return (nni_chunk_insert(ch, buf, sizeof(buf)));
}
static int
@@ -293,12 +312,12 @@ nni_chunk_trim_u32(nni_chunk *ch)
}
static uint32_t
-nni_chunk_trunc_u32(nni_chunk *ch)
+nni_chunk_chop_u32(nni_chunk *ch)
{
uint32_t v;
NNI_ASSERT(ch->ch_len >= sizeof(v));
NNI_GET32(ch->ch_ptr + ch->ch_len - sizeof(v), v);
- nni_chunk_trunc(ch, sizeof(v));
+ nni_chunk_chop(ch, sizeof(v));
return (v);
}
@@ -448,7 +467,7 @@ nni_msg_getopt(nni_msg *m, int opt, void *val, size_t *szp)
}
}
}
- return (NNG_ENOTSUP);
+ return (NNG_ENOENT);
}
int
@@ -463,7 +482,7 @@ nni_msg_realloc(nni_msg *m, size_t sz)
}
} else {
// "Shrinking", just mark bytes at end usable again.
- nni_chunk_trunc(&m->m_body, m->m_body.ch_len - sz);
+ nni_chunk_chop(&m->m_body, m->m_body.ch_len - sz);
}
return (0);
}
@@ -475,7 +494,7 @@ nni_msg_header(nni_msg *m)
}
size_t
-nni_msg_header_len(nni_msg *m)
+nni_msg_header_len(const nni_msg *m)
{
return (m->m_header.ch_len);
}
@@ -487,7 +506,7 @@ nni_msg_body(nni_msg *m)
}
size_t
-nni_msg_len(nni_msg *m)
+nni_msg_len(const nni_msg *m)
{
return (m->m_body.ch_len);
}
@@ -499,9 +518,9 @@ nni_msg_append(nni_msg *m, const void *data, size_t len)
}
int
-nni_msg_prepend(nni_msg *m, const void *data, size_t len)
+nni_msg_insert(nni_msg *m, const void *data, size_t len)
{
- return (nni_chunk_prepend(&m->m_body, data, len));
+ return (nni_chunk_insert(&m->m_body, data, len));
}
int
@@ -511,33 +530,35 @@ nni_msg_trim(nni_msg *m, size_t len)
}
int
-nni_msg_trunc(nni_msg *m, size_t len)
+nni_msg_chop(nni_msg *m, size_t len)
{
- return (nni_chunk_trunc(&m->m_body, len));
+ return (nni_chunk_chop(&m->m_body, len));
}
int
-nni_msg_append_header(nni_msg *m, const void *data, size_t len)
+nni_msg_header_append(nni_msg *m, const void *data, size_t len)
{
return (nni_chunk_append(&m->m_header, data, len));
}
+
int
-nni_msg_prepend_header(nni_msg *m, const void *data, size_t len)
+nni_msg_header_insert(nni_msg *m, const void *data, size_t len)
{
- return (nni_chunk_prepend(&m->m_header, data, len));
+ return (nni_chunk_insert(&m->m_header, data, len));
}
int
-nni_msg_trim_header(nni_msg *m, size_t len)
+nni_msg_header_trim(nni_msg *m, size_t len)
{
return (nni_chunk_trim(&m->m_header, len));
}
int
-nni_msg_trunc_header(nni_msg *m, size_t len)
+nni_msg_header_chop(nni_msg *m, size_t len)
{
- return (nni_chunk_trunc(&m->m_header, len));
+ return (nni_chunk_chop(&m->m_header, len));
}
+
int
nni_msg_append_u32(nni_msg *m, uint32_t val)
{
@@ -545,9 +566,9 @@ nni_msg_append_u32(nni_msg *m, uint32_t val)
}
int
-nni_msg_prepend_u32(nni_msg *m, uint32_t val)
+nni_msg_insert_u32(nni_msg *m, uint32_t val)
{
- return (nni_chunk_prepend_u32(&m->m_body, val));
+ return (nni_chunk_insert_u32(&m->m_body, val));
}
int
@@ -557,15 +578,15 @@ nni_msg_header_append_u32(nni_msg *m, uint32_t val)
}
int
-nni_msg_header_prepend_u32(nni_msg *m, uint32_t val)
+nni_msg_header_insert_u32(nni_msg *m, uint32_t val)
{
- return (nni_chunk_prepend_u32(&m->m_header, val));
+ return (nni_chunk_insert_u32(&m->m_header, val));
}
uint32_t
-nni_msg_trunc_u32(nni_msg *m)
+nni_msg_chop_u32(nni_msg *m)
{
- return (nni_chunk_trunc_u32(&m->m_body));
+ return (nni_chunk_chop_u32(&m->m_body));
}
uint32_t
@@ -575,9 +596,9 @@ nni_msg_trim_u32(nni_msg *m)
}
uint32_t
-nni_msg_header_trunc_u32(nni_msg *m)
+nni_msg_header_chop_u32(nni_msg *m)
{
- return (nni_chunk_trunc_u32(&m->m_header));
+ return (nni_chunk_chop_u32(&m->m_header));
}
uint32_t
nni_msg_header_trim_u32(nni_msg *m)
@@ -586,13 +607,25 @@ nni_msg_header_trim_u32(nni_msg *m)
}
void
+nni_msg_clear(nni_msg *m)
+{
+ nni_chunk_clear(&m->m_body);
+}
+
+void
+nni_msg_header_clear(nni_msg *m)
+{
+ nni_chunk_clear(&m->m_header);
+}
+
+void
nni_msg_set_pipe(nni_msg *m, uint32_t pid)
{
m->m_pipe = pid;
}
uint32_t
-nni_msg_get_pipe(nni_msg *m)
+nni_msg_get_pipe(const nni_msg *m)
{
return (m->m_pipe);
} \ No newline at end of file
diff --git a/src/core/message.h b/src/core/message.h
index 4bc15321..00d31436 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -12,35 +12,38 @@
#define CORE_MESSAGE_H
// Internally used message API. Again, this is not part of our public API.
+// "trim" operations work from the front, and "chop" work from the end.
extern int nni_msg_alloc(nni_msg **, size_t);
extern void nni_msg_free(nni_msg *);
extern int nni_msg_realloc(nni_msg *, size_t);
extern int nni_msg_dup(nni_msg **, const nni_msg *);
extern void * nni_msg_header(nni_msg *);
-extern size_t nni_msg_header_len(nni_msg *);
+extern size_t nni_msg_header_len(const nni_msg *);
extern void * nni_msg_body(nni_msg *);
-extern size_t nni_msg_len(nni_msg *);
+extern size_t nni_msg_len(const nni_msg *);
extern int nni_msg_append(nni_msg *, const void *, size_t);
-extern int nni_msg_prepend(nni_msg *, const void *, size_t);
-extern int nni_msg_append_header(nni_msg *, const void *, size_t);
-extern int nni_msg_prepend_header(nni_msg *, const void *, size_t);
+extern int nni_msg_insert(nni_msg *, const void *, size_t);
+extern int nni_msg_header_append(nni_msg *, const void *, size_t);
+extern int nni_msg_header_insert(nni_msg *, const void *, size_t);
extern int nni_msg_trim(nni_msg *, size_t);
-extern int nni_msg_trunc(nni_msg *, size_t);
-extern int nni_msg_trim_header(nni_msg *, size_t);
-extern int nni_msg_trunc_header(nni_msg *, size_t);
+extern int nni_msg_chop(nni_msg *, size_t);
+extern void nni_msg_clear(nni_msg *);
+extern void nni_msg_header_clear(nni_msg *);
+extern int nni_msg_header_trim(nni_msg *, size_t);
+extern int nni_msg_header_chop(nni_msg *, size_t);
extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
extern void nni_msg_dump(const char *, const nni_msg *);
extern int nni_msg_append_u32(nni_msg *, uint32_t);
-extern int nni_msg_prepend_u32(nni_msg *, uint32_t);
+extern int nni_msg_insert_u32(nni_msg *, uint32_t);
extern int nni_msg_header_append_u32(nni_msg *, uint32_t);
-extern int nni_msg_header_prepend_u32(nni_msg *, uint32_t);
+extern int nni_msg_header_insert_u32(nni_msg *, uint32_t);
extern uint32_t nni_msg_trim_u32(nni_msg *);
-extern uint32_t nni_msg_trunc_u32(nni_msg *);
+extern uint32_t nni_msg_chop_u32(nni_msg *);
extern uint32_t nni_msg_header_trim_u32(nni_msg *);
-extern uint32_t nni_msg_header_trunc_u32(nni_msg *);
+extern uint32_t nni_msg_header_chop_u32(nni_msg *);
extern void nni_msg_set_pipe(nni_msg *, uint32_t);
-extern uint32_t nni_msg_get_pipe(nni_msg *);
+extern uint32_t nni_msg_get_pipe(const nni_msg *);
#endif // CORE_SOCKET_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 46ed2100..711cd57a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -411,7 +411,6 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
{
nni_sock *s = NULL;
int rv;
- uint32_t sockid;
if (proto->proto_version != NNI_PROTOCOL_VERSION) {
// unsupported protocol version
@@ -603,7 +602,6 @@ void
nni_sock_closeall(void)
{
nni_sock *s;
- uint32_t id;
for (;;) {
nni_mtx_lock(&nni_sock_lk);
diff --git a/src/nng.c b/src/nng.c
index 815fb2b8..50f323fd 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -486,7 +486,7 @@ nng_msg_body(nng_msg *msg)
}
size_t
-nng_msg_len(nng_msg *msg)
+nng_msg_len(const nng_msg *msg)
{
return (nni_msg_len(msg));
}
@@ -498,7 +498,7 @@ nng_msg_header(nng_msg *msg)
}
size_t
-nng_msg_header_len(nng_msg *msg)
+nng_msg_header_len(const nng_msg *msg)
{
return (nni_msg_header_len(msg));
}
@@ -510,21 +510,21 @@ nng_msg_append(nng_msg *msg, const void *data, size_t sz)
}
int
-nng_msg_prepend(nng_msg *msg, const void *data, size_t sz)
+nng_msg_insert(nng_msg *msg, const void *data, size_t sz)
{
- return (nni_msg_prepend(msg, data, sz));
+ return (nni_msg_insert(msg, data, sz));
}
int
-nng_msg_append_header(nng_msg *msg, const void *data, size_t sz)
+nng_msg_header_append(nng_msg *msg, const void *data, size_t sz)
{
- return (nni_msg_append_header(msg, data, sz));
+ return (nni_msg_header_append(msg, data, sz));
}
int
-nng_msg_prepend_header(nng_msg *msg, const void *data, size_t sz)
+nng_msg_header_insert(nng_msg *msg, const void *data, size_t sz)
{
- return (nni_msg_prepend_header(msg, data, sz));
+ return (nni_msg_header_insert(msg, data, sz));
}
int
@@ -534,21 +534,51 @@ nng_msg_trim(nng_msg *msg, size_t sz)
}
int
-nng_msg_trunc(nng_msg *msg, size_t sz)
+nng_msg_chop(nng_msg *msg, size_t sz)
{
- return (nni_msg_trunc(msg, sz));
+ return (nni_msg_chop(msg, sz));
}
int
-nng_msg_trim_header(nng_msg *msg, size_t sz)
+nng_msg_header_trim(nng_msg *msg, size_t sz)
{
- return (nni_msg_trim_header(msg, sz));
+ return (nni_msg_header_trim(msg, sz));
}
int
-nng_msg_trunc_header(nng_msg *msg, size_t sz)
+nng_msg_header_chop(nng_msg *msg, size_t sz)
{
- return (nni_msg_trunc_header(msg, sz));
+ return (nni_msg_header_chop(msg, sz));
+}
+
+void
+nng_msg_clear(nng_msg *msg)
+{
+ nni_msg_clear(msg);
+}
+
+void
+nng_msg_header_clear(nng_msg *msg)
+{
+ nni_msg_header_clear(msg);
+}
+
+int
+nng_msg_dup(nng_msg **dup, const nng_msg *src)
+{
+ return (nni_msg_dup(dup, src));
+}
+
+nng_pipe
+nng_msg_get_pipe(const nng_msg *msg)
+{
+ return (nni_msg_get_pipe(msg));
+}
+
+void
+nng_msg_set_pipe(nng_msg *msg, nng_pipe p)
+{
+ nni_msg_set_pipe(msg, p);
}
int
diff --git a/src/nng.h b/src/nng.h
index 247be1f7..9f66cdd9 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -252,18 +252,23 @@ NNG_DECL int nng_msg_alloc(nng_msg **, size_t);
NNG_DECL void nng_msg_free(nng_msg *);
NNG_DECL int nng_msg_realloc(nng_msg *, size_t);
NNG_DECL void *nng_msg_header(nng_msg *);
-NNG_DECL size_t nng_msg_header_len(nng_msg *);
+NNG_DECL size_t nng_msg_header_len(const nng_msg *);
NNG_DECL void * nng_msg_body(nng_msg *);
-NNG_DECL size_t nng_msg_len(nng_msg *);
+NNG_DECL size_t nng_msg_len(const nng_msg *);
NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t);
-NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t);
+NNG_DECL int nng_msg_insert(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim(nng_msg *, size_t);
-NNG_DECL int nng_msg_trunc(nng_msg *, size_t);
-NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t);
-NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t);
-NNG_DECL int nng_msg_trim_header(nng_msg *, size_t);
-NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t);
-NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *);
+NNG_DECL int nng_msg_chop(nng_msg *, size_t);
+NNG_DECL int nng_msg_header_append(nng_msg *, const void *, size_t);
+NNG_DECL int nng_msg_header_insert(nng_msg *, const void *, size_t);
+NNG_DECL int nng_msg_header_trim(nng_msg *, size_t);
+NNG_DECL int nng_msg_header_chop(nng_msg *, size_t);
+NNG_DECL int nng_msg_dup(nng_msg **, const nng_msg *);
+NNG_DECL void nng_msg_clear(nng_msg *);
+NNG_DECL void nng_msg_header_clear(nng_msg *);
+NNG_DECL void nng_msg_set_pipe(nng_msg *, nng_pipe);
+NNG_DECL nng_pipe nng_msg_get_pipe(const nng_msg *);
+NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *);
// Pipe API. Generally pipes are only "observable" to applications, but
// we do permit an application to close a pipe. This can be useful, for
diff --git a/src/nng_compat.c b/src/nng_compat.c
index dfcb2134..710be9be 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -345,7 +345,7 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags)
if ((mh->msg_iovlen == 1) && (mh->msg_iov[0].iov_len == NN_MSG)) {
// Receiver wants to have a dynamically allocated message.
// There can only be one of these.
- if ((rv = nng_msg_prepend(msg, &msg, sizeof(msg))) != 0) {
+ if ((rv = nng_msg_insert(msg, &msg, sizeof(msg))) != 0) {
nng_msg_free(msg);
nn_seterror(rv);
return (-1);
@@ -534,7 +534,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
continue;
}
data += sizeof(spsz);
- rv = nng_msg_append_header(msg, data, spsz);
+ rv = nng_msg_header_append(msg, data, spsz);
if (rv != 0) {
if (!keep) {
nng_msg_free(msg);
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index c5a6ce06..79d7187e 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -240,16 +240,14 @@ nni_bus_pipe_recv_cb(void *arg)
nni_bus_pipe *ppipe = arg;
nni_bus_sock *psock = ppipe->psock;
nni_msg * msg;
- uint32_t id;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
nni_pipe_stop(ppipe->npipe);
return;
}
msg = ppipe->aio_recv.a_msg;
- id = nni_pipe_id(ppipe->npipe);
- if (nni_msg_prepend_header(msg, &id, 4) != 0) {
+ if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
nni_pipe_stop(ppipe->npipe);
@@ -298,8 +296,7 @@ nni_bus_sock_getq_cb(void *arg)
// is doing this probably.) In this case grab the pipe
// ID from the header, so we can exclude it.
if (nni_msg_header_len(msg) >= 4) {
- memcpy(&sender, nni_msg_header(msg), 4);
- nni_msg_trim_header(msg, 4);
+ sender = nni_msg_header_trim_u32(msg);
} else {
sender = 0;
}
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index ea8c1226..812617f1 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -224,7 +224,7 @@ pair1_pipe_recv_cb(void *arg)
// If we bounced too many times, discard the message, but
// keep getting more.
- if (hdr >= s->ttl) {
+ if (hdr >= (unsigned) s->ttl) {
nni_msg_free(msg);
nni_pipe_recv(npipe, &p->aio_recv);
return;
@@ -309,7 +309,6 @@ pair1_pipe_getq_cb(void *arg)
pair1_sock *s = p->psock;
nni_msg * msg;
uint32_t hops;
- uint8_t * data;
if (nni_aio_result(&p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 1adfd0f8..49bcdce2 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -207,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg)
nni_rep_sock *rep = arg;
nni_msgq * uwq = rep->uwq;
nni_msg * msg;
- uint8_t * header;
uint32_t id;
nni_rep_pipe *rp;
int rv;
@@ -234,9 +233,7 @@ nni_rep_sock_getq_cb(void *arg)
return;
}
- header = nni_msg_header(msg);
- NNI_GET32(header, id);
- nni_msg_trim_header(msg, 4);
+ id = nni_msg_header_trim_u32(msg);
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
@@ -291,7 +288,6 @@ nni_rep_pipe_recv_cb(void *arg)
nni_rep_sock *rep = rp->rep;
nni_msg * msg;
int rv;
- uint8_t idbuf[4];
uint8_t * body;
int hops;
@@ -300,13 +296,11 @@ nni_rep_pipe_recv_cb(void *arg)
return;
}
- NNI_PUT32(idbuf, nni_pipe_id(rp->pipe));
-
msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
- rv = nni_msg_append_header(msg, idbuf, 4);
+ rv = nni_msg_header_append_u32(msg, nni_pipe_id(rp->pipe));
if (rv != 0) {
goto malformed;
}
@@ -323,7 +317,7 @@ nni_rep_pipe_recv_cb(void *arg)
}
body = nni_msg_body(msg);
end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ rv = nni_msg_header_append(msg, body, 4);
if (rv != 0) {
// Presumably this is due to out of memory.
// We could just discard and try again, but we
@@ -422,9 +416,9 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
}
// drop anything else in the header...
- nni_msg_trunc_header(msg, nni_msg_header_len(msg));
+ nni_msg_header_clear(msg);
- if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) {
+ if (nni_msg_header_append(msg, rep->btrace, rep->btrace_len) != 0) {
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
rep->btrace_len = 0;
@@ -463,7 +457,7 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg)
}
rep->btrace_len = len;
memcpy(rep->btrace, header, len);
- nni_msg_trunc_header(msg, len);
+ nni_msg_header_clear(msg);
return (msg);
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index d0dd3887..20fb07f8 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -437,17 +437,14 @@ nni_req_recv_cb(void *arg)
// Malformed message.
goto malformed;
}
- if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
+ if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
// Arguably we could just discard and carry on. But
// dropping the connection is probably more helpful since
// it lets the other side see that a problem occurred.
// Plus it gives us a chance to reclaim some memory.
goto malformed;
}
- if (nni_msg_trim(msg, 4) != 0) {
- // This should never happen - could be an assert.
- nni_panic("Failed to trim REQ header from body");
- }
+ (void) nni_msg_trim(msg, 4); // Cannot fail
rp->aio_putq.a_msg = msg;
nni_msgq_aio_put(rp->req->urq, &rp->aio_putq);
@@ -548,7 +545,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
// Request ID is in big endian format.
NNI_PUT32(req->reqid, id);
- if (nni_msg_append_header(msg, req->reqid, 4) != 0) {
+ if (nni_msg_header_append(msg, req->reqid, 4) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 4db79b86..32513134 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -225,7 +225,6 @@ nni_resp_sock_getq_cb(void *arg)
{
nni_resp_sock *psock = arg;
nni_msg * msg;
- uint8_t * header;
uint32_t id;
nni_resp_pipe *ppipe;
int rv;
@@ -244,9 +243,7 @@ nni_resp_sock_getq_cb(void *arg)
nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
return;
}
- header = nni_msg_header(msg);
- NNI_GET32(header, id);
- nni_msg_trim_header(msg, 4);
+ id = nni_msg_header_trim_u32(msg);
nni_mtx_lock(&psock->mtx);
rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe);
@@ -301,7 +298,6 @@ nni_resp_recv_cb(void *arg)
nni_resp_sock *psock = ppipe->psock;
nni_msgq * urq;
nni_msg * msg;
- uint8_t idbuf[4];
int hops;
int rv;
@@ -311,13 +307,11 @@ nni_resp_recv_cb(void *arg)
urq = nni_sock_recvq(psock->nsock);
- NNI_PUT32(idbuf, ppipe->id);
-
msg = ppipe->aio_recv.a_msg;
ppipe->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
- if (nni_msg_append_header(msg, idbuf, 4) != 0) {
+ if (nni_msg_header_append_u32(msg, ppipe->id) != 0) {
nni_msg_free(msg);
goto error;
}
@@ -338,7 +332,7 @@ nni_resp_recv_cb(void *arg)
}
body = nni_msg_body(msg);
end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ rv = nni_msg_header_append(msg, body, 4);
if (rv != 0) {
nni_msg_free(msg);
goto error;
@@ -439,9 +433,9 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
}
// drop anything else in the header...
- nni_msg_trunc_header(msg, nni_msg_header_len(msg));
+ nni_msg_header_clear(msg);
- if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) !=
+ if (nni_msg_header_append(msg, psock->btrace, psock->btrace_len) !=
0) {
nni_free(psock->btrace, psock->btrace_len);
psock->btrace = NULL;
@@ -481,7 +475,7 @@ nni_resp_sock_rfilter(void *arg, nni_msg *msg)
}
psock->btrace_len = len;
memcpy(psock->btrace, header, len);
- nni_msg_trunc_header(msg, len);
+ nni_msg_header_clear(msg);
return (msg);
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 91fe4ad3..45b06d67 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -33,8 +33,8 @@ struct nni_surv_sock {
nni_time expire;
int raw;
int closing;
- uint32_t nextid; // next id
- uint8_t survid[4]; // outstanding request ID (big endian)
+ uint32_t nextid; // next id
+ uint32_t survid; // outstanding request ID (big endian)
nni_list pipes;
nni_aio aio_getq;
nni_timer_node timer;
@@ -271,16 +271,13 @@ nni_surv_recv_cb(void *arg)
nni_msg_free(msg);
goto failed;
}
- if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
+ if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
// Should be NNG_ENOMEM
nni_msg_free(msg);
goto failed;
}
- if (nni_msg_trim(msg, 4) != 0) {
- // This should never happen - could be an assert.
- nni_msg_free(msg);
- goto failed;
- }
+ (void) nni_msg_trim(msg, 4);
+
ppipe->aio_putq.a_msg = msg;
nni_msgq_aio_put(ppipe->psock->urq, &ppipe->aio_putq);
return;
@@ -309,7 +306,7 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
} else {
nni_sock_recverr(psock->nsock, NNG_ESTATE);
}
- memset(psock->survid, 0, sizeof(psock->survid));
+ psock->survid = 0;
nni_timer_cancel(&psock->timer);
}
break;
@@ -381,7 +378,7 @@ nni_surv_timeout(void *arg)
nni_surv_sock *psock = arg;
nni_sock_lock(psock->nsock);
- memset(psock->survid, 0, sizeof(psock->survid));
+ psock->survid = 0;
nni_sock_recverr(psock->nsock, NNG_ESTATE);
nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT);
nni_sock_unlock(psock->nsock);
@@ -391,7 +388,6 @@ static nni_msg *
nni_surv_sock_sfilter(void *arg, nni_msg *msg)
{
nni_surv_sock *psock = arg;
- uint32_t id;
if (psock->raw) {
// No automatic retry, and the request ID must
@@ -402,12 +398,9 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
- id = (psock->nextid++) | 0x80000000u;
-
- // Survey ID is in big endian format.
- NNI_PUT32(psock->survid, id);
+ psock->survid = (psock->nextid++) | 0x80000000u;
- if (nni_msg_append_header(msg, psock->survid, 4) != 0) {
+ if (nni_msg_header_append_u32(msg, psock->survid) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
@@ -436,18 +429,12 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
- if (nni_msg_header_len(msg) < 4) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- if (memcmp(nni_msg_header(msg), ssock->survid, 4) != 0) {
+ if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
+ (nni_msg_header_trim_u32(msg) != ssock->survid)) {
// Wrong request id
nni_msg_free(msg);
return (NULL);
}
- // Prune the survey ID.
- nni_msg_trim_header(msg, 4);
return (msg);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 9cc43ad7..37d7153f 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -159,11 +159,11 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio)
// side won't know what to do otherwise.
h = nni_msg_header(msg);
l = nni_msg_header_len(msg);
- if ((rv = nni_msg_prepend(msg, h, l)) != 0) {
+ if ((rv = nni_msg_insert(msg, h, l)) != 0) {
nni_aio_finish(aio, rv, aio->a_count);
return;
}
- nni_msg_trunc_header(msg, l);
+ nni_msg_header_chop(msg, l);
nni_msgq_aio_put(pipe->wq, aio);
}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 416bcb1f..f0c09ca6 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -87,7 +87,7 @@ add_nng_test(sock 5)
add_nng_test(survey 5)
add_nng_test(tcp 5)
add_nng_test(scalability 20)
-add_nng_test(device 5)
+add_nng_test(message 5)
# compatbility tests
add_nng_compat_test(compat_block 5)
diff --git a/tests/message.c b/tests/message.c
new file mode 100644
index 00000000..2fc119f5
--- /dev/null
+++ b/tests/message.c
@@ -0,0 +1,186 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+
+#include <string.h>
+
+TestMain("Message Tests", {
+ int rv;
+ nng_msg *msg;
+
+ Convey("Given an empty message", {
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+
+ Reset({ nng_msg_free(msg); });
+
+ Convey("Lengths are empty", {
+ So(nng_msg_len(msg) == 0);
+ So(nng_msg_header_len(msg) == 0);
+ });
+
+ Convey("We can append to the header", {
+ So(nng_msg_header_append(msg, "pad", 4) == 0);
+ So(nng_msg_header_len(msg) == 4);
+ So(strcmp(nng_msg_header(msg), "pad") == 0);
+ });
+
+ Convey("We can append to the body", {
+ So(nng_msg_append(msg, "123", 4) == 0);
+ So(nng_msg_len(msg) == 4);
+ So(strcmp(nng_msg_body(msg), "123") == 0);
+ });
+
+ Convey("We can insert to the header", {
+ So(nng_msg_header_append(msg, "def", 4) == 0);
+ So(nng_msg_header_insert(msg, "abc", 3) == 0);
+ So(nng_msg_header_len(msg) == 7);
+ So(strcmp(nng_msg_header(msg), "abcdef") == 0);
+
+ Convey("We can delete from the front", {
+ So(nng_msg_header_trim(msg, 2) == 0);
+ So(nng_msg_header_len(msg) == 5);
+ So(strcmp(nng_msg_header(msg), "cdef") == 0);
+ });
+
+ Convey("We can delete from the back", {
+ So(nng_msg_header_chop(msg, 5) == 0);
+ So(nng_msg_header_len(msg) == 2);
+ So(memcmp(nng_msg_header(msg), "ab", 2) == 0);
+ });
+ });
+
+ Convey("We can insert to the body", {
+ So(nng_msg_append(msg, "xyz", 4) == 0);
+ So(nng_msg_insert(msg, "uvw", 3) == 0);
+ So(nng_msg_len(msg) == 7);
+ So(strcmp(nng_msg_body(msg), "uvwxyz") == 0);
+
+ Convey("We can delete from the front", {
+ So(nng_msg_trim(msg, 2) == 0);
+ So(nng_msg_len(msg) == 5);
+ So(strcmp(nng_msg_body(msg), "wxyz") == 0);
+ });
+
+ Convey("We can delete from the back", {
+ So(nng_msg_chop(msg, 5) == 0);
+ So(nng_msg_len(msg) == 2);
+ So(memcmp(nng_msg_body(msg), "uv", 2) == 0);
+ });
+ });
+
+ Convey("Clearing the header works", {
+ So(nng_msg_header_append(msg, "bogus", 6) == 0);
+ So(nng_msg_header_len(msg) == 6);
+ nng_msg_header_clear(msg);
+ So(nng_msg_header_len(msg) == 0);
+ });
+
+ Convey("Clearing the body works", {
+ So(nng_msg_append(msg, "bogus", 6) == 0);
+ So(nng_msg_len(msg) == 6);
+ nng_msg_clear(msg);
+ So(nng_msg_len(msg) == 0);
+ });
+
+ Convey("We cannot delete more header than exists", {
+ So(nng_msg_header_append(
+ msg, "short", strlen("short") + 1) == 0);
+ So(nng_msg_header_trim(msg, 16) == NNG_EINVAL);
+ So(nng_msg_header_len(msg) == strlen("short") + 1);
+ So(nng_msg_header_chop(msg, 16) == NNG_EINVAL);
+ So(nng_msg_header_len(msg) == strlen("short") + 1);
+ So(strcmp(nng_msg_header(msg), "short") == 0);
+ });
+
+ Convey("We cannot delete more body than exists", {
+ So(nng_msg_append(msg, "short", strlen("short") + 1) ==
+ 0);
+ So(nng_msg_trim(msg, 16) == NNG_EINVAL);
+ So(nng_msg_len(msg) == strlen("short") + 1);
+ So(nng_msg_chop(msg, 16) == NNG_EINVAL);
+ So(nng_msg_len(msg) == strlen("short") + 1);
+ So(strcmp(nng_msg_body(msg), "short") == 0);
+ });
+
+ Convey("Pipe retrievals work", {
+ So(nng_msg_get_pipe(msg) == 0);
+ nng_msg_set_pipe(msg, (nng_pipe) 45);
+ So(nng_msg_get_pipe(msg) == (nng_pipe) 45);
+ });
+
+ Convey("Message realloc works", {
+ So(nng_msg_append(msg, "abc", 4) == 0);
+ So(nng_msg_realloc(msg, 1500) == 0);
+ So(nng_msg_len(msg) == 1500);
+ So(strcmp(nng_msg_body(msg), "abc") == 0);
+ So(nng_msg_realloc(msg, 2) == 0);
+ So(nng_msg_len(msg) == 2);
+ So(memcmp(nng_msg_body(msg), "abc", 2) == 0);
+ So(nng_msg_append(msg, "CDEF", strlen("CDEF") + 1) ==
+ 0);
+ So(nng_msg_len(msg) == strlen("abCDEF") + 1);
+ So(strcmp(nng_msg_body(msg), "abCDEF") == 0);
+ });
+
+ Convey("Inserting a lot of data works", {
+ char chunk[1024];
+ memset(chunk, '+', sizeof(chunk));
+ So(nng_msg_append(msg, "abc", strlen("abc") + 1) == 0);
+ So(nng_msg_len(msg) == strlen("abc") + 1);
+ So(nng_msg_insert(msg, chunk, sizeof(chunk)) == 0);
+ So(nng_msg_len(msg) ==
+ strlen("abc") + 1 + sizeof(chunk));
+ So(memcmp(chunk, nng_msg_body(msg), sizeof(chunk)) ==
+ 0);
+ So(strcmp((char *) nng_msg_body(msg) + sizeof(chunk),
+ "abc") == 0);
+ So(nng_msg_trim(msg, sizeof(chunk) - 2) == 0);
+ So(strcmp(nng_msg_body(msg), "++abc") == 0);
+ });
+
+ Convey("Message dup works", {
+ nng_msg *m2;
+
+ So(nng_msg_header_append(
+ msg, "front", strlen("front") + 1) == 0);
+ So(nng_msg_append(msg, "back", strlen("back") + 1) ==
+ 0);
+
+ So(nng_msg_dup(&m2, msg) == 0);
+ Reset({ nng_msg_free(m2); });
+
+ So(nng_msg_len(msg) == strlen("front"));
+ So(nng_msg_len(m2) == strlen("front"));
+ So(nng_msg_header_len(msg) == nng_msg_header_len(m2));
+
+ So(nng_msg_insert(msg, "way", 3) == 0);
+ So(nng_msg_len(msg) == strlen("wayback") + 1);
+ So(nng_msg_len(m2) == strlen("back") + 1);
+ So(strcmp(nng_msg_body(msg), "wayback") == 0);
+ So(strcmp(nng_msg_body(m2), "back") == 0);
+ So(nng_msg_chop(m2, 1) == 0);
+ So(nng_msg_append(
+ m2, "2basics", strlen("2basics") + 1) == 0);
+ So(nng_msg_len(msg) == strlen("wayback") + 1);
+ So(strcmp(nng_msg_body(msg), "wayback") == 0);
+ So(nng_msg_len(m2) == strlen("back2basics") + 1);
+ So(strcmp(nng_msg_body(m2), "back2basics") == 0);
+ });
+
+ Convey("Missing option fails properly", {
+ char buf[128];
+ size_t sz = sizeof(buf);
+ So(nng_msg_getopt(msg, 4545, buf, &sz) == NNG_ENOENT);
+ });
+ });
+});
diff --git a/tests/survey.c b/tests/survey.c
index 6f85850f..2a757a98 100644
--- a/tests/survey.c
+++ b/tests/survey.c
@@ -123,7 +123,7 @@ Main({
msg = NULL;
So(nng_recvmsg(resp, &msg, 0) == 0);
CHECKSTR(msg, "abc");
- nng_msg_trunc(msg, 3);
+ nng_msg_chop(msg, 3);
APPENDSTR(msg, "def");
So(nng_sendmsg(resp, msg, 0) == 0);
msg = NULL;