diff options
| -rw-r--r-- | src/core/pipe.c | 88 | ||||
| -rw-r--r-- | src/core/socket.c | 23 |
2 files changed, 47 insertions, 64 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index ce33222d..d5a9ed85 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -24,7 +24,18 @@ nni_pipe_ctor(uint32_t id) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NULL); } + if (nni_mtx_init(&p->p_mtx) != 0) { + NNI_FREE_STRUCT(p); + return (NULL); + } + + p->p_tran_data = NULL; + p->p_proto_data = NULL; p->p_id = id; + + NNI_LIST_NODE_INIT(&p->p_sock_node); + NNI_LIST_NODE_INIT(&p->p_ep_node); + return (p); } @@ -34,6 +45,13 @@ nni_pipe_dtor(void *ptr) { nni_pipe *p = ptr; + nni_sock_pipe_rem(p->p_sock, p); + + if (p->p_tran_data != NULL) { + p->p_tran_ops.p_fini(p->p_tran_data); + } + + nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); } @@ -86,24 +104,19 @@ nni_pipe_aio_send(nni_pipe *p, nni_aio *aio) void nni_pipe_incref(nni_pipe *p) { - nni_mtx_lock(&p->p_mtx); - p->p_refcnt++; - nni_mtx_unlock(&p->p_mtx); + int rv; + nni_pipe *scratch; + + rv = nni_objhash_find(nni_allpipes, p->p_id, (void **) &scratch); + NNI_ASSERT(rv == 0); + NNI_ASSERT(p == scratch); } void nni_pipe_decref(nni_pipe *p) { - nni_mtx_lock(&p->p_mtx); - p->p_refcnt--; - if (p->p_refcnt == 0) { - nni_mtx_unlock(&p->p_mtx); - - nni_pipe_destroy(p); - return; - } - nni_mtx_unlock(&p->p_mtx); + nni_objhash_unref(nni_allpipes, p->p_id); } @@ -128,21 +141,12 @@ nni_pipe_close(nni_pipe *p) p->p_tran_ops.p_close(p->p_tran_data); } - // Unregister our ID so nobody else can find it. - if (p->p_id != 0) { - nni_mtx_lock(nni_idlock); - nni_idhash_remove(nni_pipes, p->p_id); - nni_mtx_unlock(nni_idlock); - p->p_id = 0; - } - nni_mtx_unlock(&p->p_mtx); // Let the socket (and endpoint) know we have closed. nni_sock_pipe_closed(sock, p); - // Drop a reference count, possibly doing deferred destroy. - nni_pipe_decref(p); + nni_objhash_unref(nni_allpipes, p->p_id); } @@ -157,22 +161,14 @@ int nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) { nni_pipe *p; - void *pdata; int rv; + uint32_t id; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mtx_init(&p->p_mtx)) != 0) { - NNI_FREE_STRUCT(p); + rv = nni_objhash_alloc(nni_allpipes, &id, (void **) &p); + if (rv != 0) { return (rv); } p->p_sock = sock; - p->p_tran_data = NULL; - p->p_proto_data = NULL; - p->p_id = 0; - NNI_LIST_NODE_INIT(&p->p_sock_node); - NNI_LIST_NODE_INIT(&p->p_ep_node); // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. @@ -180,15 +176,12 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) // Initialize the transport pipe data. if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) { - nni_mtx_fini(&p->p_mtx); - NNI_FREE_STRUCT(p); + nni_objhash_unref(nni_allpipes, p->p_id); return (rv); } if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - p->p_tran_ops.p_fini(p->p_tran_data); - nni_mtx_fini(&p->p_mtx); - NNI_FREE_STRUCT(p); + nni_objhash_unref(nni_allpipes, p->p_id); return (rv); } @@ -202,14 +195,7 @@ nni_pipe_destroy(nni_pipe *p) { NNI_ASSERT(p->p_refcnt == 0); - // The caller is responsible for ensuring that the pipe - // is not in use by any other consumers. It must not be started - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } - nni_sock_pipe_rem(p->p_sock, p); - nni_mtx_fini(&p->p_mtx); - NNI_FREE_STRUCT(p); + nni_objhash_unref_wait(nni_allpipes, p->p_id); } @@ -228,17 +214,13 @@ int nni_pipe_start(nni_pipe *p) { int rv; + nni_pipe *scratch; - nni_pipe_incref(p); - - nni_mtx_lock(nni_idlock); - rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); - nni_mtx_unlock(nni_idlock); - + rv = nni_objhash_find(nni_allpipes, p->p_id, (void **) &scratch); if (rv != 0) { - nni_pipe_close(p); return (rv); } + NNI_ASSERT(p == scratch); if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { nni_pipe_close(p); diff --git a/src/core/socket.c b/src/core/socket.c index ae399d73..fe06cc73 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,6 +82,9 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) if (rv != 0) { return (rv); } + + // XXX: place a hold on the socket. + nni_mtx_lock(&sock->s_mx); nni_pipe_set_proto_data(pipe, pdata); nni_list_append(&sock->s_pipes, pipe); @@ -112,9 +115,6 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) return (rv); } - nni_list_remove(&sock->s_idles, pipe); - nni_list_append(&sock->s_pipes, pipe); - nni_mtx_unlock(&sock->s_mx); return (0); @@ -133,7 +133,9 @@ nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) // is on, and so if the pipe is already on the idle list these // two statements are effectively a no-op. nni_list_remove(&sock->s_pipes, pipe); - nni_list_append(&sock->s_idles, pipe); + if (nni_list_first(&sock->s_pipes) == NULL) { + nni_cv_wake(&sock->s_cv); + } sock->s_pipe_ops.pipe_stop(pdata); @@ -153,12 +155,13 @@ nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) void *pdata = nni_pipe_get_proto_data(pipe); nni_mtx_lock(&sock->s_mx); - nni_list_remove(&sock->s_idles, pipe); + nni_list_remove(&sock->s_pipes, pipe); if (pdata != NULL) { sock->s_pipe_ops.pipe_fini(pdata); } + // XXX: Move this to a seperate ep-specific API. // Notify the endpoint that the pipe has closed - if not already done. if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { ep->ep_pipe = NULL; @@ -166,6 +169,8 @@ nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) } nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); + + // XXX release the hold on the pipe } @@ -573,10 +578,8 @@ nni_sock_shutdown(nni_sock *sock) // For each pipe, close the underlying transport. Also move it // to the idle list so we won't keep looping. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_pipe_incref(pipe); nni_mtx_unlock(&sock->s_mx); nni_pipe_close(pipe); - nni_pipe_decref(pipe); nni_mtx_lock(&sock->s_mx); } @@ -584,10 +587,8 @@ nni_sock_shutdown(nni_sock *sock) nni_cv_wake(&sock->s_cv); - while ((nni_list_first(&sock->s_idles) != NULL) || - (nni_list_first(&sock->s_pipes) != NULL)) { - nni_cv_wait(&sock->s_cv); - } + NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); + nni_mtx_unlock(&sock->s_mx); // At this point, there are no threads blocked inside of us |
