aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-11 18:09:33 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-11 18:09:33 -0800
commitf6e715fb640ef72b30cbcc0d1882ef81115e96d8 (patch)
treef37aae87e945a288905b4b23970d3c1dbbf16867 /src/core
parent51023b0d0d46ec920ddc6ffdc41bcff242ff970d (diff)
downloadnng-f6e715fb640ef72b30cbcc0d1882ef81115e96d8.tar.gz
nng-f6e715fb640ef72b30cbcc0d1882ef81115e96d8.tar.bz2
nng-f6e715fb640ef72b30cbcc0d1882ef81115e96d8.zip
New msgqueue implementation, use CLOCK_MONOTONIC if available.
Start of socket definitions.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msqueue.c111
-rw-r--r--src/core/nng_impl.h41
-rw-r--r--src/core/panic.c2
-rw-r--r--src/core/socket.c40
4 files changed, 192 insertions, 2 deletions
diff --git a/src/core/msqueue.c b/src/core/msqueue.c
new file mode 100644
index 00000000..3e4ade79
--- /dev/null
+++ b/src/core/msqueue.c
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom
+ * the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "../nng.h"
+
+#include "nng_impl.h"
+
+/*
+ * Message queue. These operate in some respects like Go channels,
+ * but as we have access to the internals, we have made some fundamental
+ * differences and improvements. For example, these can grow, and either
+ * side can close, and they may be closed more than once.
+ */
+
+struct nni_msgqueue {
+ nni_mutex_t mq_lock;
+ nni_cond_t mq_readable;
+ nni_cond_t mq_writeable;
+ int mq_cap;
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ nng_msg_t *mq_msgs;
+};
+
+int
+nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
+{
+ struct nni_msgqueue *mq;
+ int rv;
+
+ if (cap < 1) {
+ return (NNG_EINVAL);
+ }
+ if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) {
+ nni_free(mq, sizeof (*mq));
+ return (rv);
+ }
+ if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) {
+ nni_mutex_destroy(mq->mq_lock);
+ nni_free(mq, sizeof (*mq));
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) {
+ nni_cond_destroy(mq->mq_readable);
+ nni_mutex_destroy(mq->mq_lock);
+ return (NNG_ENOMEM);
+ }
+ if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
+ nni_cond_destroy(mq->mq_writeable);
+ nni_cond_destroy(mq->mq_readable);
+ nni_mutex_destroy(mq->mq_lock);
+ return (NNG_ENOMEM);
+ }
+
+ mq->mq_cap = cap;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ *mqp = mq;
+
+ return (0);
+}
+
+void
+nni_msgqueue_destroy(nni_msgqueue_t mq)
+{
+ nng_msg_t msg;
+
+ nni_cond_destroy(mq->mq_writeable);
+ nni_cond_destroy(mq->mq_readable);
+ nni_mutex_destroy(mq->mq_lock);
+
+ /* Free any orphaned messages. */
+ while (mq->mq_len > 0) {
+ msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nng_msg_free(msg);
+ }
+
+ nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
+ nni_free(mq, sizeof (*mq));
+}
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index a25b87a4..a62ac101 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -61,4 +61,45 @@ extern void nni_snprintf(char *, size_t, const char *, ...);
*/
extern void nni_panic(const char *, ...);
+/*
+ * Message queues. Message queues work in some ways like Go channels;
+ * they are a thread-safe way to pass messages between subsystems.
+ */
+typedef struct nni_msgqueue *nni_msgqueue_t;
+
+/*
+ * nni_msgqueue_create creates a message queue with the given capacity,
+ * which must be a positive number. It returns NNG_EINVAL if the capacity
+ * is invalid, or NNG_ENOMEM if resources cannot be allocated.
+ */
+extern int nni_msgqueue_create(nni_msgqueue_t *, int);
+
+/*
+ * nni_msgqueue_destroy destroys a message queue. It will also free any
+ * messages that may be in the queue.
+ */
+extern void nni_msgqueue_destroy(nni_msgqueue_t);
+
+extern int nni_msgqueue_len(nni_msgqueue_t);
+extern int nni_msgqueue_cap(nni_msgqueue_t);
+
+/*
+ * nni_msgqueue_put attempts to put a message to the queue. It will wait
+ * for the timeout (us), if the value is positive. If the value is negative
+ * then it will wait forever. If the value is zero, it will just check, and
+ * return immediately whether a message can be put or not. Valid returns are
+ * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot
+ * be placed after a time, or NNG_EAGAIN if the operation cannot succeed
+ * immediately and a zero timeout is specified. Note that timeout granularity
+ * may be limited -- for example Windows systems have a millisecond resolution
+ * timeout capability.
+ */
+extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int);
+
+/*
+ * nni_msgqueue_get gets the message from the queue, using a timeout just
+ * like nni_msgqueue_put.
+ */
+extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int);
+
#endif /* NNG_IMPL_H */
diff --git a/src/core/panic.c b/src/core/panic.c
index e993ddb9..60c790ac 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -28,8 +28,6 @@
#include <execinfo.h>
#endif
-#include "nng.h"
-
#include "nng_impl.h"
/*
diff --git a/src/core/socket.c b/src/core/socket.c
new file mode 100644
index 00000000..247c5640
--- /dev/null
+++ b/src/core/socket.c
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom
+ * the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "../nng.h"
+
+#include "nng_impl.h"
+
+/*
+ * Socket implementation.
+ */
+
+struct nng_socket {
+ int s_proto;
+ nni_mutex_t s_mx;
+
+ /* uwq */
+ /* urq */
+ /* options */
+ /* pipes */
+ /* endpoints */
+};