aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/platform/posix/posix_aio.h60
-rw-r--r--src/platform/posix/posix_aiothr.c323
-rw-r--r--src/platform/posix/posix_config.h2
4 files changed, 388 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8cd94cdf..36d9e8a7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -75,7 +75,10 @@ set (NNG_SOURCES
platform/posix/posix_impl.h
platform/posix/posix_config.h
+ platform/posix/posix_aio.h
+
platform/posix/posix_alloc.c
+ platform/posix/posix_aiothr.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
platform/posix/posix_ipc.c
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
new file mode 100644
index 00000000..08c731bc
--- /dev/null
+++ b/src/platform/posix/posix_aio.h
@@ -0,0 +1,60 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_POSIX_AIO_H
+#define PLATFORM_POSIX_AIO_H
+
+// This file defines structures we will use for emulating asynchronous I/O
+// on POSIX. POSIX lacks the support for callback based asynchronous I/O
+// that we have on Windows, although it has a non-widely support aio layer
+// that is not very performant on many systems. So we emulate this using
+// one of several possible different backends.
+
+#include "core/nng_impl.h"
+
+typedef struct nni_posix_aioq nni_posix_aioq;
+typedef struct nni_posix_aiof nni_posix_aiof;
+typedef struct nni_posix_aio_pipe nni_posix_aio_pipe;
+typedef struct nni_posix_aio_ep nni_posix_aio_ep;
+
+// Head structure representing file operations for read/write. We process
+// the list of aios serially, and each file has its own thread for now.
+struct nni_posix_aioq {
+ nni_list aq_aios;
+ int aq_fd;
+ nni_mtx aq_lk;
+ nni_cv aq_cv;
+#ifdef NNG_USE_POSIX_AIOTHR
+ nni_thr aq_thr;
+#endif
+};
+
+struct nni_posix_aio_pipe {
+ int ap_fd;
+ nni_posix_aioq ap_readq;
+ nni_posix_aioq ap_writeq;
+};
+
+struct nni_posix_aio_ep {
+ int ap_fd;
+ nni_posix_aioq ap_q;
+};
+
+extern int nni_posix_aio_pipe_init(nni_posix_aio_pipe *, int);
+extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
+
+// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
+// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
+extern int nni_posix_aio_read(nni_posix_aioq *, nni_aio *);
+extern int nni_posix_aio_write(nni_posix_aioq *, nni_aio *);
+
+// extern int nni_posix_aio_connect();
+// extern int nni_posix_aio_accept();
+
+#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
new file mode 100644
index 00000000..8378eb79
--- /dev/null
+++ b/src/platform/posix/posix_aiothr.c
@@ -0,0 +1,323 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+
+#ifdef NNG_USE_POSIX_AIOTHR
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+// POSIX AIO using threads. This allows us to use normal synchronous AIO,
+// along with underlying threads, to simulate asynchronous I/O. This will be
+// unscalable for systems where threads are a finite resource, but it should
+// be sufficient for systems where threads are efficient, and cheap, or for
+// applications that do not need excessive amounts of open files. It also
+// serves as a model upon which we can build more scalable forms of asynch
+// I/O, using non-blocking I/O and pollers.
+
+
+// nni_plat_aiothr_write is used to attempt a write, sending
+// as much as it can. On success, it returns 0, otherwise an errno. It will
+// retry if EINTR is received.
+static int
+nni_plat_aiothr_write(int fd, nni_aio *aio)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ int niov = aio->a_niov;
+ int progress = 0;
+
+ for (i = 0; i < niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ while (niov != 0) {
+ n = writev(fd, iovp, niov);
+ if (n < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ rv = nni_plat_errno(errno);
+ break;
+ }
+
+ aio->a_count += n;
+ progress += n;
+ while (n) {
+ // If we didn't finish the at once, try again.
+ if (n < iovp->iov_len) {
+ iovp->iov_len -= n;
+ iovp->iov_base += n;
+ break;
+ }
+
+ n -= iovp->iov_len;
+ iovp++;
+ niov--;
+ }
+ }
+
+ // Either we got it all, or we didn't.
+ if ((rv != 0) && (progress != 0)) {
+ for (i = 0; i < niov; i++) {
+ aio->a_iov[i].iov_len = iovp[i].iov_len;
+ aio->a_iov[i].iov_buf = iovp[i].iov_base;
+ }
+ aio->a_niov = niov;
+ }
+
+ return (rv);
+}
+
+
+// nni_plat_aiothr_read is used to attempt a read, sending as much as it can
+// (limited by the requested read). On success, it returns 0, otherwise an
+// errno. It will retry if EINTR is received.
+static int
+nni_plat_aiothr_read(int fd, nni_aio *aio)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ int niov = aio->a_niov;
+ int progress = 0;
+
+ for (i = 0; i < niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ while (niov != 0) {
+ n = readv(fd, iovp, niov);
+ if (n < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ rv = nni_plat_errno(errno);
+ break;
+ }
+ if (n == 0) {
+ rv = NNG_ECLOSED;
+ break;
+ }
+
+ aio->a_count += n;
+ progress += n;
+ while (n) {
+ // If we didn't finish the at once, try again.
+ if (n < iovp->iov_len) {
+ iovp->iov_len -= n;
+ iovp->iov_base += n;
+ break;
+ }
+
+ n -= iovp->iov_len;
+ iovp++;
+ niov--;
+ }
+ }
+
+ // Either we got it all, or we didn't.
+ if ((rv != 0) && (progress != 0)) {
+ for (i = 0; i < niov; i++) {
+ aio->a_iov[i].iov_len = iovp[i].iov_len;
+ aio->a_iov[i].iov_buf = iovp[i].iov_base;
+ }
+ aio->a_niov = niov;
+ }
+
+ return (rv);
+}
+
+
+static void
+nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
+{
+ nni_aio *aio;
+ int rv;
+
+ nni_mtx_lock(&q->aq_lk);
+ for (;;) {
+ if (q->aq_fd < 0) {
+ break;
+ }
+ if ((aio = nni_list_first(&q->aq_aios)) == NULL) {
+ nni_cv_wait(&q->aq_cv);
+ continue;
+ }
+ rv = fn(q->aq_fd, aio);
+ if (rv == NNG_EAGAIN) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+
+ nni_list_remove(&q->aq_aios, aio);
+ nni_mtx_unlock(&q->aq_lk);
+
+ // Call the callback.
+ nni_aio_finish(aio, rv, aio->a_count);
+ }
+
+ while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
+ nni_mtx_unlock(&q->aq_lk);
+ nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
+ nni_mtx_lock(&q->aq_lk);
+ }
+
+ nni_mtx_unlock(&q->aq_lk);
+}
+
+
+static void
+nni_plat_aiothr_readthr(void *arg)
+{
+ nni_plat_aiothr_dothr(arg, nni_plat_aiothr_read);
+}
+
+
+static void
+nni_plat_aiothr_writethr(void *arg)
+{
+ nni_plat_aiothr_dothr(arg, nni_plat_aiothr_write);
+}
+
+
+static int
+nni_posix_aioq_init(nni_posix_aioq *q, int fd, nni_cb cb)
+{
+ int rv;
+
+ NNI_LIST_INIT(&q->aq_aios, nni_aio, a_prov_node);
+ if ((rv = nni_mtx_init(&q->aq_lk)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&q->aq_cv, &q->aq_lk)) != 0) {
+ nni_mtx_fini(&q->aq_lk);
+ return (rv);
+ }
+ if ((rv = nni_thr_init(&q->aq_thr, cb, q)) != 0) {
+ nni_cv_fini(&q->aq_cv);
+ nni_mtx_fini(&q->aq_lk);
+ return (rv);
+ }
+ q->aq_fd = fd;
+ return (0);
+}
+
+
+static void
+nni_posix_aioq_start(nni_posix_aioq *q)
+{
+ nni_thr_run(&q->aq_thr);
+}
+
+
+static void
+nni_posix_aioq_fini(nni_posix_aioq *q)
+{
+ nni_mtx_lock(&q->aq_lk);
+ q->aq_fd = -1;
+ nni_cv_wake(&q->aq_cv);
+ nni_mtx_unlock(&q->aq_lk);
+
+ nni_thr_fini(&q->aq_thr);
+ nni_cv_fini(&q->aq_cv);
+ nni_mtx_fini(&q->aq_lk);
+}
+
+
+int
+nni_posix_aio_pipe_init(nni_posix_aio_pipe *p, int fd)
+{
+ int rv;
+
+ rv = nni_posix_aioq_init(&p->ap_readq, fd, nni_plat_aiothr_readthr);
+ if (rv != 0) {
+ return (rv);
+ }
+ rv = nni_posix_aioq_init(&p->ap_writeq, fd, nni_plat_aiothr_writethr);
+ if (rv != 0) {
+ nni_posix_aioq_fini(&p->ap_readq);
+ return (rv);
+ }
+ nni_posix_aioq_start(&p->ap_readq);
+ nni_posix_aioq_start(&p->ap_writeq);
+ return (0);
+}
+
+
+void
+nni_posix_aio_pipe_fini(nni_posix_aio_pipe *p)
+{
+ nni_posix_aioq_fini(&p->ap_readq);
+ nni_posix_aioq_fini(&p->ap_writeq);
+}
+
+
+// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
+// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
+
+static int
+nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
+{
+ nni_mtx_lock(&q->aq_lk);
+ if (q->aq_fd < 0) {
+ nni_mtx_unlock(&q->aq_lk);
+ return (NNG_ECLOSED);
+ }
+ nni_list_append(&q->aq_aios, aio);
+ nni_cv_wake(&q->aq_cv);
+ nni_mtx_unlock(&q->aq_lk);
+ return (0);
+}
+
+
+int
+nni_posix_aio_read(nni_posix_aioq *q, nni_aio *aio)
+{
+ return (nni_posix_aio_submit(q, aio));
+}
+
+
+int
+nni_posix_aio_write(nni_posix_aioq *q, nni_aio *aio)
+{
+ return (nni_posix_aio_submit(q, aio));
+}
+
+
+// extern int nni_posix_aio_connect();
+// extern int nni_posix_aio_accept();
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_aiothr_not_used = 0;
+
+#endif // NNG_USE_POSIX_AIOTHR
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index 234d57cc..34ab2c6d 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -57,3 +57,5 @@
#else
#define NNG_USE_CLOCKID CLOCK_REALTIME
#endif // CLOCK_REALTIME
+
+#define NNG_USE_POSIX_AIOTHR 1