diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-16 16:27:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-16 16:27:22 -0800 |
| commit | ac8415c24ffea645105c3859e814843e81c97f8a (patch) | |
| tree | 7b64b4aab3de6ce5bdd69c3d5b7ead57f4a4b4e7 /src/core | |
| parent | b8f7236aa2928d70d9bff2e1071654982539eeda (diff) | |
| download | nng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.gz nng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.bz2 nng-ac8415c24ffea645105c3859e814843e81c97f8a.zip | |
Start of event framework.
This compiles correctly, but doesn't actually deliver events yet.
As part of this, I've made most of the initializables in nng
safe to tear-down if uninitialized (or set to zero e.g. via calloc).
This makes it loads easier to write the teardown on error code, since
I can deinit everything, without worrying about which things have been
initialized and which have not.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 2 | ||||
| -rw-r--r-- | src/core/event.c | 103 | ||||
| -rw-r--r-- | src/core/event.h | 35 | ||||
| -rw-r--r-- | src/core/idhash.c | 6 | ||||
| -rw-r--r-- | src/core/message.c | 14 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/platform.h | 8 | ||||
| -rw-r--r-- | src/core/socket.c | 75 | ||||
| -rw-r--r-- | src/core/socket.h | 8 | ||||
| -rw-r--r-- | src/core/thread.c | 9 | ||||
| -rw-r--r-- | src/core/thread.h | 1 |
11 files changed, 209 insertions, 53 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 9ba41fae..25b131d4 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -27,6 +27,8 @@ typedef struct nng_endpoint nni_ep; typedef struct nng_pipe nni_pipe; typedef struct nng_msg nni_msg; typedef struct nng_sockaddr nni_sockaddr; +typedef struct nng_event nni_event; +typedef struct nng_notify nni_notify; // These are our own names. typedef struct nni_tran nni_tran; diff --git a/src/core/event.c b/src/core/event.c new file mode 100644 index 00000000..3f7d1143 --- /dev/null +++ b/src/core/event.c @@ -0,0 +1,103 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdlib.h> +#include <string.h> + +int +nni_event_init(nni_event *event, int type, nni_sock *sock) +{ + int rv; + + memset(event, 0, sizeof (*event)); + if ((rv = nni_cv_init(&event->e_cv, &sock->s_mx)) != 0) { + return (rv); + } + NNI_LIST_NODE_INIT(&event->e_node); + event->e_type = type; + event->e_sock = sock; + return (0); +} + + +void +nni_event_fini(nni_event *event) +{ + nni_cv_fini(&event->e_cv); +} + + +void +nni_event_submit(nni_sock *sock, nni_event *event) +{ + // Call with socket mutex owned! + if (event->e_pending == 0) { + event->e_pending = 1; + event->e_done = 0; + nni_list_append(&sock->s_events, event); + nni_cv_wake(&sock->s_notify_cv); + } +} + + +void +nni_event_wait(nni_sock *sock, nni_event *event) +{ + // Call with socket mutex owned! + // Note that the socket mutex is dropped during the call. + while ((event->e_pending) && (!event->e_done)) { + nni_cv_wait(&event->e_cv); + } +} + + +void +nni_event_notifier(void *arg) +{ + nni_sock *sock = arg; + nni_event *event; + nni_notify *notify; + + nni_mtx_lock(&sock->s_mx); + for (;;) { + if (sock->s_closing) { + break; + } + + if ((event = nni_list_first(&sock->s_events)) != NULL) { + event->e_pending = 0; + nni_list_remove(&sock->s_events, event); + nni_mtx_unlock(&sock->s_mx); + + // Lock the notify list, it must not change. + nni_mtx_lock(&sock->s_notify_mx); + NNI_LIST_FOREACH (&sock->s_notify, notify) { + if ((notify->n_mask & event->e_type) == 0) { + // No interest. + continue; + } + notify->n_func(event, ¬ify->n_arg); + } + nni_mtx_unlock(&sock->s_notify_mx); + + nni_mtx_lock(&sock->s_mx); + // Let the event submitter know we are done, unless + // they have resubmitted. Submitters can wait on this + // lock. + event->e_done = 1; + nni_cv_wake(&event->e_cv); + continue; + } + + nni_cv_wait(&sock->s_notify_cv); + } + nni_mtx_unlock(&sock->s_mx); +} diff --git a/src/core/event.h b/src/core/event.h new file mode 100644 index 00000000..d7faad2e --- /dev/null +++ b/src/core/event.h @@ -0,0 +1,35 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_EVENT_H +#define CORE_EVENT_H + +#include "core/defs.h" +#include "core/list.h" + +struct nng_event { + int e_type; + nni_sock * e_sock; + nni_ep * e_ep; + nni_pipe * e_pipe; + + int e_done; // true when notify thr is finished + int e_pending; // true if event is queued + nni_cv e_cv; // signaled when e_done is noted + nni_list_node e_node; // location on the socket list +}; + +struct nng_notify { + nni_list_node n_node; + nng_notify_func n_func; + void * n_arg; + int n_mask; +}; + +#endif // CORE_EVENT_H diff --git a/src/core/idhash.c b/src/core/idhash.c index d816ddf4..e541ef36 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -55,8 +55,10 @@ nni_idhash_create(nni_idhash **hp) void nni_idhash_destroy(nni_idhash *h) { - nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); - NNI_FREE_STRUCT(h); + if (h != NULL) { + nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); + NNI_FREE_STRUCT(h); + } } diff --git a/src/core/message.c b/src/core/message.c index 778e7dfc..346570c9 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -363,13 +363,15 @@ nni_msg_free(nni_msg *m) { nni_msgopt *mo; - nni_chunk_free(&m->m_header); - nni_chunk_free(&m->m_body); - while ((mo = nni_list_first(&m->m_options)) != NULL) { - nni_list_remove(&m->m_options, mo); - nni_free(mo, sizeof (*mo) + mo->mo_sz); + if (m != NULL) { + nni_chunk_free(&m->m_header); + nni_chunk_free(&m->m_body); + while ((mo = nni_list_first(&m->m_options)) != NULL) { + nni_list_remove(&m->m_options, mo); + nni_free(mo, sizeof (*mo) + mo->mo_sz); + } + NNI_FREE_STRUCT(m); } - NNI_FREE_STRUCT(m); } diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index ff25f5ad..007e20a6 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -38,6 +38,7 @@ #include "core/transport.h" // These have to come after the others - particularly transport.h +#include "core/event.h" #include "core/pipe.h" #include "core/socket.h" #include "core/endpt.h" diff --git a/src/core/platform.h b/src/core/platform.h index 8d19a7c8..7afd33ef 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -70,11 +70,11 @@ typedef struct nni_plat_ipcsock nni_plat_ipcsock; // nni_plat_mtx_init initializes a mutex structure. This may require dynamic // allocation, depending on the platform. It can return NNG_ENOMEM if that -// fails. +// fails. An initialized mutex must be distinguishable from zeroed memory. extern int nni_plat_mtx_init(nni_plat_mtx *); // nni_plat_mtx_fini destroys the mutex and releases any resources allocated -// for it's use. +// for it's use. If the mutex is zeroed memory, this should do nothing. extern void nni_plat_mtx_fini(nni_plat_mtx *); // nni_plat_mtx_lock locks the mutex. This is not recursive -- a mutex can @@ -93,9 +93,13 @@ extern int nni_plat_mtx_trylock(nni_plat_mtx *); // supplied with it, and that mutex must always be held when performing any // operations on the condition variable (other than fini.) This may require // dynamic allocation, and if so this operation may fail with NNG_ENOMEM. +// As with mutexes, an initialized mutex should be distinguishable from +// zeroed memory. extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); // nni_plat_cv_fini releases all resources associated with condition variable. +// If the cv points to just zeroed memory (was never initialized), it does +// nothing. extern void nni_plat_cv_fini(nni_plat_cv *); // nni_plat_cv_wake wakes all waiters on the condition. This should be diff --git a/src/core/socket.c b/src/core/socket.c index ae7cbc13..56347e4e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -165,6 +165,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); + NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); + NNI_LIST_INIT(&sock->s_events, nni_event, e_node); sock->s_sock_ops = *proto->proto_sock_ops; sops = &sock->s_sock_ops; @@ -192,47 +194,24 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) pops->pipe_rem = nni_sock_nullop; } - if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { - NNI_FREE_STRUCT(sock); - return (rv); - } - if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) { - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || + ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) || + ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) || + ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) { + goto fail; } if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + goto fail; } - if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) { - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); - } - if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || + ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { + goto fail; } if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + goto fail; } // NB: If worker functions are null, then the thread initialization @@ -241,16 +220,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_worker fn = sops->sock_worker[i]; rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data); if (rv != 0) { - while (i > 0) { - i--; - nni_thr_fini(&sock->s_worker_thr[i]); - } - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + goto fail; } } @@ -261,6 +231,23 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_reaper); *sockp = sock; return (0); + +fail: + sock->s_sock_ops.sock_fini(sock->s_data); + + // And we need to clean up *our* state. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&sock->s_worker_thr[i]); + } + nni_thr_fini(&sock->s_reaper); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_notify_cv); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_notify_mx); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + return (rv); } @@ -388,7 +375,9 @@ nni_sock_close(nni_sock *sock) nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); } diff --git a/src/core/socket.h b/src/core/socket.h index 4656c5d7..d8b37252 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -37,9 +37,14 @@ struct nng_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // pipes for this socket + nni_list s_events; // pending events + nni_list s_notify; // event watchers + nni_cv s_notify_cv; // wakes notify thread + nni_mtx s_notify_mx; // protects s_notify list nni_list s_reaps; // pipes to reap nni_thr s_reaper; + nni_thr s_notify_thr; nni_thr s_worker_thr[NNI_MAXWORKERS]; int s_ep_pend; // EP dial/listen in progress @@ -48,6 +53,9 @@ struct nng_socket { int s_senderr; // Protocol state machine use int s_recverr; // Protocol state machine use + nni_event s_recv_ev; // Event for readability + nni_event s_send_ev; // Event for sendability + uint32_t s_nextid; // Next Pipe ID. }; diff --git a/src/core/thread.c b/src/core/thread.c index 50d63521..91bacdec 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -115,21 +115,26 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) thr->arg = arg; if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) { + thr->done = 1; return (rv); } if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) { nni_plat_mtx_fini(&thr->mtx); + thr->done = 1; return (rv); } if (fn == NULL) { + thr->init = 1; thr->done = 1; return (0); } if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) { + thr->done = 1; nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); return (rv); } + thr->init = 1; return (0); } @@ -160,6 +165,9 @@ nni_thr_wait(nni_thr *thr) void nni_thr_fini(nni_thr *thr) { + if (!thr->init) { + return; + } nni_plat_mtx_lock(&thr->mtx); thr->stop = 1; nni_plat_cv_wake(&thr->cv); @@ -170,6 +178,7 @@ nni_thr_fini(nni_thr *thr) if (thr->fn != NULL) { nni_plat_thr_fini(&thr->thr); } + nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); } diff --git a/src/core/thread.h b/src/core/thread.h index 95794a6e..44f5c9e4 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -31,6 +31,7 @@ typedef struct { int start; int stop; int done; + int init; } nni_thr; // nni_mtx_init initializes the mutex. (Win32 programmers take note; |
