From d7f7c896c0ede24249ef63b1e45b1878bf4bd473 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 31 Jul 2018 12:33:58 -0700 Subject: fixes #599 nng_dial sync should not return until added to socket fixes #208 pipe start should occur before connect / accept fixes #616 Race condition closing between header & body This refactors the transports to handle their own connection handshaking before passing the pipe to the socket. This changes and simplifies the setup. This also fixes a rather challenging race condition described by #616. --- src/core/socket.c | 52 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 19 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index f4e59af5..0fa776f1 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } -bool -nni_sock_closing(nni_sock *s) -{ - bool rv; - nni_mtx_lock(&s->s_mx); - rv = s->s_closing; - nni_mtx_unlock(&s->s_mx); - return (rv); -} - static void sock_destroy(nni_sock *s) { @@ -1382,16 +1372,36 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } +static void +pipe_start(nni_pipe *p) +{ + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + // As the callback above that would close the pipe runs on + // this thread, we can skip the lock. + if (p->p_closed) { + return; + } + + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); +} + void -nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) { nni_sock *s = d->d_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { + if (s->s_closed || d->d_closing || + (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } @@ -1403,7 +1413,7 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - nni_pipe_start(p); + pipe_start(p); } static void @@ -1473,14 +1483,15 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { + if (s->s_closed || l->l_closing || + (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } p->p_listener = l; @@ -1489,7 +1500,7 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - nni_pipe_start(p); + pipe_start(p); } static void @@ -1600,7 +1611,10 @@ nni_pipe_remove(nni_pipe *p) p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; - dialer_timer_start_locked(d); // Kick the timer to redial. + if (!s->s_closing) { + dialer_timer_start_locked( + d); // Kick the timer to redial. + } } if (s->s_closing) { nni_cv_wake(&s->s_cv); -- cgit v1.2.3-70-g09d2