aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-31 12:33:58 -0700
committerGarrett D'Amore <garrett@damore.org>2018-08-05 18:45:04 +0300
commitd7f7c896c0ede24249ef63b1e45b1878bf4bd473 (patch)
tree32eece7d91a648f24cb174096fb9667cab978f37 /src/core/socket.c
parentccc24a8e508131a2226474642a038baaa2cbcc8c (diff)
downloadnng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.tar.gz
nng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.tar.bz2
nng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.zip
fixes #599 nng_dial sync should not return until added to socket
fixes #208 pipe start should occur before connect / accept fixes #616 Race condition closing between header & body This refactors the transports to handle their own connection handshaking before passing the pipe to the socket. This changes and simplifies the setup. This also fixes a rather challenging race condition described by #616.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index f4e59af5..0fa776f1 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
-bool
-nni_sock_closing(nni_sock *s)
-{
- bool rv;
- nni_mtx_lock(&s->s_mx);
- rv = s->s_closing;
- nni_mtx_unlock(&s->s_mx);
- return (rv);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -1382,16 +1372,36 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
+static void
+pipe_start(nni_pipe *p)
+{
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ // As the callback above that would close the pipe runs on
+ // this thread, we can skip the lock.
+ if (p->p_closed) {
+ return;
+ }
+
+ if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+}
+
void
-nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
{
nni_sock *s = d->d_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
+ if (s->s_closed || d->d_closing ||
+ (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
@@ -1403,7 +1413,7 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- nni_pipe_start(p);
+ pipe_start(p);
}
static void
@@ -1473,14 +1483,15 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_sock *s = l->l_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
+ if (s->s_closed || l->l_closing ||
+ (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1489,7 +1500,7 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- nni_pipe_start(p);
+ pipe_start(p);
}
static void
@@ -1600,7 +1611,10 @@ nni_pipe_remove(nni_pipe *p)
p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
- dialer_timer_start_locked(d); // Kick the timer to redial.
+ if (!s->s_closing) {
+ dialer_timer_start_locked(
+ d); // Kick the timer to redial.
+ }
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);