aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c123
1 files changed, 91 insertions, 32 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index bc381cfc..344798d7 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,21 +15,24 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_readable;
- nni_cv mq_writeable;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- int mq_rwait; // readers waiting (unbuffered)
- int mq_wwait;
- nni_msg ** mq_msgs;
+ nni_mtx mq_lock;
+ nni_cv mq_readable;
+ nni_cv mq_writeable;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
+ nni_msg ** mq_msgs;
+
+ nni_msgq_notify_fn mq_notify_fn;
+ void * mq_notify_arg;
};
int
@@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap)
return (NNG_ENOMEM);
}
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
- nni_free(mq, sizeof (*mq));
- return (rv);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- nni_mtx_fini(&mq->mq_lock);
- nni_free(mq, sizeof (*mq));
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
- nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ rv = NNG_ENOMEM;
+ goto fail;
}
mq->mq_cap = cap;
@@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_geterr = 0;
mq->mq_wwait = 0;
mq->mq_rwait = 0;
+ mq->mq_notify_fn = NULL;
+ mq->mq_notify_arg = NULL;
*mqp = mq;
return (0);
+
+fail:
+ nni_cv_fini(&mq->mq_drained);
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
+ if (mq->mq_msgs != NULL) {
+ nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
+ }
+ NNI_FREE_STRUCT(mq);
+ return (rv);
}
@@ -127,6 +132,16 @@ nni_msgq_fini(nni_msgq *mq)
void
+nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_notify_fn = fn;
+ mq->mq_notify_arg = arg;
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
nni_mtx_lock(&mq->mq_lock);
@@ -184,6 +199,9 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ // The notify callback is executed outside of the lock.
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
return (0);
}
@@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
+
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}
@@ -287,6 +332,9 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
if (mq->mq_wwait) {
nni_cv_wake(&mq->mq_writeable);
}
+ notify = NNI_MSGQ_NOTIFY_CANPUT;
+ if (mq->mq_len > 0) {
+ notify |= NNI_MSGQ_NOTIFY_CANGET;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}