aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/dialer.c12
-rw-r--r--src/core/listener.c14
-rw-r--r--src/core/pipe.c69
-rw-r--r--src/core/pipe.h5
-rw-r--r--src/core/socket.c60
-rw-r--r--src/core/sockimpl.h5
-rw-r--r--src/core/transport.h10
7 files changed, 58 insertions, 117 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 34e90891..4b2f105d 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -232,24 +232,18 @@ dialer_timer_cb(void *arg)
static void
dialer_connect_cb(void *arg)
{
- nni_dialer *d = arg;
- nni_pipe * p;
+ nni_dialer *d = arg;
nni_aio * aio = d->d_con_aio;
int rv;
bool synch;
- if ((rv = nni_aio_result(aio)) == 0) {
- void *data = nni_aio_get_output(aio, 0);
- NNI_ASSERT(data != NULL);
- rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
- }
nni_mtx_lock(&d->d_mtx);
synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- switch (rv) {
+ switch ((rv = nni_aio_result(aio))) {
case 0:
- nni_dialer_add_pipe(d, p);
+ nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
diff --git a/src/core/listener.c b/src/core/listener.c
index debfa5f1..c2e68bb5 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -228,23 +228,17 @@ listener_timer_cb(void *arg)
static void
listener_accept_cb(void *arg)
{
- nni_listener *l = arg;
- nni_pipe * p;
+ nni_listener *l = arg;
nni_aio * aio = l->l_acc_aio;
- int rv;
- if ((rv = nni_aio_result(aio)) == 0) {
- void *data = nni_aio_get_output(aio, 0);
- NNI_ASSERT(data != NULL);
- rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data);
- }
- switch (rv) {
+ switch (nni_aio_result(aio)) {
case 0:
- nni_listener_add_pipe(l, p);
+ nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
+ case NNG_EPEERAUTH: // peer validation failure
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 4f50ac7c..66cd87cc 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // Wait for neg callbacks to finish. (Already closed).
- nni_aio_stop(p->p_start_aio);
-
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
@@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
- nni_aio_fini(p->p_start_aio);
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
- // abort any pending negotiation/start process.
- nni_aio_close(p->p_start_aio);
-
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
-bool
-nni_pipe_closed(nni_pipe *p)
-{
- bool rv;
- nni_mtx_lock(&p->p_mtx);
- rv = p->p_closed;
- nni_mtx_unlock(&p->p_mtx);
- return (rv);
-}
-
uint16_t
nni_pipe_peer(nni_pipe *p)
{
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
-static void
-nni_pipe_start_cb(void *arg)
-{
- nni_pipe *p = arg;
- nni_sock *s = p->p_sock;
- nni_aio * aio = p->p_start_aio;
-
- if (nni_aio_result(aio) != 0) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
- if (nni_pipe_closed(p)) {
- nni_pipe_close(p);
- return;
- }
-
- if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
- nni_sock_closing(s)) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
-}
-
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
+ uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
// Make a private copy of the transport ops.
- p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -253,18 +209,17 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
- uint64_t id;
- nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
- }
- nni_mtx_unlock(&nni_pipe_lk);
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = id;
+ p->p_refcnt = 1;
}
+ nni_mtx_unlock(&nni_pipe_lk);
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
+ nni_pipe_rele(p);
return (rv);
}
@@ -296,16 +251,6 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
-void
-nni_pipe_start(nni_pipe *p)
-{
- if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(p->p_start_aio, 0, 0);
- } else {
- p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
- }
-}
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1d73ce51..1e2f2b5d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -59,11 +59,6 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *);
// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none).
extern uint32_t nni_pipe_dialer_id(nni_pipe *);
-// nni_pipe_closed returns true if nni_pipe_close was called.
-// (This is used by the socket to determine if user closed the pipe
-// during callback.)
-extern bool nni_pipe_closed(nni_pipe *);
-
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index f4e59af5..1fcf5e0b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
-bool
-nni_sock_closing(nni_sock *s)
-{
- bool rv;
- nni_mtx_lock(&s->s_mx);
- rv = s->s_closing;
- nni_mtx_unlock(&s->s_mx);
- return (rv);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -1356,7 +1346,11 @@ static void
dialer_timer_start_locked(nni_dialer *d)
{
nni_duration backoff;
+ nni_sock * sock = d->d_sock;
+ if (d->d_closing || sock->s_closed) {
+ return;
+ }
backoff = d->d_currtime;
d->d_currtime *= 2;
if (d->d_currtime > d->d_maxrtime) {
@@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d)
}
void
-nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
{
nni_sock *s = d->d_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
+ if (s->s_closed || d->d_closing ||
+ (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
@@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void
@@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
{
nni_sock *s = l->l_sock;
+ nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
+ if (s->s_closed || l->l_closing ||
+ (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+
+ nni_mtx_lock(&s->s_mx);
+ if ((p->p_closed) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
+ nni_pipe_rele(p);
+ return;
+ }
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+ nni_pipe_rele(p);
}
static void
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 569e5cae..29e83f7a 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -78,7 +78,6 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
- nni_aio * p_start_aio;
};
extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
@@ -87,14 +86,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
-extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *);
+extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_close_rele(nni_dialer *);
-extern void nni_listener_add_pipe(nni_listener *, nni_pipe *);
+extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
diff --git a/src/core/transport.h b/src/core/transport.h
index 257d232d..8b08e366 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -30,7 +30,8 @@ enum nni_ep_mode {
#define NNI_TRANSPORT_V1 0x54520001
#define NNI_TRANSPORT_V2 0x54520002
#define NNI_TRANSPORT_V3 0x54520003
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3
+#define NNI_TRANSPORT_V4 0x54520004
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4
// Option handlers.
struct nni_tran_option {
@@ -131,13 +132,6 @@ struct nni_tran_pipe_ops {
// make further calls on the same pipe.
void (*p_fini)(void *);
- // p_start starts the pipe running. This gives the transport a
- // chance to hook into any transport specific negotiation
- // phase. The pipe will not have its p_send or p_recv calls
- // started, and will not be access by the "socket" until the
- // pipe has indicated its readiness by finishing the aio.
- void (*p_start)(void *, nni_aio *);
-
// p_stop stops the pipe, waiting for any callbacks that are
// outstanding to complete. This is done before tearing down
// resources with p_fini.