diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:48:10 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:48:10 -0800 |
| commit | 783470724ed22b315f2ecc4e3b1ac9d199d44ea2 (patch) | |
| tree | d4883f305b1f2c6f57710cd8900f4b0932ae14c0 /src/core/socket.c | |
| parent | bbed172a2b38f9227ca9e1c02a933df068e5eaf7 (diff) | |
| download | nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.gz nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.bz2 nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.zip | |
Final purge of old threading & synch stuff.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index a79b1fde..48cd25bc 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -41,16 +41,16 @@ nni_reaper(void *arg) nni_pipe *pipe; nni_endpt *ep; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { nni_list_remove(&sock->s_reaps, pipe); if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { ep->ep_pipe = NULL; - nni_cond_broadcast(&ep->ep_cv); + nni_cv_wake(&ep->ep_cv); } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); // This should already have been done. if (pipe->p_trandata != NULL) { @@ -74,12 +74,12 @@ nni_reaper(void *arg) if ((sock->s_closing) && (nni_list_first(&sock->s_reaps) == NULL) && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); break; } - nni_cond_wait(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); + nni_cv_wait(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); } } @@ -109,35 +109,35 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); - if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { + if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); return (rv); } - if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) { - nni_mutex_fini(&sock->s_mx); + if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) { + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) { nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { nni_msgqueue_destroy(sock->s_uwq); nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -146,8 +146,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) nni_msgqueue_destroy(sock->s_urq); nni_msgqueue_destroy(sock->s_uwq); nni_thr_fini(&sock->s_reaper); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -164,18 +164,17 @@ nni_socket_close(nni_socket *sock) nni_pipe *pipe; nni_endpt *ep; nni_time linger; - nni_thread *reaper; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; // Stop all EPS. We're going to do this first, since we know // we're closing. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); nni_endpt_close(ep); - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); } // Special optimization; if there are no pipes connected, @@ -186,7 +185,7 @@ nni_socket_close(nni_socket *sock) } else { linger = nni_clock() + sock->s_linger; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); // We drain the upper write queue. This is just like closing it, @@ -199,9 +198,9 @@ nni_socket_close(nni_socket *sock) // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { - if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { + if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { break; } } @@ -221,8 +220,8 @@ nni_socket_close(nni_socket *sock) nni_list_append(&sock->s_reaps, pipe); } - nni_cond_broadcast(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); + nni_cv_wake(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); // Wait for the reaper to exit. nni_thr_fini(&sock->s_reaper); @@ -234,8 +233,8 @@ nni_socket_close(nni_socket *sock) // And we need to clean up *our* state. nni_msgqueue_destroy(sock->s_urq); nni_msgqueue_destroy(sock->s_uwq); - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (0); } @@ -250,17 +249,17 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire) // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. // a REP socket with no REQ pending. - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_senderr) != 0) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } besteffort = sock->s_besteffort; - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); if (sock->s_ops.proto_send_filter != NULL) { msg = sock->s_ops.proto_send_filter(sock->s_data, msg); @@ -290,16 +289,16 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire) int rv; nni_msg *msg; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_recverr) != 0) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); for (;;) { rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); @@ -392,11 +391,11 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) void *ptr; int rv = ENOTSUP; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_ops.proto_setopt != NULL) { rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } } @@ -423,7 +422,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) rv = nni_setopt_buf(sock->s_urq, val, size); break; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } @@ -435,11 +434,11 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) void *ptr; int rv = ENOTSUP; - nni_mutex_enter(&sock->s_mx); + nni_mtx_lock(&sock->s_mx); if (sock->s_ops.proto_getopt != NULL) { rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } } @@ -466,6 +465,6 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) rv = nni_getopt_buf(sock->s_urq, val, sizep); break; } - nni_mutex_exit(&sock->s_mx); + nni_mtx_unlock(&sock->s_mx); return (rv); } |
