aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/dialer.c159
-rw-r--r--src/core/listener.c120
-rw-r--r--src/core/pipe.c169
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/socket.c144
-rw-r--r--src/core/sockimpl.h2
6 files changed, 398 insertions, 199 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 2945fded..388d9981 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -19,6 +19,7 @@
// Functionality related to dialing.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
+static void dialer_connect_cb_old(void *);
static void dialer_timer_cb(void *);
static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d)
void
nni_dialer_destroy(nni_dialer *d)
{
- nni_aio_stop(&d->d_con_aio);
- nni_aio_stop(&d->d_tmo_aio);
-
nni_aio_fini(&d->d_con_aio);
nni_aio_fini(&d->d_tmo_aio);
@@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d)
}
nni_mtx_fini(&d->d_mtx);
nni_url_fini(&d->d_url);
- NNI_FREE_STRUCT(d);
+ nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size);
}
#ifdef NNG_ENABLE_STATS
@@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&d->st_root, item);
}
+#endif // NNG_ENABLE_STATS
static void
dialer_stats_init(nni_dialer *d)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "dialer",
.si_desc = "dialer statistics",
@@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d)
dialer_stat_init(d, &d->st_auth, &auth_info);
dialer_stat_init(d, &d->st_oom, &oom_info);
dialer_stat_init(d, &d->st_reject, &reject_info);
+#endif // NNG_ENABLE_STATS
+}
+static void
+dialer_register_stats(nni_dialer *d)
+{
+#ifdef NNG_ENABLE_STATS
nni_stat_set_id(&d->st_root, (int) d->d_id);
nni_stat_set_id(&d->st_id, (int) d->d_id);
nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock));
nni_stat_register(&d->st_root);
+#else
+ NNI_ARG_UNUSED(d);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_dialer_bump_error(nni_dialer *d, int err)
@@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err)
static int
nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *dp;
d->d_closed = false;
d->d_data = NULL;
@@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
nni_mtx_init(&d->d_mtx);
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
+ if (d->d_ops.d_size != 0) {
+ d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d));
+ dp = d->d_data;
+ } else {
+ // legacy: remove me when transports converted
+ dp = &d->d_data;
+ }
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
+ } else {
+ // legacy: remove me
+ nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d);
+ }
+ nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
-#endif
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ rv = d->d_ops.d_init(dp, &d->d_url, d);
+
+ if (rv == 0) {
+ rv = nni_sock_add_dialer(s, d);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
+ rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ dialer_register_stats(d);
+ }
+
+ return (rv);
}
int
@@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
@@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_dialer *d;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_dialer == NULL)) {
return (NNG_ENOTSUP);
}
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size;
+ if ((d = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) {
- NNI_FREE_STRUCT(d);
+ nni_free(d, sz);
return (rv);
}
- d->d_closed = false;
- d->d_data = NULL;
- d->d_ref = 1;
- d->d_sock = s;
- d->d_tran = tran;
- nni_atomic_flag_reset(&d->d_started);
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- d->d_ops = *tran->tran_dialer;
-
- NNI_LIST_NODE_INIT(&d->d_node);
- NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
-
- nni_mtx_init(&d->d_mtx);
-
- nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
- nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);
-
- nni_mtx_lock(&dialers_lk);
- rv = nni_id_alloc32(&dialers, &d->d_id, d);
- nni_mtx_unlock(&dialers_lk);
-
-#ifdef NNG_ENABLE_STATS
- dialer_stats_init(d);
-#endif
-
- if ((rv != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
- ((rv = nni_sock_add_dialer(s, d)) != 0)) {
- nni_mtx_lock(&dialers_lk);
- nni_id_remove(&dialers, d->d_id);
- nni_mtx_unlock(&dialers_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&d->st_root);
-#endif
+ if ((rv = nni_dialer_init(d, s, tran)) != 0) {
nni_dialer_destroy(d);
return (rv);
}
-
*dp = d;
return (0);
}
@@ -438,6 +425,50 @@ dialer_connect_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&d->st_connect, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ break;
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
+ nni_dialer_bump_error(d, rv);
+ break;
+ case NNG_ECONNREFUSED:
+ case NNG_ETIMEDOUT:
+ default:
+ nng_log_warn("NNG-CONN-FAIL",
+ "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock),
+ nng_strerror(rv));
+
+ nni_dialer_bump_error(d, rv);
+ if (user_aio == NULL) {
+ nni_dialer_timer_start(d);
+ } else {
+ nni_atomic_flag_reset(&d->d_started);
+ }
+ break;
+ }
+ if (user_aio != NULL) {
+ nni_aio_finish(user_aio, rv, 0);
+ }
+}
+
+static void
+dialer_connect_cb_old(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio *aio = &d->d_con_aio;
+ nni_aio *user_aio;
+ int rv;
+
+ nni_mtx_lock(&d->d_mtx);
+ user_aio = d->d_user_aio;
+ d->d_user_aio = NULL;
+ nni_mtx_unlock(&d->d_mtx);
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_connect, 1);
+#endif
nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
diff --git a/src/core/listener.c b/src/core/listener.c
index 9107e3a4..3dc2a0fa 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -22,6 +22,7 @@
static void listener_accept_start(nni_listener *);
static void listener_accept_cb(void *);
+static void listener_accept_cb_old(void *);
static void listener_timer_cb(void *);
static nni_id_map listeners = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
@@ -44,7 +45,7 @@ nni_listener_destroy(nni_listener *l)
l->l_ops.l_fini(l->l_data);
}
nni_url_fini(&l->l_url);
- NNI_FREE_STRUCT(l);
+ nni_free(l, NNI_ALIGN_UP(sizeof(*l)) + l->l_ops.l_size);
}
#ifdef NNG_ENABLE_STATS
@@ -55,10 +56,12 @@ listener_stat_init(
nni_stat_init(item, info);
nni_stat_add(&l->st_root, item);
}
+#endif
static void
listener_stats_init(nni_listener *l)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "listener",
.si_desc = "listener statistics",
@@ -153,9 +156,23 @@ listener_stats_init(nni_listener *l)
nni_stat_set_id(&l->st_root, (int) l->l_id);
nni_stat_set_id(&l->st_id, (int) l->l_id);
nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock));
+#else
+ NNI_ARG_UNUSED(l);
+#endif // NNG_ENABLE_STATS
+}
+
+static void
+listener_register_stats(nni_listener *l)
+{
+#ifdef NNG_ENABLE_STATS
+ nni_stat_set_id(&l->st_root, (int) l->l_id);
+ nni_stat_set_id(&l->st_id, (int) l->l_id);
+ nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock));
nni_stat_register(&l->st_root);
+#else
+ NNI_ARG_UNUSED(l);
+#endif
}
-#endif // NNG_ENABLE_STATS
void
nni_listener_bump_error(nni_listener *l, int err)
@@ -195,7 +212,8 @@ nni_listener_bump_error(nni_listener *l, int err)
static int
nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran)
{
- int rv;
+ int rv;
+ void *lp;
l->l_closed = false;
l->l_data = NULL;
@@ -212,30 +230,40 @@ nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran)
NNI_LIST_NODE_INIT(&l->l_node);
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
- nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
+ if (tran->tran_pipe->p_size) {
+ nni_aio_init(&l->l_acc_aio, listener_accept_cb, l);
+ } else {
+ nni_aio_init(&l->l_acc_aio, listener_accept_cb_old, l);
+ }
nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l);
- nni_mtx_lock(&listeners_lk);
- rv = nni_id_alloc32(&listeners, &l->l_id, l);
- nni_mtx_unlock(&listeners_lk);
-
-#ifdef NNG_ENABLE_STATS
listener_stats_init(l);
-#endif
- if ((rv != 0) ||
- ((rv = l->l_ops.l_init(&l->l_data, &l->l_url, l)) != 0) ||
- ((rv = nni_sock_add_listener(s, l)) != 0)) {
+ if (l->l_ops.l_size != 0) {
+ l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l));
+ lp = l->l_data;
+ } else {
+ // legacy: remove me when transports converted
+ lp = &l->l_data;
+ }
+
+ rv = l->l_ops.l_init(lp, &l->l_url, l);
+
+ if (rv == 0) {
+ rv = nni_sock_add_listener(s, l);
+ }
+
+ if (rv == 0) {
nni_mtx_lock(&listeners_lk);
- nni_id_remove(&listeners, l->l_id);
+ rv = nni_id_alloc32(&listeners, &l->l_id, l);
nni_mtx_unlock(&listeners_lk);
-#ifdef NNG_ENABLE_STATS
- nni_stat_unregister(&l->st_root);
-#endif
- return (rv);
}
- return (0);
+ if (rv == 0) {
+ listener_register_stats(l);
+ }
+
+ return (rv);
}
int
@@ -244,16 +272,18 @@ nni_listener_create_url(nni_listener **lp, nni_sock *s, const nng_url *url)
nni_sp_tran *tran;
nni_listener *l;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) ||
(tran->tran_listener == NULL)) {
return (NNG_ENOTSUP);
}
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size;
+ if ((l = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_url_clone_inline(&l->l_url, url)) != 0) {
- NNI_FREE_STRUCT(l);
+ nni_free(l, sz);
return (rv);
}
if ((rv = nni_listener_init(l, s, tran)) != 0) {
@@ -276,16 +306,19 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
nni_sp_tran *tran;
nni_listener *l;
int rv;
+ size_t sz;
if (((tran = nni_sp_tran_find(url_str)) == NULL) ||
(tran->tran_listener == NULL)) {
return (NNG_ENOTSUP);
}
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size;
+ if ((l = nni_zalloc(sz)) == NULL) {
return (NNG_ENOMEM);
}
+
if ((rv = nni_url_parse_inline(&l->l_url, url_str)) != 0) {
- NNI_FREE_STRUCT(l);
+ nni_free(l, sz);
return (rv);
}
if ((rv = nni_listener_init(l, s, tran)) != 0) {
@@ -381,6 +414,47 @@ listener_accept_cb(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_accept, 1);
#endif
+ nni_pipe_start(nni_aio_get_output(aio, 0));
+ listener_accept_start(l);
+ break;
+ case NNG_ECONNABORTED: // remote condition, no cool down
+ case NNG_ECONNRESET: // remote condition, no cool down
+ case NNG_ETIMEDOUT: // No need to sleep, we timed out already.
+ case NNG_EPEERAUTH: // peer validation failure
+ nng_log_warn("NNG-ACCEPT-FAIL",
+ "Failed accepting for socket<%u>: %s",
+ nni_sock_id(l->l_sock), nng_strerror(rv));
+ nni_listener_bump_error(l, rv);
+ listener_accept_start(l);
+ break;
+ case NNG_ECLOSED: // no further action
+ case NNG_ECANCELED: // no further action
+ nni_listener_bump_error(l, rv);
+ break;
+ default:
+ // We don't really know why we failed, but we back off
+ // here. This is because errors here are probably due
+ // to system failures (resource exhaustion) and we hope
+ // by not thrashing we give the system a chance to
+ // recover. 100 ms is enough to cool down.
+ nni_listener_bump_error(l, rv);
+ nni_sleep_aio(100, &l->l_tmo_aio);
+ break;
+ }
+}
+
+static void
+listener_accept_cb_old(void *arg)
+{
+ nni_listener *l = arg;
+ nni_aio *aio = &l->l_acc_aio;
+ int rv;
+
+ switch ((rv = nni_aio_result(aio))) {
+ case 0:
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&l->st_accept, 1);
+#endif
nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 05089fee..eaf49af9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -38,9 +38,8 @@ pipe_destroy(void *arg)
nni_pipe *p = arg;
p->p_proto_ops.pipe_fini(p->p_proto_data);
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_fini(p->p_tran_data);
- }
+ p->p_tran_ops.p_fini(p->p_tran_data);
+
nni_free(p, p->p_size);
}
@@ -49,6 +48,11 @@ pipe_reap(void *arg)
{
nni_pipe *p = arg;
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+
+ // Close the underlying transport.
+ p->p_tran_ops.p_close(p->p_tran_data);
+
nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
@@ -62,12 +66,11 @@ pipe_reap(void *arg)
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&p->st_root);
#endif
- nni_pipe_remove(p);
p->p_proto_ops.pipe_stop(p->p_proto_data);
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
+ p->p_tran_ops.p_stop(p->p_tran_data);
+
+ nni_pipe_remove(p);
nni_pipe_rele(p);
}
@@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p)
return; // We already did a close.
}
- p->p_proto_ops.pipe_close(p->p_proto_data);
-
- // Close the underlying transport.
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.p_close(p->p_tran_data);
- }
-
nni_reap(&pipe_reap_list, p);
}
@@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
nni_stat_init(item, info);
nni_stat_add(&p->st_root, item);
}
+#endif
static void
pipe_stats_init(nni_pipe *p)
{
+#ifdef NNG_ENABLE_STATS
static const nni_stat_info root_info = {
.si_name = "pipe",
.si_desc = "pipe statistics",
@@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p)
.si_unit = NNG_UNIT_BYTES,
.si_atomic = true,
};
+ static const nni_stat_info dialer_info = {
+ .si_name = "dialer",
+ .si_desc = "dialer for pipe",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info listener_info = {
+ .si_name = "listener",
+ .si_desc = "listener for pipe",
+ .si_type = NNG_STAT_ID,
+ };
nni_stat_init(&p->st_root, &root_info);
pipe_stat_init(p, &p->st_id, &id_info);
@@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p)
nni_stat_set_id(&p->st_root, (int) p->p_id);
nni_stat_set_id(&p->st_id, (int) p->p_id);
nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
-}
+
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ pipe_stat_init(p, &p->st_ep_id, &dialer_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_dialer_id(p->p_dialer));
+ }
+ if (p->p_listener) {
+ pipe_stat_init(p, &p->st_ep_id, &listener_info);
+ nni_stat_set_id(
+ &p->st_ep_id, (int) nni_listener_id(p->p_listener));
+ }
+#else
+ NNI_ARG_UNUSED(p);
#endif // NNG_ENABLE_STATS
+}
static int
-pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
+pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
+ nni_listener *l, void *tran_data)
{
- nni_pipe *p;
- int rv;
- void *sock_data = nni_sock_proto_data(sock);
- nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- size_t sz;
+ nni_pipe *p;
+ int rv1, rv2, rv3;
+ void *sock_data = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ const nni_sp_pipe_ops *tops = tran->tran_pipe;
+ size_t sz;
- sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
+ sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) +
+ NNI_ALIGN_UP(tops->p_size);
if ((p = nni_zalloc(sz)) == NULL) {
- // In this case we just toss the pipe...
- tran->tran_pipe->p_fini(tran_data);
+ // TODO: remove when all transports converted
+ // to use p_size.
+ if (tran_data != NULL) {
+ tops->p_fini(tran_data);
+ }
return (NNG_ENOMEM);
}
- p->p_size = sz;
- p->p_proto_data = p + 1;
- p->p_tran_ops = *tran->tran_pipe;
- p->p_tran_data = tran_data;
- p->p_proto_ops = *pops;
- p->p_sock = sock;
- p->p_cbs = false;
+ p->p_size = sz;
+ p->p_proto_ops = *pops;
+ p->p_tran_ops = *tops;
+ p->p_sock = sock;
+ p->p_cbs = false;
+ p->p_dialer = d;
+ p->p_listener = l;
+ // Two references - one for our caller, and
+ // one to be dropped when the pipe is closed.
nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy);
nni_atomic_init_bool(&p->p_closed);
@@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
+ uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));
+
+ if (tran_data == NULL) {
+ tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
+ }
+ nni_pipe_add(p);
+
+ p->p_tran_data = tran_data;
+ p->p_proto_data = proto_data;
+
nni_mtx_lock(&pipes_lk);
- rv = nni_id_alloc32(&pipes, &p->p_id, p);
+ rv1 = nni_id_alloc32(&pipes, &p->p_id, p);
nni_mtx_unlock(&pipes_lk);
-#ifdef NNG_ENABLE_STATS
+ // must be done before protocol or transports, because
+ // they may add further stats
pipe_stats_init(p);
-#endif
- if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) ||
- ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) {
+ rv2 = tops->p_init(tran_data, p);
+ rv3 = pops->pipe_init(proto_data, p, sock_data);
+ if (rv1 != 0 || rv2 != 0 || rv3 != 0) {
nni_pipe_close(p);
nni_pipe_rele(p);
- return (rv);
+ return (rv1 ? rv1 : rv2 ? rv2 : rv3);
}
*pp = p;
@@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) {
return (rv);
}
- p->p_dialer = d;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info dialer_info = {
- .si_name = "dialer",
- .si_desc = "dialer for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &dialer_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
-#endif
*pp = p;
return (0);
}
@@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
nni_sp_tran *tran = l->l_tran;
nni_pipe *p;
- if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) {
+ if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) {
return (rv);
}
- p->p_listener = l;
-#ifdef NNG_ENABLE_STATS
- static const nni_stat_info listener_info = {
- .si_name = "listener",
- .si_desc = "listener for pipe",
- .si_type = NNG_STAT_ID,
- };
- pipe_stat_init(p, &p->st_ep_id, &listener_info);
- nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
-#endif
*pp = p;
return (0);
}
int
+nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
+{
+ int rv;
+ nni_sp_tran *tran = d->d_tran;
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
+nni_pipe_alloc_listener(void **datap, nni_listener *l)
+{
+ int rv;
+ nni_sp_tran *tran = l->l_tran;
+ nni_sock *s = l->l_sock;
+ nni_pipe *p;
+
+ if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) {
+ return (rv);
+ }
+ *datap = p->p_tran_data;
+ return (0);
+}
+
+int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 9e7109de..71e07a9f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -63,4 +63,7 @@ extern void nni_pipe_bump_error(nni_pipe *, int);
extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]);
+extern int nni_pipe_alloc_dialer(void **, nni_dialer *);
+extern int nni_pipe_alloc_listener(void **, nni_listener *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index fe1b62d2..ec2691a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock)
nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
+
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// Close the upper queues immediately.
@@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&sock_lk);
nni_mtx_lock(&sock->s_mx);
-
- // At this point, we've done everything we politely can to
- // give the protocol a chance to flush its write side. Now
- // it is time to be a little more insistent.
-
- // For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// We have to wait for pipes to be removed.
while (!nni_list_empty(&sock->s_pipes)) {
nni_cv_wait(&sock->s_cv);
}
-
- sock->s_sock_ops.sock_close(sock->s_data);
-
- nni_cv_wake(&sock->s_cv);
-
NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);
-
nni_mtx_unlock(&sock->s_mx);
+ sock->s_sock_ops.sock_close(sock->s_data);
+
// At this point, there are no threads blocked inside of us
// that are referencing socket state. User code should call
// nng_close to release the last resources.
@@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-void
-nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+static void
+dialer_start_pipe(nni_dialer *d, nni_pipe *p)
{
nni_sock *s = d->d_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
-
- if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
-
- nni_list_append(&d->d_pipes, p);
- nni_list_append(&s->s_pipes, p);
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&s->st_pipes, 1);
nni_stat_inc(&d->st_pipes, 1);
@@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
}
void
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+{
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ dialer_start_pipe(d, p);
+}
+
+void
nni_dialer_shutdown(nni_dialer *d)
{
nni_sock *s = d->d_sock;
@@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d)
nni_reap(&dialer_reap_list, d);
}
-void
-nni_listener_add_pipe(nni_listener *l, void *tpipe)
+static void
+listener_start_pipe(nni_listener *l, nni_pipe *p)
{
nni_sock *s = l->l_sock;
- nni_pipe *p;
-
- nni_mtx_lock(&s->s_mx);
- if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
- nni_list_append(&l->l_pipes, p);
- nni_list_append(&s->s_pipes, p);
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_pipes, 1);
nni_stat_inc(&s->st_pipes, 1);
@@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
"Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p),
nni_sock_id(s), nni_pipe_peer_addr(p, addr));
}
+
+ // the socket now "owns" the pipe, and a pipe close should immediately
+ // start the process of teardown.
nni_pipe_rele(p);
}
void
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
+{
+ nni_pipe *p;
+
+ if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
+ return;
+ }
+
+ listener_start_pipe(l, p);
+}
+
+void
nni_listener_shutdown(nni_listener *l)
{
nni_sock *s = l->l_sock;
@@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l)
nni_reap(&listener_reap_list, l);
}
+// nni_pipe_add just registers the pipe with the socket and endpoint
+// so they won't be deallocated while the pipe still exists.
+void
+nni_pipe_add(nni_pipe *p)
+{
+ nni_sock *s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+ nni_listener *l = p->p_listener;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_append(&s->s_pipes, p);
+ if (d != NULL) {
+ NNI_ASSERT(l == NULL);
+ nni_list_append(&d->d_pipes, p);
+ }
+ if (l != NULL) {
+ NNI_ASSERT(d == NULL);
+ nni_list_append(&l->l_pipes, p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}
+
+// nni_pipe_start attempts to start the pipe, adding it to the socket and
+// endpoints and calling callbacks, etc. The pipe should already have finished
+// any negotiation needed at the transport layer.
+void
+nni_pipe_start(nni_pipe *p)
+{
+ if (p->p_listener) {
+ NNI_ASSERT(p->p_dialer == NULL);
+ listener_start_pipe(p->p_listener, p);
+ }
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ dialer_start_pipe(p->p_dialer, p);
+ }
+}
+
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
@@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
void *arg;
nni_mtx_lock(&s->s_pipe_cbs_mtx);
- if (!p->p_cbs) {
- if (ev == NNG_PIPE_EV_ADD_PRE) {
- // First event, after this we want all other events.
- p->p_cbs = true;
- } else {
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- return;
- }
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ p->p_cbs = true;
+ } else if (!p->p_cbs) {
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+ return;
}
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
@@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p)
nni_mtx_lock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_stat_dec(&s->st_pipes, 1);
- }
+ nni_stat_dec(&s->st_pipes, 1);
if (p->p_listener != NULL) {
nni_stat_dec(&p->p_listener->st_pipes, 1);
}
@@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p)
#endif
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
- p->p_listener = NULL;
- p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 801ef7b1..aeaa369b 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -132,12 +132,14 @@ extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_stop(nni_dialer *);
+extern void nni_listener_start_pipe(nni_listener *, nni_pipe *);
extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
extern void nni_listener_stop(nni_listener *);
+extern void nni_pipe_add(nni_pipe *);
extern void nni_pipe_remove(nni_pipe *);
extern bool nni_pipe_is_closed(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);