diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 17:38:46 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 17:38:46 -0800 |
| commit | 89c847f1f52969ee2ae6ed35018eef40366ca061 (patch) | |
| tree | ba3f3c2da64e4a74c69d315b2198df59bcd4441b | |
| parent | 934c1316ae47754a2e368c65228c3cbfe552680f (diff) | |
| download | nng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.gz nng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.bz2 nng-89c847f1f52969ee2ae6ed35018eef40366ca061.zip | |
Work on endpoints. More C99 & type cleanups.
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/core/endpt.c | 123 | ||||
| -rw-r--r-- | src/core/endpt.h | 29 | ||||
| -rw-r--r-- | src/core/list.h | 47 | ||||
| -rw-r--r-- | src/core/message.c | 186 | ||||
| -rw-r--r-- | src/core/message.h | 51 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 32 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 14 | ||||
| -rw-r--r-- | src/core/pipe.c | 48 | ||||
| -rw-r--r-- | src/core/pipe.h | 8 | ||||
| -rw-r--r-- | src/core/platform.h | 1 | ||||
| -rw-r--r-- | src/core/protocol.c | 40 | ||||
| -rw-r--r-- | src/core/protocol.h | 181 | ||||
| -rw-r--r-- | src/core/socket.c | 37 | ||||
| -rw-r--r-- | src/core/transport.c | 46 | ||||
| -rw-r--r-- | src/core/transport.h | 198 | ||||
| -rw-r--r-- | src/nng.c | 52 | ||||
| -rw-r--r-- | src/nng.h | 608 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 4 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 2 |
20 files changed, 818 insertions, 890 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ab8d4659..88c1e332 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,6 +28,7 @@ set (NNG_SOURCES nng.h core/defs.h + core/endpt.c core/init.c core/init.h core/list.c diff --git a/src/core/endpt.c b/src/core/endpt.c new file mode 100644 index 00000000..b7181ce8 --- /dev/null +++ b/src/core/endpt.c @@ -0,0 +1,123 @@ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <string.h> +#include <stdlib.h> +#include <stdio.h> + +// Functionality realited to end points. +#if 0 +struct nng_endpt { + struct nni_endpt_ops ep_ops; + void * ep_data; + nni_list_node_t ep_sock_node; + nni_socket * ep_sock; + char ep_addr[NNG_MAXADDRLEN]; + nni_thread * ep_dialer; + nni_thread * ep_listener; + int ep_close; + nni_mutex ep_mx; + nni_cond ep_cv; +}; +#endif + +int +nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) +{ + nni_transport *tran; + nni_endpt *ep; + int rv; + + if ((tran = nni_transport_find(addr)) == NULL) { + return (NNG_EINVAL); + } + if (strlen(addr) >= NNG_MAXADDRLEN) { + return (NNG_EINVAL); + } + + if ((ep = nni_alloc(sizeof (*ep))) == NULL) { + return (NNG_ENOMEM); + } + ep->ep_dialer = NULL; + ep->ep_listener = NULL; + ep->ep_close = 0; + if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { + nni_free(ep, sizeof (*ep)); + return (NNG_ENOMEM); + } + if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_mx)) != 0) { + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); + return (NNG_ENOMEM); + } + + // Could safely use strcpy here, but this avoids discussion. + (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr); + ep->ep_sock = sock; + ep->ep_ops = *tran->tran_ep_ops; + + rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock)); + if (rv != 0) { + nni_cond_fini(&ep->ep_cv); + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); + return (rv); + } + NNI_LIST_INIT(&ep->ep_pipes, nni_pipe, p_ep_node); + *epp = ep; + return (0); +} + +void +nni_endpt_destroy(nni_endpt *ep) +{ + // We should already have been closed at this point, so this + // should proceed very quickly. + if (ep->ep_dialer) { + nni_thread_reap(ep->ep_dialer); + } + if (ep->ep_listener) { + nni_thread_reap(ep->ep_listener); + } + nni_mutex_enter(&ep->ep_mx); + while (nni_list_first(&ep->ep_pipes) != NULL) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + ep->ep_ops.ep_destroy(ep->ep_data); + + nni_cond_fini(&ep->ep_cv); + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); +} + +void +nni_endpt_close(nni_endpt *ep) +{ + nni_mutex_enter(&ep->ep_mx); + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + ep->ep_close = 1; + nni_cond_broadcast(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + ep->ep_ops.ep_close(ep->ep_data); +} + + +#if 0 +int nni_endpt_dial(nni_endpt *, nni_pipe **); +int nni_endpt_listen(nni_endpt *); +int nni_endpt_accept(nni_endpt *, nni_pipe **); +int nni_endpt_close(nni_endpt *); +#endif diff --git a/src/core/endpt.h b/src/core/endpt.h index 0fa69678..4054a997 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -12,27 +12,28 @@ #include "core/transport.h" -// NB: This structure is supplied here for use by the CORE. Use of this library -// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR -// TRANSPORTS. +// NB: This structure is supplied here for use by the CORE. Use of this +// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS +// OR TRANSPORTS. struct nng_endpt { struct nni_endpt_ops ep_ops; - void * ep_tran; - nni_list_node_t ep_sock_node; + void * ep_data; // Transport private + nni_list_node ep_sock_node; // Per socket list nni_socket * ep_sock; - const char * ep_addr; - nni_thread_t ep_dialer; - nni_thread_t ep_listener; + char ep_addr[NNG_MAXADDRLEN]; + nni_thread * ep_dialer; + nni_thread * ep_listener; int ep_close; nni_mutex ep_mx; nni_cond ep_cv; + nni_list ep_pipes; // Active list of pipes }; -int nni_endpt_create(nni_endpt **, nni_socket *, const char *); -void nni_endpt_destroy(nni_endpt *); -int nni_endpt_dial(nni_endpt *, nni_pipe **); -int nni_endpt_listen(nni_endpt *); -int nni_endpt_accept(nni_endpt *, nni_pipe **); -int nni_endpt_close(nni_endpt *); +extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); +extern void nni_endpt_destroy(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, nni_pipe **); +extern int nni_endpt_listen(nni_endpt *); +extern int nni_endpt_accept(nni_endpt *, nni_pipe **); +extern void nni_endpt_close(nni_endpt *); #endif // CORE_ENDPT_H diff --git a/src/core/list.h b/src/core/list.h index 65d6ae9e..68e6080d 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -1,43 +1,44 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_LIST_H #define CORE_LIST_H #include "core/nng_impl.h" -/* - * In order to make life easy, we just define the list structures - * directly, and let consumers directly inline structures. - */ +// In order to make life easy, we just define the list structures +// directly, and let consumers directly inline structures. typedef struct nni_list_node { struct nni_list_node * ln_next; struct nni_list_node * ln_prev; -} nni_list_node_t; +} nni_list_node; typedef struct nni_list { struct nni_list_node ll_head; size_t ll_offset; -} nni_list_t; +} nni_list; -extern void nni_list_init_offset(nni_list_t *list, size_t offset); +typedef nni_list nni_list_t; +typedef nni_list_node nni_list_node_t; + +extern void nni_list_init_offset(nni_list *list, size_t offset); #define NNI_LIST_INIT(list, type, field) \ nni_list_init_offset(list, offsetof(type, field)) -extern void *nni_list_first(nni_list_t *); -extern void *nni_list_last(nni_list_t *); -extern void nni_list_append(nni_list_t *, void *); -extern void nni_list_prepend(nni_list_t *, void *); -extern void *nni_list_next(nni_list_t *, void *); -extern void *nni_list_prev(nni_list_t *, void *); -extern void nni_list_remove(nni_list_t *, void *); -extern void nni_list_node_init(nni_list_t *, void *); +extern void *nni_list_first(nni_list *); +extern void *nni_list_last(nni_list *); +extern void nni_list_append(nni_list *, void *); +extern void nni_list_prepend(nni_list *, void *); +extern void *nni_list_next(nni_list *, void *); +extern void *nni_list_prev(nni_list *, void *); +extern void nni_list_remove(nni_list *, void *); +extern void nni_list_node_init(nni_list *, void *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/message.c b/src/core/message.c index 4ba30ed6..6ec47cb9 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -1,51 +1,47 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include <stdlib.h> #include <string.h> #include "core/nng_impl.h" -/* - * Message API. - */ +// Message API. -/* Message chunk, internal to the message implementation. */ +// Message chunk, internal to the message implementation. typedef struct { - size_t ch_cap; /* allocated size */ - size_t ch_len; /* length in use */ - uint8_t * ch_buf; /* underlying buffer */ - uint8_t * ch_ptr; /* pointer to actual data */ -} chunk_t; + size_t ch_cap; // allocated size + size_t ch_len; // length in use + uint8_t * ch_buf; // underlying buffer + uint8_t * ch_ptr; // pointer to actual data +} nni_chunk; -/* Underlying message chunk. */ +// Underlying message structure. struct nng_msg { - chunk_t m_header; - chunk_t m_body; - int64_t m_expire; /* Unix usec */ - nni_pipe_t m_pipe; /* Pipe message was received on */ + nni_chunk m_header; + nni_chunk m_body; + nni_time m_expire; // usec + nni_pipe * m_pipe; // Pipe message was received on }; -/* - * chunk_grow increases the underlying space for a chunk. It ensures - * that the desired amount of trailing space (including the length) - * and headroom (excluding the length) are available. It also copies - * any extant referenced data. Note that the capacity will increase, - * but not the length. To increase the length of the referenced data, - * use either chunk_append or chunk_prepend. - * - * Note that having some headroom is useful when data must be prepended - * to a message - it avoids having to perform extra data copies, so we - * encourage initial allocations to start with sufficient room. - */ +// nni_chunk_grow increases the underlying space for a chunk. It ensures +// that the desired amount of trailing space (including the length) +// and headroom (excluding the length) are available. It also copies +// any extant referenced data. Note that the capacity will increase, +// but not the length. To increase the length of the referenced data, +// use either chunk_append or chunk_prepend. +// +// Note that having some headroom is useful when data must be prepended +// to a message - it avoids having to perform extra data copies, so we +// encourage initial allocations to start with sufficient room. static int -chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted) +nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted) { size_t headroom = 0; uint8_t *newbuf; @@ -111,7 +107,7 @@ chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted) static void -chunk_free(chunk_t *ch) +nni_chunk_free(nni_chunk *ch) { if ((ch->ch_cap != 0) && (ch->ch_buf != NULL)) { nni_free(ch->ch_buf, ch->ch_cap); @@ -123,9 +119,9 @@ chunk_free(chunk_t *ch) } -/* chunk_trunc truncates the number of bytes from the end of the chunk. */ +// nni_chunk_trunc truncates bytes from the end of the chunk. static int -chunk_trunc(chunk_t *ch, size_t len) +nni_chunk_trunc(nni_chunk *ch, size_t len) { if (ch->ch_len < len) { return (NNG_EINVAL); @@ -135,9 +131,9 @@ chunk_trunc(chunk_t *ch, size_t len) } -/* chunk_trim removes the number of bytes from the beginning of the chunk. */ +// nni_chunk_trim removes bytes from the beginning of the chunk. static int -chunk_trim(chunk_t *ch, size_t len) +nni_chunk_trim(nni_chunk *ch, size_t len) { if (ch->ch_len < len) { return (NNG_EINVAL); @@ -148,20 +144,18 @@ chunk_trim(chunk_t *ch, size_t len) } -/* - * chunk_append appends the data to the chunk, growing the size as necessary. - * If the data pointer is NULL, then the chunk data region is allocated, but - * uninitialized. - */ +// nni_chunk_append appends the data to the chunk, growing as necessary. +// If the data pointer is NULL, then the chunk data region is allocated, +// but uninitialized. static int -chunk_append(chunk_t *ch, const void *data, size_t len) +nni_chunk_append(nni_chunk *ch, const void *data, size_t len) { int rv; if (len == 0) { return (0); } - if ((rv = chunk_grow(ch, len + ch->ch_len, 0)) != 0) { + if ((rv = nni_chunk_grow(ch, len + ch->ch_len, 0)) != 0) { return (rv); } if (ch->ch_ptr == NULL) { @@ -175,13 +169,11 @@ chunk_append(chunk_t *ch, const void *data, size_t len) } -/* - * chunk_prepend prepends data to the chunk, as efficiently as possible. - * If the data pointer is NULL, then no data is actually copied, but the - * data region will have "grown" in the beginning, with uninitialized data. - */ +// nni_chunk_prepend prepends data to the chunk, as efficiently as possible. +// If the data pointer is NULL, then no data is actually copied, but the +// data region will have "grown" in the beginning, with uninitialized data. static int -chunk_prepend(chunk_t *ch, const void *data, size_t len) +nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len) { int rv; @@ -197,7 +189,7 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len) } else if ((ch->ch_len + len) <= ch->ch_cap) { /* We had enough capacity, just shuffle data down. */ memmove(ch->ch_ptr + len, ch->ch_ptr, ch->ch_len); - } else if ((rv = chunk_grow(ch, 0, len)) == 0) { + } else if ((rv = nni_chunk_grow(ch, 0, len)) == 0) { /* We grew the chunk, so adjust. */ ch->ch_ptr -= len; } else { @@ -215,42 +207,38 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len) int -nni_msg_alloc(nni_msg_t *mp, size_t sz) +nni_msg_alloc(nni_msg **mp, size_t sz) { - nni_msg_t m; + nni_msg *m; int rv; if ((m = nni_alloc(sizeof (*m))) == NULL) { return (NNG_ENOMEM); } - /* - * 64-bytes of header, including room for 32 bytes - * of headroom and 32 bytes of trailer. - */ - if ((rv = chunk_grow(&m->m_header, 32, 32)) != 0) { + // 64-bytes of header, including room for 32 bytes + // of headroom and 32 bytes of trailer. + if ((rv = nni_chunk_grow(&m->m_header, 32, 32)) != 0) { nni_free(m, sizeof (*m)); return (rv); } - /* - * If the message is less than 1024 bytes, or is not power - * of two aligned, then we insert a 32 bytes of headroom - * to allow for inlining backtraces, etc. We also allow the - * amount of space at the end for the same reason. Large aligned - * allocations are unmolested to avoid excessive overallocation. - */ + // If the message is less than 1024 bytes, or is not power + // of two aligned, then we insert a 32 bytes of headroom + // to allow for inlining backtraces, etc. We also allow the + // amount of space at the end for the same reason. Large aligned + // allocations are unmolested to avoid excessive overallocation. if ((sz < 1024) || ((sz & (sz-1)) != 0)) { - rv = chunk_grow(&m->m_body, sz + 32, 32); + rv = nni_chunk_grow(&m->m_body, sz + 32, 32); } else { - rv = chunk_grow(&m->m_body, sz, 0); + rv = nni_chunk_grow(&m->m_body, sz, 0); } if (rv != 0) { - chunk_free(&m->m_header); + nni_chunk_free(&m->m_header); nni_free(m, sizeof (*m)); } - if ((rv = chunk_append(&m->m_body, NULL, sz)) != 0) { - /* Should not happen since we just grew it to fit. */ + if ((rv = nni_chunk_append(&m->m_body, NULL, sz)) != 0) { + // Should not happen since we just grew it to fit. nni_panic("chunk_append failed"); } @@ -260,34 +248,34 @@ nni_msg_alloc(nni_msg_t *mp, size_t sz) void -nni_msg_free(nni_msg_t m) +nni_msg_free(nni_msg *m) { - chunk_free(&m->m_header); - chunk_free(&m->m_body); + nni_chunk_free(&m->m_header); + nni_chunk_free(&m->m_body); nni_free(m, sizeof (*m)); } int -nni_msg_realloc(nni_msg_t m, size_t sz) +nni_msg_realloc(nni_msg *m, size_t sz) { int rv = 0; if (m->m_body.ch_len < sz) { - rv = chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len); + rv = nni_chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len); if (rv != 0) { return (rv); } } else { - /* "Shrinking", just mark bytes at end usable again. */ - chunk_trunc(&m->m_body, m->m_body.ch_len - sz); + // "Shrinking", just mark bytes at end usable again. + nni_chunk_trunc(&m->m_body, m->m_body.ch_len - sz); } return (0); } void * -nni_msg_header(nni_msg_t m, size_t *szp) +nni_msg_header(nni_msg *m, size_t *szp) { if (szp != NULL) { *szp = m->m_header.ch_len; @@ -297,7 +285,7 @@ nni_msg_header(nni_msg_t m, size_t *szp) void * -nni_msg_body(nni_msg_t m, size_t *szp) +nni_msg_body(nni_msg *m, size_t *szp) { if (szp != NULL) { *szp = m->m_body.ch_len; @@ -307,63 +295,63 @@ nni_msg_body(nni_msg_t m, size_t *szp) int -nni_msg_append(nni_msg_t m, const void *data, size_t len) +nni_msg_append(nni_msg *m, const void *data, size_t len) { - return (chunk_append(&m->m_body, data, len)); + return (nni_chunk_append(&m->m_body, data, len)); } int -nni_msg_prepend(nni_msg_t m, const void *data, size_t len) +nni_msg_prepend(nni_msg *m, const void *data, size_t len) { - return (chunk_prepend(&m->m_body, data, len)); + return (nni_chunk_prepend(&m->m_body, data, len)); } int -nni_msg_trim(nni_msg_t m, size_t len) +nni_msg_trim(nni_msg *m, size_t len) { - return (chunk_trim(&m->m_body, len)); + return (nni_chunk_trim(&m->m_body, len)); } int -nni_msg_trunc(nni_msg_t m, size_t len) +nni_msg_trunc(nni_msg *m, size_t len) { - return (chunk_trunc(&m->m_body, len)); + return (nni_chunk_trunc(&m->m_body, len)); } int -nni_msg_append_header(nni_msg_t m, const void *data, size_t len) +nni_msg_append_header(nni_msg *m, const void *data, size_t len) { - return (chunk_append(&m->m_header, data, len)); + return (nni_chunk_append(&m->m_header, data, len)); } int -nni_msg_prepend_header(nni_msg_t m, const void *data, size_t len) +nni_msg_prepend_header(nni_msg *m, const void *data, size_t len) { - return (chunk_prepend(&m->m_header, data, len)); + return (nni_chunk_prepend(&m->m_header, data, len)); } int -nni_msg_trim_header(nni_msg_t m, size_t len) +nni_msg_trim_header(nni_msg *m, size_t len) { - return (chunk_trim(&m->m_header, len)); + return (nni_chunk_trim(&m->m_header, len)); } int -nni_msg_trunc_header(nni_msg_t m, size_t len) +nni_msg_trunc_header(nni_msg *m, size_t len) { - return (chunk_trunc(&m->m_header, len)); + return (nni_chunk_trunc(&m->m_header, len)); } int -nni_msg_pipe(nni_msg_t m, nni_pipe_t *pp) +nni_msg_pipe(nni_msg *m, nni_pipe **pp) { *pp = m->m_pipe; return (0); diff --git a/src/core/message.h b/src/core/message.h index 7b17a007..5c742e44 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -1,33 +1,30 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_MESSAGE_H #define CORE_MESSAGE_H -/* - * Internally used message API. Again, this stuff is not part of our public - * API. - */ +// Internally used message API. Again, this is not part of our public API. -extern int nni_msg_alloc(nni_msg_t *, size_t); -extern void nni_msg_free(nni_msg_t); -extern int nni_msg_realloc(nni_msg_t, size_t); -extern void *nni_msg_header(nni_msg_t, size_t *); -extern void *nni_msg_body(nni_msg_t, size_t *); -extern int nni_msg_append(nni_msg_t, const void *, size_t); -extern int nni_msg_prepend(nni_msg_t, const void *, size_t); -extern int nni_msg_append_header(nni_msg_t, const void *, size_t); -extern int nni_msg_prepend_header(nni_msg_t, const void *, size_t); -extern int nni_msg_trim(nni_msg_t, size_t); -extern int nni_msg_trunc(nni_msg_t, size_t); -extern int nni_msg_trim_header(nni_msg_t, size_t); -extern int nni_msg_trunc_header(nni_msg_t, size_t); -extern int nni_msg_pipe(nni_msg_t, nni_pipe_t *); +extern int nni_msg_alloc(nni_msg **, size_t); +extern void nni_msg_free(nni_msg *); +extern int nni_msg_realloc(nni_msg *, size_t); +extern void *nni_msg_header(nni_msg *, size_t *); +extern void *nni_msg_body(nni_msg *, size_t *); +extern int nni_msg_append(nni_msg *, const void *, size_t); +extern int nni_msg_prepend(nni_msg *, const void *, size_t); +extern int nni_msg_append_header(nni_msg *, const void *, size_t); +extern int nni_msg_prepend_header(nni_msg *, const void *, size_t); +extern int nni_msg_trim(nni_msg *, size_t); +extern int nni_msg_trunc(nni_msg *, size_t); +extern int nni_msg_trim_header(nni_msg *, size_t); +extern int nni_msg_trunc_header(nni_msg *, size_t); +extern int nni_msg_pipe(nni_msg *, nni_pipe **); -#endif /* CORE_SOCKET_H */ +#endif // CORE_SOCKET_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 7d892b88..b8d11e8c 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -52,7 +52,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { + if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -90,7 +90,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq) nni_msg_free(msg); } - nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); + nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *)); nni_free(mq, sizeof (*mq)); } @@ -130,6 +130,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, (void) nni_cond_waituntil(&mq->mq_writeable, expire); } + // Once a queue is closed, you can't write to it. It can still be + // read from, at least until its empty. if (mq->mq_closed) { nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); @@ -175,19 +177,19 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, (void) nni_cond_waituntil(&mq->mq_readable, expire); } - if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); - return (NNG_ECLOSED); - } + // If there is any data left in the message queue, we will still + // provide it, so that the reader can drain. + if (mq->mq_len == 0) { + if (mq->mq_closed) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ECLOSED); + } - if ((mq->mq_len == 0) && (*signal)) { - // We are being interrupted. We only allow an interrupt - // if there is no data though, because we'd really prefer - // to give back the data. Otherwise our failure to deal - // with the data could lead to starvation; also lingering - // relies on this not interrupting if data is pending. - nni_mutex_exit(&mq->mq_lock); - return (NNG_EINTR); + if (*signal) { + // We are being interrupted. + nni_mutex_exit(&mq->mq_lock); + return (NNG_EINTR); + } } *msgp = mq->mq_msgs[mq->mq_get]; @@ -258,7 +260,7 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void nni_msgqueue_close(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index d97482af..adea2dfa 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -15,8 +15,18 @@ // Message queues. Message queues work in some ways like Go channels; // they are a thread-safe way to pass messages between subsystems. They // do have additional capabilities though. -typedef struct nni_msgqueue * nni_msgqueue_t; -typedef struct nni_msgqueue nni_msgqueue; +// +// A closed message queue cannot be written to, but if there are messages +// still in it, it can be read from. (This allows them to play a role in +// draining/lingering.) +// +// Message queues can be closed many times safely. +// +// Readers & writers in a message queue can be woken either by a timeout +// or by a specific signal (arranged by the caller). +// +// TODO: Add message queue growing, and pushback. +typedef struct nni_msgqueue nni_msgqueue; // nni_msgqueue_create creates a message queue with the given capacity, // which must be a positive number. It returns NNG_EINVAL if the capacity diff --git a/src/core/pipe.c b/src/core/pipe.c index a617cf85..58a31d7f 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -1,64 +1,60 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * This file contains functions relating to pipes. - * - * Operations on pipes (to the transport) are generally blocking operations, - * performed in the context of the protocol. - */ +// This file contains functions relating to pipes. +// +// Operations on pipes (to the transport) are generally blocking operations, +// performed in the context of the protocol. -/* nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. */ +// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. uint32_t -nni_pipe_id(nni_pipe_t p) +nni_pipe_id(nni_pipe * p) { return (p->p_id); } int -nni_pipe_send(nni_pipe_t p, nng_msg_t msg) +nni_pipe_send(nni_pipe * p, nng_msg *msg) { return (p->p_ops.p_send(p->p_tran, msg)); } int -nni_pipe_recv(nni_pipe_t p, nng_msg_t *msgp) +nni_pipe_recv(nni_pipe * p, nng_msg **msgp) { return (p->p_ops.p_recv(p->p_tran, msgp)); } -/* - * nni_pipe_close closes the underlying connection. It is expected that - * subsequent attempts receive or send (including any waiting receive) will - * simply return NNG_ECLOSED. - */ +// nni_pipe_close closes the underlying connection. It is expected that +// subsequent attempts receive or send (including any waiting receive) will +// simply return NNG_ECLOSED. void -nni_pipe_close(nni_pipe_t p) +nni_pipe_close(nni_pipe * p) { p->p_ops.p_close(p->p_tran); } uint16_t -nni_pipe_peer(nni_pipe_t p) +nni_pipe_peer(nni_pipe * p) { return (p->p_ops.p_peer(p->p_tran)); } void -nni_pipe_destroy(nni_pipe_t p) +nni_pipe_destroy(nni_pipe * p) { p->p_ops.p_destroy(p->p_tran); nni_free(p, sizeof (*p)); diff --git a/src/core/pipe.h b/src/core/pipe.h index a4a03a93..b78eaa2a 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -22,16 +22,16 @@ struct nng_pipe { uint32_t p_id; struct nni_pipe_ops p_ops; void * p_tran; - nni_list_node_t p_sock_node; + nni_list_node p_sock_node; nni_socket * p_sock; - nni_list_node_t p_ep_node; + nni_list_node p_ep_node; nni_endpt * p_ep; }; // Pipe operations that protocols use. -extern int nni_pipe_recv(nni_pipe *, nng_msg_t *); -extern int nni_pipe_send(nni_pipe *, nng_msg_t); +extern int nni_pipe_recv(nni_pipe *, nng_msg **); +extern int nni_pipe_send(nni_pipe *, nng_msg *); extern uint32_t nni_pipe_id(nni_pipe *); extern void nni_pipe_close(nni_pipe *); diff --git a/src/core/platform.h b/src/core/platform.h index 58e4cd36..a674aac5 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -113,7 +113,6 @@ extern void nni_cond_wait(nni_cond *); // check the condition. It will return either NNG_ETIMEDOUT, or 0. extern int nni_cond_waituntil(nni_cond *, nni_time); -typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; // nni_thread_creates a thread that runs the given function. The thread diff --git a/src/core/protocol.c b/src/core/protocol.c index 113746b1..6399cd41 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -1,37 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include <string.h> #include "core/nng_impl.h" -/* - * Protocol related stuff - generically. - */ +// Protocol related stuff - generically. -/* - * The list of protocols is hardwired. This is reasonably unlikely to - * change, as adding new protocols is not something intended to be done - * outside of the core. - */ -extern struct nni_protocol nni_pair_protocol; +// The list of protocols is hardwired. This is reasonably unlikely to +// change, as adding new protocols is not something intended to be done +// outside of the core. +extern nni_protocol nni_pair_protocol; -static struct nni_protocol *protocols[] = { +static nni_protocol *protocols[] = { &nni_pair_protocol, NULL }; -struct nni_protocol * +nni_protocol * nni_protocol_find(uint16_t num) { int i; - struct nni_protocol *p; + nni_protocol *p; for (i = 0; (p = protocols[i]) != NULL; i++) { if (p->proto_self == num) { @@ -45,7 +41,7 @@ nni_protocol_find(uint16_t num) const char * nni_protocol_name(uint16_t num) { - struct nni_protocol *p; + nni_protocol *p; if ((p = nni_protocol_find(num)) == NULL) { return (NULL); @@ -57,7 +53,7 @@ nni_protocol_name(uint16_t num) uint16_t nni_protocol_number(const char *name) { - struct nni_protocol *p; + nni_protocol *p; int i; for (i = 0; (p = protocols[i]) != NULL; i++) { diff --git a/src/core/protocol.h b/src/core/protocol.h index 21d3b6b9..a4b24177 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,128 +1,89 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_PROTOCOL_H #define CORE_PROTOCOL_H -/* - * Protocol implementation details. Protocols must implement the - * interfaces in this file. Note that implementing new protocols is - * not necessarily intended to be a trivial task. The protocol developer - * must understand the nature of nng, as they are responsible for handling - * most of the logic. The protocol generally does most of the work for - * locking, and calls into the transport's pipe functions to do actual - * work, and the pipe functions generally assume no locking is needed. - * As a consequence, most of the concurrency in nng exists in the protocol - * implementations. - * - * Pipe operations may block, or even reenter the protoccol entry points - * (for example nni_pipe_close() causes the protocols proto_remove_pipe - * entry point to be called), so it is very important that protocols do - * not hold any locks across calls to pipe functions. - */ - +// Protocol implementation details. Protocols must implement the +// interfaces in this file. Note that implementing new protocols is +// not necessarily intended to be a trivial task. The protocol developer +// must understand the nature of nng, as they are responsible for handling +// most of the logic. The protocol generally does most of the work for +// locking, and calls into the transport's pipe functions to do actual +// work, and the pipe functions generally assume no locking is needed. +// As a consequence, most of the concurrency in nng exists in the protocol +// implementations. struct nni_protocol { - /* - * Protocol information. - */ - uint16_t proto_self; /* our 16-bit protocol ID */ - uint16_t proto_peer; /* who we peer with (protocol ID) */ - const char * proto_name; /* string version of our name */ - - /* - * Create protocol instance data, which will be stored on the socket. - */ - int (*proto_create)(void **, nni_socket_t); - - /* - * Destroy the protocol instance. - */ + uint16_t proto_self; // our 16-bit protocol ID + uint16_t proto_peer; // who we peer with (protocol ID) + const char * proto_name; // string version of our name + + //Create protocol instance, which will be stored on the socket. + int (*proto_create)(void **, nni_socket *); + + // Destroy the protocol instance. void (*proto_destroy)(void *); - /* - * Shutdown the protocol instance, including giving time to - * drain any outbound frames (linger). The protocol is not - * required to honor the linger. - * XXX: This is probably redundant -- protocol should notice - * drain by getting NNG_ECLOSED on the upper write queue. - */ + // Shutdown the protocol instance, including giving time to + // drain any outbound frames (linger). The protocol is not + // required to honor the linger. + // XXX: This is probably redundant -- protocol should notice + // drain by getting NNG_ECLOSED on the upper write queue. void (*proto_shutdown)(void *); - /* - * Add and remove pipes. These are called as connections are - * created or destroyed. - */ - int (*proto_add_pipe)(void *, nni_pipe_t); - int (*proto_rem_pipe)(void *, nni_pipe_t); + // Add and remove pipes. These are called as connections are + // created or destroyed. + int (*proto_add_pipe)(void *, nni_pipe *); + int (*proto_rem_pipe)(void *, nni_pipe *); - /* - * Option manipulation. These may be NULL. - */ + // Option manipulation. These may be NULL. int (*proto_setopt)(void *, int, const void *, size_t); int (*proto_getopt)(void *, int, void *, size_t *); - /* - * Receive filter. This may be NULL, but if it isn't, then - * messages coming into the system are routed here just before - * being delivered to the application. To drop the message, - * the protocol should return NULL, otherwise the message - * (possibly modified). - */ - nng_msg_t (*proto_recv_filter)(void *, nni_msg_t); - - /* - * Send filter. This may be NULL, but if it isn't, then - * messages here are filtered just after they come from the - * application. - */ - nng_msg_t (*proto_send_filter)(void *, nni_msg_t); + // Receive filter. This may be NULL, but if it isn't, then + // messages coming into the system are routed here just before being + // delivered to the application. To drop the message, the prtocol + // should return NULL, otherwise the message (possibly modified). + nni_msg * (*proto_recv_filter)(void *, nni_msg *); + + // Send filter. This may be NULL, but if it isn't, then messages + // here are filtered just after they come from the application. + nni_msg * (*proto_send_filter)(void *, nni_msg *); }; -/* - * These are socket methods that protocol operations can - * reasonably expect to call. - */ - -/* - * nni_socket_sendq obtains the upper writeq. The protocol should - * recieve messages from this, and place them on the appropriate - * pipe. - */ -extern nni_msgqueue_t nni_socket_sendq(nni_socket_t); - -/* - * nni_socket_recvq obtains the upper readq. The protocol should - * inject incoming messages from pipes to it. - */ -extern nni_msgqueue_t nni_socket_recvq(nni_socket_t); - -/* - * nni_socket_recv_err sets an error code to be returned to clients - * rather than waiting for a message. Set it to 0 to resume normal - * receive operation. - */ -extern void nni_socket_recv_err(nni_socket_t, int); - -/* - * nni_socket_send_err sets an error code to be returned to clients - * when they try to send, so that they don't have to timeout waiting - * for their message to be accepted for send. Set it to 0 to resume - * normal send operations. - */ -extern void nni_socket_send_err(nni_socket_t, int); - -/* - * These functions are not used by protocols, but rather by the socket - * core implementation. The lookups can be used by transports as well. - */ -extern struct nni_protocol *nni_protocol_find(uint16_t); +// These are socket methods that protocol operations can expect to call. +// Note that each of these should be called without any locks held, since +// the socket can reenter the protocol. + +// nni_socket_sendq obtains the upper writeq. The protocol should +// recieve messages from this, and place them on the appropriate pipe. +extern nni_msgqueue *nni_socket_sendq(nni_socket *); + +// nni_socket_recvq obtains the upper readq. The protocol should +// inject incoming messages from pipes to it. +extern nni_msgqueue *nni_socket_recvq(nni_socket *); + +// nni_socket_recv_err sets an error code to be returned to clients +// rather than waiting for a message. Set it to 0 to resume normal +// receive operation. +extern void nni_socket_recv_err(nni_socket *, int); + +// nni_socket_send_err sets an error code to be returned to clients +// when they try to send, so that they don't have to timeout waiting +// for their message to be accepted for send. Set it to 0 to resume +// normal send operations. +extern void nni_socket_send_err(nni_socket *, int); + +// These functions are not used by protocols, but rather by the socket +// core implementation. The lookups can be used by transports as well. +extern nni_protocol *nni_protocol_find(uint16_t); extern const char *nni_protocol_name(uint16_t); extern uint16_t nni_protocol_number(const char *); -#endif /* CORE_PROTOCOL_H */ +#endif // CORE_PROTOCOL_H diff --git a/src/core/socket.c b/src/core/socket.c index c500db31..1331bb4c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -13,14 +13,14 @@ // nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain // the upper read and write queues. -nni_msgqueue_t +nni_msgqueue * nni_socket_sendq(nni_socket *s) { return (s->s_uwq); } -nni_msgqueue_t +nni_msgqueue * nni_socket_recvq(nni_socket *s) { return (s->s_urq); @@ -53,8 +53,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); - // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { nni_cond_fini(&sock->s_cv); @@ -83,14 +83,7 @@ nni_socket_close(nni_socket *sock) // Stop all EPS. We're going to do this first, since we know // we're closing. NNI_LIST_FOREACH (&sock->s_eps, ep) { -#if 0 - // XXX: Question: block for them now, or wait further down? - // Probably we can simply just watch for the first list... - // stopping EPs should be *dead* easy, and never block. - nni_ep_stop(ep); - or nni_ep_shutdown(ep); - -#endif + nni_endpt_close(ep); } nni_mutex_exit(&sock->s_mx); @@ -133,9 +126,18 @@ nni_socket_close(nni_socket *sock) } // Wait to make sure endpoint listeners have shutdown and exited - // as well. They should have done so *long* ago. - while (nni_list_first(&sock->s_eps) != NULL) { - nni_cond_wait(&sock->s_cv); + // as well. They should have done so *long* ago. Because this + // requires waiting for threads to finish, which *could* in theory + // overlap with this, we must drop the socket lock. + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + + // TODO: This looks like it should happen as an endpt_remove + // operation? + nni_list_remove(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); + + nni_endpt_destroy(ep); + nni_mutex_enter(&sock->s_mx); } nni_mutex_exit(&sock->s_mx); @@ -244,7 +246,6 @@ nni_socket_proto(nni_socket *sock) return (sock->s_ops.proto_self); } - // nni_socket_rem_pipe removes the pipe from the socket. This is often // called by the protocol when a pipe is removed due to close. void @@ -265,9 +266,11 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // XXX: TODO: Redial // XXX: also publish event... // if (pipe->p_ep != NULL) { - // nn_endpt_remove_pipe(pipe->p_ep, pipe) + // nn_endpt_rem_pipe(pipe->p_ep, pipe) // } + nni_pipe_destroy(pipe); + // If we're closing, wake the socket if we finished draining. if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { nni_cond_broadcast(&sock->s_cv); diff --git a/src/core/transport.c b/src/core/transport.c index 61ec5033..2e8dee7a 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -1,35 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ - -#include <string.h> +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * For now the list of transports is hard-wired. Adding new transports - * to the system dynamically is something that might be considered later. - */ -extern struct nni_transport nni_inproc_transport; +#include <string.h> + +// For now the list of transports is hard-wired. Adding new transports +// to the system dynamically is something that might be considered later. +extern nni_transport nni_inproc_transport; -static struct nni_transport *transports[] = { +static nni_transport *transports[] = { &nni_inproc_transport, NULL }; -struct nni_transport * +nni_transport * nni_transport_find(const char *addr) { - /* address is of the form "<scheme>://blah..." */ + // address is of the form "<scheme>://blah..." const char *end; int len; int i; - struct nni_transport *ops; + nni_transport *ops; if ((end = strstr(addr, "://")) == NULL) { return (NULL); @@ -44,15 +42,13 @@ nni_transport_find(const char *addr) } -/* - * nni_transport_init initializes the entire transport subsystem, including - * each individual transport. - */ +// nni_transport_init initializes the entire transport subsystem, including +// each individual transport. void nni_transport_init(void) { int i; - struct nni_transport *ops; + nni_transport *ops; for (i = 0; (ops = transports[i]) != NULL; i++) { ops->tran_init(); @@ -64,7 +60,7 @@ void nni_transport_fini(void) { int i; - struct nni_transport *ops; + nni_transport *ops; for (i = 0; (ops = transports[i]) != NULL; i++) { if (ops->tran_fini != NULL) { diff --git a/src/core/transport.h b/src/core/transport.h index 370e87f5..4e39adaf 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -1,171 +1,123 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -/* - * Transport implementation details. Transports must implement the - * interfaces in this file. - */ +// Transport implementation details. Transports must implement the +// interfaces in this file. struct nni_transport { - /* - * tran_scheme is the transport scheme, such as "tcp" or "inproc". - */ + // tran_scheme is the transport scheme, such as "tcp" or "inproc". const char * tran_scheme; - /* - * tran_ep_ops links our endpoint operations. - */ + // tran_ep_ops links our endpoint operations. const struct nni_endpt_ops * tran_ep_ops; - /* - * tran_pipe_ops links our pipe operations. - */ + // tran_pipe_ops links our pipe operations. const struct nni_pipe_ops * tran_pipe_ops; - /* - * tran_init, if not NULL, is called once during library - * initialization. - */ + // tran_init, if not NULL, is called once during library + // initialization. int (*tran_init)(void); - /* - * tran_fini, if not NULL, is called during library deinitialization. - * It should release any global resources, close any open files, etc. - * - * There will be no locks held, and no other threads running in the - * library. - * - * It is invalid to use any mutexes, condition variables, or - * threading routines. Mutexes and condition variables may be - * safely destroyed. - */ + // tran_fini, if not NULL, is called during library deinitialization. + // It should release any global resources, close any open files, etc. void (*tran_fini)(void); }; -/* - * Endpoint operations are called by the socket in a protocol-independent - * fashion. The socket makes individual calls, which are expected to block - * if appropriate (except for destroy). Endpoints are unable to call back - * into the socket, to prevent recusive entry and deadlock. - */ + +// Endpoint operations are called by the socket in a protocol-independent +// fashion. The socket makes individual calls, which are expected to block +// if appropriate (except for destroy). Endpoints are unable to call back +// into the socket, to prevent recusive entry and deadlock. struct nni_endpt_ops { - /* - * ep_create creates a vanilla endpoint. The value created is - * used for the first argument for all other endpoint functions. - */ + // ep_create creates a vanilla endpoint. The value created is + // used for the first argument for all other endpoint functions. int (*ep_create)(void **, const char *, uint16_t); - /* - * ep_destroy frees the resources associated with the endpoint. - * The endpoint will already have been closed. - */ + // ep_destroy frees the resources associated with the endpoint. + // The endpoint will already have been closed. void (*ep_destroy)(void *); - /* - * ep_dial starts dialing, and creates a new pipe, - * which is returned in the final argument. It can return errors - * NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, - * NNG_ETIMEDOUT, and NNG_EPROTO. - */ + // ep_dial starts dialing, and creates a new pipe, + // which is returned in the final argument. It can return errors + // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, + // NNG_ETIMEDOUT, and NNG_EPROTO. int (*ep_dial)(void *, void **); - /* - * ep_listen just does the bind() and listen() work, - * reserving the address but not creating any connections. - * It should return NNG_EADDRINUSE if the address is already - * taken. It can also return NNG_EBADADDR for an unsuitable - * address, or NNG_EACCESS for permission problems. - */ + // ep_listen just does the bind() and listen() work, + // reserving the address but not creating any connections. + // It should return NNG_EADDRINUSE if the address is already + // taken. It can also return NNG_EBADADDR for an unsuitable + // address, or NNG_EACCESS for permission problems. int (*ep_listen)(void *); - /* - * ep_accept accepts an inbound connection, and creates - * a transport pipe, which is returned in the final argument. - */ + // ep_accept accepts an inbound connection, and creates + // a transport pipe, which is returned in the final argument. int (*ep_accept)(void *, void **); - /* - * ep_close stops the endpoint from operating altogether. It does - * not affect pipes that have already been created. - */ + // ep_close stops the endpoint from operating altogether. It does + // not affect pipes that have already been created. void (*ep_close)(void *); - /* ep_setopt sets an endpoint (transport-specific) option */ + // ep_setopt sets an endpoint (transport-specific) option. int (*ep_setopt)(void *, int, const void *, size_t); - /* ep_getopt gets an endpoint (transport-specific) option */ + // ep_getopt gets an endpoint (transport-specific) option. int (*ep_getopt)(void *, int, void *, size_t *); }; -/* - * Pipe operations are entry points called by the socket. These may be called - * with socket locks held, so it is forbidden for the transport to call - * back into the socket at this point. (Which is one reason pointers back - * to socket or even enclosing pipe state, are not provided.) - */ +// Pipe operations are entry points called by the socket. These may be called +// with socket locks held, so it is forbidden for the transport to call +// back into the socket at this point. (Which is one reason pointers back +// to socket or even enclosing pipe state, are not provided.) struct nni_pipe_ops { - /* - * p_destroy destroys the pipe. This should clean up all local resources, - * including closing files and freeing memory, used by the pipe. After - * this call returns, the system will not make further calls on the same - * pipe. - */ + // p_destroy destroys the pipe. This should clean up all local + // resources, including closing files and freeing memory, used by + // the pipe. After this call returns, the system will not make + // further calls on the same pipe. void (*p_destroy)(void *); - /* - * p_send sends the message. If the message cannot be received, then - * the caller may try again with the same message (or free it). If the - * call succeeds, then the transport has taken ownership of the message, - * and the caller may not use it again. The transport will have the - * responsibility to free the message (nng_msg_free()) when it is - * finished with it. - */ - int (*p_send)(void *, nng_msg_t); - - /* - * p_recv recvs the message. This is a blocking operation, and a read - * will be performed even for cases where no data is expected. This - * allows the socket to detect a closed socket, by the returned error - * NNG_ECLOSED. Note that the closed socket condition can arise as either - * a result of a remote peer closing the connection, or a synchronous - * call to p_close. - */ - int (*p_recv)(void *, nng_msg_t *); - - /* - * p_close closes the pipe. Further recv or send operations should - * return back NNG_ECLOSED. - */ + // p_send sends the message. If the message cannot be received, then + // the caller may try again with the same message (or free it). If + // the call succeeds, then the transport has taken ownership of the + // message, and the caller may not use it again. The transport will + // have the responsibility to free the message (nng_msg_free()) when + // it is finished with it. + int (*p_send)(void *, nni_msg *); + + // p_recv recvs the message. This is a blocking operation, and a read + // will be performed even for cases where no data is expected. This + // allows the socket to detect a closed socket, by the returned error + // NNG_ECLOSED. Note that the closed socket condition can arise as + // either a result of a remote peer closing the connection, or a + // synchronous call to p_close. + int (*p_recv)(void *, nng_msg **); + + // p_close closes the pipe. Further recv or send operations should + // return back NNG_ECLOSED. void (*p_close)(void *); - /* - * p_peer returns the peer protocol. This may arrive in whatever - * transport specific manner is appropriate. - */ + // p_peer returns the peer protocol. This may arrive in whatever + // transport specific manner is appropriate. uint16_t (*p_peer)(void *); - /* - * p_getopt gets an pipe (transport-specific) property. These values - * may not be changed once the pipe is created. - */ + // p_getopt gets an pipe (transport-specific) property. These values + // may not be changed once the pipe is created. int (*p_getopt)(void *, int, void *, size_t *); }; -/* - * These APIs are used by the framework internally, and not for use by - * transport implementations. - */ -extern struct nni_transport *nni_transport_find(const char *); +// These APIs are used by the framework internally, and not for use by +// transport implementations. +extern nni_transport *nni_transport_find(const char *); extern void nni_transport_init(void); extern void nni_transport_fini(void); -#endif /* CORE_TRANSPORT_H */ +#endif // CORE_TRANSPORT_H @@ -1,26 +1,24 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * This file provides the "public" API. This is a thin wrapper around - * internal API functions. We use the public prefix instead of internal, - * to indicate that these interfaces are intended for applications to use - * directly. - * - * Anything not defined in this file, applications have no business using. - * Pretty much every function calls the nni_platform_init to check against - * fork related activity. - */ +// This file provides the "public" API. This is a thin wrapper around +// internal API functions. We use the public prefix instead of internal, +// to indicate that these interfaces are intended for applications to use +// directly. +// +// Anything not defined in this file, applications have no business using. +// Pretty much every function calls the nni_platform_init to check against +// fork related activity. int -nng_socket_create(nng_socket_t *s, uint16_t proto) +nng_socket_create(nng_socket **s, uint16_t proto) { int rv; @@ -32,7 +30,7 @@ nng_socket_create(nng_socket_t *s, uint16_t proto) int -nng_socket_close(nng_socket_t s) +nng_socket_close(nng_socket *s) { int rv; @@ -44,16 +42,14 @@ nng_socket_close(nng_socket_t s) uint16_t -nng_socket_protocol(nng_socket_t s) +nng_socket_protocol(nng_socket *s) { nni_init(); return (nni_socket_proto(s)); } -/* - * Misc. - */ +// Misc. const char * nng_strerror(int num) { @@ -98,11 +94,9 @@ nng_strerror(int num) } -/* - * Message handling. - */ +// Message handling. int -nng_msg_alloc(nng_msg_t *msgp, size_t size) +nng_msg_alloc(nng_msg **msgp, size_t size) { int rv; @@ -114,7 +108,7 @@ nng_msg_alloc(nng_msg_t *msgp, size_t size) void -nng_msg_free(nng_msg_t msg) +nng_msg_free(nng_msg *msg) { nni_init(); return (nni_msg_free(msg)); @@ -1,22 +1,20 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef NNG_H #define NNG_H -/* - * NNG (nanomsg-ng) is a next generation implementation of the SP protocols. - * The APIs have changed, and there is no attempt to provide API compatibility - * with legacy libnanomsg. This file defines the library consumer-facing - * Public API. Use of definitions or declarations not found in this header file - * is specfically unsupported and strongly discouraged. - */ +// NNG (nanomsg-ng) is a next generation implementation of the SP protocols. +// The APIs have changed, and there is no attempt to provide API compatibility +// with legacy libnanomsg. This file defines the library consumer-facing +// Public API. Use of definitions or declarations not found in this header +// file is specfically unsupported and strongly discouraged. #ifdef __cplusplus extern "C" { @@ -26,90 +24,67 @@ extern "C" { #include <stddef.h> #include <stdint.h> -/* - * NNG_DECL is used on declarations to deal with scope. - * For building Windows DLLs, it should be the appropriate - * __declspec(). (We recommend *not* building this library - * as a DLL, but instead linking it statically for your projects - * to minimize questions about link dependencies later.) - */ +// NNG_DECL is used on declarations to deal with scope. +// For building Windows DLLs, it should be the appropriate +// __declspec(). (We recommend *not* building this library +// as a DLL, but instead linking it statically for your projects +// to minimize questions about link dependencies later.) #ifndef NNG_DECL #define NNG_DECL extern #endif -/* - * Types common to nng. - */ -typedef struct nng_socket * nng_socket_t; -typedef struct nng_endpt * nng_endpt_t; -typedef struct nng_pipe * nng_pipe_t; -typedef struct nng_msg * nng_msg_t; -typedef struct nng_event * nng_event_t; -typedef struct nng_notify * nng_notify_t; -typedef struct nng_snapshot * nng_snapshot_t; -typedef struct nng_stat * nng_stat_t; - -/* - * nng_socket simply creates a socket of the given class. It returns an - * error code on failure, or zero on success. The socket starts in cooked - * mode. - */ -NNG_DECL int nng_socket_create(nng_socket_t *, uint16_t proto); - -/* - * nng_socket_close closes the socket, terminating all activity and - * closing any underlying connections and releasing any associated - * resources. Memory associated with the socket is freed, so it is an - * error to reference the socket in any way after this is called. Likewise, - * it is an error to reference any resources such as end points associated - * with the socket. - */ -NNG_DECL int nng_socket_close(nng_socket_t); - -/* - * nng_socket_protocol returns the protocol number the socket was created - * with. - */ -uint16_t nng_socket_protocol(nng_socket_t); - -/* - * nng_socket_setopt sets an option for a specific socket. - */ -NNG_DECL int nng_socket_setopt(nng_socket_t, int, const void *, size_t); - -/* - * nng_socket_getopt obtains the option for a socket. - */ -NNG_DECL int nng_socket_getopt(nng_socket_t, int, void *, size_t *); - -/* - * nng_notify_register sets a notification callback. The callback will be - * called for any of the requested events. The callback can be deregistered - * by calling nng_notify_unregister with the same handle. These notification - * callbacks are executed on a separate thread, to avoid potential lock - * recursion. - */ -NNG_DECL nng_notify_t nng_notify_register(nng_socket_t, int, - void (*)(nng_socket_t, nng_event_t, void *), void *); -NNG_DECL int nng_notify_unregister(nng_socket_t, nng_notify_t); - -/* - * Event types. Sockets can have multiple different kind of events. - * Note that these are edge triggered -- therefore the status indicated - * may have changed since the notification occurred. - * - * NNG_EVENT_RECV - A message is ready for receive. - * NNG_EVENT_SEND - A message can be sent. - * NNG_EVENT_ERROR - An error condition on the socket occurred. - * NNG_EVENT_PIPE_ADD - A new pipe (connection) is added to the socket. - * The argument is an nn_pipe_t. - * NNG_EVENT_PIPE_RM - A pipe (connection) is removed from the socket. - * The argument is an nn_pipe_t. - * NNG_EVENT_ENDPT_ADD - An endpoint is added to the socket. - * The argument is an nn_endpt_t. - * NNG_EVENT_ENDPT_RM - An endpoint is removed from the socket. - * The argument is an nn_endpt_t. - */ +// Types common to nng. +typedef struct nng_socket nng_socket; +typedef struct nng_endpt nng_endpt; +typedef struct nng_pipe nng_pipe; +typedef struct nng_msg nng_msg; +typedef struct nng_event nng_event; +typedef struct nng_notify nng_notify; +typedef struct nng_snapshot nng_snapshot; +typedef struct nng_stat nng_stat; + +// nng_socket simply creates a socket of the given class. It returns an +// error code on failure, or zero on success. The socket starts in cooked +// mode. +NNG_DECL int nng_socket_create(nng_socket **, uint16_t proto); + +// nng_socket_close closes the socket, terminating all activity and +// closing any underlying connections and releasing any associated +// resources. Memory associated with the socket is freed, so it is an +// error to reference the socket in any way after this is called. Likewise, +// it is an error to reference any resources such as endpoints or +// pipes associated with the socket. +NNG_DECL int nng_socket_close(nng_socket *); + +// nng_socket_protocol returns the protocol number of the socket. +uint16_t nng_socket_protocol(nng_socket *); + +// nng_socket_setopt sets an option for a specific socket. +NNG_DECL int nng_socket_setopt(nng_socket *, int, const void *, size_t); + +// nng_socket_getopt obtains the option for a socket. +NNG_DECL int nng_socket_getopt(nng_socket *, int, void *, size_t *); + +// nng_notify_register sets a notification callback. The callback will be +// called for any of the requested events. The callback can be deregistered +// by calling nng_notify_unregister with the same handle. These notification +// callbacks are executed on a separate thread, to avoid potential lock +// recursion. +NNG_DECL nng_notify *nng_notify_register(nng_socket *, int, + void (*)(nng_event *, void *), void *); +NNG_DECL int nng_notify_unregister(nng_socket *, nng_notify *); + +// Event types. Sockets can have multiple different kind of events. +// Note that these are edge triggered -- therefore the status indicated +// may have changed since the notification occurred. +// +// NNG_EVENT_RECV - A message is ready for receive. +// NNG_EVENT_SEND - A message can be sent. +// NNG_EVENT_ERROR - An error condition on the socket occurred. +// NNG_EVENT_PIPE_ADD - A new pipe (connection) is added to the socket. +// NNG_EVENT_PIPE_RM - A pipe (connection) is removed from the socket. +// NNG_EVENT_ENDPT_ADD - An endpoint is added to the socket. +// NNG_EVENT_ENDPT_RM - An endpoint is removed from the socket. #define NNG_EVENT_BIT(x) (1U << (x)) #define NNG_EVENT_RECV NNG_EVENT_BIT(0) #define NNG_EVENT_SEND NNG_EVENT_BIT(1) @@ -119,147 +94,113 @@ NNG_DECL int nng_notify_unregister(nng_socket_t, nng_notify_t); #define NNG_EVENT_ENDPT_ADD NNG_EVENT_BIT(5) #define NNG_EVENT_ENDPT_RM NNG_EVENT_BIT(6) -/* - * The following functions return more detailed information about the event. - * Some of the values will not make sense for some event types, in which case - * the value returned will be NULL. - */ -NNG_DECL int nng_event_type(nng_event_t); -NNG_DECL nng_socket_t nng_event_socket(nng_event_t); -NNG_DECL nng_endpt_t nng_event_endpt(nng_event_t); -NNG_DECL nng_pipe_t nng_event_pipe(nng_event_t); -NNG_DECL const char *nng_event_reason(nng_event_t); - -/* - * nng_socket_listen creates a listening endpoint with no special options, - * and starts it listening. It is functionally equivalent to the legacy - * nn_bind(). The underlying endpoint is returned back to the caller. - */ -NNG_DECL int nng_socket_listen(nng_endpt_t *, nng_socket_t, const char *); - -/* - * nng_socket_dial creates a dialing endpoint, with no special options, - * and starts it dialing. Dialers have at most one active connection at a - * time. This is similar to the legacy nn_connect(). The underlying endpoint - * is returned back to the caller. - */ -NNG_DECL int nng_socket_dial(nng_endpt_t *, nng_socket_t, const char *); - -/* - * nng_socket_endpt creates an endpoint on the socket, but does not - * start it either dialing or connecting. - */ -NNG_DECL int nng_socket_endpt(nng_endpt_t *, nng_socket_t, const char *); - -/* - * nng_endpt_dial starts the endpoint dialing. This is only possible if - * the endpoint is not already dialing or listening. - */ -NNG_DECL int nng_endpt_dial(nng_endpt_t); - -/* - * nng_endpt_listen starts the endpoint listening. This is only possible if - * the endpoint is not already dialing or listening. - */ -NNG_DECL int nng_endpt_listen(nng_endpt_t); - -/* - * nng_endpt_close closes the endpt, shutting down all underlying - * connections and releasing all associated resources. It is an error to - * refer to the endpoint after this is called. - */ -NNG_DECL int nng_endpt_close(nng_endpt_t); - -/* - * nng_endpt_setopt sets an option for a specific endpoint. Note - * endpoint options may not be altered on a running endpoint. - */ -NNG_DECL int nng_endpt_setopt(nng_endpt_t, int, void *, size_t); - -/* - * nng_endpt_getopt obtains the option for an endpoint. - */ -NNG_DECL int nng_endpt_getopt(nng_endpt_t, int, void *, size_t *); - -/* - * nng_strerror returns a human readable string associated with the error - * code supplied. - */ +// The following functions return more detailed information about the event. +// Some of the values will not make sense for some event types, in which case +// the value returned will be NULL. +NNG_DECL int nng_event_type(nng_event *); +NNG_DECL nng_socket *nng_event_socket(nng_event *); +NNG_DECL nng_endpt *nng_event_endpt(nng_event *); +NNG_DECL nng_pipe *nng_event_pipe(nng_event *); +NNG_DECL const char *nng_event_reason(nng_event *); + +// nng_socket_listen creates a listening endpoint with no special options, +// and starts it listening. It is functionally equivalent to the legacy +// nn_bind(). The underlying endpoint is returned back to the caller. +NNG_DECL int nng_socket_listen(nng_endpt **, nng_socket *, const char *); + +// nng_socket_dial creates a dialing endpoint, with no special options, +// and starts it dialing. Dialers have at most one active connection at a +// time. This is similar to the legacy nn_connect(). The underlying endpoint +// is returned back to the caller. +NNG_DECL int nng_socket_dial(nng_endpt **, nng_socket *, const char *); + +// nng_socket_endpt creates an endpoint on the socket, but does not +// start it either dialing or listening. +NNG_DECL int nng_socket_endpt(nng_endpt **, nng_socket *, const char *); + +// nng_endpt_dial starts the endpoint dialing. This is only possible if +// the endpoint is not already dialing or listening. +NNG_DECL int nng_endpt_dial(nng_endpt *); + +// nng_endpt_listen starts the endpoint listening. This is only possible if +// the endpoint is not already dialing or listening. +NNG_DECL int nng_endpt_listen(nng_endpt *); + +// nng_endpt_close closes the endpt, shutting down all underlying +// connections and releasing all associated resources. It is an error to +// refer to the endpoint after this is called. +NNG_DECL int nng_endpt_close(nng_endpt *); + +// nng_endpt_setopt sets an option for a specific endpoint. Note +// endpoint options may not be altered on a running endpoint. +NNG_DECL int nng_endpt_setopt(nng_endpt *, int, void *, size_t); + +// nng_endpt_getopt obtains the option for an endpoint. +NNG_DECL int nng_endpt_getopt(nng_endpt *, int, void *, size_t *); + +// nng_strerror returns a human readable string associated with the error +// code supplied. NNG_DECL const char *nng_strerror(int); -/* - * nng_send sends (or arranges to send) the data on the socket. Note that - * this function may (will!) return before any receiver has actually - * received the data. The return value will be zero to indicate that the - * socket has accepted the entire data for send, or an errno to indicate - * failure. The flags may include NNG_FLAG_NONBLOCK. - */ -NNG_DECL int nng_send(nng_socket_t, const void *, size_t, int); - -/* - * nng_recv receives message data into the socket, up to the supplied size. - * The actual size of the message data will be written to the value pointed - * to by size. The flags may include NNG_FLAG_NONBLOCK and NNG_FLAG_ALLOC. - * If NNG_FLAG_ALLOC is supplied then the library will allocate memory for - * the caller. In that case the pointer to the allocated will be stored - * instead of the data itself. The caller is responsible for freeing the - * associated memory with free(). - */ -NNG_DECL int nng_recv(nng_socket_t, void *, size_t *, int); - -/* - * nng_sendmsg is like nng_send, but offers up a message structure, which - * gives the ability to provide more control over the message, including - * providing backtrace information. It also can take a message that was - * obtain via nn_recvmsg, allowing for zero copy forwarding. - */ -NNG_DECL int nng_sendmsg(nng_socket_t, nng_msg_t, int); - -/* - * nng_recvmsg is like nng_recv, but is used to obtain a message structure - * as well as the data buffer. This can be used to obtain more information - * about where the message came from, access raw headers, etc. It also - * can be passed off directly to nng_sendmsg. - */ -NNG_DECL int nng_recvmsg(nng_socket_t, nng_msg_t *, int); - -/* - * Message API. - */ -NNG_DECL int nng_msg_alloc(nng_msg_t *, size_t); -NNG_DECL void nng_msg_free(nng_msg_t); -NNG_DECL int nng_msg_realloc(nng_msg_t, size_t); -NNG_DECL void *nng_msg_header(nng_msg_t, size_t *); -NNG_DECL void *nng_msg_body(nng_msg_t, size_t *); -NNG_DECL int nng_msg_pipe(nng_msg_t, nng_pipe_t *); -NNG_DECL int nng_msg_append(nng_msg_t, const void *, size_t); -NNG_DECL int nng_msg_prepend(nng_msg_t, const void *, size_t); -NNG_DECL int nng_msg_trim(nng_msg_t, size_t); -NNG_DECL int nng_msg_trunc(nng_msg_t, size_t); -NNG_DECL int nng_msg_append_header(nng_msg_t, const void *, size_t); -NNG_DECL int nng_msg_prepend_header(nng_msg_t, const void *, size_t); -NNG_DECL int nng_msg_trim_header(nng_msg_t, size_t); -NNG_DECL int nng_msg_trunc_header(nng_msg_t, size_t); - -/* - * Pipe API. Generally pipes are only "observable" to applications, but - * we do permit an application to close a pipe. This can be useful, for - * example during a connection notification, to disconnect a pipe that - * is associated with an invalid or untrusted remote peer. - */ -NNG_DECL int nng_pipe_getopt(nng_pipe_t, int, void *, size_t *); -NNG_DECL int nng_pipe_close(nng_pipe_t); - -/* - * Protocol numbers. These are to be used with nng_socket_create(). - * These values are used on the wire, so must not be changed. The major - * number of the protocol is shifted left by 4 bits, and a subprotocol is - * assigned in the lower 4 bits. - * - * There are gaps in the list, which are obsolete or unsupported protocols. - * Protocol numbers are never more than 16 bits. Also, there will never be - * a valid protocol numbered 0 (NNG_PROTO_NONE). - */ +// nng_send sends (or arranges to send) the data on the socket. Note that +// this function may (will!) return before any receiver has actually +// received the data. The return value will be zero to indicate that the +// socket has accepted the entire data for send, or an errno to indicate +// failure. The flags may include NNG_FLAG_NONBLOCK. +NNG_DECL int nng_send(nng_socket *, const void *, size_t, int); + +// nng_recv receives message data into the socket, up to the supplied size. +// The actual size of the message data will be written to the value pointed +// to by size. The flags may include NNG_FLAG_NONBLOCK and NNG_FLAG_ALLOC. +// If NNG_FLAG_ALLOC is supplied then the library will allocate memory for +// the caller. In that case the pointer to the allocated will be stored +// instead of the data itself. The caller is responsible for freeing the +// associated memory with free(). +NNG_DECL int nng_recv(nng_socket *, void *, size_t *, int); + +// nng_sendmsg is like nng_send, but offers up a message structure, which +// gives the ability to provide more control over the message, including +// providing backtrace information. It also can take a message that was +// obtain via nn_recvmsg, allowing for zero copy forwarding. +NNG_DECL int nng_sendmsg(nng_socket *, nng_msg *, int); + +// nng_recvmsg is like nng_recv, but is used to obtain a message structure +// as well as the data buffer. This can be used to obtain more information +// about where the message came from, access raw headers, etc. It also +// can be passed off directly to nng_sendmsg. +NNG_DECL int nng_recvmsg(nng_socket *, nng_msg **, int); + +// Message API. +NNG_DECL int nng_msg_alloc(nng_msg **, size_t); +NNG_DECL void nng_msg_free(nng_msg *); +NNG_DECL int nng_msg_realloc(nng_msg *, size_t); +NNG_DECL void *nng_msg_header(nng_msg *, size_t *); +NNG_DECL void *nng_msg_body(nng_msg *, size_t *); +NNG_DECL int nng_msg_pipe(nng_msg *, nng_pipe **); +NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_prepend(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_trim(nng_msg *, size_t); +NNG_DECL int nng_msg_trunc(nng_msg *, size_t); +NNG_DECL int nng_msg_append_header(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_prepend_header(nng_msg *, const void *, size_t); +NNG_DECL int nng_msg_trim_header(nng_msg *, size_t); +NNG_DECL int nng_msg_trunc_header(nng_msg *, size_t); + +// Pipe API. Generally pipes are only "observable" to applications, but +// we do permit an application to close a pipe. This can be useful, for +// example during a connection notification, to disconnect a pipe that +// is associated with an invalid or untrusted remote peer. +NNG_DECL int nng_pipe_getopt(nng_pipe *, int, void *, size_t *); +NNG_DECL int nng_pipe_close(nng_pipe *); + +// Protocol numbers. These are to be used with nng_socket_create(). +// These values are used on the wire, so must not be changed. The major +// number of the protocol is shifted left by 4 bits, and a subprotocol is +// assigned in the lower 4 bits. +// +// There are gaps in the list, which are obsolete or unsupported protocols. +// Protocol numbers are never more than 16 bits. Also, there will never be +// a valid protocol numbered 0 (NNG_PROTO_NONE). #define NNG_PROTO(major, minor) (((major) * 16) + (minor)) #define NNG_PROTO_NONE NNG_PROTO(0, 0) #define NNG_PROTO_PAIR NNG_PROTO(1, 0) @@ -274,14 +215,11 @@ NNG_DECL int nng_pipe_close(nng_pipe_t); #define NNG_PROTO_BUS NNG_PROTO(7, 0) #define NNG_PROTO_STAR NNG_PROTO(100, 0) -/* - * Options. We encode option numbers as follows: - * - * <level> - 0: socket, 1: transport - * <type> - zero (socket), or transport (8 bits) - * <code> - specific value (16 bits) - * - */ +// Options. We encode option numbers as follows: +// +// <level> - 0: socket, 1: transport +// <type> - zero (socket), or transport (8 bits) +// <code> - specific value (16 bits) #define NNG_OPT_SOCKET(c) (c) #define NNG_OPT_TRANSPORT_OPT(t, c) (0x10000 | ((p) << 16) | (c)) @@ -306,78 +244,62 @@ NNG_DECL int nng_pipe_close(nng_pipe_t); #define NNG_OPT_RECVFD NNG_OPT_SOCKET(18) #define NNG_OPT_SENDFD NNG_OPT_SOCKET(19) -/* XXX: TBD: priorities, socket names, ipv4only */ - -/* - * Statistics. These are for informational purposes only, and subject - * to change without notice. The API for accessing these is stable, - * but the individual statistic names, values, and meanings are all - * subject to change. - */ - -/* - * nng_snapshot_create creates a statistics snapshot. The snapshot - * object must be deallocated expressly by the user, and may persist beyond - * the lifetime of any socket object used to update it. Note that the - * values of the statistics are initially unset. - */ -NNG_DECL int nng_snapshot_create(nng_snapshot_t *); - -/* - * nng_snapshot_free frees a snapshot object. All statistic objects - * contained therein are destroyed as well. - */ -NNG_DECL void nng_snapshot_free(nng_snapshot_t); - -/* - * nng_snapshot_update updates a snapshot of all the statistics - * relevant to a particular socket. All prior values are overwritten. - * It is acceptable to use the same snapshot object with different - * sockets. - */ -NNG_DECL int nng_snapshot_update(nng_socket_t, nng_snapshot_t); - -/* - * nng_snapshot_iterate is used to iterate over the individual statistic - * objects inside the snapshot. Note that the statistic object, and the - * meta-data for the object (name, type, units) is fixed, and does not - * change for the entire life of the snapshot. Only the value - * is subject to change, and then only when a snapshot is updated. - * - * Iteration begins by providing NULL in the value referenced. Successive - * calls will update this value, returning NULL when no more statistics - * are available in the snapshot. - */ -NNG_DECL nng_stat_t nng_snapshot_iterate(nng_snapshot_t, nng_stat_t); - -/* - * nng_stat_name is used to determine the name of the statistic. - * This is a human readable name. Statistic names, as well as the presence - * or absence or semantic of any particular statistic are not part of any - * stable API, and may be changed without notice in future updates. - */ -NNG_DECL const char *nng_stat_name(nng_stat_t); - -/* - * nng_stat_type is used to determine the type of the statistic. - * At present, only NNG_STAT_TYPE_LEVEL and and NNG_STAT_TYPE_COUNTER - * are defined. Counters generally increment, and therefore changes in the - * value over time are likely more interesting than the actual level. Level - * values reflect some absolute state however, and should be presented to the - * user as is. - */ -NNG_DECL int nng_stat_type(nng_stat_t); +// XXX: TBD: priorities, socket names, ipv4only + +// Statistics. These are for informational purposes only, and subject +// to change without notice. The API for accessing these is stable, +// but the individual statistic names, values, and meanings are all +// subject to change. + +// nng_snapshot_create creates a statistics snapshot. The snapshot +// object must be deallocated expressly by the user, and may persist beyond +// the lifetime of any socket object used to update it. Note that the +// values of the statistics are initially unset. +NNG_DECL int nng_snapshot_create(nng_snapshot **); + +// nng_snapshot_free frees a snapshot object. All statistic objects +// contained therein are destroyed as well. +NNG_DECL void nng_snapshot_free(nng_snapshot *); + +// nng_snapshot_update updates a snapshot of all the statistics +// relevant to a particular socket. All prior values are overwritten. +// It is acceptable to use the same snapshot object with different +// sockets. +NNG_DECL int nng_snapshot_update(nng_socket *, nng_snapshot *); + +// nng_snapshot_iterate is used to iterate over the individual statistic +// objects inside the snapshot. Note that the statistic object, and the +// meta-data for the object (name, type, units) is fixed, and does not +// change for the entire life of the snapshot. Only the value +// is subject to change, and then only when a snapshot is updated. +// +// Iteration begins by providing NULL in the value referenced. Successive +// calls will update this value, returning NULL when no more statistics +// are available in the snapshot. +NNG_DECL nng_stat *nng_snapshot_iterate(nng_snapshot *, nng_stat *); + +// nng_stat_name is used to determine the name of the statistic. +// This is a human readable name. Statistic names, as well as the presence +// or absence or semantic of any particular statistic are not part of any +// stable API, and may be changed without notice in future updates. +NNG_DECL const char *nng_stat_name(nng_stat *); + +// nng_stat_type is used to determine the type of the statistic. +// At present, only NNG_STAT_TYPE_LEVEL and and NNG_STAT_TYPE_COUNTER +// are defined. Counters generally increment, and therefore changes in the +// value over time are likely more interesting than the actual level. Level +// values reflect some absolute state however, and should be presented to the +// user as is. +NNG_DECL int nng_stat_type(nng_stat *); #define NNG_STAT_LEVEL 0 #define NNG_STAT_COUNTER 1 -/* - * nng_stat_unit provides information about the unit for the statistic, - * such as NNG_UNIT_BYTES or NNG_UNIT_BYTES. If no specific unit is - * applicable, such as a relative priority, then NN_UNIT_NONE is - * returned. - */ -NNG_DECL int nng_stat_unit(nng_stat_t); +// nng_stat_unit provides information about the unit for the statistic, +// such as NNG_UNIT_BYTES or NNG_UNIT_BYTES. If no specific unit is +// applicable, such as a relative priority, then NN_UNIT_NONE is +// returned. +NNG_DECL int nng_stat_unit(nng_stat *); #define NNG_UNIT_NONE 0 #define NNG_UNIT_BYTES 1 @@ -386,39 +308,29 @@ NNG_DECL int nng_stat_unit(nng_stat_t); #define NNG_UNIT_MILLIS 4 #define NNG_UNIT_EVENTS 5 -/* - * nng_stat_value returns returns the actual value of the statistic. - * Statistic values reflect their value at the time that the corresponding - * snapshot was updated, and are undefined until an update is performed. - */ -NNG_DECL int64_t nng_stat_value(nng_stat_t); - -/* - * Device functionality. This connects two sockets together in a device, - * which means that messages from one side are forwarded to the other. - */ -NNG_DECL int nng_device(nng_socket_t, nng_socket_t); - -/* - * Pollset functionality. TBD. (Note that I'd rather avoid this - * altogether, because I believe that the notification mechanism I've - * created offers a superior way to handle this. I don't think many - * direct consumers of nn_poll existed in the wild, except via nn_device(). - * I suspect that there not even many nn_device() consumers.) - */ - -/* - * Symbol name and visibility. TBD. The only symbols that really should - * be directly exported to runtimes IMO are the option symbols. And frankly - * they have enough special logic around them that it might be best not to - * automate the promotion of them to other APIs. This is an area open - * for discussion. - */ - -/* - * Error codes. These may happen to align to errnos used on your platform, - * but do not count on this. - */ +// nng_stat_value returns returns the actual value of the statistic. +// Statistic values reflect their value at the time that the corresponding +// snapshot was updated, and are undefined until an update is performed. +NNG_DECL int64_t nng_stat_value(nng_stat *); + +// Device functionality. This connects two sockets together in a device, +// which means that messages from one side are forwarded to the other. +NNG_DECL int nng_device(nng_socket *, nng_socket *); + +// Pollset functionality. TBD. (Note that I'd rather avoid this +// altogether, because I believe that the notification mechanism I've +// created offers a superior way to handle this. I don't think many +// direct consumers of nn_poll existed in the wild, except via nn_device(). +// I suspect that there not even many nn_device() consumers.) + +// Symbol name and visibility. TBD. The only symbols that really should +// be directly exported to runtimes IMO are the option symbols. And frankly +// they have enough special logic around them that it might be best not to +// automate the promotion of them to other APIs. This is an area open +// for discussion. + +// Error codes. These may happen to align to errnos used on your platform, +// but do not count on this. #define NNG_EINTR (-1) #define NNG_ENOMEM (-2) #define NNG_EINVAL (-3) @@ -430,14 +342,12 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t); #define NNG_ENOTSUP (-9) #define NNG_EADDRINUSE (-10) -/* - * Maximum length of a socket address. This includes the terminating NUL. - * This limit is built into other implementations, so do not change it. - */ -#define NNG_MAXADDRLEN (128) +// Maximum length of a socket address. This includes the terminating NUL. +// This limit is built into other implementations, so do not change it. +#define NNG_MAXADDRLEN (128) #ifdef __cplusplus } #endif -#endif /* NNG_H */ +#endif // NNG_H diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 364a72fa..cabd3f06 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -21,8 +21,8 @@ typedef struct nni_pair_sock { nni_socket * sock; nni_pipe * pipe; nni_mutex mx; - nni_msgqueue_t uwq; - nni_msgqueue_t urq; + nni_msgqueue * uwq; + nni_msgqueue * urq; } nni_pair_sock; // An nni_pair_pipe is our per-pipe protocol private structure. We keep diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 23046933..a72d6b1e 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -140,8 +140,6 @@ nni_inproc_pipe_send(void *arg, nni_msg *msg) { nni_inproc_pipe *pipe = arg; - // TODO: look at the message expiration and use that to set up - // the timeout. (And if it expired already, throw it away.) return (nni_msgqueue_put(pipe->wq, msg)); } |
