diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 60 |
1 files changed, 40 insertions, 20 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index f4e59af5..1fcf5e0b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } -bool -nni_sock_closing(nni_sock *s) -{ - bool rv; - nni_mtx_lock(&s->s_mx); - rv = s->s_closing; - nni_mtx_unlock(&s->s_mx); - return (rv); -} - static void sock_destroy(nni_sock *s) { @@ -1356,7 +1346,11 @@ static void dialer_timer_start_locked(nni_dialer *d) { nni_duration backoff; + nni_sock * sock = d->d_sock; + if (d->d_closing || sock->s_closed) { + return; + } backoff = d->d_currtime; d->d_currtime *= 2; if (d->d_currtime > d->d_maxrtime) { @@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d) } void -nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) { nni_sock *s = d->d_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { + if (s->s_closed || d->d_closing || + (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } @@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void @@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { + if (s->s_closed || l->l_closing || + (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } p->p_listener = l; @@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p) nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void |
