diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-24 14:11:35 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-24 14:11:35 -0700 |
| commit | 0a51aa7bfc88d55b98fdde0d497b072e6911457d (patch) | |
| tree | 722ef4713bf27a9aac9dce0a1fe9fa0edfe34a2d /src/core/endpt.c | |
| parent | d753c00d43e6dc642b2445e4821537a92b8b8d23 (diff) | |
| download | nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.gz nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.bz2 nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.zip | |
Protocols keep their own reference counts.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 129 |
1 files changed, 97 insertions, 32 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index efd3eefb..00690cd4 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -19,13 +19,54 @@ static nni_objhash *nni_eps = NULL; static void *nni_ep_ctor(uint32_t); static void nni_ep_dtor(void *); +// Because we can't reap threads from themselves, we need to have a separate +// task to reap endpoints. We use one global task to do this, and we just +// add to the reap list as needed. +static nni_taskq_ent nni_ep_reap_tqe; +static nni_mtx nni_ep_reap_mx; +static nni_list nni_ep_reap_list; + +static void +nni_ep_reaper(void *arg) +{ + nni_ep *ep; + + NNI_ARG_UNUSED(arg); + + nni_mtx_lock(&nni_ep_reap_mx); + while ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) { + nni_list_remove(&nni_ep_reap_list, ep); + + nni_mtx_unlock(&nni_ep_reap_mx); + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); + nni_mtx_lock(&nni_ep_reap_mx); + + continue; + } + + nni_mtx_unlock(&nni_ep_reap_mx); +} + + int nni_ep_sys_init(void) { int rv; rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor); + if (rv != 0) { + return (rv); + } + rv = nni_mtx_init(&nni_ep_reap_mx); + if (rv != 0) { + nni_objhash_fini(nni_eps); + nni_eps = NULL; + return (rv); + } + nni_ep_list_init(&nni_ep_reap_list); + nni_taskq_ent_init(&nni_ep_reap_tqe, nni_ep_reaper, NULL); return (rv); } @@ -33,6 +74,8 @@ nni_ep_sys_init(void) void nni_ep_sys_fini(void) { + nni_taskq_cancel(NULL, &nni_ep_reap_tqe); + nni_mtx_fini(&nni_ep_reap_mx); nni_objhash_fini(nni_eps); nni_eps = NULL; } @@ -129,13 +172,6 @@ nni_ep_dtor(void *ptr) { nni_ep *ep = ptr; - // If a thread is running, make sure it is stopped. - nni_thr_fini(&ep->ep_thr); - - if (ep->ep_sock != NULL) { - // This is idempotent; harmless if not already on the list. - nni_sock_rem_ep(ep->ep_sock, ep); - } if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } @@ -181,7 +217,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) return (rv); } - if ((rv = nni_sock_add_ep(sock, ep)) != 0) { + if ((rv = nni_sock_ep_add(sock, ep)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } @@ -192,22 +228,50 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) void +nni_ep_stop(nni_ep *ep) +{ + nni_mtx_lock(&ep->ep_mtx); + if (ep->ep_closed == 0) { + ep->ep_closed = 1; + ep->ep_ops.ep_close(ep->ep_data); + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); +} + + +void nni_ep_close(nni_ep *ep) { nni_pipe *pipe; + + nni_ep_stop(ep); + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&ep->ep_mtx); +} + + +void +nni_ep_remove(nni_ep *ep) +{ + nni_pipe *pipe; nni_sock *sock = ep->ep_sock; + nni_ep_close(ep); + nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); - if ((pipe = ep->ep_pipe) != NULL) { - pipe->p_ep = NULL; - ep->ep_pipe = NULL; - } - nni_cv_wake(&ep->ep_cv); + while (nni_list_first(&ep->ep_pipes) != NULL) { + nni_cv_wait(&ep->ep_cv); } nni_mtx_unlock(&ep->ep_mtx); + + nni_sock_ep_remove(sock, ep); + + nni_thr_fini(&ep->ep_thr); + nni_objhash_unref(nni_eps, ep->ep_id); } @@ -222,29 +286,27 @@ nni_ep_connect(nni_ep *ep) } rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } ep->ep_pipe = pipe; - pipe->p_ep = ep; return (0); } int -nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe) { - nni_ep_hold(ep); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); return (NNG_ECLOSED); } + nni_list_append(&ep->ep_pipes, pipe); nni_mtx_unlock(&ep->ep_mtx); @@ -253,16 +315,19 @@ nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe) void -nni_ep_rem_pipe(nni_ep *ep, nni_pipe *pipe) +nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) { - nni_mtx_lock(&ep->ep_mtx); - if (!nni_list_active(&ep->ep_pipes, pipe)) { + if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) { + nni_mtx_lock(&ep->ep_mtx); + nni_list_remove(&ep->ep_pipes, pipe); + + if (ep->ep_pipe == pipe) { + ep->ep_pipe = NULL; + } + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); - return; } - nni_list_remove(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); } @@ -295,7 +360,7 @@ nni_dialer(void *arg) } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); - break; + return; } nni_mtx_unlock(&ep->ep_mtx); @@ -382,11 +447,11 @@ nni_ep_accept(nni_ep *ep) } rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_remove(pipe); return (rv); } if ((rv = nni_pipe_start(pipe)) != 0) { - nni_pipe_close(pipe); + nni_pipe_remove(pipe); return (rv); } return (0); |
