diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 3 | ||||
| -rw-r--r-- | src/core/endpt.c | 34 | ||||
| -rw-r--r-- | src/core/pipe.c | 3 | ||||
| -rw-r--r-- | src/core/taskq.c | 2 | ||||
| -rw-r--r-- | src/core/transport.h | 11 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 9 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 9 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 9 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 9 |
9 files changed, 65 insertions, 24 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 2f90aa34..6276589b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -387,7 +387,8 @@ nni_aio_abort(nni_aio *aio, int rv) nni_aio_cancelfn cancelfn; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. diff --git a/src/core/endpt.c b/src/core/endpt.c index 87566d42..d39824a4 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -23,10 +23,10 @@ struct nni_ep { nni_sock * ep_sock; nni_url * ep_url; int ep_mode; - int ep_started; - int ep_closed; // full shutdown - int ep_closing; // close pending (waiting on refcnt) int ep_refcnt; + bool ep_started; + bool ep_closed; // full shutdown + bool ep_closing; // close pending (waiting on refcnt) bool ep_tmo_run; nni_mtx ep_mtx; nni_cv ep_cv; @@ -138,8 +138,9 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode) return (NNG_ENOMEM); } ep->ep_url = url; - ep->ep_closed = 0; - ep->ep_started = 0; + ep->ep_closed = false; + ep->ep_closing = false; + ep->ep_started = false; ep->ep_data = NULL; ep->ep_refcnt = 1; ep->ep_sock = s; @@ -237,15 +238,12 @@ nni_ep_rele(nni_ep *ep) int nni_ep_shutdown(nni_ep *ep) { - // This is done under the endpoints lock, although the remove - // is done under that as well, we also make sure that we hold - // the socket lock in the remove step. nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_closing = 1; + ep->ep_closing = true; nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. @@ -271,15 +269,15 @@ nni_ep_close(nni_ep *ep) nni_ep_rele(ep); return; } - ep->ep_closed = 1; + ep->ep_closed = true; nni_mtx_unlock(&ep->ep_mtx); nni_ep_shutdown(ep); - nni_aio_close(ep->ep_acc_aio); - nni_aio_close(ep->ep_con_aio); - nni_aio_close(ep->ep_con_syn); - nni_aio_close(ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { @@ -435,7 +433,7 @@ nni_ep_dial(nni_ep *ep, int flags) } if ((flags & NNG_FLAG_NONBLOCK) != 0) { - ep->ep_started = 1; + ep->ep_started = true; nni_ep_con_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); @@ -444,7 +442,7 @@ nni_ep_dial(nni_ep *ep, int flags) // Synchronous mode: so we have to wait for it to complete. aio = ep->ep_con_syn; ep->ep_ops.ep_connect(ep->ep_data, aio); - ep->ep_started = 1; + ep->ep_started = true; nni_mtx_unlock(&ep->ep_mtx); nni_aio_wait(aio); @@ -453,7 +451,7 @@ nni_ep_dial(nni_ep *ep, int flags) if (((rv = nni_aio_result(aio)) != 0) || ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { nni_mtx_lock(&ep->ep_mtx); - ep->ep_started = 0; + ep->ep_started = false; nni_mtx_unlock(&ep->ep_mtx); } return (rv); @@ -538,7 +536,7 @@ nni_ep_listen(nni_ep *ep, int flags) return (rv); } - ep->ep_started = 1; + ep->ep_started = true; nni_ep_acc_start(ep); nni_mtx_unlock(&ep->ep_mtx); diff --git a/src/core/pipe.c b/src/core/pipe.c index ba3027cf..321e1a09 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -110,6 +110,9 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } // We have exclusive access at this point, so we can check if // we are still on any lists. diff --git a/src/core/taskq.c b/src/core/taskq.c index 526fa0b4..a87e4729 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -170,6 +170,7 @@ nni_task_dispatch(nni_task *task) nni_mtx_lock(&task->task_mtx); task->task_sched = true; task->task_run = false; + task->task_exec = false; task->task_done = false; nni_mtx_unlock(&task->task_mtx); @@ -201,6 +202,7 @@ nni_task_exec(nni_task *task) task->task_exec = true; task->task_sched = false; task->task_done = false; + task->task_run = false; nni_mtx_unlock(&task->task_mtx); task->task_cb(task->task_arg); diff --git a/src/core/transport.h b/src/core/transport.h index 96976efd..d37fb449 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -102,9 +102,13 @@ struct nni_tran_ep_ops { void (*ep_accept)(void *, nni_aio *); // ep_close stops the endpoint from operating altogether. It does - // not affect pipes that have already been created. + // not affect pipes that have already been created. It is nonblocking. void (*ep_close)(void *); + // ep_stop stops the endpoint, and *waits* for any outstanding + // aio operations to complete. + void (*ep_stop)(void *); + // ep_options is an array of endpoint options. The final element must // have a NULL name. If this member is NULL, then no transport specific // options are available. @@ -142,6 +146,11 @@ struct nni_tran_pipe_ops { // its readiness by finishing the aio. void (*p_start)(void *, nni_aio *); + // p_stop stops the pipe, waiting for any callbacks that are + // outstanding to complete. This is done before tearing down + // resources with p_fini. + void (*p_stop)(void *); + // p_aio_send queues the message for transmit. If this fails, then // the caller may try again with the same message (or free it). If // the call succeeds, then the transport has taken ownership of the diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 1740bfcb..016e47ec 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -89,13 +89,19 @@ nni_ipc_pipe_close(void *arg) } static void -nni_ipc_pipe_fini(void *arg) +nni_ipc_pipe_stop(void *arg) { nni_ipc_pipe *pipe = arg; nni_aio_stop(pipe->rxaio); nni_aio_stop(pipe->txaio); nni_aio_stop(pipe->negaio); +} + +static void +nni_ipc_pipe_fini(void *arg) +{ + nni_ipc_pipe *pipe = arg; nni_aio_fini(pipe->rxaio); nni_aio_fini(pipe->txaio); @@ -890,6 +896,7 @@ static nni_tran_pipe_option nni_ipc_pipe_options[] = { static nni_tran_pipe_ops nni_ipc_pipe_ops = { .p_fini = nni_ipc_pipe_fini, .p_start = nni_ipc_pipe_start, + .p_stop = nni_ipc_pipe_stop, .p_send = nni_ipc_pipe_send, .p_recv = nni_ipc_pipe_recv, .p_close = nni_ipc_pipe_close, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 22217699..171cc8ee 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -91,13 +91,19 @@ nni_tcp_pipe_close(void *arg) } static void -nni_tcp_pipe_fini(void *arg) +nni_tcp_pipe_stop(void *arg) { nni_tcp_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negaio); +} + +static void +nni_tcp_pipe_fini(void *arg) +{ + nni_tcp_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -917,6 +923,7 @@ static nni_tran_pipe_option nni_tcp_pipe_options[] = { static nni_tran_pipe_ops nni_tcp_pipe_ops = { .p_fini = nni_tcp_pipe_fini, .p_start = nni_tcp_pipe_start, + .p_stop = nni_tcp_pipe_stop, .p_send = nni_tcp_pipe_send, .p_recv = nni_tcp_pipe_recv, .p_close = nni_tcp_pipe_close, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 21557270..f7e90303 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -99,13 +99,19 @@ nni_tls_pipe_close(void *arg) } static void -nni_tls_pipe_fini(void *arg) +nni_tls_pipe_stop(void *arg) { nni_tls_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negaio); +} + +static void +nni_tls_pipe_fini(void *arg) +{ + nni_tls_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -1043,6 +1049,7 @@ static nni_tran_pipe_option nni_tls_pipe_options[] = { static nni_tran_pipe_ops nni_tls_pipe_ops = { .p_fini = nni_tls_pipe_fini, .p_start = nni_tls_pipe_start, + .p_stop = nni_tls_pipe_stop, .p_send = nni_tls_pipe_send, .p_recv = nni_tls_pipe_recv, .p_close = nni_tls_pipe_close, diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index a4081b25..36deddac 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -183,12 +183,18 @@ ws_pipe_send(void *arg, nni_aio *aio) } static void -ws_pipe_fini(void *arg) +ws_pipe_stop(void *arg) { ws_pipe *p = arg; nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); +} + +static void +ws_pipe_fini(void *arg) +{ + ws_pipe *p = arg; nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); @@ -593,6 +599,7 @@ static nni_tran_pipe_option ws_pipe_options[] = { static nni_tran_pipe_ops ws_pipe_ops = { .p_fini = ws_pipe_fini, + .p_stop = ws_pipe_stop, .p_send = ws_pipe_send, .p_recv = ws_pipe_recv, .p_close = ws_pipe_close, |
