diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 113 |
1 files changed, 34 insertions, 79 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index b12a7f1c..8fa41934 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -44,16 +44,15 @@ nni_pipe_close(nni_pipe *p) { nni_socket *sock = p->p_sock; - p->p_ops.p_close(p->p_trandata); + if (p->p_trandata != NULL) { + p->p_ops.p_close(p->p_trandata); + } nni_mutex_enter(&sock->s_mx); if (!p->p_reap) { // schedule deferred reap/close p->p_reap = 1; - if (p->p_active) { - nni_list_remove(&sock->s_pipes, p); - p->p_active = 0; - } + nni_list_remove(&sock->s_pipes, p); nni_list_append(&sock->s_reaps, p); nni_cond_broadcast(&sock->s_cv); } @@ -71,16 +70,12 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { - if (p->p_send_thr != NULL) { - nni_thread_reap(p->p_send_thr); - } - if (p->p_recv_thr != NULL) { - nni_thread_reap(p->p_recv_thr); - } + nni_thr_fini(&p->p_send_thr); + nni_thr_fini(&p->p_recv_thr); + if (p->p_trandata != NULL) { p->p_ops.p_destroy(p->p_trandata); } - nni_cond_fini(&p->p_cv); if (p->p_pdata != NULL) { nni_free(p->p_pdata, p->p_psize); } @@ -93,29 +88,41 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; nni_socket *sock = ep->ep_sock; + nni_protocol *proto = &sock->s_ops; int rv; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } - p->p_send_thr = NULL; - p->p_recv_thr = NULL; + p->p_sock = sock; + p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_trandata = NULL; p->p_active = 0; - p->p_abort = 0; - if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) { + p->p_psize = proto->proto_pipe_size; + NNI_LIST_NODE_INIT(&p->p_node); + + if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + nni_free(p, sizeof (*p)); + return (NNG_ENOMEM); + } + rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata); + if (rv != 0) { + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); return (rv); } - p->p_psize = sock->s_ops.proto_pipe_size; - if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { - nni_cond_fini(&p->p_cv); + rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata); + if (rv != 0) { + nni_thr_fini(&p->p_recv_thr); + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); - return (NNG_ENOMEM); + return (rv); } - p->p_sock = sock; - p->p_ops = *ep->ep_ops.ep_pipe_ops; - NNI_LIST_NODE_INIT(&p->p_node); + p->p_psize = sock->s_ops.proto_pipe_size; + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_pipes, p); + nni_mutex_exit(&sock->s_mx); + *pp = p; return (0); } @@ -132,46 +139,6 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) } -static void -nni_pipe_sender(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_send != NULL) { - p->p_sock->s_ops.proto_pipe_send(p->p_pdata); - } -} - - -static void -nni_pipe_receiver(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_recv != NULL) { - p->p_sock->s_ops.proto_pipe_recv(p->p_pdata); - } -} - - int nni_pipe_start(nni_pipe *pipe) { @@ -194,41 +161,29 @@ nni_pipe_start(nni_pipe *pipe) pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; collide = 0; NNI_LIST_FOREACH (&sock->s_pipes, check) { - if (check->p_id == pipe->p_id) { + if ((pipe != check) && (check->p_id == pipe->p_id)) { collide = 1; break; } } } while (collide); - rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe); - if (rv != 0) { - goto fail; - } - rv = nni_thread_create(&pipe->p_recv_thr, nni_pipe_receiver, pipe); - if (rv != 0) { - goto fail; - } - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); if (rv != 0) { goto fail; } - nni_list_append(&sock->s_pipes, pipe); + nni_thr_run(&pipe->p_send_thr); + nni_thr_run(&pipe->p_recv_thr); pipe->p_active = 1; // XXX: Publish event - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); return (0); fail: - pipe->p_abort = 1; pipe->p_reap = 1; - nni_list_append(&sock->s_reaps, pipe); - nni_cond_broadcast(&sock->s_cv); - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); + nni_pipe_close(pipe); return (rv); } |
