summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 17:38:46 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 17:38:46 -0800
commit89c847f1f52969ee2ae6ed35018eef40366ca061 (patch)
treeba3f3c2da64e4a74c69d315b2198df59bcd4441b /src/core
parent934c1316ae47754a2e368c65228c3cbfe552680f (diff)
downloadnng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.gz
nng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.bz2
nng-89c847f1f52969ee2ae6ed35018eef40366ca061.zip
Work on endpoints. More C99 & type cleanups.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c123
-rw-r--r--src/core/endpt.h29
-rw-r--r--src/core/list.h47
-rw-r--r--src/core/message.c186
-rw-r--r--src/core/message.h51
-rw-r--r--src/core/msgqueue.c32
-rw-r--r--src/core/msgqueue.h14
-rw-r--r--src/core/pipe.c48
-rw-r--r--src/core/pipe.h8
-rw-r--r--src/core/platform.h1
-rw-r--r--src/core/protocol.c40
-rw-r--r--src/core/protocol.h181
-rw-r--r--src/core/socket.c37
-rw-r--r--src/core/transport.c46
-rw-r--r--src/core/transport.h198
15 files changed, 533 insertions, 508 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
new file mode 100644
index 00000000..b7181ce8
--- /dev/null
+++ b/src/core/endpt.c
@@ -0,0 +1,123 @@
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+// Functionality realited to end points.
+#if 0
+struct nng_endpt {
+ struct nni_endpt_ops ep_ops;
+ void * ep_data;
+ nni_list_node_t ep_sock_node;
+ nni_socket * ep_sock;
+ char ep_addr[NNG_MAXADDRLEN];
+ nni_thread * ep_dialer;
+ nni_thread * ep_listener;
+ int ep_close;
+ nni_mutex ep_mx;
+ nni_cond ep_cv;
+};
+#endif
+
+int
+nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
+{
+ nni_transport *tran;
+ nni_endpt *ep;
+ int rv;
+
+ if ((tran = nni_transport_find(addr)) == NULL) {
+ return (NNG_EINVAL);
+ }
+ if (strlen(addr) >= NNG_MAXADDRLEN) {
+ return (NNG_EINVAL);
+ }
+
+ if ((ep = nni_alloc(sizeof (*ep))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ep->ep_dialer = NULL;
+ ep->ep_listener = NULL;
+ ep->ep_close = 0;
+ if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
+ nni_free(ep, sizeof (*ep));
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_mx)) != 0) {
+ nni_mutex_fini(&ep->ep_mx);
+ nni_free(ep, sizeof (*ep));
+ return (NNG_ENOMEM);
+ }
+
+ // Could safely use strcpy here, but this avoids discussion.
+ (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr);
+ ep->ep_sock = sock;
+ ep->ep_ops = *tran->tran_ep_ops;
+
+ rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock));
+ if (rv != 0) {
+ nni_cond_fini(&ep->ep_cv);
+ nni_mutex_fini(&ep->ep_mx);
+ nni_free(ep, sizeof (*ep));
+ return (rv);
+ }
+ NNI_LIST_INIT(&ep->ep_pipes, nni_pipe, p_ep_node);
+ *epp = ep;
+ return (0);
+}
+
+void
+nni_endpt_destroy(nni_endpt *ep)
+{
+ // We should already have been closed at this point, so this
+ // should proceed very quickly.
+ if (ep->ep_dialer) {
+ nni_thread_reap(ep->ep_dialer);
+ }
+ if (ep->ep_listener) {
+ nni_thread_reap(ep->ep_listener);
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ while (nni_list_first(&ep->ep_pipes) != NULL) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ ep->ep_ops.ep_destroy(ep->ep_data);
+
+ nni_cond_fini(&ep->ep_cv);
+ nni_mutex_fini(&ep->ep_mx);
+ nni_free(ep, sizeof (*ep));
+}
+
+void
+nni_endpt_close(nni_endpt *ep)
+{
+ nni_mutex_enter(&ep->ep_mx);
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ ep->ep_close = 1;
+ nni_cond_broadcast(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ ep->ep_ops.ep_close(ep->ep_data);
+}
+
+
+#if 0
+int nni_endpt_dial(nni_endpt *, nni_pipe **);
+int nni_endpt_listen(nni_endpt *);
+int nni_endpt_accept(nni_endpt *, nni_pipe **);
+int nni_endpt_close(nni_endpt *);
+#endif
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 0fa69678..4054a997 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -12,27 +12,28 @@
#include "core/transport.h"
-// NB: This structure is supplied here for use by the CORE. Use of this library
-// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
-// TRANSPORTS.
+// NB: This structure is supplied here for use by the CORE. Use of this
+// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS
+// OR TRANSPORTS.
struct nng_endpt {
struct nni_endpt_ops ep_ops;
- void * ep_tran;
- nni_list_node_t ep_sock_node;
+ void * ep_data; // Transport private
+ nni_list_node ep_sock_node; // Per socket list
nni_socket * ep_sock;
- const char * ep_addr;
- nni_thread_t ep_dialer;
- nni_thread_t ep_listener;
+ char ep_addr[NNG_MAXADDRLEN];
+ nni_thread * ep_dialer;
+ nni_thread * ep_listener;
int ep_close;
nni_mutex ep_mx;
nni_cond ep_cv;
+ nni_list ep_pipes; // Active list of pipes
};
-int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
-void nni_endpt_destroy(nni_endpt *);
-int nni_endpt_dial(nni_endpt *, nni_pipe **);
-int nni_endpt_listen(nni_endpt *);
-int nni_endpt_accept(nni_endpt *, nni_pipe **);
-int nni_endpt_close(nni_endpt *);
+extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
+extern void nni_endpt_destroy(nni_endpt *);
+extern int nni_endpt_dial(nni_endpt *, nni_pipe **);
+extern int nni_endpt_listen(nni_endpt *);
+extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
+extern void nni_endpt_close(nni_endpt *);
#endif // CORE_ENDPT_H
diff --git a/src/core/list.h b/src/core/list.h
index 65d6ae9e..68e6080d 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -1,43 +1,44 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_LIST_H
#define CORE_LIST_H
#include "core/nng_impl.h"
-/*
- * In order to make life easy, we just define the list structures
- * directly, and let consumers directly inline structures.
- */
+// In order to make life easy, we just define the list structures
+// directly, and let consumers directly inline structures.
typedef struct nni_list_node {
struct nni_list_node * ln_next;
struct nni_list_node * ln_prev;
-} nni_list_node_t;
+} nni_list_node;
typedef struct nni_list {
struct nni_list_node ll_head;
size_t ll_offset;
-} nni_list_t;
+} nni_list;
-extern void nni_list_init_offset(nni_list_t *list, size_t offset);
+typedef nni_list nni_list_t;
+typedef nni_list_node nni_list_node_t;
+
+extern void nni_list_init_offset(nni_list *list, size_t offset);
#define NNI_LIST_INIT(list, type, field) \
nni_list_init_offset(list, offsetof(type, field))
-extern void *nni_list_first(nni_list_t *);
-extern void *nni_list_last(nni_list_t *);
-extern void nni_list_append(nni_list_t *, void *);
-extern void nni_list_prepend(nni_list_t *, void *);
-extern void *nni_list_next(nni_list_t *, void *);
-extern void *nni_list_prev(nni_list_t *, void *);
-extern void nni_list_remove(nni_list_t *, void *);
-extern void nni_list_node_init(nni_list_t *, void *);
+extern void *nni_list_first(nni_list *);
+extern void *nni_list_last(nni_list *);
+extern void nni_list_append(nni_list *, void *);
+extern void nni_list_prepend(nni_list *, void *);
+extern void *nni_list_next(nni_list *, void *);
+extern void *nni_list_prev(nni_list *, void *);
+extern void nni_list_remove(nni_list *, void *);
+extern void nni_list_node_init(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/message.c b/src/core/message.c
index 4ba30ed6..6ec47cb9 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -1,51 +1,47 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
-/*
- * Message API.
- */
+// Message API.
-/* Message chunk, internal to the message implementation. */
+// Message chunk, internal to the message implementation.
typedef struct {
- size_t ch_cap; /* allocated size */
- size_t ch_len; /* length in use */
- uint8_t * ch_buf; /* underlying buffer */
- uint8_t * ch_ptr; /* pointer to actual data */
-} chunk_t;
+ size_t ch_cap; // allocated size
+ size_t ch_len; // length in use
+ uint8_t * ch_buf; // underlying buffer
+ uint8_t * ch_ptr; // pointer to actual data
+} nni_chunk;
-/* Underlying message chunk. */
+// Underlying message structure.
struct nng_msg {
- chunk_t m_header;
- chunk_t m_body;
- int64_t m_expire; /* Unix usec */
- nni_pipe_t m_pipe; /* Pipe message was received on */
+ nni_chunk m_header;
+ nni_chunk m_body;
+ nni_time m_expire; // usec
+ nni_pipe * m_pipe; // Pipe message was received on
};
-/*
- * chunk_grow increases the underlying space for a chunk. It ensures
- * that the desired amount of trailing space (including the length)
- * and headroom (excluding the length) are available. It also copies
- * any extant referenced data. Note that the capacity will increase,
- * but not the length. To increase the length of the referenced data,
- * use either chunk_append or chunk_prepend.
- *
- * Note that having some headroom is useful when data must be prepended
- * to a message - it avoids having to perform extra data copies, so we
- * encourage initial allocations to start with sufficient room.
- */
+// nni_chunk_grow increases the underlying space for a chunk. It ensures
+// that the desired amount of trailing space (including the length)
+// and headroom (excluding the length) are available. It also copies
+// any extant referenced data. Note that the capacity will increase,
+// but not the length. To increase the length of the referenced data,
+// use either chunk_append or chunk_prepend.
+//
+// Note that having some headroom is useful when data must be prepended
+// to a message - it avoids having to perform extra data copies, so we
+// encourage initial allocations to start with sufficient room.
static int
-chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted)
+nni_chunk_grow(nni_chunk *ch, size_t newsz, size_t headwanted)
{
size_t headroom = 0;
uint8_t *newbuf;
@@ -111,7 +107,7 @@ chunk_grow(chunk_t *ch, size_t newsz, size_t headwanted)
static void
-chunk_free(chunk_t *ch)
+nni_chunk_free(nni_chunk *ch)
{
if ((ch->ch_cap != 0) && (ch->ch_buf != NULL)) {
nni_free(ch->ch_buf, ch->ch_cap);
@@ -123,9 +119,9 @@ chunk_free(chunk_t *ch)
}
-/* chunk_trunc truncates the number of bytes from the end of the chunk. */
+// nni_chunk_trunc truncates bytes from the end of the chunk.
static int
-chunk_trunc(chunk_t *ch, size_t len)
+nni_chunk_trunc(nni_chunk *ch, size_t len)
{
if (ch->ch_len < len) {
return (NNG_EINVAL);
@@ -135,9 +131,9 @@ chunk_trunc(chunk_t *ch, size_t len)
}
-/* chunk_trim removes the number of bytes from the beginning of the chunk. */
+// nni_chunk_trim removes bytes from the beginning of the chunk.
static int
-chunk_trim(chunk_t *ch, size_t len)
+nni_chunk_trim(nni_chunk *ch, size_t len)
{
if (ch->ch_len < len) {
return (NNG_EINVAL);
@@ -148,20 +144,18 @@ chunk_trim(chunk_t *ch, size_t len)
}
-/*
- * chunk_append appends the data to the chunk, growing the size as necessary.
- * If the data pointer is NULL, then the chunk data region is allocated, but
- * uninitialized.
- */
+// nni_chunk_append appends the data to the chunk, growing as necessary.
+// If the data pointer is NULL, then the chunk data region is allocated,
+// but uninitialized.
static int
-chunk_append(chunk_t *ch, const void *data, size_t len)
+nni_chunk_append(nni_chunk *ch, const void *data, size_t len)
{
int rv;
if (len == 0) {
return (0);
}
- if ((rv = chunk_grow(ch, len + ch->ch_len, 0)) != 0) {
+ if ((rv = nni_chunk_grow(ch, len + ch->ch_len, 0)) != 0) {
return (rv);
}
if (ch->ch_ptr == NULL) {
@@ -175,13 +169,11 @@ chunk_append(chunk_t *ch, const void *data, size_t len)
}
-/*
- * chunk_prepend prepends data to the chunk, as efficiently as possible.
- * If the data pointer is NULL, then no data is actually copied, but the
- * data region will have "grown" in the beginning, with uninitialized data.
- */
+// nni_chunk_prepend prepends data to the chunk, as efficiently as possible.
+// If the data pointer is NULL, then no data is actually copied, but the
+// data region will have "grown" in the beginning, with uninitialized data.
static int
-chunk_prepend(chunk_t *ch, const void *data, size_t len)
+nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len)
{
int rv;
@@ -197,7 +189,7 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len)
} else if ((ch->ch_len + len) <= ch->ch_cap) {
/* We had enough capacity, just shuffle data down. */
memmove(ch->ch_ptr + len, ch->ch_ptr, ch->ch_len);
- } else if ((rv = chunk_grow(ch, 0, len)) == 0) {
+ } else if ((rv = nni_chunk_grow(ch, 0, len)) == 0) {
/* We grew the chunk, so adjust. */
ch->ch_ptr -= len;
} else {
@@ -215,42 +207,38 @@ chunk_prepend(chunk_t *ch, const void *data, size_t len)
int
-nni_msg_alloc(nni_msg_t *mp, size_t sz)
+nni_msg_alloc(nni_msg **mp, size_t sz)
{
- nni_msg_t m;
+ nni_msg *m;
int rv;
if ((m = nni_alloc(sizeof (*m))) == NULL) {
return (NNG_ENOMEM);
}
- /*
- * 64-bytes of header, including room for 32 bytes
- * of headroom and 32 bytes of trailer.
- */
- if ((rv = chunk_grow(&m->m_header, 32, 32)) != 0) {
+ // 64-bytes of header, including room for 32 bytes
+ // of headroom and 32 bytes of trailer.
+ if ((rv = nni_chunk_grow(&m->m_header, 32, 32)) != 0) {
nni_free(m, sizeof (*m));
return (rv);
}
- /*
- * If the message is less than 1024 bytes, or is not power
- * of two aligned, then we insert a 32 bytes of headroom
- * to allow for inlining backtraces, etc. We also allow the
- * amount of space at the end for the same reason. Large aligned
- * allocations are unmolested to avoid excessive overallocation.
- */
+ // If the message is less than 1024 bytes, or is not power
+ // of two aligned, then we insert a 32 bytes of headroom
+ // to allow for inlining backtraces, etc. We also allow the
+ // amount of space at the end for the same reason. Large aligned
+ // allocations are unmolested to avoid excessive overallocation.
if ((sz < 1024) || ((sz & (sz-1)) != 0)) {
- rv = chunk_grow(&m->m_body, sz + 32, 32);
+ rv = nni_chunk_grow(&m->m_body, sz + 32, 32);
} else {
- rv = chunk_grow(&m->m_body, sz, 0);
+ rv = nni_chunk_grow(&m->m_body, sz, 0);
}
if (rv != 0) {
- chunk_free(&m->m_header);
+ nni_chunk_free(&m->m_header);
nni_free(m, sizeof (*m));
}
- if ((rv = chunk_append(&m->m_body, NULL, sz)) != 0) {
- /* Should not happen since we just grew it to fit. */
+ if ((rv = nni_chunk_append(&m->m_body, NULL, sz)) != 0) {
+ // Should not happen since we just grew it to fit.
nni_panic("chunk_append failed");
}
@@ -260,34 +248,34 @@ nni_msg_alloc(nni_msg_t *mp, size_t sz)
void
-nni_msg_free(nni_msg_t m)
+nni_msg_free(nni_msg *m)
{
- chunk_free(&m->m_header);
- chunk_free(&m->m_body);
+ nni_chunk_free(&m->m_header);
+ nni_chunk_free(&m->m_body);
nni_free(m, sizeof (*m));
}
int
-nni_msg_realloc(nni_msg_t m, size_t sz)
+nni_msg_realloc(nni_msg *m, size_t sz)
{
int rv = 0;
if (m->m_body.ch_len < sz) {
- rv = chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len);
+ rv = nni_chunk_append(&m->m_body, NULL, sz - m->m_body.ch_len);
if (rv != 0) {
return (rv);
}
} else {
- /* "Shrinking", just mark bytes at end usable again. */
- chunk_trunc(&m->m_body, m->m_body.ch_len - sz);
+ // "Shrinking", just mark bytes at end usable again.
+ nni_chunk_trunc(&m->m_body, m->m_body.ch_len - sz);
}
return (0);
}
void *
-nni_msg_header(nni_msg_t m, size_t *szp)
+nni_msg_header(nni_msg *m, size_t *szp)
{
if (szp != NULL) {
*szp = m->m_header.ch_len;
@@ -297,7 +285,7 @@ nni_msg_header(nni_msg_t m, size_t *szp)
void *
-nni_msg_body(nni_msg_t m, size_t *szp)
+nni_msg_body(nni_msg *m, size_t *szp)
{
if (szp != NULL) {
*szp = m->m_body.ch_len;
@@ -307,63 +295,63 @@ nni_msg_body(nni_msg_t m, size_t *szp)
int
-nni_msg_append(nni_msg_t m, const void *data, size_t len)
+nni_msg_append(nni_msg *m, const void *data, size_t len)
{
- return (chunk_append(&m->m_body, data, len));
+ return (nni_chunk_append(&m->m_body, data, len));
}
int
-nni_msg_prepend(nni_msg_t m, const void *data, size_t len)
+nni_msg_prepend(nni_msg *m, const void *data, size_t len)
{
- return (chunk_prepend(&m->m_body, data, len));
+ return (nni_chunk_prepend(&m->m_body, data, len));
}
int
-nni_msg_trim(nni_msg_t m, size_t len)
+nni_msg_trim(nni_msg *m, size_t len)
{
- return (chunk_trim(&m->m_body, len));
+ return (nni_chunk_trim(&m->m_body, len));
}
int
-nni_msg_trunc(nni_msg_t m, size_t len)
+nni_msg_trunc(nni_msg *m, size_t len)
{
- return (chunk_trunc(&m->m_body, len));
+ return (nni_chunk_trunc(&m->m_body, len));
}
int
-nni_msg_append_header(nni_msg_t m, const void *data, size_t len)
+nni_msg_append_header(nni_msg *m, const void *data, size_t len)
{
- return (chunk_append(&m->m_header, data, len));
+ return (nni_chunk_append(&m->m_header, data, len));
}
int
-nni_msg_prepend_header(nni_msg_t m, const void *data, size_t len)
+nni_msg_prepend_header(nni_msg *m, const void *data, size_t len)
{
- return (chunk_prepend(&m->m_header, data, len));
+ return (nni_chunk_prepend(&m->m_header, data, len));
}
int
-nni_msg_trim_header(nni_msg_t m, size_t len)
+nni_msg_trim_header(nni_msg *m, size_t len)
{
- return (chunk_trim(&m->m_header, len));
+ return (nni_chunk_trim(&m->m_header, len));
}
int
-nni_msg_trunc_header(nni_msg_t m, size_t len)
+nni_msg_trunc_header(nni_msg *m, size_t len)
{
- return (chunk_trunc(&m->m_header, len));
+ return (nni_chunk_trunc(&m->m_header, len));
}
int
-nni_msg_pipe(nni_msg_t m, nni_pipe_t *pp)
+nni_msg_pipe(nni_msg *m, nni_pipe **pp)
{
*pp = m->m_pipe;
return (0);
diff --git a/src/core/message.h b/src/core/message.h
index 7b17a007..5c742e44 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -1,33 +1,30 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_MESSAGE_H
#define CORE_MESSAGE_H
-/*
- * Internally used message API. Again, this stuff is not part of our public
- * API.
- */
+// Internally used message API. Again, this is not part of our public API.
-extern int nni_msg_alloc(nni_msg_t *, size_t);
-extern void nni_msg_free(nni_msg_t);
-extern int nni_msg_realloc(nni_msg_t, size_t);
-extern void *nni_msg_header(nni_msg_t, size_t *);
-extern void *nni_msg_body(nni_msg_t, size_t *);
-extern int nni_msg_append(nni_msg_t, const void *, size_t);
-extern int nni_msg_prepend(nni_msg_t, const void *, size_t);
-extern int nni_msg_append_header(nni_msg_t, const void *, size_t);
-extern int nni_msg_prepend_header(nni_msg_t, const void *, size_t);
-extern int nni_msg_trim(nni_msg_t, size_t);
-extern int nni_msg_trunc(nni_msg_t, size_t);
-extern int nni_msg_trim_header(nni_msg_t, size_t);
-extern int nni_msg_trunc_header(nni_msg_t, size_t);
-extern int nni_msg_pipe(nni_msg_t, nni_pipe_t *);
+extern int nni_msg_alloc(nni_msg **, size_t);
+extern void nni_msg_free(nni_msg *);
+extern int nni_msg_realloc(nni_msg *, size_t);
+extern void *nni_msg_header(nni_msg *, size_t *);
+extern void *nni_msg_body(nni_msg *, size_t *);
+extern int nni_msg_append(nni_msg *, const void *, size_t);
+extern int nni_msg_prepend(nni_msg *, const void *, size_t);
+extern int nni_msg_append_header(nni_msg *, const void *, size_t);
+extern int nni_msg_prepend_header(nni_msg *, const void *, size_t);
+extern int nni_msg_trim(nni_msg *, size_t);
+extern int nni_msg_trunc(nni_msg *, size_t);
+extern int nni_msg_trim_header(nni_msg *, size_t);
+extern int nni_msg_trunc_header(nni_msg *, size_t);
+extern int nni_msg_pipe(nni_msg *, nni_pipe **);
-#endif /* CORE_SOCKET_H */
+#endif // CORE_SOCKET_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 7d892b88..b8d11e8c 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -52,7 +52,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
- if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
+ if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -90,7 +90,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
nni_msg_free(msg);
}
- nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
+ nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *));
nni_free(mq, sizeof (*mq));
}
@@ -130,6 +130,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
(void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
+ // Once a queue is closed, you can't write to it. It can still be
+ // read from, at least until its empty.
if (mq->mq_closed) {
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
@@ -175,19 +177,19 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
(void) nni_cond_waituntil(&mq->mq_readable, expire);
}
- if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
+ // If there is any data left in the message queue, we will still
+ // provide it, so that the reader can drain.
+ if (mq->mq_len == 0) {
+ if (mq->mq_closed) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
- if ((mq->mq_len == 0) && (*signal)) {
- // We are being interrupted. We only allow an interrupt
- // if there is no data though, because we'd really prefer
- // to give back the data. Otherwise our failure to deal
- // with the data could lead to starvation; also lingering
- // relies on this not interrupting if data is pending.
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_EINTR);
+ if (*signal) {
+ // We are being interrupted.
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EINTR);
+ }
}
*msgp = mq->mq_msgs[mq->mq_get];
@@ -258,7 +260,7 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
nni_msgqueue_close(nni_msgqueue *mq)
{
- nni_msg_t msg;
+ nni_msg *msg;
nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index d97482af..adea2dfa 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -15,8 +15,18 @@
// Message queues. Message queues work in some ways like Go channels;
// they are a thread-safe way to pass messages between subsystems. They
// do have additional capabilities though.
-typedef struct nni_msgqueue * nni_msgqueue_t;
-typedef struct nni_msgqueue nni_msgqueue;
+//
+// A closed message queue cannot be written to, but if there are messages
+// still in it, it can be read from. (This allows them to play a role in
+// draining/lingering.)
+//
+// Message queues can be closed many times safely.
+//
+// Readers & writers in a message queue can be woken either by a timeout
+// or by a specific signal (arranged by the caller).
+//
+// TODO: Add message queue growing, and pushback.
+typedef struct nni_msgqueue nni_msgqueue;
// nni_msgqueue_create creates a message queue with the given capacity,
// which must be a positive number. It returns NNG_EINVAL if the capacity
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a617cf85..58a31d7f 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -1,64 +1,60 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
-/*
- * This file contains functions relating to pipes.
- *
- * Operations on pipes (to the transport) are generally blocking operations,
- * performed in the context of the protocol.
- */
+// This file contains functions relating to pipes.
+//
+// Operations on pipes (to the transport) are generally blocking operations,
+// performed in the context of the protocol.
-/* nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. */
+// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
-nni_pipe_id(nni_pipe_t p)
+nni_pipe_id(nni_pipe * p)
{
return (p->p_id);
}
int
-nni_pipe_send(nni_pipe_t p, nng_msg_t msg)
+nni_pipe_send(nni_pipe * p, nng_msg *msg)
{
return (p->p_ops.p_send(p->p_tran, msg));
}
int
-nni_pipe_recv(nni_pipe_t p, nng_msg_t *msgp)
+nni_pipe_recv(nni_pipe * p, nng_msg **msgp)
{
return (p->p_ops.p_recv(p->p_tran, msgp));
}
-/*
- * nni_pipe_close closes the underlying connection. It is expected that
- * subsequent attempts receive or send (including any waiting receive) will
- * simply return NNG_ECLOSED.
- */
+// nni_pipe_close closes the underlying connection. It is expected that
+// subsequent attempts receive or send (including any waiting receive) will
+// simply return NNG_ECLOSED.
void
-nni_pipe_close(nni_pipe_t p)
+nni_pipe_close(nni_pipe * p)
{
p->p_ops.p_close(p->p_tran);
}
uint16_t
-nni_pipe_peer(nni_pipe_t p)
+nni_pipe_peer(nni_pipe * p)
{
return (p->p_ops.p_peer(p->p_tran));
}
void
-nni_pipe_destroy(nni_pipe_t p)
+nni_pipe_destroy(nni_pipe * p)
{
p->p_ops.p_destroy(p->p_tran);
nni_free(p, sizeof (*p));
diff --git a/src/core/pipe.h b/src/core/pipe.h
index a4a03a93..b78eaa2a 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -22,16 +22,16 @@ struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
void * p_tran;
- nni_list_node_t p_sock_node;
+ nni_list_node p_sock_node;
nni_socket * p_sock;
- nni_list_node_t p_ep_node;
+ nni_list_node p_ep_node;
nni_endpt * p_ep;
};
// Pipe operations that protocols use.
-extern int nni_pipe_recv(nni_pipe *, nng_msg_t *);
-extern int nni_pipe_send(nni_pipe *, nng_msg_t);
+extern int nni_pipe_recv(nni_pipe *, nng_msg **);
+extern int nni_pipe_send(nni_pipe *, nng_msg *);
extern uint32_t nni_pipe_id(nni_pipe *);
extern void nni_pipe_close(nni_pipe *);
diff --git a/src/core/platform.h b/src/core/platform.h
index 58e4cd36..a674aac5 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -113,7 +113,6 @@ extern void nni_cond_wait(nni_cond *);
// check the condition. It will return either NNG_ETIMEDOUT, or 0.
extern int nni_cond_waituntil(nni_cond *, nni_time);
-typedef struct nni_thread * nni_thread_t;
typedef struct nni_thread nni_thread;
// nni_thread_creates a thread that runs the given function. The thread
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 113746b1..6399cd41 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -1,37 +1,33 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include <string.h>
#include "core/nng_impl.h"
-/*
- * Protocol related stuff - generically.
- */
+// Protocol related stuff - generically.
-/*
- * The list of protocols is hardwired. This is reasonably unlikely to
- * change, as adding new protocols is not something intended to be done
- * outside of the core.
- */
-extern struct nni_protocol nni_pair_protocol;
+// The list of protocols is hardwired. This is reasonably unlikely to
+// change, as adding new protocols is not something intended to be done
+// outside of the core.
+extern nni_protocol nni_pair_protocol;
-static struct nni_protocol *protocols[] = {
+static nni_protocol *protocols[] = {
&nni_pair_protocol,
NULL
};
-struct nni_protocol *
+nni_protocol *
nni_protocol_find(uint16_t num)
{
int i;
- struct nni_protocol *p;
+ nni_protocol *p;
for (i = 0; (p = protocols[i]) != NULL; i++) {
if (p->proto_self == num) {
@@ -45,7 +41,7 @@ nni_protocol_find(uint16_t num)
const char *
nni_protocol_name(uint16_t num)
{
- struct nni_protocol *p;
+ nni_protocol *p;
if ((p = nni_protocol_find(num)) == NULL) {
return (NULL);
@@ -57,7 +53,7 @@ nni_protocol_name(uint16_t num)
uint16_t
nni_protocol_number(const char *name)
{
- struct nni_protocol *p;
+ nni_protocol *p;
int i;
for (i = 0; (p = protocols[i]) != NULL; i++) {
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 21d3b6b9..a4b24177 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -1,128 +1,89 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_PROTOCOL_H
#define CORE_PROTOCOL_H
-/*
- * Protocol implementation details. Protocols must implement the
- * interfaces in this file. Note that implementing new protocols is
- * not necessarily intended to be a trivial task. The protocol developer
- * must understand the nature of nng, as they are responsible for handling
- * most of the logic. The protocol generally does most of the work for
- * locking, and calls into the transport's pipe functions to do actual
- * work, and the pipe functions generally assume no locking is needed.
- * As a consequence, most of the concurrency in nng exists in the protocol
- * implementations.
- *
- * Pipe operations may block, or even reenter the protoccol entry points
- * (for example nni_pipe_close() causes the protocols proto_remove_pipe
- * entry point to be called), so it is very important that protocols do
- * not hold any locks across calls to pipe functions.
- */
-
+// Protocol implementation details. Protocols must implement the
+// interfaces in this file. Note that implementing new protocols is
+// not necessarily intended to be a trivial task. The protocol developer
+// must understand the nature of nng, as they are responsible for handling
+// most of the logic. The protocol generally does most of the work for
+// locking, and calls into the transport's pipe functions to do actual
+// work, and the pipe functions generally assume no locking is needed.
+// As a consequence, most of the concurrency in nng exists in the protocol
+// implementations.
struct nni_protocol {
- /*
- * Protocol information.
- */
- uint16_t proto_self; /* our 16-bit protocol ID */
- uint16_t proto_peer; /* who we peer with (protocol ID) */
- const char * proto_name; /* string version of our name */
-
- /*
- * Create protocol instance data, which will be stored on the socket.
- */
- int (*proto_create)(void **, nni_socket_t);
-
- /*
- * Destroy the protocol instance.
- */
+ uint16_t proto_self; // our 16-bit protocol ID
+ uint16_t proto_peer; // who we peer with (protocol ID)
+ const char * proto_name; // string version of our name
+
+ //Create protocol instance, which will be stored on the socket.
+ int (*proto_create)(void **, nni_socket *);
+
+ // Destroy the protocol instance.
void (*proto_destroy)(void *);
- /*
- * Shutdown the protocol instance, including giving time to
- * drain any outbound frames (linger). The protocol is not
- * required to honor the linger.
- * XXX: This is probably redundant -- protocol should notice
- * drain by getting NNG_ECLOSED on the upper write queue.
- */
+ // Shutdown the protocol instance, including giving time to
+ // drain any outbound frames (linger). The protocol is not
+ // required to honor the linger.
+ // XXX: This is probably redundant -- protocol should notice
+ // drain by getting NNG_ECLOSED on the upper write queue.
void (*proto_shutdown)(void *);
- /*
- * Add and remove pipes. These are called as connections are
- * created or destroyed.
- */
- int (*proto_add_pipe)(void *, nni_pipe_t);
- int (*proto_rem_pipe)(void *, nni_pipe_t);
+ // Add and remove pipes. These are called as connections are
+ // created or destroyed.
+ int (*proto_add_pipe)(void *, nni_pipe *);
+ int (*proto_rem_pipe)(void *, nni_pipe *);
- /*
- * Option manipulation. These may be NULL.
- */
+ // Option manipulation. These may be NULL.
int (*proto_setopt)(void *, int, const void *, size_t);
int (*proto_getopt)(void *, int, void *, size_t *);
- /*
- * Receive filter. This may be NULL, but if it isn't, then
- * messages coming into the system are routed here just before
- * being delivered to the application. To drop the message,
- * the protocol should return NULL, otherwise the message
- * (possibly modified).
- */
- nng_msg_t (*proto_recv_filter)(void *, nni_msg_t);
-
- /*
- * Send filter. This may be NULL, but if it isn't, then
- * messages here are filtered just after they come from the
- * application.
- */
- nng_msg_t (*proto_send_filter)(void *, nni_msg_t);
+ // Receive filter. This may be NULL, but if it isn't, then
+ // messages coming into the system are routed here just before being
+ // delivered to the application. To drop the message, the prtocol
+ // should return NULL, otherwise the message (possibly modified).
+ nni_msg * (*proto_recv_filter)(void *, nni_msg *);
+
+ // Send filter. This may be NULL, but if it isn't, then messages
+ // here are filtered just after they come from the application.
+ nni_msg * (*proto_send_filter)(void *, nni_msg *);
};
-/*
- * These are socket methods that protocol operations can
- * reasonably expect to call.
- */
-
-/*
- * nni_socket_sendq obtains the upper writeq. The protocol should
- * recieve messages from this, and place them on the appropriate
- * pipe.
- */
-extern nni_msgqueue_t nni_socket_sendq(nni_socket_t);
-
-/*
- * nni_socket_recvq obtains the upper readq. The protocol should
- * inject incoming messages from pipes to it.
- */
-extern nni_msgqueue_t nni_socket_recvq(nni_socket_t);
-
-/*
- * nni_socket_recv_err sets an error code to be returned to clients
- * rather than waiting for a message. Set it to 0 to resume normal
- * receive operation.
- */
-extern void nni_socket_recv_err(nni_socket_t, int);
-
-/*
- * nni_socket_send_err sets an error code to be returned to clients
- * when they try to send, so that they don't have to timeout waiting
- * for their message to be accepted for send. Set it to 0 to resume
- * normal send operations.
- */
-extern void nni_socket_send_err(nni_socket_t, int);
-
-/*
- * These functions are not used by protocols, but rather by the socket
- * core implementation. The lookups can be used by transports as well.
- */
-extern struct nni_protocol *nni_protocol_find(uint16_t);
+// These are socket methods that protocol operations can expect to call.
+// Note that each of these should be called without any locks held, since
+// the socket can reenter the protocol.
+
+// nni_socket_sendq obtains the upper writeq. The protocol should
+// recieve messages from this, and place them on the appropriate pipe.
+extern nni_msgqueue *nni_socket_sendq(nni_socket *);
+
+// nni_socket_recvq obtains the upper readq. The protocol should
+// inject incoming messages from pipes to it.
+extern nni_msgqueue *nni_socket_recvq(nni_socket *);
+
+// nni_socket_recv_err sets an error code to be returned to clients
+// rather than waiting for a message. Set it to 0 to resume normal
+// receive operation.
+extern void nni_socket_recv_err(nni_socket *, int);
+
+// nni_socket_send_err sets an error code to be returned to clients
+// when they try to send, so that they don't have to timeout waiting
+// for their message to be accepted for send. Set it to 0 to resume
+// normal send operations.
+extern void nni_socket_send_err(nni_socket *, int);
+
+// These functions are not used by protocols, but rather by the socket
+// core implementation. The lookups can be used by transports as well.
+extern nni_protocol *nni_protocol_find(uint16_t);
extern const char *nni_protocol_name(uint16_t);
extern uint16_t nni_protocol_number(const char *);
-#endif /* CORE_PROTOCOL_H */
+#endif // CORE_PROTOCOL_H
diff --git a/src/core/socket.c b/src/core/socket.c
index c500db31..1331bb4c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -13,14 +13,14 @@
// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
// the upper read and write queues.
-nni_msgqueue_t
+nni_msgqueue *
nni_socket_sendq(nni_socket *s)
{
return (s->s_uwq);
}
-nni_msgqueue_t
+nni_msgqueue *
nni_socket_recvq(nni_socket *s)
{
return (s->s_urq);
@@ -53,8 +53,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
- // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
+ NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node);
+ NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
nni_cond_fini(&sock->s_cv);
@@ -83,14 +83,7 @@ nni_socket_close(nni_socket *sock)
// Stop all EPS. We're going to do this first, since we know
// we're closing.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
-#if 0
- // XXX: Question: block for them now, or wait further down?
- // Probably we can simply just watch for the first list...
- // stopping EPs should be *dead* easy, and never block.
- nni_ep_stop(ep);
- or nni_ep_shutdown(ep);
-
-#endif
+ nni_endpt_close(ep);
}
nni_mutex_exit(&sock->s_mx);
@@ -133,9 +126,18 @@ nni_socket_close(nni_socket *sock)
}
// Wait to make sure endpoint listeners have shutdown and exited
- // as well. They should have done so *long* ago.
- while (nni_list_first(&sock->s_eps) != NULL) {
- nni_cond_wait(&sock->s_cv);
+ // as well. They should have done so *long* ago. Because this
+ // requires waiting for threads to finish, which *could* in theory
+ // overlap with this, we must drop the socket lock.
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+
+ // TODO: This looks like it should happen as an endpt_remove
+ // operation?
+ nni_list_remove(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
+
+ nni_endpt_destroy(ep);
+ nni_mutex_enter(&sock->s_mx);
}
nni_mutex_exit(&sock->s_mx);
@@ -244,7 +246,6 @@ nni_socket_proto(nni_socket *sock)
return (sock->s_ops.proto_self);
}
-
// nni_socket_rem_pipe removes the pipe from the socket. This is often
// called by the protocol when a pipe is removed due to close.
void
@@ -265,9 +266,11 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
// XXX: TODO: Redial
// XXX: also publish event...
// if (pipe->p_ep != NULL) {
- // nn_endpt_remove_pipe(pipe->p_ep, pipe)
+ // nn_endpt_rem_pipe(pipe->p_ep, pipe)
// }
+ nni_pipe_destroy(pipe);
+
// If we're closing, wake the socket if we finished draining.
if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
nni_cond_broadcast(&sock->s_cv);
diff --git a/src/core/transport.c b/src/core/transport.c
index 61ec5033..2e8dee7a 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -1,35 +1,33 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
-
-#include <string.h>
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
-/*
- * For now the list of transports is hard-wired. Adding new transports
- * to the system dynamically is something that might be considered later.
- */
-extern struct nni_transport nni_inproc_transport;
+#include <string.h>
+
+// For now the list of transports is hard-wired. Adding new transports
+// to the system dynamically is something that might be considered later.
+extern nni_transport nni_inproc_transport;
-static struct nni_transport *transports[] = {
+static nni_transport *transports[] = {
&nni_inproc_transport,
NULL
};
-struct nni_transport *
+nni_transport *
nni_transport_find(const char *addr)
{
- /* address is of the form "<scheme>://blah..." */
+ // address is of the form "<scheme>://blah..."
const char *end;
int len;
int i;
- struct nni_transport *ops;
+ nni_transport *ops;
if ((end = strstr(addr, "://")) == NULL) {
return (NULL);
@@ -44,15 +42,13 @@ nni_transport_find(const char *addr)
}
-/*
- * nni_transport_init initializes the entire transport subsystem, including
- * each individual transport.
- */
+// nni_transport_init initializes the entire transport subsystem, including
+// each individual transport.
void
nni_transport_init(void)
{
int i;
- struct nni_transport *ops;
+ nni_transport *ops;
for (i = 0; (ops = transports[i]) != NULL; i++) {
ops->tran_init();
@@ -64,7 +60,7 @@ void
nni_transport_fini(void)
{
int i;
- struct nni_transport *ops;
+ nni_transport *ops;
for (i = 0; (ops = transports[i]) != NULL; i++) {
if (ops->tran_fini != NULL) {
diff --git a/src/core/transport.h b/src/core/transport.h
index 370e87f5..4e39adaf 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -1,171 +1,123 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_TRANSPORT_H
#define CORE_TRANSPORT_H
-/*
- * Transport implementation details. Transports must implement the
- * interfaces in this file.
- */
+// Transport implementation details. Transports must implement the
+// interfaces in this file.
struct nni_transport {
- /*
- * tran_scheme is the transport scheme, such as "tcp" or "inproc".
- */
+ // tran_scheme is the transport scheme, such as "tcp" or "inproc".
const char * tran_scheme;
- /*
- * tran_ep_ops links our endpoint operations.
- */
+ // tran_ep_ops links our endpoint operations.
const struct nni_endpt_ops * tran_ep_ops;
- /*
- * tran_pipe_ops links our pipe operations.
- */
+ // tran_pipe_ops links our pipe operations.
const struct nni_pipe_ops * tran_pipe_ops;
- /*
- * tran_init, if not NULL, is called once during library
- * initialization.
- */
+ // tran_init, if not NULL, is called once during library
+ // initialization.
int (*tran_init)(void);
- /*
- * tran_fini, if not NULL, is called during library deinitialization.
- * It should release any global resources, close any open files, etc.
- *
- * There will be no locks held, and no other threads running in the
- * library.
- *
- * It is invalid to use any mutexes, condition variables, or
- * threading routines. Mutexes and condition variables may be
- * safely destroyed.
- */
+ // tran_fini, if not NULL, is called during library deinitialization.
+ // It should release any global resources, close any open files, etc.
void (*tran_fini)(void);
};
-/*
- * Endpoint operations are called by the socket in a protocol-independent
- * fashion. The socket makes individual calls, which are expected to block
- * if appropriate (except for destroy). Endpoints are unable to call back
- * into the socket, to prevent recusive entry and deadlock.
- */
+
+// Endpoint operations are called by the socket in a protocol-independent
+// fashion. The socket makes individual calls, which are expected to block
+// if appropriate (except for destroy). Endpoints are unable to call back
+// into the socket, to prevent recusive entry and deadlock.
struct nni_endpt_ops {
- /*
- * ep_create creates a vanilla endpoint. The value created is
- * used for the first argument for all other endpoint functions.
- */
+ // ep_create creates a vanilla endpoint. The value created is
+ // used for the first argument for all other endpoint functions.
int (*ep_create)(void **, const char *, uint16_t);
- /*
- * ep_destroy frees the resources associated with the endpoint.
- * The endpoint will already have been closed.
- */
+ // ep_destroy frees the resources associated with the endpoint.
+ // The endpoint will already have been closed.
void (*ep_destroy)(void *);
- /*
- * ep_dial starts dialing, and creates a new pipe,
- * which is returned in the final argument. It can return errors
- * NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
- * NNG_ETIMEDOUT, and NNG_EPROTO.
- */
+ // ep_dial starts dialing, and creates a new pipe,
+ // which is returned in the final argument. It can return errors
+ // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
+ // NNG_ETIMEDOUT, and NNG_EPROTO.
int (*ep_dial)(void *, void **);
- /*
- * ep_listen just does the bind() and listen() work,
- * reserving the address but not creating any connections.
- * It should return NNG_EADDRINUSE if the address is already
- * taken. It can also return NNG_EBADADDR for an unsuitable
- * address, or NNG_EACCESS for permission problems.
- */
+ // ep_listen just does the bind() and listen() work,
+ // reserving the address but not creating any connections.
+ // It should return NNG_EADDRINUSE if the address is already
+ // taken. It can also return NNG_EBADADDR for an unsuitable
+ // address, or NNG_EACCESS for permission problems.
int (*ep_listen)(void *);
- /*
- * ep_accept accepts an inbound connection, and creates
- * a transport pipe, which is returned in the final argument.
- */
+ // ep_accept accepts an inbound connection, and creates
+ // a transport pipe, which is returned in the final argument.
int (*ep_accept)(void *, void **);
- /*
- * ep_close stops the endpoint from operating altogether. It does
- * not affect pipes that have already been created.
- */
+ // ep_close stops the endpoint from operating altogether. It does
+ // not affect pipes that have already been created.
void (*ep_close)(void *);
- /* ep_setopt sets an endpoint (transport-specific) option */
+ // ep_setopt sets an endpoint (transport-specific) option.
int (*ep_setopt)(void *, int, const void *, size_t);
- /* ep_getopt gets an endpoint (transport-specific) option */
+ // ep_getopt gets an endpoint (transport-specific) option.
int (*ep_getopt)(void *, int, void *, size_t *);
};
-/*
- * Pipe operations are entry points called by the socket. These may be called
- * with socket locks held, so it is forbidden for the transport to call
- * back into the socket at this point. (Which is one reason pointers back
- * to socket or even enclosing pipe state, are not provided.)
- */
+// Pipe operations are entry points called by the socket. These may be called
+// with socket locks held, so it is forbidden for the transport to call
+// back into the socket at this point. (Which is one reason pointers back
+// to socket or even enclosing pipe state, are not provided.)
struct nni_pipe_ops {
- /*
- * p_destroy destroys the pipe. This should clean up all local resources,
- * including closing files and freeing memory, used by the pipe. After
- * this call returns, the system will not make further calls on the same
- * pipe.
- */
+ // p_destroy destroys the pipe. This should clean up all local
+ // resources, including closing files and freeing memory, used by
+ // the pipe. After this call returns, the system will not make
+ // further calls on the same pipe.
void (*p_destroy)(void *);
- /*
- * p_send sends the message. If the message cannot be received, then
- * the caller may try again with the same message (or free it). If the
- * call succeeds, then the transport has taken ownership of the message,
- * and the caller may not use it again. The transport will have the
- * responsibility to free the message (nng_msg_free()) when it is
- * finished with it.
- */
- int (*p_send)(void *, nng_msg_t);
-
- /*
- * p_recv recvs the message. This is a blocking operation, and a read
- * will be performed even for cases where no data is expected. This
- * allows the socket to detect a closed socket, by the returned error
- * NNG_ECLOSED. Note that the closed socket condition can arise as either
- * a result of a remote peer closing the connection, or a synchronous
- * call to p_close.
- */
- int (*p_recv)(void *, nng_msg_t *);
-
- /*
- * p_close closes the pipe. Further recv or send operations should
- * return back NNG_ECLOSED.
- */
+ // p_send sends the message. If the message cannot be received, then
+ // the caller may try again with the same message (or free it). If
+ // the call succeeds, then the transport has taken ownership of the
+ // message, and the caller may not use it again. The transport will
+ // have the responsibility to free the message (nng_msg_free()) when
+ // it is finished with it.
+ int (*p_send)(void *, nni_msg *);
+
+ // p_recv recvs the message. This is a blocking operation, and a read
+ // will be performed even for cases where no data is expected. This
+ // allows the socket to detect a closed socket, by the returned error
+ // NNG_ECLOSED. Note that the closed socket condition can arise as
+ // either a result of a remote peer closing the connection, or a
+ // synchronous call to p_close.
+ int (*p_recv)(void *, nng_msg **);
+
+ // p_close closes the pipe. Further recv or send operations should
+ // return back NNG_ECLOSED.
void (*p_close)(void *);
- /*
- * p_peer returns the peer protocol. This may arrive in whatever
- * transport specific manner is appropriate.
- */
+ // p_peer returns the peer protocol. This may arrive in whatever
+ // transport specific manner is appropriate.
uint16_t (*p_peer)(void *);
- /*
- * p_getopt gets an pipe (transport-specific) property. These values
- * may not be changed once the pipe is created.
- */
+ // p_getopt gets an pipe (transport-specific) property. These values
+ // may not be changed once the pipe is created.
int (*p_getopt)(void *, int, void *, size_t *);
};
-/*
- * These APIs are used by the framework internally, and not for use by
- * transport implementations.
- */
-extern struct nni_transport *nni_transport_find(const char *);
+// These APIs are used by the framework internally, and not for use by
+// transport implementations.
+extern nni_transport *nni_transport_find(const char *);
extern void nni_transport_init(void);
extern void nni_transport_fini(void);
-#endif /* CORE_TRANSPORT_H */
+#endif // CORE_TRANSPORT_H