diff options
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 11 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 17 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipe.c | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 417 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 5 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 13 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 7 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_pipe.c | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_resolv.c | 5 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 6 |
14 files changed, 326 insertions, 177 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 46fe2bea..f511f35f 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) static void nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) { - nni_aio * aio; - int newfd; - struct sockaddr_storage ss; - socklen_t slen; + nni_aio *aio; + int newfd; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { // We could argue that knowing the remote peer address would @@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&ed->mtx)) != 0) { - NNI_FREE_STRUCT(ed); - return (rv); - } + nni_mtx_init(&ed->mtx); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 46ebbc1d..81fbd48b 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -54,9 +54,19 @@ extern int nni_plat_errno(int); // elsewhere. struct nni_plat_mtx { - int init; pthread_t owner; pthread_mutex_t mtx; + int fallback; + int flags; +}; + +struct nni_plat_cv { + pthread_cond_t cv; + nni_plat_mtx * mtx; + int fallback; + int flags; + int gen; + int wake; }; struct nni_plat_thr { @@ -65,11 +75,6 @@ struct nni_plat_thr { void *arg; }; -struct nni_plat_cv { - pthread_cond_t cv; - nni_plat_mtx * mtx; -}; - #endif extern int nni_posix_pollq_sysinit(void); diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c index 78415d26..314e2f5e 100644 --- a/src/platform/posix/posix_pipe.c +++ b/src/platform/posix/posix_pipe.c @@ -105,7 +105,6 @@ void nni_plat_pipe_clear(int rfd) { char buf[32]; - int rv; for (;;) { // Completely drain the pipe, but don't wait. This coalesces diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index bd74e0c0..b2c1cb1f 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) // one. For now we just have a global pollq. Note that by tying // the pd to a single pollq we may get some kind of cache warmth. - if ((rv = nni_mtx_init(&pd->mtx)) != 0) { - NNI_FREE_STRUCT(pd); - return (rv); - } pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; @@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_mtx_init(&pd->mtx); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d3fdf394..df5a1799 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq) pq->wakerfd = -1; pq->close = 0; - if (((rv = nni_mtx_init(&pq->mtx)) != 0) || - ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) || - ((rv = nni_posix_pollq_poll_grow(pq)) != 0) || + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); + + if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { nni_posix_pollq_fini(pq); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 09d40b94..dce8270a 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_posix_resolv_mtx); return (rv); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 0ef4754c..44bfbed2 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -24,118 +24,325 @@ #include <time.h> #include <unistd.h> -static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; -static int nni_plat_inited = 0; -static int nni_plat_forked = 0; +static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER; +static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER; +static int nni_plat_inited = 0; +static int nni_plat_forked = 0; + +pthread_condattr_t nni_cvattr; +pthread_mutexattr_t nni_mxattr; + +#ifndef NDEBUG +int nni_plat_sync_fallback = 0; +#endif -pthread_condattr_t nni_cvattr; -pthread_mutexattr_t nni_mxattr; -static pthread_attr_t nni_pthread_attr; +enum nni_plat_sync_flags { + NNI_PLAT_SYNC_INIT = 0x01, + NNI_PLAT_SYNC_LOCKED = 0x04, + NNI_PLAT_SYNC_WAIT = 0x08, +}; -// We open a /dev/null file descriptor so that we can dup2() it to -// cause MacOS X to wakeup. This gives us a "safe" close semantic. +void +nni_plat_mtx_init(nni_plat_mtx *mtx) +{ + if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) { + mtx->fallback = 1; + } else { + mtx->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + mtx->fallback = 1; + } +#endif +} -int nni_plat_devnull = -1; +void +nni_plat_mtx_fini(nni_plat_mtx *mtx) +{ + if (mtx->flags & NNI_PLAT_SYNC_INIT) { + int rv; + if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { + nni_panic("pthread_mutex_destroy: %s", strerror(rv)); + } + } + mtx->flags = 0; +} -int -nni_plat_mtx_init(nni_plat_mtx *mtx) +static void +nni_pthread_mutex_lock(pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) { - switch (rv) { - case EAGAIN: - case ENOMEM: - return (NNG_ENOMEM); + if ((rv = pthread_mutex_lock(m)) != 0) { + nni_panic("pthread_mutex_lock: %s", strerror(rv)); + } +} - default: - nni_panic("pthread_mutex_init: %s", strerror(rv)); - } +static void +nni_pthread_mutex_unlock(pthread_mutex_t *m) +{ + int rv; + + if ((rv = pthread_mutex_unlock(m)) != 0) { + nni_panic("pthread_mutex_unlock: %s", strerror(rv)); } - mtx->init = 1; - return (0); } -void -nni_plat_mtx_fini(nni_plat_mtx *mtx) +static void +nni_pthread_cond_broadcast(pthread_cond_t *c) { int rv; - if (!mtx->init) { - return; + if ((rv = pthread_cond_broadcast(c)) != 0) { + nni_panic("pthread_cond_broadcast: %s", strerror(rv)); } - pthread_mutex_lock(&mtx->mtx); - pthread_mutex_unlock(&mtx->mtx); - if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_fini: %s", strerror(rv)); +} + +static void +nni_pthread_cond_signal(pthread_cond_t *c) +{ + int rv; + if ((rv = pthread_cond_signal(c)) != 0) { + nni_panic("pthread_cond_signal: %s", strerror(rv)); } - mtx->init = 0; } -void -nni_plat_mtx_lock(nni_plat_mtx *mtx) +static void +nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_lock: %s", strerror(rv)); + if ((rv = pthread_cond_wait(c, m)) != 0) { + nni_panic("pthread_cond_wait: %s", strerror(rv)); + } +} + +static int +nni_pthread_cond_timedwait( + pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts) +{ + int rv; + + switch ((rv = pthread_cond_timedwait(c, m, ts))) { + case 0: + return (0); + case ETIMEDOUT: + case EAGAIN: + return (NNG_ETIMEDOUT); + } + nni_panic("pthread_cond_timedwait: %s", strerror(rv)); + return (NNG_EINVAL); +} + +static void +nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx) +{ + while (mtx->flags & NNI_PLAT_SYNC_LOCKED) { + mtx->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock); } + mtx->flags |= NNI_PLAT_SYNC_LOCKED; mtx->owner = pthread_self(); } +static void +nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx) +{ + NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED); + mtx->flags &= ~NNI_PLAT_SYNC_LOCKED; + if (mtx->flags & NNI_PLAT_SYNC_WAIT) { + mtx->flags &= ~NNI_PLAT_SYNC_WAIT; + pthread_cond_broadcast(&nni_plat_lock_cond); + } +} + +static void +nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_lock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_unlock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->gen++; + cv->wake = 0; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake1_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->wake++; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wait_fallback(nni_cv *cv) +{ + int gen; + + nni_pthread_mutex_lock(&nni_plat_lock); + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + NNI_ASSERT(cv->mtx->owner == pthread_self()); + NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock); + + nni_plat_mtx_lock_fallback_locked(cv->mtx); + } + if (cv->wake > 0) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static int +nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts) +{ + int gen; + int rv = 0; + + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + nni_pthread_mutex_lock(&nni_plat_lock); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + rv = nni_pthread_cond_timedwait( + &nni_plat_cond_cond, &nni_plat_lock, ts); + nni_plat_mtx_lock_fallback_locked(cv->mtx); + if (rv != 0) { + break; + } + } + if ((rv == 0) && (cv->wake > 0)) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); + return (rv); +} + void -nni_plat_mtx_unlock(nni_plat_mtx *mtx) +nni_plat_mtx_lock(nni_plat_mtx *mtx) { int rv; + if (!mtx->fallback) { + nni_pthread_mutex_lock(&mtx->mtx); + + // We might have changed to a fallback lock; make + // sure this did not occur. Note that transitions to + // fallback locks only happen when a thread accesses + // a condition variable already holding this lock, + // so this is guranteed to be safe. + if (!mtx->fallback) { + mtx->owner = pthread_self(); + return; + } + nni_pthread_mutex_unlock(&mtx->mtx); + } + + // Fallback mode + nni_plat_mtx_lock_fallback(mtx); +} + +void +nni_plat_mtx_unlock(nni_plat_mtx *mtx) +{ NNI_ASSERT(mtx->owner == pthread_self()); mtx->owner = 0; - if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_unlock: %s", strerror(rv)); + + if (mtx->fallback) { + nni_plat_mtx_unlock_fallback(mtx); + } else { + nni_pthread_mutex_unlock(&mtx->mtx); } } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { - int rv; - - if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) { - switch (rv) { - case ENOMEM: - case EAGAIN: - return (NNG_ENOMEM); - - default: - nni_panic("pthread_cond_init: %s", strerror(rv)); - } + if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) { + cv->fallback = 1; + } else { + cv->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + cv->fallback = 1; } +#endif cv->mtx = mtx; - return (0); } void nni_plat_cv_wake(nni_plat_cv *cv) { - (void) pthread_cond_broadcast(&cv->cv); + if (cv->fallback) { + nni_plat_cv_wake_fallback(cv); + } else { + nni_pthread_cond_broadcast(&cv->cv); + } } void nni_plat_cv_wake1(nni_plat_cv *cv) { - (void) pthread_cond_signal(&cv->cv); + int rv; + if (cv->fallback) { + nni_plat_cv_wake1_fallback(cv); + } else { + nni_pthread_cond_signal(&cv->cv); + } } void nni_plat_cv_wait(nni_plat_cv *cv) { - int rv; - NNI_ASSERT(cv->mtx->owner == pthread_self()); - if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) { - nni_panic("pthread_cond_wait: %s", strerror(rv)); + if (cv->fallback) { + nni_plat_cv_wait_fallback(cv); + } else { + nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx); + cv->mtx->owner = pthread_self(); } - cv->mtx->owner = pthread_self(); } int @@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) ts.tv_sec = until / 1000000; ts.tv_nsec = (until % 1000000) * 1000; - rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); - cv->mtx->owner = pthread_self(); - if (rv == ETIMEDOUT) { - return (NNG_ETIMEDOUT); - } else if (rv != 0) { - nni_panic("pthread_cond_timedwait: %d", rv); + if (cv->fallback) { + rv = nni_plat_cv_until_fallback(cv, &ts); + } else { + rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); + cv->mtx->owner = pthread_self(); } - return (0); + return (rv); } void @@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv) { int rv; - if (cv->mtx == NULL) { - return; - } - if ((rv = pthread_cond_destroy(&cv->cv)) != 0) { + if ((cv->flags & NNI_PLAT_SYNC_INIT) && + ((rv = pthread_cond_destroy(&cv->cv)) != 0)) { nni_panic("pthread_cond_destroy: %s", strerror(rv)); } - cv->mtx = NULL; + cv->flags = 0; + cv->mtx = NULL; } static void * @@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->arg = arg; // POSIX wants functions to return a void *, but we don't care. - rv = pthread_create( - &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr); + rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr); if (rv != 0) { - // nni_printf("pthread_create: %s", strerror(rv)); + // nni_printf("pthread_create: %s", + // strerror(rv)); return (NNG_ENOMEM); } return (0); @@ -227,7 +432,6 @@ int nni_plat_init(int (*helper)(void)) { int rv; - int devnull; if (nni_plat_forked) { nni_panic("nng is not fork-reentrant safe"); @@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void)) if (nni_plat_inited) { return (0); // fast path } - if ((devnull = open("/dev/null", O_RDONLY)) < 0) { - return (nni_plat_errno(errno)); - } - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { // check again under the lock to be sure - pthread_mutex_unlock(&nni_plat_lock); - close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (0); } if (pthread_condattr_init(&nni_cvattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mxattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - pthread_condattr_destroy(&nni_cvattr); - (void) close(devnull); - return (NNG_ENOMEM); - } - - rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); - pthread_mutexattr_destroy(&nni_mxattr); - pthread_condattr_destroy(&nni_cvattr); - return (NNG_ENOMEM); - } - - rv = pthread_attr_init(&nni_pthread_attr); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); - pthread_mutexattr_destroy(&nni_mxattr); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_condattr_destroy(&nni_cvattr); return (NNG_ENOMEM); } - // We don't force this, but we want to have it small... we could - // probably get by with even just 8k, but Linux usually wants 16k - // as a minimum. If this fails, its not fatal, just we won't be - // as scalable / thrifty with our use of VM. - //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + // if this one fails we don't care. + (void) pthread_mutexattr_settype( + &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); if ((rv = nni_posix_pollq_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if ((rv = nni_posix_resolv_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_pollq_sysfini(); - (void) close(nni_plat_devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); - (void) close(devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { nni_plat_inited = 1; } - nni_plat_devnull = devnull; - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); return (rv); } @@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); - (void) close(nni_plat_devnull); - nni_plat_devnull = -1; - nni_plat_inited = 0; + nni_plat_inited = 0; } - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); } #else diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index db6d7af4..2aa16b36 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { - NNI_FREE_STRUCT(udp); - return (rv); - } + nni_mtx_init(&udp->udp_mtx); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index c4cdcb8a..9c3343b7 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h) int nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { - int rv; - ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - if (((rv = nni_mtx_init(&evt->mtx)) != 0) || - ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { - return (rv); // NB: This will never happen on Windows. - } + nni_mtx_init(&evt->mtx); + nni_cv_init(&evt->cv, &evt->mtx); + evt->ops = *ops; evt->aio = NULL; evt->ptr = ptr; @@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void) goto fail; } } - if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) { - goto fail; - } + nni_mtx_init(&nni_win_iocp_mtx); for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { nni_thr_run(&nni_win_iocp_thrs[i]); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c9eb20ec..a60815fa 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void) NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - if (((rv = nni_mtx_init(&worker->mtx)) != 0) || - ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { - return (rv); - } + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); if (rv != 0) { return (rv); diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 63295a71..80e3724d 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -680,8 +680,6 @@ int nni_win_tcp_sysinit(void) { WSADATA data; - WORD ver; - ver = MAKEWORD(2, 2); if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { NNI_ASSERT(LOBYTE(data.wVersion) == 2); NNI_ASSERT(HIBYTE(data.wVersion) == 2); diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c index 861fbc76..edc4df3f 100644 --- a/src/platform/windows/win_pipe.c +++ b/src/platform/windows/win_pipe.c @@ -19,9 +19,9 @@ int nni_plat_pipe_open(int *wfdp, int *rfdp) { - SOCKET afd = INVALID_SOCKET; SOCKET rfd = INVALID_SOCKET; SOCKET wfd = INVALID_SOCKET; + SOCKET afd; struct sockaddr_in addr; socklen_t alen; diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index a01dc123..4ce12d84 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_win_resolv_mtx); + if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_win_resolv_mtx); return (rv); diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index c01ec782..879cd772 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -30,12 +30,11 @@ nni_free(void *b, size_t z) HeapFree(GetProcessHeap(), 0, b); } -int +void nni_plat_mtx_init(nni_plat_mtx *mtx) { InitializeSRWLock(&mtx->srl); mtx->init = 1; - return (0); } void @@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx) ReleaseSRWLockExclusive(&mtx->srl); } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { InitializeConditionVariable(&cv->cv); cv->srl = &mtx->srl; - return (0); } void |
