diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 59 |
1 files changed, 31 insertions, 28 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index ef7dd5a2..6fd95f98 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq) size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; + msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(raio); nni_aio_list_remove(waio); @@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq) mq->mq_put = 0; } mq->mq_len++; - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_finish(waio, 0, len); continue; } @@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; - raio->a_msg = msg; nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); continue; @@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq) // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; - waio->a_msg = NULL; + msg = nni_aio_get_msg(waio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); nni_aio_list_remove(raio); @@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - nni_msgq_aio_get(mq, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) == 0) { - *msgp = aio.a_msg; - aio.a_msg = NULL; - } - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_msgq_aio_get(mq, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) == 0) { + *msgp = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + } + nni_aio_fini(aio); return (rv); } int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - aio.a_msg = msg; - nni_msgq_aio_put(mq, &aio); - nni_aio_wait(&aio); - rv = nni_aio_result(&aio); - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); + nni_msgq_aio_put(mq, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); return (rv); } |
