aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c52
1 files changed, 19 insertions, 33 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 0fa776f1..f4e59af5 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,6 +399,16 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
+bool
+nni_sock_closing(nni_sock *s)
+{
+ bool rv;
+ nni_mtx_lock(&s->s_mx);
+ rv = s->s_closing;
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+}
+
static void
sock_destroy(nni_sock *s)
{
@@ -1372,36 +1382,16 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-static void
-pipe_start(nni_pipe *p)
-{
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
-
- // As the callback above that would close the pipe runs on
- // this thread, we can skip the lock.
- if (p->p_closed) {
- return;
- }
-
- if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
-}
-
void
-nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
{
nni_sock *s = d->d_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing ||
- (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) {
+ if (s->s_closed || d->d_closing) {
nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
return;
}
@@ -1413,7 +1403,7 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- pipe_start(p);
+ nni_pipe_start(p);
}
static void
@@ -1483,15 +1473,14 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, void *tpipe)
+nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
{
nni_sock *s = l->l_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing ||
- (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) {
+ if (s->s_closed || l->l_closing) {
nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1500,7 +1489,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- pipe_start(p);
+ nni_pipe_start(p);
}
static void
@@ -1611,10 +1600,7 @@ nni_pipe_remove(nni_pipe *p)
p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
- if (!s->s_closing) {
- dialer_timer_start_locked(
- d); // Kick the timer to redial.
- }
+ dialer_timer_start_locked(d); // Kick the timer to redial.
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);