From d72076207a2fad96ff014a81366868fb47a0ed1b Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 31 Aug 2017 17:59:01 -0700 Subject: Allocate AIOs dynamically. We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code. --- src/core/msgqueue.c | 59 ++++++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 28 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index ef7dd5a2..6fd95f98 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq) size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; + msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(raio); nni_aio_list_remove(waio); @@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq) mq->mq_put = 0; } mq->mq_len++; - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_finish(waio, 0, len); continue; } @@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; - raio->a_msg = msg; nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); continue; @@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq) // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; - waio->a_msg = NULL; + msg = nni_aio_get_msg(waio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); nni_aio_list_remove(raio); @@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - nni_msgq_aio_get(mq, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) == 0) { - *msgp = aio.a_msg; - aio.a_msg = NULL; - } - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_msgq_aio_get(mq, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) == 0) { + *msgp = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + } + nni_aio_fini(aio); return (rv); } int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - aio.a_msg = msg; - nni_msgq_aio_put(mq, &aio); - nni_aio_wait(&aio); - rv = nni_aio_result(&aio); - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); + nni_msgq_aio_put(mq, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); return (rv); } -- cgit v1.2.3-70-g09d2