aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 20:52:45 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 20:52:45 -0800
commitee45cbf4498a3c1d1868469bdb0c767d66c278e4 (patch)
treeb9116256f12a54c90f92bf5cf215f3d4c8152126 /src/core/msgqueue.c
parent718de1828cc5b5256511c5b723360d499ae21c8f (diff)
downloadnng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.tar.gz
nng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.tar.bz2
nng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.zip
Endpoint dialer implemented.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c46
1 files changed, 40 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index b8d11e8c..266a8e26 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -18,6 +18,7 @@ struct nni_msgqueue {
nni_mutex mq_lock;
nni_cond mq_readable;
nni_cond mq_writeable;
+ nni_cond mq_drained;
int mq_cap;
int mq_len;
int mq_get;
@@ -52,7 +53,14 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
+ if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
+ return (NNG_ENOMEM);
+ }
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -75,6 +83,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
{
nni_msg *msg;
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -153,7 +162,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
mq->mq_len++;
if (mq->mq_len == 1) {
- (void) nni_cond_signal(&mq->mq_readable);
+ nni_cond_signal(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -181,6 +190,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
// provide it, so that the reader can drain.
if (mq->mq_len == 0) {
if (mq->mq_closed) {
+ nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
@@ -198,9 +208,8 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
if (mq->mq_get == mq->mq_cap) {
mq->mq_get = 0;
}
- mq->mq_len++;
if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(&mq->mq_writeable);
+ nni_cond_signal(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -258,10 +267,35 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
-nni_msgqueue_close(nni_msgqueue *mq)
+nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
{
- nni_msg *msg;
+ nni_mutex_enter(&mq->mq_lock);
+ mq->mq_closed = 1;
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_readable);
+ while (mq->mq_len > 0) {
+ if (nni_cond_waituntil(&mq->mq_drained, expire) ==
+ NNG_ETIMEDOUT) {
+ break;
+ }
+ }
+ // If we timedout, free any remaining messages in the queue.
+ while (mq->mq_len > 0) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+}
+
+void
+nni_msgqueue_close(nni_msgqueue *mq)
+{
nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
nni_cond_broadcast(&mq->mq_writeable);
@@ -269,7 +303,7 @@ nni_msgqueue_close(nni_msgqueue *mq)
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
- msg = mq->mq_msgs[mq->mq_get];
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;