diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 151 |
1 files changed, 63 insertions, 88 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index a69af47c..9f7ff7fd 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -15,37 +15,36 @@ // side can close, and they may be closed more than once. struct nni_msgq { - nni_mtx mq_lock; - nni_cv mq_drained; - int mq_cap; - int mq_alloc; // alloc is cap + 2... - int mq_len; - int mq_get; - int mq_put; - int mq_closed; - int mq_puterr; - int mq_geterr; - int mq_draining; - nni_msg ** mq_msgs; - - nni_list mq_aio_putq; - nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; + nni_mtx mq_lock; + nni_cv mq_drained; + int mq_cap; + int mq_alloc; // alloc is cap + 2... + int mq_len; + int mq_get; + int mq_put; + int mq_closed; + int mq_puterr; + int mq_geterr; + int mq_draining; + nni_msg **mq_msgs; + + nni_list mq_aio_putq; + nni_list mq_aio_getq; + nni_list mq_aio_notify_get; + nni_list mq_aio_notify_put; + + nni_timer_node mq_timer; + nni_time mq_expire; }; - static void nni_msgq_run_timeout(void *); int nni_msgq_init(nni_msgq **mqp, int cap) { struct nni_msgq *mq; - int rv; - int alloc; + int rv; + int alloc; if (cap < 0) { return (NNG_EINVAL); @@ -72,24 +71,24 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { goto fail; } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) { + if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) { rv = NNG_ENOMEM; goto fail; } nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - mq->mq_expire = NNI_TIME_NEVER; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + mq->mq_expire = NNI_TIME_NEVER; mq->mq_draining = 0; - *mqp = mq; + *mqp = mq; return (0); @@ -97,13 +96,12 @@ fail: nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); if (mq->mq_msgs != NULL) { - nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc); + nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc); } NNI_FREE_STRUCT(mq); return (rv); } - void nni_msgq_fini(nni_msgq *mq) { @@ -127,11 +125,10 @@ nni_msgq_fini(nni_msgq *mq) nni_msg_free(msg); } - nni_free(mq->mq_msgs, mq->mq_alloc * sizeof (nng_msg *)); + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *)); NNI_FREE_STRUCT(mq); } - static void nni_msgq_finish(nni_aio *aio, int rv) { @@ -139,7 +136,6 @@ nni_msgq_finish(nni_aio *aio, int rv) nni_aio_finish(aio, rv, 0); } - void nni_msgq_set_get_error(nni_msgq *mq, int error) { @@ -159,7 +155,6 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_set_put_error(nni_msgq *mq, int error) { @@ -179,7 +174,6 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_set_error(nni_msgq *mq, int error) { @@ -205,14 +199,13 @@ nni_msgq_set_error(nni_msgq *mq, int error) nni_mtx_unlock(&mq->mq_lock); } - static void nni_msgq_run_putq(nni_msgq *mq) { nni_aio *waio; nni_aio *raio; nni_msg *msg; - size_t len; + size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { msg = waio->a_msg; @@ -248,7 +241,6 @@ nni_msgq_run_putq(nni_msgq *mq) } } - static void nni_msgq_run_getq(nni_msgq *mq) { @@ -272,7 +264,7 @@ nni_msgq_run_getq(nni_msgq *mq) // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; + msg = waio->a_msg; waio->a_msg = NULL; raio->a_msg = msg; nni_msgq_finish(raio, 0); @@ -286,7 +278,6 @@ nni_msgq_run_getq(nni_msgq *mq) } } - static void nni_msgq_run_notify(nni_msgq *mq) { @@ -316,7 +307,6 @@ nni_msgq_run_notify(nni_msgq *mq) } } - static void nni_msgq_cancel(nni_aio *aio) { @@ -331,7 +321,6 @@ nni_msgq_cancel(nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) { @@ -344,7 +333,6 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) { @@ -357,7 +345,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { @@ -391,7 +378,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { @@ -425,7 +411,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); } - int nni_msgq_canput(nni_msgq *mq) { @@ -442,7 +427,6 @@ nni_msgq_canput(nni_msgq *mq) return (0); } - int nni_msgq_canget(nni_msgq *mq) { @@ -459,12 +443,11 @@ nni_msgq_canget(nni_msgq *mq) return (0); } - int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { nni_aio *raio; - size_t len = nni_msg_len(msg); + size_t len = nni_msg_len(msg); nni_mtx_lock(&mq->mq_lock); if (mq->mq_closed) { @@ -500,16 +483,15 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } - // XXX: Move this to generic AIO timeout... void nni_msgq_run_timeout(void *arg) { nni_msgq *mq = arg; - nni_time now; - nni_time exp; - nni_aio *aio; - nni_aio *naio; + nni_time now; + nni_time exp; + nni_aio * aio; + nni_aio * naio; now = nni_clock(); exp = NNI_TIME_NEVER; @@ -550,12 +532,11 @@ nni_msgq_run_timeout(void *arg) nni_mtx_unlock(&mq->mq_lock); } - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { nni_aio aio; - int rv; + int rv; if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { return (rv); @@ -564,25 +545,24 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) nni_msgq_aio_get(mq, &aio); nni_aio_wait(&aio); if ((rv = nni_aio_result(&aio)) == 0) { - *msgp = aio.a_msg; + *msgp = aio.a_msg; aio.a_msg = NULL; } nni_aio_fini(&aio); return (rv); } - int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { nni_aio aio; - int rv; + int rv; if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { return (rv); } aio.a_expire = expire; - aio.a_msg = msg; + aio.a_msg = msg; nni_msgq_aio_put(mq, &aio); nni_aio_wait(&aio); rv = nni_aio_result(&aio); @@ -590,14 +570,13 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) return (rv); } - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { nni_aio *aio; nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; + mq->mq_closed = 1; mq->mq_draining = 1; while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { @@ -622,7 +601,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) nni_mtx_unlock(&mq->mq_lock); } - void nni_msgq_close(nni_msgq *mq) { @@ -660,7 +638,6 @@ nni_msgq_close(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); } - int nni_msgq_len(nni_msgq *mq) { @@ -672,7 +649,6 @@ nni_msgq_len(nni_msgq *mq) return (rv); } - int nni_msgq_cap(nni_msgq *mq) { @@ -684,23 +660,22 @@ nni_msgq_cap(nni_msgq *mq) return (rv); } - int nni_msgq_resize(nni_msgq *mq, int cap) { - int alloc; - nni_msg *msg; + int alloc; + nni_msg * msg; nni_msg **newq, **oldq; - int oldget; - int oldput; - int oldcap; - int oldlen; - int oldalloc; + int oldget; + int oldput; + int oldcap; + int oldlen; + int oldalloc; alloc = cap + 2; if (alloc > mq->mq_alloc) { - newq = nni_alloc(sizeof (nni_msg *) * alloc); + newq = nni_alloc(sizeof(nni_msg *) * alloc); if (newq == NULL) { return (NNG_ENOMEM); } @@ -726,17 +701,17 @@ nni_msgq_resize(nni_msgq *mq, int cap) goto out; } - oldq = mq->mq_msgs; - oldget = mq->mq_get; - oldput = mq->mq_put; - oldcap = mq->mq_cap; + oldq = mq->mq_msgs; + oldget = mq->mq_get; + oldput = mq->mq_put; + oldcap = mq->mq_cap; oldalloc = mq->mq_alloc; - oldlen = mq->mq_len; + oldlen = mq->mq_len; mq->mq_msgs = newq; mq->mq_len = mq->mq_get = mq->mq_put = 0; - mq->mq_cap = cap; - mq->mq_alloc = alloc; + mq->mq_cap = cap; + mq->mq_alloc = alloc; while (oldlen) { mq->mq_msgs[mq->mq_put++] = oldq[oldget++]; @@ -749,7 +724,7 @@ nni_msgq_resize(nni_msgq *mq, int cap) mq->mq_len++; oldlen--; } - nni_free(oldq, sizeof (nni_msg *) * oldalloc); + nni_free(oldq, sizeof(nni_msg *) * oldalloc); out: // Wake everyone up -- we changed everything. |
