diff options
| -rw-r--r-- | src/core/defs.h | 33 | ||||
| -rw-r--r-- | src/core/list.c | 36 | ||||
| -rw-r--r-- | src/core/list.h | 28 | ||||
| -rw-r--r-- | src/core/message.c | 4 | ||||
| -rw-r--r-- | src/core/msgqueue.c (renamed from src/core/msqueue.c) | 2 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 73 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 88 | ||||
| -rw-r--r-- | src/core/panic.c | 2 | ||||
| -rw-r--r-- | src/core/panic.h | 37 | ||||
| -rw-r--r-- | src/core/platform.h (renamed from src/platform/platform.h) | 6 | ||||
| -rw-r--r-- | src/core/snprintf.c | 2 | ||||
| -rw-r--r-- | src/core/snprintf.h | 35 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/transport.h | 117 | ||||
| -rw-r--r-- | src/nng.h | 8 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 391 |
16 files changed, 748 insertions, 118 deletions
diff --git a/src/core/defs.h b/src/core/defs.h new file mode 100644 index 00000000..f747f901 --- /dev/null +++ b/src/core/defs.h @@ -0,0 +1,33 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_DEFS_H +#define CORE_DEFS_H + +/* + * C compilers may get unhappy when named arguments are not used. While + * there are things like __attribute__((unused)) which are arguably + * superior, support for such are not universal. + */ +#define NNI_ARG_UNUSED(x) ((void)x); + +#endif /* NNG_IMPL_H */ diff --git a/src/core/list.c b/src/core/list.c index f59c75e0..d81c6379 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -31,12 +31,12 @@ */ #define NODE(list, item) \ - (nni_list_node_t)(void *)(((char *)item) + list->ll_offset) + (nni_list_node_t *)(void *)(((char *)item) + list->ll_offset) #define ITEM(list, node) \ (void *)(((char *)node) - list->ll_offset) void -nni_list_init_offset(nni_list_t list, size_t offset) +nni_list_init_offset(nni_list_t *list, size_t offset) { list->ll_offset = offset; list->ll_head.ln_next = &list->ll_head; @@ -44,9 +44,9 @@ nni_list_init_offset(nni_list_t list, size_t offset) } void * -nni_list_first(nni_list_t list) +nni_list_first(nni_list_t *list) { - nni_list_node_t node = list->ll_head.ln_next; + nni_list_node_t *node = list->ll_head.ln_next; if (node == &list->ll_head) { return (NULL); } @@ -54,9 +54,9 @@ nni_list_first(nni_list_t list) } void * -nni_list_last(nni_list_t list) +nni_list_last(nni_list_t *list) { - nni_list_node_t node = list->ll_head.ln_prev; + nni_list_node_t *node = list->ll_head.ln_prev; if (node == &list->ll_head) { return (NULL); } @@ -64,9 +64,9 @@ nni_list_last(nni_list_t list) } void -nni_list_append(nni_list_t list, void *item) +nni_list_append(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev = list->ll_head.ln_prev; node->ln_next = &list->ll_head; @@ -74,9 +74,9 @@ nni_list_append(nni_list_t list, void *item) node->ln_prev->ln_next = node; } void -nni_list_prepend(nni_list_t list, void *item) +nni_list_prepend(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_next = list->ll_head.ln_next; node->ln_prev = &list->ll_head; @@ -85,9 +85,9 @@ nni_list_prepend(nni_list_t list, void *item) } void * -nni_list_next(nni_list_t list, void *item) +nni_list_next(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); if ((node = node->ln_next) == &list->ll_head) { return (NULL); @@ -96,9 +96,9 @@ nni_list_next(nni_list_t list, void *item) } void * -nni_list_prev(nni_list_t list, void *item) +nni_list_prev(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); if ((node = node->ln_prev) == &list->ll_head) { return (NULL); @@ -107,16 +107,16 @@ nni_list_prev(nni_list_t list, void *item) } void -nni_list_remove(nni_list_t list, void *item) +nni_list_remove(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev->ln_next = node->ln_next; node->ln_next->ln_prev = node->ln_prev; } void -nni_list_node_init(nni_list_t list, void *item) +nni_list_node_init(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev = node->ln_next = NULL; } diff --git a/src/core/list.h b/src/core/list.h index c55870cb..f12087a2 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -32,23 +32,25 @@ typedef struct nni_list_node { struct nni_list_node *ln_next; struct nni_list_node *ln_prev; -} *nni_list_node_t; +} nni_list_node_t; typedef struct nni_list { struct nni_list_node ll_head; size_t ll_offset; -} *nni_list_t; +} nni_list_t; -extern void nni_list_init_offset(nni_list_t list, size_t offset); +extern void nni_list_init_offset(nni_list_t *list, size_t offset); #define NNI_LIST_INIT(list, type, field) \ - nni_list_init_offset(list, type, (size_t)&((type *)0)->field) -extern void *nni_list_first(nni_list_t list); -extern void *nni_list_last(nni_list_t list); -extern void nni_list_append(nni_list_t list, void *item); -extern void nni_list_prepend(nni_list_t list, void *item); -extern void *nni_list_next(nni_list_t list, void *item); -extern void *nni_list_prev(nni_list_t list, void *item); -extern void nni_list_remove(nni_list_t list, void *item); -extern void nni_list_node_init(nni_list_t, void *); + nni_list_init_offset(list, offsetof (type, field)) +extern void *nni_list_first(nni_list_t *); +extern void *nni_list_last(nni_list_t *); +extern void nni_list_append(nni_list_t *, void *); +extern void nni_list_prepend(nni_list_t *, void *); +extern void *nni_list_next(nni_list_t *, void *); +extern void *nni_list_prev(nni_list_t *, void *); +extern void nni_list_remove(nni_list_t *, void *); +extern void nni_list_node_init(nni_list_t *, void *); +#define NNI_LIST_FOREACH(l, it) \ + for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) -#endif /* CORE_MSQUEUE_H */ +#endif /* CORE_LIST_H */ diff --git a/src/core/message.c b/src/core/message.c index 82f58dc5..581f7683 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -23,9 +23,7 @@ #include <stdlib.h> #include <string.h> -#include "../nng.h" - -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Message API. diff --git a/src/core/msqueue.c b/src/core/msgqueue.c index 7becabfb..9f168878 100644 --- a/src/core/msqueue.c +++ b/src/core/msgqueue.c @@ -20,8 +20,6 @@ * IN THE SOFTWARE. */ -#include "../nng.h" - #include "nng_impl.h" /* diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h new file mode 100644 index 00000000..9d856edd --- /dev/null +++ b/src/core/msgqueue.h @@ -0,0 +1,73 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_MSGQUEUE_H +#define CORE_MSGQUEUE_H + +#include "nng.h" + +/* + * Message queues. Message queues work in some ways like Go channels; + * they are a thread-safe way to pass messages between subsystems. + */ +typedef struct nni_msgqueue *nni_msgqueue_t; + +/* + * nni_msgqueue_create creates a message queue with the given capacity, + * which must be a positive number. It returns NNG_EINVAL if the capacity + * is invalid, or NNG_ENOMEM if resources cannot be allocated. + */ +extern int nni_msgqueue_create(nni_msgqueue_t *, int); + +/* + * nni_msgqueue_destroy destroys a message queue. It will also free any + * messages that may be in the queue. + */ +extern void nni_msgqueue_destroy(nni_msgqueue_t); + +/* + * nni_msgqueue_put attempts to put a message to the queue. It will wait + * for the timeout (us), if the value is positive. If the value is negative + * then it will wait forever. If the value is zero, it will just check, and + * return immediately whether a message can be put or not. Valid returns are + * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot + * be placed after a time, or NNG_EAGAIN if the operation cannot succeed + * immediately and a zero timeout is specified. Note that timeout granularity + * may be limited -- for example Windows systems have a millisecond resolution + * timeout capability. + */ +extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); + +/* + * nni_msgqueue_get gets the message from the queue, using a timeout just + * like nni_msgqueue_put. + */ +extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); + +/* + * nni_msgqueue_close closes the queue. After this all operates on the + * message queue will return NNG_ECLOSED. Messages inside the queue + * are freed. Unlike closing a go channel, this operation is idempotent. + */ +extern void nni_msgqueue_close(nni_msgqueue_t); + +#endif /* CORE_MSQUEUE_H */ diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 683e0df1..d728c652 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -20,90 +20,30 @@ * IN THE SOFTWARE. */ -#ifndef NNG_IMPL_H -#define NNG_IMPL_H +#ifndef CORE_NNG_IMPL_H +#define CORE_NNG_IMPL_H #include "nng.h" -#include "platform/platform.h" /* * Internal implementation things for NNG, common definitions, etc. + * All internal modules wind up including this file to avoid having + * to figure out which header(s) to include. * * Hopefully it should be clear by the name that this file and its contents * are *NOT* for use outside of this library. * * Symbols that are private to the library begin with the nni_ prefix, whereas - * those starting with nng_ are intended for external consumption. + * those starting with nng_ are intended for external consumption. The latter + * symbols should be found in the toplevel nng.h header. */ -/* - * C compilers may get unhappy when named arguments are not used. While - * there are things like __attribute__((unused)) which are arguably - * superior, support for such are not universal. - */ -#define NNI_ARG_UNUSED(x) ((void)x); - -/* - * We have our own snprintf, because some platforms lack this, while - * others need special handling. Ours just calls the vsnprintf version - * from the platform. - */ -extern void nni_snprintf(char *, size_t, const char *, ...); - -/* - * nni_panic is used to terminate the process with prejudice, and - * should only be called in the face of a critical programming error, - * or other situation where it would be unsafe to attempt to continue. - * As this crashes the program, it should never be used when factors outside - * the program can cause it, such as receiving protocol errors, or running - * out of memory. Its better in those cases to return an error to the - * program and let the caller handle the error situation. - */ -extern void nni_panic(const char *, ...); - -/* - * Message queues. Message queues work in some ways like Go channels; - * they are a thread-safe way to pass messages between subsystems. - */ -typedef struct nni_msgqueue *nni_msgqueue_t; - -/* - * nni_msgqueue_create creates a message queue with the given capacity, - * which must be a positive number. It returns NNG_EINVAL if the capacity - * is invalid, or NNG_ENOMEM if resources cannot be allocated. - */ -extern int nni_msgqueue_create(nni_msgqueue_t *, int); - -/* - * nni_msgqueue_destroy destroys a message queue. It will also free any - * messages that may be in the queue. - */ -extern void nni_msgqueue_destroy(nni_msgqueue_t); - -/* - * nni_msgqueue_put attempts to put a message to the queue. It will wait - * for the timeout (us), if the value is positive. If the value is negative - * then it will wait forever. If the value is zero, it will just check, and - * return immediately whether a message can be put or not. Valid returns are - * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot - * be placed after a time, or NNG_EAGAIN if the operation cannot succeed - * immediately and a zero timeout is specified. Note that timeout granularity - * may be limited -- for example Windows systems have a millisecond resolution - * timeout capability. - */ -extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); - -/* - * nni_msgqueue_get gets the message from the queue, using a timeout just - * like nni_msgqueue_put. - */ -extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); - -/* - * nni_msgqueue_close closes the queue. After this all operates on the - * message queue will return NNG_ECLOSED. Messages inside the queue - * are freed. Unlike closing a go channel, this operation is idempotent. - */ -extern void nni_msgqueue_close(nni_msgqueue_t); +#include "core/defs.h" +#include "core/list.h" +#include "core/msgqueue.h" +#include "core/panic.h" +#include "core/snprintf.h" +#include "core/platform.h" +#include "core/transport.h" -#endif /* NNG_IMPL_H */ +#endif /* CORE_NNG_IMPL_H */ diff --git a/src/core/panic.c b/src/core/panic.c index 60c790ac..52dcb8c8 100644 --- a/src/core/panic.c +++ b/src/core/panic.c @@ -28,7 +28,7 @@ #include <execinfo.h> #endif -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Panic handling. diff --git a/src/core/panic.h b/src/core/panic.h new file mode 100644 index 00000000..6deef964 --- /dev/null +++ b/src/core/panic.h @@ -0,0 +1,37 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_PANIC_H +#define CORE_PANIC_H + +/* + * nni_panic is used to terminate the process with prejudice, and + * should only be called in the face of a critical programming error, + * or other situation where it would be unsafe to attempt to continue. + * As this crashes the program, it should never be used when factors outside + * the program can cause it, such as receiving protocol errors, or running + * out of memory. Its better in those cases to return an error to the + * program and let the caller handle the error situation. + */ +extern void nni_panic(const char *, ...); + +#endif /* CORE_PANIC_H */ diff --git a/src/platform/platform.h b/src/core/platform.h index 0d958c65..ea8f1378 100644 --- a/src/platform/platform.h +++ b/src/core/platform.h @@ -20,8 +20,8 @@ * IN THE SOFTWARE. */ -#ifndef PLATFORM_H -#define PLATFORM_H +#ifndef CORE_PLATFORM_H +#define CORE_PLATFORM_H /* * We require some standard C header files. The only one of these that might @@ -139,4 +139,4 @@ uint64_t nni_clock(void); */ void nni_usleep(uint64_t); -#endif /* PLATFORM_H */ +#endif /* CORE_PLATFORM_H */ diff --git a/src/core/snprintf.c b/src/core/snprintf.c index 3ca73e00..574d537a 100644 --- a/src/core/snprintf.c +++ b/src/core/snprintf.c @@ -24,7 +24,7 @@ #include <stdint.h> #include <stdlib.h> -#include "nng_impl.h" +#include "core/nng_impl.h" void nni_snprintf(char *dst, size_t sz, const char *fmt, ...) diff --git a/src/core/snprintf.h b/src/core/snprintf.h new file mode 100644 index 00000000..b98b6ee7 --- /dev/null +++ b/src/core/snprintf.h @@ -0,0 +1,35 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_SNPRINTF_H +#define CORE_SNPRINTF_H + +#include <stddef.h> + +/* + * We have our own snprintf, because some platforms lack this, while + * others need special handling. Ours just calls the vsnprintf version + * from the platform. + */ +extern void nni_snprintf(char *, size_t, const char *, ...); + +#endif /* CORE_SNPRINTF_H */ diff --git a/src/core/socket.c b/src/core/socket.c index e9963b74..6bb1d5d5 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -20,9 +20,7 @@ * IN THE SOFTWARE. */ -#include "../nng.h" - -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Socket implementation. diff --git a/src/core/transport.h b/src/core/transport.h new file mode 100644 index 00000000..567cc0c5 --- /dev/null +++ b/src/core/transport.h @@ -0,0 +1,117 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_TRANSPORT_H +#define CORE_TRANSPORT_H + +/* + * Transport implementation details. Transports must implement the + * interfaces in this file. + */ + +struct nni_transport_ops { + /* + * tran_scheme is the transport scheme, such as "tcp" or "inproc". + */ + const char *tran_scheme; + + /* + * tran_ep_ops links our endpoint operations. + */ + const struct nni_endpt_ops *tran_ep_ops; + + /* + * tran_pipe_ops links our pipe operations. + */ + const struct nni_pipe_ops *tran_pipe_ops; +}; + +struct nni_endpt_ops { + /* + * ep_create creates a vanilla endpoint. The value created is + * used for the first argument for all other endpoint functions. + */ + int (*ep_create)(void **, const char *, uint16_t); + + /* + * ep_destroy frees the resources associated with the endpoint. + * The endpoint will already have been closed. + */ + void (*ep_destroy)(void *); + + /* + * ep_dial starts dialing, and creates a new pipe, + * which is returned in the final argument. It can return errors + * NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, + * NNG_ETIMEDOUT, and NNG_EPROTO. + */ + int (*ep_dial)(void *, void **); + + /* + * ep_listen just does the bind() and listen() work, + * reserving the address but not creating any connections. + * It should return NNG_EADDRINUSE if the address is already + * taken. It can also return NNG_EBADADDR for an unsuitable + * address, or NNG_EACCESS for permission problems. + */ + int (*ep_listen)(void *); + + /* + * ep_accept accepts an inbound connection, and creates + * a transport pipe, which is returned in the final argument. + */ + int (*ep_accept)(void *, void **); + + /* + * ep_close stops the endpoint from operating altogether. It does + * not affect pipes that have already been created. + */ + void (*ep_close)(void *); + + /* ep_setopt sets an endpoint (transport-specific) option */ + int (*ep_setopt)(void *, int, const void *, size_t); + + /* ep_getopt gets an endpoint (transport-specific) option */ + int (*ep_getopt)(void *, int, void *, size_t *); +}; + +struct nni_pipe_ops { + /* p_destroy destroys the pipe */ + void (*p_destroy)(void *); + + /* p_send sends the message */ + int (*p_send)(void *, nng_msg_t); + + /* p_recv recvs the message */ + int (*p_recv)(void *, nng_msg_t *); + + /* p_close closes the pipe */ + void (*p_close)(void *); + + /* p_proto returns the peer protocol */ + uint16_t (*p_proto)(void *); + + /* p_getopt gets an pipe (transport-specific) property */ + int (*p_getopt)(void *, int, void *, size_t *); +}; + +#endif /* CORE_TRANSPORT_H */ @@ -429,6 +429,14 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t); #define NNG_ECONNREFUSED (-6) #define NNG_ECLOSED (-7) #define NNG_EAGAIN (-8) +#define NNG_ENOTSUP (-9) +#define NNG_EADDRINUSE (-10) + +/* + * Maximum length of a socket address. This includes the terminating NUL. + * This limit is built into other implementations, so do not change it. + */ +#define NNG_MAXADDRLEN (128) #ifdef __cplusplus } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c new file mode 100644 index 00000000..8186853d --- /dev/null +++ b/src/transport/inproc/inproc.c @@ -0,0 +1,391 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +/* + * Inproc transport. This just transports messages from one + * peer to another. + */ + +typedef struct inproc_pair *inproc_pair_t; +typedef struct inproc_pipe *inproc_pipe_t; +typedef struct inproc_ep *inproc_ep_t; + +typedef struct { + nni_mutex_t mx; + nni_cond_t cv; + nni_list_t eps; +} inproc_global_t; + +struct inproc_pipe { + const char *addr; + inproc_pair_t pair; + nni_msgqueue_t rq; + nni_msgqueue_t wq; + uint16_t peer; +}; + +struct inproc_pair { + nni_mutex_t mx; + int refcnt; + nni_msgqueue_t q[2]; + struct inproc_pipe pipe[2]; + char addr[NNG_MAXADDRLEN]; +}; + +struct inproc_ep { + char addr[NNG_MAXADDRLEN]; + int mode; + int closed; + nni_list_node_t node; + uint16_t proto; + void *cpipe; /* connected pipe (DIAL only) */ +}; + +#define INPROC_EP_IDLE 0 +#define INPROC_EP_DIAL 1 +#define INPROC_EP_LISTEN 2 + +/* + * Global inproc state - this contains the list of active endpoints + * which we use for coordinating rendezvous. + */ +static inproc_global_t inproc; + +void +inproc_pipe_close(void *arg) +{ + inproc_pipe_t pipe = arg; + + nni_msgqueue_close(pipe->rq); + nni_msgqueue_close(pipe->wq); +} + +static void +inproc_pair_destroy(inproc_pair_t pair) +{ + if (pair == NULL) { + return; + } + if (pair->q[0]) { + nni_msgqueue_destroy(pair->q[0]); + } + if (pair->q[1]) { + nni_msgqueue_destroy(pair->q[1]); + } + if (pair->mx) { + nni_mutex_destroy(pair->mx); + } + nni_free(pair, sizeof (*pair)); +} + +void +inproc_pipe_destroy(void *arg) +{ + inproc_pipe_t pipe = arg; + inproc_pair_t pair = pipe->pair; + + /* We could assert the pipe closed... */ + + /* If we are the last peer, then toss the pair structure. */ + nni_mutex_enter(pair->mx); + pair->refcnt--; + if (pair->refcnt == 0) { + nni_mutex_exit(pair->mx); + inproc_pair_destroy(pair); + } else { + nni_mutex_exit(pair->mx); + } +} + +int +inproc_pipe_send(void *arg, nng_msg_t msg) +{ + inproc_pipe_t pipe = arg; + + /* + * TODO: look at the message expiration and use that to set up + * the timeout. (And if it expired already, throw it away.) + */ + return (nni_msgqueue_put(pipe->wq, msg, -1)); +} + +int +inproc_pipe_recv(void *arg, nng_msg_t *msgp) +{ + inproc_pipe_t pipe = arg; + + return (nni_msgqueue_get(pipe->rq, msgp, -1)); +} + +uint16_t +inproc_pipe_peer(void *arg) +{ + inproc_pipe_t pipe = arg; + + return (pipe->peer); +} + +int +inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) +{ + inproc_pipe_t pipe = arg; + size_t len; + + switch (option) { + case NNG_OPT_LOCALADDR: + case NNG_OPT_REMOTEADDR: + len = strlen(pipe->addr) + 1; + if (len > *szp) { + (void) memcpy(buf, pipe->addr, *szp); + } else { + (void) memcpy(buf, pipe->addr, len); + } + *szp = len; + return (0); + } + return (NNG_ENOTSUP); +} + +int +inproc_ep_create(void **epp, const char *url, uint16_t proto) +{ + inproc_ep_t ep; + + if (strlen(url) > NNG_MAXADDRLEN-1) { + return (NNG_EINVAL); + } + if ((ep = nni_alloc(sizeof (*ep))) == NULL) { + return (NNG_ENOMEM); + } + + ep->mode = INPROC_EP_IDLE; + ep->closed = 0; + ep->proto = proto; + nni_list_node_init(&inproc.eps, ep); + nni_snprintf(ep->addr, sizeof (ep->addr), "%s", url); + *epp = ep; + return (0); +} + +void +inproc_ep_destroy(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +void +inproc_ep_close(void *arg) +{ + inproc_ep_t ep = arg; + + nni_mutex_enter(inproc.mx); + if (!ep->closed) { + ep->closed = 1; + nni_list_remove(&inproc.eps, ep); + nni_cond_broadcast(inproc.cv); + } + nni_mutex_exit(inproc.mx); +} +int +inproc_ep_dial(void *arg, void **pipep) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + nni_list_t *list = &inproc.eps; + + if (ep->mode != INPROC_EP_IDLE) { + return (NNG_EINVAL); + } + nni_mutex_enter(inproc.mx); + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_LISTEN) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + break; + } + } + if (srch == NULL) { + /* No listeners available. */ + nni_mutex_exit(inproc.mx); + return (NNG_ECONNREFUSED); + } + ep->mode = INPROC_EP_DIAL; + nni_list_append(list, ep); + nni_cond_broadcast(inproc.cv); + for (;;) { + if (ep->closed) { + /* Closer will have removed us from list. */ + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + if (ep->cpipe != NULL) { + break; + } + nni_cond_wait(inproc.cv); + } + /* NB: The acceptor or closer removes us from the list. */ + ep->mode = INPROC_EP_IDLE; + *pipep = ep->cpipe; + nni_mutex_exit(inproc.mx); + return (ep->closed ? NNG_ECLOSED : 0); +} + +int +inproc_ep_listen(void *arg) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + nni_list_t *list = &inproc.eps; + + if (ep->mode != INPROC_EP_IDLE) { + return (NNG_EINVAL); + } + nni_mutex_enter(inproc.mx); + if (ep->closed) { + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_LISTEN) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + nni_mutex_exit(inproc.mx); + return (NNG_EADDRINUSE); + } + } + ep->mode = INPROC_EP_LISTEN; + nni_list_append(list, ep); + nni_mutex_exit(inproc.mx); + return (0); +} + +int +inproc_ep_accept(void *arg, void **pipep) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + inproc_pair_t pair; + nni_list_t *list = &inproc.eps; + int rv; + + nni_mutex_enter(inproc.mx); + if (ep->mode != INPROC_EP_LISTEN) { + nni_mutex_exit(inproc.mx); + return (NNG_EINVAL); + } + for (;;) { + if (ep->closed) { + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_DIAL) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + break; + } + } + if (srch != NULL) { + break; + } + nni_cond_wait(inproc.cv); + } + if ((pair = nni_alloc(sizeof (*pair))) == NULL) { + nni_mutex_exit(inproc.mx); + return (NNG_ENOMEM); + } + if (((rv = nni_mutex_create(&pair->mx)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { + inproc_pair_destroy(pair); + } + nni_snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); + pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; + pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; + pair->pipe[0].pair = pair->pipe[1].pair = pair; + pair->pipe[0].addr = pair->pipe[1].addr = pair->addr; + pair->pipe[1].peer = srch->proto; + pair->pipe[0].peer = ep->proto; + pair->refcnt = 2; + srch->cpipe = &pair->pipe[0]; + *pipep = &pair->pipe[1]; + nni_cond_broadcast(inproc.cv); + + nni_mutex_exit(inproc.mx); + + return (0); +} + +int +nni_inproc_init(void) +{ + int rv; + if ((rv = nni_mutex_create(&inproc.mx)) != 0) { + return (rv); + } + if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) { + nni_mutex_destroy(inproc.mx); + return (rv); + } + NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node); + /* XXX: nni_register_transport(); */ + return (0); +} + +void +nni_inproc_term(void) +{ +} + +static struct nni_pipe_ops inproc_pipe_ops = { + inproc_pipe_destroy, + inproc_pipe_send, + inproc_pipe_recv, + inproc_pipe_close, + inproc_pipe_peer, + inproc_pipe_getopt, +}; + +static struct nni_endpt_ops inproc_ep_ops = { + inproc_ep_create, + inproc_ep_destroy, + inproc_ep_dial, + inproc_ep_listen, + inproc_ep_accept, + inproc_ep_close, + NULL, /* inproc_ep_setopt */ + NULL, /* inproc_ep_getopt */ +}; + +struct nni_transport_ops inproc_tran_ops = { + "inproc", /* tran_scheme */ + &inproc_ep_ops, + &inproc_pipe_ops, +}; |
