aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_epdesc.c11
-rw-r--r--src/platform/posix/posix_impl.h17
-rw-r--r--src/platform/posix/posix_pipe.c1
-rw-r--r--src/platform/posix/posix_pipedesc.c5
-rw-r--r--src/platform/posix/posix_pollq_poll.c7
-rw-r--r--src/platform/posix/posix_resolv_gai.c5
-rw-r--r--src/platform/posix/posix_thread.c417
-rw-r--r--src/platform/posix/posix_udp.c5
-rw-r--r--src/platform/windows/win_iocp.c13
-rw-r--r--src/platform/windows/win_ipc.c7
-rw-r--r--src/platform/windows/win_net.c2
-rw-r--r--src/platform/windows/win_pipe.c2
-rw-r--r--src/platform/windows/win_resolv.c5
-rw-r--r--src/platform/windows/win_thread.c6
14 files changed, 326 insertions, 177 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 46fe2bea..f511f35f 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
static void
nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
{
- nni_aio * aio;
- int newfd;
- struct sockaddr_storage ss;
- socklen_t slen;
+ nni_aio *aio;
+ int newfd;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
// We could argue that knowing the remote peer address would
@@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&ed->mtx)) != 0) {
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
+ nni_mtx_init(&ed->mtx);
// We could randomly choose a different pollq, or for efficiencies
// sake we could take a modulo of the file desc number to choose
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 46ebbc1d..81fbd48b 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -54,9 +54,19 @@ extern int nni_plat_errno(int);
// elsewhere.
struct nni_plat_mtx {
- int init;
pthread_t owner;
pthread_mutex_t mtx;
+ int fallback;
+ int flags;
+};
+
+struct nni_plat_cv {
+ pthread_cond_t cv;
+ nni_plat_mtx * mtx;
+ int fallback;
+ int flags;
+ int gen;
+ int wake;
};
struct nni_plat_thr {
@@ -65,11 +75,6 @@ struct nni_plat_thr {
void *arg;
};
-struct nni_plat_cv {
- pthread_cond_t cv;
- nni_plat_mtx * mtx;
-};
-
#endif
extern int nni_posix_pollq_sysinit(void);
diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c
index 78415d26..314e2f5e 100644
--- a/src/platform/posix/posix_pipe.c
+++ b/src/platform/posix/posix_pipe.c
@@ -105,7 +105,6 @@ void
nni_plat_pipe_clear(int rfd)
{
char buf[32];
- int rv;
for (;;) {
// Completely drain the pipe, but don't wait. This coalesces
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index bd74e0c0..b2c1cb1f 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the pd to a single pollq we may get some kind of cache warmth.
- if ((rv = nni_mtx_init(&pd->mtx)) != 0) {
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
pd->closed = 0;
pd->node.fd = fd;
pd->node.cb = nni_posix_pipedesc_cb;
@@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_mtx_init(&pd->mtx);
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d3fdf394..df5a1799 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
pq->wakerfd = -1;
pq->close = 0;
- if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
- ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
- ((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
+
+ if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
nni_posix_pollq_fini(pq);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 09d40b94..dce8270a 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_posix_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_posix_resolv_mtx);
return (rv);
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 0ef4754c..44bfbed2 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -24,118 +24,325 @@
#include <time.h>
#include <unistd.h>
-static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
-static int nni_plat_inited = 0;
-static int nni_plat_forked = 0;
+static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER;
+static int nni_plat_inited = 0;
+static int nni_plat_forked = 0;
+
+pthread_condattr_t nni_cvattr;
+pthread_mutexattr_t nni_mxattr;
+
+#ifndef NDEBUG
+int nni_plat_sync_fallback = 0;
+#endif
-pthread_condattr_t nni_cvattr;
-pthread_mutexattr_t nni_mxattr;
-static pthread_attr_t nni_pthread_attr;
+enum nni_plat_sync_flags {
+ NNI_PLAT_SYNC_INIT = 0x01,
+ NNI_PLAT_SYNC_LOCKED = 0x04,
+ NNI_PLAT_SYNC_WAIT = 0x08,
+};
-// We open a /dev/null file descriptor so that we can dup2() it to
-// cause MacOS X to wakeup. This gives us a "safe" close semantic.
+void
+nni_plat_mtx_init(nni_plat_mtx *mtx)
+{
+ if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) {
+ mtx->fallback = 1;
+ } else {
+ mtx->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ mtx->fallback = 1;
+ }
+#endif
+}
-int nni_plat_devnull = -1;
+void
+nni_plat_mtx_fini(nni_plat_mtx *mtx)
+{
+ if (mtx->flags & NNI_PLAT_SYNC_INIT) {
+ int rv;
+ if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
+ nni_panic("pthread_mutex_destroy: %s", strerror(rv));
+ }
+ }
+ mtx->flags = 0;
+}
-int
-nni_plat_mtx_init(nni_plat_mtx *mtx)
+static void
+nni_pthread_mutex_lock(pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) {
- switch (rv) {
- case EAGAIN:
- case ENOMEM:
- return (NNG_ENOMEM);
+ if ((rv = pthread_mutex_lock(m)) != 0) {
+ nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ }
+}
- default:
- nni_panic("pthread_mutex_init: %s", strerror(rv));
- }
+static void
+nni_pthread_mutex_unlock(pthread_mutex_t *m)
+{
+ int rv;
+
+ if ((rv = pthread_mutex_unlock(m)) != 0) {
+ nni_panic("pthread_mutex_unlock: %s", strerror(rv));
}
- mtx->init = 1;
- return (0);
}
-void
-nni_plat_mtx_fini(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_broadcast(pthread_cond_t *c)
{
int rv;
- if (!mtx->init) {
- return;
+ if ((rv = pthread_cond_broadcast(c)) != 0) {
+ nni_panic("pthread_cond_broadcast: %s", strerror(rv));
}
- pthread_mutex_lock(&mtx->mtx);
- pthread_mutex_unlock(&mtx->mtx);
- if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_fini: %s", strerror(rv));
+}
+
+static void
+nni_pthread_cond_signal(pthread_cond_t *c)
+{
+ int rv;
+ if ((rv = pthread_cond_signal(c)) != 0) {
+ nni_panic("pthread_cond_signal: %s", strerror(rv));
}
- mtx->init = 0;
}
-void
-nni_plat_mtx_lock(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ if ((rv = pthread_cond_wait(c, m)) != 0) {
+ nni_panic("pthread_cond_wait: %s", strerror(rv));
+ }
+}
+
+static int
+nni_pthread_cond_timedwait(
+ pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts)
+{
+ int rv;
+
+ switch ((rv = pthread_cond_timedwait(c, m, ts))) {
+ case 0:
+ return (0);
+ case ETIMEDOUT:
+ case EAGAIN:
+ return (NNG_ETIMEDOUT);
+ }
+ nni_panic("pthread_cond_timedwait: %s", strerror(rv));
+ return (NNG_EINVAL);
+}
+
+static void
+nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx)
+{
+ while (mtx->flags & NNI_PLAT_SYNC_LOCKED) {
+ mtx->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock);
}
+ mtx->flags |= NNI_PLAT_SYNC_LOCKED;
mtx->owner = pthread_self();
}
+static void
+nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx)
+{
+ NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ mtx->flags &= ~NNI_PLAT_SYNC_LOCKED;
+ if (mtx->flags & NNI_PLAT_SYNC_WAIT) {
+ mtx->flags &= ~NNI_PLAT_SYNC_WAIT;
+ pthread_cond_broadcast(&nni_plat_lock_cond);
+ }
+}
+
+static void
+nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_lock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_unlock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->gen++;
+ cv->wake = 0;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake1_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->wake++;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wait_fallback(nni_cv *cv)
+{
+ int gen;
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ NNI_ASSERT(cv->mtx->owner == pthread_self());
+ NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock);
+
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ }
+ if (cv->wake > 0) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static int
+nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts)
+{
+ int gen;
+ int rv = 0;
+
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ rv = nni_pthread_cond_timedwait(
+ &nni_plat_cond_cond, &nni_plat_lock, ts);
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ if (rv != 0) {
+ break;
+ }
+ }
+ if ((rv == 0) && (cv->wake > 0)) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+ return (rv);
+}
+
void
-nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+nni_plat_mtx_lock(nni_plat_mtx *mtx)
{
int rv;
+ if (!mtx->fallback) {
+ nni_pthread_mutex_lock(&mtx->mtx);
+
+ // We might have changed to a fallback lock; make
+ // sure this did not occur. Note that transitions to
+ // fallback locks only happen when a thread accesses
+ // a condition variable already holding this lock,
+ // so this is guranteed to be safe.
+ if (!mtx->fallback) {
+ mtx->owner = pthread_self();
+ return;
+ }
+ nni_pthread_mutex_unlock(&mtx->mtx);
+ }
+
+ // Fallback mode
+ nni_plat_mtx_lock_fallback(mtx);
+}
+
+void
+nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+{
NNI_ASSERT(mtx->owner == pthread_self());
mtx->owner = 0;
- if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_unlock: %s", strerror(rv));
+
+ if (mtx->fallback) {
+ nni_plat_mtx_unlock_fallback(mtx);
+ } else {
+ nni_pthread_mutex_unlock(&mtx->mtx);
}
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
- int rv;
-
- if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) {
- switch (rv) {
- case ENOMEM:
- case EAGAIN:
- return (NNG_ENOMEM);
-
- default:
- nni_panic("pthread_cond_init: %s", strerror(rv));
- }
+ if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) {
+ cv->fallback = 1;
+ } else {
+ cv->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ cv->fallback = 1;
}
+#endif
cv->mtx = mtx;
- return (0);
}
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
- (void) pthread_cond_broadcast(&cv->cv);
+ if (cv->fallback) {
+ nni_plat_cv_wake_fallback(cv);
+ } else {
+ nni_pthread_cond_broadcast(&cv->cv);
+ }
}
void
nni_plat_cv_wake1(nni_plat_cv *cv)
{
- (void) pthread_cond_signal(&cv->cv);
+ int rv;
+ if (cv->fallback) {
+ nni_plat_cv_wake1_fallback(cv);
+ } else {
+ nni_pthread_cond_signal(&cv->cv);
+ }
}
void
nni_plat_cv_wait(nni_plat_cv *cv)
{
- int rv;
-
NNI_ASSERT(cv->mtx->owner == pthread_self());
- if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) {
- nni_panic("pthread_cond_wait: %s", strerror(rv));
+ if (cv->fallback) {
+ nni_plat_cv_wait_fallback(cv);
+ } else {
+ nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx);
+ cv->mtx->owner = pthread_self();
}
- cv->mtx->owner = pthread_self();
}
int
@@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
ts.tv_sec = until / 1000000;
ts.tv_nsec = (until % 1000000) * 1000;
- rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
- cv->mtx->owner = pthread_self();
- if (rv == ETIMEDOUT) {
- return (NNG_ETIMEDOUT);
- } else if (rv != 0) {
- nni_panic("pthread_cond_timedwait: %d", rv);
+ if (cv->fallback) {
+ rv = nni_plat_cv_until_fallback(cv, &ts);
+ } else {
+ rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
+ cv->mtx->owner = pthread_self();
}
- return (0);
+ return (rv);
}
void
@@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv)
{
int rv;
- if (cv->mtx == NULL) {
- return;
- }
- if ((rv = pthread_cond_destroy(&cv->cv)) != 0) {
+ if ((cv->flags & NNI_PLAT_SYNC_INIT) &&
+ ((rv = pthread_cond_destroy(&cv->cv)) != 0)) {
nni_panic("pthread_cond_destroy: %s", strerror(rv));
}
- cv->mtx = NULL;
+ cv->flags = 0;
+ cv->mtx = NULL;
}
static void *
@@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->arg = arg;
// POSIX wants functions to return a void *, but we don't care.
- rv = pthread_create(
- &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr);
+ rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
if (rv != 0) {
- // nni_printf("pthread_create: %s", strerror(rv));
+ // nni_printf("pthread_create: %s",
+ // strerror(rv));
return (NNG_ENOMEM);
}
return (0);
@@ -227,7 +432,6 @@ int
nni_plat_init(int (*helper)(void))
{
int rv;
- int devnull;
if (nni_plat_forked) {
nni_panic("nng is not fork-reentrant safe");
@@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void))
if (nni_plat_inited) {
return (0); // fast path
}
- if ((devnull = open("/dev/null", O_RDONLY)) < 0) {
- return (nni_plat_errno(errno));
- }
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) { // check again under the lock to be sure
- pthread_mutex_unlock(&nni_plat_lock);
- close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (0);
}
if (pthread_condattr_init(&nni_cvattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME
if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#endif
if (pthread_mutexattr_init(&nni_mxattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- pthread_condattr_destroy(&nni_cvattr);
- (void) close(devnull);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
- pthread_condattr_destroy(&nni_cvattr);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_attr_init(&nni_pthread_attr);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_condattr_destroy(&nni_cvattr);
return (NNG_ENOMEM);
}
- // We don't force this, but we want to have it small... we could
- // probably get by with even just 8k, but Linux usually wants 16k
- // as a minimum. If this fails, its not fatal, just we won't be
- // as scalable / thrifty with our use of VM.
- //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ // if this one fails we don't care.
+ (void) pthread_mutexattr_settype(
+ &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
if ((rv = nni_posix_pollq_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if ((rv = nni_posix_resolv_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_pollq_sysfini();
- (void) close(nni_plat_devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
- (void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
nni_plat_inited = 1;
}
- nni_plat_devnull = devnull;
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (rv);
}
@@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) {
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
- (void) close(nni_plat_devnull);
- nni_plat_devnull = -1;
- nni_plat_inited = 0;
+ nni_plat_inited = 0;
}
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
}
#else
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index db6d7af4..2aa16b36 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) {
- NNI_FREE_STRUCT(udp);
- return (rv);
- }
+ nni_mtx_init(&udp->udp_mtx);
udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (udp->udp_fd < 0) {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index c4cdcb8a..9c3343b7 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h)
int
nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
{
- int rv;
-
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- if (((rv = nni_mtx_init(&evt->mtx)) != 0) ||
- ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
- return (rv); // NB: This will never happen on Windows.
- }
+ nni_mtx_init(&evt->mtx);
+ nni_cv_init(&evt->cv, &evt->mtx);
+
evt->ops = *ops;
evt->aio = NULL;
evt->ptr = ptr;
@@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void)
goto fail;
}
}
- if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) {
- goto fail;
- }
+ nni_mtx_init(&nni_win_iocp_mtx);
for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
nni_thr_run(&nni_win_iocp_thrs[i]);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c9eb20ec..a60815fa 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void)
NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
- if (((rv = nni_mtx_init(&worker->mtx)) != 0) ||
- ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) {
- return (rv);
- }
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
if (rv != 0) {
return (rv);
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 63295a71..80e3724d 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -680,8 +680,6 @@ int
nni_win_tcp_sysinit(void)
{
WSADATA data;
- WORD ver;
- ver = MAKEWORD(2, 2);
if (WSAStartup(MAKEWORD(2, 2), &data) != 0) {
NNI_ASSERT(LOBYTE(data.wVersion) == 2);
NNI_ASSERT(HIBYTE(data.wVersion) == 2);
diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c
index 861fbc76..edc4df3f 100644
--- a/src/platform/windows/win_pipe.c
+++ b/src/platform/windows/win_pipe.c
@@ -19,9 +19,9 @@
int
nni_plat_pipe_open(int *wfdp, int *rfdp)
{
- SOCKET afd = INVALID_SOCKET;
SOCKET rfd = INVALID_SOCKET;
SOCKET wfd = INVALID_SOCKET;
+ SOCKET afd;
struct sockaddr_in addr;
socklen_t alen;
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index a01dc123..4ce12d84 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_win_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_win_resolv_mtx);
return (rv);
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index c01ec782..879cd772 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -30,12 +30,11 @@ nni_free(void *b, size_t z)
HeapFree(GetProcessHeap(), 0, b);
}
-int
+void
nni_plat_mtx_init(nni_plat_mtx *mtx)
{
InitializeSRWLock(&mtx->srl);
mtx->init = 1;
- return (0);
}
void
@@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx)
ReleaseSRWLockExclusive(&mtx->srl);
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
InitializeConditionVariable(&cv->cv);
cv->srl = &mtx->srl;
- return (0);
}
void