diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /src/core | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvioius that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 25 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 4 |
2 files changed, 29 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 1bb5a762..7c33b256 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -40,6 +40,8 @@ struct nni_msgq { void * mq_filter_arg; }; +static void nni_msgq_run_notify(nni_msgq *); + int nni_msgq_init(nni_msgq **mqp, unsigned cap) { @@ -128,6 +130,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) } } mq->mq_geterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -149,6 +152,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) } } mq->mq_puterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -172,6 +176,24 @@ nni_msgq_set_error(nni_msgq *mq, int error) } mq->mq_puterr = error; mq->mq_geterr = error; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + +void +nni_msgq_flush(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + while (mq->mq_len > 0) { + nni_msg *msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get >= mq->mq_alloc) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -331,6 +353,7 @@ nni_msgq_cancel(nni_aio *aio, int rv) nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -413,6 +436,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -424,6 +448,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) mq->mq_put = 0; } mq->mq_len++; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2f1a46eb..65215bd0 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -33,6 +33,10 @@ extern int nni_msgq_init(nni_msgq **, unsigned); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); +// nni_msgq_flush discards any messages that are sitting in the queue. +// It does not wake any writers that might be waiting. +extern void nni_msgq_flush(nni_msgq *); + extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); |
