diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-15 21:59:55 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-16 18:31:42 -0700 |
| commit | a9633313ec8e578c805cd53b37ba3360d83157bc (patch) | |
| tree | 14d32c4031ea1c8508a75469407ca77e353fa315 /src/platform | |
| parent | e7e2a6c14f0317eb77711951c6f1a650d4013dfe (diff) | |
| download | nng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.gz nng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.bz2 nng-a9633313ec8e578c805cd53b37ba3360d83157bc.zip | |
Provide versions of mutex, condvar, and aio init that never fail.
If the underlying platform fails (FreeBSD is the only one I'm aware
of that does this!), we use a global lock or condition variable instead.
This means that our lock initializers never ever fail.
Probably we could eliminate most of this for Linux and Darwin, since
on those platforms, mutex and condvar initialization reasonably never
fails. Initial benchmarks show little difference either way -- so we
can revisit (optimize) later.
This removes a lot of otherwise untested code in error cases and so forth,
improving coverage and resilience in the face of allocation failures.
Platforms other than POSIX should follow a similar pattern if they need
this. (VxWorks, I'm thinking of you.) Most sane platforms won't have
an issue here, since normally these initializations do not need to allocate
memory. (Reportedly, even FreeBSD has plans to "fix" this in libthr2.)
While here, some bugs were fixed in initialization & teardown.
The fallback code is properly tested with dedicated test cases.
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 11 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 17 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipe.c | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 417 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 5 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 13 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 7 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_pipe.c | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_resolv.c | 5 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 6 |
14 files changed, 326 insertions, 177 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 46fe2bea..f511f35f 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) static void nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) { - nni_aio * aio; - int newfd; - struct sockaddr_storage ss; - socklen_t slen; + nni_aio *aio; + int newfd; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { // We could argue that knowing the remote peer address would @@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&ed->mtx)) != 0) { - NNI_FREE_STRUCT(ed); - return (rv); - } + nni_mtx_init(&ed->mtx); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 46ebbc1d..81fbd48b 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -54,9 +54,19 @@ extern int nni_plat_errno(int); // elsewhere. struct nni_plat_mtx { - int init; pthread_t owner; pthread_mutex_t mtx; + int fallback; + int flags; +}; + +struct nni_plat_cv { + pthread_cond_t cv; + nni_plat_mtx * mtx; + int fallback; + int flags; + int gen; + int wake; }; struct nni_plat_thr { @@ -65,11 +75,6 @@ struct nni_plat_thr { void *arg; }; -struct nni_plat_cv { - pthread_cond_t cv; - nni_plat_mtx * mtx; -}; - #endif extern int nni_posix_pollq_sysinit(void); diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c index 78415d26..314e2f5e 100644 --- a/src/platform/posix/posix_pipe.c +++ b/src/platform/posix/posix_pipe.c @@ -105,7 +105,6 @@ void nni_plat_pipe_clear(int rfd) { char buf[32]; - int rv; for (;;) { // Completely drain the pipe, but don't wait. This coalesces diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index bd74e0c0..b2c1cb1f 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) // one. For now we just have a global pollq. Note that by tying // the pd to a single pollq we may get some kind of cache warmth. - if ((rv = nni_mtx_init(&pd->mtx)) != 0) { - NNI_FREE_STRUCT(pd); - return (rv); - } pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; @@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_mtx_init(&pd->mtx); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d3fdf394..df5a1799 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq) pq->wakerfd = -1; pq->close = 0; - if (((rv = nni_mtx_init(&pq->mtx)) != 0) || - ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) || - ((rv = nni_posix_pollq_poll_grow(pq)) != 0) || + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); + + if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { nni_posix_pollq_fini(pq); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 09d40b94..dce8270a 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_posix_resolv_mtx); return (rv); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 0ef4754c..44bfbed2 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -24,118 +24,325 @@ #include <time.h> #include <unistd.h> -static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; -static int nni_plat_inited = 0; -static int nni_plat_forked = 0; +static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER; +static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER; +static int nni_plat_inited = 0; +static int nni_plat_forked = 0; + +pthread_condattr_t nni_cvattr; +pthread_mutexattr_t nni_mxattr; + +#ifndef NDEBUG +int nni_plat_sync_fallback = 0; +#endif -pthread_condattr_t nni_cvattr; -pthread_mutexattr_t nni_mxattr; -static pthread_attr_t nni_pthread_attr; +enum nni_plat_sync_flags { + NNI_PLAT_SYNC_INIT = 0x01, + NNI_PLAT_SYNC_LOCKED = 0x04, + NNI_PLAT_SYNC_WAIT = 0x08, +}; -// We open a /dev/null file descriptor so that we can dup2() it to -// cause MacOS X to wakeup. This gives us a "safe" close semantic. +void +nni_plat_mtx_init(nni_plat_mtx *mtx) +{ + if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) { + mtx->fallback = 1; + } else { + mtx->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + mtx->fallback = 1; + } +#endif +} -int nni_plat_devnull = -1; +void +nni_plat_mtx_fini(nni_plat_mtx *mtx) +{ + if (mtx->flags & NNI_PLAT_SYNC_INIT) { + int rv; + if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { + nni_panic("pthread_mutex_destroy: %s", strerror(rv)); + } + } + mtx->flags = 0; +} -int -nni_plat_mtx_init(nni_plat_mtx *mtx) +static void +nni_pthread_mutex_lock(pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) { - switch (rv) { - case EAGAIN: - case ENOMEM: - return (NNG_ENOMEM); + if ((rv = pthread_mutex_lock(m)) != 0) { + nni_panic("pthread_mutex_lock: %s", strerror(rv)); + } +} - default: - nni_panic("pthread_mutex_init: %s", strerror(rv)); - } +static void +nni_pthread_mutex_unlock(pthread_mutex_t *m) +{ + int rv; + + if ((rv = pthread_mutex_unlock(m)) != 0) { + nni_panic("pthread_mutex_unlock: %s", strerror(rv)); } - mtx->init = 1; - return (0); } -void -nni_plat_mtx_fini(nni_plat_mtx *mtx) +static void +nni_pthread_cond_broadcast(pthread_cond_t *c) { int rv; - if (!mtx->init) { - return; + if ((rv = pthread_cond_broadcast(c)) != 0) { + nni_panic("pthread_cond_broadcast: %s", strerror(rv)); } - pthread_mutex_lock(&mtx->mtx); - pthread_mutex_unlock(&mtx->mtx); - if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_fini: %s", strerror(rv)); +} + +static void +nni_pthread_cond_signal(pthread_cond_t *c) +{ + int rv; + if ((rv = pthread_cond_signal(c)) != 0) { + nni_panic("pthread_cond_signal: %s", strerror(rv)); } - mtx->init = 0; } -void -nni_plat_mtx_lock(nni_plat_mtx *mtx) +static void +nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_lock: %s", strerror(rv)); + if ((rv = pthread_cond_wait(c, m)) != 0) { + nni_panic("pthread_cond_wait: %s", strerror(rv)); + } +} + +static int +nni_pthread_cond_timedwait( + pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts) +{ + int rv; + + switch ((rv = pthread_cond_timedwait(c, m, ts))) { + case 0: + return (0); + case ETIMEDOUT: + case EAGAIN: + return (NNG_ETIMEDOUT); + } + nni_panic("pthread_cond_timedwait: %s", strerror(rv)); + return (NNG_EINVAL); +} + +static void +nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx) +{ + while (mtx->flags & NNI_PLAT_SYNC_LOCKED) { + mtx->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock); } + mtx->flags |= NNI_PLAT_SYNC_LOCKED; mtx->owner = pthread_self(); } +static void +nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx) +{ + NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED); + mtx->flags &= ~NNI_PLAT_SYNC_LOCKED; + if (mtx->flags & NNI_PLAT_SYNC_WAIT) { + mtx->flags &= ~NNI_PLAT_SYNC_WAIT; + pthread_cond_broadcast(&nni_plat_lock_cond); + } +} + +static void +nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_lock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_unlock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->gen++; + cv->wake = 0; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake1_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->wake++; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wait_fallback(nni_cv *cv) +{ + int gen; + + nni_pthread_mutex_lock(&nni_plat_lock); + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + NNI_ASSERT(cv->mtx->owner == pthread_self()); + NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock); + + nni_plat_mtx_lock_fallback_locked(cv->mtx); + } + if (cv->wake > 0) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static int +nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts) +{ + int gen; + int rv = 0; + + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + nni_pthread_mutex_lock(&nni_plat_lock); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + rv = nni_pthread_cond_timedwait( + &nni_plat_cond_cond, &nni_plat_lock, ts); + nni_plat_mtx_lock_fallback_locked(cv->mtx); + if (rv != 0) { + break; + } + } + if ((rv == 0) && (cv->wake > 0)) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); + return (rv); +} + void -nni_plat_mtx_unlock(nni_plat_mtx *mtx) +nni_plat_mtx_lock(nni_plat_mtx *mtx) { int rv; + if (!mtx->fallback) { + nni_pthread_mutex_lock(&mtx->mtx); + + // We might have changed to a fallback lock; make + // sure this did not occur. Note that transitions to + // fallback locks only happen when a thread accesses + // a condition variable already holding this lock, + // so this is guranteed to be safe. + if (!mtx->fallback) { + mtx->owner = pthread_self(); + return; + } + nni_pthread_mutex_unlock(&mtx->mtx); + } + + // Fallback mode + nni_plat_mtx_lock_fallback(mtx); +} + +void +nni_plat_mtx_unlock(nni_plat_mtx *mtx) +{ NNI_ASSERT(mtx->owner == pthread_self()); mtx->owner = 0; - if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_unlock: %s", strerror(rv)); + + if (mtx->fallback) { + nni_plat_mtx_unlock_fallback(mtx); + } else { + nni_pthread_mutex_unlock(&mtx->mtx); } } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { - int rv; - - if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) { - switch (rv) { - case ENOMEM: - case EAGAIN: - return (NNG_ENOMEM); - - default: - nni_panic("pthread_cond_init: %s", strerror(rv)); - } + if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) { + cv->fallback = 1; + } else { + cv->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + cv->fallback = 1; } +#endif cv->mtx = mtx; - return (0); } void nni_plat_cv_wake(nni_plat_cv *cv) { - (void) pthread_cond_broadcast(&cv->cv); + if (cv->fallback) { + nni_plat_cv_wake_fallback(cv); + } else { + nni_pthread_cond_broadcast(&cv->cv); + } } void nni_plat_cv_wake1(nni_plat_cv *cv) { - (void) pthread_cond_signal(&cv->cv); + int rv; + if (cv->fallback) { + nni_plat_cv_wake1_fallback(cv); + } else { + nni_pthread_cond_signal(&cv->cv); + } } void nni_plat_cv_wait(nni_plat_cv *cv) { - int rv; - NNI_ASSERT(cv->mtx->owner == pthread_self()); - if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) { - nni_panic("pthread_cond_wait: %s", strerror(rv)); + if (cv->fallback) { + nni_plat_cv_wait_fallback(cv); + } else { + nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx); + cv->mtx->owner = pthread_self(); } - cv->mtx->owner = pthread_self(); } int @@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) ts.tv_sec = until / 1000000; ts.tv_nsec = (until % 1000000) * 1000; - rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); - cv->mtx->owner = pthread_self(); - if (rv == ETIMEDOUT) { - return (NNG_ETIMEDOUT); - } else if (rv != 0) { - nni_panic("pthread_cond_timedwait: %d", rv); + if (cv->fallback) { + rv = nni_plat_cv_until_fallback(cv, &ts); + } else { + rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); + cv->mtx->owner = pthread_self(); } - return (0); + return (rv); } void @@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv) { int rv; - if (cv->mtx == NULL) { - return; - } - if ((rv = pthread_cond_destroy(&cv->cv)) != 0) { + if ((cv->flags & NNI_PLAT_SYNC_INIT) && + ((rv = pthread_cond_destroy(&cv->cv)) != 0)) { nni_panic("pthread_cond_destroy: %s", strerror(rv)); } - cv->mtx = NULL; + cv->flags = 0; + cv->mtx = NULL; } static void * @@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->arg = arg; // POSIX wants functions to return a void *, but we don't care. - rv = pthread_create( - &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr); + rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr); if (rv != 0) { - // nni_printf("pthread_create: %s", strerror(rv)); + // nni_printf("pthread_create: %s", + // strerror(rv)); return (NNG_ENOMEM); } return (0); @@ -227,7 +432,6 @@ int nni_plat_init(int (*helper)(void)) { int rv; - int devnull; if (nni_plat_forked) { nni_panic("nng is not fork-reentrant safe"); @@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void)) if (nni_plat_inited) { return (0); // fast path } - if ((devnull = open("/dev/null", O_RDONLY)) < 0) { - return (nni_plat_errno(errno)); - } - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { // check again under the lock to be sure - pthread_mutex_unlock(&nni_plat_lock); - close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (0); } if (pthread_condattr_init(&nni_cvattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mxattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - pthread_condattr_destroy(&nni_cvattr); - (void) close(devnull); - return (NNG_ENOMEM); - } - - rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); - pthread_mutexattr_destroy(&nni_mxattr); - pthread_condattr_destroy(&nni_cvattr); - return (NNG_ENOMEM); - } - - rv = pthread_attr_init(&nni_pthread_attr); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); - pthread_mutexattr_destroy(&nni_mxattr); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_condattr_destroy(&nni_cvattr); return (NNG_ENOMEM); } - // We don't force this, but we want to have it small... we could - // probably get by with even just 8k, but Linux usually wants 16k - // as a minimum. If this fails, its not fatal, just we won't be - // as scalable / thrifty with our use of VM. - //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + // if this one fails we don't care. + (void) pthread_mutexattr_settype( + &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); if ((rv = nni_posix_pollq_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if ((rv = nni_posix_resolv_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_pollq_sysfini(); - (void) close(nni_plat_devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); - (void) close(devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { nni_plat_inited = 1; } - nni_plat_devnull = devnull; - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); return (rv); } @@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); - (void) close(nni_plat_devnull); - nni_plat_devnull = -1; - nni_plat_inited = 0; + nni_plat_inited = 0; } - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); } #else diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index db6d7af4..2aa16b36 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { - NNI_FREE_STRUCT(udp); - return (rv); - } + nni_mtx_init(&udp->udp_mtx); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index c4cdcb8a..9c3343b7 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h) int nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { - int rv; - ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - if (((rv = nni_mtx_init(&evt->mtx)) != 0) || - ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { - return (rv); // NB: This will never happen on Windows. - } + nni_mtx_init(&evt->mtx); + nni_cv_init(&evt->cv, &evt->mtx); + evt->ops = *ops; evt->aio = NULL; evt->ptr = ptr; @@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void) goto fail; } } - if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) { - goto fail; - } + nni_mtx_init(&nni_win_iocp_mtx); for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { nni_thr_run(&nni_win_iocp_thrs[i]); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c9eb20ec..a60815fa 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void) NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - if (((rv = nni_mtx_init(&worker->mtx)) != 0) || - ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { - return (rv); - } + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); if (rv != 0) { return (rv); diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 63295a71..80e3724d 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -680,8 +680,6 @@ int nni_win_tcp_sysinit(void) { WSADATA data; - WORD ver; - ver = MAKEWORD(2, 2); if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { NNI_ASSERT(LOBYTE(data.wVersion) == 2); NNI_ASSERT(HIBYTE(data.wVersion) == 2); diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c index 861fbc76..edc4df3f 100644 --- a/src/platform/windows/win_pipe.c +++ b/src/platform/windows/win_pipe.c @@ -19,9 +19,9 @@ int nni_plat_pipe_open(int *wfdp, int *rfdp) { - SOCKET afd = INVALID_SOCKET; SOCKET rfd = INVALID_SOCKET; SOCKET wfd = INVALID_SOCKET; + SOCKET afd; struct sockaddr_in addr; socklen_t alen; diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index a01dc123..4ce12d84 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_win_resolv_mtx); + if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_win_resolv_mtx); return (rv); diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index c01ec782..879cd772 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -30,12 +30,11 @@ nni_free(void *b, size_t z) HeapFree(GetProcessHeap(), 0, b); } -int +void nni_plat_mtx_init(nni_plat_mtx *mtx) { InitializeSRWLock(&mtx->srl); mtx->init = 1; - return (0); } void @@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx) ReleaseSRWLockExclusive(&mtx->srl); } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { InitializeConditionVariable(&cv->cv); cv->srl = &mtx->srl; - return (0); } void |
