aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c3
-rw-r--r--src/core/endpt.c34
-rw-r--r--src/core/pipe.c3
-rw-r--r--src/core/taskq.c2
-rw-r--r--src/core/transport.h11
5 files changed, 33 insertions, 20 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 2f90aa34..6276589b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -387,7 +387,8 @@ nni_aio_abort(nni_aio *aio, int rv)
nni_aio_cancelfn cancelfn;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 87566d42..d39824a4 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -23,10 +23,10 @@ struct nni_ep {
nni_sock * ep_sock;
nni_url * ep_url;
int ep_mode;
- int ep_started;
- int ep_closed; // full shutdown
- int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
+ bool ep_started;
+ bool ep_closed; // full shutdown
+ bool ep_closing; // close pending (waiting on refcnt)
bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
@@ -138,8 +138,9 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode)
return (NNG_ENOMEM);
}
ep->ep_url = url;
- ep->ep_closed = 0;
- ep->ep_started = 0;
+ ep->ep_closed = false;
+ ep->ep_closing = false;
+ ep->ep_started = false;
ep->ep_data = NULL;
ep->ep_refcnt = 1;
ep->ep_sock = s;
@@ -237,15 +238,12 @@ nni_ep_rele(nni_ep *ep)
int
nni_ep_shutdown(nni_ep *ep)
{
- // This is done under the endpoints lock, although the remove
- // is done under that as well, we also make sure that we hold
- // the socket lock in the remove step.
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_closing) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- ep->ep_closing = 1;
+ ep->ep_closing = true;
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
@@ -271,15 +269,15 @@ nni_ep_close(nni_ep *ep)
nni_ep_rele(ep);
return;
}
- ep->ep_closed = 1;
+ ep->ep_closed = true;
nni_mtx_unlock(&ep->ep_mtx);
nni_ep_shutdown(ep);
- nni_aio_close(ep->ep_acc_aio);
- nni_aio_close(ep->ep_con_aio);
- nni_aio_close(ep->ep_con_syn);
- nni_aio_close(ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -435,7 +433,7 @@ nni_ep_dial(nni_ep *ep, int flags)
}
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- ep->ep_started = 1;
+ ep->ep_started = true;
nni_ep_con_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
@@ -444,7 +442,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// Synchronous mode: so we have to wait for it to complete.
aio = ep->ep_con_syn;
ep->ep_ops.ep_connect(ep->ep_data, aio);
- ep->ep_started = 1;
+ ep->ep_started = true;
nni_mtx_unlock(&ep->ep_mtx);
nni_aio_wait(aio);
@@ -453,7 +451,7 @@ nni_ep_dial(nni_ep *ep, int flags)
if (((rv = nni_aio_result(aio)) != 0) ||
((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
- ep->ep_started = 0;
+ ep->ep_started = false;
nni_mtx_unlock(&ep->ep_mtx);
}
return (rv);
@@ -538,7 +536,7 @@ nni_ep_listen(nni_ep *ep, int flags)
return (rv);
}
- ep->ep_started = 1;
+ ep->ep_started = true;
nni_ep_acc_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index ba3027cf..321e1a09 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -110,6 +110,9 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
+ if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
+ p->p_tran_ops.p_stop(p->p_tran_data);
+ }
// We have exclusive access at this point, so we can check if
// we are still on any lists.
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 526fa0b4..a87e4729 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -170,6 +170,7 @@ nni_task_dispatch(nni_task *task)
nni_mtx_lock(&task->task_mtx);
task->task_sched = true;
task->task_run = false;
+ task->task_exec = false;
task->task_done = false;
nni_mtx_unlock(&task->task_mtx);
@@ -201,6 +202,7 @@ nni_task_exec(nni_task *task)
task->task_exec = true;
task->task_sched = false;
task->task_done = false;
+ task->task_run = false;
nni_mtx_unlock(&task->task_mtx);
task->task_cb(task->task_arg);
diff --git a/src/core/transport.h b/src/core/transport.h
index 96976efd..d37fb449 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -102,9 +102,13 @@ struct nni_tran_ep_ops {
void (*ep_accept)(void *, nni_aio *);
// ep_close stops the endpoint from operating altogether. It does
- // not affect pipes that have already been created.
+ // not affect pipes that have already been created. It is nonblocking.
void (*ep_close)(void *);
+ // ep_stop stops the endpoint, and *waits* for any outstanding
+ // aio operations to complete.
+ void (*ep_stop)(void *);
+
// ep_options is an array of endpoint options. The final element must
// have a NULL name. If this member is NULL, then no transport specific
// options are available.
@@ -142,6 +146,11 @@ struct nni_tran_pipe_ops {
// its readiness by finishing the aio.
void (*p_start)(void *, nni_aio *);
+ // p_stop stops the pipe, waiting for any callbacks that are
+ // outstanding to complete. This is done before tearing down
+ // resources with p_fini.
+ void (*p_stop)(void *);
+
// p_aio_send queues the message for transmit. If this fails, then
// the caller may try again with the same message (or free it). If
// the call succeeds, then the transport has taken ownership of the