aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c60
1 files changed, 40 insertions, 20 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index f4e59af5..1fcf5e0b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
-bool
-nni_sock_closing(nni_sock *s)
-{
- bool rv;
- nni_mtx_lock(&s->s_mx);
- rv = s->s_closing;
- nni_mtx_unlock(&s->s_mx);
- return (rv);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -1356,7 +1346,11 @@ static void
dialer_timer_start_locked(nni_dialer *d)
{
nni_duration backoff;
+ nni_sock * sock = d->d_sock;
+ if (d->d_closing || sock->s_closed) {
+ return;
+ }
backoff = d->d_currtime;
d->d_currtime *= 2;
if (d->d_currtime > d->d_maxrtime) {
@@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d)
}
void
-nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
{
nni_sock *s = d->d_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
+ if (s->s_closed || d->d_closing ||
+ (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
@@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void
@@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_sock *s = l->l_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
+ if (s->s_closed || l->l_closing ||
+ (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void