summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-16 19:35:51 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-16 19:35:51 -0800
commit50e1484af0d443b46aa04fd4a8096b157dc160aa (patch)
treeab517217c82e785b0c851c986ee44dfc4a4a65fe /src/core/msgqueue.c
parentac8415c24ffea645105c3859e814843e81c97f8a (diff)
downloadnng-50e1484af0d443b46aa04fd4a8096b157dc160aa.tar.gz
nng-50e1484af0d443b46aa04fd4a8096b157dc160aa.tar.bz2
nng-50e1484af0d443b46aa04fd4a8096b157dc160aa.zip
Recv/Send event plumbing implemented (msgqueue and up).
This change provides for a private callback in the message queues, which can be used to notify the socket, and which than arranges for the appropriate event thread to run. Upper layer hooks to access this still need to be written.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c123
1 files changed, 91 insertions, 32 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index bc381cfc..344798d7 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,21 +15,24 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_readable;
- nni_cv mq_writeable;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- int mq_rwait; // readers waiting (unbuffered)
- int mq_wwait;
- nni_msg ** mq_msgs;
+ nni_mtx mq_lock;
+ nni_cv mq_readable;
+ nni_cv mq_writeable;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
+ nni_msg ** mq_msgs;
+
+ nni_msgq_notify_fn mq_notify_fn;
+ void * mq_notify_arg;
};
int
@@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap)
return (NNG_ENOMEM);
}
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
- nni_free(mq, sizeof (*mq));
- return (rv);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- nni_mtx_fini(&mq->mq_lock);
- nni_free(mq, sizeof (*mq));
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
- nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ rv = NNG_ENOMEM;
+ goto fail;
}
mq->mq_cap = cap;
@@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_geterr = 0;
mq->mq_wwait = 0;
mq->mq_rwait = 0;
+ mq->mq_notify_fn = NULL;
+ mq->mq_notify_arg = NULL;
*mqp = mq;
return (0);
+
+fail:
+ nni_cv_fini(&mq->mq_drained);
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
+ if (mq->mq_msgs != NULL) {
+ nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
+ }
+ NNI_FREE_STRUCT(mq);
+ return (rv);
}
@@ -127,6 +132,16 @@ nni_msgq_fini(nni_msgq *mq)
void
+nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_notify_fn = fn;
+ mq->mq_notify_arg = arg;
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
nni_mtx_lock(&mq->mq_lock);
@@ -184,6 +199,9 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ // The notify callback is executed outside of the lock.
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
return (0);
}
@@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
+
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}
@@ -287,6 +332,9 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
if (mq->mq_wwait) {
nni_cv_wake(&mq->mq_writeable);
}
+ notify = NNI_MSGQ_NOTIFY_CANPUT;
+ if (mq->mq_len > 0) {
+ notify |= NNI_MSGQ_NOTIFY_CANGET;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}