diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 169 |
1 files changed, 110 insertions, 59 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 05089fee..eaf49af9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -38,9 +38,8 @@ pipe_destroy(void *arg) nni_pipe *p = arg; p->p_proto_ops.pipe_fini(p->p_proto_data); - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } + p->p_tran_ops.p_fini(p->p_tran_data); + nni_free(p, p->p_size); } @@ -49,6 +48,11 @@ pipe_reap(void *arg) { nni_pipe *p = arg; + p->p_proto_ops.pipe_close(p->p_proto_data); + + // Close the underlying transport. + p->p_tran_ops.p_close(p->p_tran_data); + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. @@ -62,12 +66,11 @@ pipe_reap(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif - nni_pipe_remove(p); p->p_proto_ops.pipe_stop(p->p_proto_data); - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } + p->p_tran_ops.p_stop(p->p_tran_data); + + nni_pipe_remove(p); nni_pipe_rele(p); } @@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p) return; // We already did a close. } - p->p_proto_ops.pipe_close(p->p_proto_data); - - // Close the underlying transport. - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_close(p->p_tran_data); - } - nni_reap(&pipe_reap_list, p); } @@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&p->st_root, item); } +#endif static void pipe_stats_init(nni_pipe *p) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "pipe", .si_desc = "pipe statistics", @@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info dialer_info = { + .si_name = "dialer", + .si_desc = "dialer for pipe", + .si_type = NNG_STAT_ID, + }; + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; nni_stat_init(&p->st_root, &root_info); pipe_stat_init(p, &p->st_id, &id_info); @@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p) nni_stat_set_id(&p->st_root, (int) p->p_id); nni_stat_set_id(&p->st_id, (int) p->p_id); nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock)); -} + + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + pipe_stat_init(p, &p->st_ep_id, &dialer_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_dialer_id(p->p_dialer)); + } + if (p->p_listener) { + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_listener_id(p->p_listener)); + } +#else + NNI_ARG_UNUSED(p); #endif // NNG_ENABLE_STATS +} static int -pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) +pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, + nni_listener *l, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv1, rv2, rv3; + void *sock_data = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { - // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + p->p_dialer = d; + p->p_listener = l; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + nni_pipe_add(p); + + p->p_tran_data = tran_data; + p->p_proto_data = proto_data; + nni_mtx_lock(&pipes_lk); - rv = nni_id_alloc32(&pipes, &p->p_id, p); + rv1 = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); -#ifdef NNG_ENABLE_STATS + // must be done before protocol or transports, because + // they may add further stats pipe_stats_init(p); -#endif - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + rv2 = tops->p_init(tran_data, p); + rv3 = pops->pipe_init(proto_data, p, sock_data); + if (rv1 != 0 || rv2 != 0 || rv3 != 0) { nni_pipe_close(p); nni_pipe_rele(p); - return (rv); + return (rv1 ? rv1 : rv2 ? rv2 : rv3); } *pp = p; @@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) nni_sp_tran *tran = d->d_tran; nni_pipe *p; - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) { return (rv); } - p->p_dialer = d; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info dialer_info = { - .si_name = "dialer", - .si_desc = "dialer for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &dialer_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); -#endif *pp = p; return (0); } @@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) nni_sp_tran *tran = l->l_tran; nni_pipe *p; - if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif *pp = p; return (0); } int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { |
