aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-02-19 11:44:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-02-19 11:44:22 -0800
commit1981675919003f2ea01971bb4fccbe99c57a2caf (patch)
tree519b56e1bc66aca67d3cb1d87f01c9c270573d03 /src/core
parent5eae01c44c2daeddb778608cb48f188b48782c2e (diff)
downloadnng-1981675919003f2ea01971bb4fccbe99c57a2caf.tar.gz
nng-1981675919003f2ea01971bb4fccbe99c57a2caf.tar.bz2
nng-1981675919003f2ea01971bb4fccbe99c57a2caf.zip
Introduce new generic I/O event framework.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/endpt.h3
-rw-r--r--src/core/idhash.h2
-rw-r--r--src/core/ioev.c169
-rw-r--r--src/core/ioev.h109
-rw-r--r--src/core/list.h2
-rw-r--r--src/core/nng_impl.h4
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/core/taskq.h6
-rw-r--r--src/core/thread.h3
10 files changed, 294 insertions, 7 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 0a04b732..c730cf41 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -50,8 +50,6 @@ typedef uint64_t nni_time; // Abs. time (usec).
typedef int64_t nni_duration; // Rel. time (usec).
typedef void (*nni_worker)(void *);
-typedef struct nni_taskq nni_taskq;
-typedef struct nni_taskq_ent nni_taskq_ent;
typedef void (*nni_cb)(void *);
// Used by transports for scatter gather I/O.
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 493355d2..3a30f73e 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -10,6 +10,9 @@
#ifndef CORE_ENDPT_H
#define CORE_ENDPT_H
+#include "core/defs.h"
+#include "core/list.h"
+#include "core/thread.h"
#include "core/transport.h"
// NB: This structure is supplied here for use by the CORE. Use of this
diff --git a/src/core/idhash.h b/src/core/idhash.h
index 4616ccde..088eac55 100644
--- a/src/core/idhash.h
+++ b/src/core/idhash.h
@@ -10,7 +10,7 @@
#ifndef CORE_IDHASH_H
#define CORE_IDHASH_H
-#include "core/nng_impl.h"
+#include "core/defs.h"
// We find that we often want to have a list of things listed by a
// numeric ID, which is generally monotonically increasing. This is
diff --git a/src/core/ioev.c b/src/core/ioev.c
new file mode 100644
index 00000000..716d1313
--- /dev/null
+++ b/src/core/ioev.c
@@ -0,0 +1,169 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+#include "core/nng_impl.h"
+
+#define NNI_IOEV_DONE (1<<0)
+#define NNI_IOEV_BUSY (1<<1)
+#define NNI_IOEV_CANCEL (1<<2)
+#define NNI_IOEV_WAKE (1<<3)
+
+void
+nni_ioev_init(nni_ioev *ioev, nni_cb cb, void *arg)
+{
+ if (cb == NULL) {
+ cb = (nni_cb) nni_ioev_wake;
+ arg = ioev;
+ }
+ memset(ioev, 0, sizeof (*ioev));
+ nni_mtx_init(&ioev->ie_lk);
+ nni_cv_init(&ioev->ie_cv, &ioev->ie_lk);
+ ioev->ie_cb = cb;
+ ioev->ie_cbarg = arg;
+}
+
+
+void
+nni_ioev_fini(nni_ioev *ioev)
+{
+ nni_cv_fini(&ioev->ie_cv);
+ nni_mtx_fini(&ioev->ie_lk);
+}
+
+
+void
+nni_ioev_cancel(nni_ioev *ioev)
+{
+ nni_cb cb;
+ void *arg;
+
+ nni_mtx_lock(&ioev->ie_lk);
+ while ((ioev->ie_flags & NNI_IOEV_BUSY) != 0) {
+ nni_cv_wait(&ioev->ie_cv);
+ }
+ if ((ioev->ie_flags & NNI_IOEV_DONE) != 0) {
+ // Already finished the IO.
+ nni_mtx_unlock(&ioev->ie_lk);
+ return;
+ }
+ ioev->ie_flags |= NNI_IOEV_CANCEL;
+ nni_mtx_unlock(&ioev->ie_lk);
+
+ // Do not hold the lock across the provider! The provider
+ // must not run on this because we have set the cancel flag,
+ // therefore "nni_ioev_start" will return failure. The provider
+ // is responsible for dealing with any linked list issues or such,
+ // and freeing any provider data at this point.
+ ioev->ie_prov_ops.ip_cancel(ioev->ie_prov_data);
+
+ nni_mtx_lock(&ioev->ie_lk);
+ ioev->ie_result = NNG_ECANCELED;
+ cb = ioev->ie_cb;
+ arg = ioev->ie_cbarg;
+ nni_mtx_unlock(&ioev->ie_lk);
+
+ // Call the callback. If none was registered, this will instead
+ // raise the done signal and wake anything blocked in nni_ioev_wait.
+ // (Because cb will be nni_ioev_wake, and arg will be the ioev itself.)
+ cb(arg);
+}
+
+
+int
+nni_ioev_result(nni_ioev *ioev)
+{
+ return (ioev->ie_result);
+}
+
+
+size_t
+nni_ioev_count(nni_ioev *ioev)
+{
+ return (ioev->ie_count);
+}
+
+
+void
+nni_ioev_wake(nni_ioev *ioev)
+{
+ nni_mtx_lock(&ioev->ie_lk);
+ ioev->ie_flags |= NNI_IOEV_WAKE;
+ nni_cv_wake(&ioev->ie_cv);
+ nni_mtx_unlock(&ioev->ie_lk);
+}
+
+
+void
+nni_ioev_wait(nni_ioev *ioev)
+{
+ nni_mtx_lock(&ioev->ie_lk);
+ while ((ioev->ie_flags & NNI_IOEV_WAKE) == 0) {
+ nni_cv_wait(&ioev->ie_cv);
+ }
+ nni_mtx_unlock(&ioev->ie_lk);
+}
+
+
+// I/O provider related functions.
+void
+nni_ioev_set_ops(nni_ioev *ioev, nni_ioev_ops *ops, void *data)
+{
+ memcpy(&ioev->ie_prov_ops, ops, sizeof (*ops));
+ ioev->ie_prov_data = data;
+}
+
+
+// nni_ioev_busy effectively "locks" the IOEV. We'd like to use the
+// underlying mutex, and that would be faster, but the completion might
+// be executed by a thread other than the one that marked it busy, so we
+// use our own flags.
+int
+nni_ioev_busy(nni_ioev *ioev)
+{
+ nni_mtx_lock(&ioev->ie_lk);
+ if ((ioev->ie_flags & NNI_IOEV_CANCEL) != 0) {
+ nni_mtx_unlock(&ioev->ie_lk);
+ return (NNG_ECANCELED);
+ }
+
+ ioev->ie_flags |= NNI_IOEV_BUSY;
+ nni_mtx_unlock(&ioev->ie_lk);
+ return (0);
+}
+
+
+void
+nni_ioev_unbusy(nni_ioev *ioev)
+{
+ nni_mtx_lock(&ioev->ie_lk);
+ ioev->ie_flags &= ~(NNI_IOEV_BUSY);
+ nni_cv_wake(&ioev->ie_cv);
+ nni_mtx_unlock(&ioev->ie_lk);
+}
+
+
+void
+nni_ioev_finish(nni_ioev *ioev, int result, size_t count)
+{
+ nni_cb cb;
+ void *arg;
+
+ nni_mtx_lock(&ioev->ie_lk);
+ ioev->ie_result = result;
+ ioev->ie_count = count;
+ ioev->ie_flags &= ~(NNI_IOEV_BUSY);
+ ioev->ie_flags |= NNI_IOEV_DONE;
+ cb = ioev->ie_cb;
+ arg = ioev->ie_cbarg;
+ nni_cv_wake(&ioev->ie_cv);
+ nni_mtx_unlock(&ioev->ie_lk);
+
+ cb(arg);
+}
diff --git a/src/core/ioev.h b/src/core/ioev.h
new file mode 100644
index 00000000..e635e0c0
--- /dev/null
+++ b/src/core/ioev.h
@@ -0,0 +1,109 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_IOEV_H
+#define CORE_IOEV_H
+
+#include "core/defs.h"
+#include "core/thread.h"
+
+typedef struct nni_ioev_ops nni_ioev_ops;
+typedef struct nni_ioev nni_ioev;
+
+// Provider specific operations on I/O events. We only have cancellation
+// at present.
+struct nni_ioev_ops {
+ // Cancel the I/O. This should not callback up, but instead should
+ // just do whatever is necessary to free provider specific state,
+ // and unlink the I/O from any schedule. The I/O itself will not
+ // have been started if this is called.
+ void (*ip_cancel)(void *);
+};
+
+// An nni_iov is an I/O event handle, used to represent asynchronous I/O.
+// These take several different forms.
+struct nni_ioev {
+ int ie_result; // Result code (nng_errno)
+ size_t ie_count; // Bytes transferred (I/O only)
+ nni_cb ie_cb; // User specified callback.
+ void * ie_cbarg; // Callback argument.
+
+ // These fields are private to the io events framework.
+ nni_mtx ie_lk;
+ nni_cv ie_cv;
+ unsigned ie_flags;
+
+ // Provider data.
+ nni_ioev_ops ie_prov_ops;
+ void * ie_prov_data;
+};
+
+// nni_ioev_init initializes an IO event. The callback is called with
+// the supplied argument when the operation is complete. If NULL is
+// supplied for the callback, then nni_ioev_wake is used in its place,
+// and the ioev is used for the argument.
+extern void nni_ioev_init(nni_ioev *, nni_cb, void *);
+
+// nni_ioev_fini finalizes the IO event, releasing resources (locks)
+// associated with it. The caller is responsible for ensuring that any
+// associated I/O is unscheduled or complete.
+extern void nni_ioev_fini(nni_ioev *);
+
+// nni_ioev_cancel cancels the IO event. The result will be NNG_ECANCELED,
+// unless the underlying IO has already completed.
+extern void nni_ioev_cancel(nni_ioev *);
+
+// nni_ioev_result returns the result code (0 on success, or an NNG errno)
+// for the operation. It is only valid to call this when the operation is
+// complete (such as when the callback is executed or after nni_ioev_wait
+// is performed).
+extern int nni_ioev_result(nni_ioev *);
+
+// nni_ioev_count returns the number of bytes of data transferred, if any.
+// As with nni_ioev_result, it is only defined if the I/O operation has
+// completed.
+extern size_t nni_ioev_count(nni_ioev *);
+
+// nni_ioev_wake wakes any threads blocked in nni_ioev_wait. This is the
+// default callback if no other is supplied. If a user callback is supplied
+// then that code must call this routine to wake any waiters (unless the
+// user code is certain that there are no such waiters).
+extern void nni_ioev_wake(nni_ioev *);
+
+// nni_ioev_wait blocks the caller until the IO event is complete, as indicated
+// by nni_ioev_wake being called. (Recall nni_ioev_wake is the default
+// callback if none is supplied.) If a user supplied callback is provided,
+// and that callback does not call nni_ioev_wake, then this routine may
+// block the caller indefinitely.
+extern void nni_ioev_wait(nni_ioev *);
+
+// I/O provider related functions.
+extern void nni_ioev_set_ops(nni_ioev *, nni_ioev_ops *, void *);
+
+// nni_ioev_busy is called by the provider to begin an operation, marking
+// the IO busy. The framework will avoid calling back into the provider
+// (for cancellation for example) while the ioev is busy. It is important
+// that the busy state be held for only brief periods of time, such as while
+// a non-blocking I/O operation is in progress. If the IO is canceled (or
+// a cancellation is in progress), the function will return NNG_ECANCELED.
+// In this case, the provider must not perform any further I/O operations,
+// and must not call the completion routine. Otherwise zero is returned.
+extern int nni_ioev_busy(nni_ioev *);
+
+// nni_ioev_unbusy clears the "busy" state set by nni_ioev_busy.
+extern void nni_ioev_unbusy(nni_ioev *);
+
+// nni_ioev_finish is called by the provider when an operation is complete.
+// (This can be for any reason other than cancellation.) The provider gives
+// the result code (0 for success, an NNG errno otherwise), and the amount of
+// data transferred (if any). The ioev must have been marked busy when this
+// is called. The ioev busy state is automatically cleared by this routine.
+extern void nni_ioev_finish(nni_ioev *, int, size_t);
+
+#endif // CORE_IOEV_H
diff --git a/src/core/list.h b/src/core/list.h
index 963be01e..dd26377e 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -10,7 +10,7 @@
#ifndef CORE_LIST_H
#define CORE_LIST_H
-#include "core/nng_impl.h"
+#include "core/defs.h"
// In order to make life easy, we just define the list structures
// directly, and let consumers directly inline structures.
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 91dce28a..da3424c5 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -23,16 +23,18 @@
// symbols should be found in the toplevel nng.h header.
#include "core/defs.h"
+#include "core/platform.h"
+
#include "core/clock.h"
#include "core/device.h"
#include "core/idhash.h"
#include "core/init.h"
+#include "core/ioev.h"
#include "core/list.h"
#include "core/message.h"
#include "core/msgqueue.h"
#include "core/options.h"
#include "core/panic.h"
-#include "core/platform.h"
#include "core/protocol.h"
#include "core/random.h"
#include "core/taskq.h"
diff --git a/src/core/pipe.h b/src/core/pipe.h
index d54f1ce7..333a986d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -15,6 +15,7 @@
// TRANSPORTS.
#include "core/defs.h"
+#include "core/thread.h"
#include "core/transport.h"
struct nni_pipe {
diff --git a/src/core/taskq.h b/src/core/taskq.h
index dc1f4f2d..931e6c77 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -10,7 +10,11 @@
#ifndef CORE_TASKQ_H
#define CORE_TASKQ_H
-#include "core/nng_impl.h"
+#include "core/defs.h"
+#include "core/list.h"
+
+typedef struct nni_taskq nni_taskq;
+typedef struct nni_taskq_ent nni_taskq_ent;
struct nni_taskq_ent {
nni_list_node tqe_node;
diff --git a/src/core/thread.h b/src/core/thread.h
index d896b909..b99c8ffb 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -10,7 +10,8 @@
#ifndef CORE_THREAD_H
#define CORE_THREAD_H
-#include "core/nng_impl.h"
+#include "core/defs.h"
+#include "core/platform.h"
struct nni_mtx {
nni_plat_mtx mtx;