From 81808ce3d38cc7ce0131367e2187f0beb2cd1b43 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 15 May 2018 11:59:09 -0700 Subject: fixes #431 hang in taskq_wait fixes #429 async websocket reap leads to crash This tightens up the code for shutdown, ensuring that transport callbacks are completely stopped before advancing to the next step of teardown of transport pipes or endpoints. It also fixes a problem where task_wait would sometimes get "stuck" as tasks transitioned between asynchronous and synchronous completions. Finally, it saves a few cycles by only calling a cancellation callback once during cancellation of an aio. --- src/core/aio.c | 3 ++- src/core/endpt.c | 34 ++++++++++++++++------------------ src/core/pipe.c | 3 +++ src/core/taskq.c | 2 ++ src/core/transport.h | 11 ++++++++++- src/transport/ipc/ipc.c | 9 ++++++++- src/transport/tcp/tcp.c | 9 ++++++++- src/transport/tls/tls.c | 9 ++++++++- src/transport/ws/websocket.c | 9 ++++++++- 9 files changed, 65 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/core/aio.c b/src/core/aio.c index 2f90aa34..6276589b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -387,7 +387,8 @@ nni_aio_abort(nni_aio *aio, int rv) nni_aio_cancelfn cancelfn; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. 
diff --git a/src/core/endpt.c b/src/core/endpt.c index 87566d42..d39824a4 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -23,10 +23,10 @@ struct nni_ep { nni_sock * ep_sock; nni_url * ep_url; int ep_mode; - int ep_started; - int ep_closed; // full shutdown - int ep_closing; // close pending (waiting on refcnt) int ep_refcnt; + bool ep_started; + bool ep_closed; // full shutdown + bool ep_closing; // close pending (waiting on refcnt) bool ep_tmo_run; nni_mtx ep_mtx; nni_cv ep_cv; @@ -138,8 +138,9 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode) return (NNG_ENOMEM); } ep->ep_url = url; - ep->ep_closed = 0; - ep->ep_started = 0; + ep->ep_closed = false; + ep->ep_closing = false; + ep->ep_started = false; ep->ep_data = NULL; ep->ep_refcnt = 1; ep->ep_sock = s; @@ -237,15 +238,12 @@ nni_ep_rele(nni_ep *ep) int nni_ep_shutdown(nni_ep *ep) { - // This is done under the endpoints lock, although the remove - // is done under that as well, we also make sure that we hold - // the socket lock in the remove step. nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_closing = 1; + ep->ep_closing = true; nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. 
@@ -271,15 +269,15 @@ nni_ep_close(nni_ep *ep) nni_ep_rele(ep); return; } - ep->ep_closed = 1; + ep->ep_closed = true; nni_mtx_unlock(&ep->ep_mtx); nni_ep_shutdown(ep); - nni_aio_close(ep->ep_acc_aio); - nni_aio_close(ep->ep_con_aio); - nni_aio_close(ep->ep_con_syn); - nni_aio_close(ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { @@ -435,7 +433,7 @@ nni_ep_dial(nni_ep *ep, int flags) } if ((flags & NNG_FLAG_NONBLOCK) != 0) { - ep->ep_started = 1; + ep->ep_started = true; nni_ep_con_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); @@ -444,7 +442,7 @@ nni_ep_dial(nni_ep *ep, int flags) // Synchronous mode: so we have to wait for it to complete. aio = ep->ep_con_syn; ep->ep_ops.ep_connect(ep->ep_data, aio); - ep->ep_started = 1; + ep->ep_started = true; nni_mtx_unlock(&ep->ep_mtx); nni_aio_wait(aio); @@ -453,7 +451,7 @@ nni_ep_dial(nni_ep *ep, int flags) if (((rv = nni_aio_result(aio)) != 0) || ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { nni_mtx_lock(&ep->ep_mtx); - ep->ep_started = 0; + ep->ep_started = false; nni_mtx_unlock(&ep->ep_mtx); } return (rv); @@ -538,7 +536,7 @@ nni_ep_listen(nni_ep *ep, int flags) return (rv); } - ep->ep_started = 1; + ep->ep_started = true; nni_ep_acc_start(ep); nni_mtx_unlock(&ep->ep_mtx); diff --git a/src/core/pipe.c b/src/core/pipe.c index ba3027cf..321e1a09 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -110,6 +110,9 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } // We have exclusive access at this point, so we can check if // we are still on any lists. 
diff --git a/src/core/taskq.c b/src/core/taskq.c index 526fa0b4..a87e4729 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -170,6 +170,7 @@ nni_task_dispatch(nni_task *task) nni_mtx_lock(&task->task_mtx); task->task_sched = true; task->task_run = false; + task->task_exec = false; task->task_done = false; nni_mtx_unlock(&task->task_mtx); @@ -201,6 +202,7 @@ nni_task_exec(nni_task *task) task->task_exec = true; task->task_sched = false; task->task_done = false; + task->task_run = false; nni_mtx_unlock(&task->task_mtx); task->task_cb(task->task_arg); diff --git a/src/core/transport.h b/src/core/transport.h index 96976efd..d37fb449 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -102,9 +102,13 @@ struct nni_tran_ep_ops { void (*ep_accept)(void *, nni_aio *); // ep_close stops the endpoint from operating altogether. It does - // not affect pipes that have already been created. + // not affect pipes that have already been created. It is nonblocking. void (*ep_close)(void *); + // ep_stop stops the endpoint, and *waits* for any outstanding + // aio operations to complete. + void (*ep_stop)(void *); + // ep_options is an array of endpoint options. The final element must // have a NULL name. If this member is NULL, then no transport specific // options are available. @@ -142,6 +146,11 @@ struct nni_tran_pipe_ops { // its readiness by finishing the aio. void (*p_start)(void *, nni_aio *); + // p_stop stops the pipe, waiting for any callbacks that are + // outstanding to complete. This is done before tearing down + // resources with p_fini. + void (*p_stop)(void *); + // p_aio_send queues the message for transmit. If this fails, then // the caller may try again with the same message (or free it). 
If // the call succeeds, then the transport has taken ownership of the diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 1740bfcb..016e47ec 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -89,13 +89,19 @@ nni_ipc_pipe_close(void *arg) } static void -nni_ipc_pipe_fini(void *arg) +nni_ipc_pipe_stop(void *arg) { nni_ipc_pipe *pipe = arg; nni_aio_stop(pipe->rxaio); nni_aio_stop(pipe->txaio); nni_aio_stop(pipe->negaio); +} + +static void +nni_ipc_pipe_fini(void *arg) +{ + nni_ipc_pipe *pipe = arg; nni_aio_fini(pipe->rxaio); nni_aio_fini(pipe->txaio); @@ -890,6 +896,7 @@ static nni_tran_pipe_option nni_ipc_pipe_options[] = { static nni_tran_pipe_ops nni_ipc_pipe_ops = { .p_fini = nni_ipc_pipe_fini, .p_start = nni_ipc_pipe_start, + .p_stop = nni_ipc_pipe_stop, .p_send = nni_ipc_pipe_send, .p_recv = nni_ipc_pipe_recv, .p_close = nni_ipc_pipe_close, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 22217699..171cc8ee 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -91,13 +91,19 @@ nni_tcp_pipe_close(void *arg) } static void -nni_tcp_pipe_fini(void *arg) +nni_tcp_pipe_stop(void *arg) { nni_tcp_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negaio); +} + +static void +nni_tcp_pipe_fini(void *arg) +{ + nni_tcp_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -917,6 +923,7 @@ static nni_tran_pipe_option nni_tcp_pipe_options[] = { static nni_tran_pipe_ops nni_tcp_pipe_ops = { .p_fini = nni_tcp_pipe_fini, .p_start = nni_tcp_pipe_start, + .p_stop = nni_tcp_pipe_stop, .p_send = nni_tcp_pipe_send, .p_recv = nni_tcp_pipe_recv, .p_close = nni_tcp_pipe_close, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 21557270..f7e90303 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -99,13 +99,19 @@ nni_tls_pipe_close(void *arg) } static void -nni_tls_pipe_fini(void *arg) +nni_tls_pipe_stop(void *arg) { nni_tls_pipe *p = arg; 
nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negaio); +} + +static void +nni_tls_pipe_fini(void *arg) +{ + nni_tls_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -1043,6 +1049,7 @@ static nni_tran_pipe_option nni_tls_pipe_options[] = { static nni_tran_pipe_ops nni_tls_pipe_ops = { .p_fini = nni_tls_pipe_fini, .p_start = nni_tls_pipe_start, + .p_stop = nni_tls_pipe_stop, .p_send = nni_tls_pipe_send, .p_recv = nni_tls_pipe_recv, .p_close = nni_tls_pipe_close, diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index a4081b25..36deddac 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -183,12 +183,18 @@ ws_pipe_send(void *arg, nni_aio *aio) } static void -ws_pipe_fini(void *arg) +ws_pipe_stop(void *arg) { ws_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); +} + +static void +ws_pipe_fini(void *arg) +{ + ws_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -593,6 +599,7 @@ static nni_tran_pipe_option ws_pipe_options[] = { static nni_tran_pipe_ops ws_pipe_ops = { .p_fini = ws_pipe_fini, + .p_stop = ws_pipe_stop, .p_send = ws_pipe_send, .p_recv = ws_pipe_recv, .p_close = ws_pipe_close, -- cgit v1.2.3-70-g09d2