diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/dialer.c | 159 | ||||
| -rw-r--r-- | src/core/listener.c | 120 | ||||
| -rw-r--r-- | src/core/pipe.c | 169 | ||||
| -rw-r--r-- | src/core/pipe.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 144 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 2 | ||||
| -rw-r--r-- | src/sp/transport.h | 22 |
7 files changed, 417 insertions, 202 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 2945fded..388d9981 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -19,6 +19,7 @@ // Functionality related to dialing. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); +static void dialer_connect_cb_old(void *); static void dialer_timer_cb(void *); static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -33,9 +34,6 @@ nni_dialer_id(nni_dialer *d) void nni_dialer_destroy(nni_dialer *d) { - nni_aio_stop(&d->d_con_aio); - nni_aio_stop(&d->d_tmo_aio); - nni_aio_fini(&d->d_con_aio); nni_aio_fini(&d->d_tmo_aio); @@ -44,7 +42,7 @@ nni_dialer_destroy(nni_dialer *d) } nni_mtx_fini(&d->d_mtx); nni_url_fini(&d->d_url); - NNI_FREE_STRUCT(d); + nni_free(d, NNI_ALIGN_UP(sizeof(*d)) + d->d_ops.d_size); } #ifdef NNG_ENABLE_STATS @@ -54,10 +52,12 @@ dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&d->st_root, item); } +#endif // NNG_ENABLE_STATS static void dialer_stats_init(nni_dialer *d) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "dialer", .si_desc = "dialer statistics", @@ -155,13 +155,21 @@ dialer_stats_init(nni_dialer *d) dialer_stat_init(d, &d->st_auth, &auth_info); dialer_stat_init(d, &d->st_oom, &oom_info); dialer_stat_init(d, &d->st_reject, &reject_info); +#endif // NNG_ENABLE_STATS +} +static void +dialer_register_stats(nni_dialer *d) +{ +#ifdef NNG_ENABLE_STATS nni_stat_set_id(&d->st_root, (int) d->d_id); nni_stat_set_id(&d->st_id, (int) d->d_id); nni_stat_set_id(&d->st_sock, (int) nni_sock_id(d->d_sock)); nni_stat_register(&d->st_root); +#else + NNI_ARG_UNUSED(d); +#endif } -#endif // NNG_ENABLE_STATS void nni_dialer_bump_error(nni_dialer *d, int err) @@ -207,7 +215,8 @@ nni_dialer_bump_error(nni_dialer *d, int err) static int nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *dp; d->d_closed = false; d->d_data = NULL; @@ -226,30 +235,41 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran) nni_mtx_init(&d->d_mtx); - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); + if (d->d_ops.d_size != 0) { + d->d_data = ((uint8_t *) d) + NNI_ALIGN_UP(sizeof(*d)); + dp = d->d_data; + } else { + // legacy: remove me when transports converted + dp = &d->d_data; + } - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); + if (tran->tran_pipe->p_size) { + nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); + } else { + // legacy: remove me + nni_aio_init(&d->d_con_aio, dialer_connect_cb_old, d); + } + nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); -#ifdef NNG_ENABLE_STATS dialer_stats_init(d); -#endif - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { + rv = d->d_ops.d_init(dp, &d->d_url, d); + + if (rv == 0) { + rv = nni_sock_add_dialer(s, d); + } + + if (rv == 0) { nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); + rv = nni_id_alloc32(&dialers, &d->d_id, d); nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + dialer_register_stats(d); + } + + return (rv); } int @@ -258,16 +278,18 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&d->d_url, url)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } if ((rv = nni_dialer_init(d, s, tran)) != 0) { @@ -289,59 +311,24 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_dialer *d; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_dialer == NULL)) { return (NNG_ENOTSUP); } - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*d)) + tran->tran_dialer->d_size; + if ((d = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_parse_inline(&d->d_url, url_str)) != 0) { - NNI_FREE_STRUCT(d); + nni_free(d, sz); return (rv); } - d->d_closed = false; - d->d_data = NULL; - d->d_ref = 1; - d->d_sock = s; - d->d_tran = tran; - nni_atomic_flag_reset(&d->d_started); - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - d->d_ops = *tran->tran_dialer; - - NNI_LIST_NODE_INIT(&d->d_node); - NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); - - nni_mtx_init(&d->d_mtx); - - nni_aio_init(&d->d_con_aio, dialer_connect_cb, d); - nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d); - - nni_mtx_lock(&dialers_lk); - rv = nni_id_alloc32(&dialers, &d->d_id, d); - nni_mtx_unlock(&dialers_lk); - -#ifdef NNG_ENABLE_STATS - dialer_stats_init(d); -#endif - - if ((rv != 0) || - ((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) || - ((rv = nni_sock_add_dialer(s, d)) != 0)) { - nni_mtx_lock(&dialers_lk); - nni_id_remove(&dialers, d->d_id); - nni_mtx_unlock(&dialers_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&d->st_root); -#endif + if ((rv = nni_dialer_init(d, s, tran)) != 0) { nni_dialer_destroy(d); return (rv); } - *dp = d; return (0); } @@ -438,6 +425,50 @@ dialer_connect_cb(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_inc(&d->st_connect, 1); #endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + break; + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. + nni_dialer_bump_error(d, rv); + break; + case NNG_ECONNREFUSED: + case NNG_ETIMEDOUT: + default: + nng_log_warn("NNG-CONN-FAIL", + "Failed connecting socket<%u>: %s", nni_sock_id(d->d_sock), + nng_strerror(rv)); + + nni_dialer_bump_error(d, rv); + if (user_aio == NULL) { + nni_dialer_timer_start(d); + } else { + nni_atomic_flag_reset(&d->d_started); + } + break; + } + if (user_aio != NULL) { + nni_aio_finish(user_aio, rv, 0); + } +} + +static void +dialer_connect_cb_old(void *arg) +{ + nni_dialer *d = arg; + nni_aio *aio = &d->d_con_aio; + nni_aio *user_aio; + int rv; + + nni_mtx_lock(&d->d_mtx); + user_aio = d->d_user_aio; + d->d_user_aio = NULL; + nni_mtx_unlock(&d->d_mtx); + + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&d->st_connect, 1); +#endif nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. diff --git a/src/core/listener.c b/src/core/listener.c index 9107e3a4..3dc2a0fa 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -22,6 +22,7 @@ static void listener_accept_start(nni_listener *); static void listener_accept_cb(void *); +static void listener_accept_cb_old(void *); static void listener_timer_cb(void *); static nni_id_map listeners = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0); @@ -44,7 +45,7 @@ nni_listener_destroy(nni_listener *l) l->l_ops.l_fini(l->l_data); } nni_url_fini(&l->l_url); - NNI_FREE_STRUCT(l); + nni_free(l, NNI_ALIGN_UP(sizeof(*l)) + l->l_ops.l_size); } #ifdef NNG_ENABLE_STATS @@ -55,10 +56,12 @@ listener_stat_init( nni_stat_init(item, info); nni_stat_add(&l->st_root, item); } +#endif static void listener_stats_init(nni_listener *l) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "listener", .si_desc = "listener statistics", @@ -153,9 +156,23 @@ listener_stats_init(nni_listener *l) nni_stat_set_id(&l->st_root, (int) l->l_id); nni_stat_set_id(&l->st_id, (int) l->l_id); nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock)); +#else + NNI_ARG_UNUSED(l); +#endif // NNG_ENABLE_STATS +} + +static void +listener_register_stats(nni_listener *l) +{ +#ifdef NNG_ENABLE_STATS + nni_stat_set_id(&l->st_root, (int) l->l_id); + nni_stat_set_id(&l->st_id, (int) l->l_id); + nni_stat_set_id(&l->st_sock, (int) nni_sock_id(l->l_sock)); nni_stat_register(&l->st_root); +#else + NNI_ARG_UNUSED(l); +#endif } -#endif // NNG_ENABLE_STATS void nni_listener_bump_error(nni_listener *l, int err) @@ -195,7 +212,8 @@ nni_listener_bump_error(nni_listener *l, int err) static int nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran) { - int rv; + int rv; + void *lp; l->l_closed = false; l->l_data = NULL; @@ -212,30 +230,40 @@ nni_listener_init(nni_listener *l, nni_sock *s, nni_sp_tran *tran) NNI_LIST_NODE_INIT(&l->l_node); NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); - nni_aio_init(&l->l_acc_aio, listener_accept_cb, l); + if (tran->tran_pipe->p_size) { + nni_aio_init(&l->l_acc_aio, listener_accept_cb, l); + } else { + nni_aio_init(&l->l_acc_aio, listener_accept_cb_old, l); + } nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l); - nni_mtx_lock(&listeners_lk); - rv = nni_id_alloc32(&listeners, &l->l_id, l); - nni_mtx_unlock(&listeners_lk); - -#ifdef NNG_ENABLE_STATS listener_stats_init(l); -#endif - if ((rv != 0) || - ((rv = l->l_ops.l_init(&l->l_data, &l->l_url, l)) != 0) || - ((rv = nni_sock_add_listener(s, l)) != 0)) { + if (l->l_ops.l_size != 0) { + l->l_data = ((uint8_t *) l) + NNI_ALIGN_UP(sizeof(*l)); + lp = l->l_data; + } else { + // legacy: remove me when transports converted + lp = &l->l_data; + } + + rv = l->l_ops.l_init(lp, &l->l_url, l); + + if (rv == 0) { + rv = nni_sock_add_listener(s, l); + } + + if (rv == 0) { nni_mtx_lock(&listeners_lk); - nni_id_remove(&listeners, l->l_id); + rv = nni_id_alloc32(&listeners, &l->l_id, l); nni_mtx_unlock(&listeners_lk); -#ifdef NNG_ENABLE_STATS - nni_stat_unregister(&l->st_root); -#endif - return (rv); } - return (0); + if (rv == 0) { + listener_register_stats(l); + } + + return (rv); } int @@ -244,16 +272,18 @@ nni_listener_create_url(nni_listener **lp, nni_sock *s, const nng_url *url) nni_sp_tran *tran; nni_listener *l; int rv; + size_t sz; if (((tran = nni_sp_tran_find(nng_url_scheme(url))) == NULL) || (tran->tran_listener == NULL)) { return (NNG_ENOTSUP); } - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size; + if ((l = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } if ((rv = nni_url_clone_inline(&l->l_url, url)) != 0) { - NNI_FREE_STRUCT(l); + nni_free(l, sz); return (rv); } if ((rv = nni_listener_init(l, s, tran)) != 0) { @@ -276,16 +306,19 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) nni_sp_tran *tran; nni_listener *l; int rv; + size_t sz; if (((tran = nni_sp_tran_find(url_str)) == NULL) || (tran->tran_listener == NULL)) { return (NNG_ENOTSUP); } - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + sz = NNI_ALIGN_UP(sizeof(*l)) + tran->tran_listener->l_size; + if ((l = nni_zalloc(sz)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_url_parse_inline(&l->l_url, url_str)) != 0) { - NNI_FREE_STRUCT(l); + nni_free(l, sz); return (rv); } if ((rv = nni_listener_init(l, s, tran)) != 0) { @@ -381,6 +414,47 @@ listener_accept_cb(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_accept, 1); #endif + nni_pipe_start(nni_aio_get_output(aio, 0)); + listener_accept_start(l); + break; + case NNG_ECONNABORTED: // remote condition, no cool down + case NNG_ECONNRESET: // remote condition, no cool down + case NNG_ETIMEDOUT: // No need to sleep, we timed out already. + case NNG_EPEERAUTH: // peer validation failure + nng_log_warn("NNG-ACCEPT-FAIL", + "Failed accepting for socket<%u>: %s", + nni_sock_id(l->l_sock), nng_strerror(rv)); + nni_listener_bump_error(l, rv); + listener_accept_start(l); + break; + case NNG_ECLOSED: // no further action + case NNG_ECANCELED: // no further action + nni_listener_bump_error(l, rv); + break; + default: + // We don't really know why we failed, but we back off + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. 100 ms is enough to cool down. + nni_listener_bump_error(l, rv); + nni_sleep_aio(100, &l->l_tmo_aio); + break; + } +} + +static void +listener_accept_cb_old(void *arg) +{ + nni_listener *l = arg; + nni_aio *aio = &l->l_acc_aio; + int rv; + + switch ((rv = nni_aio_result(aio))) { + case 0: +#ifdef NNG_ENABLE_STATS + nni_stat_inc(&l->st_accept, 1); +#endif nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); listener_accept_start(l); break; diff --git a/src/core/pipe.c b/src/core/pipe.c index 05089fee..eaf49af9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -38,9 +38,8 @@ pipe_destroy(void *arg) nni_pipe *p = arg; p->p_proto_ops.pipe_fini(p->p_proto_data); - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_fini(p->p_tran_data); - } + p->p_tran_ops.p_fini(p->p_tran_data); + nni_free(p, p->p_size); } @@ -49,6 +48,11 @@ pipe_reap(void *arg) { nni_pipe *p = arg; + p->p_proto_ops.pipe_close(p->p_proto_data); + + // Close the underlying transport. + p->p_tran_ops.p_close(p->p_tran_data); + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. @@ -62,12 +66,11 @@ pipe_reap(void *arg) #ifdef NNG_ENABLE_STATS nni_stat_unregister(&p->st_root); #endif - nni_pipe_remove(p); p->p_proto_ops.pipe_stop(p->p_proto_data); - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } + p->p_tran_ops.p_stop(p->p_tran_data); + + nni_pipe_remove(p); nni_pipe_rele(p); } @@ -125,13 +128,6 @@ nni_pipe_close(nni_pipe *p) return; // We already did a close. } - p->p_proto_ops.pipe_close(p->p_proto_data); - - // Close the underlying transport. - if (p->p_tran_data != NULL) { - p->p_tran_ops.p_close(p->p_tran_data); - } - nni_reap(&pipe_reap_list, p); } @@ -154,10 +150,12 @@ pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info) nni_stat_init(item, info); nni_stat_add(&p->st_root, item); } +#endif static void pipe_stats_init(nni_pipe *p) { +#ifdef NNG_ENABLE_STATS static const nni_stat_info root_info = { .si_name = "pipe", .si_desc = "pipe statistics", @@ -201,6 +199,16 @@ pipe_stats_init(nni_pipe *p) .si_unit = NNG_UNIT_BYTES, .si_atomic = true, }; + static const nni_stat_info dialer_info = { + .si_name = "dialer", + .si_desc = "dialer for pipe", + .si_type = NNG_STAT_ID, + }; + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; nni_stat_init(&p->st_root, &root_info); pipe_stat_init(p, &p->st_id, &id_info); @@ -213,34 +221,56 @@ pipe_stats_init(nni_pipe *p) nni_stat_set_id(&p->st_root, (int) p->p_id); nni_stat_set_id(&p->st_id, (int) p->p_id); nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock)); -} + + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + pipe_stat_init(p, &p->st_ep_id, &dialer_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_dialer_id(p->p_dialer)); + } + if (p->p_listener) { + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id( + &p->st_ep_id, (int) nni_listener_id(p->p_listener)); + } +#else + NNI_ARG_UNUSED(p); #endif // NNG_ENABLE_STATS +} static int -pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) +pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, + nni_listener *l, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv1, rv2, rv3; + void *sock_data = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { - // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + p->p_dialer = d; + p->p_listener = l; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -248,19 +278,30 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + nni_pipe_add(p); + + p->p_tran_data = tran_data; + p->p_proto_data = proto_data; + nni_mtx_lock(&pipes_lk); - rv = nni_id_alloc32(&pipes, &p->p_id, p); + rv1 = nni_id_alloc32(&pipes, &p->p_id, p); nni_mtx_unlock(&pipes_lk); -#ifdef NNG_ENABLE_STATS + // must be done before protocol or transports, because + // they may add further stats pipe_stats_init(p); -#endif - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + rv2 = tops->p_init(tran_data, p); + rv3 = pops->pipe_init(proto_data, p, sock_data); + if (rv1 != 0 || rv2 != 0 || rv3 != 0) { nni_pipe_close(p); nni_pipe_rele(p); - return (rv); + return (rv1 ? rv1 : rv2 ? rv2 : rv3); } *pp = p; @@ -274,19 +315,9 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) nni_sp_tran *tran = d->d_tran; nni_pipe *p; - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, d->d_sock, tran, d, NULL, tran_data)) != 0) { return (rv); } - p->p_dialer = d; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info dialer_info = { - .si_name = "dialer", - .si_desc = "dialer for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &dialer_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); -#endif *pp = p; return (0); } @@ -298,24 +329,44 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) nni_sp_tran *tran = l->l_tran; nni_pipe *p; - if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { + if ((rv = pipe_create(&p, l->l_sock, tran, NULL, l, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif *pp = p; return (0); } int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, d, NULL, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL, l, NULL)) != 0) { + return (rv); + } + *datap = p->p_tran_data; + return (0); +} + +int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 9e7109de..71e07a9f 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -63,4 +63,7 @@ extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); +extern int nni_pipe_alloc_dialer(void **, nni_dialer *); +extern int nni_pipe_alloc_listener(void **, nni_listener *); + #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index fe1b62d2..ec2691a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock) nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } + + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&sock->s_mx); // Close the upper queues immediately. @@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock_lk); nni_mtx_lock(&sock->s_mx); - - // At this point, we've done everything we politely can to - // give the protocol a chance to flush its write side. Now - // it is time to be a little more insistent. - - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // We have to wait for pipes to be removed. while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } - - sock->s_sock_ops.sock_close(sock->s_data); - - nni_cv_wake(&sock->s_cv); - NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); - nni_mtx_unlock(&sock->s_mx); + sock->s_sock_ops.sock_close(sock->s_data); + // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +static void +dialer_start_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - - if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - - nni_list_append(&d->d_pipes, p); - nni_list_append(&s->s_pipes, p); d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + #ifdef NNG_ENABLE_STATS nni_stat_inc(&s->st_pipes, 1); nni_stat_inc(&d->st_pipes, 1); @@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) } void +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +{ + nni_sock *s = d->d_sock; + nni_pipe *p; + + if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { + return; + } + + nni_mtx_lock(&s->s_mx); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + dialer_start_pipe(d, p); +} + +void nni_dialer_shutdown(nni_dialer *d) { nni_sock *s = d->d_sock; @@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d) nni_reap(&dialer_reap_list, d); } -void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +static void +listener_start_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; - - nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - nni_list_append(&l->l_pipes, p); - nni_list_append(&s->s_pipes, p); - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_pipes, 1); nni_stat_inc(&s->st_pipes, 1); @@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) "Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p), nni_sock_id(s), nni_pipe_peer_addr(p, addr)); } + + // the socket now "owns" the pipe, and a pipe close should immediately + // start the process of teardown. nni_pipe_rele(p); } void +nni_listener_add_pipe(nni_listener *l, void *tpipe) +{ + nni_pipe *p; + + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + return; + } + + listener_start_pipe(l, p); +} + +void nni_listener_shutdown(nni_listener *l) { nni_sock *s = l->l_sock; @@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l) nni_reap(&listener_reap_list, l); } +// nni_pipe_add just registers the pipe with the socket and endpoint +// so they won't be deallocated while the pipe still exists. +void +nni_pipe_add(nni_pipe *p) +{ + nni_sock *s = p->p_sock; + nni_dialer *d = p->p_dialer; + nni_listener *l = p->p_listener; + + nni_mtx_lock(&s->s_mx); + nni_list_append(&s->s_pipes, p); + if (d != NULL) { + NNI_ASSERT(l == NULL); + nni_list_append(&d->d_pipes, p); + } + if (l != NULL) { + NNI_ASSERT(d == NULL); + nni_list_append(&l->l_pipes, p); + } + nni_mtx_unlock(&s->s_mx); +} + +// nni_pipe_start attempts to start the pipe, adding it to the socket and +// endpoints and calling callbacks, etc. The pipe should already have finished +// any negotiation needed at the transport layer. +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_listener) { + NNI_ASSERT(p->p_dialer == NULL); + listener_start_pipe(p->p_listener, p); + } + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + dialer_start_pipe(p->p_dialer, p); + } +} + void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { @@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) void *arg; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (!p->p_cbs) { - if (ev == NNG_PIPE_EV_ADD_PRE) { - // First event, after this we want all other events. - p->p_cbs = true; - } else { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } + if (ev == NNG_PIPE_EV_ADD_PRE) { + p->p_cbs = true; + } else if (!p->p_cbs) { + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + return; } cb = s->s_pipe_cbs[ev].cb_fn; arg = s->s_pipe_cbs[ev].cb_arg; @@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p) nni_mtx_lock(&s->s_mx); #ifdef NNG_ENABLE_STATS - if (nni_list_node_active(&p->p_sock_node)) { - nni_stat_dec(&s->st_pipes, 1); - } + nni_stat_dec(&s->st_pipes, 1); if (p->p_listener != NULL) { nni_stat_dec(&p->p_listener->st_pipes, 1); } @@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p) #endif nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); - p->p_listener = NULL; - p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; dialer_timer_start_locked(d); // Kick the timer to redial. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 801ef7b1..aeaa369b 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -132,12 +132,14 @@ extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_stop(nni_dialer *); +extern void nni_listener_start_pipe(nni_listener *, nni_pipe *); extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); extern void nni_listener_stop(nni_listener *); +extern void nni_pipe_add(nni_pipe *); extern void nni_pipe_remove(nni_pipe *); extern bool nni_pipe_is_closed(nni_pipe *); extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); diff --git a/src/sp/transport.h b/src/sp/transport.h index b7134595..10207f28 100644 --- a/src/sp/transport.h +++ b/src/sp/transport.h @@ -12,6 +12,7 @@ #ifndef PROTOCOL_SP_TRANSPORT_H #define PROTOCOL_SP_TRANSPORT_H +#include "core/defs.h" #include "core/options.h" // Endpoint operations are called by the socket in a @@ -25,6 +26,10 @@ // against any asynchronous operations they manage themselves, though.) struct nni_sp_dialer_ops { + // d_size is the size of transport specific data allocated + // to the listener. + size_t d_size; + // d_init creates a vanilla dialer. The value created is // used for the first argument for all other dialer functions. int (*d_init)(void **, nng_url *, nni_dialer *); @@ -64,6 +69,10 @@ struct nni_sp_dialer_ops { }; struct nni_sp_listener_ops { + // l_size is the size of transport specific data allocated + // to the listener. + size_t l_size; + // l_init creates a vanilla listener. The value created is // used for the first argument for all other listener functions. int (*l_init)(void **, nng_url *, nni_listener *); @@ -121,20 +130,27 @@ struct nni_sp_pipe_ops { // p_init initializes the pipe data structures. The main // purpose of this is so that the pipe will see the upper // layer nni_pipe and get a chance to register stats and such. + size_t p_size; + + // p_init initializes the transport's pipe data structure. + // The pipe MUST be left in a state that p_fini can be safely + // called on it, even if it does not succeed. (The upper layers + // will call p_fini as part of the cleanup of a failure.) + // This function should not acquire any locks. int (*p_init)(void *, nni_pipe *); // p_fini destroys the pipe. This should clean up all local // resources, including closing files and freeing memory, used // by the pipe. After this call returns, the system will not - // make further calls on the same pipe. + // make further calls on the same pipe. This call should not block. void (*p_fini)(void *); // p_stop stops the pipe, waiting for any callbacks that are // outstanding to complete. This is done before tearing down - // resources with p_fini. + // resources with p_fini. Unlike p_fini, p_stop may block. void (*p_stop)(void *); - // p_aio_send queues the message for transmit. If this fails, + // p_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it |
