| author | Garrett D'Amore <garrett@damore.org> | 2017-08-15 21:59:55 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-16 18:31:42 -0700 |
| commit | a9633313ec8e578c805cd53b37ba3360d83157bc | |
| tree | 14d32c4031ea1c8508a75469407ca77e353fa315 /src/platform/posix/posix_thread.c | |
| parent | e7e2a6c14f0317eb77711951c6f1a650d4013dfe | |
Provide versions of mutex, condvar, and aio init that never fail.
If the underlying platform's initialization fails (FreeBSD is the only one
I'm aware of that does this!), we fall back to a global lock or condition
variable instead. This means that our lock initializers never fail.
Probably we could eliminate most of this for Linux and Darwin, since
on those platforms, mutex and condvar initialization reasonably never
fails. Initial benchmarks show little difference either way -- so we
can revisit (optimize) later.
This removes a lot of otherwise untested code in error cases and so forth,
improving coverage and resilience in the face of allocation failures.
Platforms other than POSIX should follow a similar pattern if they need
this. (VxWorks, I'm thinking of you.) Most sane platforms won't have
an issue here, since normally these initializations do not need to allocate
memory. (Reportedly, even FreeBSD has plans to "fix" this in libthr2.)
While here, some bugs were fixed in initialization and teardown.
The fallback code is properly tested with dedicated test cases.
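
To make the approach concrete, here is a condensed, standalone sketch of the fallback scheme (the `my_mtx` naming is invented for illustration; the real code in the diff below uses `nni_plat_mtx` and friends, additionally tracks the owning thread, supports condition variables, and can convert a held mutex to fallback mode mid-flight):

```c
#include <pthread.h>

/* Statically initialized globals: these can never fail to exist. */
static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  global_cond = PTHREAD_COND_INITIALIZER;

typedef struct {
    pthread_mutex_t mtx;
    int             fallback; /* set if pthread_mutex_init() failed */
    int             locked;   /* fallback state; guarded by global_lock */
} my_mtx;

/* Never reports failure: on init error, degrade to the global lock. */
void
my_mtx_init(my_mtx *m)
{
    m->locked   = 0;
    m->fallback = (pthread_mutex_init(&m->mtx, NULL) != 0);
}

void
my_mtx_lock(my_mtx *m)
{
    if (!m->fallback) {
        pthread_mutex_lock(&m->mtx);
        return;
    }
    /* Emulate the mutex using the global lock and condvar. */
    pthread_mutex_lock(&global_lock);
    while (m->locked) {
        pthread_cond_wait(&global_cond, &global_lock);
    }
    m->locked = 1;
    pthread_mutex_unlock(&global_lock);
}

void
my_mtx_unlock(my_mtx *m)
{
    if (!m->fallback) {
        pthread_mutex_unlock(&m->mtx);
        return;
    }
    pthread_mutex_lock(&global_lock);
    m->locked = 0;
    pthread_cond_broadcast(&global_cond);
    pthread_mutex_unlock(&global_lock);
}
```

The obvious cost is that every fallback mutex contends on one process-wide lock, but since the fallback only engages when the platform's initializer actually fails (or, in debug builds, when the NNG_SYNC_FALLBACK environment variable forces it for testing), that tradeoff is acceptable.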
Diffstat (limited to 'src/platform/posix/posix_thread.c')
| -rw-r--r-- | src/platform/posix/posix_thread.c | 417 |
1 file changed, 292 insertions, 125 deletions
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 0ef4754c..44bfbed2 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -24,118 +24,325 @@
 #include <time.h>
 #include <unistd.h>
 
-static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
-static int nni_plat_inited = 0;
-static int nni_plat_forked = 0;
+static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER;
+static int nni_plat_inited = 0;
+static int nni_plat_forked = 0;
+
+pthread_condattr_t nni_cvattr;
+pthread_mutexattr_t nni_mxattr;
+
+#ifndef NDEBUG
+int nni_plat_sync_fallback = 0;
+#endif
 
-pthread_condattr_t nni_cvattr;
-pthread_mutexattr_t nni_mxattr;
-static pthread_attr_t nni_pthread_attr;
+enum nni_plat_sync_flags {
+    NNI_PLAT_SYNC_INIT = 0x01,
+    NNI_PLAT_SYNC_LOCKED = 0x04,
+    NNI_PLAT_SYNC_WAIT = 0x08,
+};
 
-// We open a /dev/null file descriptor so that we can dup2() it to
-// cause MacOS X to wakeup.  This gives us a "safe" close semantic.
+void
+nni_plat_mtx_init(nni_plat_mtx *mtx)
+{
+    if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) {
+        mtx->fallback = 1;
+    } else {
+        mtx->flags = NNI_PLAT_SYNC_INIT;
+    }
+#ifndef NDEBUG
+    if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+        mtx->fallback = 1;
+    }
+#endif
+}
 
-int nni_plat_devnull = -1;
+void
+nni_plat_mtx_fini(nni_plat_mtx *mtx)
+{
+    if (mtx->flags & NNI_PLAT_SYNC_INIT) {
+        int rv;
+        if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
+            nni_panic("pthread_mutex_destroy: %s", strerror(rv));
+        }
+    }
+    mtx->flags = 0;
+}
 
-int
-nni_plat_mtx_init(nni_plat_mtx *mtx)
+static void
+nni_pthread_mutex_lock(pthread_mutex_t *m)
 {
     int rv;
 
-    if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) {
-        switch (rv) {
-        case EAGAIN:
-        case ENOMEM:
-            return (NNG_ENOMEM);
+    if ((rv = pthread_mutex_lock(m)) != 0) {
+        nni_panic("pthread_mutex_lock: %s", strerror(rv));
+    }
+}
 
-        default:
-            nni_panic("pthread_mutex_init: %s", strerror(rv));
-        }
+static void
+nni_pthread_mutex_unlock(pthread_mutex_t *m)
+{
+    int rv;
+
+    if ((rv = pthread_mutex_unlock(m)) != 0) {
+        nni_panic("pthread_mutex_unlock: %s", strerror(rv));
     }
-    mtx->init = 1;
-    return (0);
 }
 
-void
-nni_plat_mtx_fini(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_broadcast(pthread_cond_t *c)
 {
     int rv;
 
-    if (!mtx->init) {
-        return;
+    if ((rv = pthread_cond_broadcast(c)) != 0) {
+        nni_panic("pthread_cond_broadcast: %s", strerror(rv));
     }
-    pthread_mutex_lock(&mtx->mtx);
-    pthread_mutex_unlock(&mtx->mtx);
-    if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
-        nni_panic("pthread_mutex_fini: %s", strerror(rv));
+}
+
+static void
+nni_pthread_cond_signal(pthread_cond_t *c)
+{
+    int rv;
+    if ((rv = pthread_cond_signal(c)) != 0) {
+        nni_panic("pthread_cond_signal: %s", strerror(rv));
     }
-    mtx->init = 0;
 }
 
-void
-nni_plat_mtx_lock(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
 {
     int rv;
 
-    if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) {
-        nni_panic("pthread_mutex_lock: %s", strerror(rv));
+    if ((rv = pthread_cond_wait(c, m)) != 0) {
+        nni_panic("pthread_cond_wait: %s", strerror(rv));
+    }
+}
+
+static int
+nni_pthread_cond_timedwait(
+    pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts)
+{
+    int rv;
+
+    switch ((rv = pthread_cond_timedwait(c, m, ts))) {
+    case 0:
+        return (0);
+    case ETIMEDOUT:
+    case EAGAIN:
+        return (NNG_ETIMEDOUT);
+    }
+    nni_panic("pthread_cond_timedwait: %s", strerror(rv));
+    return (NNG_EINVAL);
+}
+
+static void
+nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx)
+{
+    while (mtx->flags & NNI_PLAT_SYNC_LOCKED) {
+        mtx->flags |= NNI_PLAT_SYNC_WAIT;
+        nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock);
     }
+    mtx->flags |= NNI_PLAT_SYNC_LOCKED;
     mtx->owner = pthread_self();
 }
 
+static void
+nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx)
+{
+    NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED);
+    mtx->flags &= ~NNI_PLAT_SYNC_LOCKED;
+    if (mtx->flags & NNI_PLAT_SYNC_WAIT) {
+        mtx->flags &= ~NNI_PLAT_SYNC_WAIT;
+        pthread_cond_broadcast(&nni_plat_lock_cond);
+    }
+}
+
+static void
+nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx)
+{
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    nni_plat_mtx_lock_fallback_locked(mtx);
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx)
+{
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    nni_plat_mtx_unlock_fallback_locked(mtx);
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake_fallback(nni_cv *cv)
+{
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+        cv->gen++;
+        cv->wake = 0;
+        nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+    }
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake1_fallback(nni_cv *cv)
+{
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+        cv->wake++;
+        nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+    }
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wait_fallback(nni_cv *cv)
+{
+    int gen;
+
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    if (!cv->mtx->fallback) {
+        // transform the mutex to a fallback one. we have it held.
+        cv->mtx->fallback = 1;
+        cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+        nni_pthread_mutex_unlock(&cv->mtx->mtx);
+    }
+
+    NNI_ASSERT(cv->mtx->owner == pthread_self());
+    NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED);
+    gen = cv->gen;
+    while ((cv->gen == gen) && (cv->wake == 0)) {
+        nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+        cv->flags |= NNI_PLAT_SYNC_WAIT;
+        nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock);
+
+        nni_plat_mtx_lock_fallback_locked(cv->mtx);
+    }
+    if (cv->wake > 0) {
+        cv->wake--;
+    }
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static int
+nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts)
+{
+    int gen;
+    int rv = 0;
+
+    if (!cv->mtx->fallback) {
+        // transform the mutex to a fallback one. we have it held.
+        cv->mtx->fallback = 1;
+        cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+        nni_pthread_mutex_unlock(&cv->mtx->mtx);
+    }
+
+    nni_pthread_mutex_lock(&nni_plat_lock);
+    gen = cv->gen;
+    while ((cv->gen == gen) && (cv->wake == 0)) {
+        nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+        cv->flags |= NNI_PLAT_SYNC_WAIT;
+        rv = nni_pthread_cond_timedwait(
+            &nni_plat_cond_cond, &nni_plat_lock, ts);
+        nni_plat_mtx_lock_fallback_locked(cv->mtx);
+        if (rv != 0) {
+            break;
+        }
+    }
+    if ((rv == 0) && (cv->wake > 0)) {
+        cv->wake--;
+    }
+    nni_pthread_mutex_unlock(&nni_plat_lock);
+    return (rv);
+}
+
 void
-nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+nni_plat_mtx_lock(nni_plat_mtx *mtx)
 {
     int rv;
 
+    if (!mtx->fallback) {
+        nni_pthread_mutex_lock(&mtx->mtx);
+
+        // We might have changed to a fallback lock; make
+        // sure this did not occur.  Note that transitions to
+        // fallback locks only happen when a thread accesses
+        // a condition variable already holding this lock,
+        // so this is guaranteed to be safe.
+        if (!mtx->fallback) {
+            mtx->owner = pthread_self();
+            return;
+        }
+        nni_pthread_mutex_unlock(&mtx->mtx);
+    }
+
+    // Fallback mode
+    nni_plat_mtx_lock_fallback(mtx);
+}
+
+void
+nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+{
     NNI_ASSERT(mtx->owner == pthread_self());
     mtx->owner = 0;
-    if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) {
-        nni_panic("pthread_mutex_unlock: %s", strerror(rv));
+
+    if (mtx->fallback) {
+        nni_plat_mtx_unlock_fallback(mtx);
+    } else {
+        nni_pthread_mutex_unlock(&mtx->mtx);
     }
 }
 
-int
+void
 nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
 {
-    int rv;
-
-    if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) {
-        switch (rv) {
-        case ENOMEM:
-        case EAGAIN:
-            return (NNG_ENOMEM);
-
-        default:
-            nni_panic("pthread_cond_init: %s", strerror(rv));
-        }
+    if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) {
+        cv->fallback = 1;
+    } else {
+        cv->flags = NNI_PLAT_SYNC_INIT;
+    }
+#ifndef NDEBUG
+    if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+        cv->fallback = 1;
     }
+#endif
     cv->mtx = mtx;
-    return (0);
 }
 
 void
 nni_plat_cv_wake(nni_plat_cv *cv)
 {
-    (void) pthread_cond_broadcast(&cv->cv);
+    if (cv->fallback) {
+        nni_plat_cv_wake_fallback(cv);
+    } else {
+        nni_pthread_cond_broadcast(&cv->cv);
+    }
 }
 
 void
 nni_plat_cv_wake1(nni_plat_cv *cv)
 {
-    (void) pthread_cond_signal(&cv->cv);
+    int rv;
+    if (cv->fallback) {
+        nni_plat_cv_wake1_fallback(cv);
+    } else {
+        nni_pthread_cond_signal(&cv->cv);
+    }
 }
 
 void
 nni_plat_cv_wait(nni_plat_cv *cv)
 {
-    int rv;
-
     NNI_ASSERT(cv->mtx->owner == pthread_self());
-    if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) {
-        nni_panic("pthread_cond_wait: %s", strerror(rv));
+    if (cv->fallback) {
+        nni_plat_cv_wait_fallback(cv);
+    } else {
+        nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx);
+        cv->mtx->owner = pthread_self();
     }
-    cv->mtx->owner = pthread_self();
 }
 
 int
@@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
     ts.tv_sec = until / 1000000;
     ts.tv_nsec = (until % 1000000) * 1000;
 
-    rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
-    cv->mtx->owner = pthread_self();
-    if (rv == ETIMEDOUT) {
-        return (NNG_ETIMEDOUT);
-    } else if (rv != 0) {
-        nni_panic("pthread_cond_timedwait: %d", rv);
+    if (cv->fallback) {
+        rv = nni_plat_cv_until_fallback(cv, &ts);
+    } else {
+        rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
+        cv->mtx->owner = pthread_self();
     }
-    return (0);
+    return (rv);
 }
 
 void
@@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv)
 {
     int rv;
 
-    if (cv->mtx == NULL) {
-        return;
-    }
-    if ((rv = pthread_cond_destroy(&cv->cv)) != 0) {
+    if ((cv->flags & NNI_PLAT_SYNC_INIT) &&
+        ((rv = pthread_cond_destroy(&cv->cv)) != 0)) {
         nni_panic("pthread_cond_destroy: %s", strerror(rv));
     }
-    cv->mtx = NULL;
+    cv->flags = 0;
+    cv->mtx = NULL;
 }
 
 static void *
@@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
     thr->arg = arg;
 
     // POSIX wants functions to return a void *, but we don't care.
-    rv = pthread_create(
-        &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr);
+    rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
     if (rv != 0) {
-        // nni_printf("pthread_create: %s", strerror(rv));
+        // nni_printf("pthread_create: %s",
+        // strerror(rv));
         return (NNG_ENOMEM);
     }
     return (0);
@@ -227,7 +432,6 @@ int
 nni_plat_init(int (*helper)(void))
 {
     int rv;
-    int devnull;
 
     if (nni_plat_forked) {
         nni_panic("nng is not fork-reentrant safe");
@@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void))
     if (nni_plat_inited) {
         return (0); // fast path
     }
-    if ((devnull = open("/dev/null", O_RDONLY)) < 0) {
-        return (nni_plat_errno(errno));
-    }
 
-    pthread_mutex_lock(&nni_plat_lock);
+    pthread_mutex_lock(&nni_plat_init_lock);
     if (nni_plat_inited) { // check again under the lock to be sure
-        pthread_mutex_unlock(&nni_plat_lock);
-        close(devnull);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         return (0);
     }
     if (pthread_condattr_init(&nni_cvattr) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        (void) close(devnull);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         return (NNG_ENOMEM);
     }
 #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME
     if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        (void) close(devnull);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         return (NNG_ENOMEM);
     }
 #endif
 
     if (pthread_mutexattr_init(&nni_mxattr) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        pthread_condattr_destroy(&nni_cvattr);
-        (void) close(devnull);
-        return (NNG_ENOMEM);
-    }
-
-    rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
-    if (rv != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        (void) close(devnull);
-        pthread_mutexattr_destroy(&nni_mxattr);
-        pthread_condattr_destroy(&nni_cvattr);
-        return (NNG_ENOMEM);
-    }
-
-    rv = pthread_attr_init(&nni_pthread_attr);
-    if (rv != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        (void) close(nni_plat_devnull);
-        pthread_mutexattr_destroy(&nni_mxattr);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         pthread_condattr_destroy(&nni_cvattr);
         return (NNG_ENOMEM);
     }
 
-    // We don't force this, but we want to have it small... we could
-    // probably get by with even just 8k, but Linux usually wants 16k
-    // as a minimum.  If this fails, its not fatal, just we won't be
-    // as scalable / thrifty with our use of VM.
-    //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+    // if this one fails we don't care.
+    (void) pthread_mutexattr_settype(
+        &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
 
     if ((rv = nni_posix_pollq_sysinit()) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
-        (void) close(nni_plat_devnull);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
-        pthread_attr_destroy(&nni_pthread_attr);
         return (rv);
     }
 
     if ((rv = nni_posix_resolv_sysinit()) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         nni_posix_pollq_sysfini();
-        (void) close(nni_plat_devnull);
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
-        pthread_attr_destroy(&nni_pthread_attr);
         return (rv);
     }
 
     if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
-        pthread_mutex_unlock(&nni_plat_lock);
+        pthread_mutex_unlock(&nni_plat_init_lock);
         nni_posix_resolv_sysfini();
         nni_posix_pollq_sysfini();
-        (void) close(devnull);
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
-        pthread_attr_destroy(&nni_pthread_attr);
        return (NNG_ENOMEM);
     }
     if ((rv = helper()) == 0) {
         nni_plat_inited = 1;
     }
-    nni_plat_devnull = devnull;
-    pthread_mutex_unlock(&nni_plat_lock);
+    pthread_mutex_unlock(&nni_plat_init_lock);
 
     return (rv);
 }
 
@@ -330,18 +500,15 @@
 void
 nni_plat_fini(void)
 {
-    pthread_mutex_lock(&nni_plat_lock);
+    pthread_mutex_lock(&nni_plat_init_lock);
     if (nni_plat_inited) {
         nni_posix_resolv_sysfini();
         nni_posix_pollq_sysfini();
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
-        pthread_attr_destroy(&nni_pthread_attr);
-        (void) close(nni_plat_devnull);
-        nni_plat_devnull = -1;
-        nni_plat_inited = 0;
+        nni_plat_inited = 0;
     }
-    pthread_mutex_unlock(&nni_plat_lock);
+    pthread_mutex_unlock(&nni_plat_init_lock);
 }
 
 #else
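
As the #ifndef NDEBUG blocks in the diff show, debug builds also consult the NNG_SYNC_FALLBACK environment variable, so a test can force every mutex and condition variable onto the fallback paths without needing a platform whose pthread_mutex_init() actually fails. A hypothetical test driver might do no more than this (the surrounding harness is assumed, not part of the patch):

```c
#include <stdlib.h>

int
main(void)
{
    /* Must be set before the first nni_plat_mtx_init()/nni_plat_cv_init()
     * call, so that every lock and condvar takes the fallback path. */
    setenv("NNG_SYNC_FALLBACK", "1", 1);
    /* ... run the usual synchronization tests here ... */
    return (0);
}
```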
