aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-08 22:18:45 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-08 22:18:45 -0700
commit402c355eee701976a6c8ee00f6a915d1e417e163 (patch)
treea76680ce089735d2269d9799bd919c6dae070ede
parent765a39a398dfacbc029c91e1b6b1d5e74a5f5e9f (diff)
downloadnng-402c355eee701976a6c8ee00f6a915d1e417e163.tar.gz
nng-402c355eee701976a6c8ee00f6a915d1e417e163.tar.bz2
nng-402c355eee701976a6c8ee00f6a915d1e417e163.zip
Pipes are now mostly using object hash -- taskq_cancel race TBD.
-rw-r--r--src/core/pipe.c88
-rw-r--r--src/core/socket.c23
2 files changed, 47 insertions, 64 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index ce33222d..d5a9ed85 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -24,7 +24,18 @@ nni_pipe_ctor(uint32_t id)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NULL);
}
+ if (nni_mtx_init(&p->p_mtx) != 0) {
+ NNI_FREE_STRUCT(p);
+ return (NULL);
+ }
+
+ p->p_tran_data = NULL;
+ p->p_proto_data = NULL;
p->p_id = id;
+
+ NNI_LIST_NODE_INIT(&p->p_sock_node);
+ NNI_LIST_NODE_INIT(&p->p_ep_node);
+
return (p);
}
@@ -34,6 +45,13 @@ nni_pipe_dtor(void *ptr)
{
nni_pipe *p = ptr;
+ nni_sock_pipe_rem(p->p_sock, p);
+
+ if (p->p_tran_data != NULL) {
+ p->p_tran_ops.p_fini(p->p_tran_data);
+ }
+
+ nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
}
@@ -86,24 +104,19 @@ nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_incref(nni_pipe *p)
{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt++;
- nni_mtx_unlock(&p->p_mtx);
+ int rv;
+ nni_pipe *scratch;
+
+ rv = nni_objhash_find(nni_allpipes, p->p_id, (void **) &scratch);
+ NNI_ASSERT(rv == 0);
+ NNI_ASSERT(p == scratch);
}
void
nni_pipe_decref(nni_pipe *p)
{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
- nni_mtx_unlock(&p->p_mtx);
-
- nni_pipe_destroy(p);
- return;
- }
- nni_mtx_unlock(&p->p_mtx);
+ nni_objhash_unref(nni_allpipes, p->p_id);
}
@@ -128,21 +141,12 @@ nni_pipe_close(nni_pipe *p)
p->p_tran_ops.p_close(p->p_tran_data);
}
- // Unregister our ID so nobody else can find it.
- if (p->p_id != 0) {
- nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_pipes, p->p_id);
- nni_mtx_unlock(nni_idlock);
- p->p_id = 0;
- }
-
nni_mtx_unlock(&p->p_mtx);
// Let the socket (and endpoint) know we have closed.
nni_sock_pipe_closed(sock, p);
- // Drop a reference count, possibly doing deferred destroy.
- nni_pipe_decref(p);
+ nni_objhash_unref(nni_allpipes, p->p_id);
}
@@ -157,22 +161,14 @@ int
nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
{
nni_pipe *p;
- void *pdata;
int rv;
+ uint32_t id;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mtx_init(&p->p_mtx)) != 0) {
- NNI_FREE_STRUCT(p);
+ rv = nni_objhash_alloc(nni_allpipes, &id, (void **) &p);
+ if (rv != 0) {
return (rv);
}
p->p_sock = sock;
- p->p_tran_data = NULL;
- p->p_proto_data = NULL;
- p->p_id = 0;
- NNI_LIST_NODE_INIT(&p->p_sock_node);
- NNI_LIST_NODE_INIT(&p->p_ep_node);
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
@@ -180,15 +176,12 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// Initialize the transport pipe data.
if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
- nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
+ nni_objhash_unref(nni_allpipes, p->p_id);
return (rv);
}
if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
+ nni_objhash_unref(nni_allpipes, p->p_id);
return (rv);
}
@@ -202,14 +195,7 @@ nni_pipe_destroy(nni_pipe *p)
{
NNI_ASSERT(p->p_refcnt == 0);
- // The caller is responsible for ensuring that the pipe
- // is not in use by any other consumers. It must not be started
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- }
- nni_sock_pipe_rem(p->p_sock, p);
- nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
+ nni_objhash_unref_wait(nni_allpipes, p->p_id);
}
@@ -228,17 +214,13 @@ int
nni_pipe_start(nni_pipe *p)
{
int rv;
+ nni_pipe *scratch;
- nni_pipe_incref(p);
-
- nni_mtx_lock(nni_idlock);
- rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
- nni_mtx_unlock(nni_idlock);
-
+ rv = nni_objhash_find(nni_allpipes, p->p_id, (void **) &scratch);
if (rv != 0) {
- nni_pipe_close(p);
return (rv);
}
+ NNI_ASSERT(p == scratch);
if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
nni_pipe_close(p);
diff --git a/src/core/socket.c b/src/core/socket.c
index ae399d73..fe06cc73 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -82,6 +82,9 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
if (rv != 0) {
return (rv);
}
+
+ // XXX: place a hold on the socket.
+
nni_mtx_lock(&sock->s_mx);
nni_pipe_set_proto_data(pipe, pdata);
nni_list_append(&sock->s_pipes, pipe);
@@ -112,9 +115,6 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (rv);
}
- nni_list_remove(&sock->s_idles, pipe);
- nni_list_append(&sock->s_pipes, pipe);
-
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -133,7 +133,9 @@ nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
// is on, and so if the pipe is already on the idle list these
// two statements are effectively a no-op.
nni_list_remove(&sock->s_pipes, pipe);
- nni_list_append(&sock->s_idles, pipe);
+ if (nni_list_first(&sock->s_pipes) == NULL) {
+ nni_cv_wake(&sock->s_cv);
+ }
sock->s_pipe_ops.pipe_stop(pdata);
@@ -153,12 +155,13 @@ nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
void *pdata = nni_pipe_get_proto_data(pipe);
nni_mtx_lock(&sock->s_mx);
- nni_list_remove(&sock->s_idles, pipe);
+ nni_list_remove(&sock->s_pipes, pipe);
if (pdata != NULL) {
sock->s_pipe_ops.pipe_fini(pdata);
}
+ // XXX: Move this to a seperate ep-specific API.
// Notify the endpoint that the pipe has closed - if not already done.
if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
@@ -166,6 +169,8 @@ nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
}
nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
+
+ // XXX release the hold on the pipe
}
@@ -573,10 +578,8 @@ nni_sock_shutdown(nni_sock *sock)
// For each pipe, close the underlying transport. Also move it
// to the idle list so we won't keep looping.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_pipe_incref(pipe);
nni_mtx_unlock(&sock->s_mx);
nni_pipe_close(pipe);
- nni_pipe_decref(pipe);
nni_mtx_lock(&sock->s_mx);
}
@@ -584,10 +587,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_cv_wake(&sock->s_cv);
- while ((nni_list_first(&sock->s_idles) != NULL) ||
- (nni_list_first(&sock->s_pipes) != NULL)) {
- nni_cv_wait(&sock->s_cv);
- }
+ NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);
+
nni_mtx_unlock(&sock->s_mx);
// At this point, there are no threads blocked inside of us