diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/core/endpt.c | 90 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/pipe.c | 18 | ||||
| -rw-r--r-- | src/core/platform.h | 59 | ||||
| -rw-r--r-- | src/core/socket.c | 83 | ||||
| -rw-r--r-- | src/core/socket.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 154 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 44 |
9 files changed, 117 insertions, 338 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 96ae3fdc..78070cc1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,7 +62,6 @@ set (NNG_SOURCES platform/posix/posix_alloc.c platform/posix/posix_clock.c platform/posix/posix_debug.c - platform/posix/posix_synch.c platform/posix/posix_thread.c protocol/pair/pair.c diff --git a/src/core/endpt.c b/src/core/endpt.c index e1488035..e54e423c 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -38,7 +38,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) ep->ep_pipe = NULL; NNI_LIST_NODE_INIT(&ep->ep_node); - if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) { + if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); } @@ -49,14 +49,14 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock)); if (rv != 0) { - nni_cond_fini(&ep->ep_cv); + nni_cv_fini(&ep->ep_cv); nni_free(ep, sizeof (*ep)); return (rv); } - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); nni_list_append(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); *epp = ep; return (0); @@ -67,11 +67,11 @@ void nni_endpt_close(nni_endpt *ep) { nni_pipe *pipe; - nni_mutex *mx = &ep->ep_sock->s_mx; + nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(mx); + nni_mtx_lock(mx); if (ep->ep_close) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return; } ep->ep_close = 1; @@ -80,9 +80,9 @@ nni_endpt_close(nni_endpt *ep) pipe->p_ep = NULL; ep->ep_pipe = NULL; } - nni_cond_broadcast(&ep->ep_cv); + nni_cv_wake(&ep->ep_cv); nni_list_remove(&ep->ep_sock->s_eps, ep); - nni_mutex_exit(mx); + nni_mtx_unlock(mx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { nni_thr_fini(&ep->ep_thr); @@ -90,7 +90,7 @@ nni_endpt_close(nni_endpt *ep) ep->ep_ops.ep_destroy(ep->ep_data); - nni_cond_fini(&ep->ep_cv); + nni_cv_fini(&ep->ep_cv); nni_free(ep, sizeof (*ep)); } @@ -141,18 +141,18 @@ nni_dialer(void *arg) nni_pipe *pipe; int rv; nni_time cooldown; - nni_mutex *mx = &ep->ep_sock->s_mx; + nni_mtx *mx = &ep->ep_sock->s_mx; for (;;) { - nni_mutex_enter(mx); + nni_mtx_lock(mx); while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); + nni_cv_wait(&ep->ep_cv); } if (ep->ep_close) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); break; } - nni_mutex_exit(mx); + nni_mtx_unlock(mx); rv = nni_dial_once(ep); switch (rv) { @@ -175,15 +175,15 @@ nni_dialer(void *arg) // wait even longer, since the system needs time to // release resources. cooldown += nni_clock(); - nni_mutex_enter(mx); + nni_mtx_lock(mx); while (!ep->ep_close) { // We need a different condvar... - rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mutex_exit(mx); + nni_mtx_unlock(mx); } } @@ -192,38 +192,37 @@ int nni_endpt_dial(nni_endpt *ep, int flags) { int rv = 0; - nni_thread *reap = NULL; - nni_mutex *mx = &ep->ep_sock->s_mx; + nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(mx); + nni_mtx_lock(mx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (rv); } ep->ep_mode = NNI_EP_MODE_DIAL; if (flags & NNG_FLAG_SYNCH) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); rv = nni_dial_once(ep); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mutex_enter(mx); + nni_mtx_lock(mx); } nni_thr_run(&ep->ep_thr); - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (rv); } @@ -256,19 +255,19 @@ nni_listener(void *arg) nni_endpt *ep = arg; nni_pipe *pipe; int rv; - nni_mutex *mx = &ep->ep_sock->s_mx; + nni_mtx *mx = &ep->ep_sock->s_mx; for (;;) { nni_time cooldown; - nni_mutex_enter(mx); + nni_mtx_lock(mx); // If we didn't bind synchronously, do it now. while (!ep->ep_bound && !ep->ep_close) { int rv; - nni_mutex_exit(mx); + nni_mtx_unlock(mx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mutex_enter(mx); + nni_mtx_lock(mx); if (rv == 0) { ep->ep_bound = 1; @@ -280,17 +279,17 @@ nni_listener(void *arg) cooldown = 10000; cooldown += nni_clock(); while (!ep->ep_close) { - rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } } if (ep->ep_close) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); break; } - nni_mutex_exit(mx); + nni_mtx_unlock(mx); pipe = NULL; @@ -308,14 +307,14 @@ nni_listener(void *arg) cooldown = 100000; // 100ms } cooldown += nni_clock(); - nni_mutex_enter(mx); + nni_mtx_lock(mx); while (!ep->ep_close) { - rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mutex_exit(mx); + nni_mtx_unlock(mx); } } @@ -324,41 +323,40 @@ int nni_endpt_listen(nni_endpt *ep, int flags) { int rv = 0; - nni_thread *reap = NULL; - nni_mutex *mx = &ep->ep_sock->s_mx; + nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(mx); + nni_mtx_lock(mx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (rv); } ep->ep_mode = NNI_EP_MODE_LISTEN; if (flags & NNG_FLAG_SYNCH) { - nni_mutex_exit(mx); + nni_mtx_unlock(mx); rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mutex_enter(mx); + nni_mtx_lock(mx); ep->ep_bound = 1; } nni_thr_run(&ep->ep_thr); - nni_mutex_exit(mx); + nni_mtx_unlock(mx); return (0); } diff --git a/src/core/endpt.h b/src/core/endpt.h index 757d58ce..28a52e81 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -25,7 +25,7 @@ struct nng_endpt { int ep_mode; int ep_close; // full shutdown int ep_bound; // true if we bound locally - nni_cond ep_cv; + nni_cv ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) }; diff --git a/src/core/pipe.c b/src/core/pipe.c index 8fa41934..34db782a 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -48,15 +48,15 @@ nni_pipe_close(nni_pipe *p) p->p_ops.p_close(p->p_trandata); } - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (!p->p_reap) { // schedule deferred reap/close p->p_reap = 1; nni_list_remove(&sock->s_pipes, p); nni_list_append(&sock->s_reaps, p); - nni_cond_broadcast(&sock->s_cv); + nni_cv_wake(&sock->s_cv); } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); } @@ -119,9 +119,9 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep) return (rv); } p->p_psize = sock->s_ops.proto_pipe_size; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); nni_list_append(&sock->s_pipes, p); - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); *pp = p; return (0); @@ -146,9 +146,9 @@ nni_pipe_start(nni_pipe *pipe) int collide; nni_socket *sock = pipe->p_sock; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } @@ -178,12 +178,12 @@ nni_pipe_start(nni_pipe *pipe) // XXX: Publish event - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (0); fail: pipe->p_reap = 1; - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); nni_pipe_close(pipe); return (rv); } diff --git a/src/core/platform.h b/src/core/platform.h index 3dd9fa29..7b39ac8b 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -60,84 +60,65 @@ extern void *nni_alloc(size_t); // Most implementations can just call free() here. extern void nni_free(void *, size_t); -typedef struct nni_mutex nni_mutex; -typedef struct nni_cond nni_cond; - -typedef struct nni_plat_mtx nni_plat_mtx; -typedef struct nni_plat_cv nni_plat_cv; -typedef struct nni_plat_thr nni_plat_thr; +typedef struct nni_plat_mtx nni_plat_mtx; +typedef struct nni_plat_cv nni_plat_cv; +typedef struct nni_plat_thr nni_plat_thr; // Mutex handling. -// nni_mutex_init initializes a mutex structure. This may require dynamic +// nni_plat_mtx_init initializes a mutex structure. This may require dynamic // allocation, depending on the platform. It can return NNG_ENOMEM if that // fails. extern int nni_plat_mtx_init(nni_plat_mtx *); -extern int nni_mutex_init(nni_mutex *); -// nni_mutex_fini destroys the mutex and releases any resources allocated for -// it's use. -extern void nni_mutex_fini(nni_mutex *); +// nni_plat_mtx_fini destroys the mutex and releases any resources allocated +// for it's use. extern void nni_plat_mtx_fini(nni_plat_mtx *); -// nni_mutex_enter locks the mutex. This is not recursive -- a mutex can only -// be entered once. -extern void nni_mutex_enter(nni_mutex *); +// nni_plat_mtx_lock locks the mutex. This is not recursive -- a mutex can +// only be entered once. extern void nni_plat_mtx_lock(nni_plat_mtx *); -// nni_mutex_exit unlocks the mutex. This can only be performed by the thread -// that owned the mutex. -extern void nni_mutex_exit(nni_mutex *); +// nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the +// threadthat owned the mutex. extern void nni_plat_mtx_unlock(nni_plat_mtx *); -// nni_mutex_tryenter tries to lock the mutex. If it can't, it may return -// NNG_EBUSY. -extern int nni_mutex_tryenter(nni_mutex *); +// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return +// NNG_EBUSY if the mutex is already owned. extern int nni_plat_mtx_trylock(nni_plat_mtx *); -// nni_cond_init initializes a condition variable. We require a mutex be +// nni_plat_cv_init initializes a condition variable. We require a mutex be // supplied with it, and that mutex must always be held when performing any // operations on the condition variable (other than fini.) This may require // dynamic allocation, and if so this operation may fail with NNG_ENOMEM. -extern int nni_cond_init(nni_cond *, nni_mutex *); extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); -// nni_cond_fini releases all resources associated with condition variable. -extern void nni_cond_fini(nni_cond *); +// nni_plat_cv_fini releases all resources associated with condition variable. extern void nni_plat_cv_fini(nni_plat_cv *); -// nni_cond_broadcast wakes all waiters on the condition. This should be +// nni_plat_cv_wake wakes all waiters on the condition. This should be // called with the lock held. -extern void nni_cond_broadcast(nni_cond *); extern void nni_plat_cv_wake(nni_plat_cv *); -// nni_cond_signal wakes a signal waiter. -extern void nni_cond_signal(nni_cond *); - -// nni_cond_wait waits for a wake up on the condition variable. The +// nni_plat_cv_wait waits for a wake up on the condition variable. The // associated lock is atomically released and reacquired upon wake up. // Callers can be spuriously woken. The associated lock must be held. -extern void nni_cond_wait(nni_cond *); extern void nni_plat_cv_wait(nni_plat_cv *); -// nni_cond_waituntil waits for a wakeup on the condition variable, or +// nni_plat_cv_until waits for a wakeup on the condition variable, or // until the system time reaches the specified absolute time. (It is an // absolute form of nni_cond_timedwait.) Early wakeups are possible, so // check the condition. It will return either NNG_ETIMEDOUT, or 0. -extern int nni_cond_waituntil(nni_cond *, nni_time); extern int nni_plat_cv_until(nni_plat_cv *, nni_time); -typedef struct nni_thread nni_thread; - -// nni_thread_creates a thread that runs the given function. The thread -// receives a single argument. -extern int nni_thread_create(nni_thread **, void (*fn)(void *), void *); +// nni_plat_thr_init creates a thread that runs the given function. The +// thread receives a single argument. The thread starts execution +// immediately. extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *); // nni_thread_reap waits for the thread to exit, and then releases any // resources associated with the thread. After this returns, it // is an error to reference the thread in any further way. -extern void nni_thread_reap(nni_thread *); extern void nni_plat_thr_fini(nni_plat_thr *); // nn_clock returns a number of microseconds since some arbitrary time diff --git a/src/core/socket.c b/src/core/socket.c index a79b1fde..48cd25bc 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -41,16 +41,16 @@ nni_reaper(void *arg) nni_pipe *pipe; nni_endpt *ep; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { nni_list_remove(&sock->s_reaps, pipe); if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { ep->ep_pipe = NULL; - nni_cond_broadcast(&ep->ep_cv); + nni_cv_wake(&ep->ep_cv); } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); // This should already have been done. if (pipe->p_trandata != NULL) { @@ -74,12 +74,12 @@ nni_reaper(void *arg) if ((sock->s_closing) && (nni_list_first(&sock->s_reaps) == NULL) && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); break; } - nni_cond_wait(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); + nni_cv_wait(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); } } @@ -109,35 +109,35 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); - if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { + if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); return (rv); } - if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) { - nni_mutex_fini(&sock->s_mx); + if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) { + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) { nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { nni_msgqueue_destroy(sock->s_uwq); nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -146,8 +146,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) nni_msgqueue_destroy(sock->s_urq); nni_msgqueue_destroy(sock->s_uwq); nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -164,18 +164,17 @@ nni_socket_close(nni_socket *sock) nni_pipe *pipe; nni_endpt *ep; nni_time linger; - nni_thread *reaper; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; // Stop all EPS. We're going to do this first, since we know // we're closing. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); nni_endpt_close(ep); - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); } // Special optimization; if there are no pipes connected, @@ -186,7 +185,7 @@ nni_socket_close(nni_socket *sock) } else { linger = nni_clock() + sock->s_linger; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); // We drain the upper write queue. This is just like closing it, @@ -199,9 +198,9 @@ nni_socket_close(nni_socket *sock) // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { - if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { + if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { break; } } @@ -221,8 +220,8 @@ nni_socket_close(nni_socket *sock) nni_list_append(&sock->s_reaps, pipe); } - nni_cond_broadcast(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); + nni_cv_wake(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); // Wait for the reaper to exit. nni_thr_fini(&sock->s_reaper); @@ -234,8 +233,8 @@ nni_socket_close(nni_socket *sock) // And we need to clean up *our* state. nni_msgqueue_destroy(sock->s_urq); nni_msgqueue_destroy(sock->s_uwq); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (0); } @@ -250,17 +249,17 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire) // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. // a REP socket with no REQ pending. - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_senderr) != 0) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } besteffort = sock->s_besteffort; - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); if (sock->s_ops.proto_send_filter != NULL) { msg = sock->s_ops.proto_send_filter(sock->s_data, msg); @@ -290,16 +289,16 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire) int rv; nni_msg *msg; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_recverr) != 0) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); for (;;) { rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); @@ -392,11 +391,11 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) void *ptr; int rv = ENOTSUP; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_ops.proto_setopt != NULL) { rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } } @@ -423,7 +422,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) rv = nni_setopt_buf(sock->s_urq, val, size); break; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } @@ -435,11 +434,11 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) void *ptr; int rv = ENOTSUP; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_ops.proto_getopt != NULL) { rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } } @@ -466,6 +465,6 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) rv = nni_getopt_buf(sock->s_urq, val, sizep); break; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 62c5ccb5..8d72c5e6 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,8 +14,8 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nng_socket { - nni_mutex s_mx; - nni_cond s_cv; + nni_mtx s_mx; + nni_cv s_cv; nni_msgqueue * s_uwq; // Upper write queue nni_msgqueue * s_urq; // Upper read queue diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c deleted file mode 100644 index d9fed53f..00000000 --- a/src/platform/posix/posix_synch.c +++ /dev/null @@ -1,154 +0,0 @@ -// -// Copyright 2016 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -// POSIX synchronization (mutexes and condition variables). This uses -// pthreads. - -#include "core/nng_impl.h" - -#ifdef PLATFORM_POSIX_SYNCH - -#include <pthread.h> -#include <time.h> -#include <string.h> - -extern pthread_condattr_t nni_cvattr; -extern pthread_mutexattr_t nni_mxattr; - -int -nni_mutex_init(nni_mutex *mp) -{ - if (pthread_mutex_init(&mp->mx, &nni_mxattr) != 0) { - return (NNG_ENOMEM); - } - return (0); -} - - -void -nni_mutex_fini(nni_mutex *mp) -{ - int rv; - - if ((rv = pthread_mutex_destroy(&mp->mx)) != 0) { - nni_panic("pthread_mutex_destroy failed: %s", strerror(rv)); - } -} - - -void -nni_mutex_enter(nni_mutex *m) -{ - int rv; - - if ((rv = pthread_mutex_lock(&m->mx)) != 0) { - nni_panic("pthread_mutex_lock failed: %s", strerror(rv)); - } -} - - -void -nni_mutex_exit(nni_mutex *m) -{ - if (pthread_mutex_unlock(&m->mx) != 0) { - nni_panic("pthread_mutex_unlock failed"); - } -} - - -int -nni_mutex_tryenter(nni_mutex *m) -{ - if (pthread_mutex_trylock(&m->mx) != 0) { - return (NNG_EBUSY); - } - return (0); -} - - -int -nni_cond_init(nni_cond *c, nni_mutex *m) -{ - if (pthread_cond_init(&c->cv, &nni_cvattr) != 0) { - // In theory could be EAGAIN, but handle like ENOMEM - return (NNG_ENOMEM); - } - c->mx = &m->mx; - return (0); -} - - -void -nni_cond_fini(nni_cond *c) -{ - if (pthread_cond_destroy(&c->cv) != 0) { - nni_panic("pthread_cond_destroy failed"); - } -} - - -void -nni_cond_signal(nni_cond *c) -{ - if (pthread_cond_signal(&c->cv) != 0) { - nni_panic("pthread_cond_signal failed"); - } -} - - -void -nni_cond_broadcast(nni_cond *c) -{ - if (pthread_cond_broadcast(&c->cv) != 0) { - nni_panic("pthread_cond_broadcast failed"); - } -} - - -void -nni_cond_wait(nni_cond *c) -{ - if (pthread_cond_wait(&c->cv, c->mx) != 0) { - nni_panic("pthread_cond_wait failed"); - } -} - - -int -nni_cond_waituntil(nni_cond *c, nni_time usec) -{ - struct timespec ts; - int rv; - nni_duration delta = usec - nni_clock(); - - - if (usec != NNI_TIME_NEVER) { - ts.tv_sec = usec / 1000000; - ts.tv_nsec = (usec % 1000000) * 1000; - - rv = pthread_cond_timedwait(&c->cv, c->mx, &ts); - } else { - rv = pthread_cond_wait(&c->cv, c->mx); - } - - if (rv == ETIMEDOUT) { - if (nni_clock() < usec) { - // This only happens if the implementation - // is buggy. - nni_panic("Premature wakupe!"); - } - return (NNG_ETIMEDOUT); - } else if (rv != 0) { - nni_panic("pthread_cond_timedwait returned %d", rv); - } - return (0); -} - - -#endif diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index d1944879..0a999326 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -45,50 +45,6 @@ nni_plat_nextid(void) return (id); } - -static void * -nni_thrfunc(void *arg) -{ - nni_thread *thr = arg; - - thr->func(thr->arg); - return (NULL); -} - - -int -nni_thread_create(nni_thread **tp, void (*fn)(void *), void *arg) -{ - nni_thread *thr; - int rv; - - if ((thr = nni_alloc(sizeof (*thr))) == NULL) { - return (NNG_ENOMEM); - } - thr->func = fn; - thr->arg = arg; - - if ((rv = pthread_create(&thr->tid, NULL, nni_thrfunc, thr)) != 0) { - nni_free(thr, sizeof (*thr)); - return (NNG_ENOMEM); - } - *tp = thr; - return (0); -} - - -void -nni_thread_reap(nni_thread *thr) -{ - int rv; - - if ((rv = pthread_join(thr->tid, NULL)) != 0) { - nni_panic("pthread_thread: %s", strerror(rv)); - } - nni_free(thr, sizeof (*thr)); -} - - int nni_plat_mtx_init(nni_plat_mtx *mtx) { |
