aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-01 21:18:58 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-01 21:18:58 -0800
commit97614393e450b6c6813021f0e733b864a6265872 (patch)
tree31c8d4d0b2206e944f797b291660d8cf52eebea0 /src
parent5436cc9e1619bf1dd80fabb568aac344ae65d406 (diff)
downloadnng-97614393e450b6c6813021f0e733b864a6265872.tar.gz
nng-97614393e450b6c6813021f0e733b864a6265872.tar.bz2
nng-97614393e450b6c6813021f0e733b864a6265872.zip
Start of msgq aio.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c3
-rw-r--r--src/core/aio.h4
-rw-r--r--src/core/msgqueue.c127
-rw-r--r--src/core/msgqueue.h8
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/pipe.h4
-rw-r--r--src/core/transport.h3
-rw-r--r--src/platform/posix/posix_aiothr.c4
8 files changed, 163 insertions, 4 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index b8304e8a..ce60385e 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -24,6 +24,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
nni_cv_init(&aio->a_cv, &aio->a_lk);
aio->a_cb = cb;
aio->a_cbarg = arg;
+ nni_taskq_ent_init(&aio->a_tqe, cb, arg);
}
@@ -86,5 +87,5 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
nni_cv_wake(&aio->a_cv);
nni_mtx_unlock(&aio->a_lk);
- cb(arg);
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 15746fc3..76ebb7c3 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -12,6 +12,7 @@
#include "core/defs.h"
#include "core/list.h"
+#include "core/taskq.h"
#include "core/thread.h"
typedef struct nni_aio_ops nni_aio_ops;
@@ -25,10 +26,11 @@ struct nni_aio {
nni_cb a_cb; // User specified callback.
void * a_cbarg; // Callback argument.
- // These fields are private to the io events framework.
+ // These fields are private to the aio framework.
nni_mtx a_lk;
nni_cv a_cv;
unsigned a_flags;
+ nni_taskq_ent a_tqe;
// Read/write operations.
nni_iov a_iov[4];
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index b09bb817..66f246f8 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -36,6 +36,9 @@ struct nni_msgq {
nni_thr mq_notify_thr;
nni_msgq_notify_fn mq_notify_fn;
void * mq_notify_arg;
+
+ nni_list mq_aio_putq;
+ nni_list mq_aio_getq;
};
@@ -107,6 +110,9 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
+ NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
+
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
@@ -262,6 +268,127 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
}
+static void
+nni_msgq_run_putq(nni_msgq *mq)
+{
+ nni_aio *waio;
+ nni_aio *raio;
+ nni_msg *msg;
+ size_t len;
+
+ while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+
+ msg = waio->a_msg;
+ len = nni_msg_len(msg);
+
+ // The presence of any blocked reader indicates that
+ // the queue is empty, otherwise it would have just taken
+ // data from the queue.
+ if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
+ nni_list_remove(&mq->mq_aio_putq, waio);
+
+ *raio->a_msgp = msg;
+ waio->a_msg = NULL;
+
+ nni_aio_finish(raio, 0, len);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // Otherwise if we have room in the buffer, just queue it.
+ if (mq->mq_len < mq->mq_cap) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ mq->mq_msgs[mq->mq_put++] = msg;
+ if (mq->mq_put == mq->mq_alloc) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ waio->a_msg = NULL;
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // Unable to make progress, leave the aio where it is.
+ break;
+ }
+}
+
+
+static void
+nni_msgq_run_getq(nni_msgq *mq)
+{
+ nni_aio *raio;
+ nni_aio *waio;
+ nni_msg *msg;
+ size_t len;
+
+ while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
+
+ // If anything is waiting in the queue, get it first.
+ if (mq->mq_len != 0) {
+ msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ len = nni_msg_len(msg);
+ *(raio->a_msgp) = msg;
+ nni_aio_finish(raio, 0, len);
+ continue;
+ }
+
+ // Nothing queued (unbuffered?), maybe a writer is waiting.
+ if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ nni_list_remove(&mq->mq_aio_getq, raio);
+
+ msg = waio->a_msg;
+ len = nni_msg_len(msg);
+ waio->a_msg = NULL;
+ *raio->a_msgp = msg;
+ nni_aio_finish(raio, 0, len);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
+ // No data to get, and no unbuffered writers waiting. Just
+ // wait until something arrives.
+ break;
+ }
+}
+
+
+static void
+nni_msgq_run_notify(nni_msgq *mq)
+{
+}
+
+
+int
+nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ nni_list_append(&mq->mq_aio_putq, aio);
+ nni_msgq_run_putq(mq);
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
+int
+nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ nni_list_append(&mq->mq_aio_getq, aio);
+ nni_msgq_run_getq(mq);
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 7451139d..a350c3b3 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -35,6 +35,14 @@ extern int nni_msgq_init(nni_msgq **, int);
// messages that may be in the queue.
extern void nni_msgq_fini(nni_msgq *);
+extern int nni_msgq_aio_put(nni_msgq *, nni_aio *);
+extern int nni_msgq_aio_get(nni_msgq *, nni_aio *);
+extern int nni_msgq_aio_put_until(nni_msgq *, nni_aio *, nni_time);
+extern int nni_msgq_aio_get_until(nni_msgq *, nni_aio *, nni_time);
+extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
+extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
+extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
+
// nni_msgq_put puts the message to the queue. It blocks until it
// was able to do so, or the queue is closed, returning either 0 on
// success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 39b4d012..a401e4e3 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -36,6 +36,20 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
}
+int
+nni_pipe_aio_recv(nni_pipe *p, nni_aio *aio)
+{
+ return (p->p_tran_ops.pipe_aio_recv(p->p_tran_data, aio));
+}
+
+
+int
+nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
+{
+ return (p->p_tran_ops.pipe_aio_send(p->p_tran_data, aio));
+}
+
+
// nni_pipe_close closes the underlying connection. It is expected that
// subsequent attempts receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 333a986d..3ec4a7a3 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -31,6 +31,10 @@ struct nni_pipe {
nni_thr p_worker_thr[NNI_MAXWORKERS];
};
+// AIO
+extern int nni_pipe_aio_recv(nni_pipe *, nni_aio *);
+extern int nni_pipe_aio_send(nni_pipe *, nni_aio *);
+
// Pipe operations that protocols use.
extern int nni_pipe_recv(nni_pipe *, nng_msg **);
extern int nni_pipe_send(nni_pipe *, nng_msg *);
diff --git a/src/core/transport.h b/src/core/transport.h
index 1765b361..83ec3121 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -85,6 +85,9 @@ struct nni_tran_pipe {
// further calls on the same pipe.
void (*pipe_destroy)(void *);
+ int (*pipe_aio_send)(void *, nni_aio *);
+ int (*pipe_aio_recv)(void *, nni_aio *);
+
// p_send sends the message. If the message cannot be received, then
// the caller may try again with the same message (or free it). If
// the call succeeds, then the transport has taken ownership of the
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
index 8378eb79..013e4599 100644
--- a/src/platform/posix/posix_aiothr.c
+++ b/src/platform/posix/posix_aiothr.c
@@ -64,7 +64,7 @@ nni_plat_aiothr_write(int fd, nni_aio *aio)
aio->a_count += n;
progress += n;
while (n) {
- // If we didn't finish the at once, try again.
+ // If we didn't finish it yet, try again.
if (n < iovp->iov_len) {
iovp->iov_len -= n;
iovp->iov_base += n;
@@ -128,7 +128,7 @@ nni_plat_aiothr_read(int fd, nni_aio *aio)
aio->a_count += n;
progress += n;
while (n) {
- // If we didn't finish the at once, try again.
+ // If we didn't finish it yet, try again.
if (n < iovp->iov_len) {
iovp->iov_len -= n;
iovp->iov_base += n;