aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c59
1 files changed, 31 insertions, 28 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index ef7dd5a2..6fd95f98 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
+ msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(raio);
nni_aio_list_remove(waio);
@@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq)
mq->mq_put = 0;
}
mq->mq_len++;
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- raio->a_msg = msg;
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
continue;
@@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq)
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
- waio->a_msg = NULL;
+ msg = nni_aio_get_msg(waio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_list_remove(raio);
@@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- nni_msgq_aio_get(mq, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *msgp = aio.a_msg;
- aio.a_msg = NULL;
- }
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_msgq_aio_get(mq, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) == 0) {
+ *msgp = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ }
+ nni_aio_fini(aio);
return (rv);
}
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- aio.a_msg = msg;
- nni_msgq_aio_put(mq, &aio);
- nni_aio_wait(&aio);
- rv = nni_aio_result(&aio);
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
+ nni_msgq_aio_put(mq, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
return (rv);
}