diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-02-19 11:44:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-02-19 11:44:22 -0800 |
| commit | 1981675919003f2ea01971bb4fccbe99c57a2caf (patch) | |
| tree | 519b56e1bc66aca67d3cb1d87f01c9c270573d03 /src/core | |
| parent | 5eae01c44c2daeddb778608cb48f188b48782c2e (diff) | |
| download | nng-1981675919003f2ea01971bb4fccbe99c57a2caf.tar.gz nng-1981675919003f2ea01971bb4fccbe99c57a2caf.tar.bz2 nng-1981675919003f2ea01971bb4fccbe99c57a2caf.zip | |
Introduce new generic I/O event framework.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 2 | ||||
| -rw-r--r-- | src/core/endpt.h | 3 | ||||
| -rw-r--r-- | src/core/idhash.h | 2 | ||||
| -rw-r--r-- | src/core/ioev.c | 169 | ||||
| -rw-r--r-- | src/core/ioev.h | 109 | ||||
| -rw-r--r-- | src/core/list.h | 2 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 4 | ||||
| -rw-r--r-- | src/core/pipe.h | 1 | ||||
| -rw-r--r-- | src/core/taskq.h | 6 | ||||
| -rw-r--r-- | src/core/thread.h | 3 |
10 files changed, 294 insertions, 7 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 0a04b732..c730cf41 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -50,8 +50,6 @@ typedef uint64_t nni_time; // Abs. time (usec). typedef int64_t nni_duration; // Rel. time (usec). typedef void (*nni_worker)(void *); -typedef struct nni_taskq nni_taskq; -typedef struct nni_taskq_ent nni_taskq_ent; typedef void (*nni_cb)(void *); // Used by transports for scatter gather I/O. diff --git a/src/core/endpt.h b/src/core/endpt.h index 493355d2..3a30f73e 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -10,6 +10,9 @@ #ifndef CORE_ENDPT_H #define CORE_ENDPT_H +#include "core/defs.h" +#include "core/list.h" +#include "core/thread.h" #include "core/transport.h" // NB: This structure is supplied here for use by the CORE. Use of this diff --git a/src/core/idhash.h b/src/core/idhash.h index 4616ccde..088eac55 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -10,7 +10,7 @@ #ifndef CORE_IDHASH_H #define CORE_IDHASH_H -#include "core/nng_impl.h" +#include "core/defs.h" // We find that we often want to have a list of things listed by a // numeric ID, which is generally monotonically increasing. This is diff --git a/src/core/ioev.c b/src/core/ioev.c new file mode 100644 index 00000000..716d1313 --- /dev/null +++ b/src/core/ioev.c @@ -0,0 +1,169 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <string.h> +#include "core/nng_impl.h" + +#define NNI_IOEV_DONE (1<<0) +#define NNI_IOEV_BUSY (1<<1) +#define NNI_IOEV_CANCEL (1<<2) +#define NNI_IOEV_WAKE (1<<3) + +void +nni_ioev_init(nni_ioev *ioev, nni_cb cb, void *arg) +{ + if (cb == NULL) { + cb = (nni_cb) nni_ioev_wake; + arg = ioev; + } + memset(ioev, 0, sizeof (*ioev)); + nni_mtx_init(&ioev->ie_lk); + nni_cv_init(&ioev->ie_cv, &ioev->ie_lk); + ioev->ie_cb = cb; + ioev->ie_cbarg = arg; +} + + +void +nni_ioev_fini(nni_ioev *ioev) +{ + nni_cv_fini(&ioev->ie_cv); + nni_mtx_fini(&ioev->ie_lk); +} + + +void +nni_ioev_cancel(nni_ioev *ioev) +{ + nni_cb cb; + void *arg; + + nni_mtx_lock(&ioev->ie_lk); + while ((ioev->ie_flags & NNI_IOEV_BUSY) != 0) { + nni_cv_wait(&ioev->ie_cv); + } + if ((ioev->ie_flags & NNI_IOEV_DONE) != 0) { + // Already finished the IO. + nni_mtx_unlock(&ioev->ie_lk); + return; + } + ioev->ie_flags |= NNI_IOEV_CANCEL; + nni_mtx_unlock(&ioev->ie_lk); + + // Do not hold the lock across the provider! The provider + // must not run on this because we have set the cancel flag, + // therefore "nni_ioev_start" will return failure. The provider + // is responsible for dealing with any linked list issues or such, + // and freeing any provider data at this point. + ioev->ie_prov_ops.ip_cancel(ioev->ie_prov_data); + + nni_mtx_lock(&ioev->ie_lk); + ioev->ie_result = NNG_ECANCELED; + cb = ioev->ie_cb; + arg = ioev->ie_cbarg; + nni_mtx_unlock(&ioev->ie_lk); + + // Call the callback. If none was registered, this will instead + // raise the done signal and wake anything blocked in nni_ioev_wait. + // (Because cb will be nni_ioev_wake, and arg will be the ioev itself.) + cb(arg); +} + + +int +nni_ioev_result(nni_ioev *ioev) +{ + return (ioev->ie_result); +} + + +size_t +nni_ioev_count(nni_ioev *ioev) +{ + return (ioev->ie_count); +} + + +void +nni_ioev_wake(nni_ioev *ioev) +{ + nni_mtx_lock(&ioev->ie_lk); + ioev->ie_flags |= NNI_IOEV_WAKE; + nni_cv_wake(&ioev->ie_cv); + nni_mtx_unlock(&ioev->ie_lk); +} + + +void +nni_ioev_wait(nni_ioev *ioev) +{ + nni_mtx_lock(&ioev->ie_lk); + while ((ioev->ie_flags & NNI_IOEV_WAKE) == 0) { + nni_cv_wait(&ioev->ie_cv); + } + nni_mtx_unlock(&ioev->ie_lk); +} + + +// I/O provider related functions. +void +nni_ioev_set_ops(nni_ioev *ioev, nni_ioev_ops *ops, void *data) +{ + memcpy(&ioev->ie_prov_ops, ops, sizeof (*ops)); + ioev->ie_prov_data = data; +} + + +// nni_ioev_busy effectively "locks" the IOEV. We'd like to use the +// underlying mutex, and that would be faster, but the completion might +// be executed by a thread other than the one that marked it busy, so we +// use our own flags. +int +nni_ioev_busy(nni_ioev *ioev) +{ + nni_mtx_lock(&ioev->ie_lk); + if ((ioev->ie_flags & NNI_IOEV_CANCEL) != 0) { + nni_mtx_unlock(&ioev->ie_lk); + return (NNG_ECANCELED); + } + + ioev->ie_flags |= NNI_IOEV_BUSY; + nni_mtx_unlock(&ioev->ie_lk); + return (0); +} + + +void +nni_ioev_unbusy(nni_ioev *ioev) +{ + nni_mtx_lock(&ioev->ie_lk); + ioev->ie_flags &= ~(NNI_IOEV_BUSY); + nni_cv_wake(&ioev->ie_cv); + nni_mtx_unlock(&ioev->ie_lk); +} + + +void +nni_ioev_finish(nni_ioev *ioev, int result, size_t count) +{ + nni_cb cb; + void *arg; + + nni_mtx_lock(&ioev->ie_lk); + ioev->ie_result = result; + ioev->ie_count = count; + ioev->ie_flags &= ~(NNI_IOEV_BUSY); + ioev->ie_flags |= NNI_IOEV_DONE; + cb = ioev->ie_cb; + arg = ioev->ie_cbarg; + nni_cv_wake(&ioev->ie_cv); + nni_mtx_unlock(&ioev->ie_lk); + + cb(arg); +} diff --git a/src/core/ioev.h b/src/core/ioev.h new file mode 100644 index 00000000..e635e0c0 --- /dev/null +++ b/src/core/ioev.h @@ -0,0 +1,109 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_IOEV_H +#define CORE_IOEV_H + +#include "core/defs.h" +#include "core/thread.h" + +typedef struct nni_ioev_ops nni_ioev_ops; +typedef struct nni_ioev nni_ioev; + +// Provider specific operations on I/O events. We only have cancellation +// at present. +struct nni_ioev_ops { + // Cancel the I/O. This should not callback up, but instead should + // just do whatever is necessary to free provider specific state, + // and unlink the I/O from any schedule. The I/O itself will not + // have been started if this is called. + void (*ip_cancel)(void *); +}; + +// An nni_iov is an I/O event handle, used to represent asynchronous I/O. +// These take several different forms. +struct nni_ioev { + int ie_result; // Result code (nng_errno) + size_t ie_count; // Bytes transferred (I/O only) + nni_cb ie_cb; // User specified callback. + void * ie_cbarg; // Callback argument. + + // These fields are private to the io events framework. + nni_mtx ie_lk; + nni_cv ie_cv; + unsigned ie_flags; + + // Provider data. + nni_ioev_ops ie_prov_ops; + void * ie_prov_data; +}; + +// nni_ioev_init initializes an IO event. The callback is called with +// the supplied argument when the operation is complete. If NULL is +// supplied for the callback, then nni_ioev_wake is used in its place, +// and the ioev is used for the argument. +extern void nni_ioev_init(nni_ioev *, nni_cb, void *); + +// nni_ioev_fini finalizes the IO event, releasing resources (locks) +// associated with it. The caller is responsible for ensuring that any +// associated I/O is unscheduled or complete. +extern void nni_ioev_fini(nni_ioev *); + +// nni_ioev_cancel cancels the IO event. The result will be NNG_ECANCELED, +// unless the underlying IO has already completed. +extern void nni_ioev_cancel(nni_ioev *); + +// nni_ioev_result returns the result code (0 on success, or an NNG errno) +// for the operation. It is only valid to call this when the operation is +// complete (such as when the callback is executed or after nni_ioev_wait +// is performed). +extern int nni_ioev_result(nni_ioev *); + +// nni_ioev_count returns the number of bytes of data transferred, if any. +// As with nni_ioev_result, it is only defined if the I/O operation has +// completed. +extern size_t nni_ioev_count(nni_ioev *); + +// nni_ioev_wake wakes any threads blocked in nni_ioev_wait. This is the +// default callback if no other is supplied. If a user callback is supplied +// then that code must call this routine to wake any waiters (unless the +// user code is certain that there are no such waiters). +extern void nni_ioev_wake(nni_ioev *); + +// nni_ioev_wait blocks the caller until the IO event is complete, as indicated +// by nni_ioev_wake being called. (Recall nni_ioev_wake is the default +// callback if none is supplied.) If a user supplied callback is provided, +// and that callback does not call nni_ioev_wake, then this routine may +// block the caller indefinitely. +extern void nni_ioev_wait(nni_ioev *); + +// I/O provider related functions. +extern void nni_ioev_set_ops(nni_ioev *, nni_ioev_ops *, void *); + +// nni_ioev_busy is called by the provider to begin an operation, marking +// the IO busy. The framework will avoid calling back into the provider +// (for cancellation for example) while the ioev is busy. It is important +// that the busy state be held for only brief periods of time, such as while +// a non-blocking I/O operation is in progress. If the IO is canceled (or +// a cancellation is in progress), the function will return NNG_ECANCELED. +// In this case, the provider must not perform any further I/O operations, +// and must not call the completion routine. Otherwise zero is returned. +extern int nni_ioev_busy(nni_ioev *); + +// nni_ioev_unbusy clears the "busy" state set by nni_ioev_busy. +extern void nni_ioev_unbusy(nni_ioev *); + +// nni_ioev_finish is called by the provider when an operation is complete. +// (This can be for any reason other than cancellation.) The provider gives +// the result code (0 for success, an NNG errno otherwise), and the amount of +// data transferred (if any). The ioev must have been marked busy when this +// is called. The ioev busy state is automatically cleared by this routine. +extern void nni_ioev_finish(nni_ioev *, int, size_t); + +#endif // CORE_IOEV_H diff --git a/src/core/list.h b/src/core/list.h index 963be01e..dd26377e 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -10,7 +10,7 @@ #ifndef CORE_LIST_H #define CORE_LIST_H -#include "core/nng_impl.h" +#include "core/defs.h" // In order to make life easy, we just define the list structures // directly, and let consumers directly inline structures. diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 91dce28a..da3424c5 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -23,16 +23,18 @@ // symbols should be found in the toplevel nng.h header. #include "core/defs.h" +#include "core/platform.h" + #include "core/clock.h" #include "core/device.h" #include "core/idhash.h" #include "core/init.h" +#include "core/ioev.h" #include "core/list.h" #include "core/message.h" #include "core/msgqueue.h" #include "core/options.h" #include "core/panic.h" -#include "core/platform.h" #include "core/protocol.h" #include "core/random.h" #include "core/taskq.h" diff --git a/src/core/pipe.h b/src/core/pipe.h index d54f1ce7..333a986d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -15,6 +15,7 @@ // TRANSPORTS. #include "core/defs.h" +#include "core/thread.h" #include "core/transport.h" struct nni_pipe { diff --git a/src/core/taskq.h b/src/core/taskq.h index dc1f4f2d..931e6c77 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -10,7 +10,11 @@ #ifndef CORE_TASKQ_H #define CORE_TASKQ_H -#include "core/nng_impl.h" +#include "core/defs.h" +#include "core/list.h" + +typedef struct nni_taskq nni_taskq; +typedef struct nni_taskq_ent nni_taskq_ent; struct nni_taskq_ent { nni_list_node tqe_node; diff --git a/src/core/thread.h b/src/core/thread.h index d896b909..b99c8ffb 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -10,7 +10,8 @@ #ifndef CORE_THREAD_H #define CORE_THREAD_H -#include "core/nng_impl.h" +#include "core/defs.h" +#include "core/platform.h" struct nni_mtx { nni_plat_mtx mtx; |
