diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-29 23:49:05 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-29 23:49:05 -0800 |
| commit | 0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6 (patch) | |
| tree | b2ea7f56ae0dc6d32219e695725f55d78b9e0b34 /src/core/pipe.c | |
| parent | 5d90b485fdb39cac7d1aac2ab8958ecd585ac69b (diff) | |
| download | nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.gz nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.bz2 nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.zip | |
Factor out repeated protocol code into common.
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 129 |
1 files changed, 127 insertions, 2 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 15ae393b..0a3f1d33 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -71,9 +71,19 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { + if (p->p_send_thr != NULL) { + nni_thread_reap(p->p_send_thr); + } + if (p->p_recv_thr != NULL) { + nni_thread_reap(p->p_recv_thr); + } if (p->p_trandata != NULL) { p->p_ops.p_destroy(p->p_trandata); } + nni_cond_fini(&p->p_cv); + if (p->p_pdata != NULL) { + nni_free(p->p_pdata, p->p_psize); + } nni_free(p, sizeof (*p)); } @@ -82,15 +92,28 @@ int nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; + nni_socket *sock = ep->ep_sock; + int rv; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } p->p_trandata = NULL; - p->p_protdata = NULL; + if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) { + nni_free(p, sizeof (*p)); + return (rv); + } + p->p_psize = sock->s_ops.proto_pipe_size; + if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + nni_cond_fini(&p->p_cv); + nni_free(p, sizeof (*p)); + return (NNG_ENOMEM); + } + p->p_sock = sock; p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_ep = ep; - p->p_sock = ep->ep_sock; + p->p_active = 0; + p->p_abort = 0; if (ep->ep_dialer != NULL) { ep->ep_pipe = p; } @@ -109,3 +132,105 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) } return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp)); } + + +static void +nni_pipe_sender(void *arg) +{ + nni_pipe *p = arg; + + nni_mutex_enter(&p->p_sock->s_mx); + while ((!p->p_active) && (!p->p_abort)) { + nni_cond_wait(&p->p_cv); + } + if (p->p_abort) { + nni_mutex_exit(&p->p_sock->s_mx); + return; + } + nni_mutex_exit(&p->p_sock->s_mx); + if (p->p_sock->s_ops.proto_pipe_send != NULL) { + p->p_sock->s_ops.proto_pipe_send(p->p_pdata); + } +} + + +static void +nni_pipe_receiver(void *arg) +{ + nni_pipe *p = arg; + + nni_mutex_enter(&p->p_sock->s_mx); + while ((!p->p_active) && (!p->p_abort)) { + nni_cond_wait(&p->p_cv); + } + if (p->p_abort) { + nni_mutex_exit(&p->p_sock->s_mx); + return; + } + nni_mutex_exit(&p->p_sock->s_mx); + if (p->p_sock->s_ops.proto_pipe_recv != NULL) { + p->p_sock->s_ops.proto_pipe_recv(p->p_pdata); + } +} + + +int +nni_pipe_start(nni_pipe *pipe) +{ + int rv; + int collide; + nni_socket *sock = pipe->p_sock; + + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } + + do { + // We generate a new pipe ID, but we make sure it does not + // collide with any we already have. This can only normally + // happen if we wrap -- i.e. we've had 4 billion or so pipes. + // XXX: consider making this a hash table!! + nni_pipe *check; + pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; + collide = 0; + NNI_LIST_FOREACH (&sock->s_pipes, check) { + if (check->p_id == pipe->p_id) { + collide = 1; + break; + } + } + } while (collide); + + rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe); + if (rv != 0) { + goto fail; + } + rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_receiver, pipe); + if (rv != 0) { + goto fail; + } + + rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); + if (rv != 0) { + goto fail; + } + nni_list_append(&sock->s_pipes, pipe); + pipe->p_active = 1; + + // XXX: Publish event + nni_cond_broadcast(&pipe->p_cv); + + nni_mutex_exit(&sock->s_mx); + return (0); + +fail: + pipe->p_abort = 1; + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); + nni_cond_broadcast(&sock->s_cv); + nni_cond_broadcast(&pipe->p_cv); + nni_mutex_exit(&sock->s_mx); + return (rv); +} |
