summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-14 15:57:57 +0500
committerGarrett D'Amore <garrett@damore.org>2018-08-14 15:57:57 +0500
commit77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2 (patch)
tree2a6e124e6121738d3628fdf6b227b076875b85d2 /src
parent6c6bdba20795166d2909adfecfa2d152de410101 (diff)
downloadnng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.tar.gz
nng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.tar.bz2
nng-77311bfcc94bba96cdee73ddcd1ac9a6d0ed17d2.zip
fixes #208 pipe start should occur before connect / accept
fixes #599 nng_dial sync should not return until added to socket This reintroduces the changes for the above fixes, building upon the transport modifications that we have made to eliminate the separate transport pipe start entry point. It also includes slightly reworked code during start to put a hold on the pipe when it is created, which we we drop at the end, hopefully fixing a use-after-free.
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c12
-rw-r--r--src/core/listener.c14
-rw-r--r--src/core/pipe.c69
-rw-r--r--src/core/pipe.h5
-rw-r--r--src/core/socket.c60
-rw-r--r--src/core/sockimpl.h5
-rw-r--r--src/core/transport.h10
7 files changed, 58 insertions, 117 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 34e90891..4b2f105d 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -232,24 +232,18 @@ dialer_timer_cb(void *arg)
static void
dialer_connect_cb(void *arg)
{
- nni_dialer *d = arg;
- nni_pipe * p;
+ nni_dialer *d = arg;
nni_aio * aio = d->d_con_aio;
int rv;
bool synch;
- if ((rv = nni_aio_result(aio)) == 0) {
- void *data = nni_aio_get_output(aio, 0);
- NNI_ASSERT(data != NULL);
- rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
- }
nni_mtx_lock(&d->d_mtx);
synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- switch (rv) {
+ switch ((rv = nni_aio_result(aio))) {
case 0:
- nni_dialer_add_pipe(d, p);
+ nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
diff --git a/src/core/listener.c b/src/core/listener.c
index debfa5f1..c2e68bb5 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -228,23 +228,17 @@ listener_timer_cb(void *arg)
static void
listener_accept_cb(void *arg)
{
- nni_listener *l = arg;
- nni_pipe * p;
+ nni_listener *l = arg;
nni_aio * aio = l->l_acc_aio;
- int rv;
- if ((rv = nni_aio_result(aio)) == 0) {
- void *data = nni_aio_get_output(aio, 0);
- NNI_ASSERT(data != NULL);
- rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data);
- }
- switch (rv) {
+ switch (nni_aio_result(aio)) {
case 0:
- nni_listener_add_pipe(l, p);
+ nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
+ case NNG_EPEERAUTH: // peer validation failure
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 4f50ac7c..66cd87cc 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // Wait for neg callbacks to finish. (Already closed).
- nni_aio_stop(p->p_start_aio);
-
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
@@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
- nni_aio_fini(p->p_start_aio);
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
- // abort any pending negotiation/start process.
- nni_aio_close(p->p_start_aio);
-
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
-bool
-nni_pipe_closed(nni_pipe *p)
-{
- bool rv;
- nni_mtx_lock(&p->p_mtx);
- rv = p->p_closed;
- nni_mtx_unlock(&p->p_mtx);
- return (rv);
-}
-
uint16_t
nni_pipe_peer(nni_pipe *p)
{
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
-static void
-nni_pipe_start_cb(void *arg)
-{
- nni_pipe *p = arg;
- nni_sock *s = p->p_sock;
- nni_aio * aio = p->p_start_aio;
-
- if (nni_aio_result(aio) != 0) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
- if (nni_pipe_closed(p)) {
- nni_pipe_close(p);
- return;
- }
-
- if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
- nni_sock_closing(s)) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
-}
-
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
// Make a private copy of the transport ops.
- p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -253,18 +209,17 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
- uint64_t id;
- nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
- }
- nni_mtx_unlock(&nni_pipe_lk);
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = id;
+ p->p_refcnt = 1;
}
+ nni_mtx_unlock(&nni_pipe_lk);
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
+ nni_pipe_rele(p);
return (rv);
}
@@ -296,16 +251,6 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
-void
-nni_pipe_start(nni_pipe *p)
-{
- if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(p->p_start_aio, 0, 0);
- } else {
- p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
- }
-}
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1d73ce51..1e2f2b5d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -59,11 +59,6 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *);
// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none).
extern uint32_t nni_pipe_dialer_id(nni_pipe *);
-// nni_pipe_closed returns true if nni_pipe_close was called.
-// (This is used by the socket to determine if user closed the pipe
-// during callback.)
-extern bool nni_pipe_closed(nni_pipe *);
-
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index f4e59af5..1fcf5e0b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
-bool
-nni_sock_closing(nni_sock *s)
-{
- bool rv;
- nni_mtx_lock(&s->s_mx);
- rv = s->s_closing;
- nni_mtx_unlock(&s->s_mx);
- return (rv);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -1356,7 +1346,11 @@ static void
dialer_timer_start_locked(nni_dialer *d)
{
nni_duration backoff;
+ nni_sock * sock = d->d_sock;
+ if (d->d_closing || sock->s_closed) {
+ return;
+ }
backoff = d->d_currtime;
d->d_currtime *= 2;
if (d->d_currtime > d->d_maxrtime) {
@@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d)
}
void
-nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
{
nni_sock *s = d->d_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
+ if (s->s_closed || d->d_closing ||
+ (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
@@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void
@@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_sock *s = l->l_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
+ if (s->s_closed || l->l_closing ||
+ (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 569e5cae..29e83f7a 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -78,7 +78,6 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
- nni_aio * p_start_aio;
};
extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
@@ -87,14 +86,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
-extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *);
+extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_close_rele(nni_dialer *);
-extern void nni_listener_add_pipe(nni_listener *, nni_pipe *);
+extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
diff --git a/src/core/transport.h b/src/core/transport.h
index 257d232d..8b08e366 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -30,7 +30,8 @@ enum nni_ep_mode {
#define NNI_TRANSPORT_V1 0x54520001
#define NNI_TRANSPORT_V2 0x54520002
#define NNI_TRANSPORT_V3 0x54520003
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3
+#define NNI_TRANSPORT_V4 0x54520004
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4
// Option handlers.
struct nni_tran_option {
@@ -131,13 +132,6 @@ struct nni_tran_pipe_ops {
// make further calls on the same pipe.
void (*p_fini)(void *);
- // p_start starts the pipe running. This gives the transport a
- // chance to hook into any transport specific negotiation
- // phase. The pipe will not have its p_send or p_recv calls
- // started, and will not be access by the "socket" until the
- // pipe has indicated its readiness by finishing the aio.
- void (*p_start)(void *, nni_aio *);
-
// p_stop stops the pipe, waiting for any callbacks that are
// outstanding to complete. This is done before tearing down
// resources with p_fini.