From 89c847f1f52969ee2ae6ed35018eef40366ca061 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 22 Dec 2016 17:38:46 -0800 Subject: Work on endpoints. More C99 & type cleanups. --- src/core/endpt.c | 123 ++++++++++++++++++++++++++++++++ src/core/endpt.h | 29 ++++---- src/core/list.h | 47 ++++++------ src/core/message.c | 186 ++++++++++++++++++++++------------------------- src/core/message.h | 51 +++++++------ src/core/msgqueue.c | 32 +++++---- src/core/msgqueue.h | 14 +++- src/core/pipe.c | 48 ++++++------- src/core/pipe.h | 8 +-- src/core/platform.h | 1 - src/core/protocol.c | 40 +++++------ src/core/protocol.h | 181 ++++++++++++++++++---------------------------- src/core/socket.c | 37 +++++----- src/core/transport.c | 46 ++++++------ src/core/transport.h | 198 +++++++++++++++++++-------------------------------- 15 files changed, 533 insertions(+), 508 deletions(-) create mode 100644 src/core/endpt.c (limited to 'src/core') diff --git a/src/core/endpt.c b/src/core/endpt.c new file mode 100644 index 00000000..b7181ce8 --- /dev/null +++ b/src/core/endpt.c @@ -0,0 +1,123 @@ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include +#include +#include + +// Functionality realited to end points. +#if 0 +struct nng_endpt { + struct nni_endpt_ops ep_ops; + void * ep_data; + nni_list_node_t ep_sock_node; + nni_socket * ep_sock; + char ep_addr[NNG_MAXADDRLEN]; + nni_thread * ep_dialer; + nni_thread * ep_listener; + int ep_close; + nni_mutex ep_mx; + nni_cond ep_cv; +}; +#endif + +int +nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) +{ + nni_transport *tran; + nni_endpt *ep; + int rv; + + if ((tran = nni_transport_find(addr)) == NULL) { + return (NNG_EINVAL); + } + if (strlen(addr) >= NNG_MAXADDRLEN) { + return (NNG_EINVAL); + } + + if ((ep = nni_alloc(sizeof (*ep))) == NULL) { + return (NNG_ENOMEM); + } + ep->ep_dialer = NULL; + ep->ep_listener = NULL; + ep->ep_close = 0; + if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { + nni_free(ep, sizeof (*ep)); + return (NNG_ENOMEM); + } + if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_mx)) != 0) { + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); + return (NNG_ENOMEM); + } + + // Could safely use strcpy here, but this avoids discussion. + (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr); + ep->ep_sock = sock; + ep->ep_ops = *tran->tran_ep_ops; + + rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock)); + if (rv != 0) { + nni_cond_fini(&ep->ep_cv); + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); + return (rv); + } + NNI_LIST_INIT(&ep->ep_pipes, nni_pipe, p_ep_node); + *epp = ep; + return (0); +} + +void +nni_endpt_destroy(nni_endpt *ep) +{ + // We should already have been closed at this point, so this + // should proceed very quickly. + if (ep->ep_dialer) { + nni_thread_reap(ep->ep_dialer); + } + if (ep->ep_listener) { + nni_thread_reap(ep->ep_listener); + } + nni_mutex_enter(&ep->ep_mx); + while (nni_list_first(&ep->ep_pipes) != NULL) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + ep->ep_ops.ep_destroy(ep->ep_data); + + nni_cond_fini(&ep->ep_cv); + nni_mutex_fini(&ep->ep_mx); + nni_free(ep, sizeof (*ep)); +} + +void +nni_endpt_close(nni_endpt *ep) +{ + nni_mutex_enter(&ep->ep_mx); + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + ep->ep_close = 1; + nni_cond_broadcast(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + ep->ep_ops.ep_close(ep->ep_data); +} + + +#if 0 +int nni_endpt_dial(nni_endpt *, nni_pipe **); +int nni_endpt_listen(nni_endpt *); +int nni_endpt_accept(nni_endpt *, nni_pipe **); +int nni_endpt_close(nni_endpt *); +#endif diff --git a/src/core/endpt.h b/src/core/endpt.h index 0fa69678..4054a997 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -12,27 +12,28 @@ #include "core/transport.h" -// NB: This structure is supplied here for use by the CORE. Use of this library -// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR -// TRANSPORTS. +// NB: This structure is supplied here for use by the CORE. Use of this +// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS +// OR TRANSPORTS. struct nng_endpt { struct nni_endpt_ops ep_ops; - void * ep_tran; - nni_list_node_t ep_sock_node; + void * ep_data; // Transport private + nni_list_node ep_sock_node; // Per socket list nni_socket * ep_sock; - const char * ep_addr; - nni_thread_t ep_dialer; - nni_thread_t ep_listener; + char ep_addr[NNG_MAXADDRLEN]; + nni_thread * ep_dialer; + nni_thread * ep_listener; int ep_close; nni_mutex ep_mx; nni_cond ep_cv; + nni_list ep_pipes; // Active list of pipes }; -int nni_endpt_create(nni_endpt **, nni_socket *, const char *); -void nni_endpt_destroy(nni_endpt *); -int nni_endpt_dial(nni_endpt *, nni_pipe **); -int nni_endpt_listen(nni_endpt *); -int nni_endpt_accept(nni_endpt *, nni_pipe **); -int nni_endpt_close(nni_endpt *); +extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); +extern void nni_endpt_destroy(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, nni_pipe **); +extern int nni_endpt_listen(nni_endpt *); +extern int nni_endpt_accept(nni_endpt *, nni_pipe **); +extern void nni_endpt_close(nni_endpt *); #endif // CORE_ENDPT_H diff --git a/src/core/list.h b/src/core/list.h index 65d6ae9e..68e6080d 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -1,43 +1,44 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_LIST_H #define CORE_LIST_H #include "core/nng_impl.h" -/* - * In order to make life easy, we just define the list structures - * directly, and let consumers directly inline structures. - */ +// In order to make life easy, we just define the list structures +// directly, and let consumers directly inline structures. typedef struct nni_list_node { struct nni_list_node * ln_next; struct nni_list_node * ln_prev; -} nni_list_node_t; +} nni_list_node; typedef struct nni_list { struct nni_list_node ll_head; size_t ll_offset; -} nni_list_t; +} nni_list; -extern void nni_list_init_offset(nni_list_t *list, size_t offset); +typedef nni_list nni_list_t; +typedef nni_list_node nni_list_node_t; + +extern void nni_list_init_offset(nni_list *list, size_t offset); #define NNI_LIST_INIT(list, type, field) \ nni_list_init_offset(list, offsetof(type, field)) -extern void *nni_list_first(nni_list_t *); -extern void *nni_list_last(nni_list_t *); -extern void nni_list_append(nni_list_t *, void *); -extern void nni_list_prepend(nni_list_t *, void *); -extern void *nni_list_next(nni_list_t *, void *); -extern void *nni_list_prev(nni_list_t *, void *); -extern void nni_list_remove(nni_list_t *, void *); -extern void nni_list_node_init(nni_list_t *, void *); +extern void *nni_list_first(nni_list *); +extern void *nni_list_last(nni_list *); +extern void nni_list_append(nni_list *, void *); +extern void nni_list_prepend(nni_list *, void *); +extern void *nni_list_next(nni_list *, void *); +extern void *nni_list_prev(nni_list *, void *); +extern void nni_list_remove(nni_list *, void *); +extern void nni_list_node_init(nni_list *, void *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/message.c b/src/core/message.c index 4ba30ed6..6ec47cb9 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -1,51 +1,47 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include #include #include "core/nng_impl.h" -/* - * Message API. - */ +// Message API. -/* Message chunk, internal to the message implementation. */ +// Message chunk, internal to the message implementation. typedef struct { - size_t ch_cap; /* allocated size */ - size_t ch_len; /* length in use */ - uint8_t * ch_buf; /* underlying buffer */ - uint8_t * ch_ptr; /* pointer to actual data */ -} chunk_t; + size_t ch_cap; // allocated size + size_t ch_len; // length in use + uint8_t * ch_buf; // underlying buffer + uint8_t * ch_ptr; // pointer to actual data +} nni_chunk; -/* Underlying message chunk. */ +// Underlying message structure. struct nng_msg { - chunk_t m_header; - chunk_t m_body; - int64_t m_expire; /* Unix usec */ - nni_pipe_t m_pipe; /* Pipe message was received on */ + nni_chunk m_header; + nni_chunk m_body; + nni_time m_expire; // usec + nni_pipe * m_pipe; // Pipe message was received on }; -/* - * chunk_grow increases the underlying space for a chunk. It ensures - * that the desired amount of trailing space (including the length) - * and headroom (excluding the length) are available. It also copies - * any extant referenced data. Note that the capacity will increase, - * but not the length. To increase the length of the referenced data, - * use either chunk_append or chunk_prepend. - * - * Note that having some headroom is useful when data must be prepended - * to a message - it avoids having to perform extra data copies, so we - * encourage initial allocations to start with sufficient room. - */ +// nni_chunk_grow increases the underlying space for a chunk. It ensures +// that the desired amount of trailing space (including the length) +// and headroom (excluding the length) are available. It also copies +// any extant referenced data. Note that the capacity will increase, +// but not the length. To increase the length of the referenced data, +// use either chunk_append or chunk_prepend. +// +// Note that having some headroom is useful when data must be prepended +// to a message - it avoids having to perform extra data copies, so we +// encourage initial allocations to start with sufficient room. static int -chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted) +nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted) { size_t headroom = 0; uint8_t *newbuf; @@ -111,7 +107,7 @@ chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted) static void -chunk_free(chunk_t *ch) +nni_chunk_free(nni_chunk *ch) { if ((ch->ch_cap != 0) && (ch->ch_buf != NULL)) { nni_free(ch->ch_buf, ch->ch_cap); @@ -123,9 +119,9 @@ chunk_free(chunk_t *ch) } -/* chunk_trunc truncates the number of bytes from the end of the chunk. */ +// nni_chunk_trunc truncates bytes from the end of the chunk. static int -chunk_trunc(chunk_t *ch, size_t len) +nni_chunk_trunc(nni_chunk *ch, size_t len) { if (ch->ch_len < len) { return (NNG_EINVAL); @@ -135,9 +131,9 @@ chunk_trunc(chunk_t *ch, size_t len) } -/* chunk_trim removes the number of bytes from the beginning of the chunk. */ +// nni_chunk_trim removes bytes from the beginning of the chunk. static int -chunk_trim(chunk_t *ch, size_t len) +nni_chunk_trim(nni_chunk *ch, size_t len) { if (ch->ch_len < len) { return (NNG_EINVAL); @@ -148,20 +144,18 @@ chunk_trim(chunk_t *ch, size_t len) } -/* - * chunk_append appends the data to the chunk, growing the size as necessary. - * If the data pointer is NULL, then the chunk data region is allocated, but - * uninitialized. - */ +// nni_chunk_append appends the data to the chunk, growing as necessary. +// If the data pointer is NULL, then the chunk data region is allocated, +// but uninitialized. static int -chunk_append(chunk_t *ch, const void *data, size_t len) +nni_chunk_append(nni_chunk *ch, const void *data, size_t len) { int rv; if (len == 0) { return (0); } - if ((rv = chunk_grow(ch, len + ch->ch_len, 0)) != 0) { + if ((rv = nni_chunk_grow(ch, len + ch->ch_len, 0)) != 0) { return (rv); } if (ch->ch_ptr == NULL) { @@ -175,13 +169,11 @@ chunk_append(chunk_t *ch, const void *data, size_t len) } -/* - * chunk_prepend prepends data to the chunk, as efficiently as possible. - * If the data pointer is NULL, then no data is actually copied, but the - * data region will have "grown" in the beginning, with uninitialized data. - */ +// nni_chunk_prepend prepends data to the chunk, as efficiently as possible. +// If the data pointer is NULL, then no data is actually copied, but the +// data region will have "grown" in the beginning, with uninitialized data. static int -chunk_prepend(chunk_t *ch, const void *data, size_t len) +nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len) { int rv; @@ -197,7 +189,7 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len) } else if ((ch->ch_len + len) <= ch->ch_cap) { /* We had enough capacity, just shuffle data down. */ memmove(ch->ch_ptr + len, ch->ch_ptr, ch->ch_len); - } else if ((rv = chunk_grow(ch, 0, len)) == 0) { + } else if ((rv = nni_chunk_grow(ch, 0, len)) == 0) { /* We grew the chunk, so adjust. */ ch->ch_ptr -= len; } else { @@ -215,42 +207,38 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len) int -nni_msg_alloc(nni_msg_t *mp, size_t sz) +nni_msg_alloc(nni_msg **mp, size_t sz) { - nni_msg_t m; + nni_msg *m; int rv; if ((m = nni_alloc(sizeof (*m))) == NULL) { return (NNG_ENOMEM); } - /* - * 64-bytes of header, including room for 32 bytes - * of headroom and 32 bytes of trailer. - */ - if ((rv = chunk_grow(&m->m_header, 32, 32)) != 0) { + // 64-bytes of header, including room for 32 bytes + // of headroom and 32 bytes of trailer. + if ((rv = nni_chunk_grow(&m->m_header, 32, 32)) != 0) { nni_free(m, sizeof (*m)); return (rv); } - /* - * If the message is less than 1024 bytes, or is not power - * of two aligned, then we insert a 32 bytes of headroom - * to allow for inlining backtraces, etc. We also allow the - * amount of space at the end for the same reason. Large aligned - * allocations are unmolested to avoid excessive overallocation. - */ + // If the message is less than 1024 bytes, or is not power + // of two aligned, then we insert a 32 bytes of headroom + // to allow for inlining backtraces, etc. We also allow the + // amount of space at the end for the same reason. Large aligned + // allocations are unmolested to avoid excessive overallocation. if ((sz < 1024) || ((sz & (sz-1)) != 0)) { - rv = chunk_grow(&m->m_body, sz + 32, 32); + rv = nni_chunk_grow(&m->m_body, sz + 32, 32); } else { - rv = chunk_grow(&m->m_body, sz, 0); + rv = nni_chunk_grow(&m->m_body, sz, 0); } if (rv != 0) { - chunk_free(&m->m_header); + nni_chunk_free(&m->m_header); nni_free(m, sizeof (*m)); } - if ((rv = chunk_append(&m->m_body, NULL, sz)) != 0) { - /* Should not happen since we just grew it to fit. */ + if ((rv = nni_chunk_append(&m->m_body, NULL, sz)) != 0) { + // Should not happen since we just grew it to fit. nni_panic("chunk_append failed"); } @@ -260,34 +248,34 @@ nni_msg_alloc(nni_msg_t *mp, size_t sz) void -nni_msg_free(nni_msg_t m) +nni_msg_free(nni_msg *m) { - chunk_free(&m->m_header); - chunk_free(&m->m_body); + nni_chunk_free(&m->m_header); + nni_chunk_free(&m->m_body); nni_free(m, sizeof (*m)); } int -nni_msg_realloc(nni_msg_t m, size_t sz) +nni_msg_realloc(nni_msg *m, size_t sz) { int rv = 0; if (m->m_body.ch_len < sz) { - rv = chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len); + rv = nni_chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len); if (rv != 0) { return (rv); } } else { - /* "Shrinking", just mark bytes at end usable again. */ - chunk_trunc(&m->m_body, m->m_body.ch_len - sz); + // "Shrinking", just mark bytes at end usable again. + nni_chunk_trunc(&m->m_body, m->m_body.ch_len - sz); } return (0); } void * -nni_msg_header(nni_msg_t m, size_t *szp) +nni_msg_header(nni_msg *m, size_t *szp) { if (szp != NULL) { *szp = m->m_header.ch_len; @@ -297,7 +285,7 @@ nni_msg_header(nni_msg_t m, size_t *szp) void * -nni_msg_body(nni_msg_t m, size_t *szp) +nni_msg_body(nni_msg *m, size_t *szp) { if (szp != NULL) { *szp = m->m_body.ch_len; @@ -307,63 +295,63 @@ nni_msg_body(nni_msg_t m, size_t *szp) int -nni_msg_append(nni_msg_t m, const void *data, size_t len) +nni_msg_append(nni_msg *m, const void *data, size_t len) { - return (chunk_append(&m->m_body, data, len)); + return (nni_chunk_append(&m->m_body, data, len)); } int -nni_msg_prepend(nni_msg_t m, const void *data, size_t len) +nni_msg_prepend(nni_msg *m, const void *data, size_t len) { - return (chunk_prepend(&m->m_body, data, len)); + return (nni_chunk_prepend(&m->m_body, data, len)); } int -nni_msg_trim(nni_msg_t m, size_t len) +nni_msg_trim(nni_msg *m, size_t len) { - return (chunk_trim(&m->m_body, len)); + return (nni_chunk_trim(&m->m_body, len)); } int -nni_msg_trunc(nni_msg_t m, size_t len) +nni_msg_trunc(nni_msg *m, size_t len) { - return (chunk_trunc(&m->m_body, len)); + return (nni_chunk_trunc(&m->m_body, len)); } int -nni_msg_append_header(nni_msg_t m, const void *data, size_t len) +nni_msg_append_header(nni_msg *m, const void *data, size_t len) { - return (chunk_append(&m->m_header, data, len)); + return (nni_chunk_append(&m->m_header, data, len)); } int -nni_msg_prepend_header(nni_msg_t m, const void *data, size_t len) +nni_msg_prepend_header(nni_msg *m, const void *data, size_t len) { - return (chunk_prepend(&m->m_header, data, len)); + return (nni_chunk_prepend(&m->m_header, data, len)); } int -nni_msg_trim_header(nni_msg_t m, size_t len) +nni_msg_trim_header(nni_msg *m, size_t len) { - return (chunk_trim(&m->m_header, len)); + return (nni_chunk_trim(&m->m_header, len)); } int -nni_msg_trunc_header(nni_msg_t m, size_t len) +nni_msg_trunc_header(nni_msg *m, size_t len) { - return (chunk_trunc(&m->m_header, len)); + return (nni_chunk_trunc(&m->m_header, len)); } int -nni_msg_pipe(nni_msg_t m, nni_pipe_t *pp) +nni_msg_pipe(nni_msg *m, nni_pipe **pp) { *pp = m->m_pipe; return (0); diff --git a/src/core/message.h b/src/core/message.h index 7b17a007..5c742e44 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -1,33 +1,30 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_MESSAGE_H #define CORE_MESSAGE_H -/* - * Internally used message API. Again, this stuff is not part of our public - * API. - */ +// Internally used message API. Again, this is not part of our public API. -extern int nni_msg_alloc(nni_msg_t *, size_t); -extern void nni_msg_free(nni_msg_t); -extern int nni_msg_realloc(nni_msg_t, size_t); -extern void *nni_msg_header(nni_msg_t, size_t *); -extern void *nni_msg_body(nni_msg_t, size_t *); -extern int nni_msg_append(nni_msg_t, const void *, size_t); -extern int nni_msg_prepend(nni_msg_t, const void *, size_t); -extern int nni_msg_append_header(nni_msg_t, const void *, size_t); -extern int nni_msg_prepend_header(nni_msg_t, const void *, size_t); -extern int nni_msg_trim(nni_msg_t, size_t); -extern int nni_msg_trunc(nni_msg_t, size_t); -extern int nni_msg_trim_header(nni_msg_t, size_t); -extern int nni_msg_trunc_header(nni_msg_t, size_t); -extern int nni_msg_pipe(nni_msg_t, nni_pipe_t *); +extern int nni_msg_alloc(nni_msg **, size_t); +extern void nni_msg_free(nni_msg *); +extern int nni_msg_realloc(nni_msg *, size_t); +extern void *nni_msg_header(nni_msg *, size_t *); +extern void *nni_msg_body(nni_msg *, size_t *); +extern int nni_msg_append(nni_msg *, const void *, size_t); +extern int nni_msg_prepend(nni_msg *, const void *, size_t); +extern int nni_msg_append_header(nni_msg *, const void *, size_t); +extern int nni_msg_prepend_header(nni_msg *, const void *, size_t); +extern int nni_msg_trim(nni_msg *, size_t); +extern int nni_msg_trunc(nni_msg *, size_t); +extern int nni_msg_trim_header(nni_msg *, size_t); +extern int nni_msg_trunc_header(nni_msg *, size_t); +extern int nni_msg_pipe(nni_msg *, nni_pipe **); -#endif /* CORE_SOCKET_H */ +#endif // CORE_SOCKET_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 7d892b88..b8d11e8c 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -52,7 +52,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { + if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -90,7 +90,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq) nni_msg_free(msg); } - nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); + nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *)); nni_free(mq, sizeof (*mq)); } @@ -130,6 +130,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, (void) nni_cond_waituntil(&mq->mq_writeable, expire); } + // Once a queue is closed, you can't write to it. It can still be + // read from, at least until its empty. if (mq->mq_closed) { nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); @@ -175,19 +177,19 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, (void) nni_cond_waituntil(&mq->mq_readable, expire); } - if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); - return (NNG_ECLOSED); - } + // If there is any data left in the message queue, we will still + // provide it, so that the reader can drain. + if (mq->mq_len == 0) { + if (mq->mq_closed) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ECLOSED); + } - if ((mq->mq_len == 0) && (*signal)) { - // We are being interrupted. We only allow an interrupt - // if there is no data though, because we'd really prefer - // to give back the data. Otherwise our failure to deal - // with the data could lead to starvation; also lingering - // relies on this not interrupting if data is pending. - nni_mutex_exit(&mq->mq_lock); - return (NNG_EINTR); + if (*signal) { + // We are being interrupted. + nni_mutex_exit(&mq->mq_lock); + return (NNG_EINTR); + } } *msgp = mq->mq_msgs[mq->mq_get]; @@ -258,7 +260,7 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void nni_msgqueue_close(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index d97482af..adea2dfa 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -15,8 +15,18 @@ // Message queues. Message queues work in some ways like Go channels; // they are a thread-safe way to pass messages between subsystems. They // do have additional capabilities though. -typedef struct nni_msgqueue * nni_msgqueue_t; -typedef struct nni_msgqueue nni_msgqueue; +// +// A closed message queue cannot be written to, but if there are messages +// still in it, it can be read from. (This allows them to play a role in +// draining/lingering.) +// +// Message queues can be closed many times safely. +// +// Readers & writers in a message queue can be woken either by a timeout +// or by a specific signal (arranged by the caller). +// +// TODO: Add message queue growing, and pushback. +typedef struct nni_msgqueue nni_msgqueue; // nni_msgqueue_create creates a message queue with the given capacity, // which must be a positive number. It returns NNG_EINVAL if the capacity diff --git a/src/core/pipe.c b/src/core/pipe.c index a617cf85..58a31d7f 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -1,64 +1,60 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * This file contains functions relating to pipes. - * - * Operations on pipes (to the transport) are generally blocking operations, - * performed in the context of the protocol. - */ +// This file contains functions relating to pipes. +// +// Operations on pipes (to the transport) are generally blocking operations, +// performed in the context of the protocol. -/* nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. */ +// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. uint32_t -nni_pipe_id(nni_pipe_t p) +nni_pipe_id(nni_pipe * p) { return (p->p_id); } int -nni_pipe_send(nni_pipe_t p, nng_msg_t msg) +nni_pipe_send(nni_pipe * p, nng_msg *msg) { return (p->p_ops.p_send(p->p_tran, msg)); } int -nni_pipe_recv(nni_pipe_t p, nng_msg_t *msgp) +nni_pipe_recv(nni_pipe * p, nng_msg **msgp) { return (p->p_ops.p_recv(p->p_tran, msgp)); } -/* - * nni_pipe_close closes the underlying connection. It is expected that - * subsequent attempts receive or send (including any waiting receive) will - * simply return NNG_ECLOSED. - */ +// nni_pipe_close closes the underlying connection. It is expected that +// subsequent attempts receive or send (including any waiting receive) will +// simply return NNG_ECLOSED. void -nni_pipe_close(nni_pipe_t p) +nni_pipe_close(nni_pipe * p) { p->p_ops.p_close(p->p_tran); } uint16_t -nni_pipe_peer(nni_pipe_t p) +nni_pipe_peer(nni_pipe * p) { return (p->p_ops.p_peer(p->p_tran)); } void -nni_pipe_destroy(nni_pipe_t p) +nni_pipe_destroy(nni_pipe * p) { p->p_ops.p_destroy(p->p_tran); nni_free(p, sizeof (*p)); diff --git a/src/core/pipe.h b/src/core/pipe.h index a4a03a93..b78eaa2a 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -22,16 +22,16 @@ struct nng_pipe { uint32_t p_id; struct nni_pipe_ops p_ops; void * p_tran; - nni_list_node_t p_sock_node; + nni_list_node p_sock_node; nni_socket * p_sock; - nni_list_node_t p_ep_node; + nni_list_node p_ep_node; nni_endpt * p_ep; }; // Pipe operations that protocols use. -extern int nni_pipe_recv(nni_pipe *, nng_msg_t *); -extern int nni_pipe_send(nni_pipe *, nng_msg_t); +extern int nni_pipe_recv(nni_pipe *, nng_msg **); +extern int nni_pipe_send(nni_pipe *, nng_msg *); extern uint32_t nni_pipe_id(nni_pipe *); extern void nni_pipe_close(nni_pipe *); diff --git a/src/core/platform.h b/src/core/platform.h index 58e4cd36..a674aac5 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -113,7 +113,6 @@ extern void nni_cond_wait(nni_cond *); // check the condition. It will return either NNG_ETIMEDOUT, or 0. extern int nni_cond_waituntil(nni_cond *, nni_time); -typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; // nni_thread_creates a thread that runs the given function. The thread diff --git a/src/core/protocol.c b/src/core/protocol.c index 113746b1..6399cd41 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -1,37 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include #include "core/nng_impl.h" -/* - * Protocol related stuff - generically. - */ +// Protocol related stuff - generically. -/* - * The list of protocols is hardwired. This is reasonably unlikely to - * change, as adding new protocols is not something intended to be done - * outside of the core. - */ -extern struct nni_protocol nni_pair_protocol; +// The list of protocols is hardwired. This is reasonably unlikely to +// change, as adding new protocols is not something intended to be done +// outside of the core. +extern nni_protocol nni_pair_protocol; -static struct nni_protocol *protocols[] = { +static nni_protocol *protocols[] = { &nni_pair_protocol, NULL }; -struct nni_protocol * +nni_protocol * nni_protocol_find(uint16_t num) { int i; - struct nni_protocol *p; + nni_protocol *p; for (i = 0; (p = protocols[i]) != NULL; i++) { if (p->proto_self == num) { @@ -45,7 +41,7 @@ nni_protocol_find(uint16_t num) const char * nni_protocol_name(uint16_t num) { - struct nni_protocol *p; + nni_protocol *p; if ((p = nni_protocol_find(num)) == NULL) { return (NULL); @@ -57,7 +53,7 @@ nni_protocol_name(uint16_t num) uint16_t nni_protocol_number(const char *name) { - struct nni_protocol *p; + nni_protocol *p; int i; for (i = 0; (p = protocols[i]) != NULL; i++) { diff --git a/src/core/protocol.h b/src/core/protocol.h index 21d3b6b9..a4b24177 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,128 +1,89 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_PROTOCOL_H #define CORE_PROTOCOL_H -/* - * Protocol implementation details. Protocols must implement the - * interfaces in this file. Note that implementing new protocols is - * not necessarily intended to be a trivial task. The protocol developer - * must understand the nature of nng, as they are responsible for handling - * most of the logic. The protocol generally does most of the work for - * locking, and calls into the transport's pipe functions to do actual - * work, and the pipe functions generally assume no locking is needed. - * As a consequence, most of the concurrency in nng exists in the protocol - * implementations. - * - * Pipe operations may block, or even reenter the protoccol entry points - * (for example nni_pipe_close() causes the protocols proto_remove_pipe - * entry point to be called), so it is very important that protocols do - * not hold any locks across calls to pipe functions. - */ - +// Protocol implementation details. Protocols must implement the +// interfaces in this file. Note that implementing new protocols is +// not necessarily intended to be a trivial task. The protocol developer +// must understand the nature of nng, as they are responsible for handling +// most of the logic. The protocol generally does most of the work for +// locking, and calls into the transport's pipe functions to do actual +// work, and the pipe functions generally assume no locking is needed. +// As a consequence, most of the concurrency in nng exists in the protocol +// implementations. struct nni_protocol { - /* - * Protocol information. - */ - uint16_t proto_self; /* our 16-bit protocol ID */ - uint16_t proto_peer; /* who we peer with (protocol ID) */ - const char * proto_name; /* string version of our name */ - - /* - * Create protocol instance data, which will be stored on the socket. - */ - int (*proto_create)(void **, nni_socket_t); - - /* - * Destroy the protocol instance. - */ + uint16_t proto_self; // our 16-bit protocol ID + uint16_t proto_peer; // who we peer with (protocol ID) + const char * proto_name; // string version of our name + + //Create protocol instance, which will be stored on the socket. + int (*proto_create)(void **, nni_socket *); + + // Destroy the protocol instance. void (*proto_destroy)(void *); - /* - * Shutdown the protocol instance, including giving time to - * drain any outbound frames (linger). The protocol is not - * required to honor the linger. - * XXX: This is probably redundant -- protocol should notice - * drain by getting NNG_ECLOSED on the upper write queue. - */ + // Shutdown the protocol instance, including giving time to + // drain any outbound frames (linger). The protocol is not + // required to honor the linger. + // XXX: This is probably redundant -- protocol should notice + // drain by getting NNG_ECLOSED on the upper write queue. void (*proto_shutdown)(void *); - /* - * Add and remove pipes. These are called as connections are - * created or destroyed. - */ - int (*proto_add_pipe)(void *, nni_pipe_t); - int (*proto_rem_pipe)(void *, nni_pipe_t); + // Add and remove pipes. These are called as connections are + // created or destroyed. + int (*proto_add_pipe)(void *, nni_pipe *); + int (*proto_rem_pipe)(void *, nni_pipe *); - /* - * Option manipulation. These may be NULL. - */ + // Option manipulation. These may be NULL. int (*proto_setopt)(void *, int, const void *, size_t); int (*proto_getopt)(void *, int, void *, size_t *); - /* - * Receive filter. This may be NULL, but if it isn't, then - * messages coming into the system are routed here just before - * being delivered to the application. To drop the message, - * the protocol should return NULL, otherwise the message - * (possibly modified). - */ - nng_msg_t (*proto_recv_filter)(void *, nni_msg_t); - - /* - * Send filter. This may be NULL, but if it isn't, then - * messages here are filtered just after they come from the - * application. - */ - nng_msg_t (*proto_send_filter)(void *, nni_msg_t); + // Receive filter. This may be NULL, but if it isn't, then + // messages coming into the system are routed here just before being + // delivered to the application. To drop the message, the prtocol + // should return NULL, otherwise the message (possibly modified). + nni_msg * (*proto_recv_filter)(void *, nni_msg *); + + // Send filter. This may be NULL, but if it isn't, then messages + // here are filtered just after they come from the application. + nni_msg * (*proto_send_filter)(void *, nni_msg *); }; -/* - * These are socket methods that protocol operations can - * reasonably expect to call. - */ - -/* - * nni_socket_sendq obtains the upper writeq. The protocol should - * recieve messages from this, and place them on the appropriate - * pipe. - */ -extern nni_msgqueue_t nni_socket_sendq(nni_socket_t); - -/* - * nni_socket_recvq obtains the upper readq. The protocol should - * inject incoming messages from pipes to it. - */ -extern nni_msgqueue_t nni_socket_recvq(nni_socket_t); - -/* - * nni_socket_recv_err sets an error code to be returned to clients - * rather than waiting for a message. Set it to 0 to resume normal - * receive operation. - */ -extern void nni_socket_recv_err(nni_socket_t, int); - -/* - * nni_socket_send_err sets an error code to be returned to clients - * when they try to send, so that they don't have to timeout waiting - * for their message to be accepted for send. Set it to 0 to resume - * normal send operations. - */ -extern void nni_socket_send_err(nni_socket_t, int); - -/* - * These functions are not used by protocols, but rather by the socket - * core implementation. The lookups can be used by transports as well. - */ -extern struct nni_protocol *nni_protocol_find(uint16_t); +// These are socket methods that protocol operations can expect to call. +// Note that each of these should be called without any locks held, since +// the socket can reenter the protocol. + +// nni_socket_sendq obtains the upper writeq. The protocol should +// recieve messages from this, and place them on the appropriate pipe. +extern nni_msgqueue *nni_socket_sendq(nni_socket *); + +// nni_socket_recvq obtains the upper readq. The protocol should +// inject incoming messages from pipes to it. +extern nni_msgqueue *nni_socket_recvq(nni_socket *); + +// nni_socket_recv_err sets an error code to be returned to clients +// rather than waiting for a message. Set it to 0 to resume normal +// receive operation. +extern void nni_socket_recv_err(nni_socket *, int); + +// nni_socket_send_err sets an error code to be returned to clients +// when they try to send, so that they don't have to timeout waiting +// for their message to be accepted for send. Set it to 0 to resume +// normal send operations. +extern void nni_socket_send_err(nni_socket *, int); + +// These functions are not used by protocols, but rather by the socket +// core implementation. The lookups can be used by transports as well. +extern nni_protocol *nni_protocol_find(uint16_t); extern const char *nni_protocol_name(uint16_t); extern uint16_t nni_protocol_number(const char *); -#endif /* CORE_PROTOCOL_H */ +#endif // CORE_PROTOCOL_H diff --git a/src/core/socket.c b/src/core/socket.c index c500db31..1331bb4c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -13,14 +13,14 @@ // nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain // the upper read and write queues. -nni_msgqueue_t +nni_msgqueue * nni_socket_sendq(nni_socket *s) { return (s->s_uwq); } -nni_msgqueue_t +nni_msgqueue * nni_socket_recvq(nni_socket *s) { return (s->s_urq); @@ -53,8 +53,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); - // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { nni_cond_fini(&sock->s_cv); @@ -83,14 +83,7 @@ nni_socket_close(nni_socket *sock) // Stop all EPS. We're going to do this first, since we know // we're closing. NNI_LIST_FOREACH (&sock->s_eps, ep) { -#if 0 - // XXX: Question: block for them now, or wait further down? - // Probably we can simply just watch for the first list... - // stopping EPs should be *dead* easy, and never block. - nni_ep_stop(ep); - or nni_ep_shutdown(ep); - -#endif + nni_endpt_close(ep); } nni_mutex_exit(&sock->s_mx); @@ -133,9 +126,18 @@ nni_socket_close(nni_socket *sock) } // Wait to make sure endpoint listeners have shutdown and exited - // as well. They should have done so *long* ago. - while (nni_list_first(&sock->s_eps) != NULL) { - nni_cond_wait(&sock->s_cv); + // as well. They should have done so *long* ago. Because this + // requires waiting for threads to finish, which *could* in theory + // overlap with this, we must drop the socket lock. + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + + // TODO: This looks like it should happen as an endpt_remove + // operation? + nni_list_remove(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); + + nni_endpt_destroy(ep); + nni_mutex_enter(&sock->s_mx); } nni_mutex_exit(&sock->s_mx); @@ -244,7 +246,6 @@ nni_socket_proto(nni_socket *sock) return (sock->s_ops.proto_self); } - // nni_socket_rem_pipe removes the pipe from the socket. This is often // called by the protocol when a pipe is removed due to close. void @@ -265,9 +266,11 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // XXX: TODO: Redial // XXX: also publish event... // if (pipe->p_ep != NULL) { - // nn_endpt_remove_pipe(pipe->p_ep, pipe) + // nn_endpt_rem_pipe(pipe->p_ep, pipe) // } + nni_pipe_destroy(pipe); + // If we're closing, wake the socket if we finished draining. if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { nni_cond_broadcast(&sock->s_cv); diff --git a/src/core/transport.c b/src/core/transport.c index 61ec5033..2e8dee7a 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -1,35 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ - -#include +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * For now the list of transports is hard-wired. Adding new transports - * to the system dynamically is something that might be considered later. - */ -extern struct nni_transport nni_inproc_transport; +#include + +// For now the list of transports is hard-wired. Adding new transports +// to the system dynamically is something that might be considered later. +extern nni_transport nni_inproc_transport; -static struct nni_transport *transports[] = { +static nni_transport *transports[] = { &nni_inproc_transport, NULL }; -struct nni_transport * +nni_transport * nni_transport_find(const char *addr) { - /* address is of the form "://blah..." */ + // address is of the form "://blah..." const char *end; int len; int i; - struct nni_transport *ops; + nni_transport *ops; if ((end = strstr(addr, "://")) == NULL) { return (NULL); @@ -44,15 +42,13 @@ nni_transport_find(const char *addr) } -/* - * nni_transport_init initializes the entire transport subsystem, including - * each individual transport. - */ +// nni_transport_init initializes the entire transport subsystem, including +// each individual transport. void nni_transport_init(void) { int i; - struct nni_transport *ops; + nni_transport *ops; for (i = 0; (ops = transports[i]) != NULL; i++) { ops->tran_init(); @@ -64,7 +60,7 @@ void nni_transport_fini(void) { int i; - struct nni_transport *ops; + nni_transport *ops; for (i = 0; (ops = transports[i]) != NULL; i++) { if (ops->tran_fini != NULL) { diff --git a/src/core/transport.h b/src/core/transport.h index 370e87f5..4e39adaf 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -1,171 +1,123 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -/* - * Transport implementation details. Transports must implement the - * interfaces in this file. - */ +// Transport implementation details. Transports must implement the +// interfaces in this file. struct nni_transport { - /* - * tran_scheme is the transport scheme, such as "tcp" or "inproc". - */ + // tran_scheme is the transport scheme, such as "tcp" or "inproc". const char * tran_scheme; - /* - * tran_ep_ops links our endpoint operations. - */ + // tran_ep_ops links our endpoint operations. const struct nni_endpt_ops * tran_ep_ops; - /* - * tran_pipe_ops links our pipe operations. - */ + // tran_pipe_ops links our pipe operations. const struct nni_pipe_ops * tran_pipe_ops; - /* - * tran_init, if not NULL, is called once during library - * initialization. - */ + // tran_init, if not NULL, is called once during library + // initialization. int (*tran_init)(void); - /* - * tran_fini, if not NULL, is called during library deinitialization. - * It should release any global resources, close any open files, etc. - * - * There will be no locks held, and no other threads running in the - * library. - * - * It is invalid to use any mutexes, condition variables, or - * threading routines. Mutexes and condition variables may be - * safely destroyed. - */ + // tran_fini, if not NULL, is called during library deinitialization. + // It should release any global resources, close any open files, etc. void (*tran_fini)(void); }; -/* - * Endpoint operations are called by the socket in a protocol-independent - * fashion. The socket makes individual calls, which are expected to block - * if appropriate (except for destroy). Endpoints are unable to call back - * into the socket, to prevent recusive entry and deadlock. - */ + +// Endpoint operations are called by the socket in a protocol-independent +// fashion. The socket makes individual calls, which are expected to block +// if appropriate (except for destroy). Endpoints are unable to call back +// into the socket, to prevent recusive entry and deadlock. struct nni_endpt_ops { - /* - * ep_create creates a vanilla endpoint. The value created is - * used for the first argument for all other endpoint functions. - */ + // ep_create creates a vanilla endpoint. The value created is + // used for the first argument for all other endpoint functions. int (*ep_create)(void **, const char *, uint16_t); - /* - * ep_destroy frees the resources associated with the endpoint. - * The endpoint will already have been closed. - */ + // ep_destroy frees the resources associated with the endpoint. + // The endpoint will already have been closed. void (*ep_destroy)(void *); - /* - * ep_dial starts dialing, and creates a new pipe, - * which is returned in the final argument. It can return errors - * NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, - * NNG_ETIMEDOUT, and NNG_EPROTO. - */ + // ep_dial starts dialing, and creates a new pipe, + // which is returned in the final argument. It can return errors + // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, + // NNG_ETIMEDOUT, and NNG_EPROTO. int (*ep_dial)(void *, void **); - /* - * ep_listen just does the bind() and listen() work, - * reserving the address but not creating any connections. - * It should return NNG_EADDRINUSE if the address is already - * taken. It can also return NNG_EBADADDR for an unsuitable - * address, or NNG_EACCESS for permission problems. - */ + // ep_listen just does the bind() and listen() work, + // reserving the address but not creating any connections. + // It should return NNG_EADDRINUSE if the address is already + // taken. It can also return NNG_EBADADDR for an unsuitable + // address, or NNG_EACCESS for permission problems. int (*ep_listen)(void *); - /* - * ep_accept accepts an inbound connection, and creates - * a transport pipe, which is returned in the final argument. - */ + // ep_accept accepts an inbound connection, and creates + // a transport pipe, which is returned in the final argument. int (*ep_accept)(void *, void **); - /* - * ep_close stops the endpoint from operating altogether. It does - * not affect pipes that have already been created. - */ + // ep_close stops the endpoint from operating altogether. It does + // not affect pipes that have already been created. void (*ep_close)(void *); - /* ep_setopt sets an endpoint (transport-specific) option */ + // ep_setopt sets an endpoint (transport-specific) option. int (*ep_setopt)(void *, int, const void *, size_t); - /* ep_getopt gets an endpoint (transport-specific) option */ + // ep_getopt gets an endpoint (transport-specific) option. int (*ep_getopt)(void *, int, void *, size_t *); }; -/* - * Pipe operations are entry points called by the socket. These may be called - * with socket locks held, so it is forbidden for the transport to call - * back into the socket at this point. (Which is one reason pointers back - * to socket or even enclosing pipe state, are not provided.) - */ +// Pipe operations are entry points called by the socket. These may be called +// with socket locks held, so it is forbidden for the transport to call +// back into the socket at this point. (Which is one reason pointers back +// to socket or even enclosing pipe state, are not provided.) struct nni_pipe_ops { - /* - * p_destroy destroys the pipe. This should clean up all local resources, - * including closing files and freeing memory, used by the pipe. After - * this call returns, the system will not make further calls on the same - * pipe. - */ + // p_destroy destroys the pipe. This should clean up all local + // resources, including closing files and freeing memory, used by + // the pipe. After this call returns, the system will not make + // further calls on the same pipe. void (*p_destroy)(void *); - /* - * p_send sends the message. If the message cannot be received, then - * the caller may try again with the same message (or free it). If the - * call succeeds, then the transport has taken ownership of the message, - * and the caller may not use it again. The transport will have the - * responsibility to free the message (nng_msg_free()) when it is - * finished with it. - */ - int (*p_send)(void *, nng_msg_t); - - /* - * p_recv recvs the message. This is a blocking operation, and a read - * will be performed even for cases where no data is expected. This - * allows the socket to detect a closed socket, by the returned error - * NNG_ECLOSED. Note that the closed socket condition can arise as either - * a result of a remote peer closing the connection, or a synchronous - * call to p_close. - */ - int (*p_recv)(void *, nng_msg_t *); - - /* - * p_close closes the pipe. Further recv or send operations should - * return back NNG_ECLOSED. - */ + // p_send sends the message. If the message cannot be received, then + // the caller may try again with the same message (or free it). If + // the call succeeds, then the transport has taken ownership of the + // message, and the caller may not use it again. The transport will + // have the responsibility to free the message (nng_msg_free()) when + // it is finished with it. + int (*p_send)(void *, nni_msg *); + + // p_recv recvs the message. This is a blocking operation, and a read + // will be performed even for cases where no data is expected. This + // allows the socket to detect a closed socket, by the returned error + // NNG_ECLOSED. Note that the closed socket condition can arise as + // either a result of a remote peer closing the connection, or a + // synchronous call to p_close. + int (*p_recv)(void *, nng_msg **); + + // p_close closes the pipe. Further recv or send operations should + // return back NNG_ECLOSED. void (*p_close)(void *); - /* - * p_peer returns the peer protocol. This may arrive in whatever - * transport specific manner is appropriate. - */ + // p_peer returns the peer protocol. This may arrive in whatever + // transport specific manner is appropriate. uint16_t (*p_peer)(void *); - /* - * p_getopt gets an pipe (transport-specific) property. These values - * may not be changed once the pipe is created. - */ + // p_getopt gets an pipe (transport-specific) property. These values + // may not be changed once the pipe is created. int (*p_getopt)(void *, int, void *, size_t *); }; -/* - * These APIs are used by the framework internally, and not for use by - * transport implementations. - */ -extern struct nni_transport *nni_transport_find(const char *); +// These APIs are used by the framework internally, and not for use by +// transport implementations. +extern nni_transport *nni_transport_find(const char *); extern void nni_transport_init(void); extern void nni_transport_fini(void); -#endif /* CORE_TRANSPORT_H */ +#endif // CORE_TRANSPORT_H -- cgit v1.2.3-70-g09d2