diff options
| -rw-r--r-- | src/core/dialer.c | 12 | ||||
| -rw-r--r-- | src/core/listener.c | 14 | ||||
| -rw-r--r-- | src/core/pipe.c | 69 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 60 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 5 | ||||
| -rw-r--r-- | src/core/transport.h | 10 |
7 files changed, 58 insertions, 117 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 34e90891..4b2f105d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -232,24 +232,18 @@ dialer_timer_cb(void *arg) static void dialer_connect_cb(void *arg) { - nni_dialer *d = arg; - nni_pipe * p; + nni_dialer *d = arg; nni_aio * aio = d->d_con_aio; int rv; bool synch; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); - } nni_mtx_lock(&d->d_mtx); synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - switch (rv) { + switch ((rv = nni_aio_result(aio))) { case 0: - nni_dialer_add_pipe(d, p); + nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. diff --git a/src/core/listener.c b/src/core/listener.c index debfa5f1..c2e68bb5 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -228,23 +228,17 @@ listener_timer_cb(void *arg) static void listener_accept_cb(void *arg) { - nni_listener *l = arg; - nni_pipe * p; + nni_listener *l = arg; nni_aio * aio = l->l_acc_aio; - int rv; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); - } - switch (rv) { + switch (nni_aio_result(aio)) { case 0: - nni_listener_add_pipe(l, p); + nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown case NNG_ECONNRESET: // remote condition, no cooldown + case NNG_EPEERAUTH: // peer validation failure listener_accept_start(l); break; case NNG_ECLOSED: // no further action diff --git a/src/core/pipe.c b/src/core/pipe.c index 4f50ac7c..66cd87cc 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); - // Wait for neg callbacks to finish. (Already closed). - nni_aio_stop(p->p_start_aio); - if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } @@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } - nni_aio_fini(p->p_start_aio); nni_cv_fini(&p->p_cv); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); @@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - // abort any pending negotiation/start process. - nni_aio_close(p->p_start_aio); - nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p) nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); } -bool -nni_pipe_closed(nni_pipe *p) -{ - bool rv; - nni_mtx_lock(&p->p_mtx); - rv = p->p_closed; - nni_mtx_unlock(&p->p_mtx); - return (rv); -} - uint16_t nni_pipe_peer(nni_pipe *p) { return (p->p_tran_ops.p_peer(p->p_tran_data)); } -static void -nni_pipe_start_cb(void *arg) -{ - nni_pipe *p = arg; - nni_sock *s = p->p_sock; - nni_aio * aio = p->p_start_aio; - - if (nni_aio_result(aio) != 0) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - if (nni_pipe_closed(p)) { - nni_pipe_close(p); - return; - } - - if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || - nni_sock_closing(s)) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); -} - int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } // Make a private copy of the transport ops. - p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -253,18 +209,17 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); - if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { - uint64_t id; - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; - } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = id; + p->p_refcnt = 1; } + nni_mtx_unlock(&nni_pipe_lk); if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_close(p); + nni_pipe_rele(p); return (rv); } @@ -296,16 +251,6 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } -void -nni_pipe_start(nni_pipe *p) -{ - if (p->p_tran_ops.p_start == NULL) { - nni_aio_finish(p->p_start_aio, 0, 0); - } else { - p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); - } -} - void * nni_pipe_get_proto_data(nni_pipe *p) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 1d73ce51..1e2f2b5d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -59,11 +59,6 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *); // nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). extern uint32_t nni_pipe_dialer_id(nni_pipe *); -// nni_pipe_closed returns true if nni_pipe_close was called. -// (This is used by the socket to determine if user closed the pipe -// during callback.) -extern bool nni_pipe_closed(nni_pipe *); - // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index f4e59af5..1fcf5e0b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } -bool -nni_sock_closing(nni_sock *s) -{ - bool rv; - nni_mtx_lock(&s->s_mx); - rv = s->s_closing; - nni_mtx_unlock(&s->s_mx); - return (rv); -} - static void sock_destroy(nni_sock *s) { @@ -1356,7 +1346,11 @@ static void dialer_timer_start_locked(nni_dialer *d) { nni_duration backoff; + nni_sock * sock = d->d_sock; + if (d->d_closing || sock->s_closed) { + return; + } backoff = d->d_currtime; d->d_currtime *= 2; if (d->d_currtime > d->d_maxrtime) { @@ -1383,15 +1377,16 @@ nni_dialer_timer_start(nni_dialer *d) } void -nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) { nni_sock *s = d->d_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { + if (s->s_closed || d->d_closing || + (nni_pipe_create(&p, s, d->d_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } @@ -1402,8 +1397,20 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void @@ -1473,14 +1480,15 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { + if (s->s_closed || l->l_closing || + (nni_pipe_create(&p, s, l->l_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } p->p_listener = l; @@ -1488,8 +1496,20 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p) nni_list_append(&s->s_pipes, p); nni_mtx_unlock(&s->s_mx); - // Start the initial negotiation I/O... - nni_pipe_start(p); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + nni_mtx_lock(&s->s_mx); + if ((p->p_closed) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); + nni_pipe_rele(p); + return; + } + nni_mtx_unlock(&s->s_mx); + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); + nni_pipe_rele(p); } static void diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 569e5cae..29e83f7a 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -78,7 +78,6 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_reap_item p_reap; - nni_aio * p_start_aio; }; extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); @@ -87,14 +86,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_sock_remove_listener(nni_sock *, nni_listener *); -extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); +extern void nni_dialer_add_pipe(nni_dialer *, void *); extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_close_rele(nni_dialer *); -extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); diff --git a/src/core/transport.h b/src/core/transport.h index 257d232d..8b08e366 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -30,7 +30,8 @@ enum nni_ep_mode { #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 #define NNI_TRANSPORT_V3 0x54520003 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 +#define NNI_TRANSPORT_V4 0x54520004 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4 // Option handlers. struct nni_tran_option { @@ -131,13 +132,6 @@ struct nni_tran_pipe_ops { // make further calls on the same pipe. void (*p_fini)(void *); - // p_start starts the pipe running. This gives the transport a - // chance to hook into any transport specific negotiation - // phase. The pipe will not have its p_send or p_recv calls - // started, and will not be access by the "socket" until the - // pipe has indicated its readiness by finishing the aio. - void (*p_start)(void *, nni_aio *); - // p_stop stops the pipe, waiting for any callbacks that are // outstanding to complete. This is done before tearing down // resources with p_fini. |
