diff options
| -rw-r--r-- | src/core/message.c | 78 | ||||
| -rw-r--r-- | src/core/message.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 5 | ||||
| -rw-r--r-- | src/nng.c | 13 | ||||
| -rw-r--r-- | src/nng.h | 2 |
5 files changed, 88 insertions, 13 deletions
diff --git a/src/core/message.c b/src/core/message.c index f1e1fc99..6976f4da 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -27,9 +27,17 @@ struct nng_msg { nni_chunk m_header; nni_chunk m_body; nni_time m_expire; // usec - nni_pipe * m_pipe; // Pipe message was received on + nni_list m_options; }; +typedef struct { + int mo_num; + size_t mo_sz; + void * mo_val; + nni_list_node mo_node; +} nni_msgopt; + + // nni_chunk_grow increases the underlying space for a chunk. It ensures // that the desired amount of trailing space (including the length) // and headroom (excluding the length) are available. It also copies @@ -238,6 +246,7 @@ nni_msg_alloc(nni_msg **mp, size_t sz) nni_panic("chunk_append failed"); } + NNI_LIST_INIT(&m->m_options, nni_msgopt, mo_node); *mp = m; return (0); } @@ -246,13 +255,72 @@ nni_msg_alloc(nni_msg **mp, size_t sz) void nni_msg_free(nni_msg *m) { + nni_msgopt *mo; + nni_chunk_free(&m->m_header); nni_chunk_free(&m->m_body); + while ((mo = nni_list_first(&m->m_options)) != NULL) { + nni_list_remove(&m->m_options, mo); + nni_free(mo, sizeof (*mo) + mo->mo_sz); + } nni_free(m, sizeof (*m)); } int +nni_msg_setopt(nni_msg *m, int opt, const void *val, size_t sz) +{ + // Find the existing option if present. Note that if we alter + // a value, we can wind up trashing old data due to ENOMEM. + nni_msgopt *oldmo, *newmo; + + NNI_LIST_FOREACH (&m->m_options, oldmo) { + if (oldmo->mo_num == opt) { + if (sz == oldmo->mo_sz) { + // nice! we can just overwrite old value + memcpy(oldmo->mo_val, val, sz); + return (0); + } + break; + } + } + if ((newmo = nni_alloc(sizeof (*newmo) + sz)) == NULL) { + return (NNG_ENOMEM); + } + newmo->mo_val = ((char *) newmo + sizeof (*newmo)); + newmo->mo_sz = sz; + newmo->mo_num = opt; + memcpy(newmo->mo_val, val, sz); + if (oldmo != NULL) { + nni_list_remove(&m->m_options, oldmo); + nni_free(oldmo, sizeof (*oldmo) + oldmo->mo_sz); + } + nni_list_append(&m->m_options, newmo); + return (0); +} + + +int +nni_msg_getopt(nni_msg *m, int opt, void *val, size_t *szp) +{ + nni_msgopt *mo; + + NNI_LIST_FOREACH (&m->m_options, mo) { + if (mo->mo_num == opt) { + int sz = *szp; + if (sz > mo->mo_sz) { + sz = mo->mo_sz; + memcpy(val, mo->mo_val, sz); + *szp = mo->mo_sz; + return (0); + } + } + } + return (NNG_ENOTSUP); +} + + +int nni_msg_realloc(nni_msg *m, size_t sz) { int rv = 0; @@ -344,11 +412,3 @@ nni_msg_trunc_header(nni_msg *m, size_t len) { return (nni_chunk_trunc(&m->m_header, len)); } - - -int -nni_msg_pipe(nni_msg *m, nni_pipe **pp) -{ - *pp = m->m_pipe; - return (0); -} diff --git a/src/core/message.h b/src/core/message.h index 5c742e44..2982eeb7 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -25,6 +25,7 @@ extern int nni_msg_trim(nni_msg *, size_t); extern int nni_msg_trunc(nni_msg *, size_t); extern int nni_msg_trim_header(nni_msg *, size_t); extern int nni_msg_trunc_header(nni_msg *, size_t); -extern int nni_msg_pipe(nni_msg *, nni_pipe **); +extern int nni_msg_setopt(nni_msg *, int, const void *, size_t); +extern int nni_msg_getopt(nni_msg *, int, void *, size_t *); #endif // CORE_SOCKET_H diff --git a/src/core/socket.c b/src/core/socket.c index adf2e082..9db61838 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -557,6 +557,7 @@ nni_setopt_duration(nni_duration *ptr, const void *val, size_t size) return (0); } + static int nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep) { @@ -570,6 +571,7 @@ nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep) return (0); } + int nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) { @@ -606,6 +608,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) return (rv); } + int nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) { @@ -641,5 +644,3 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) nni_mutex_exit(&sock->s_mx); return (rv); } - - @@ -70,6 +70,7 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags) return (nni_socket_recvmsg(s, msgp, expire)); } + int nng_sendmsg(nng_socket *s, nng_msg *msg, int flags) { @@ -180,3 +181,15 @@ nng_msg_free(nng_msg *msg) nni_init(); return (nni_msg_free(msg)); } + + +int +nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) +{ + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + return (nni_msg_getopt(msg, opt, ptr, szp)); +} @@ -185,7 +185,6 @@ NNG_DECL void nng_msg_free(nng_msg *); NNG_DECL int nng_msg_realloc(nng_msg *, size_t); NNG_DECL void *nng_msg_header(nng_msg *, size_t *); NNG_DECL void *nng_msg_body(nng_msg *, size_t *); -NNG_DECL int nng_msg_pipe(nng_msg *, nng_pipe **); NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_trim(nng_msg *, size_t); @@ -194,6 +193,7 @@ NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_trim_header(nng_msg *, size_t); NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t); +NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *); // Pipe API. Generally pipes are only "observable" to applications, but // we do permit an application to close a pipe. This can be useful, for |
