aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/message.c78
-rw-r--r--src/core/message.h3
-rw-r--r--src/core/socket.c5
-rw-r--r--src/nng.c13
-rw-r--r--src/nng.h2
5 files changed, 88 insertions, 13 deletions
diff --git a/src/core/message.c b/src/core/message.c
index f1e1fc99..6976f4da 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -27,9 +27,17 @@ struct nng_msg {
nni_chunk m_header;
nni_chunk m_body;
nni_time m_expire; // usec
- nni_pipe * m_pipe; // Pipe message was received on
+ nni_list m_options;
};
+typedef struct {
+ int mo_num;
+ size_t mo_sz;
+ void * mo_val;
+ nni_list_node mo_node;
+} nni_msgopt;
+
+
// nni_chunk_grow increases the underlying space for a chunk. It ensures
// that the desired amount of trailing space (including the length)
// and headroom (excluding the length) are available. It also copies
@@ -238,6 +246,7 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
nni_panic("chunk_append failed");
}
+ NNI_LIST_INIT(&m->m_options, nni_msgopt, mo_node);
*mp = m;
return (0);
}
@@ -246,13 +255,72 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
void
nni_msg_free(nni_msg *m)
{
+ nni_msgopt *mo;
+
nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
+ while ((mo = nni_list_first(&m->m_options)) != NULL) {
+ nni_list_remove(&m->m_options, mo);
+ nni_free(mo, sizeof (*mo) + mo->mo_sz);
+ }
nni_free(m, sizeof (*m));
}
int
+nni_msg_setopt(nni_msg *m, int opt, const void *val, size_t sz)
+{
+ // Find the existing option if present. Note that if we alter
+ // a value, we can wind up trashing old data due to ENOMEM.
+ nni_msgopt *oldmo, *newmo;
+
+ NNI_LIST_FOREACH (&m->m_options, oldmo) {
+ if (oldmo->mo_num == opt) {
+ if (sz == oldmo->mo_sz) {
+ // nice! we can just overwrite old value
+ memcpy(oldmo->mo_val, val, sz);
+ return (0);
+ }
+ break;
+ }
+ }
+ if ((newmo = nni_alloc(sizeof (*newmo) + sz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ newmo->mo_val = ((char *) newmo + sizeof (*newmo));
+ newmo->mo_sz = sz;
+ newmo->mo_num = opt;
+ memcpy(newmo->mo_val, val, sz);
+ if (oldmo != NULL) {
+ nni_list_remove(&m->m_options, oldmo);
+ nni_free(oldmo, sizeof (*oldmo) + oldmo->mo_sz);
+ }
+ nni_list_append(&m->m_options, newmo);
+ return (0);
+}
+
+
+int
+nni_msg_getopt(nni_msg *m, int opt, void *val, size_t *szp)
+{
+ nni_msgopt *mo;
+
+ NNI_LIST_FOREACH (&m->m_options, mo) {
+ if (mo->mo_num == opt) {
+ int sz = *szp;
+ if (sz > mo->mo_sz) {
+ sz = mo->mo_sz;
+ memcpy(val, mo->mo_val, sz);
+ *szp = mo->mo_sz;
+ return (0);
+ }
+ }
+ }
+ return (NNG_ENOTSUP);
+}
+
+
+int
nni_msg_realloc(nni_msg *m, size_t sz)
{
int rv = 0;
@@ -344,11 +412,3 @@ nni_msg_trunc_header(nni_msg *m, size_t len)
{
return (nni_chunk_trunc(&m->m_header, len));
}
-
-
-int
-nni_msg_pipe(nni_msg *m, nni_pipe **pp)
-{
- *pp = m->m_pipe;
- return (0);
-}
diff --git a/src/core/message.h b/src/core/message.h
index 5c742e44..2982eeb7 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -25,6 +25,7 @@ extern int nni_msg_trim(nni_msg *, size_t);
extern int nni_msg_trunc(nni_msg *, size_t);
extern int nni_msg_trim_header(nni_msg *, size_t);
extern int nni_msg_trunc_header(nni_msg *, size_t);
-extern int nni_msg_pipe(nni_msg *, nni_pipe **);
+extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
+extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
#endif // CORE_SOCKET_H
diff --git a/src/core/socket.c b/src/core/socket.c
index adf2e082..9db61838 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -557,6 +557,7 @@ nni_setopt_duration(nni_duration *ptr, const void *val, size_t size)
return (0);
}
+
static int
nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
{
@@ -570,6 +571,7 @@ nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
return (0);
}
+
int
nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
{
@@ -606,6 +608,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
return (rv);
}
+
int
nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
{
@@ -641,5 +644,3 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
nni_mutex_exit(&sock->s_mx);
return (rv);
}
-
-
diff --git a/src/nng.c b/src/nng.c
index 5a0acaac..a55b210f 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -70,6 +70,7 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
return (nni_socket_recvmsg(s, msgp, expire));
}
+
int
nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
{
@@ -180,3 +181,15 @@ nng_msg_free(nng_msg *msg)
nni_init();
return (nni_msg_free(msg));
}
+
+
+int
+nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
+{
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ return (nni_msg_getopt(msg, opt, ptr, szp));
+}
diff --git a/src/nng.h b/src/nng.h
index 653fa3cf..5afbca8d 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -185,7 +185,6 @@ NNG_DECL void nng_msg_free(nng_msg *);
NNG_DECL int nng_msg_realloc(nng_msg *, size_t);
NNG_DECL void *nng_msg_header(nng_msg *, size_t *);
NNG_DECL void *nng_msg_body(nng_msg *, size_t *);
-NNG_DECL int nng_msg_pipe(nng_msg *, nng_pipe **);
NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim(nng_msg *, size_t);
@@ -194,6 +193,7 @@ NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim_header(nng_msg *, size_t);
NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t);
+NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *);
// Pipe API. Generally pipes are only "observable" to applications, but
// we do permit an application to close a pipe. This can be useful, for