aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c10
-rw-r--r--src/core/dialer.c12
-rw-r--r--src/core/listener.c19
-rw-r--r--src/core/pipe.c67
-rw-r--r--src/core/pipe.h5
-rw-r--r--src/core/socket.c52
-rw-r--r--src/core/socket.h2
-rw-r--r--src/core/sockimpl.h5
-rw-r--r--src/core/transport.h12
9 files changed, 121 insertions, 63 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 294a0b92..40638bce 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -217,17 +217,11 @@ void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
-
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECANCELED);
- }
+ nni_aio_abort(aio, NNG_ECANCELED);
nni_aio_wait(aio);
}
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 4b2f105d..34e90891 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -232,18 +232,24 @@ dialer_timer_cb(void *arg)
static void
dialer_connect_cb(void *arg)
{
- nni_dialer *d = arg;
+ nni_dialer *d = arg;
+ nni_pipe * p;
nni_aio * aio = d->d_con_aio;
int rv;
bool synch;
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
+ }
nni_mtx_lock(&d->d_mtx);
synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- switch ((rv = nni_aio_result(aio))) {
+ switch (rv) {
case 0:
- nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
+ nni_dialer_add_pipe(d, p);
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
diff --git a/src/core/listener.c b/src/core/listener.c
index 17062d6e..debfa5f1 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -181,8 +181,6 @@ nni_listener_close(nni_listener *l)
}
l->l_closed = true;
nni_mtx_unlock(&listeners_lk);
- nni_aio_close(l->l_acc_aio);
- nni_aio_close(l->l_tmo_aio);
// Remove us from the table so we cannot be found.
// This is done fairly early in the teardown process.
@@ -230,25 +228,28 @@ listener_timer_cb(void *arg)
static void
listener_accept_cb(void *arg)
{
- nni_listener *l = arg;
+ nni_listener *l = arg;
+ nni_pipe * p;
nni_aio * aio = l->l_acc_aio;
+ int rv;
- switch (nni_aio_result(aio)) {
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data);
+ }
+ switch (rv) {
case 0:
- nni_listener_add_pipe(l, nni_aio_get_output(aio, 0));
+ nni_listener_add_pipe(l, p);
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
- case NNG_EPEERAUTH: // peer validation failure
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
break;
- case NNG_ENOMEM:
- case NNG_ENOFILES:
- case NNG_ENOSPC:
default:
// We don't really know why we failed, but we backoff
// here. This is because errors here are probably due
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 374c45c8..4f50ac7c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -75,6 +75,9 @@ pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
+ // Wait for neg callbacks to finish. (Already closed).
+ nni_aio_stop(p->p_start_aio);
+
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
@@ -90,6 +93,7 @@ pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
+ nni_aio_fini(p->p_start_aio);
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -150,6 +154,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
+ // abort any pending negotiation/start process.
+ nni_aio_close(p->p_start_aio);
+
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -171,12 +178,49 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
+bool
+nni_pipe_closed(nni_pipe *p)
+{
+ bool rv;
+ nni_mtx_lock(&p->p_mtx);
+ rv = p->p_closed;
+ nni_mtx_unlock(&p->p_mtx);
+ return (rv);
+}
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+static void
+nni_pipe_start_cb(void *arg)
+{
+ nni_pipe *p = arg;
+ nni_sock *s = p->p_sock;
+ nni_aio * aio = p->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+ if (nni_pipe_closed(p)) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
+ nni_sock_closing(s)) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+}
+
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -184,7 +228,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -193,6 +236,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
// Make a private copy of the transport ops.
+ p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -209,11 +253,14 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
- nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
+ if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
+ uint64_t id;
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = (uint32_t) id;
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
}
- nni_mtx_unlock(&nni_pipe_lk);
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
@@ -249,6 +296,16 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
+void
+nni_pipe_start(nni_pipe *p)
+{
+ if (p->p_tran_ops.p_start == NULL) {
+ nni_aio_finish(p->p_start_aio, 0, 0);
+ } else {
+ p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
+ }
+}
+
void *
nni_pipe_get_proto_data(nni_pipe *p)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1e2f2b5d..1d73ce51 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -59,6 +59,11 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *);
// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none).
extern uint32_t nni_pipe_dialer_id(nni_pipe *);
+// nni_pipe_closed returns true if nni_pipe_close was called.
+// (This is used by the socket to determine if user closed the pipe
+// during callback.)
+extern bool nni_pipe_closed(nni_pipe *);
+
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 0fa776f1..f4e59af5 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -399,6 +399,16 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&sock_lk);
}
+bool
+nni_sock_closing(nni_sock *s)
+{
+ bool rv;
+ nni_mtx_lock(&s->s_mx);
+ rv = s->s_closing;
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+}
+
static void
sock_destroy(nni_sock *s)
{
@@ -1372,36 +1382,16 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-static void
-pipe_start(nni_pipe *p)
-{
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
-
- // As the callback above that would close the pipe runs on
- // this thread, we can skip the lock.
- if (p->p_closed) {
- return;
- }
-
- if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
- nni_pipe_close(p);
- return;
- }
-
- nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
-}
-
void
-nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
{
nni_sock *s = d->d_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing ||
- (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) {
+ if (s->s_closed || d->d_closing) {
nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
return;
}
@@ -1413,7 +1403,7 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- pipe_start(p);
+ nni_pipe_start(p);
}
static void
@@ -1483,15 +1473,14 @@ nni_dialer_reap(nni_dialer *d)
}
void
-nni_listener_add_pipe(nni_listener *l, void *tpipe)
+nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
{
nni_sock *s = l->l_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing ||
- (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) {
+ if (s->s_closed || l->l_closing) {
nni_mtx_unlock(&s->s_mx);
+ nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1500,7 +1489,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_unlock(&s->s_mx);
// Start the initial negotiation I/O...
- pipe_start(p);
+ nni_pipe_start(p);
}
static void
@@ -1611,10 +1600,7 @@ nni_pipe_remove(nni_pipe *p)
p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
- if (!s->s_closing) {
- dialer_timer_start_locked(
- d); // Kick the timer to redial.
- }
+ dialer_timer_start_locked(d); // Kick the timer to redial.
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);
diff --git a/src/core/socket.h b/src/core/socket.h
index cebe87ca..4b9c4642 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -62,6 +62,8 @@ extern uint32_t nni_sock_flags(nni_sock *);
// should be executed.
extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *);
+extern bool nni_sock_closing(nni_sock *sock);
+
// nni_ctx_open is used to open/create a new context structure.
// Contexts are not supported by most protocols, but for those that do,
// this can offer some improvements for massive concurrency/scalability.
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 29e83f7a..569e5cae 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -78,6 +78,7 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_reap_item p_reap;
+ nni_aio * p_start_aio;
};
extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
@@ -86,14 +87,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
-extern void nni_dialer_add_pipe(nni_dialer *, void *);
+extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *);
extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_close_rele(nni_dialer *);
-extern void nni_listener_add_pipe(nni_listener *, void *);
+extern void nni_listener_add_pipe(nni_listener *, nni_pipe *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
diff --git a/src/core/transport.h b/src/core/transport.h
index 458bfda4..257d232d 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -30,8 +30,7 @@ enum nni_ep_mode {
#define NNI_TRANSPORT_V1 0x54520001
#define NNI_TRANSPORT_V2 0x54520002
#define NNI_TRANSPORT_V3 0x54520003
-#define NNI_TRANSPORT_V4 0x54520004
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3
// Option handlers.
struct nni_tran_option {
@@ -132,12 +131,19 @@ struct nni_tran_pipe_ops {
// make further calls on the same pipe.
void (*p_fini)(void *);
+ // p_start starts the pipe running. This gives the transport a
+ // chance to hook into any transport specific negotiation
+ // phase. The pipe will not have its p_send or p_recv calls
+ // started, and will not be access by the "socket" until the
+ // pipe has indicated its readiness by finishing the aio.
+ void (*p_start)(void *, nni_aio *);
+
// p_stop stops the pipe, waiting for any callbacks that are
// outstanding to complete. This is done before tearing down
// resources with p_fini.
void (*p_stop)(void *);
- // p_send queues the message for transmit. If this fails,
+ // p_aio_send queues the message for transmit. If this fails,
// then the caller may try again with the same message (or free
// it). If the call succeeds, then the transport has taken
// ownership of the message, and the caller may not use it