aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c78
1 files changed, 47 insertions, 31 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index d623e158..96fb6c7d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -179,35 +179,14 @@ nni_pipe_peer(nni_pipe *p)
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
-void
-nni_pipe_stats_init(nni_pipe *p)
-{
-#ifdef NNG_ENABLE_STATS
- nni_pipe_stats *st = &p->p_stats;
-
- if (p->p_listener != NULL) {
- st->s_ep_id.si_name = "listener";
- st->s_ep_id.si_desc = "listener for pipe";
- st->s_ep_id.si_value = nni_listener_id(p->p_listener);
- } else {
- st->s_ep_id.si_name = "dialer";
- st->s_ep_id.si_desc = "dialer for pipe";
- st->s_ep_id.si_value = nni_dialer_id(p->p_dialer);
- }
-
- nni_stat_append(NULL, &st->s_root);
-#else
- NNI_ARG_UNUSED(p);
-#endif
-}
-
-int
-nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
+static int
+pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
nni_pipe * p;
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ nni_pipe_stats * st;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -215,7 +194,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
return (NNG_ENOMEM);
}
- // Make a private copy of the transport ops.
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -224,6 +202,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_closed = false;
p->p_cbs = false;
p->p_refcnt = 0;
+ st = &p->p_stats;
nni_atomic_flag_reset(&p->p_stop);
NNI_LIST_NODE_INIT(&p->p_sock_node);
@@ -238,7 +217,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
nni_mtx_unlock(&nni_pipe_lk);
- nni_pipe_stats *st = &p->p_stats;
snprintf(st->s_scope, sizeof(st->s_scope), "pipe%u", p->p_id);
nni_stat_init_scope(&st->s_root, st->s_scope, "pipe statistics");
@@ -246,15 +224,11 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_stat_init_id(&st->s_id, "id", "pipe id", p->p_id);
nni_stat_append(&st->s_root, &st->s_id);
- // name and description fleshed out later.
- nni_stat_init_id(&st->s_ep_id, "", "", 0);
- nni_stat_append(&st->s_root, &st->s_ep_id);
-
nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
nni_sock_id(p->p_sock));
nni_stat_append(&st->s_root, &st->s_sock_id);
- if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) ||
+ if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
nni_pipe_rele(p);
@@ -266,6 +240,48 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
int
+nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
+{
+ int rv;
+ nni_tran * tran = d->d_tran;
+ uint64_t id = nni_dialer_id(d);
+ nni_pipe * p;
+ nni_stat_item *st;
+
+ if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
+ return (rv);
+ }
+ st = &p->p_stats.s_ep_id;
+ p->p_dialer = d;
+ nni_stat_init_id(st, "dialer", "dialer for pipe", id);
+ nni_pipe_add_stat(p, st);
+ nni_stat_append(NULL, &p->p_stats.s_root);
+ *pp = p;
+ return (0);
+}
+
+int
+nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
+{
+ int rv;
+ nni_tran * tran = l->l_tran;
+ uint64_t id = nni_listener_id(l);
+ nni_pipe * p;
+ nni_stat_item *st;
+
+ if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
+ return (rv);
+ }
+ st = &p->p_stats.s_ep_id;
+ p->p_listener = l;
+ nni_stat_init_id(st, "listener", "listener for pipe", id);
+ nni_pipe_add_stat(p, st);
+ nni_stat_append(NULL, &p->p_stats.s_root);
+ *pp = p;
+ return (0);
+}
+
+int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{