diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/message.c | 99 | ||||
| -rw-r--r-- | src/core/message.h | 29 | ||||
| -rw-r--r-- | src/core/socket.c | 2 | ||||
| -rw-r--r-- | src/nng.c | 58 | ||||
| -rw-r--r-- | src/nng.h | 23 | ||||
| -rw-r--r-- | src/nng_compat.c | 4 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 7 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v1.c | 3 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 18 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 9 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 18 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 35 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 4 |
13 files changed, 173 insertions, 136 deletions
diff --git a/src/core/message.c b/src/core/message.c index 316bc035..10c42a25 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -93,7 +93,7 @@ nni_msg_dump(const char *banner, const nni_msg *msg) // and headroom (excluding the length) are available. It also copies // any extant referenced data. Note that the capacity will increase, // but not the length. To increase the length of the referenced data, -// use either chunk_append or chunk_prepend. +// use either chunk_append or chunk_insert. // // Note that having some headroom is useful when data must be prepended // to a message - it avoids having to perform extra data copies, so we @@ -113,6 +113,11 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted) // The test below also covers the case where the pointers are both // NULL, or the capacity is zero. + // No shrinking (violets) + if (newsz < ch->ch_len) { + newsz = ch->ch_len; + } + if ((ch->ch_ptr >= ch->ch_buf) && (ch->ch_ptr < (ch->ch_buf + ch->ch_cap))) { headroom = (size_t)(ch->ch_ptr - ch->ch_buf); @@ -124,6 +129,13 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted) // We have enough space at the ends already. return (0); } + // Make sure we allocate at least as much tail room as we + // previously had. + + if (newsz < (ch->ch_cap - headroom)) { + newsz = ch->ch_cap - headroom; + } + if ((newbuf = nni_alloc(newsz + headwanted)) == NULL) { return (NNG_ENOMEM); } @@ -164,9 +176,16 @@ nni_chunk_free(nni_chunk *ch) ch->ch_cap = 0; } -// nni_chunk_trunc truncates bytes from the end of the chunk. +// nni_chunk_clear just resets the length to zero. +static void +nni_chunk_clear(nni_chunk *ch) +{ + ch->ch_len = 0; +} + +// nni_chunk_chop truncates bytes from the end of the chunk. static int -nni_chunk_trunc(nni_chunk *ch, size_t len) +nni_chunk_chop(nni_chunk *ch, size_t len) { if (ch->ch_len < len) { return (NNG_EINVAL); @@ -230,11 +249,11 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len) return (0); } -// nni_chunk_prepend prepends data to the chunk, as efficiently as possible. +// nni_chunk_insert prepends data to the chunk, as efficiently as possible. // If the data pointer is NULL, then no data is actually copied, but the // data region will have "grown" in the beginning, with uninitialized data. static int -nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len) +nni_chunk_insert(nni_chunk *ch, const void *data, size_t len) { int rv; @@ -267,11 +286,11 @@ nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len) } static int -nni_chunk_prepend_u32(nni_chunk *ch, uint32_t val) +nni_chunk_insert_u32(nni_chunk *ch, uint32_t val) { unsigned char buf[sizeof(uint32_t)]; NNI_PUT32(buf, val); - return (nni_chunk_prepend(ch, buf, sizeof(buf))); + return (nni_chunk_insert(ch, buf, sizeof(buf))); } static int @@ -293,12 +312,12 @@ nni_chunk_trim_u32(nni_chunk *ch) } static uint32_t -nni_chunk_trunc_u32(nni_chunk *ch) +nni_chunk_chop_u32(nni_chunk *ch) { uint32_t v; NNI_ASSERT(ch->ch_len >= sizeof(v)); NNI_GET32(ch->ch_ptr + ch->ch_len - sizeof(v), v); - nni_chunk_trunc(ch, sizeof(v)); + nni_chunk_chop(ch, sizeof(v)); return (v); } @@ -448,7 +467,7 @@ nni_msg_getopt(nni_msg *m, int opt, void *val, size_t *szp) } } } - return (NNG_ENOTSUP); + return (NNG_ENOENT); } int @@ -463,7 +482,7 @@ nni_msg_realloc(nni_msg *m, size_t sz) } } else { // "Shrinking", just mark bytes at end usable again. - nni_chunk_trunc(&m->m_body, m->m_body.ch_len - sz); + nni_chunk_chop(&m->m_body, m->m_body.ch_len - sz); } return (0); } @@ -475,7 +494,7 @@ nni_msg_header(nni_msg *m) } size_t -nni_msg_header_len(nni_msg *m) +nni_msg_header_len(const nni_msg *m) { return (m->m_header.ch_len); } @@ -487,7 +506,7 @@ nni_msg_body(nni_msg *m) } size_t -nni_msg_len(nni_msg *m) +nni_msg_len(const nni_msg *m) { return (m->m_body.ch_len); } @@ -499,9 +518,9 @@ nni_msg_append(nni_msg *m, const void *data, size_t len) } int -nni_msg_prepend(nni_msg *m, const void *data, size_t len) +nni_msg_insert(nni_msg *m, const void *data, size_t len) { - return (nni_chunk_prepend(&m->m_body, data, len)); + return (nni_chunk_insert(&m->m_body, data, len)); } int @@ -511,33 +530,35 @@ nni_msg_trim(nni_msg *m, size_t len) } int -nni_msg_trunc(nni_msg *m, size_t len) +nni_msg_chop(nni_msg *m, size_t len) { - return (nni_chunk_trunc(&m->m_body, len)); + return (nni_chunk_chop(&m->m_body, len)); } int -nni_msg_append_header(nni_msg *m, const void *data, size_t len) +nni_msg_header_append(nni_msg *m, const void *data, size_t len) { return (nni_chunk_append(&m->m_header, data, len)); } + int -nni_msg_prepend_header(nni_msg *m, const void *data, size_t len) +nni_msg_header_insert(nni_msg *m, const void *data, size_t len) { - return (nni_chunk_prepend(&m->m_header, data, len)); + return (nni_chunk_insert(&m->m_header, data, len)); } int -nni_msg_trim_header(nni_msg *m, size_t len) +nni_msg_header_trim(nni_msg *m, size_t len) { return (nni_chunk_trim(&m->m_header, len)); } int -nni_msg_trunc_header(nni_msg *m, size_t len) +nni_msg_header_chop(nni_msg *m, size_t len) { - return (nni_chunk_trunc(&m->m_header, len)); + return (nni_chunk_chop(&m->m_header, len)); } + int nni_msg_append_u32(nni_msg *m, uint32_t val) { @@ -545,9 +566,9 @@ nni_msg_append_u32(nni_msg *m, uint32_t val) } int -nni_msg_prepend_u32(nni_msg *m, uint32_t val) +nni_msg_insert_u32(nni_msg *m, uint32_t val) { - return (nni_chunk_prepend_u32(&m->m_body, val)); + return (nni_chunk_insert_u32(&m->m_body, val)); } int @@ -557,15 +578,15 @@ nni_msg_header_append_u32(nni_msg *m, uint32_t val) } int -nni_msg_header_prepend_u32(nni_msg *m, uint32_t val) +nni_msg_header_insert_u32(nni_msg *m, uint32_t val) { - return (nni_chunk_prepend_u32(&m->m_header, val)); + return (nni_chunk_insert_u32(&m->m_header, val)); } uint32_t -nni_msg_trunc_u32(nni_msg *m) +nni_msg_chop_u32(nni_msg *m) { - return (nni_chunk_trunc_u32(&m->m_body)); + return (nni_chunk_chop_u32(&m->m_body)); } uint32_t @@ -575,9 +596,9 @@ nni_msg_trim_u32(nni_msg *m) } uint32_t -nni_msg_header_trunc_u32(nni_msg *m) +nni_msg_header_chop_u32(nni_msg *m) { - return (nni_chunk_trunc_u32(&m->m_header)); + return (nni_chunk_chop_u32(&m->m_header)); } uint32_t nni_msg_header_trim_u32(nni_msg *m) @@ -586,13 +607,25 @@ nni_msg_header_trim_u32(nni_msg *m) } void +nni_msg_clear(nni_msg *m) +{ + nni_chunk_clear(&m->m_body); +} + +void +nni_msg_header_clear(nni_msg *m) +{ + nni_chunk_clear(&m->m_header); +} + +void nni_msg_set_pipe(nni_msg *m, uint32_t pid) { m->m_pipe = pid; } uint32_t -nni_msg_get_pipe(nni_msg *m) +nni_msg_get_pipe(const nni_msg *m) { return (m->m_pipe); }
\ No newline at end of file diff --git a/src/core/message.h b/src/core/message.h index 4bc15321..00d31436 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -12,35 +12,38 @@ #define CORE_MESSAGE_H // Internally used message API. Again, this is not part of our public API. +// "trim" operations work from the front, and "chop" work from the end. extern int nni_msg_alloc(nni_msg **, size_t); extern void nni_msg_free(nni_msg *); extern int nni_msg_realloc(nni_msg *, size_t); extern int nni_msg_dup(nni_msg **, const nni_msg *); extern void * nni_msg_header(nni_msg *); -extern size_t nni_msg_header_len(nni_msg *); +extern size_t nni_msg_header_len(const nni_msg *); extern void * nni_msg_body(nni_msg *); -extern size_t nni_msg_len(nni_msg *); +extern size_t nni_msg_len(const nni_msg *); extern int nni_msg_append(nni_msg *, const void *, size_t); -extern int nni_msg_prepend(nni_msg *, const void *, size_t); -extern int nni_msg_append_header(nni_msg *, const void *, size_t); -extern int nni_msg_prepend_header(nni_msg *, const void *, size_t); +extern int nni_msg_insert(nni_msg *, const void *, size_t); +extern int nni_msg_header_append(nni_msg *, const void *, size_t); +extern int nni_msg_header_insert(nni_msg *, const void *, size_t); extern int nni_msg_trim(nni_msg *, size_t); -extern int nni_msg_trunc(nni_msg *, size_t); -extern int nni_msg_trim_header(nni_msg *, size_t); -extern int nni_msg_trunc_header(nni_msg *, size_t); +extern int nni_msg_chop(nni_msg *, size_t); +extern void nni_msg_clear(nni_msg *); +extern void nni_msg_header_clear(nni_msg *); +extern int nni_msg_header_trim(nni_msg *, size_t); +extern int nni_msg_header_chop(nni_msg *, size_t); extern int nni_msg_setopt(nni_msg *, int, const void *, size_t); extern int nni_msg_getopt(nni_msg *, int, void *, size_t *); extern void nni_msg_dump(const char *, const nni_msg *); extern int nni_msg_append_u32(nni_msg *, uint32_t); -extern int nni_msg_prepend_u32(nni_msg *, uint32_t); +extern int nni_msg_insert_u32(nni_msg *, uint32_t); extern int nni_msg_header_append_u32(nni_msg *, uint32_t); -extern int nni_msg_header_prepend_u32(nni_msg *, uint32_t); +extern int nni_msg_header_insert_u32(nni_msg *, uint32_t); extern uint32_t nni_msg_trim_u32(nni_msg *); -extern uint32_t nni_msg_trunc_u32(nni_msg *); +extern uint32_t nni_msg_chop_u32(nni_msg *); extern uint32_t nni_msg_header_trim_u32(nni_msg *); -extern uint32_t nni_msg_header_trunc_u32(nni_msg *); +extern uint32_t nni_msg_header_chop_u32(nni_msg *); extern void nni_msg_set_pipe(nni_msg *, uint32_t); -extern uint32_t nni_msg_get_pipe(nni_msg *); +extern uint32_t nni_msg_get_pipe(const nni_msg *); #endif // CORE_SOCKET_H diff --git a/src/core/socket.c b/src/core/socket.c index 46ed2100..711cd57a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -411,7 +411,6 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) { nni_sock *s = NULL; int rv; - uint32_t sockid; if (proto->proto_version != NNI_PROTOCOL_VERSION) { // unsupported protocol version @@ -603,7 +602,6 @@ void nni_sock_closeall(void) { nni_sock *s; - uint32_t id; for (;;) { nni_mtx_lock(&nni_sock_lk); @@ -486,7 +486,7 @@ nng_msg_body(nng_msg *msg) } size_t -nng_msg_len(nng_msg *msg) +nng_msg_len(const nng_msg *msg) { return (nni_msg_len(msg)); } @@ -498,7 +498,7 @@ nng_msg_header(nng_msg *msg) } size_t -nng_msg_header_len(nng_msg *msg) +nng_msg_header_len(const nng_msg *msg) { return (nni_msg_header_len(msg)); } @@ -510,21 +510,21 @@ nng_msg_append(nng_msg *msg, const void *data, size_t sz) } int -nng_msg_prepend(nng_msg *msg, const void *data, size_t sz) +nng_msg_insert(nng_msg *msg, const void *data, size_t sz) { - return (nni_msg_prepend(msg, data, sz)); + return (nni_msg_insert(msg, data, sz)); } int -nng_msg_append_header(nng_msg *msg, const void *data, size_t sz) +nng_msg_header_append(nng_msg *msg, const void *data, size_t sz) { - return (nni_msg_append_header(msg, data, sz)); + return (nni_msg_header_append(msg, data, sz)); } int -nng_msg_prepend_header(nng_msg *msg, const void *data, size_t sz) +nng_msg_header_insert(nng_msg *msg, const void *data, size_t sz) { - return (nni_msg_prepend_header(msg, data, sz)); + return (nni_msg_header_insert(msg, data, sz)); } int @@ -534,21 +534,51 @@ nng_msg_trim(nng_msg *msg, size_t sz) } int -nng_msg_trunc(nng_msg *msg, size_t sz) +nng_msg_chop(nng_msg *msg, size_t sz) { - return (nni_msg_trunc(msg, sz)); + return (nni_msg_chop(msg, sz)); } int -nng_msg_trim_header(nng_msg *msg, size_t sz) +nng_msg_header_trim(nng_msg *msg, size_t sz) { - return (nni_msg_trim_header(msg, sz)); + return (nni_msg_header_trim(msg, sz)); } int -nng_msg_trunc_header(nng_msg *msg, size_t sz) +nng_msg_header_chop(nng_msg *msg, size_t sz) { - return (nni_msg_trunc_header(msg, sz)); + return (nni_msg_header_chop(msg, sz)); +} + +void +nng_msg_clear(nng_msg *msg) +{ + nni_msg_clear(msg); +} + +void +nng_msg_header_clear(nng_msg *msg) +{ + nni_msg_header_clear(msg); +} + +int +nng_msg_dup(nng_msg **dup, const nng_msg *src) +{ + return (nni_msg_dup(dup, src)); +} + +nng_pipe +nng_msg_get_pipe(const nng_msg *msg) +{ + return (nni_msg_get_pipe(msg)); +} + +void +nng_msg_set_pipe(nng_msg *msg, nng_pipe p) +{ + nni_msg_set_pipe(msg, p); } int @@ -252,18 +252,23 @@ NNG_DECL int nng_msg_alloc(nng_msg **, size_t); NNG_DECL void nng_msg_free(nng_msg *); NNG_DECL int nng_msg_realloc(nng_msg *, size_t); NNG_DECL void *nng_msg_header(nng_msg *); -NNG_DECL size_t nng_msg_header_len(nng_msg *); +NNG_DECL size_t nng_msg_header_len(const nng_msg *); NNG_DECL void * nng_msg_body(nng_msg *); -NNG_DECL size_t nng_msg_len(nng_msg *); +NNG_DECL size_t nng_msg_len(const nng_msg *); NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t); -NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_insert(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_trim(nng_msg *, size_t); -NNG_DECL int nng_msg_trunc(nng_msg *, size_t); -NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t); -NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t); -NNG_DECL int nng_msg_trim_header(nng_msg *, size_t); -NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t); -NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *); +NNG_DECL int nng_msg_chop(nng_msg *, size_t); +NNG_DECL int nng_msg_header_append(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_header_insert(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_header_trim(nng_msg *, size_t); +NNG_DECL int nng_msg_header_chop(nng_msg *, size_t); +NNG_DECL int nng_msg_dup(nng_msg **, const nng_msg *); +NNG_DECL void nng_msg_clear(nng_msg *); +NNG_DECL void nng_msg_header_clear(nng_msg *); +NNG_DECL void nng_msg_set_pipe(nng_msg *, nng_pipe); +NNG_DECL nng_pipe nng_msg_get_pipe(const nng_msg *); +NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *); // Pipe API. Generally pipes are only "observable" to applications, but // we do permit an application to close a pipe. This can be useful, for diff --git a/src/nng_compat.c b/src/nng_compat.c index dfcb2134..710be9be 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -345,7 +345,7 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags) if ((mh->msg_iovlen == 1) && (mh->msg_iov[0].iov_len == NN_MSG)) { // Receiver wants to have a dynamically allocated message. // There can only be one of these. - if ((rv = nng_msg_prepend(msg, &msg, sizeof(msg))) != 0) { + if ((rv = nng_msg_insert(msg, &msg, sizeof(msg))) != 0) { nng_msg_free(msg); nn_seterror(rv); return (-1); @@ -534,7 +534,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) continue; } data += sizeof(spsz); - rv = nng_msg_append_header(msg, data, spsz); + rv = nng_msg_header_append(msg, data, spsz); if (rv != 0) { if (!keep) { nng_msg_free(msg); diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index c5a6ce06..79d7187e 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -240,16 +240,14 @@ nni_bus_pipe_recv_cb(void *arg) nni_bus_pipe *ppipe = arg; nni_bus_sock *psock = ppipe->psock; nni_msg * msg; - uint32_t id; if (nni_aio_result(&ppipe->aio_recv) != 0) { nni_pipe_stop(ppipe->npipe); return; } msg = ppipe->aio_recv.a_msg; - id = nni_pipe_id(ppipe->npipe); - if (nni_msg_prepend_header(msg, &id, 4) != 0) { + if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) { // XXX: bump a nomemory stat nni_msg_free(msg); nni_pipe_stop(ppipe->npipe); @@ -298,8 +296,7 @@ nni_bus_sock_getq_cb(void *arg) // is doing this probably.) In this case grab the pipe // ID from the header, so we can exclude it. if (nni_msg_header_len(msg) >= 4) { - memcpy(&sender, nni_msg_header(msg), 4); - nni_msg_trim_header(msg, 4); + sender = nni_msg_header_trim_u32(msg); } else { sender = 0; } diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index ea8c1226..812617f1 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -224,7 +224,7 @@ pair1_pipe_recv_cb(void *arg) // If we bounced too many times, discard the message, but // keep getting more. - if (hdr >= s->ttl) { + if (hdr >= (unsigned) s->ttl) { nni_msg_free(msg); nni_pipe_recv(npipe, &p->aio_recv); return; @@ -309,7 +309,6 @@ pair1_pipe_getq_cb(void *arg) pair1_sock *s = p->psock; nni_msg * msg; uint32_t hops; - uint8_t * data; if (nni_aio_result(&p->aio_getq) != 0) { nni_pipe_stop(p->npipe); diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 1adfd0f8..49bcdce2 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -207,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg) nni_rep_sock *rep = arg; nni_msgq * uwq = rep->uwq; nni_msg * msg; - uint8_t * header; uint32_t id; nni_rep_pipe *rp; int rv; @@ -234,9 +233,7 @@ nni_rep_sock_getq_cb(void *arg) return; } - header = nni_msg_header(msg); - NNI_GET32(header, id); - nni_msg_trim_header(msg, 4); + id = nni_msg_header_trim_u32(msg); // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we @@ -291,7 +288,6 @@ nni_rep_pipe_recv_cb(void *arg) nni_rep_sock *rep = rp->rep; nni_msg * msg; int rv; - uint8_t idbuf[4]; uint8_t * body; int hops; @@ -300,13 +296,11 @@ nni_rep_pipe_recv_cb(void *arg) return; } - NNI_PUT32(idbuf, nni_pipe_id(rp->pipe)); - msg = rp->aio_recv.a_msg; rp->aio_recv.a_msg = NULL; // Store the pipe id in the header, first thing. - rv = nni_msg_append_header(msg, idbuf, 4); + rv = nni_msg_header_append_u32(msg, nni_pipe_id(rp->pipe)); if (rv != 0) { goto malformed; } @@ -323,7 +317,7 @@ nni_rep_pipe_recv_cb(void *arg) } body = nni_msg_body(msg); end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_append_header(msg, body, 4); + rv = nni_msg_header_append(msg, body, 4); if (rv != 0) { // Presumably this is due to out of memory. // We could just discard and try again, but we @@ -422,9 +416,9 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg) } // drop anything else in the header... - nni_msg_trunc_header(msg, nni_msg_header_len(msg)); + nni_msg_header_clear(msg); - if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) { + if (nni_msg_header_append(msg, rep->btrace, rep->btrace_len) != 0) { nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; @@ -463,7 +457,7 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg) } rep->btrace_len = len; memcpy(rep->btrace, header, len); - nni_msg_trunc_header(msg, len); + nni_msg_header_clear(msg); return (msg); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index d0dd3887..20fb07f8 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -437,17 +437,14 @@ nni_req_recv_cb(void *arg) // Malformed message. goto malformed; } - if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) { + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { // Arguably we could just discard and carry on. But // dropping the connection is probably more helpful since // it lets the other side see that a problem occurred. // Plus it gives us a chance to reclaim some memory. goto malformed; } - if (nni_msg_trim(msg, 4) != 0) { - // This should never happen - could be an assert. - nni_panic("Failed to trim REQ header from body"); - } + (void) nni_msg_trim(msg, 4); // Cannot fail rp->aio_putq.a_msg = msg; nni_msgq_aio_put(rp->req->urq, &rp->aio_putq); @@ -548,7 +545,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) // Request ID is in big endian format. NNI_PUT32(req->reqid, id); - if (nni_msg_append_header(msg, req->reqid, 4) != 0) { + if (nni_msg_header_append(msg, req->reqid, 4) != 0) { // Should be ENOMEM. nni_msg_free(msg); return (NULL); diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4db79b86..32513134 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -225,7 +225,6 @@ nni_resp_sock_getq_cb(void *arg) { nni_resp_sock *psock = arg; nni_msg * msg; - uint8_t * header; uint32_t id; nni_resp_pipe *ppipe; int rv; @@ -244,9 +243,7 @@ nni_resp_sock_getq_cb(void *arg) nni_msgq_aio_get(psock->uwq, &psock->aio_getq); return; } - header = nni_msg_header(msg); - NNI_GET32(header, id); - nni_msg_trim_header(msg, 4); + id = nni_msg_header_trim_u32(msg); nni_mtx_lock(&psock->mtx); rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe); @@ -301,7 +298,6 @@ nni_resp_recv_cb(void *arg) nni_resp_sock *psock = ppipe->psock; nni_msgq * urq; nni_msg * msg; - uint8_t idbuf[4]; int hops; int rv; @@ -311,13 +307,11 @@ nni_resp_recv_cb(void *arg) urq = nni_sock_recvq(psock->nsock); - NNI_PUT32(idbuf, ppipe->id); - msg = ppipe->aio_recv.a_msg; ppipe->aio_recv.a_msg = NULL; // Store the pipe id in the header, first thing. - if (nni_msg_append_header(msg, idbuf, 4) != 0) { + if (nni_msg_header_append_u32(msg, ppipe->id) != 0) { nni_msg_free(msg); goto error; } @@ -338,7 +332,7 @@ nni_resp_recv_cb(void *arg) } body = nni_msg_body(msg); end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_append_header(msg, body, 4); + rv = nni_msg_header_append(msg, body, 4); if (rv != 0) { nni_msg_free(msg); goto error; @@ -439,9 +433,9 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) } // drop anything else in the header... - nni_msg_trunc_header(msg, nni_msg_header_len(msg)); + nni_msg_header_clear(msg); - if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != + if (nni_msg_header_append(msg, psock->btrace, psock->btrace_len) != 0) { nni_free(psock->btrace, psock->btrace_len); psock->btrace = NULL; @@ -481,7 +475,7 @@ nni_resp_sock_rfilter(void *arg, nni_msg *msg) } psock->btrace_len = len; memcpy(psock->btrace, header, len); - nni_msg_trunc_header(msg, len); + nni_msg_header_clear(msg); return (msg); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 91fe4ad3..45b06d67 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -33,8 +33,8 @@ struct nni_surv_sock { nni_time expire; int raw; int closing; - uint32_t nextid; // next id - uint8_t survid[4]; // outstanding request ID (big endian) + uint32_t nextid; // next id + uint32_t survid; // outstanding request ID (big endian) nni_list pipes; nni_aio aio_getq; nni_timer_node timer; @@ -271,16 +271,13 @@ nni_surv_recv_cb(void *arg) nni_msg_free(msg); goto failed; } - if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) { + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { // Should be NNG_ENOMEM nni_msg_free(msg); goto failed; } - if (nni_msg_trim(msg, 4) != 0) { - // This should never happen - could be an assert. - nni_msg_free(msg); - goto failed; - } + (void) nni_msg_trim(msg, 4); + ppipe->aio_putq.a_msg = msg; nni_msgq_aio_put(ppipe->psock->urq, &ppipe->aio_putq); return; @@ -309,7 +306,7 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) } else { nni_sock_recverr(psock->nsock, NNG_ESTATE); } - memset(psock->survid, 0, sizeof(psock->survid)); + psock->survid = 0; nni_timer_cancel(&psock->timer); } break; @@ -381,7 +378,7 @@ nni_surv_timeout(void *arg) nni_surv_sock *psock = arg; nni_sock_lock(psock->nsock); - memset(psock->survid, 0, sizeof(psock->survid)); + psock->survid = 0; nni_sock_recverr(psock->nsock, NNG_ESTATE); nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT); nni_sock_unlock(psock->nsock); @@ -391,7 +388,6 @@ static nni_msg * nni_surv_sock_sfilter(void *arg, nni_msg *msg) { nni_surv_sock *psock = arg; - uint32_t id; if (psock->raw) { // No automatic retry, and the request ID must @@ -402,12 +398,9 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg) // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the // backtrace. (Pipe IDs have the high order bit clear.) - id = (psock->nextid++) | 0x80000000u; - - // Survey ID is in big endian format. - NNI_PUT32(psock->survid, id); + psock->survid = (psock->nextid++) | 0x80000000u; - if (nni_msg_append_header(msg, psock->survid, 4) != 0) { + if (nni_msg_header_append_u32(msg, psock->survid) != 0) { // Should be ENOMEM. nni_msg_free(msg); return (NULL); @@ -436,18 +429,12 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg) return (msg); } - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - return (NULL); - } - - if (memcmp(nni_msg_header(msg), ssock->survid, 4) != 0) { + if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || + (nni_msg_header_trim_u32(msg) != ssock->survid)) { // Wrong request id nni_msg_free(msg); return (NULL); } - // Prune the survey ID. - nni_msg_trim_header(msg, 4); return (msg); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 9cc43ad7..37d7153f 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -159,11 +159,11 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio) // side won't know what to do otherwise. h = nni_msg_header(msg); l = nni_msg_header_len(msg); - if ((rv = nni_msg_prepend(msg, h, l)) != 0) { + if ((rv = nni_msg_insert(msg, h, l)) != 0) { nni_aio_finish(aio, rv, aio->a_count); return; } - nni_msg_trunc_header(msg, l); + nni_msg_header_chop(msg, l); nni_msgq_aio_put(pipe->wq, aio); } |
