aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h33
-rw-r--r--src/core/transport.c62
-rw-r--r--src/core/transport.h22
-rw-r--r--src/platform/posix/posix_impl.h2
-rw-r--r--src/platform/posix/posix_synch.h1
-rw-r--r--src/platform/posix/posix_thread.h67
-rw-r--r--src/transport/inproc/inproc.c74
7 files changed, 225 insertions, 36 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index ea8f1378..e2d81139 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -95,6 +95,7 @@ void nni_mutex_destroy(nni_mutex_t);
void nni_mutex_enter(nni_mutex_t);
void nni_mutex_exit(nni_mutex_t);
int nni_mutex_tryenter(nni_mutex_t);
+
int nni_cond_create(nni_cond_t *, nni_mutex_t);
void nni_cond_destroy(nni_cond_t);
@@ -127,6 +128,20 @@ void nni_cond_wait(nni_cond_t);
*/
int nni_cond_timedwait(nni_cond_t, uint64_t);
+typedef struct nni_thread *nni_thread_t;
+/*
+ * nni_thread_creates a thread that runs the given function. The thread
+ * receives a single argument.
+ */
+int nni_thread_create(nni_thread_t *, void (*fn)(void *), void *);
+
+/*
+ * nni_thread_reap waits for the thread to exit, and then releases any
+ * resources associated with the thread. After this returns, it
+ * is an error to reference the thread in any further way.
+ */
+void nni_thread_reap(nni_thread_t);
+
/*
* nn_clock returns a number of microseconds since some arbitrary time
* in the past. The values returned by nni_clock may be used with
@@ -139,4 +154,22 @@ uint64_t nni_clock(void);
*/
void nni_usleep(uint64_t);
+/*
+ * nni_init is called to allow the platform the chance to
+ * do any necessary initialization. This routine MUST be idempotent,
+ * and threadsafe, and will be called before any other API calls, and
+ * may be called at any point thereafter. It is permitted to return
+ * an error if some critical failure inializing the platform occurs,
+ * but once this succeeds, all future calls must succeed as well, unless
+ * nni_fini has been called.
+ */
+int nni_platform_init(void);
+
+/*
+ * nni_platform_fini is called to clean up resources. It is intended to
+ * be called as the last thing executed in the library, and no other functions
+ * will be called until nni_platform_init is called.
+ */
+void nni_fini(void);
+
#endif /* CORE_PLATFORM_H */
diff --git a/src/core/transport.c b/src/core/transport.c
new file mode 100644
index 00000000..67f2ca2f
--- /dev/null
+++ b/src/core/transport.c
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom
+ * the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "core/nng_impl.h"
+
+/*
+ * For now the list of transports is hard-wired. Adding new transports
+ * to the system dynamically is something that might be considered later.
+ */
+extern struct nni_transport_ops nni_inproc_tran_ops;
+
+static struct nni_transport_ops *transports[] = {
+ &nni_inproc_tran_ops,
+ NULL
+};
+
+/*
+ * nni_transport_init initializes the entire transport subsystem, including
+ * each individual transport.
+ */
+void
+nni_transport_init(void)
+{
+ int i;
+ struct nni_transport_ops *ops;
+
+ for (i = 0; (ops = transports[i]) != NULL; i++) {
+ ops->tran_init();
+ }
+}
+
+void
+nni_transport_fork(int prefork)
+{
+ int i;
+ struct nni_transport_ops *ops;
+
+ for (i = 0; (ops = transports[i]) != NULL; i++) {
+ if (ops->tran_fork != NULL) {
+ ops->tran_fork(prefork);
+ }
+ }
+}
diff --git a/src/core/transport.h b/src/core/transport.h
index 567cc0c5..65f1976e 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -43,6 +43,28 @@ struct nni_transport_ops {
* tran_pipe_ops links our pipe operations.
*/
const struct nni_pipe_ops *tran_pipe_ops;
+
+ /*
+ * tran_init, if not NULL, is called once during library
+ * initialization.
+ */
+ int (*tran_init)(void);
+
+ /*
+ * tran_fini, if not NULL, is called during library deinitialization.
+ * It should release any global resources.
+ */
+ void (*tran_fini)(void);
+
+ /*
+ * tran_fork, if not NULL, is called just before fork
+ * (e.g. during pthread_atfork() for the prefork phase),
+ * and again just after returning in the parent. There is
+ * nothing called for the child. If the transport does not
+ * create any threads of its own, this can be NULL. (The
+ * intended use is to prevent O_CLOEXEC races.)
+ */
+ void (*tran_fork)(int prefork);
};
struct nni_endpt_ops {
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 89022e60..60a1d663 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -36,6 +36,6 @@
#include "platform/posix/posix_alloc.h"
#include "platform/posix/posix_clock.h"
#include "platform/posix/posix_synch.h"
-/* #include "platform/posix/posix_thread.h" */
+#include "platform/posix/posix_thread.h"
#include "platform/posix/posix_vsnprintf.h"
#endif
diff --git a/src/platform/posix/posix_synch.h b/src/platform/posix/posix_synch.h
index b3b15fee..94058d32 100644
--- a/src/platform/posix/posix_synch.h
+++ b/src/platform/posix/posix_synch.h
@@ -118,7 +118,6 @@ nni_mutex_tryenter(nni_mutex_t m)
return (0);
}
-
int
cond_attr(pthread_condattr_t **attrpp)
{
diff --git a/src/platform/posix/posix_thread.h b/src/platform/posix/posix_thread.h
new file mode 100644
index 00000000..239959f8
--- /dev/null
+++ b/src/platform/posix/posix_thread.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom
+ * the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+/*
+ * This is more of a direct #include of a .c rather than .h file.
+ * But having it be a .h makes compiler rules work out properly. Do
+ * not include this more than once into your program, or you will
+ * get multiple symbols defined.
+ */
+
+/*
+ * POSIX threads.
+ */
+
+#include <pthread.h>
+#include <time.h>
+#include <string.h>
+
+struct nni_thread {
+ pthread_t tid;
+};
+
+int
+nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg)
+{
+ nni_thread_t thr;
+ int rv;
+
+ if ((thr = nni_alloc(sizeof (*thr))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = pthread_create(&thr->tid, NULL, (void *)fn, arg)) != 0) {
+ nni_free(thr, sizeof (*thr));
+ return (NNG_ENOMEM);
+ }
+ *tp = thr;
+ return (0);
+}
+
+void
+nni_thread_reap(nni_thread_t thr)
+{
+ int rv;
+ if ((rv = pthread_join(thr->tid, NULL)) != 0) {
+ nni_panic("pthread_thread: %s", strerror(errno));
+ }
+ nni_free(thr, sizeof (*thr));
+}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 8186853d..a209c587 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -75,7 +75,30 @@ struct inproc_ep {
*/
static inproc_global_t inproc;
-void
+static int
+inproc_init(void)
+{
+ int rv;
+ if ((rv = nni_mutex_create(&inproc.mx)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) {
+ nni_mutex_destroy(inproc.mx);
+ return (rv);
+ }
+ NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node);
+
+ return (0);
+}
+
+static void
+inproc_fini(void)
+{
+ nni_cond_destroy(inproc.cv);
+ nni_mutex_destroy(inproc.mx);
+}
+
+static void
inproc_pipe_close(void *arg)
{
inproc_pipe_t pipe = arg;
@@ -102,7 +125,7 @@ inproc_pair_destroy(inproc_pair_t pair)
nni_free(pair, sizeof (*pair));
}
-void
+static void
inproc_pipe_destroy(void *arg)
{
inproc_pipe_t pipe = arg;
@@ -121,7 +144,7 @@ inproc_pipe_destroy(void *arg)
}
}
-int
+static int
inproc_pipe_send(void *arg, nng_msg_t msg)
{
inproc_pipe_t pipe = arg;
@@ -133,7 +156,7 @@ inproc_pipe_send(void *arg, nng_msg_t msg)
return (nni_msgqueue_put(pipe->wq, msg, -1));
}
-int
+static int
inproc_pipe_recv(void *arg, nng_msg_t *msgp)
{
inproc_pipe_t pipe = arg;
@@ -141,7 +164,7 @@ inproc_pipe_recv(void *arg, nng_msg_t *msgp)
return (nni_msgqueue_get(pipe->rq, msgp, -1));
}
-uint16_t
+static uint16_t
inproc_pipe_peer(void *arg)
{
inproc_pipe_t pipe = arg;
@@ -149,7 +172,7 @@ inproc_pipe_peer(void *arg)
return (pipe->peer);
}
-int
+static int
inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
{
inproc_pipe_t pipe = arg;
@@ -170,7 +193,7 @@ inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
return (NNG_ENOTSUP);
}
-int
+static int
inproc_ep_create(void **epp, const char *url, uint16_t proto)
{
inproc_ep_t ep;
@@ -191,13 +214,13 @@ inproc_ep_create(void **epp, const char *url, uint16_t proto)
return (0);
}
-void
+static void
inproc_ep_destroy(void *arg)
{
NNI_ARG_UNUSED(arg);
}
-void
+static void
inproc_ep_close(void *arg)
{
inproc_ep_t ep = arg;
@@ -210,7 +233,8 @@ inproc_ep_close(void *arg)
}
nni_mutex_exit(inproc.mx);
}
-int
+
+static int
inproc_ep_dial(void *arg, void **pipep)
{
inproc_ep_t ep = arg;
@@ -255,7 +279,7 @@ inproc_ep_dial(void *arg, void **pipep)
return (ep->closed ? NNG_ECLOSED : 0);
}
-int
+static int
inproc_ep_listen(void *arg)
{
inproc_ep_t ep = arg;
@@ -285,7 +309,7 @@ inproc_ep_listen(void *arg)
return (0);
}
-int
+static int
inproc_ep_accept(void *arg, void **pipep)
{
inproc_ep_t ep = arg;
@@ -343,27 +367,6 @@ inproc_ep_accept(void *arg, void **pipep)
return (0);
}
-int
-nni_inproc_init(void)
-{
- int rv;
- if ((rv = nni_mutex_create(&inproc.mx)) != 0) {
- return (rv);
- }
- if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) {
- nni_mutex_destroy(inproc.mx);
- return (rv);
- }
- NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node);
- /* XXX: nni_register_transport(); */
- return (0);
-}
-
-void
-nni_inproc_term(void)
-{
-}
-
static struct nni_pipe_ops inproc_pipe_ops = {
inproc_pipe_destroy,
inproc_pipe_send,
@@ -384,8 +387,11 @@ static struct nni_endpt_ops inproc_ep_ops = {
NULL, /* inproc_ep_getopt */
};
-struct nni_transport_ops inproc_tran_ops = {
+struct nni_transport_ops nni_inproc_tran_ops = {
"inproc", /* tran_scheme */
&inproc_ep_ops,
&inproc_pipe_ops,
+ inproc_init, /* tran_init */
+ inproc_fini, /* tran_fini */
+ NULL, /* tran_fork */
};