From 1adefe3879b211a47a784f477d56a9416ae72254 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 12 Dec 2016 03:42:26 -0800 Subject: New inproc transport. Lots of supporting changes. --- src/core/defs.h | 33 ++++ src/core/list.c | 36 ++-- src/core/list.h | 28 +-- src/core/message.c | 4 +- src/core/msgqueue.c | 229 +++++++++++++++++++++++++ src/core/msgqueue.h | 73 ++++++++ src/core/msqueue.c | 231 ------------------------- src/core/nng_impl.h | 88 ++-------- src/core/panic.c | 2 +- src/core/panic.h | 37 ++++ src/core/platform.h | 142 +++++++++++++++ src/core/snprintf.c | 2 +- src/core/snprintf.h | 35 ++++ src/core/socket.c | 4 +- src/core/transport.h | 117 +++++++++++++ src/nng.h | 8 + src/platform/platform.h | 142 --------------- src/transport/inproc/inproc.c | 391 ++++++++++++++++++++++++++++++++++++++++++ 18 files changed, 1116 insertions(+), 486 deletions(-) create mode 100644 src/core/defs.h create mode 100644 src/core/msgqueue.c create mode 100644 src/core/msgqueue.h delete mode 100644 src/core/msqueue.c create mode 100644 src/core/panic.h create mode 100644 src/core/platform.h create mode 100644 src/core/snprintf.h create mode 100644 src/core/transport.h delete mode 100644 src/platform/platform.h create mode 100644 src/transport/inproc/inproc.c (limited to 'src') diff --git a/src/core/defs.h b/src/core/defs.h new file mode 100644 index 00000000..f747f901 --- /dev/null +++ b/src/core/defs.h @@ -0,0 +1,33 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_DEFS_H +#define CORE_DEFS_H + +/* + * C compilers may get unhappy when named arguments are not used. While + * there are things like __attribute__((unused)) which are arguably + * superior, support for such are not universal. + */ +#define NNI_ARG_UNUSED(x) ((void)x); + +#endif /* NNG_IMPL_H */ diff --git a/src/core/list.c b/src/core/list.c index f59c75e0..d81c6379 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -31,12 +31,12 @@ */ #define NODE(list, item) \ - (nni_list_node_t)(void *)(((char *)item) + list->ll_offset) + (nni_list_node_t *)(void *)(((char *)item) + list->ll_offset) #define ITEM(list, node) \ (void *)(((char *)node) - list->ll_offset) void -nni_list_init_offset(nni_list_t list, size_t offset) +nni_list_init_offset(nni_list_t *list, size_t offset) { list->ll_offset = offset; list->ll_head.ln_next = &list->ll_head; @@ -44,9 +44,9 @@ nni_list_init_offset(nni_list_t list, size_t offset) } void * -nni_list_first(nni_list_t list) +nni_list_first(nni_list_t *list) { - nni_list_node_t node = list->ll_head.ln_next; + nni_list_node_t *node = list->ll_head.ln_next; if (node == &list->ll_head) { return (NULL); } @@ -54,9 +54,9 @@ nni_list_first(nni_list_t list) } void * -nni_list_last(nni_list_t list) +nni_list_last(nni_list_t *list) { - nni_list_node_t node = list->ll_head.ln_prev; + nni_list_node_t *node = list->ll_head.ln_prev; if (node == &list->ll_head) { return (NULL); } @@ -64,9 +64,9 @@ nni_list_last(nni_list_t list) } void -nni_list_append(nni_list_t list, void *item) +nni_list_append(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev = list->ll_head.ln_prev; node->ln_next = &list->ll_head; @@ -74,9 +74,9 @@ nni_list_append(nni_list_t list, void *item) node->ln_prev->ln_next = node; } void -nni_list_prepend(nni_list_t list, void *item) +nni_list_prepend(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_next = list->ll_head.ln_next; node->ln_prev = &list->ll_head; @@ -85,9 +85,9 @@ nni_list_prepend(nni_list_t list, void *item) } void * -nni_list_next(nni_list_t list, void *item) +nni_list_next(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); if ((node = node->ln_next) == &list->ll_head) { return (NULL); @@ -96,9 +96,9 @@ nni_list_next(nni_list_t list, void *item) } void * -nni_list_prev(nni_list_t list, void *item) +nni_list_prev(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); if ((node = node->ln_prev) == &list->ll_head) { return (NULL); @@ -107,16 +107,16 @@ nni_list_prev(nni_list_t list, void *item) } void -nni_list_remove(nni_list_t list, void *item) +nni_list_remove(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev->ln_next = node->ln_next; node->ln_next->ln_prev = node->ln_prev; } void -nni_list_node_init(nni_list_t list, void *item) +nni_list_node_init(nni_list_t *list, void *item) { - nni_list_node_t node = NODE(list, item); + nni_list_node_t *node = NODE(list, item); node->ln_prev = node->ln_next = NULL; } diff --git a/src/core/list.h b/src/core/list.h index c55870cb..f12087a2 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -32,23 +32,25 @@ typedef struct nni_list_node { struct nni_list_node *ln_next; struct nni_list_node *ln_prev; -} *nni_list_node_t; +} nni_list_node_t; typedef struct nni_list { struct nni_list_node ll_head; size_t ll_offset; -} *nni_list_t; +} nni_list_t; -extern void nni_list_init_offset(nni_list_t list, size_t offset); +extern void nni_list_init_offset(nni_list_t *list, size_t offset); #define NNI_LIST_INIT(list, type, field) \ - nni_list_init_offset(list, type, (size_t)&((type *)0)->field) -extern void *nni_list_first(nni_list_t list); -extern void *nni_list_last(nni_list_t list); -extern void nni_list_append(nni_list_t list, void *item); -extern void nni_list_prepend(nni_list_t list, void *item); -extern void *nni_list_next(nni_list_t list, void *item); -extern void *nni_list_prev(nni_list_t list, void *item); -extern void nni_list_remove(nni_list_t list, void *item); -extern void nni_list_node_init(nni_list_t, void *); + nni_list_init_offset(list, offsetof (type, field)) +extern void *nni_list_first(nni_list_t *); +extern void *nni_list_last(nni_list_t *); +extern void nni_list_append(nni_list_t *, void *); +extern void nni_list_prepend(nni_list_t *, void *); +extern void *nni_list_next(nni_list_t *, void *); +extern void *nni_list_prev(nni_list_t *, void *); +extern void nni_list_remove(nni_list_t *, void *); +extern void nni_list_node_init(nni_list_t *, void *); +#define NNI_LIST_FOREACH(l, it) \ + for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) -#endif /* CORE_MSQUEUE_H */ +#endif /* CORE_LIST_H */ diff --git a/src/core/message.c b/src/core/message.c index 82f58dc5..581f7683 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -23,9 +23,7 @@ #include #include -#include "../nng.h" - -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Message API. diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c new file mode 100644 index 00000000..9f168878 --- /dev/null +++ b/src/core/msgqueue.c @@ -0,0 +1,229 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "nng_impl.h" + +/* + * Message queue. These operate in some respects like Go channels, + * but as we have access to the internals, we have made some fundamental + * differences and improvements. For example, these can grow, and either + * side can close, and they may be closed more than once. + */ + +struct nni_msgqueue { + nni_mutex_t mq_lock; + nni_cond_t mq_readable; + nni_cond_t mq_writeable; + int mq_cap; + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + nng_msg_t *mq_msgs; +}; + +int +nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) +{ + struct nni_msgqueue *mq; + int rv; + + if (cap < 1) { + return (NNG_EINVAL); + } + if ((mq = nni_alloc(sizeof (*mq))) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) { + nni_free(mq, sizeof (*mq)); + return (rv); + } + if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) { + nni_mutex_destroy(mq->mq_lock); + nni_free(mq, sizeof (*mq)); + return (NNG_ENOMEM); + } + if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) { + nni_cond_destroy(mq->mq_readable); + nni_mutex_destroy(mq->mq_lock); + return (NNG_ENOMEM); + } + if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { + nni_cond_destroy(mq->mq_writeable); + nni_cond_destroy(mq->mq_readable); + nni_mutex_destroy(mq->mq_lock); + return (NNG_ENOMEM); + } + + mq->mq_cap = cap; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + *mqp = mq; + + return (0); +} + +void +nni_msgqueue_destroy(nni_msgqueue_t mq) +{ + nng_msg_t msg; + + nni_cond_destroy(mq->mq_writeable); + nni_cond_destroy(mq->mq_readable); + nni_mutex_destroy(mq->mq_lock); + + /* Free any orphaned messages. */ + while (mq->mq_len > 0) { + msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + nng_msg_free(msg); + } + + nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); + nni_free(mq, sizeof (*mq)); +} + +int +nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) +{ + uint64_t expire, now; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } + + nni_mutex_enter(mq->mq_lock); + + while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) { + if (tmout == 0) { + nni_mutex_exit(mq->mq_lock); + return (NNG_EAGAIN); + } + + if (tmout < 0) { + (void) nni_cond_wait(mq->mq_writeable); + continue; + } + + now = nni_clock(); + if (now >= expire) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ETIMEDOUT); + } + (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); + } + + if (mq->mq_closed) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ECLOSED); + } + + mq->mq_msgs[mq->mq_put] = msg; + mq->mq_put++; + if (mq->mq_put == mq->mq_cap) { + mq->mq_put = 0; + } + mq->mq_len++; + if (mq->mq_len == 1) { + (void) nni_cond_signal(mq->mq_readable); + } + nni_mutex_exit(mq->mq_lock); + return (0); +} + +int +nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) +{ + uint64_t expire, now; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } + + nni_mutex_enter(mq->mq_lock); + + while ((!mq->mq_closed) && (mq->mq_len == 0)) { + if (tmout == 0) { + nni_mutex_exit(mq->mq_lock); + return (NNG_EAGAIN); + } + + if (tmout < 0) { + (void) nni_cond_wait(mq->mq_readable); + continue; + } + + now = nni_clock(); + if (now >= expire) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ETIMEDOUT); + } + (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); + } + + if (mq->mq_closed) { + nni_mutex_exit(mq->mq_lock); + return (NNG_ECLOSED); + } + + *msgp = mq->mq_msgs[mq->mq_get]; + mq->mq_len--; + mq->mq_get++; + if (mq->mq_get == mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len++; + if (mq->mq_len == (mq->mq_cap - 1)) { + (void) nni_cond_signal(mq->mq_writeable); + } + nni_mutex_exit(mq->mq_lock); + return (0); +} + +void +nni_msgqueue_close(nni_msgqueue_t mq) +{ + nng_msg_t msg; + + nni_mutex_enter(mq->mq_lock); + mq->mq_closed = 1; + nni_cond_broadcast(mq->mq_writeable); + nni_cond_broadcast(mq->mq_readable); + + /* Free the messages orphaned in the queue. */ + while (mq->mq_len > 0) { + msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + nng_msg_free(msg); + } + nni_mutex_exit(mq->mq_lock); +} diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h new file mode 100644 index 00000000..9d856edd --- /dev/null +++ b/src/core/msgqueue.h @@ -0,0 +1,73 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_MSGQUEUE_H +#define CORE_MSGQUEUE_H + +#include "nng.h" + +/* + * Message queues. Message queues work in some ways like Go channels; + * they are a thread-safe way to pass messages between subsystems. + */ +typedef struct nni_msgqueue *nni_msgqueue_t; + +/* + * nni_msgqueue_create creates a message queue with the given capacity, + * which must be a positive number. It returns NNG_EINVAL if the capacity + * is invalid, or NNG_ENOMEM if resources cannot be allocated. + */ +extern int nni_msgqueue_create(nni_msgqueue_t *, int); + +/* + * nni_msgqueue_destroy destroys a message queue. It will also free any + * messages that may be in the queue. + */ +extern void nni_msgqueue_destroy(nni_msgqueue_t); + +/* + * nni_msgqueue_put attempts to put a message to the queue. It will wait + * for the timeout (us), if the value is positive. If the value is negative + * then it will wait forever. If the value is zero, it will just check, and + * return immediately whether a message can be put or not. Valid returns are + * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot + * be placed after a time, or NNG_EAGAIN if the operation cannot succeed + * immediately and a zero timeout is specified. Note that timeout granularity + * may be limited -- for example Windows systems have a millisecond resolution + * timeout capability. + */ +extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); + +/* + * nni_msgqueue_get gets the message from the queue, using a timeout just + * like nni_msgqueue_put. + */ +extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); + +/* + * nni_msgqueue_close closes the queue. After this all operates on the + * message queue will return NNG_ECLOSED. Messages inside the queue + * are freed. Unlike closing a go channel, this operation is idempotent. + */ +extern void nni_msgqueue_close(nni_msgqueue_t); + +#endif /* CORE_MSQUEUE_H */ diff --git a/src/core/msqueue.c b/src/core/msqueue.c deleted file mode 100644 index 7becabfb..00000000 --- a/src/core/msqueue.c +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), - * to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, - * and/or sell copies of the Software, and to permit persons to whom - * the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included - * in all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -#include "../nng.h" - -#include "nng_impl.h" - -/* - * Message queue. These operate in some respects like Go channels, - * but as we have access to the internals, we have made some fundamental - * differences and improvements. For example, these can grow, and either - * side can close, and they may be closed more than once. - */ - -struct nni_msgqueue { - nni_mutex_t mq_lock; - nni_cond_t mq_readable; - nni_cond_t mq_writeable; - int mq_cap; - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - nng_msg_t *mq_msgs; -}; - -int -nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) -{ - struct nni_msgqueue *mq; - int rv; - - if (cap < 1) { - return (NNG_EINVAL); - } - if ((mq = nni_alloc(sizeof (*mq))) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) { - nni_free(mq, sizeof (*mq)); - return (rv); - } - if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) { - nni_mutex_destroy(mq->mq_lock); - nni_free(mq, sizeof (*mq)); - return (NNG_ENOMEM); - } - if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) { - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); - return (NNG_ENOMEM); - } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); - return (NNG_ENOMEM); - } - - mq->mq_cap = cap; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - *mqp = mq; - - return (0); -} - -void -nni_msgqueue_destroy(nni_msgqueue_t mq) -{ - nng_msg_t msg; - - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); - - /* Free any orphaned messages. */ - while (mq->mq_len > 0) { - msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { - mq->mq_get = 0; - } - mq->mq_len--; - nng_msg_free(msg); - } - - nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); - nni_free(mq, sizeof (*mq)); -} - -int -nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout) -{ - uint64_t expire, now; - - if (tmout > 0) { - expire = nni_clock() + tmout; - } - - nni_mutex_enter(mq->mq_lock); - - while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_writeable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); - } - (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); - } - - if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ECLOSED); - } - - mq->mq_msgs[mq->mq_put] = msg; - mq->mq_put++; - if (mq->mq_put == mq->mq_cap) { - mq->mq_put = 0; - } - mq->mq_len++; - if (mq->mq_len == 1) { - (void) nni_cond_signal(mq->mq_readable); - } - nni_mutex_exit(mq->mq_lock); - return (0); -} - -int -nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout) -{ - uint64_t expire, now; - - if (tmout > 0) { - expire = nni_clock() + tmout; - } - - nni_mutex_enter(mq->mq_lock); - - while ((!mq->mq_closed) && (mq->mq_len == 0)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_readable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); - } - (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); - } - - if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ECLOSED); - } - - *msgp = mq->mq_msgs[mq->mq_get]; - mq->mq_len--; - mq->mq_get++; - if (mq->mq_get == mq->mq_cap) { - mq->mq_get = 0; - } - mq->mq_len++; - if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(mq->mq_writeable); - } - nni_mutex_exit(mq->mq_lock); - return (0); -} - -void -nni_msgqueue_close(nni_msgqueue_t mq) -{ - nng_msg_t msg; - - nni_mutex_enter(mq->mq_lock); - mq->mq_closed = 1; - nni_cond_broadcast(mq->mq_writeable); - nni_cond_broadcast(mq->mq_readable); - - /* Free the messages orphaned in the queue. */ - while (mq->mq_len > 0) { - msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { - mq->mq_get = 0; - } - mq->mq_len--; - nng_msg_free(msg); - } - nni_mutex_exit(mq->mq_lock); -} diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 683e0df1..d728c652 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -20,90 +20,30 @@ * IN THE SOFTWARE. */ -#ifndef NNG_IMPL_H -#define NNG_IMPL_H +#ifndef CORE_NNG_IMPL_H +#define CORE_NNG_IMPL_H #include "nng.h" -#include "platform/platform.h" /* * Internal implementation things for NNG, common definitions, etc. + * All internal modules wind up including this file to avoid having + * to figure out which header(s) to include. * * Hopefully it should be clear by the name that this file and its contents * are *NOT* for use outside of this library. * * Symbols that are private to the library begin with the nni_ prefix, whereas - * those starting with nng_ are intended for external consumption. + * those starting with nng_ are intended for external consumption. The latter + * symbols should be found in the toplevel nng.h header. */ -/* - * C compilers may get unhappy when named arguments are not used. While - * there are things like __attribute__((unused)) which are arguably - * superior, support for such are not universal. - */ -#define NNI_ARG_UNUSED(x) ((void)x); - -/* - * We have our own snprintf, because some platforms lack this, while - * others need special handling. Ours just calls the vsnprintf version - * from the platform. - */ -extern void nni_snprintf(char *, size_t, const char *, ...); - -/* - * nni_panic is used to terminate the process with prejudice, and - * should only be called in the face of a critical programming error, - * or other situation where it would be unsafe to attempt to continue. - * As this crashes the program, it should never be used when factors outside - * the program can cause it, such as receiving protocol errors, or running - * out of memory. Its better in those cases to return an error to the - * program and let the caller handle the error situation. - */ -extern void nni_panic(const char *, ...); - -/* - * Message queues. Message queues work in some ways like Go channels; - * they are a thread-safe way to pass messages between subsystems. - */ -typedef struct nni_msgqueue *nni_msgqueue_t; - -/* - * nni_msgqueue_create creates a message queue with the given capacity, - * which must be a positive number. It returns NNG_EINVAL if the capacity - * is invalid, or NNG_ENOMEM if resources cannot be allocated. - */ -extern int nni_msgqueue_create(nni_msgqueue_t *, int); - -/* - * nni_msgqueue_destroy destroys a message queue. It will also free any - * messages that may be in the queue. - */ -extern void nni_msgqueue_destroy(nni_msgqueue_t); - -/* - * nni_msgqueue_put attempts to put a message to the queue. It will wait - * for the timeout (us), if the value is positive. If the value is negative - * then it will wait forever. If the value is zero, it will just check, and - * return immediately whether a message can be put or not. Valid returns are - * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot - * be placed after a time, or NNG_EAGAIN if the operation cannot succeed - * immediately and a zero timeout is specified. Note that timeout granularity - * may be limited -- for example Windows systems have a millisecond resolution - * timeout capability. - */ -extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); - -/* - * nni_msgqueue_get gets the message from the queue, using a timeout just - * like nni_msgqueue_put. - */ -extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); - -/* - * nni_msgqueue_close closes the queue. After this all operates on the - * message queue will return NNG_ECLOSED. Messages inside the queue - * are freed. Unlike closing a go channel, this operation is idempotent. - */ -extern void nni_msgqueue_close(nni_msgqueue_t); +#include "core/defs.h" +#include "core/list.h" +#include "core/msgqueue.h" +#include "core/panic.h" +#include "core/snprintf.h" +#include "core/platform.h" +#include "core/transport.h" -#endif /* NNG_IMPL_H */ +#endif /* CORE_NNG_IMPL_H */ diff --git a/src/core/panic.c b/src/core/panic.c index 60c790ac..52dcb8c8 100644 --- a/src/core/panic.c +++ b/src/core/panic.c @@ -28,7 +28,7 @@ #include #endif -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Panic handling. diff --git a/src/core/panic.h b/src/core/panic.h new file mode 100644 index 00000000..6deef964 --- /dev/null +++ b/src/core/panic.h @@ -0,0 +1,37 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_PANIC_H +#define CORE_PANIC_H + +/* + * nni_panic is used to terminate the process with prejudice, and + * should only be called in the face of a critical programming error, + * or other situation where it would be unsafe to attempt to continue. + * As this crashes the program, it should never be used when factors outside + * the program can cause it, such as receiving protocol errors, or running + * out of memory. Its better in those cases to return an error to the + * program and let the caller handle the error situation. + */ +extern void nni_panic(const char *, ...); + +#endif /* CORE_PANIC_H */ diff --git a/src/core/platform.h b/src/core/platform.h new file mode 100644 index 00000000..ea8f1378 --- /dev/null +++ b/src/core/platform.h @@ -0,0 +1,142 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_PLATFORM_H +#define CORE_PLATFORM_H + +/* + * We require some standard C header files. The only one of these that might + * be problematic is , which is required for C99. Older versions + * of the Windows compilers might not have this. However, latest versions of + * MS Studio have a functional . If this impacts you, just upgrade + * your tool chain. + */ +#include +#include +#include + +/* + * These are the APIs that a platform must implement to support nng. + */ + +/* + * nni_abort crashes the system; it should do whatever is appropriate + * for abnormal programs on the platform, such as calling abort(). + */ +void nni_abort(void); + +/* + * nni_vnsprintf is exactly like its POSIX counterpart. + * Some platforms (Windows!) need a special version of this. + */ +void nni_vsnprintf(char *, size_t, const char *, va_list); + +/* + * nni_debug_output is used to emit debug messages. Typically this is used + * during core debugging, or to emit panic messages. Message content will + * not contain newlines, but the output will add them. + */ +void nni_debug_out(const char *); + +/* + * nni_set_debug_output is used to redirect debug output; for example an + * application could replace the default output routine with one that sends + * it's output to syslog. If NULL is specified, then a default handler + * used instead. The handler should add any newlines to the output as + * required. The default handler writes to standard error. + */ +void nni_set_debug_out(void (*)(const char *)); + +/* + * nni_alloc allocates memory. In most cases this can just be malloc(). + * However, you may provide a different allocator, for example it is + * possible to use a slab allocator or somesuch. It is permissible for this + * to return NULL if memory cannot be allocated. + */ +void *nni_alloc(size_t); + +/* + * nni_free frees memory allocated with nni_alloc. It takes a size because + * some allocators do not track size, or can operate more efficiently if + * the size is provided with the free call. Examples of this are slab + * allocators like this found in Solaris/illumos (see libumem or kmem). + * This routine does nothing if supplied with a NULL pointer and zero size. + * Most implementations can just call free() here. + */ +void nni_free(void *, size_t); + +typedef struct nni_mutex *nni_mutex_t; +typedef struct nni_cond *nni_cond_t; + +/* + * Mutex handling. + */ +int nni_mutex_create(nni_mutex_t *); +void nni_mutex_destroy(nni_mutex_t); +void nni_mutex_enter(nni_mutex_t); +void nni_mutex_exit(nni_mutex_t); +int nni_mutex_tryenter(nni_mutex_t); +int nni_cond_create(nni_cond_t *, nni_mutex_t); +void nni_cond_destroy(nni_cond_t); + +/* + * nni_cond_broadcast wakes all waiters on the condition. This should be + * called with the lock held. + */ +void nni_cond_broadcast(nni_cond_t); + +/* + * nni_cond_signal wakes a signal waiter. + */ +void nni_cond_signal(nni_cond_t); + +/* + * nni_condwait waits for a wake up on the condition variable. The + * associated lock is atomically released and reacquired upon wake up. + * Callers can be spuriously woken. The associated lock must be held. + */ +void nni_cond_wait(nni_cond_t); + +/* + * nni_cond_timedwait waits for a wakeup on the condition variable, just + * as with nni_condwait, but it will also wake after the given number of + * microseconds has passed. (This is a relative timed wait.) Early + * wakeups are permitted, and the caller must take care to double check any + * conditions. The return value is 0 on success, or an error code, which + * can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer + * than the timeout based on the resolution of your system clock. + */ +int nni_cond_timedwait(nni_cond_t, uint64_t); + +/* + * nn_clock returns a number of microseconds since some arbitrary time + * in the past. The values returned by nni_clock may be used with + * nni_cond_timedwait. + */ +uint64_t nni_clock(void); + +/* + * nni_usleep sleeps for the specified number of microseconds (at least). + */ +void nni_usleep(uint64_t); + +#endif /* CORE_PLATFORM_H */ diff --git a/src/core/snprintf.c b/src/core/snprintf.c index 3ca73e00..574d537a 100644 --- a/src/core/snprintf.c +++ b/src/core/snprintf.c @@ -24,7 +24,7 @@ #include #include -#include "nng_impl.h" +#include "core/nng_impl.h" void nni_snprintf(char *dst, size_t sz, const char *fmt, ...) diff --git a/src/core/snprintf.h b/src/core/snprintf.h new file mode 100644 index 00000000..b98b6ee7 --- /dev/null +++ b/src/core/snprintf.h @@ -0,0 +1,35 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_SNPRINTF_H +#define CORE_SNPRINTF_H + +#include + +/* + * We have our own snprintf, because some platforms lack this, while + * others need special handling. Ours just calls the vsnprintf version + * from the platform. + */ +extern void nni_snprintf(char *, size_t, const char *, ...); + +#endif /* CORE_SNPRINTF_H */ diff --git a/src/core/socket.c b/src/core/socket.c index e9963b74..6bb1d5d5 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -20,9 +20,7 @@ * IN THE SOFTWARE. */ -#include "../nng.h" - -#include "nng_impl.h" +#include "core/nng_impl.h" /* * Socket implementation. diff --git a/src/core/transport.h b/src/core/transport.h new file mode 100644 index 00000000..567cc0c5 --- /dev/null +++ b/src/core/transport.h @@ -0,0 +1,117 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef CORE_TRANSPORT_H +#define CORE_TRANSPORT_H + +/* + * Transport implementation details. Transports must implement the + * interfaces in this file. + */ + +struct nni_transport_ops { + /* + * tran_scheme is the transport scheme, such as "tcp" or "inproc". + */ + const char *tran_scheme; + + /* + * tran_ep_ops links our endpoint operations. + */ + const struct nni_endpt_ops *tran_ep_ops; + + /* + * tran_pipe_ops links our pipe operations. + */ + const struct nni_pipe_ops *tran_pipe_ops; +}; + +struct nni_endpt_ops { + /* + * ep_create creates a vanilla endpoint. The value created is + * used for the first argument for all other endpoint functions. + */ + int (*ep_create)(void **, const char *, uint16_t); + + /* + * ep_destroy frees the resources associated with the endpoint. + * The endpoint will already have been closed. + */ + void (*ep_destroy)(void *); + + /* + * ep_dial starts dialing, and creates a new pipe, + * which is returned in the final argument. It can return errors + * NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, + * NNG_ETIMEDOUT, and NNG_EPROTO. + */ + int (*ep_dial)(void *, void **); + + /* + * ep_listen just does the bind() and listen() work, + * reserving the address but not creating any connections. + * It should return NNG_EADDRINUSE if the address is already + * taken. It can also return NNG_EBADADDR for an unsuitable + * address, or NNG_EACCESS for permission problems. + */ + int (*ep_listen)(void *); + + /* + * ep_accept accepts an inbound connection, and creates + * a transport pipe, which is returned in the final argument. + */ + int (*ep_accept)(void *, void **); + + /* + * ep_close stops the endpoint from operating altogether. It does + * not affect pipes that have already been created. + */ + void (*ep_close)(void *); + + /* ep_setopt sets an endpoint (transport-specific) option */ + int (*ep_setopt)(void *, int, const void *, size_t); + + /* ep_getopt gets an endpoint (transport-specific) option */ + int (*ep_getopt)(void *, int, void *, size_t *); +}; + +struct nni_pipe_ops { + /* p_destroy destroys the pipe */ + void (*p_destroy)(void *); + + /* p_send sends the message */ + int (*p_send)(void *, nng_msg_t); + + /* p_recv recvs the message */ + int (*p_recv)(void *, nng_msg_t *); + + /* p_close closes the pipe */ + void (*p_close)(void *); + + /* p_proto returns the peer protocol */ + uint16_t (*p_proto)(void *); + + /* p_getopt gets an pipe (transport-specific) property */ + int (*p_getopt)(void *, int, void *, size_t *); +}; + +#endif /* CORE_TRANSPORT_H */ diff --git a/src/nng.h b/src/nng.h index d398e18c..e20d3543 100644 --- a/src/nng.h +++ b/src/nng.h @@ -429,6 +429,14 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t); #define NNG_ECONNREFUSED (-6) #define NNG_ECLOSED (-7) #define NNG_EAGAIN (-8) +#define NNG_ENOTSUP (-9) +#define NNG_EADDRINUSE (-10) + +/* + * Maximum length of a socket address. This includes the terminating NUL. + * This limit is built into other implementations, so do not change it. + */ +#define NNG_MAXADDRLEN (128) #ifdef __cplusplus } diff --git a/src/platform/platform.h b/src/platform/platform.h deleted file mode 100644 index 0d958c65..00000000 --- a/src/platform/platform.h +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2016 Garrett D'Amore - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), - * to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, - * and/or sell copies of the Software, and to permit persons to whom - * the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included - * in all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -#ifndef PLATFORM_H -#define PLATFORM_H - -/* - * We require some standard C header files. The only one of these that might - * be problematic is , which is required for C99. Older versions - * of the Windows compilers might not have this. However, latest versions of - * MS Studio have a functional . If this impacts you, just upgrade - * your tool chain. - */ -#include -#include -#include - -/* - * These are the APIs that a platform must implement to support nng. - */ - -/* - * nni_abort crashes the system; it should do whatever is appropriate - * for abnormal programs on the platform, such as calling abort(). - */ -void nni_abort(void); - -/* - * nni_vnsprintf is exactly like its POSIX counterpart. - * Some platforms (Windows!) need a special version of this. - */ -void nni_vsnprintf(char *, size_t, const char *, va_list); - -/* - * nni_debug_output is used to emit debug messages. Typically this is used - * during core debugging, or to emit panic messages. Message content will - * not contain newlines, but the output will add them. - */ -void nni_debug_out(const char *); - -/* - * nni_set_debug_output is used to redirect debug output; for example an - * application could replace the default output routine with one that sends - * it's output to syslog. If NULL is specified, then a default handler - * used instead. The handler should add any newlines to the output as - * required. The default handler writes to standard error. - */ -void nni_set_debug_out(void (*)(const char *)); - -/* - * nni_alloc allocates memory. In most cases this can just be malloc(). - * However, you may provide a different allocator, for example it is - * possible to use a slab allocator or somesuch. It is permissible for this - * to return NULL if memory cannot be allocated. - */ -void *nni_alloc(size_t); - -/* - * nni_free frees memory allocated with nni_alloc. It takes a size because - * some allocators do not track size, or can operate more efficiently if - * the size is provided with the free call. Examples of this are slab - * allocators like this found in Solaris/illumos (see libumem or kmem). - * This routine does nothing if supplied with a NULL pointer and zero size. - * Most implementations can just call free() here. - */ -void nni_free(void *, size_t); - -typedef struct nni_mutex *nni_mutex_t; -typedef struct nni_cond *nni_cond_t; - -/* - * Mutex handling. - */ -int nni_mutex_create(nni_mutex_t *); -void nni_mutex_destroy(nni_mutex_t); -void nni_mutex_enter(nni_mutex_t); -void nni_mutex_exit(nni_mutex_t); -int nni_mutex_tryenter(nni_mutex_t); -int nni_cond_create(nni_cond_t *, nni_mutex_t); -void nni_cond_destroy(nni_cond_t); - -/* - * nni_cond_broadcast wakes all waiters on the condition. This should be - * called with the lock held. - */ -void nni_cond_broadcast(nni_cond_t); - -/* - * nni_cond_signal wakes a signal waiter. - */ -void nni_cond_signal(nni_cond_t); - -/* - * nni_condwait waits for a wake up on the condition variable. The - * associated lock is atomically released and reacquired upon wake up. - * Callers can be spuriously woken. The associated lock must be held. - */ -void nni_cond_wait(nni_cond_t); - -/* - * nni_cond_timedwait waits for a wakeup on the condition variable, just - * as with nni_condwait, but it will also wake after the given number of - * microseconds has passed. (This is a relative timed wait.) Early - * wakeups are permitted, and the caller must take care to double check any - * conditions. The return value is 0 on success, or an error code, which - * can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer - * than the timeout based on the resolution of your system clock. - */ -int nni_cond_timedwait(nni_cond_t, uint64_t); - -/* - * nn_clock returns a number of microseconds since some arbitrary time - * in the past. The values returned by nni_clock may be used with - * nni_cond_timedwait. - */ -uint64_t nni_clock(void); - -/* - * nni_usleep sleeps for the specified number of microseconds (at least). - */ -void nni_usleep(uint64_t); - -#endif /* PLATFORM_H */ diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c new file mode 100644 index 00000000..8186853d --- /dev/null +++ b/src/transport/inproc/inproc.c @@ -0,0 +1,391 @@ +/* + * Copyright 2016 Garrett D'Amore + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include +#include + +#include "core/nng_impl.h" + +/* + * Inproc transport. This just transports messages from one + * peer to another. + */ + +typedef struct inproc_pair *inproc_pair_t; +typedef struct inproc_pipe *inproc_pipe_t; +typedef struct inproc_ep *inproc_ep_t; + +typedef struct { + nni_mutex_t mx; + nni_cond_t cv; + nni_list_t eps; +} inproc_global_t; + +struct inproc_pipe { + const char *addr; + inproc_pair_t pair; + nni_msgqueue_t rq; + nni_msgqueue_t wq; + uint16_t peer; +}; + +struct inproc_pair { + nni_mutex_t mx; + int refcnt; + nni_msgqueue_t q[2]; + struct inproc_pipe pipe[2]; + char addr[NNG_MAXADDRLEN]; +}; + +struct inproc_ep { + char addr[NNG_MAXADDRLEN]; + int mode; + int closed; + nni_list_node_t node; + uint16_t proto; + void *cpipe; /* connected pipe (DIAL only) */ +}; + +#define INPROC_EP_IDLE 0 +#define INPROC_EP_DIAL 1 +#define INPROC_EP_LISTEN 2 + +/* + * Global inproc state - this contains the list of active endpoints + * which we use for coordinating rendezvous. + */ +static inproc_global_t inproc; + +void +inproc_pipe_close(void *arg) +{ + inproc_pipe_t pipe = arg; + + nni_msgqueue_close(pipe->rq); + nni_msgqueue_close(pipe->wq); +} + +static void +inproc_pair_destroy(inproc_pair_t pair) +{ + if (pair == NULL) { + return; + } + if (pair->q[0]) { + nni_msgqueue_destroy(pair->q[0]); + } + if (pair->q[1]) { + nni_msgqueue_destroy(pair->q[1]); + } + if (pair->mx) { + nni_mutex_destroy(pair->mx); + } + nni_free(pair, sizeof (*pair)); +} + +void +inproc_pipe_destroy(void *arg) +{ + inproc_pipe_t pipe = arg; + inproc_pair_t pair = pipe->pair; + + /* We could assert the pipe closed... */ + + /* If we are the last peer, then toss the pair structure. */ + nni_mutex_enter(pair->mx); + pair->refcnt--; + if (pair->refcnt == 0) { + nni_mutex_exit(pair->mx); + inproc_pair_destroy(pair); + } else { + nni_mutex_exit(pair->mx); + } +} + +int +inproc_pipe_send(void *arg, nng_msg_t msg) +{ + inproc_pipe_t pipe = arg; + + /* + * TODO: look at the message expiration and use that to set up + * the timeout. (And if it expired already, throw it away.) + */ + return (nni_msgqueue_put(pipe->wq, msg, -1)); +} + +int +inproc_pipe_recv(void *arg, nng_msg_t *msgp) +{ + inproc_pipe_t pipe = arg; + + return (nni_msgqueue_get(pipe->rq, msgp, -1)); +} + +uint16_t +inproc_pipe_peer(void *arg) +{ + inproc_pipe_t pipe = arg; + + return (pipe->peer); +} + +int +inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) +{ + inproc_pipe_t pipe = arg; + size_t len; + + switch (option) { + case NNG_OPT_LOCALADDR: + case NNG_OPT_REMOTEADDR: + len = strlen(pipe->addr) + 1; + if (len > *szp) { + (void) memcpy(buf, pipe->addr, *szp); + } else { + (void) memcpy(buf, pipe->addr, len); + } + *szp = len; + return (0); + } + return (NNG_ENOTSUP); +} + +int +inproc_ep_create(void **epp, const char *url, uint16_t proto) +{ + inproc_ep_t ep; + + if (strlen(url) > NNG_MAXADDRLEN-1) { + return (NNG_EINVAL); + } + if ((ep = nni_alloc(sizeof (*ep))) == NULL) { + return (NNG_ENOMEM); + } + + ep->mode = INPROC_EP_IDLE; + ep->closed = 0; + ep->proto = proto; + nni_list_node_init(&inproc.eps, ep); + nni_snprintf(ep->addr, sizeof (ep->addr), "%s", url); + *epp = ep; + return (0); +} + +void +inproc_ep_destroy(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +void +inproc_ep_close(void *arg) +{ + inproc_ep_t ep = arg; + + nni_mutex_enter(inproc.mx); + if (!ep->closed) { + ep->closed = 1; + nni_list_remove(&inproc.eps, ep); + nni_cond_broadcast(inproc.cv); + } + nni_mutex_exit(inproc.mx); +} +int +inproc_ep_dial(void *arg, void **pipep) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + nni_list_t *list = &inproc.eps; + + if (ep->mode != INPROC_EP_IDLE) { + return (NNG_EINVAL); + } + nni_mutex_enter(inproc.mx); + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_LISTEN) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + break; + } + } + if (srch == NULL) { + /* No listeners available. */ + nni_mutex_exit(inproc.mx); + return (NNG_ECONNREFUSED); + } + ep->mode = INPROC_EP_DIAL; + nni_list_append(list, ep); + nni_cond_broadcast(inproc.cv); + for (;;) { + if (ep->closed) { + /* Closer will have removed us from list. */ + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + if (ep->cpipe != NULL) { + break; + } + nni_cond_wait(inproc.cv); + } + /* NB: The acceptor or closer removes us from the list. */ + ep->mode = INPROC_EP_IDLE; + *pipep = ep->cpipe; + nni_mutex_exit(inproc.mx); + return (ep->closed ? NNG_ECLOSED : 0); +} + +int +inproc_ep_listen(void *arg) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + nni_list_t *list = &inproc.eps; + + if (ep->mode != INPROC_EP_IDLE) { + return (NNG_EINVAL); + } + nni_mutex_enter(inproc.mx); + if (ep->closed) { + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_LISTEN) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + nni_mutex_exit(inproc.mx); + return (NNG_EADDRINUSE); + } + } + ep->mode = INPROC_EP_LISTEN; + nni_list_append(list, ep); + nni_mutex_exit(inproc.mx); + return (0); +} + +int +inproc_ep_accept(void *arg, void **pipep) +{ + inproc_ep_t ep = arg; + inproc_ep_t srch; + inproc_pair_t pair; + nni_list_t *list = &inproc.eps; + int rv; + + nni_mutex_enter(inproc.mx); + if (ep->mode != INPROC_EP_LISTEN) { + nni_mutex_exit(inproc.mx); + return (NNG_EINVAL); + } + for (;;) { + if (ep->closed) { + nni_mutex_exit(inproc.mx); + return (NNG_ECLOSED); + } + NNI_LIST_FOREACH(list, srch) { + if (srch->mode != INPROC_EP_DIAL) { + continue; + } + if (strcmp(srch->addr, ep->addr) == 0) { + break; + } + } + if (srch != NULL) { + break; + } + nni_cond_wait(inproc.cv); + } + if ((pair = nni_alloc(sizeof (*pair))) == NULL) { + nni_mutex_exit(inproc.mx); + return (NNG_ENOMEM); + } + if (((rv = nni_mutex_create(&pair->mx)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { + inproc_pair_destroy(pair); + } + nni_snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); + pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; + pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; + pair->pipe[0].pair = pair->pipe[1].pair = pair; + pair->pipe[0].addr = pair->pipe[1].addr = pair->addr; + pair->pipe[1].peer = srch->proto; + pair->pipe[0].peer = ep->proto; + pair->refcnt = 2; + srch->cpipe = &pair->pipe[0]; + *pipep = &pair->pipe[1]; + nni_cond_broadcast(inproc.cv); + + nni_mutex_exit(inproc.mx); + + return (0); +} + +int +nni_inproc_init(void) +{ + int rv; + if ((rv = nni_mutex_create(&inproc.mx)) != 0) { + return (rv); + } + if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) { + nni_mutex_destroy(inproc.mx); + return (rv); + } + NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node); + /* XXX: nni_register_transport(); */ + return (0); +} + +void +nni_inproc_term(void) +{ +} + +static struct nni_pipe_ops inproc_pipe_ops = { + inproc_pipe_destroy, + inproc_pipe_send, + inproc_pipe_recv, + inproc_pipe_close, + inproc_pipe_peer, + inproc_pipe_getopt, +}; + +static struct nni_endpt_ops inproc_ep_ops = { + inproc_ep_create, + inproc_ep_destroy, + inproc_ep_dial, + inproc_ep_listen, + inproc_ep_accept, + inproc_ep_close, + NULL, /* inproc_ep_setopt */ + NULL, /* inproc_ep_getopt */ +}; + +struct nni_transport_ops inproc_tran_ops = { + "inproc", /* tran_scheme */ + &inproc_ep_ops, + &inproc_pipe_ops, +}; -- cgit v1.2.3-70-g09d2