From f26dea6463adce8d70e1a4b22d8f9a867cf672c6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 1 Jan 2017 18:55:52 -0800 Subject: Socket reaper uses new-thread. --- src/core/socket.c | 71 +++++++++++++++++++++++++++---------------------------- src/core/socket.h | 2 +- 2 files changed, 36 insertions(+), 37 deletions(-) (limited to 'src/core') diff --git a/src/core/socket.c b/src/core/socket.c index 2e95c229..3536934a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -71,8 +71,10 @@ nni_reaper(void *arg) continue; } - if (sock->s_reaper == NULL) { - nni_mutex_exit(&sock->s_mx); + if ((sock->s_closing) && + (nni_list_first(&sock->s_reaps) == NULL) && + (nni_list_first(&sock->s_pipes) == NULL)) { + nni_mutex_exit(&sock->s_mx); break; } @@ -103,6 +105,9 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) sock->s_closing = 0; sock->s_reconn = NNI_SECOND; sock->s_reconnmax = NNI_SECOND; + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); @@ -114,44 +119,41 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); - NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); - NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); - - if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) { - goto fail; + if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); } - if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) || - ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) { - goto fail; + if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) { + nni_thr_fini(&sock->s_reaper); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); + } + if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { + nni_msgqueue_destroy(sock->s_uwq); + nni_thr_fini(&sock->s_reaper); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); } if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - goto fail; - } - *sockp = sock; - return (0); - -fail: - if (sock->s_urq != NULL) { nni_msgqueue_destroy(sock->s_urq); - } - if (sock->s_uwq != NULL) { nni_msgqueue_destroy(sock->s_uwq); + nni_thr_fini(&sock->s_reaper); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); } - if (sock->s_reaper != NULL) { - nni_thread *reap = sock->s_reaper; - nni_mutex_enter(&sock->s_mx); - sock->s_reaper = NULL; - nni_cond_broadcast(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); - nni_thread_reap(reap); - } - nni_cond_fini(&sock->s_cv); - nni_mutex_fini(&sock->s_mx); - nni_free(sock, sizeof (*sock)); - return (rv); + nni_thr_run(&sock->s_reaper); + *sockp = sock; + return (0); } @@ -219,14 +221,11 @@ nni_socket_close(nni_socket *sock) nni_list_append(&sock->s_reaps, pipe); } - // Tell the reaper it's done once it finishes. Also kick it off. - reaper = sock->s_reaper; - sock->s_reaper = NULL; nni_cond_broadcast(&sock->s_cv); nni_mutex_exit(&sock->s_mx); // Wait for the reaper to exit. - nni_thread_reap(reaper); + nni_thr_fini(&sock->s_reaper); // At this point nothing else should be referencing us. // The protocol needs to clean up its state. diff --git a/src/core/socket.h b/src/core/socket.h index ff3ad865..62c5ccb5 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -35,7 +35,7 @@ struct nng_socket { nni_list s_pipes; // pipes for this socket nni_list s_reaps; // pipes to reap - nni_thread * s_reaper; + nni_thr s_reaper; int s_closing; // Socket is closing int s_besteffort; // Best effort mode delivery -- cgit v1.2.3-70-g09d2