diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 64 |
1 files changed, 53 insertions, 11 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 0670ed01..edc8c15d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -27,6 +27,7 @@ struct nni_pipe { int p_reap; int p_stop; int p_refcnt; + const char * p_url; nni_mtx p_mtx; nni_cv p_cv; nni_list_node p_reap_node; @@ -34,6 +35,7 @@ struct nni_pipe { }; static nni_idhash *nni_pipes; +static nni_mtx nni_pipe_lk; static nni_list nni_pipe_reap_list; static nni_mtx nni_pipe_reap_lk; @@ -49,6 +51,7 @@ nni_pipe_sys_init(void) int rv; NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + nni_mtx_init(&nni_pipe_lk); nni_mtx_init(&nni_pipe_reap_lk); nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); @@ -85,6 +88,7 @@ nni_pipe_sys_fini(void) nni_thr_fini(&nni_pipe_reap_thr); nni_cv_fini(&nni_pipe_reap_cv); nni_mtx_fini(&nni_pipe_reap_lk); + nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); nni_pipes = NULL; @@ -103,11 +107,14 @@ nni_pipe_destroy(nni_pipe *p) // Make sure any unlocked holders are done with this. // This happens during initialization for example. - nni_mtx_lock(&p->p_mtx); + nni_mtx_lock(&nni_pipe_lk); + if (p->p_id != 0) { + nni_idhash_remove(nni_pipes, p->p_id); + } while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } - nni_mtx_unlock(&p->p_mtx); + nni_mtx_unlock(&nni_pipe_lk); // We have exclusive access at this point, so we can check if // we are still on any lists. @@ -124,13 +131,35 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } - if (p->p_id != 0) { - nni_idhash_remove(nni_pipes, p->p_id); - } nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); } +int +nni_pipe_find(nni_pipe **pp, uint32_t id) +{ + int rv; + nni_pipe *p; + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) { + p->p_refcnt++; + *pp = p; + } + nni_mtx_unlock(&nni_pipe_lk); + return (rv); +} + +void +nni_pipe_rele(nni_pipe *p) +{ + nni_mtx_lock(&nni_pipe_lk); + p->p_refcnt--; + if (p->p_refcnt == 0) { + nni_cv_wake(&p->p_cv); + } + nni_mtx_unlock(&nni_pipe_lk); +} + // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. uint32_t nni_pipe_id(nni_pipe *p) @@ -238,16 +267,21 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; + p->p_url = nni_ep_url(ep); NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); nni_mtx_init(&p->p_mtx); - nni_cv_init(&p->p_cv, &p->p_mtx); + nni_cv_init(&p->p_cv, &nni_pipe_lk); nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p); - if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + nni_mtx_lock(&nni_pipe_lk); + rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); + nni_mtx_unlock(&nni_pipe_lk); + + if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); @@ -259,11 +293,19 @@ nni_pipe_create(nni_ep *ep, void *tdata) int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { - /* This should only be called with the mutex held... */ - if (p->p_tran_ops.p_getopt == NULL) { - return (NNG_ENOTSUP); + int rv = NNG_ENOTSUP; + + if (opt == nng_optid_url) { + return (nni_getopt_str(p->p_url, val, szp)); } - return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp)); + if (p->p_tran_ops.p_getopt != NULL) { + rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp); + } + if (rv == NNG_ENOTSUP) { + // Maybe its a generic socket option? + rv = nni_sock_getopt(p->p_sock, opt, val, szp); + } + return (rv); } void |
