aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c169
1 files changed, 110 insertions, 59 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 05089fee..eaf49af9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -38,9 +38,8 @@ pipe_destroy(void *arg)
nni_pipe *p = arg;
p->p_proto_ops.pipe_fini(p->p_proto_data);
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- }
+ p->p_tran_ops.p_fini(p->p_tran_data);
+
nni_free(p, p->p_size);
}
@@ -49,6 +48,11 @@ pipe_reap(void *arg)
{
nni_pipe *p = arg;
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+
+ // Close the underlying transport.
+ p->p_tran_ops.p_close(p->p_tran_data);
+
nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
@@ -62,12 +66,11 @@ pipe_reap(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&p->st_root);
#endif
- nni_pipe_remove(p);
p->p_proto_ops.pipe_stop(p->p_proto_data);
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
+ p->p_tran_ops.p_stop(p->p_tran_data);
+
+ nni_pipe_remove(p);
nni_pipe_rele(p);
}
@@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p)
return; // We already did a close.
}
- p->p_proto_ops.pipe_close(p->p_proto_data);
-
- // Close the underlying transport.
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_close(p->p_tran_data);
- }
-
nni_reap(&pipe_reap_list, p);
}
@@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&p->st_root, item);
}
+#endif
static void
pipe_stats_init(nni_pipe *p)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "pipe",
.si_desc = "pipe statistics",
@@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
+ static const nni_stat_info dialer_info = {
+ .si_name = "dialer",
+ .si_desc = "dialer for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info listener_info = {
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
+ };
nni_stat_init(&p->st_root, &root_info);
pipe_stat_init(p, &p->st_id, &id_info);
@@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p)
nni_stat_set_id(&p->st_root, (int) p->p_id);
nni_stat_set_id(&p->st_id, (int) p->p_id);
nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
-}
+
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ pipe_stat_init(p, &p->st_ep_id, &dialer_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_dialer_id(p->p_dialer));
+ }
+ if (p->p_listener) {
+ pipe_stat_init(p, &p->st_ep_id, &listener_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_listener_id(p->p_listener));
+ }
+#else
+ NNI_ARG_UNUSED(p);
#endif // NNG_ENABLE_STATS
+}
static int
-pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
+pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
+ nni_listener *l, void *tran_data)
{
- nni_pipe *p;
- int rv;
- void *sock_data = nni_sock_proto_data(sock);
- nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- size_t sz;
+ nni_pipe *p;
+ int rv1, rv2, rv3;
+ void *sock_data = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ const nni_sp_pipe_ops *tops = tran->tran_pipe;
+ size_t sz;
- sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
+ sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) +
+ NNI_ALIGN_UP(tops->p_size);
if ((p = nni_zalloc(sz)) == NULL) {
- // In this case we just toss the pipe...
- tran->tran_pipe->p_fini(tran_data);
+ // TODO: remove when all transports converted
+ // to use p_size.
+ if (tran_data != NULL) {
+ tops->p_fini(tran_data);
+ }
return (NNG_ENOMEM);
}
- p->p_size = sz;
- p->p_proto_data = p + 1;
- p->p_tran_ops = *tran->tran_pipe;
- p->p_tran_data = tran_data;
- p->p_proto_ops = *pops;
- p->p_sock = sock;
- p->p_cbs = false;
+ p->p_size = sz;
+ p->p_proto_ops = *pops;
+ p->p_tran_ops = *tops;
+ p->p_sock = sock;
+ p->p_cbs = false;
+ p->p_dialer = d;
+ p->p_listener = l;
+ // Two references - one for our caller, and
+ // one to be dropped when the pipe is closed.
nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy);
nni_atomic_init_bool(&p->p_closed);
@@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
+ uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));
+
+ if (tran_data == NULL) {
+ tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
+ }
+ nni_pipe_add(p);
+
+ p->p_tran_data = tran_data;
+ p->p_proto_data = proto_data;
+
nni_mtx_lock(&pipes_lk);
- rv = nni_id_alloc32(&pipes, &p->p_id, p);
+ rv1 = nni_id_alloc32(&pipes, &p->p_id, p);
nni_mtx_unlock(&pipes_lk);
-#ifdef NNG_ENABLE_STATS
+ // must be done before protocol or transports, because
+ // they may add further stats
pipe_stats_init(p);
-#endif
- if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) ||
- ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) {
+ rv2 = tops->p_init(tran_data, p);
+ rv3 = pops->pipe_init(proto_data, p, sock_data);
+ if (rv1 != 0 || rv2 != 0 || rv3 != 0) {
nni_pipe_close(p);
nni_pipe_rele(p);
- return (rv);
+ return (rv1 ? rv1 : rv2 ? rv2 : rv3);
}
*pp = p;
@@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) {
return (rv);
}
- p->p_dialer = d;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info dialer_info = {
- .si_name = "dialer",
- .si_desc = "dialer for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &dialer_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
-#endif
*pp = p;
return (0);
}
@@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
nni_sp_tran *tran = l->l_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) {
return (rv);
}
- p->p_listener = l;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info listener_info = {
- .si_name = "listener",
- .si_desc = "listener for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &listener_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
-#endif
*pp = p;
return (0);
}
int
+nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
+{
+ int rv;
+ nni_sp_tran *tran = d->d_tran;
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
+nni_pipe_alloc_listener(void **datap, nni_listener *l)
+{
+ int rv;
+ nni_sp_tran *tran = l->l_tran;
+ nni_sock *s = l->l_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{