aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/msqueue.c120
-rw-r--r--src/core/nng_impl.h10
-rw-r--r--src/nng.h2
3 files changed, 129 insertions, 3 deletions
diff --git a/src/core/msqueue.c b/src/core/msqueue.c
index 3e4ade79..7becabfb 100644
--- a/src/core/msqueue.c
+++ b/src/core/msqueue.c
@@ -109,3 +109,123 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
nni_free(mq, sizeof (*mq));
}
+
+int
+nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+{
+ uint64_t expire, now;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ }
+
+ nni_mutex_enter(mq->mq_lock);
+
+ while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) {
+ if (tmout == 0) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ if (tmout < 0) {
+ (void) nni_cond_wait(mq->mq_writeable);
+ continue;
+ }
+
+ now = nni_clock();
+ if (now >= expire) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
+ (void) nni_cond_timedwait(mq->mq_writeable, (expire - now));
+ }
+
+ if (mq->mq_closed) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ mq->mq_msgs[mq->mq_put] = msg;
+ mq->mq_put++;
+ if (mq->mq_put == mq->mq_cap) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ if (mq->mq_len == 1) {
+ (void) nni_cond_signal(mq->mq_readable);
+ }
+ nni_mutex_exit(mq->mq_lock);
+ return (0);
+}
+
+int
+nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+{
+ uint64_t expire, now;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ }
+
+ nni_mutex_enter(mq->mq_lock);
+
+ while ((!mq->mq_closed) && (mq->mq_len == 0)) {
+ if (tmout == 0) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ if (tmout < 0) {
+ (void) nni_cond_wait(mq->mq_readable);
+ continue;
+ }
+
+ now = nni_clock();
+ if (now >= expire) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
+ (void) nni_cond_timedwait(mq->mq_readable, (expire - now));
+ }
+
+ if (mq->mq_closed) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ *msgp = mq->mq_msgs[mq->mq_get];
+ mq->mq_len--;
+ mq->mq_get++;
+ if (mq->mq_get == mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len++;
+ if (mq->mq_len == (mq->mq_cap - 1)) {
+ (void) nni_cond_signal(mq->mq_writeable);
+ }
+ nni_mutex_exit(mq->mq_lock);
+ return (0);
+}
+
+void
+nni_msgqueue_close(nni_msgqueue_t mq)
+{
+ nng_msg_t msg;
+
+ nni_mutex_enter(mq->mq_lock);
+ mq->mq_closed = 1;
+ nni_cond_broadcast(mq->mq_writeable);
+ nni_cond_broadcast(mq->mq_readable);
+
+ /* Free the messages orphaned in the queue. */
+ while (mq->mq_len > 0) {
+ msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nng_msg_free(msg);
+ }
+ nni_mutex_exit(mq->mq_lock);
+}
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index a62ac101..683e0df1 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -80,9 +80,6 @@ extern int nni_msgqueue_create(nni_msgqueue_t *, int);
*/
extern void nni_msgqueue_destroy(nni_msgqueue_t);
-extern int nni_msgqueue_len(nni_msgqueue_t);
-extern int nni_msgqueue_cap(nni_msgqueue_t);
-
/*
* nni_msgqueue_put attempts to put a message to the queue. It will wait
* for the timeout (us), if the value is positive. If the value is negative
@@ -102,4 +99,11 @@ extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int);
*/
extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int);
+/*
+ * nni_msgqueue_close closes the queue. After this all operates on the
+ * message queue will return NNG_ECLOSED. Messages inside the queue
+ * are freed. Unlike closing a go channel, this operation is idempotent.
+ */
+extern void nni_msgqueue_close(nni_msgqueue_t);
+
#endif /* NNG_IMPL_H */
diff --git a/src/nng.h b/src/nng.h
index 082d5735..d398e18c 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -427,6 +427,8 @@ NNG_DECL int nng_device(nng_socket_t, nng_socket_t);
#define NNG_EBUSY (-4)
#define NNG_ETIMEDOUT (-5)
#define NNG_ECONNREFUSED (-6)
+#define NNG_ECLOSED (-7)
+#define NNG_EAGAIN (-8)
#ifdef __cplusplus
}