aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-01 19:48:10 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-01 19:48:10 -0800
commit783470724ed22b315f2ecc4e3b1ac9d199d44ea2 (patch)
treed4883f305b1f2c6f57710cd8900f4b0932ae14c0 /src/core/socket.c
parentbbed172a2b38f9227ca9e1c02a933df068e5eaf7 (diff)
downloadnng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.gz
nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.tar.bz2
nng-783470724ed22b315f2ecc4e3b1ac9d199d44ea2.zip
Final purge of old threading & synch stuff.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c83
1 files changed, 41 insertions, 42 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index a79b1fde..48cd25bc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -41,16 +41,16 @@ nni_reaper(void *arg)
nni_pipe *pipe;
nni_endpt *ep;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
nni_list_remove(&sock->s_reaps, pipe);
if (((ep = pipe->p_ep) != NULL) &&
((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
- nni_cond_broadcast(&ep->ep_cv);
+ nni_cv_wake(&ep->ep_cv);
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
// This should already have been done.
if (pipe->p_trandata != NULL) {
@@ -74,12 +74,12 @@ nni_reaper(void *arg)
if ((sock->s_closing) &&
(nni_list_first(&sock->s_reaps) == NULL) &&
(nni_list_first(&sock->s_pipes) == NULL)) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
break;
}
- nni_cond_wait(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
+ nni_cv_wait(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
}
}
@@ -109,35 +109,35 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
- if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
+ if ((rv = nni_mtx_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
return (rv);
}
- if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) {
- nni_mutex_fini(&sock->s_mx);
+ if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) {
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) {
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) {
nni_msgqueue_destroy(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -146,8 +146,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
nni_msgqueue_destroy(sock->s_urq);
nni_msgqueue_destroy(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -164,18 +164,17 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
- nni_thread *reaper;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
// Stop all EPS. We're going to do this first, since we know
// we're closing.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
nni_endpt_close(ep);
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
}
// Special optimization; if there are no pipes connected,
@@ -186,7 +185,7 @@ nni_socket_close(nni_socket *sock)
} else {
linger = nni_clock() + sock->s_linger;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
// We drain the upper write queue. This is just like closing it,
@@ -199,9 +198,9 @@ nni_socket_close(nni_socket *sock)
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
// a chance to do so gracefully.
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
- if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
+ if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
break;
}
}
@@ -221,8 +220,8 @@ nni_socket_close(nni_socket *sock)
nni_list_append(&sock->s_reaps, pipe);
}
- nni_cond_broadcast(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
+ nni_cv_wake(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
// Wait for the reaper to exit.
nni_thr_fini(&sock->s_reaper);
@@ -234,8 +233,8 @@ nni_socket_close(nni_socket *sock)
// And we need to clean up *our* state.
nni_msgqueue_destroy(sock->s_urq);
nni_msgqueue_destroy(sock->s_uwq);
- nni_cond_fini(&sock->s_cv);
- nni_mutex_fini(&sock->s_mx);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (0);
}
@@ -250,17 +249,17 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire)
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
// a REP socket with no REQ pending.
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_senderr) != 0) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
besteffort = sock->s_besteffort;
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
if (sock->s_ops.proto_send_filter != NULL) {
msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
@@ -290,16 +289,16 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
int rv;
nni_msg *msg;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_recverr) != 0) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
for (;;) {
rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire);
@@ -392,11 +391,11 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
void *ptr;
int rv = ENOTSUP;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_ops.proto_setopt != NULL) {
rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
}
@@ -423,7 +422,7 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
rv = nni_setopt_buf(sock->s_urq, val, size);
break;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
@@ -435,11 +434,11 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
void *ptr;
int rv = ENOTSUP;
- nni_mutex_enter(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
if (sock->s_ops.proto_getopt != NULL) {
rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
}
@@ -466,6 +465,6 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
}
- nni_mutex_exit(&sock->s_mx);
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}