aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-16 16:27:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-16 16:27:22 -0800
commitac8415c24ffea645105c3859e814843e81c97f8a (patch)
tree7b64b4aab3de6ce5bdd69c3d5b7ead57f4a4b4e7 /src/core
parentb8f7236aa2928d70d9bff2e1071654982539eeda (diff)
downloadnng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.gz
nng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.bz2
nng-ac8415c24ffea645105c3859e814843e81c97f8a.zip
Start of event framework.
This compiles correctly, but doesn't actually deliver events yet. As part of this, I've made most of the initializables in nng safe to tear-down if uninitialized (or set to zero e.g. via calloc). This makes it loads easier to write the teardown on error code, since I can deinit everything, without worrying about which things have been initialized and which have not.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/event.c103
-rw-r--r--src/core/event.h35
-rw-r--r--src/core/idhash.c6
-rw-r--r--src/core/message.c14
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/platform.h8
-rw-r--r--src/core/socket.c75
-rw-r--r--src/core/socket.h8
-rw-r--r--src/core/thread.c9
-rw-r--r--src/core/thread.h1
11 files changed, 209 insertions, 53 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 9ba41fae..25b131d4 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -27,6 +27,8 @@ typedef struct nng_endpoint nni_ep;
typedef struct nng_pipe nni_pipe;
typedef struct nng_msg nni_msg;
typedef struct nng_sockaddr nni_sockaddr;
+typedef struct nng_event nni_event;
+typedef struct nng_notify nni_notify;
// These are our own names.
typedef struct nni_tran nni_tran;
diff --git a/src/core/event.c b/src/core/event.c
new file mode 100644
index 00000000..3f7d1143
--- /dev/null
+++ b/src/core/event.c
@@ -0,0 +1,103 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+int
+nni_event_init(nni_event *event, int type, nni_sock *sock)
+{
+ int rv;
+
+ memset(event, 0, sizeof (*event));
+ if ((rv = nni_cv_init(&event->e_cv, &sock->s_mx)) != 0) {
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&event->e_node);
+ event->e_type = type;
+ event->e_sock = sock;
+ return (0);
+}
+
+
+void
+nni_event_fini(nni_event *event)
+{
+ nni_cv_fini(&event->e_cv);
+}
+
+
+void
+nni_event_submit(nni_sock *sock, nni_event *event)
+{
+ // Call with socket mutex owned!
+ if (event->e_pending == 0) {
+ event->e_pending = 1;
+ event->e_done = 0;
+ nni_list_append(&sock->s_events, event);
+ nni_cv_wake(&sock->s_notify_cv);
+ }
+}
+
+
+void
+nni_event_wait(nni_sock *sock, nni_event *event)
+{
+ // Call with socket mutex owned!
+ // Note that the socket mutex is dropped during the call.
+ while ((event->e_pending) && (!event->e_done)) {
+ nni_cv_wait(&event->e_cv);
+ }
+}
+
+
+void
+nni_event_notifier(void *arg)
+{
+ nni_sock *sock = arg;
+ nni_event *event;
+ nni_notify *notify;
+
+ nni_mtx_lock(&sock->s_mx);
+ for (;;) {
+ if (sock->s_closing) {
+ break;
+ }
+
+ if ((event = nni_list_first(&sock->s_events)) != NULL) {
+ event->e_pending = 0;
+ nni_list_remove(&sock->s_events, event);
+ nni_mtx_unlock(&sock->s_mx);
+
+ // Lock the notify list, it must not change.
+ nni_mtx_lock(&sock->s_notify_mx);
+ NNI_LIST_FOREACH (&sock->s_notify, notify) {
+ if ((notify->n_mask & event->e_type) == 0) {
+ // No interest.
+ continue;
+ }
+ notify->n_func(event, &notify->n_arg);
+ }
+ nni_mtx_unlock(&sock->s_notify_mx);
+
+ nni_mtx_lock(&sock->s_mx);
+ // Let the event submitter know we are done, unless
+ // they have resubmitted. Submitters can wait on this
+ // lock.
+ event->e_done = 1;
+ nni_cv_wake(&event->e_cv);
+ continue;
+ }
+
+ nni_cv_wait(&sock->s_notify_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
diff --git a/src/core/event.h b/src/core/event.h
new file mode 100644
index 00000000..d7faad2e
--- /dev/null
+++ b/src/core/event.h
@@ -0,0 +1,35 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_EVENT_H
+#define CORE_EVENT_H
+
+#include "core/defs.h"
+#include "core/list.h"
+
+struct nng_event {
+ int e_type;
+ nni_sock * e_sock;
+ nni_ep * e_ep;
+ nni_pipe * e_pipe;
+
+ int e_done; // true when notify thr is finished
+ int e_pending; // true if event is queued
+ nni_cv e_cv; // signaled when e_done is noted
+ nni_list_node e_node; // location on the socket list
+};
+
+struct nng_notify {
+ nni_list_node n_node;
+ nng_notify_func n_func;
+ void * n_arg;
+ int n_mask;
+};
+
+#endif // CORE_EVENT_H
diff --git a/src/core/idhash.c b/src/core/idhash.c
index d816ddf4..e541ef36 100644
--- a/src/core/idhash.c
+++ b/src/core/idhash.c
@@ -55,8 +55,10 @@ nni_idhash_create(nni_idhash **hp)
void
nni_idhash_destroy(nni_idhash *h)
{
- nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry));
- NNI_FREE_STRUCT(h);
+ if (h != NULL) {
+ nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry));
+ NNI_FREE_STRUCT(h);
+ }
}
diff --git a/src/core/message.c b/src/core/message.c
index 778e7dfc..346570c9 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -363,13 +363,15 @@ nni_msg_free(nni_msg *m)
{
nni_msgopt *mo;
- nni_chunk_free(&m->m_header);
- nni_chunk_free(&m->m_body);
- while ((mo = nni_list_first(&m->m_options)) != NULL) {
- nni_list_remove(&m->m_options, mo);
- nni_free(mo, sizeof (*mo) + mo->mo_sz);
+ if (m != NULL) {
+ nni_chunk_free(&m->m_header);
+ nni_chunk_free(&m->m_body);
+ while ((mo = nni_list_first(&m->m_options)) != NULL) {
+ nni_list_remove(&m->m_options, mo);
+ nni_free(mo, sizeof (*mo) + mo->mo_sz);
+ }
+ NNI_FREE_STRUCT(m);
}
- NNI_FREE_STRUCT(m);
}
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index ff25f5ad..007e20a6 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -38,6 +38,7 @@
#include "core/transport.h"
// These have to come after the others - particularly transport.h
+#include "core/event.h"
#include "core/pipe.h"
#include "core/socket.h"
#include "core/endpt.h"
diff --git a/src/core/platform.h b/src/core/platform.h
index 8d19a7c8..7afd33ef 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -70,11 +70,11 @@ typedef struct nni_plat_ipcsock nni_plat_ipcsock;
// nni_plat_mtx_init initializes a mutex structure. This may require dynamic
// allocation, depending on the platform. It can return NNG_ENOMEM if that
-// fails.
+// fails. An initialized mutex must be distinguishable from zeroed memory.
extern int nni_plat_mtx_init(nni_plat_mtx *);
// nni_plat_mtx_fini destroys the mutex and releases any resources allocated
-// for it's use.
+// for it's use. If the mutex is zeroed memory, this should do nothing.
extern void nni_plat_mtx_fini(nni_plat_mtx *);
// nni_plat_mtx_lock locks the mutex. This is not recursive -- a mutex can
@@ -93,9 +93,13 @@ extern int nni_plat_mtx_trylock(nni_plat_mtx *);
// supplied with it, and that mutex must always be held when performing any
// operations on the condition variable (other than fini.) This may require
// dynamic allocation, and if so this operation may fail with NNG_ENOMEM.
+// As with mutexes, an initialized mutex should be distinguishable from
+// zeroed memory.
extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
// nni_plat_cv_fini releases all resources associated with condition variable.
+// If the cv points to just zeroed memory (was never initialized), it does
+// nothing.
extern void nni_plat_cv_fini(nni_plat_cv *);
// nni_plat_cv_wake wakes all waiters on the condition. This should be
diff --git a/src/core/socket.c b/src/core/socket.c
index ae7cbc13..56347e4e 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -165,6 +165,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
+ NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
+ NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
sock->s_sock_ops = *proto->proto_sock_ops;
sops = &sock->s_sock_ops;
@@ -192,47 +194,24 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
pops->pipe_rem = nni_sock_nullop;
}
- if ((rv = nni_mtx_init(&sock->s_mx)) != 0) {
- NNI_FREE_STRUCT(sock);
- return (rv);
- }
- if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) {
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
+ if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
+ ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) ||
+ ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) {
+ goto fail;
}
if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
+ goto fail;
}
- if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) {
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
- }
- if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
+ if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
+ ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
+ goto fail;
}
if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
+ goto fail;
}
// NB: If worker functions are null, then the thread initialization
@@ -241,16 +220,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_worker fn = sops->sock_worker[i];
rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data);
if (rv != 0) {
- while (i > 0) {
- i--;
- nni_thr_fini(&sock->s_worker_thr[i]);
- }
- sops->sock_fini(&sock->s_data);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
+ goto fail;
}
}
@@ -261,6 +231,23 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_thr_run(&sock->s_reaper);
*sockp = sock;
return (0);
+
+fail:
+ sock->s_sock_ops.sock_fini(sock->s_data);
+
+ // And we need to clean up *our* state.
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_fini(&sock->s_worker_thr[i]);
+ }
+ nni_thr_fini(&sock->s_reaper);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_notify_cv);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_notify_mx);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
+ return (rv);
}
@@ -388,7 +375,9 @@ nni_sock_close(nni_sock *sock)
nni_thr_fini(&sock->s_reaper);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_notify_mx);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4656c5d7..d8b37252 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -37,9 +37,14 @@ struct nng_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // pipes for this socket
+ nni_list s_events; // pending events
+ nni_list s_notify; // event watchers
+ nni_cv s_notify_cv; // wakes notify thread
+ nni_mtx s_notify_mx; // protects s_notify list
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
+ nni_thr s_notify_thr;
nni_thr s_worker_thr[NNI_MAXWORKERS];
int s_ep_pend; // EP dial/listen in progress
@@ -48,6 +53,9 @@ struct nng_socket {
int s_senderr; // Protocol state machine use
int s_recverr; // Protocol state machine use
+ nni_event s_recv_ev; // Event for readability
+ nni_event s_send_ev; // Event for sendability
+
uint32_t s_nextid; // Next Pipe ID.
};
diff --git a/src/core/thread.c b/src/core/thread.c
index 50d63521..91bacdec 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -115,21 +115,26 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
thr->arg = arg;
if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) {
+ thr->done = 1;
return (rv);
}
if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) {
nni_plat_mtx_fini(&thr->mtx);
+ thr->done = 1;
return (rv);
}
if (fn == NULL) {
+ thr->init = 1;
thr->done = 1;
return (0);
}
if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) {
+ thr->done = 1;
nni_plat_cv_fini(&thr->cv);
nni_plat_mtx_fini(&thr->mtx);
return (rv);
}
+ thr->init = 1;
return (0);
}
@@ -160,6 +165,9 @@ nni_thr_wait(nni_thr *thr)
void
nni_thr_fini(nni_thr *thr)
{
+ if (!thr->init) {
+ return;
+ }
nni_plat_mtx_lock(&thr->mtx);
thr->stop = 1;
nni_plat_cv_wake(&thr->cv);
@@ -170,6 +178,7 @@ nni_thr_fini(nni_thr *thr)
if (thr->fn != NULL) {
nni_plat_thr_fini(&thr->thr);
}
+
nni_plat_cv_fini(&thr->cv);
nni_plat_mtx_fini(&thr->mtx);
}
diff --git a/src/core/thread.h b/src/core/thread.h
index 95794a6e..44f5c9e4 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -31,6 +31,7 @@ typedef struct {
int start;
int stop;
int done;
+ int init;
} nni_thr;
// nni_mtx_init initializes the mutex. (Win32 programmers take note;