diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-01 21:18:58 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-01 21:18:58 -0800 |
| commit | 97614393e450b6c6813021f0e733b864a6265872 (patch) | |
| tree | 31c8d4d0b2206e944f797b291660d8cf52eebea0 /src | |
| parent | 5436cc9e1619bf1dd80fabb568aac344ae65d406 (diff) | |
| download | nng-97614393e450b6c6813021f0e733b864a6265872.tar.gz nng-97614393e450b6c6813021f0e733b864a6265872.tar.bz2 nng-97614393e450b6c6813021f0e733b864a6265872.zip | |
Start of msgq aio.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 3 | ||||
| -rw-r--r-- | src/core/aio.h | 4 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 127 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 8 | ||||
| -rw-r--r-- | src/core/pipe.c | 14 | ||||
| -rw-r--r-- | src/core/pipe.h | 4 | ||||
| -rw-r--r-- | src/core/transport.h | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_aiothr.c | 4 |
8 files changed, 163 insertions, 4 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index b8304e8a..ce60385e 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -24,6 +24,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) nni_cv_init(&aio->a_cv, &aio->a_lk); aio->a_cb = cb; aio->a_cbarg = arg; + nni_taskq_ent_init(&aio->a_tqe, cb, arg); } @@ -86,5 +87,5 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); - cb(arg); + nni_taskq_dispatch(NULL, &aio->a_tqe); } diff --git a/src/core/aio.h b/src/core/aio.h index 15746fc3..76ebb7c3 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -12,6 +12,7 @@ #include "core/defs.h" #include "core/list.h" +#include "core/taskq.h" #include "core/thread.h" typedef struct nni_aio_ops nni_aio_ops; @@ -25,10 +26,11 @@ struct nni_aio { nni_cb a_cb; // User specified callback. void * a_cbarg; // Callback argument. - // These fields are private to the io events framework. + // These fields are private to the aio framework. nni_mtx a_lk; nni_cv a_cv; unsigned a_flags; + nni_taskq_ent a_tqe; // Read/write operations. nni_iov a_iov[4]; diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index b09bb817..66f246f8 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -36,6 +36,9 @@ struct nni_msgq { nni_thr mq_notify_thr; nni_msgq_notify_fn mq_notify_fn; void * mq_notify_arg; + + nni_list mq_aio_putq; + nni_list mq_aio_getq; }; @@ -107,6 +110,9 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } + NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node); + NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node); + if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; } @@ -262,6 +268,127 @@ nni_msgq_signal(nni_msgq *mq, int *signal) } +static void +nni_msgq_run_putq(nni_msgq *mq) +{ + nni_aio *waio; + nni_aio *raio; + nni_msg *msg; + size_t len; + + while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + + msg = waio->a_msg; + len = nni_msg_len(msg); + + // The presence of any blocked reader indicates that + // the queue is empty, otherwise it would have just taken + // data from the queue. + if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + nni_list_remove(&mq->mq_aio_getq, raio); + nni_list_remove(&mq->mq_aio_putq, waio); + + *raio->a_msgp = msg; + waio->a_msg = NULL; + + nni_aio_finish(raio, 0, len); + nni_aio_finish(waio, 0, len); + continue; + } + + // Otherwise if we have room in the buffer, just queue it. + if (mq->mq_len < mq->mq_cap) { + nni_list_remove(&mq->mq_aio_putq, waio); + mq->mq_msgs[mq->mq_put++] = msg; + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + waio->a_msg = NULL; + nni_aio_finish(waio, 0, len); + continue; + } + + // Unable to make progress, leave the aio where it is. + break; + } +} + + +static void +nni_msgq_run_getq(nni_msgq *mq) +{ + nni_aio *raio; + nni_aio *waio; + nni_msg *msg; + size_t len; + + while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { + + // If anything is waiting in the queue, get it first. + if (mq->mq_len != 0) { + msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + len = nni_msg_len(msg); + *(raio->a_msgp) = msg; + nni_aio_finish(raio, 0, len); + continue; + } + + // Nothing queued (unbuffered?), maybe a writer is waiting. + if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_list_remove(&mq->mq_aio_putq, waio); + nni_list_remove(&mq->mq_aio_getq, raio); + + msg = waio->a_msg; + len = nni_msg_len(msg); + waio->a_msg = NULL; + *raio->a_msgp = msg; + nni_aio_finish(raio, 0, len); + nni_aio_finish(waio, 0, len); + continue; + } + + // No data to get, and no unbuffered writers waiting. Just + // wait until something arrives. + break; + } +} + + +static void +nni_msgq_run_notify(nni_msgq *mq) +{ +} + + +int +nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + nni_list_append(&mq->mq_aio_putq, aio); + nni_msgq_run_putq(mq); + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + +int +nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + nni_list_append(&mq->mq_aio_getq, aio); + nni_msgq_run_getq(mq); + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 7451139d..a350c3b3 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -35,6 +35,14 @@ extern int nni_msgq_init(nni_msgq **, int); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); +extern int nni_msgq_aio_put(nni_msgq *, nni_aio *); +extern int nni_msgq_aio_get(nni_msgq *, nni_aio *); +extern int nni_msgq_aio_put_until(nni_msgq *, nni_aio *, nni_time); +extern int nni_msgq_aio_get_until(nni_msgq *, nni_aio *, nni_time); +extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); +extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); +extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *); + // nni_msgq_put puts the message to the queue. It blocks until it // was able to do so, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is diff --git a/src/core/pipe.c b/src/core/pipe.c index 39b4d012..a401e4e3 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -36,6 +36,20 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp) } +int +nni_pipe_aio_recv(nni_pipe *p, nni_aio *aio) +{ + return (p->p_tran_ops.pipe_aio_recv(p->p_tran_data, aio)); +} + + +int +nni_pipe_aio_send(nni_pipe *p, nni_aio *aio) +{ + return (p->p_tran_ops.pipe_aio_send(p->p_tran_data, aio)); +} + + // nni_pipe_close closes the underlying connection. It is expected that // subsequent attempts receive or send (including any waiting receive) will // simply return NNG_ECLOSED. diff --git a/src/core/pipe.h b/src/core/pipe.h index 333a986d..3ec4a7a3 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -31,6 +31,10 @@ struct nni_pipe { nni_thr p_worker_thr[NNI_MAXWORKERS]; }; +// AIO +extern int nni_pipe_aio_recv(nni_pipe *, nni_aio *); +extern int nni_pipe_aio_send(nni_pipe *, nni_aio *); + // Pipe operations that protocols use. extern int nni_pipe_recv(nni_pipe *, nng_msg **); extern int nni_pipe_send(nni_pipe *, nng_msg *); diff --git a/src/core/transport.h b/src/core/transport.h index 1765b361..83ec3121 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -85,6 +85,9 @@ struct nni_tran_pipe { // further calls on the same pipe. void (*pipe_destroy)(void *); + int (*pipe_aio_send)(void *, nni_aio *); + int (*pipe_aio_recv)(void *, nni_aio *); + // p_send sends the message. If the message cannot be received, then // the caller may try again with the same message (or free it). If // the call succeeds, then the transport has taken ownership of the diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c index 8378eb79..013e4599 100644 --- a/src/platform/posix/posix_aiothr.c +++ b/src/platform/posix/posix_aiothr.c @@ -64,7 +64,7 @@ nni_plat_aiothr_write(int fd, nni_aio *aio) aio->a_count += n; progress += n; while (n) { - // If we didn't finish the at once, try again. + // If we didn't finish it yet, try again. if (n < iovp->iov_len) { iovp->iov_len -= n; iovp->iov_base += n; @@ -128,7 +128,7 @@ nni_plat_aiothr_read(int fd, nni_aio *aio) aio->a_count += n; progress += n; while (n) { - // If we didn't finish the at once, try again. + // If we didn't finish it yet, try again. if (n < iovp->iov_len) { iovp->iov_len -= n; iovp->iov_base += n; |
