aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-24 15:54:44 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-24 22:51:01 -0800
commit3b73756a5e5d075adfd03f5c49014094428d818f (patch)
tree7109b8952eeb03fd47c293c5d915b6a9f7d6433d /src
parent3bed9dca3a3ae5b226e3bf6aee3352d7665dbcc2 (diff)
downloadnng-3b73756a5e5d075adfd03f5c49014094428d818f.tar.gz
nng-3b73756a5e5d075adfd03f5c49014094428d818f.tar.bz2
nng-3b73756a5e5d075adfd03f5c49014094428d818f.zip
Change in handling of extended info for messages.
Instead of supplying a pipe, and expecting that the info there would be included we use nng_msg_getopt(). This will be enabled by the app asking for extended information by setting an option, we don't copy the data for every app (most won't care). This means we don't have to worry about reference counting the pipe for the life of associated messages.
Diffstat (limited to 'src')
-rw-r--r--src/core/message.c78
-rw-r--r--src/core/message.h3
-rw-r--r--src/core/socket.c5
-rw-r--r--src/nng.c13
-rw-r--r--src/nng.h2
5 files changed, 88 insertions, 13 deletions
diff --git a/src/core/message.c b/src/core/message.c
index f1e1fc99..6976f4da 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -27,9 +27,17 @@ struct nng_msg {
nni_chunk m_header;
nni_chunk m_body;
nni_time m_expire; // usec
- nni_pipe * m_pipe; // Pipe message was received on
+ nni_list m_options;
};
+typedef struct {
+ int mo_num;
+ size_t mo_sz;
+ void * mo_val;
+ nni_list_node mo_node;
+} nni_msgopt;
+
+
// nni_chunk_grow increases the underlying space for a chunk. It ensures
// that the desired amount of trailing space (including the length)
// and headroom (excluding the length) are available. It also copies
@@ -238,6 +246,7 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
nni_panic("chunk_append failed");
}
+ NNI_LIST_INIT(&m->m_options, nni_msgopt, mo_node);
*mp = m;
return (0);
}
@@ -246,13 +255,72 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
void
nni_msg_free(nni_msg *m)
{
+ nni_msgopt *mo;
+
nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
+ while ((mo = nni_list_first(&m->m_options)) != NULL) {
+ nni_list_remove(&m->m_options, mo);
+ nni_free(mo, sizeof (*mo) + mo->mo_sz);
+ }
nni_free(m, sizeof (*m));
}
int
+nni_msg_setopt(nni_msg *m, int opt, const void *val, size_t sz)
+{
+ // Find the existing option if present. Note that if we alter
+ // a value, we can wind up trashing old data due to ENOMEM.
+ nni_msgopt *oldmo, *newmo;
+
+ NNI_LIST_FOREACH (&m->m_options, oldmo) {
+ if (oldmo->mo_num == opt) {
+ if (sz == oldmo->mo_sz) {
+ // nice! we can just overwrite old value
+ memcpy(oldmo->mo_val, val, sz);
+ return (0);
+ }
+ break;
+ }
+ }
+ if ((newmo = nni_alloc(sizeof (*newmo) + sz)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ newmo->mo_val = ((char *) newmo + sizeof (*newmo));
+ newmo->mo_sz = sz;
+ newmo->mo_num = opt;
+ memcpy(newmo->mo_val, val, sz);
+ if (oldmo != NULL) {
+ nni_list_remove(&m->m_options, oldmo);
+ nni_free(oldmo, sizeof (*oldmo) + oldmo->mo_sz);
+ }
+ nni_list_append(&m->m_options, newmo);
+ return (0);
+}
+
+
+int
+nni_msg_getopt(nni_msg *m, int opt, void *val, size_t *szp)
+{
+ nni_msgopt *mo;
+
+ NNI_LIST_FOREACH (&m->m_options, mo) {
+ if (mo->mo_num == opt) {
+ int sz = *szp;
+ if (sz > mo->mo_sz) {
+ sz = mo->mo_sz;
+ memcpy(val, mo->mo_val, sz);
+ *szp = mo->mo_sz;
+ return (0);
+ }
+ }
+ }
+ return (NNG_ENOTSUP);
+}
+
+
+int
nni_msg_realloc(nni_msg *m, size_t sz)
{
int rv = 0;
@@ -344,11 +412,3 @@ nni_msg_trunc_header(nni_msg *m, size_t len)
{
return (nni_chunk_trunc(&m->m_header, len));
}
-
-
-int
-nni_msg_pipe(nni_msg *m, nni_pipe **pp)
-{
- *pp = m->m_pipe;
- return (0);
-}
diff --git a/src/core/message.h b/src/core/message.h
index 5c742e44..2982eeb7 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -25,6 +25,7 @@ extern int nni_msg_trim(nni_msg *, size_t);
extern int nni_msg_trunc(nni_msg *, size_t);
extern int nni_msg_trim_header(nni_msg *, size_t);
extern int nni_msg_trunc_header(nni_msg *, size_t);
-extern int nni_msg_pipe(nni_msg *, nni_pipe **);
+extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
+extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
#endif // CORE_SOCKET_H
diff --git a/src/core/socket.c b/src/core/socket.c
index adf2e082..9db61838 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -557,6 +557,7 @@ nni_setopt_duration(nni_duration *ptr, const void *val, size_t size)
return (0);
}
+
static int
nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
{
@@ -570,6 +571,7 @@ nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
return (0);
}
+
int
nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
{
@@ -606,6 +608,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
return (rv);
}
+
int
nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
{
@@ -641,5 +644,3 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
nni_mutex_exit(&sock->s_mx);
return (rv);
}
-
-
diff --git a/src/nng.c b/src/nng.c
index 5a0acaac..a55b210f 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -70,6 +70,7 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
return (nni_socket_recvmsg(s, msgp, expire));
}
+
int
nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
{
@@ -180,3 +181,15 @@ nng_msg_free(nng_msg *msg)
nni_init();
return (nni_msg_free(msg));
}
+
+
+int
+nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
+{
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ return (nni_msg_getopt(msg, opt, ptr, szp));
+}
diff --git a/src/nng.h b/src/nng.h
index 653fa3cf..5afbca8d 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -185,7 +185,6 @@ NNG_DECL void nng_msg_free(nng_msg *);
NNG_DECL int nng_msg_realloc(nng_msg *, size_t);
NNG_DECL void *nng_msg_header(nng_msg *, size_t *);
NNG_DECL void *nng_msg_body(nng_msg *, size_t *);
-NNG_DECL int nng_msg_pipe(nng_msg *, nng_pipe **);
NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim(nng_msg *, size_t);
@@ -194,6 +193,7 @@ NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_trim_header(nng_msg *, size_t);
NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t);
+NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *);
// Pipe API. Generally pipes are only "observable" to applications, but
// we do permit an application to close a pipe. This can be useful, for