aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c129
1 files changed, 97 insertions, 32 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index efd3eefb..00690cd4 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -19,13 +19,54 @@ static nni_objhash *nni_eps = NULL;
static void *nni_ep_ctor(uint32_t);
static void nni_ep_dtor(void *);
+// Because we can't reap threads from themselves, we need to have a separate
+// task to reap endpoints. We use one global task to do this, and we just
+// add to the reap list as needed.
+static nni_taskq_ent nni_ep_reap_tqe;
+static nni_mtx nni_ep_reap_mx;
+static nni_list nni_ep_reap_list;
+
+static void
+nni_ep_reaper(void *arg)
+{
+ nni_ep *ep;
+
+ NNI_ARG_UNUSED(arg);
+
+ nni_mtx_lock(&nni_ep_reap_mx);
+ while ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) {
+ nni_list_remove(&nni_ep_reap_list, ep);
+
+ nni_mtx_unlock(&nni_ep_reap_mx);
+ nni_thr_fini(&ep->ep_thr);
+ nni_objhash_unref(nni_eps, ep->ep_id);
+ nni_mtx_lock(&nni_ep_reap_mx);
+
+ continue;
+ }
+
+ nni_mtx_unlock(&nni_ep_reap_mx);
+}
+
+
int
nni_ep_sys_init(void)
{
int rv;
rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor);
+ if (rv != 0) {
+ return (rv);
+ }
+ rv = nni_mtx_init(&nni_ep_reap_mx);
+ if (rv != 0) {
+ nni_objhash_fini(nni_eps);
+ nni_eps = NULL;
+ return (rv);
+ }
+ nni_ep_list_init(&nni_ep_reap_list);
+ nni_taskq_ent_init(&nni_ep_reap_tqe, nni_ep_reaper, NULL);
return (rv);
}
@@ -33,6 +74,8 @@ nni_ep_sys_init(void)
void
nni_ep_sys_fini(void)
{
+ nni_taskq_cancel(NULL, &nni_ep_reap_tqe);
+ nni_mtx_fini(&nni_ep_reap_mx);
nni_objhash_fini(nni_eps);
nni_eps = NULL;
}
@@ -129,13 +172,6 @@ nni_ep_dtor(void *ptr)
{
nni_ep *ep = ptr;
- // If a thread is running, make sure it is stopped.
- nni_thr_fini(&ep->ep_thr);
-
- if (ep->ep_sock != NULL) {
- // This is idempotent; harmless if not already on the list.
- nni_sock_rem_ep(ep->ep_sock, ep);
- }
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
@@ -181,7 +217,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
return (rv);
}
- if ((rv = nni_sock_add_ep(sock, ep)) != 0) {
+ if ((rv = nni_sock_ep_add(sock, ep)) != 0) {
nni_objhash_unref(nni_eps, id);
return (rv);
}
@@ -192,22 +228,50 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
void
+nni_ep_stop(nni_ep *ep)
+{
+ nni_mtx_lock(&ep->ep_mtx);
+ if (ep->ep_closed == 0) {
+ ep->ep_closed = 1;
+ ep->ep_ops.ep_close(ep->ep_data);
+ }
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+}
+
+
+void
nni_ep_close(nni_ep *ep)
{
nni_pipe *pipe;
+
+ nni_ep_stop(ep);
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+}
+
+
+void
+nni_ep_remove(nni_ep *ep)
+{
+ nni_pipe *pipe;
nni_sock *sock = ep->ep_sock;
+ nni_ep_close(ep);
+
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed == 0) {
- ep->ep_closed = 1;
- ep->ep_ops.ep_close(ep->ep_data);
- if ((pipe = ep->ep_pipe) != NULL) {
- pipe->p_ep = NULL;
- ep->ep_pipe = NULL;
- }
- nni_cv_wake(&ep->ep_cv);
+ while (nni_list_first(&ep->ep_pipes) != NULL) {
+ nni_cv_wait(&ep->ep_cv);
}
nni_mtx_unlock(&ep->ep_mtx);
+
+ nni_sock_ep_remove(sock, ep);
+
+ nni_thr_fini(&ep->ep_thr);
+ nni_objhash_unref(nni_eps, ep->ep_id);
}
@@ -222,29 +286,27 @@ nni_ep_connect(nni_ep *ep)
}
rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
if ((rv = nni_pipe_start(pipe)) != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
ep->ep_pipe = pipe;
- pipe->p_ep = ep;
return (0);
}
int
-nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe)
+nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe)
{
- nni_ep_hold(ep);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
return (NNG_ECLOSED);
}
+
nni_list_append(&ep->ep_pipes, pipe);
nni_mtx_unlock(&ep->ep_mtx);
@@ -253,16 +315,19 @@ nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe)
void
-nni_ep_rem_pipe(nni_ep *ep, nni_pipe *pipe)
+nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
{
- nni_mtx_lock(&ep->ep_mtx);
- if (!nni_list_active(&ep->ep_pipes, pipe)) {
+ if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) {
+ nni_mtx_lock(&ep->ep_mtx);
+ nni_list_remove(&ep->ep_pipes, pipe);
+
+ if (ep->ep_pipe == pipe) {
+ ep->ep_pipe = NULL;
+ }
+ nni_cv_wake(&ep->ep_cv);
+
nni_mtx_unlock(&ep->ep_mtx);
- return;
}
- nni_list_remove(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
}
@@ -295,7 +360,7 @@ nni_dialer(void *arg)
}
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
- break;
+ return;
}
nni_mtx_unlock(&ep->ep_mtx);
@@ -382,11 +447,11 @@ nni_ep_accept(nni_ep *ep)
}
rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
if ((rv = nni_pipe_start(pipe)) != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
return (0);