diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-12 12:23:32 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-12 12:23:32 -0800 |
| commit | 0991802d1c91c790c60828145ddecbfe1583f6db (patch) | |
| tree | dc2e47e482dba13fa789a6946c16f0a934abdda4 | |
| parent | 1adefe3879b211a47a784f477d56a9416ae72254 (diff) | |
| download | nng-0991802d1c91c790c60828145ddecbfe1583f6db.tar.gz nng-0991802d1c91c790c60828145ddecbfe1583f6db.tar.bz2 nng-0991802d1c91c790c60828145ddecbfe1583f6db.zip | |
Added threading primitives, more complete transport API.
| -rw-r--r-- | src/core/platform.h | 33 | ||||
| -rw-r--r-- | src/core/transport.c | 62 | ||||
| -rw-r--r-- | src/core/transport.h | 22 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.h | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.h | 67 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 74 |
7 files changed, 225 insertions, 36 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index ea8f1378..e2d81139 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -95,6 +95,7 @@ void nni_mutex_destroy(nni_mutex_t); void nni_mutex_enter(nni_mutex_t); void nni_mutex_exit(nni_mutex_t); int nni_mutex_tryenter(nni_mutex_t); + int nni_cond_create(nni_cond_t *, nni_mutex_t); void nni_cond_destroy(nni_cond_t); @@ -127,6 +128,20 @@ void nni_cond_wait(nni_cond_t); */ int nni_cond_timedwait(nni_cond_t, uint64_t); +typedef struct nni_thread *nni_thread_t; +/* + * nni_thread_creates a thread that runs the given function. The thread + * receives a single argument. + */ +int nni_thread_create(nni_thread_t *, void (*fn)(void *), void *); + +/* + * nni_thread_reap waits for the thread to exit, and then releases any + * resources associated with the thread. After this returns, it + * is an error to reference the thread in any further way. + */ +void nni_thread_reap(nni_thread_t); + /* * nn_clock returns a number of microseconds since some arbitrary time * in the past. The values returned by nni_clock may be used with @@ -139,4 +154,22 @@ uint64_t nni_clock(void); */ void nni_usleep(uint64_t); +/* + * nni_init is called to allow the platform the chance to + * do any necessary initialization. This routine MUST be idempotent, + * and threadsafe, and will be called before any other API calls, and + * may be called at any point thereafter. It is permitted to return + * an error if some critical failure inializing the platform occurs, + * but once this succeeds, all future calls must succeed as well, unless + * nni_fini has been called. + */ +int nni_platform_init(void); + +/* + * nni_platform_fini is called to clean up resources. It is intended to + * be called as the last thing executed in the library, and no other functions + * will be called until nni_platform_init is called. + */ +void nni_fini(void); + #endif /* CORE_PLATFORM_H */ diff --git a/src/core/transport.c b/src/core/transport.c new file mode 100644 index 00000000..67f2ca2f --- /dev/null +++ b/src/core/transport.c @@ -0,0 +1,62 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "core/nng_impl.h" + +/* + * For now the list of transports is hard-wired. Adding new transports + * to the system dynamically is something that might be considered later. + */ +extern struct nni_transport_ops nni_inproc_tran_ops; + +static struct nni_transport_ops *transports[] = { + &nni_inproc_tran_ops, + NULL +}; + +/* + * nni_transport_init initializes the entire transport subsystem, including + * each individual transport. + */ +void +nni_transport_init(void) +{ + int i; + struct nni_transport_ops *ops; + + for (i = 0; (ops = transports[i]) != NULL; i++) { + ops->tran_init(); + } +} + +void +nni_transport_fork(int prefork) +{ + int i; + struct nni_transport_ops *ops; + + for (i = 0; (ops = transports[i]) != NULL; i++) { + if (ops->tran_fork != NULL) { + ops->tran_fork(prefork); + } + } +} diff --git a/src/core/transport.h b/src/core/transport.h index 567cc0c5..65f1976e 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -43,6 +43,28 @@ struct nni_transport_ops { * tran_pipe_ops links our pipe operations. */ const struct nni_pipe_ops *tran_pipe_ops; + + /* + * tran_init, if not NULL, is called once during library + * initialization. + */ + int (*tran_init)(void); + + /* + * tran_fini, if not NULL, is called during library deinitialization. + * It should release any global resources. + */ + void (*tran_fini)(void); + + /* + * tran_fork, if not NULL, is called just before fork + * (e.g. during pthread_atfork() for the prefork phase), + * and again just after returning in the parent. There is + * nothing called for the child. If the transport does not + * create any threads of its own, this can be NULL. (The + * intended use is to prevent O_CLOEXEC races.) + */ + void (*tran_fork)(int prefork); }; struct nni_endpt_ops { diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 89022e60..60a1d663 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -36,6 +36,6 @@ #include "platform/posix/posix_alloc.h" #include "platform/posix/posix_clock.h" #include "platform/posix/posix_synch.h" -/* #include "platform/posix/posix_thread.h" */ +#include "platform/posix/posix_thread.h" #include "platform/posix/posix_vsnprintf.h" #endif diff --git a/src/platform/posix/posix_synch.h b/src/platform/posix/posix_synch.h index b3b15fee..94058d32 100644 --- a/src/platform/posix/posix_synch.h +++ b/src/platform/posix/posix_synch.h @@ -118,7 +118,6 @@ nni_mutex_tryenter(nni_mutex_t m) return (0); } - int cond_attr(pthread_condattr_t **attrpp) { diff --git a/src/platform/posix/posix_thread.h b/src/platform/posix/posix_thread.h new file mode 100644 index 00000000..239959f8 --- /dev/null +++ b/src/platform/posix/posix_thread.h @@ -0,0 +1,67 @@ +/* + * Copyright 2016 Garrett D'Amore <garrett@damore.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom + * the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +/* + * This is more of a direct #include of a .c rather than .h file. + * But having it be a .h makes compiler rules work out properly. Do + * not include this more than once into your program, or you will + * get multiple symbols defined. + */ + +/* + * POSIX threads. + */ + +#include <pthread.h> +#include <time.h> +#include <string.h> + +struct nni_thread { + pthread_t tid; +}; + +int +nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg) +{ + nni_thread_t thr; + int rv; + + if ((thr = nni_alloc(sizeof (*thr))) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = pthread_create(&thr->tid, NULL, (void *)fn, arg)) != 0) { + nni_free(thr, sizeof (*thr)); + return (NNG_ENOMEM); + } + *tp = thr; + return (0); +} + +void +nni_thread_reap(nni_thread_t thr) +{ + int rv; + if ((rv = pthread_join(thr->tid, NULL)) != 0) { + nni_panic("pthread_thread: %s", strerror(errno)); + } + nni_free(thr, sizeof (*thr)); +} diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 8186853d..a209c587 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -75,7 +75,30 @@ struct inproc_ep { */ static inproc_global_t inproc; -void +static int +inproc_init(void) +{ + int rv; + if ((rv = nni_mutex_create(&inproc.mx)) != 0) { + return (rv); + } + if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) { + nni_mutex_destroy(inproc.mx); + return (rv); + } + NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node); + + return (0); +} + +static void +inproc_fini(void) +{ + nni_cond_destroy(inproc.cv); + nni_mutex_destroy(inproc.mx); +} + +static void inproc_pipe_close(void *arg) { inproc_pipe_t pipe = arg; @@ -102,7 +125,7 @@ inproc_pair_destroy(inproc_pair_t pair) nni_free(pair, sizeof (*pair)); } -void +static void inproc_pipe_destroy(void *arg) { inproc_pipe_t pipe = arg; @@ -121,7 +144,7 @@ inproc_pipe_destroy(void *arg) } } -int +static int inproc_pipe_send(void *arg, nng_msg_t msg) { inproc_pipe_t pipe = arg; @@ -133,7 +156,7 @@ inproc_pipe_send(void *arg, nng_msg_t msg) return (nni_msgqueue_put(pipe->wq, msg, -1)); } -int +static int inproc_pipe_recv(void *arg, nng_msg_t *msgp) { inproc_pipe_t pipe = arg; @@ -141,7 +164,7 @@ inproc_pipe_recv(void *arg, nng_msg_t *msgp) return (nni_msgqueue_get(pipe->rq, msgp, -1)); } -uint16_t +static uint16_t inproc_pipe_peer(void *arg) { inproc_pipe_t pipe = arg; @@ -149,7 +172,7 @@ inproc_pipe_peer(void *arg) return (pipe->peer); } -int +static int inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { inproc_pipe_t pipe = arg; @@ -170,7 +193,7 @@ inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) return (NNG_ENOTSUP); } -int +static int inproc_ep_create(void **epp, const char *url, uint16_t proto) { inproc_ep_t ep; @@ -191,13 +214,13 @@ inproc_ep_create(void **epp, const char *url, uint16_t proto) return (0); } -void +static void inproc_ep_destroy(void *arg) { NNI_ARG_UNUSED(arg); } -void +static void inproc_ep_close(void *arg) { inproc_ep_t ep = arg; @@ -210,7 +233,8 @@ inproc_ep_close(void *arg) } nni_mutex_exit(inproc.mx); } -int + +static int inproc_ep_dial(void *arg, void **pipep) { inproc_ep_t ep = arg; @@ -255,7 +279,7 @@ inproc_ep_dial(void *arg, void **pipep) return (ep->closed ? NNG_ECLOSED : 0); } -int +static int inproc_ep_listen(void *arg) { inproc_ep_t ep = arg; @@ -285,7 +309,7 @@ inproc_ep_listen(void *arg) return (0); } -int +static int inproc_ep_accept(void *arg, void **pipep) { inproc_ep_t ep = arg; @@ -343,27 +367,6 @@ inproc_ep_accept(void *arg, void **pipep) return (0); } -int -nni_inproc_init(void) -{ - int rv; - if ((rv = nni_mutex_create(&inproc.mx)) != 0) { - return (rv); - } - if ((rv = nni_cond_create(&inproc.cv, inproc.mx)) != 0) { - nni_mutex_destroy(inproc.mx); - return (rv); - } - NNI_LIST_INIT(&inproc.eps, struct inproc_ep, node); - /* XXX: nni_register_transport(); */ - return (0); -} - -void -nni_inproc_term(void) -{ -} - static struct nni_pipe_ops inproc_pipe_ops = { inproc_pipe_destroy, inproc_pipe_send, @@ -384,8 +387,11 @@ static struct nni_endpt_ops inproc_ep_ops = { NULL, /* inproc_ep_getopt */ }; -struct nni_transport_ops inproc_tran_ops = { +struct nni_transport_ops nni_inproc_tran_ops = { "inproc", /* tran_scheme */ &inproc_ep_ops, &inproc_pipe_ops, + inproc_init, /* tran_init */ + inproc_fini, /* tran_fini */ + NULL, /* tran_fork */ }; |
