aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--perf/perf.c13
-rw-r--r--src/core/message.c29
-rw-r--r--src/core/message.h6
-rw-r--r--src/nng.c37
-rw-r--r--src/nng.h6
-rw-r--r--src/protocol/pubsub/sub.c3
-rw-r--r--src/protocol/reqrep/rep.c18
-rw-r--r--src/protocol/reqrep/req.c14
-rw-r--r--src/transport/inproc/inproc.c5
-rw-r--r--src/transport/tcp/tcp.c11
-rw-r--r--tests/reqrep.c25
-rw-r--r--tests/sock.c16
-rw-r--r--tests/trantest.h11
13 files changed, 93 insertions, 101 deletions
diff --git a/perf/perf.c b/perf/perf.c
index 36e563fc..7f0349f6 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -314,7 +314,6 @@ latency_server(const char *addr, int msgsize, int trips)
nng_msg *msg;
int rv;
int i;
- size_t len;
if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
@@ -331,9 +330,9 @@ latency_server(const char *addr, int msgsize, int trips)
if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
die("nng_recvmsg: %s", nng_strerror(rv));
}
- nng_msg_body(msg, &len);
- if (len != msgsize) {
- die("wrong message size: %d != %d", len, msgsize);
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %d != %d", nng_msg_len(msg),
+ msgsize);
}
if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
die("nng_sendmsg: %s", nng_strerror(rv));
@@ -389,10 +388,8 @@ throughput_server(const char *addr, int msgsize, int count)
if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
die("nng_recvmsg: %s", nng_strerror(rv));
}
- size_t len;
- nng_msg_body(msg, &len);
- if (len != msgsize) {
- die("wrong message size: %d != %d", len,
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %d != %d", nng_msg_len(msg),
msgsize);
}
nni_msg_free(msg);
diff --git a/src/core/message.c b/src/core/message.c
index 351858a4..4c666955 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -185,8 +185,11 @@ nni_chunk_trim(nni_chunk *ch, size_t len)
if (ch->ch_len < len) {
return (NNG_EINVAL);
}
- ch->ch_ptr += len;
ch->ch_len -= len;
+ // Don't advance the pointer if we are just removing the whole content
+ if (ch->ch_len != 0) {
+ ch->ch_ptr += len;
+ }
return (0);
}
@@ -442,25 +445,33 @@ nni_msg_realloc(nni_msg *m, size_t sz)
void *
-nni_msg_header(nni_msg *m, size_t *szp)
+nni_msg_header(nni_msg *m)
{
- if (szp != NULL) {
- *szp = m->m_header.ch_len;
- }
return (m->m_header.ch_ptr);
}
+size_t
+nni_msg_header_len(nni_msg *m)
+{
+ return (m->m_header.ch_len);
+}
+
+
void *
-nni_msg_body(nni_msg *m, size_t *szp)
+nni_msg_body(nni_msg *m)
{
- if (szp != NULL) {
- *szp = m->m_body.ch_len;
- }
return (m->m_body.ch_ptr);
}
+size_t
+nni_msg_len(nni_msg *m)
+{
+ return (m->m_body.ch_len);
+}
+
+
int
nni_msg_append(nni_msg *m, const void *data, size_t len)
{
diff --git a/src/core/message.h b/src/core/message.h
index 0f9a24ac..057c7539 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -16,8 +16,10 @@ extern int nni_msg_alloc(nni_msg **, size_t);
extern void nni_msg_free(nni_msg *);
extern int nni_msg_realloc(nni_msg *, size_t);
extern int nni_msg_dup(nni_msg **, const nni_msg *);
-extern void *nni_msg_header(nni_msg *, size_t *);
-extern void *nni_msg_body(nni_msg *, size_t *);
+extern void *nni_msg_header(nni_msg *);
+extern size_t nni_msg_header_len(nni_msg *);
+extern void *nni_msg_body(nni_msg *);
+extern size_t nni_msg_len(nni_msg *);
extern int nni_msg_append(nni_msg *, const void *, size_t);
extern int nni_msg_prepend(nni_msg *, const void *, size_t);
extern int nni_msg_append_header(nni_msg *, const void *, size_t);
diff --git a/src/nng.c b/src/nng.c
index db2233d2..9a25b285 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -237,7 +237,7 @@ nng_pipe_close(nng_pipe *pipe)
int
nng_msg_alloc(nng_msg **msgp, size_t size)
{
- NNI_INIT_INT();
+ NNI_INIT_VOID();
return (nni_msg_alloc(msgp, size));
}
@@ -245,7 +245,6 @@ nng_msg_alloc(nng_msg **msgp, size_t size)
int
nng_msg_realloc(nng_msg *msg, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_realloc(msg, sz));
}
@@ -253,31 +252,41 @@ nng_msg_realloc(nng_msg *msg, size_t sz)
void
nng_msg_free(nng_msg *msg)
{
- NNI_INIT_VOID();
return (nni_msg_free(msg));
}
void *
-nng_msg_body(nng_msg *msg, size_t *szp)
+nng_msg_body(nng_msg *msg)
{
- NNI_INIT_VOID();
- return (nni_msg_body(msg, szp));
+ return (nni_msg_body(msg));
+}
+
+
+size_t
+nng_msg_len(nng_msg *msg)
+{
+ return (nni_msg_len(msg));
}
void *
-nng_msg_header(nng_msg *msg, size_t *szp)
+nng_msg_header(nng_msg *msg)
{
- NNI_INIT_VOID();
- return (nni_msg_header(msg, szp));
+ return (nni_msg_header(msg));
+}
+
+
+size_t
+nng_msg_header_len(nng_msg *msg)
+{
+ return (nni_msg_header_len(msg));
}
int
nng_msg_append(nng_msg *msg, const void *data, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_append(msg, data, sz));
}
@@ -285,7 +294,6 @@ nng_msg_append(nng_msg *msg, const void *data, size_t sz)
int
nng_msg_prepend(nng_msg *msg, const void *data, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_prepend(msg, data, sz));
}
@@ -293,7 +301,6 @@ nng_msg_prepend(nng_msg *msg, const void *data, size_t sz)
int
nng_msg_append_header(nng_msg *msg, const void *data, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_append_header(msg, data, sz));
}
@@ -301,7 +308,6 @@ nng_msg_append_header(nng_msg *msg, const void *data, size_t sz)
int
nng_msg_prepend_header(nng_msg *msg, const void *data, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_prepend_header(msg, data, sz));
}
@@ -309,7 +315,6 @@ nng_msg_prepend_header(nng_msg *msg, const void *data, size_t sz)
int
nng_msg_trim(nng_msg *msg, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_trim(msg, sz));
}
@@ -317,7 +322,6 @@ nng_msg_trim(nng_msg *msg, size_t sz)
int
nng_msg_trunc(nng_msg *msg, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_trunc(msg, sz));
}
@@ -325,7 +329,6 @@ nng_msg_trunc(nng_msg *msg, size_t sz)
int
nng_msg_trim_header(nng_msg *msg, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_trim_header(msg, sz));
}
@@ -333,7 +336,6 @@ nng_msg_trim_header(nng_msg *msg, size_t sz)
int
nng_msg_trunc_header(nng_msg *msg, size_t sz)
{
- NNI_INIT_INT();
return (nni_msg_trunc_header(msg, sz));
}
@@ -341,7 +343,6 @@ nng_msg_trunc_header(nng_msg *msg, size_t sz)
int
nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
{
- NNI_INIT_INT();
return (nni_msg_getopt(msg, opt, ptr, szp));
}
diff --git a/src/nng.h b/src/nng.h
index 2a1113ed..e535c9d4 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -200,8 +200,10 @@ NNG_DECL int nng_recvmsg(nng_socket *, nng_msg **, int);
NNG_DECL int nng_msg_alloc(nng_msg **, size_t);
NNG_DECL void nng_msg_free(nng_msg *);
NNG_DECL int nng_msg_realloc(nng_msg *, size_t);
-NNG_DECL void *nng_msg_header(nng_msg *, size_t *);
-NNG_DECL void *nng_msg_body(nng_msg *, size_t *);
+NNG_DECL void *nng_msg_header(nng_msg *);
+NNG_DECL size_t nng_msg_header_len(nng_msg *);
+NNG_DECL void *nng_msg_body(nng_msg *);
+NNG_DECL size_t nng_msg_len(nng_msg *);
NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim(nng_msg *, size_t);
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 6cd22712..1243ca47 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -303,7 +303,8 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
return (msg);
}
- body = nni_msg_body(msg, &len);
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
// Check to see if the message matches one of our subscriptions.
NNI_LIST_FOREACH (&sub->topics, topic) {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 8c375b61..5e9c3696 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -174,7 +174,6 @@ nni_rep_topsender(void *arg)
for (;;) {
uint8_t *header;
- size_t size;
uint32_t id;
nni_rep_pipe *rp;
int rv;
@@ -183,11 +182,11 @@ nni_rep_topsender(void *arg)
break;
}
// We yank the outgoing pipe id from the header
- header = nni_msg_header(msg, &size);
- if (size < 4) {
+ if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
continue;
}
+ header = nni_msg_header(msg);
NNI_GET32(header, id);
nni_msg_trim_header(msg, 4);
@@ -281,11 +280,11 @@ again:
nni_msg_free(msg);
goto again;
}
- body = nni_msg_body(msg, &len);
- if (len < 4) {
+ if (nni_msg_len(msg) < 4) {
nni_msg_free(msg);
goto again;
}
+ body = nni_msg_body(msg);
end = (body[0] & 0x80) ? 1 : 0;
rv = nni_msg_append_header(msg, body, 4);
if (rv != 0) {
@@ -293,7 +292,6 @@ again:
goto again;
}
nni_msg_trim(msg, 4);
- body = nni_msg_body(msg, &len);
if (end) {
break;
}
@@ -384,8 +382,7 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
}
// drop anything else in the header...
- (void) nni_msg_header(msg, &len);
- nni_msg_trim_header(msg, len);
+ nni_msg_trunc_header(msg, nni_msg_header_len(msg));
if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) {
nni_free(rep->btrace, rep->btrace_len);
@@ -418,7 +415,8 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
}
nni_sock_senderr(rep->sock, 0);
- header = nni_msg_header(msg, &len);
+ len = nni_msg_header_len(msg);
+ header = nni_msg_header(msg);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
@@ -431,7 +429,7 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
}
rep->btrace_len = len;
memcpy(rep->btrace, header, len);
- nni_msg_trim_header(msg, len);
+ nni_msg_trunc_header(msg, len);
nni_mtx_unlock(&rep->mx);
return (msg);
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index d4098526..037acd1c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -206,20 +206,17 @@ nni_req_pipe_recv(void *arg)
int rv;
for (;;) {
- size_t len;
- char *body;
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
break;
}
// We yank 4 bytes of body, and move them to the header.
- body = nni_msg_body(msg, &len);
- if (len < 4) {
+ if (nni_msg_len(msg) < 4) {
// Not enough data, just toss it.
nni_msg_free(msg);
continue;
}
- if (nni_msg_append_header(msg, body, 4) != 0) {
+ if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
// Should be NNG_ENOMEM
nni_msg_free(msg);
continue;
@@ -375,8 +372,6 @@ static nni_msg *
nni_req_recvfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- char *header;
- size_t len;
nni_mtx_lock(&req->mx);
if (req->raw) {
@@ -385,8 +380,7 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
return (msg);
}
- header = nni_msg_header(msg, &len);
- if (len < 4) {
+ if (nni_msg_header_len(msg) < 4) {
nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
@@ -398,7 +392,7 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
nni_msg_free(msg);
return (NULL);
}
- if (memcmp(header, req->reqid, 4) != 0) {
+ if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) {
// Wrong request id
nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 530e7ccf..df9c9aa9 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -137,12 +137,13 @@ nni_inproc_pipe_send(void *arg, nni_msg *msg)
// We need to move any header data to the body, because the other
// side won't know what to do otherwise.
- h = nni_msg_header(msg, &l);
+ h = nni_msg_header(msg);
+ l = nni_msg_header_len(msg);
if (nni_msg_prepend(msg, h, l) != 0) {
nni_msg_free(msg);
return (0); // Pretend we sent it.
}
- nni_msg_trim_header(msg, l);
+ nni_msg_trunc_header(msg, l);
return (nni_msgq_put(pipe->wq, msg));
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index a37eab6a..b857863f 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -80,8 +80,10 @@ nni_tcp_pipe_send(void *arg, nni_msg *msg)
iov[0].iov_buf = buf;
iov[0].iov_len = sizeof (buf);
- iov[1].iov_buf = nni_msg_header(msg, &iov[1].iov_len);
- iov[2].iov_buf = nni_msg_body(msg, &iov[2].iov_len);
+ iov[1].iov_buf = nni_msg_header(msg);
+ iov[1].iov_len = nni_msg_header_len(msg);
+ iov[2].iov_buf = nni_msg_body(msg);
+ iov[2].iov_len = nni_msg_len(msg);
len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len;
NNI_PUT64(buf, len);
@@ -102,7 +104,6 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
uint8_t buf[sizeof (len)];
nni_iov iov[1];
int rv;
- size_t sz;
iov[0].iov_buf = buf;
iov[0].iov_len = sizeof (buf);
@@ -118,8 +119,8 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
return (rv);
}
- iov[0].iov_len = len;
- iov[0].iov_buf = nng_msg_body(msg, &sz);
+ iov[0].iov_len = nng_msg_len(msg);
+ iov[0].iov_buf = nng_msg_body(msg);
if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) {
*msgp = msg;
diff --git a/tests/reqrep.c b/tests/reqrep.c
index d182559d..1bdef4fd 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -91,30 +91,25 @@ Main({
Convey("They can REQ/REP exchange", {
nng_msg *ping;
nng_msg *pong;
- char *body;
- size_t len;
So(nng_msg_alloc(&ping, 0) == 0);
So(nng_msg_append(ping, "ping", 5) == 0);
- body = nng_msg_body(ping, &len);
- So(len == 5);
- So(memcmp(body, "ping", 5) == 0);
+ So(nng_msg_len(ping) == 5);
+ So(memcmp(nng_msg_body(ping), "ping", 5) == 0);
So(nng_sendmsg(req, ping, 0) == 0);
pong = NULL;
So(nng_recvmsg(rep, &pong, 0) == 0);
So(pong != NULL);
- body = nng_msg_body(pong, &len);
- So(len == 5);
- So(memcmp(body, "ping", 5) == 0);
+ So(nng_msg_len(pong) == 5);
+ So(memcmp(nng_msg_body(pong), "ping", 5) == 0);
nng_msg_trim(pong, 5);
So(nng_msg_append(pong, "pong", 5) == 0);
So(nng_sendmsg(rep, pong, 0) == 0);
ping = 0;
So(nng_recvmsg(req, &ping, 0) == 0);
So(ping != NULL);
- body = nng_msg_body(ping, &len);
- So(len == 5);
- So(memcmp(body, "pong", 5) == 0);
+ So(nng_msg_len(ping) == 5);
+ So(memcmp(nng_msg_body(ping), "pong", 5) == 0);
nng_msg_free(ping);
})
})
@@ -124,9 +119,8 @@ Main({
nng_msg *def;
nng_msg *cmd;
nng_msg *nvm;
- char *body;
- size_t len;
uint64_t retry = 100000; // 100 ms
+ size_t len;
nng_socket *req;
nng_socket *rep;
@@ -166,9 +160,8 @@ Main({
So(nng_recvmsg(req, &cmd, 0) == 0);
- body = nng_msg_body(cmd, &len);
- So(len == 4);
- So(memcmp(body, "def", 4) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(memcmp(nng_msg_body(cmd), "def", 4) == 0);
nng_msg_free(cmd);
})
})
diff --git a/tests/sock.c b/tests/sock.c
index e0743936..78d97a9b 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -148,8 +148,6 @@ TestMain("Socket Operations", {
nng_socket *sock2 = NULL;
int len = 1;
nng_msg *msg;
- size_t sz;
- char *ptr;
uint64_t second = 1000000;
rv = nng_open(&sock2, NNG_PROTO_PAIR);
@@ -181,11 +179,9 @@ TestMain("Socket Operations", {
rv = nng_msg_alloc(&msg, 3);
So(rv == 0);
- ptr = nng_msg_body(msg, &sz);
- So(ptr != NULL);
- So(sz == 3);
-
- memcpy(ptr, "abc", 3);
+ So(nng_msg_len(msg) == 3);
+ So(nng_msg_body(msg) != NULL);
+ memcpy(nng_msg_body(msg), "abc", 3);
rv = nng_sendmsg(sock, msg, 0);
So(rv == 0);
@@ -194,10 +190,8 @@ TestMain("Socket Operations", {
rv = nng_recvmsg(sock2, &msg, 0);
So(rv == 0);
So(msg != NULL);
- ptr = nng_msg_body(msg, &sz);
- So(ptr != NULL);
- So(sz == 3);
- So(memcmp(ptr, "abc", 3) == 0);
+ So(nng_msg_len(msg) == 3);
+ So(memcmp(nng_msg_body(msg), "abc", 3) == 0);
nng_msg_free(msg);
nng_close(sock2);
})
diff --git a/tests/trantest.h b/tests/trantest.h
index 9cd222fb..bd458d21 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -99,7 +99,6 @@ trantest_send_recv(trantest *tt)
nng_endpoint *ep = NULL;
nng_msg *send;
nng_msg *recv;
- char *body;
size_t len;
ep = NULL;
@@ -118,9 +117,8 @@ trantest_send_recv(trantest *tt)
recv = NULL;
So(nng_recvmsg(tt->repsock, &recv, 0) == 0);
So(recv != NULL);
- So((body = nng_msg_body(recv, &len)) != NULL);
- So(len == 5);
- So(strcmp(body, "ping") == 0);
+ So(nng_msg_len(recv) == 5);
+ So(strcmp(nng_msg_body(recv), "ping") == 0);
nng_msg_free(recv);
len = strlen("acknowledge");
@@ -129,9 +127,8 @@ trantest_send_recv(trantest *tt)
So(nng_sendmsg(tt->repsock, send, 0) == 0);
So(nng_recvmsg(tt->reqsock, &recv, 0) == 0);
So(recv != NULL);
- So((body = nng_msg_body(recv, &len)) != NULL);
- So(len == strlen("acknowledge"));
- So(strcmp(body, "acknowledge") == 0);
+ So(nng_msg_len(recv) == strlen("acknowledge"));
+ So(strcmp(nng_msg_body(recv), "acknowledge") == 0);
nng_msg_free(recv);
})
}