diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/platform.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 9 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 21 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 38 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 38 |
5 files changed, 75 insertions, 38 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index 104a94a0..ef6df820 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -60,6 +60,7 @@ extern void *nni_alloc(size_t); // Most implementations can just call free() here. extern void nni_free(void *, size_t); +typedef struct nni_mutex nni_mutex; typedef struct nni_mutex * nni_mutex_t; typedef struct nni_cond * nni_cond_t; @@ -70,9 +71,9 @@ extern void nni_mutex_fini(nni_mutex *); extern int nni_mutex_create(nni_mutex_t *); extern void nni_mutex_destroy(nni_mutex_t); -extern void nni_mutex_enter(nni_mutex_t); -extern void nni_mutex_exit(nni_mutex_t); -extern int nni_mutex_tryenter(nni_mutex_t); +extern void nni_mutex_enter(nni_mutex *); +extern void nni_mutex_exit(nni_mutex *); +extern int nni_mutex_tryenter(nni_mutex *); extern int nni_cond_create(nni_cond_t *, nni_mutex_t); extern void nni_cond_destroy(nni_cond_t); diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 0a3151f3..a7b15edc 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -22,14 +22,17 @@ // Define types that this platform uses. #ifdef PLATFORM_POSIX_SYNCH + +#include <pthread.h> + struct nni_mutex { pthread_mutex_t mx; -} +}; -struct nni_condvar { +struct nni_cond { pthread_cond_t cv; pthread_mutex_t * mx; -} +}; #endif #endif // PLATFORM_POSIX_IMPL_H
\ No newline at end of file diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c index 13147573..a3129f62 100644 --- a/src/platform/posix/posix_synch.c +++ b/src/platform/posix/posix_synch.c @@ -25,12 +25,15 @@ #include <pthread.h> #include <time.h> +#include <string.h> + +extern pthread_condattr_t nni_condattr; +extern pthread_mutexattr_t nni_mutexattr; int nni_mutex_init(nni_mutex *mp) { - // pthrad_mutex_attr_t attr; - if (pthread_mutex_init(&mp->mx, NULL) != NULL) { + if (pthread_mutex_init(&mp->mx, &nni_mutexattr) != 0) { return (NNG_ENOMEM); } return (0); @@ -42,7 +45,7 @@ nni_mutex_fini(nni_mutex *mp) { int rv; - if ((rv = pthread_mutex_destroy(&mp-- > mx)) != 0) { + if ((rv = pthread_mutex_destroy(&mp->mx)) != 0) { nni_panic("pthread_mutex_destroy failed: %s", strerror(rv)); } } @@ -136,12 +139,10 @@ nni_cond_attr(pthread_condattr_t **attrpp) static int init = 0; int rv; - /* - * For efficiency's sake, we try to reuse the same attr for the - * life of the library. This avoids many reallocations. Technically - * this means that we will leak the attr on exit(), but this is - * preferable to constantly allocating and reallocating it. - */ + // For efficiency's sake, we try to reuse the same attr for the + // life of the library. This avoids many reallocations. Technically + // this means that we will leak the attr on exit(), but this is + // preferable to constantly allocating and reallocating it. if (init) { *attrpp = &attr; return (0); @@ -178,7 +179,7 @@ nni_cond_create(nni_cond_t *cvp, nni_mutex_t mx) pthread_condattr_t *attrp; int rv; - if ((rv = cond_attr(&attrp)) != 0) { + if ((rv = nni_cond_attr(&attrp)) != 0) { return (rv); } if ((c = nni_alloc(sizeof (*c))) == NULL) { diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 8928974c..1bbe6e28 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -71,12 +71,15 @@ nni_thread_reap(nni_thread_t thr) void -atfork_child(void) +nni_atfork_child(void) { plat_fork = 1; } +pthread_condattr_t nni_condattr; +pthread_mutexattr_t nni_mutexattr; + int nni_plat_init(int (*helper)(void)) { @@ -93,7 +96,30 @@ nni_plat_init(int (*helper)(void)) pthread_mutex_unlock(&plat_lock); return (0); } - if (pthread_atfork(NULL, NULL, atfork_child) != 0) { + if (pthread_condattr_init(&nni_condattr) != 0) { + pthread_mutex_unlock(&plat_lock); + return (NNG_ENOMEM); + } +#if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME + if (pthread_condattr_setclock(&nni_condattr, NNG_USE_CLOCKID) != 0) { + pthread_mutex_unlock(&plat_lock); + return (NNG_ENOMEM); + } +#endif + + if (pthread_mutexattr_init(&nni_mutexattr) != 0) { + pthread_mutex_unlock(&plat_lock); + return (NNG_ENOMEM); + } + + if (pthread_mutexattr_settype(&nni_mutexattr, + PTHREAD_MUTEX_ERRORCHECK) != 0) { + pthread_mutex_unlock(&plat_lock); + return (NNG_ENOMEM); + } + + + if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { pthread_mutex_unlock(&plat_lock); return (NNG_ENOMEM); } @@ -109,7 +135,13 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - // XXX: NOTHING *YET* + pthread_mutex_lock(&plat_lock); + if (plat_init) { + pthread_mutexattr_destroy(&nni_mutexattr); + pthread_condattr_destroy(&nni_condattr); + plat_init = 0; + } + pthread_mutex_unlock(&plat_lock); } diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 4c2d36d1..9523ca84 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -20,7 +20,7 @@ typedef struct nni_pair_sock { nni_socket * sock; nni_pipe * pipe; - nni_mutex_t mx; + nni_mutex mx; nni_msgqueue_t uwq; nni_msgqueue_t urq; } nni_pair_sock; @@ -50,7 +50,7 @@ nni_pair_create(void **pairp, nni_socket *sock) if ((pair = nni_alloc(sizeof (*pair))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_create(&pair->mx)) != 0) { + if ((rv = nni_mutex_init(&pair->mx)) != 0) { nni_free(pair, sizeof (*pair)); return (rv); } @@ -71,7 +71,7 @@ nni_pair_destroy(void *arg) // this wold be the time to shut them all down. We don't, because // the socket already shut us down, and we don't have any other // threads that run. - nni_mutex_destroy(pair->mx); + nni_mutex_fini(&pair->mx); nni_free(pair, sizeof (*pair)); } @@ -90,10 +90,10 @@ nni_pair_shutdown(void *arg) // to notice the failure, and ultimately call back into the socket // to unregister them. The socket can use this to wait for a clean // shutdown of all pipe workers. - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); pipe = pair->pipe; pair->pipe = NULL; - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); nni_pipe_close(pipe); } @@ -113,24 +113,24 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe) pp->sthr = NULL; pp->rthr = NULL; - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); if (pair->pipe != NULL) { // Already have a peer, denied. - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); nni_free(pp, sizeof (*pp)); return (NNG_EBUSY); } if ((rv = nni_thread_create(&pp->rthr, nni_pair_receiver, pp)) != 0) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return (rv); } if ((rv = nni_thread_create(&pp->sthr, nni_pair_sender, pp)) != 0) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return (rv); } pp->good = 1; pair->pipe = pipe; - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return (NNG_EINVAL); } @@ -147,12 +147,12 @@ nni_pair_rem_pipe(void *arg, nni_pipe *pipe) if (pp->rthr) { (void) nni_thread_reap(pp->rthr); } - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); if (pair->pipe != pipe) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return (NNG_EINVAL); } - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return (NNG_EINVAL); } @@ -168,12 +168,12 @@ nni_pair_sender(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); if (!pp->good) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return; } - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); for (;;) { @@ -204,12 +204,12 @@ nni_pair_receiver(void *arg) nni_msg *msg; int rv; - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); if (!pp->good) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); return; } - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); for (;;) { rv = nni_pipe_recv(pipe, &msg); |
