diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-08-14 15:57:57 +0500 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-08-14 15:57:57 +0500 |
| commit | 77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2 (patch) | |
| tree | 2a6e124e6121738d3628fdf6b227b076875b85d2 | |
| parent | 6c6bdba20795166d2909adfecfa2d152de410101 (diff) | |
| download | nng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.tar.gz nng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.tar.bz2 nng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.zip | |
fixes #208 pipe start should occur before connect / accept
fixes #599 nng_dial sync should not return until added to socket
This reintroduces the changes for the above fixes, building upon the
transport modifications that we have made to eliminate the separate
transport pipe start entry point. It also includes slightly reworked
code during start to put a hold on the pipe when it is created, which
we we drop at the end, hopefully fixing a use-after-free.
| -rw-r--r-- | src/core/dialer.c | 12 | ||||
| -rw-r--r-- | src/core/listener.c | 14 | ||||
| -rw-r--r-- | src/core/pipe.c | 69 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 60 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 5 | ||||
| -rw-r--r-- | src/core/transport.h | 10 |
7 files changed, 58 insertions, 117 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 34e90891..4b2f105d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -232,24 +232,18 @@ dialer_timer_cb(void *arg) static void dialer_connect_cb(void *arg) { - nni_dialer *d = arg; - nni_pipe * p; + nni_dialer *d = arg; nni_aio * aio = d->d_con_aio; int rv; bool synch; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); - } nni_mtx_lock(&d->d_mtx); synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - switch (rv) { + switch ((rv = nni_aio_result(aio))) { case 0: - nni_dialer_add_pipe(d, p); + nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. diff --git a/src/core/listener.c b/src/core/listener.c index debfa5f1..c2e68bb5 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -228,23 +228,17 @@ listener_timer_cb(void *arg) static void listener_accept_cb(void *arg) { - nni_listener *l = arg; - nni_pipe * p; + nni_listener *l = arg; nni_aio * aio = l->l_acc_aio; - int rv; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); - } - switch (rv) { + switch (nni_aio_result(aio)) { case 0: - nni_listener_add_pipe(l, p); + nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown case NNG_ECONNRESET: // remote condition, no cooldown + case NNG_EPEERAUTH: // peer validation failure listener_accept_start(l); break; case NNG_ECLOSED: // no further action diff --git a/src/core/pipe.c b/src/core/pipe.c index 4f50ac7c..66cd87cc 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); - // Wait for neg callbacks to finish. (Already closed). - nni_aio_stop(p->p_start_aio); - if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } @@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } - nni_aio_fini(p->p_start_aio); nni_cv_fini(&p->p_cv); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); @@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - // abort any pending negotiation/start process. - nni_aio_close(p->p_start_aio); - nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p) nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); } -bool -nni_pipe_closed(nni_pipe *p) -{ - bool rv; - nni_mtx_lock(&p->p_mtx); - rv = p->p_closed; - nni_mtx_unlock(&p->p_mtx); - return (rv); -} - uint16_t nni_pipe_peer(nni_pipe *p) { return (p->p_tran_ops.p_peer(p->p_tran_data)); } -static void -nni_pipe_start_cb(void *arg) -{ - nni_pipe *p = arg; - nni_sock *s = p->p_sock; - nni_aio * aio = p->p_start_aio; - - if (nni_aio_result(aio) != 0) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - if (nni_pipe_closed(p)) { - nni_pipe_close(p); - return; - } - - if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || - nni_sock_closing(s)) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); -} - int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } // Make a private copy of the transport ops. - p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -253,18 +209,17 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); - if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { - uint64_t id; - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; - } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = id; + p->p_refcnt = 1; } + nni_mtx_unlock(&nni_pipe_lk); if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_close(p); + nni_pipe_rele(p); return (rv); } @@ -296,16 +251,6 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } -void -nni_pipe_start(nni_pipe *p) -{ - if (p->p_tran_ops.p_start == NULL) { - nni_aio_finish(p->p_start_aio, 0, 0); - } else { - p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); - } -} - void * nni_pipe_get_proto_data(nni_pipe *p) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 1d73ce51..1e2f2b5d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -59,11 +59,6 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *); // nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). extern uint32_t nni_pipe_dialer_id(nni_pipe *); -// nni_pipe_closed returns true if nni_pipe_close was called. -// (This is used by the socket to determine if user closed the pipe -// during callback.) -extern bool nni_pipe_closed(nni_pipe *); - // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index f4e59af5..1fcf5e0b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } -bool -nni_sock_closing(nni_sock *s) -{ - bool rv; - nni_mtx_lock(&s->s_mx); - rv = s->s_closing; - nni_mtx_unlock(&s->s_mx); - return (rv); -} - static void sock_destroy(nni_sock *s) { @@ -1356,7 +1346,11 @@ static void dialer_timer_start_locked(nni_dialer *d) { nni_duration backoff; + nni_sock * sock = d->d_sock; + if (d->d_closing || sock->s_closed) { + return; + } backoff = d->d_currtime; d->d_currtime *= 2; if (d->d_currtime > d->d_maxrtime) { @@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d) } void -nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) { nni_sock *s = d->d_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { + if (s->s_closed || d->d_closing || + (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } @@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void @@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { + if (s->s_closed || l->l_closing || + (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } p->p_listener = l; @@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p) nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 569e5cae..29e83f7a 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -78,7 +78,6 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_reap_item p_reap; - nni_aio * p_start_aio; }; extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); @@ -87,14 +86,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_sock_remove_listener(nni_sock *, nni_listener *); -extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); +extern void nni_dialer_add_pipe(nni_dialer *, void *); extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_close_rele(nni_dialer *); -extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); diff --git a/src/core/transport.h b/src/core/transport.h index 257d232d..8b08e366 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -30,7 +30,8 @@ enum nni_ep_mode { #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 #define NNI_TRANSPORT_V3 0x54520003 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 +#define NNI_TRANSPORT_V4 0x54520004 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4 // Option handlers. struct nni_tran_option { @@ -131,13 +132,6 @@ struct nni_tran_pipe_ops { // make further calls on the same pipe. void (*p_fini)(void *); - // p_start starts the pipe running. This gives the transport a - // chance to hook into any transport specific negotiation - // phase. The pipe will not have its p_send or p_recv calls - // started, and will not be access by the "socket" until the - // pipe has indicated its readiness by finishing the aio. - void (*p_start)(void *, nni_aio *); - // p_stop stops the pipe, waiting for any callbacks that are // outstanding to complete. This is done before tearing down // resources with p_fini. |
