aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-01 19:48:10 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-01 19:48:10 -0800
commit783470724ed22b315f2ecc4e3b1ac9d199d44ea2 (patch)
treed4883f305b1f2c6f57710cd8900f4b0932ae14c0 /src
parentbbed172a2b38f9227ca9e1c02a933df068e5eaf7 (diff)
downloadnng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.gz
nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.bz2
nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.zip
Final purge of old threading & synch stuff.
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/core/endpt.c90
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/pipe.c18
-rw-r--r--src/core/platform.h59
-rw-r--r--src/core/socket.c83
-rw-r--r--src/core/socket.h4
-rw-r--r--src/platform/posix/posix_synch.c154
-rw-r--r--src/platform/posix/posix_thread.c44
9 files changed, 117 insertions, 338 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 96ae3fdc..78070cc1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -62,7 +62,6 @@ set (NNG_SOURCES
platform/posix/posix_alloc.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
- platform/posix/posix_synch.c
platform/posix/posix_thread.c
protocol/pair/pair.c
diff --git a/src/core/endpt.c b/src/core/endpt.c
index e1488035..e54e423c 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -38,7 +38,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_pipe = NULL;
NNI_LIST_NODE_INIT(&ep->ep_node);
- if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) {
+ if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
}
@@ -49,14 +49,14 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock));
if (rv != 0) {
- nni_cond_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_cv);
nni_free(ep, sizeof (*ep));
return (rv);
}
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
nni_list_append(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
*epp = ep;
return (0);
@@ -67,11 +67,11 @@ void
nni_endpt_close(nni_endpt *ep)
{
nni_pipe *pipe;
- nni_mutex *mx = &ep->ep_sock->s_mx;
+ nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
if (ep->ep_close) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return;
}
ep->ep_close = 1;
@@ -80,9 +80,9 @@ nni_endpt_close(nni_endpt *ep)
pipe->p_ep = NULL;
ep->ep_pipe = NULL;
}
- nni_cond_broadcast(&ep->ep_cv);
+ nni_cv_wake(&ep->ep_cv);
nni_list_remove(&ep->ep_sock->s_eps, ep);
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
if (ep->ep_mode != NNI_EP_MODE_IDLE) {
nni_thr_fini(&ep->ep_thr);
@@ -90,7 +90,7 @@ nni_endpt_close(nni_endpt *ep)
ep->ep_ops.ep_destroy(ep->ep_data);
- nni_cond_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_cv);
nni_free(ep, sizeof (*ep));
}
@@ -141,18 +141,18 @@ nni_dialer(void *arg)
nni_pipe *pipe;
int rv;
nni_time cooldown;
- nni_mutex *mx = &ep->ep_sock->s_mx;
+ nni_mtx *mx = &ep->ep_sock->s_mx;
for (;;) {
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
- nni_cond_wait(&ep->ep_cv);
+ nni_cv_wait(&ep->ep_cv);
}
if (ep->ep_close) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
break;
}
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
rv = nni_dial_once(ep);
switch (rv) {
@@ -175,15 +175,15 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
while (!ep->ep_close) {
// We need a different condvar...
- rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ rv = nni_cv_until(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
}
}
@@ -192,38 +192,37 @@ int
nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
- nni_thread *reap = NULL;
- nni_mutex *mx = &ep->ep_sock->s_mx;
+ nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
if (ep->ep_mode != NNI_EP_MODE_IDLE) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (NNG_EBUSY);
}
if (ep->ep_close) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (NNG_ECLOSED);
}
if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (rv);
}
ep->ep_mode = NNI_EP_MODE_DIAL;
if (flags & NNG_FLAG_SYNCH) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
rv = nni_dial_once(ep);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
ep->ep_mode = NNI_EP_MODE_IDLE;
return (rv);
}
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
}
nni_thr_run(&ep->ep_thr);
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (rv);
}
@@ -256,19 +255,19 @@ nni_listener(void *arg)
nni_endpt *ep = arg;
nni_pipe *pipe;
int rv;
- nni_mutex *mx = &ep->ep_sock->s_mx;
+ nni_mtx *mx = &ep->ep_sock->s_mx;
for (;;) {
nni_time cooldown;
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
// If we didn't bind synchronously, do it now.
while (!ep->ep_bound && !ep->ep_close) {
int rv;
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
if (rv == 0) {
ep->ep_bound = 1;
@@ -280,17 +279,17 @@ nni_listener(void *arg)
cooldown = 10000;
cooldown += nni_clock();
while (!ep->ep_close) {
- rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ rv = nni_cv_until(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
}
if (ep->ep_close) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
break;
}
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
pipe = NULL;
@@ -308,14 +307,14 @@ nni_listener(void *arg)
cooldown = 100000; // 100ms
}
cooldown += nni_clock();
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
while (!ep->ep_close) {
- rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ rv = nni_cv_until(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
}
}
@@ -324,41 +323,40 @@ int
nni_endpt_listen(nni_endpt *ep, int flags)
{
int rv = 0;
- nni_thread *reap = NULL;
- nni_mutex *mx = &ep->ep_sock->s_mx;
+ nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
if (ep->ep_mode != NNI_EP_MODE_IDLE) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (NNG_EBUSY);
}
if (ep->ep_close) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (NNG_ECLOSED);
}
if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (rv);
}
ep->ep_mode = NNI_EP_MODE_LISTEN;
if (flags & NNG_FLAG_SYNCH) {
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
ep->ep_mode = NNI_EP_MODE_IDLE;
return (rv);
}
- nni_mutex_enter(mx);
+ nni_mtx_lock(mx);
ep->ep_bound = 1;
}
nni_thr_run(&ep->ep_thr);
- nni_mutex_exit(mx);
+ nni_mtx_unlock(mx);
return (0);
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 757d58ce..28a52e81 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -25,7 +25,7 @@ struct nng_endpt {
int ep_mode;
int ep_close; // full shutdown
int ep_bound; // true if we bound locally
- nni_cond ep_cv;
+ nni_cv ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8fa41934..34db782a 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -48,15 +48,15 @@ nni_pipe_close(nni_pipe *p)
p->p_ops.p_close(p->p_trandata);
}
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (!p->p_reap) {
// schedule deferred reap/close
p->p_reap = 1;
nni_list_remove(&sock->s_pipes, p);
nni_list_append(&sock->s_reaps, p);
- nni_cond_broadcast(&sock->s_cv);
+ nni_cv_wake(&sock->s_cv);
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
}
@@ -119,9 +119,9 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
return (rv);
}
p->p_psize = sock->s_ops.proto_pipe_size;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
nni_list_append(&sock->s_pipes, p);
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
*pp = p;
return (0);
@@ -146,9 +146,9 @@ nni_pipe_start(nni_pipe *pipe)
int collide;
nni_socket *sock = pipe->p_sock;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
@@ -178,12 +178,12 @@ nni_pipe_start(nni_pipe *pipe)
// XXX: Publish event
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (0);
fail:
pipe->p_reap = 1;
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
nni_pipe_close(pipe);
return (rv);
}
diff --git a/src/core/platform.h b/src/core/platform.h
index 3dd9fa29..7b39ac8b 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -60,84 +60,65 @@ extern void *nni_alloc(size_t);
// Most implementations can just call free() here.
extern void nni_free(void *, size_t);
-typedef struct nni_mutex nni_mutex;
-typedef struct nni_cond nni_cond;
-
-typedef struct nni_plat_mtx nni_plat_mtx;
-typedef struct nni_plat_cv nni_plat_cv;
-typedef struct nni_plat_thr nni_plat_thr;
+typedef struct nni_plat_mtx nni_plat_mtx;
+typedef struct nni_plat_cv nni_plat_cv;
+typedef struct nni_plat_thr nni_plat_thr;
// Mutex handling.
-// nni_mutex_init initializes a mutex structure. This may require dynamic
+// nni_plat_mtx_init initializes a mutex structure. This may require dynamic
// allocation, depending on the platform. It can return NNG_ENOMEM if that
// fails.
extern int nni_plat_mtx_init(nni_plat_mtx *);
-extern int nni_mutex_init(nni_mutex *);
-// nni_mutex_fini destroys the mutex and releases any resources allocated for
-// it's use.
-extern void nni_mutex_fini(nni_mutex *);
+// nni_plat_mtx_fini destroys the mutex and releases any resources allocated
+// for it's use.
extern void nni_plat_mtx_fini(nni_plat_mtx *);
-// nni_mutex_enter locks the mutex. This is not recursive -- a mutex can only
-// be entered once.
-extern void nni_mutex_enter(nni_mutex *);
+// nni_plat_mtx_lock locks the mutex. This is not recursive -- a mutex can
+// only be entered once.
extern void nni_plat_mtx_lock(nni_plat_mtx *);
-// nni_mutex_exit unlocks the mutex. This can only be performed by the thread
-// that owned the mutex.
-extern void nni_mutex_exit(nni_mutex *);
+// nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the
+// threadthat owned the mutex.
extern void nni_plat_mtx_unlock(nni_plat_mtx *);
-// nni_mutex_tryenter tries to lock the mutex. If it can't, it may return
-// NNG_EBUSY.
-extern int nni_mutex_tryenter(nni_mutex *);
+// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return
+// NNG_EBUSY if the mutex is already owned.
extern int nni_plat_mtx_trylock(nni_plat_mtx *);
-// nni_cond_init initializes a condition variable. We require a mutex be
+// nni_plat_cv_init initializes a condition variable. We require a mutex be
// supplied with it, and that mutex must always be held when performing any
// operations on the condition variable (other than fini.) This may require
// dynamic allocation, and if so this operation may fail with NNG_ENOMEM.
-extern int nni_cond_init(nni_cond *, nni_mutex *);
extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
-// nni_cond_fini releases all resources associated with condition variable.
-extern void nni_cond_fini(nni_cond *);
+// nni_plat_cv_fini releases all resources associated with condition variable.
extern void nni_plat_cv_fini(nni_plat_cv *);
-// nni_cond_broadcast wakes all waiters on the condition. This should be
+// nni_plat_cv_wake wakes all waiters on the condition. This should be
// called with the lock held.
-extern void nni_cond_broadcast(nni_cond *);
extern void nni_plat_cv_wake(nni_plat_cv *);
-// nni_cond_signal wakes a signal waiter.
-extern void nni_cond_signal(nni_cond *);
-
-// nni_cond_wait waits for a wake up on the condition variable. The
+// nni_plat_cv_wait waits for a wake up on the condition variable. The
// associated lock is atomically released and reacquired upon wake up.
// Callers can be spuriously woken. The associated lock must be held.
-extern void nni_cond_wait(nni_cond *);
extern void nni_plat_cv_wait(nni_plat_cv *);
-// nni_cond_waituntil waits for a wakeup on the condition variable, or
+// nni_plat_cv_until waits for a wakeup on the condition variable, or
// until the system time reaches the specified absolute time. (It is an
// absolute form of nni_cond_timedwait.) Early wakeups are possible, so
// check the condition. It will return either NNG_ETIMEDOUT, or 0.
-extern int nni_cond_waituntil(nni_cond *, nni_time);
extern int nni_plat_cv_until(nni_plat_cv *, nni_time);
-typedef struct nni_thread nni_thread;
-
-// nni_thread_creates a thread that runs the given function. The thread
-// receives a single argument.
-extern int nni_thread_create(nni_thread **, void (*fn)(void *), void *);
+// nni_plat_thr_init creates a thread that runs the given function. The
+// thread receives a single argument. The thread starts execution
+// immediately.
extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
// nni_thread_reap waits for the thread to exit, and then releases any
// resources associated with the thread. After this returns, it
// is an error to reference the thread in any further way.
-extern void nni_thread_reap(nni_thread *);
extern void nni_plat_thr_fini(nni_plat_thr *);
// nn_clock returns a number of microseconds since some arbitrary time
diff --git a/src/core/socket.c b/src/core/socket.c
index a79b1fde..48cd25bc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -41,16 +41,16 @@ nni_reaper(void *arg)
nni_pipe *pipe;
nni_endpt *ep;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
nni_list_remove(&sock->s_reaps, pipe);
if (((ep = pipe->p_ep) != NULL) &&
((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
- nni_cond_broadcast(&ep->ep_cv);
+ nni_cv_wake(&ep->ep_cv);
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
// This should already have been done.
if (pipe->p_trandata != NULL) {
@@ -74,12 +74,12 @@ nni_reaper(void *arg)
if ((sock->s_closing) &&
(nni_list_first(&sock->s_reaps) == NULL) &&
(nni_list_first(&sock->s_pipes) == NULL)) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
break;
}
- nni_cond_wait(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
+ nni_cv_wait(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
}
}
@@ -109,35 +109,35 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
- if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
+ if ((rv = nni_mtx_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
return (rv);
}
- if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) {
- nni_mutex_fini(&sock->s_mx);
+ if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) {
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) {
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) {
nni_msgqueue_destroy(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -146,8 +146,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
nni_msgqueue_destroy(sock->s_urq);
nni_msgqueue_destroy(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -164,18 +164,17 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
- nni_thread *reaper;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
// Stop all EPS. We're going to do this first, since we know
// we're closing.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
nni_endpt_close(ep);
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
}
// Special optimization; if there are no pipes connected,
@@ -186,7 +185,7 @@ nni_socket_close(nni_socket *sock)
} else {
linger = nni_clock() + sock->s_linger;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
// We drain the upper write queue. This is just like closing it,
@@ -199,9 +198,9 @@ nni_socket_close(nni_socket *sock)
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
// a chance to do so gracefully.
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
- if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
+ if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
break;
}
}
@@ -221,8 +220,8 @@ nni_socket_close(nni_socket *sock)
nni_list_append(&sock->s_reaps, pipe);
}
- nni_cond_broadcast(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
+ nni_cv_wake(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
// Wait for the reaper to exit.
nni_thr_fini(&sock->s_reaper);
@@ -234,8 +233,8 @@ nni_socket_close(nni_socket *sock)
// And we need to clean up *our* state.
nni_msgqueue_destroy(sock->s_urq);
nni_msgqueue_destroy(sock->s_uwq);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (0);
}
@@ -250,17 +249,17 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire)
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
// a REP socket with no REQ pending.
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_senderr) != 0) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
besteffort = sock->s_besteffort;
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
if (sock->s_ops.proto_send_filter != NULL) {
msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
@@ -290,16 +289,16 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
int rv;
nni_msg *msg;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_recverr) != 0) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
for (;;) {
rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire);
@@ -392,11 +391,11 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
void *ptr;
int rv = ENOTSUP;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_ops.proto_setopt != NULL) {
rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
}
@@ -423,7 +422,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
rv = nni_setopt_buf(sock->s_urq, val, size);
break;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
@@ -435,11 +434,11 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
void *ptr;
int rv = ENOTSUP;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_ops.proto_getopt != NULL) {
rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
}
@@ -466,6 +465,6 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 62c5ccb5..8d72c5e6 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -14,8 +14,8 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
struct nng_socket {
- nni_mutex s_mx;
- nni_cond s_cv;
+ nni_mtx s_mx;
+ nni_cv s_cv;
nni_msgqueue * s_uwq; // Upper write queue
nni_msgqueue * s_urq; // Upper read queue
diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c
deleted file mode 100644
index d9fed53f..00000000
--- a/src/platform/posix/posix_synch.c
+++ /dev/null
@@ -1,154 +0,0 @@
-//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-// POSIX synchronization (mutexes and condition variables). This uses
-// pthreads.
-
-#include "core/nng_impl.h"
-
-#ifdef PLATFORM_POSIX_SYNCH
-
-#include <pthread.h>
-#include <time.h>
-#include <string.h>
-
-extern pthread_condattr_t nni_cvattr;
-extern pthread_mutexattr_t nni_mxattr;
-
-int
-nni_mutex_init(nni_mutex *mp)
-{
- if (pthread_mutex_init(&mp->mx, &nni_mxattr) != 0) {
- return (NNG_ENOMEM);
- }
- return (0);
-}
-
-
-void
-nni_mutex_fini(nni_mutex *mp)
-{
- int rv;
-
- if ((rv = pthread_mutex_destroy(&mp->mx)) != 0) {
- nni_panic("pthread_mutex_destroy failed: %s", strerror(rv));
- }
-}
-
-
-void
-nni_mutex_enter(nni_mutex *m)
-{
- int rv;
-
- if ((rv = pthread_mutex_lock(&m->mx)) != 0) {
- nni_panic("pthread_mutex_lock failed: %s", strerror(rv));
- }
-}
-
-
-void
-nni_mutex_exit(nni_mutex *m)
-{
- if (pthread_mutex_unlock(&m->mx) != 0) {
- nni_panic("pthread_mutex_unlock failed");
- }
-}
-
-
-int
-nni_mutex_tryenter(nni_mutex *m)
-{
- if (pthread_mutex_trylock(&m->mx) != 0) {
- return (NNG_EBUSY);
- }
- return (0);
-}
-
-
-int
-nni_cond_init(nni_cond *c, nni_mutex *m)
-{
- if (pthread_cond_init(&c->cv, &nni_cvattr) != 0) {
- // In theory could be EAGAIN, but handle like ENOMEM
- return (NNG_ENOMEM);
- }
- c->mx = &m->mx;
- return (0);
-}
-
-
-void
-nni_cond_fini(nni_cond *c)
-{
- if (pthread_cond_destroy(&c->cv) != 0) {
- nni_panic("pthread_cond_destroy failed");
- }
-}
-
-
-void
-nni_cond_signal(nni_cond *c)
-{
- if (pthread_cond_signal(&c->cv) != 0) {
- nni_panic("pthread_cond_signal failed");
- }
-}
-
-
-void
-nni_cond_broadcast(nni_cond *c)
-{
- if (pthread_cond_broadcast(&c->cv) != 0) {
- nni_panic("pthread_cond_broadcast failed");
- }
-}
-
-
-void
-nni_cond_wait(nni_cond *c)
-{
- if (pthread_cond_wait(&c->cv, c->mx) != 0) {
- nni_panic("pthread_cond_wait failed");
- }
-}
-
-
-int
-nni_cond_waituntil(nni_cond *c, nni_time usec)
-{
- struct timespec ts;
- int rv;
- nni_duration delta = usec - nni_clock();
-
-
- if (usec != NNI_TIME_NEVER) {
- ts.tv_sec = usec / 1000000;
- ts.tv_nsec = (usec % 1000000) * 1000;
-
- rv = pthread_cond_timedwait(&c->cv, c->mx, &ts);
- } else {
- rv = pthread_cond_wait(&c->cv, c->mx);
- }
-
- if (rv == ETIMEDOUT) {
- if (nni_clock() < usec) {
- // This only happens if the implementation
- // is buggy.
- nni_panic("Premature wakupe!");
- }
- return (NNG_ETIMEDOUT);
- } else if (rv != 0) {
- nni_panic("pthread_cond_timedwait returned %d", rv);
- }
- return (0);
-}
-
-
-#endif
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index d1944879..0a999326 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -45,50 +45,6 @@ nni_plat_nextid(void)
return (id);
}
-
-static void *
-nni_thrfunc(void *arg)
-{
- nni_thread *thr = arg;
-
- thr->func(thr->arg);
- return (NULL);
-}
-
-
-int
-nni_thread_create(nni_thread **tp, void (*fn)(void *), void *arg)
-{
- nni_thread *thr;
- int rv;
-
- if ((thr = nni_alloc(sizeof (*thr))) == NULL) {
- return (NNG_ENOMEM);
- }
- thr->func = fn;
- thr->arg = arg;
-
- if ((rv = pthread_create(&thr->tid, NULL, nni_thrfunc, thr)) != 0) {
- nni_free(thr, sizeof (*thr));
- return (NNG_ENOMEM);
- }
- *tp = thr;
- return (0);
-}
-
-
-void
-nni_thread_reap(nni_thread *thr)
-{
- int rv;
-
- if ((rv = pthread_join(thr->tid, NULL)) != 0) {
- nni_panic("pthread_thread: %s", strerror(rv));
- }
- nni_free(thr, sizeof (*thr));
-}
-
-
int
nni_plat_mtx_init(nni_plat_mtx *mtx)
{