aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h6
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/list.c8
-rw-r--r--src/core/list.h8
-rw-r--r--src/core/message.c87
-rw-r--r--src/core/message.h3
-rw-r--r--src/core/pipe.h2
-rw-r--r--src/core/protocol.c2
-rw-r--r--src/core/socket.h1
-rw-r--r--src/nng.h14
-rw-r--r--src/platform/posix/posix_alloc.c4
-rw-r--r--src/protocol/pair/pair.c2
-rw-r--r--src/protocol/reqrep/rep.c3
-rw-r--r--src/protocol/reqrep/req.c3
-rw-r--r--src/transport/inproc/inproc.c12
15 files changed, 109 insertions, 48 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index dd225567..e44be1ec 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -28,7 +28,7 @@ typedef struct nni_tran nni_tran;
typedef struct nni_tran_ep nni_tran_ep;
typedef struct nni_tran_pipe nni_tran_pipe;
-typedef struct nni_proto_pipe nni_proto_pipe;
+typedef struct nni_proto_pipe nni_proto_pipe;
typedef struct nni_proto nni_proto;
typedef int nni_signal; // Turnstile/wakeup channel.
@@ -41,7 +41,7 @@ typedef int64_t nni_duration; // Relative time (usec).
#define NNI_SECOND (1000000)
// Structure allocation conveniences.
-#define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*(s)))
-#define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*(s)))
+#define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*s))
+#define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*s))
#endif // CORE_DEFS_H
diff --git a/src/core/endpt.h b/src/core/endpt.h
index a8fce86a..bff0ec6d 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -17,7 +17,7 @@
// OR TRANSPORTS.
struct nng_endpoint {
nni_tran_ep ep_ops;
- nni_tran *ep_tran;
+ nni_tran * ep_tran;
void * ep_data; // Transport private
nni_list_node ep_node; // Per socket list
nni_sock * ep_sock;
diff --git a/src/core/list.c b/src/core/list.c
index 8d6c3ace..ca200415 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -30,7 +30,7 @@ nni_list_init_offset(nni_list *list, size_t offset)
void *
-nni_list_first(nni_list *list)
+nni_list_first(const nni_list *list)
{
nni_list_node *node = list->ll_head.ln_next;
@@ -42,7 +42,7 @@ nni_list_first(nni_list *list)
void *
-nni_list_last(nni_list *list)
+nni_list_last(const nni_list *list)
{
nni_list_node *node = list->ll_head.ln_prev;
@@ -84,7 +84,7 @@ nni_list_prepend(nni_list *list, void *item)
void *
-nni_list_next(nni_list *list, void *item)
+nni_list_next(const nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
@@ -96,7 +96,7 @@ nni_list_next(nni_list *list, void *item)
void *
-nni_list_prev(nni_list *list, void *item)
+nni_list_prev(const nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
diff --git a/src/core/list.h b/src/core/list.h
index 9d95a527..a6540f5e 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -32,12 +32,12 @@ extern void nni_list_init_offset(nni_list *list, size_t offset);
#define NNI_LIST_NODE_INIT(node) \
{ (node)->ln_prev = (node)->ln_next = 0; }
-extern void *nni_list_first(nni_list *);
-extern void *nni_list_last(nni_list *);
+extern void *nni_list_first(const nni_list *);
+extern void *nni_list_last(const nni_list *);
extern void nni_list_append(nni_list *, void *);
extern void nni_list_prepend(nni_list *, void *);
-extern void *nni_list_next(nni_list *, void *);
-extern void *nni_list_prev(nni_list *, void *);
+extern void *nni_list_next(const nni_list *, void *);
+extern void *nni_list_prev(const nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
diff --git a/src/core/message.c b/src/core/message.c
index 3135cc95..351858a4 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -9,6 +9,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
#include "core/nng_impl.h"
@@ -37,6 +38,55 @@ typedef struct {
nni_list_node mo_node;
} nni_msgopt;
+static void
+nni_chunk_dump(const nni_chunk *chunk, char *prefix)
+{
+ int i, j;
+ uint8_t x;
+ char buf[128];
+
+ (void) snprintf(buf, sizeof (buf),
+ " %s (cap %d, len %d, offset %d ptr %p):", prefix,
+ (int) chunk->ch_cap, (int) chunk->ch_len,
+ (int) (chunk->ch_ptr - chunk->ch_buf), chunk->ch_ptr);
+ nni_println(buf);
+
+ buf[0] = 0;
+ for (i = 0, j = 0; i < chunk->ch_len; i++) {
+ if ((i % 16) == 0) {
+ if (j > 0) {
+ buf[j++] = '\0';
+ nni_println(buf);
+ j = 0;
+ }
+ snprintf(buf, sizeof (buf), " %4x: ", i);
+ j += strlen(buf);
+ }
+ buf[j++] = ' ';
+ x = (chunk->ch_ptr[i] >> 4);
+ buf[j++] = x > 9 ? ('A' + (x - 10)) : '0' + x;
+ x = (chunk->ch_ptr[i] & 0x0f);
+ buf[j++] = x > 9 ? ('A' + (x - 10)) : '0' + x;
+ }
+ if (j > 0) {
+ buf[j++] = '\0';
+ nni_println(buf);
+ }
+}
+
+
+void
+nni_msg_dump(const char *banner, const nni_msg *msg)
+{
+ char buf[128];
+
+ (void) snprintf(buf, sizeof (buf), "--- %s BEGIN ---", banner);
+ nni_println(buf);
+ nni_chunk_dump(&msg->m_header, "HEADER");
+ nni_chunk_dump(&msg->m_body, "BODY");
+ nni_println("--- END ---");
+}
+
// nni_chunk_grow increases the underlying space for a chunk. It ensures
// that the desired amount of trailing space (including the length)
@@ -66,20 +116,19 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted)
if ((ch->ch_ptr >= ch->ch_buf) &&
(ch->ch_ptr < (ch->ch_buf + ch->ch_cap))) {
headroom = (size_t) (ch->ch_ptr - ch->ch_buf);
+ if (headwanted < headroom) {
+ headwanted = headroom; // Never shrink this.
+ }
if (((newsz + headwanted) < ch->ch_cap) &&
(headwanted <= headroom)) {
// We have enough space at the ends already.
return (0);
}
- if (headwanted < headroom) {
- // We never shrink... headroom either.
- headwanted = headroom;
- }
if ((newbuf = nni_alloc(newsz + headwanted)) == NULL) {
return (NNG_ENOMEM);
}
// Copy all the data, but not header or trailer.
- memcpy(newbuf + headwanted, ch->ch_buf + headroom, ch->ch_len);
+ memcpy(newbuf + headwanted, ch->ch_ptr, ch->ch_len);
nni_free(ch->ch_buf, ch->ch_cap);
ch->ch_buf = newbuf;
ch->ch_ptr = newbuf + headwanted;
@@ -90,22 +139,16 @@ nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted)
// We either don't have a data pointer yet, or it doesn't reference
// the backing store. In this case, we just check against the
// allocated capacity and grow, or don't grow.
- if (newsz < ch->ch_cap) {
- // Enough space at end, so just use it.
- if (ch->ch_ptr == NULL) {
- ch->ch_ptr = ch->ch_buf + headwanted;
+ if ((newsz + headwanted) >= ch->ch_cap) {
+ if ((newbuf = nni_alloc(newsz + headwanted)) == NULL) {
+ return (NNG_ENOMEM);
}
- return (0);
- } else if ((newbuf = nni_alloc(newsz)) == NULL) {
- return (NNG_ENOMEM);
+ nni_free(ch->ch_buf, ch->ch_cap);
+ ch->ch_cap = newsz + headwanted;
+ ch->ch_buf = newbuf;
}
- nni_free(ch->ch_buf, ch->ch_cap);
- ch->ch_buf = newbuf;
- ch->ch_cap = newsz;
- if (ch->ch_ptr == NULL) {
- ch->ch_ptr = ch->ch_buf + headwanted;
- }
+ ch->ch_ptr = ch->ch_buf + headwanted;
return (0);
}
@@ -154,7 +197,7 @@ nni_chunk_trim(nni_chunk *ch, size_t len)
static int
nni_chunk_dup(nni_chunk *dst, const nni_chunk *src)
{
- if ((dst->ch_buf = nni_alloc(src->ch_cap)) != 0) {
+ if ((dst->ch_buf = nni_alloc(src->ch_cap)) == NULL) {
return (NNG_ENOMEM);
}
dst->ch_cap = src->ch_cap;
@@ -270,7 +313,7 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
int
-nni_msg_dup(nni_msg **dup, nni_msg *src)
+nni_msg_dup(nni_msg **dup, const nni_msg *src)
{
nni_msg *m;
nni_msgopt *mo;
@@ -280,6 +323,10 @@ nni_msg_dup(nni_msg **dup, nni_msg *src)
if ((m = NNI_ALLOC_STRUCT(m)) == NULL) {
return (NNG_ENOMEM);
}
+ memset(m, 0, sizeof (*m));
+ NNI_LIST_INIT(&m->m_options, nni_msgopt, mo_node);
+
+
if ((rv = nni_chunk_dup(&m->m_header, &src->m_header)) != 0) {
NNI_FREE_STRUCT(m);
return (rv);
diff --git a/src/core/message.h b/src/core/message.h
index dff71646..0f9a24ac 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -15,7 +15,7 @@
extern int nni_msg_alloc(nni_msg **, size_t);
extern void nni_msg_free(nni_msg *);
extern int nni_msg_realloc(nni_msg *, size_t);
-extern int nni_msg_dup(nni_msg **, nni_msg *);
+extern int nni_msg_dup(nni_msg **, const nni_msg *);
extern void *nni_msg_header(nni_msg *, size_t *);
extern void *nni_msg_body(nni_msg *, size_t *);
extern int nni_msg_append(nni_msg *, const void *, size_t);
@@ -28,5 +28,6 @@ extern int nni_msg_trim_header(nni_msg *, size_t);
extern int nni_msg_trunc_header(nni_msg *, size_t);
extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
+extern void nni_msg_dump(const char *, const nni_msg *);
#endif // CORE_SOCKET_H
diff --git a/src/core/pipe.h b/src/core/pipe.h
index d2819a31..a8d7b286 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,7 +21,7 @@ struct nng_pipe {
uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
- nni_proto_pipe p_proto_ops;
+ nni_proto_pipe p_proto_ops;
void * p_proto_data;
nni_list_node p_node;
nni_sock * p_sock;
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 072cc301..27430771 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -68,6 +68,7 @@ nni_proto_number(const char *name)
return (NNG_PROTO_NONE);
}
+
uint16_t
nni_proto_peer(uint16_t num)
{
@@ -78,4 +79,3 @@ nni_proto_peer(uint16_t num)
}
return (p->proto_peer);
}
-
diff --git a/src/core/socket.h b/src/core/socket.h
index 8b3c3ff9..5a0dbb84 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -48,6 +48,7 @@ struct nng_socket {
extern int nni_sock_open(nni_sock **, uint16_t);
extern int nni_sock_close(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
+extern uint16_t nni_sock_peer(nni_sock *);
extern int nni_sock_setopt(nni_sock *, int, const void *, size_t);
extern int nni_sock_getopt(nni_sock *, int, void *, size_t *);
extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time);
diff --git a/src/nng.h b/src/nng.h
index a0995595..edb2c66a 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -98,13 +98,13 @@ NNG_DECL int nng_unsetnotify(nng_socket *, nng_notify *);
// NNG_EVENT_ENDPT_ADD - An endpoint is added to the socket.
// NNG_EVENT_ENDPT_REM - An endpoint is removed from the socket.
#define NNG_EVENT_BIT(x) (1U << (x))
-#define NNG_EVENT_RECV NNG_EVENT_BIT(0)
-#define NNG_EVENT_SEND NNG_EVENT_BIT(1)
-#define NNG_EVENT_ERROR NNG_EVENT_BIT(2)
-#define NNG_EVENT_PIPE_ADD NNG_EVENT_BIT(3)
-#define NNG_EVENT_PIPE_REM NNG_EVENT_BIT(4)
-#define NNG_EVENT_ENDPOINT_ADD NNG_EVENT_BIT(5)
-#define NNG_EVENT_ENDPOINT_REM NNG_EVENT_BIT(6)
+#define NNG_EVENT_RECV NNG_EVENT_BIT(0)
+#define NNG_EVENT_SEND NNG_EVENT_BIT(1)
+#define NNG_EVENT_ERROR NNG_EVENT_BIT(2)
+#define NNG_EVENT_PIPE_ADD NNG_EVENT_BIT(3)
+#define NNG_EVENT_PIPE_REM NNG_EVENT_BIT(4)
+#define NNG_EVENT_ENDPOINT_ADD NNG_EVENT_BIT(5)
+#define NNG_EVENT_ENDPOINT_REM NNG_EVENT_BIT(6)
// The following functions return more detailed information about the event.
// Some of the values will not make sense for some event types, in which case
diff --git a/src/platform/posix/posix_alloc.c b/src/platform/posix/posix_alloc.c
index 98a76669..cce348f3 100644
--- a/src/platform/posix/posix_alloc.c
+++ b/src/platform/posix/posix_alloc.c
@@ -15,9 +15,9 @@
// POSIX memory allocation. This is pretty much standard C.
void *
-nni_alloc(size_t size)
+nni_alloc(size_t sz)
{
- return (calloc(1, size));
+ return (calloc(1, sz));
}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 1d4e58d0..cc906791 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -91,9 +91,11 @@ static void
nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *pp = arg;
+
NNI_FREE_STRUCT(pp);
}
+
static int
nni_pair_pipe_add(void *arg)
{
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 60346390..96413523 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -263,7 +263,7 @@ nni_rep_pipe_recv(void *arg)
for (;;) {
size_t len;
- char *body;
+ uint8_t *body;
int hops;
again:
@@ -299,6 +299,7 @@ again:
goto again;
}
nni_msg_trim(msg, 4);
+ body = nni_msg_body(msg, &len);
if (end) {
break;
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index b8401107..9cfc6f54 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -308,7 +308,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
uint32_t id;
- uint8_t buf[4];
nni_mtx_lock(&req->mx);
if (req->raw) {
@@ -329,7 +328,7 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
req->reqid[2] = (uint8_t) (id >> 8);
req->reqid[3] = (uint8_t) (id);
- if (nni_msg_append_header(msg, buf, 4) != 0) {
+ if (nni_msg_append_header(msg, req->reqid, 4) != 0) {
// Should be ENOMEM.
nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index ff0e9a2e..530e7ccf 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -132,7 +132,17 @@ static int
nni_inproc_pipe_send(void *arg, nni_msg *msg)
{
nni_inproc_pipe *pipe = arg;
-
+ char *h;
+ size_t l;
+
+ // We need to move any header data to the body, because the other
+ // side won't know what to do otherwise.
+ h = nni_msg_header(msg, &l);
+ if (nni_msg_prepend(msg, h, l) != 0) {
+ nni_msg_free(msg);
+ return (0); // Pretend we sent it.
+ }
+ nni_msg_trim_header(msg, l);
return (nni_msgq_put(pipe->wq, msg));
}