diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 123 |
1 files changed, 91 insertions, 32 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index bc381cfc..344798d7 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,21 +15,24 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_readable; - nni_cv mq_writeable; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - int mq_rwait; // readers waiting (unbuffered) - int mq_wwait; - nni_msg ** mq_msgs; + nni_mtx mq_lock; + nni_cv mq_readable; + nni_cv mq_writeable; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_rwait; // readers waiting (unbuffered) + int mq_wwait; + nni_msg ** mq_msgs; + + nni_msgq_notify_fn mq_notify_fn; + void * mq_notify_arg; }; int @@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap) return (NNG_ENOMEM); } if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { - nni_free(mq, sizeof (*mq)); - return (rv); + goto fail; } if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) { - nni_mtx_fini(&mq->mq_lock); - nni_free(mq, sizeof (*mq)); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + goto fail; } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { - nni_cv_fini(&mq->mq_drained); - nni_cv_fini(&mq->mq_writeable); - nni_cv_fini(&mq->mq_readable); - nni_mtx_fini(&mq->mq_lock); - return (NNG_ENOMEM); + rv = NNG_ENOMEM; + goto fail; } mq->mq_cap = cap; @@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_geterr = 0; mq->mq_wwait = 0; mq->mq_rwait = 0; + mq->mq_notify_fn = NULL; + mq->mq_notify_arg = NULL; *mqp = mq; return (0); + +fail: + nni_cv_fini(&mq->mq_drained); + nni_cv_fini(&mq->mq_writeable); + nni_cv_fini(&mq->mq_readable); + nni_mtx_fini(&mq->mq_lock); + if (mq->mq_msgs != NULL) { + nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); + } + NNI_FREE_STRUCT(mq); + return (rv); } @@ -127,6 +132,16 @@ nni_msgq_fini(nni_msgq *mq) void +nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_notify_fn = fn; + mq->mq_notify_arg = arg; + nni_mtx_unlock(&mq->mq_lock); +} + + +void nni_msgq_set_put_error(nni_msgq *mq, int error) { nni_mtx_lock(&mq->mq_lock); @@ -184,6 +199,9 @@ int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + // The notify callback is executed outside of the lock. + if (fn != NULL) { + fn(mq, notify, arg); + } return (0); } @@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) int nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; + nni_mtx_lock(&mq->mq_lock); // if closed, we don't put more... this check is first! @@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg) if (mq->mq_rwait) { nni_cv_wake(&mq->mq_readable); } + + notify = NNI_MSGQ_NOTIFY_CANGET; + if (mq->mq_len < mq->mq_cap) { + notify |= NNI_MSGQ_NOTIFY_CANPUT; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } @@ -287,6 +332,9 @@ static int nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; + int notify = 0; + nni_msgq_notify_fn fn; + void *arg; nni_mtx_lock(&mq->mq_lock); @@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) if (mq->mq_wwait) { nni_cv_wake(&mq->mq_writeable); } + notify = NNI_MSGQ_NOTIFY_CANPUT; + if (mq->mq_len > 0) { + notify |= NNI_MSGQ_NOTIFY_CANGET; + } + fn = mq->mq_notify_fn; + arg = mq->mq_notify_arg; nni_mtx_unlock(&mq->mq_lock); + + if (fn != NULL) { + fn(mq, notify, arg); + } + return (0); } |
