diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 10 | ||||
| -rw-r--r-- | src/core/dialer.c | 12 | ||||
| -rw-r--r-- | src/core/listener.c | 19 | ||||
| -rw-r--r-- | src/core/pipe.c | 67 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 52 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 5 | ||||
| -rw-r--r-- | src/core/transport.h | 12 |
9 files changed, 121 insertions, 63 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 294a0b92..40638bce 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -217,17 +217,11 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; - nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); - } + nni_aio_abort(aio, NNG_ECANCELED); nni_aio_wait(aio); } diff --git a/src/core/dialer.c b/src/core/dialer.c index 4b2f105d..34e90891 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -232,18 +232,24 @@ dialer_timer_cb(void *arg) static void dialer_connect_cb(void *arg) { - nni_dialer *d = arg; + nni_dialer *d = arg; + nni_pipe * p; nni_aio * aio = d->d_con_aio; int rv; bool synch; + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); + } nni_mtx_lock(&d->d_mtx); synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - switch ((rv = nni_aio_result(aio))) { + switch (rv) { case 0: - nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); + nni_dialer_add_pipe(d, p); break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. diff --git a/src/core/listener.c b/src/core/listener.c index 17062d6e..debfa5f1 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -181,8 +181,6 @@ nni_listener_close(nni_listener *l) } l->l_closed = true; nni_mtx_unlock(&listeners_lk); - nni_aio_close(l->l_acc_aio); - nni_aio_close(l->l_tmo_aio); // Remove us from the table so we cannot be found. // This is done fairly early in the teardown process. @@ -230,25 +228,28 @@ listener_timer_cb(void *arg) static void listener_accept_cb(void *arg) { - nni_listener *l = arg; + nni_listener *l = arg; + nni_pipe * p; nni_aio * aio = l->l_acc_aio; + int rv; - switch (nni_aio_result(aio)) { + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); + } + switch (rv) { case 0: - nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); + nni_listener_add_pipe(l, p); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown case NNG_ECONNRESET: // remote condition, no cooldown - case NNG_EPEERAUTH: // peer validation failure listener_accept_start(l); break; case NNG_ECLOSED: // no further action case NNG_ECANCELED: // no further action break; - case NNG_ENOMEM: - case NNG_ENOFILES: - case NNG_ENOSPC: default: // We don't really know why we failed, but we backoff // here. This is because errors here are probably due diff --git a/src/core/pipe.c b/src/core/pipe.c index 374c45c8..4f50ac7c 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -75,6 +75,9 @@ pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } @@ -90,6 +93,7 @@ pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } + nni_aio_fini(p->p_start_aio); nni_cv_fini(&p->p_cv); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); @@ -150,6 +154,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { + // abort any pending negotiation/start process. + nni_aio_close(p->p_start_aio); + nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -171,12 +178,49 @@ nni_pipe_close(nni_pipe *p) nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); } +bool +nni_pipe_closed(nni_pipe *p) +{ + bool rv; + nni_mtx_lock(&p->p_mtx); + rv = p->p_closed; + nni_mtx_unlock(&p->p_mtx); + return (rv); +} + uint16_t nni_pipe_peer(nni_pipe *p) { return (p->p_tran_ops.p_peer(p->p_tran_data)); } +static void +nni_pipe_start_cb(void *arg) +{ + nni_pipe *p = arg; + nni_sock *s = p->p_sock; + nni_aio * aio = p->p_start_aio; + + if (nni_aio_result(aio) != 0) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + if (nni_pipe_closed(p)) { + nni_pipe_close(p); + return; + } + + if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || + nni_sock_closing(s)) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); +} + int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -184,7 +228,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -193,6 +236,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } // Make a private copy of the transport ops. + p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -209,11 +253,14 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { + uint64_t id; + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = (uint32_t) id; + } + nni_mtx_unlock(&nni_pipe_lk); } - nni_mtx_unlock(&nni_pipe_lk); if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { @@ -249,6 +296,16 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_tran_ops.p_start == NULL) { + nni_aio_finish(p->p_start_aio, 0, 0); + } else { + p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); + } +} + void * nni_pipe_get_proto_data(nni_pipe *p) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 1e2f2b5d..1d73ce51 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -59,6 +59,11 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *); // nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). extern uint32_t nni_pipe_dialer_id(nni_pipe *); +// nni_pipe_closed returns true if nni_pipe_close was called. +// (This is used by the socket to determine if user closed the pipe +// during callback.) +extern bool nni_pipe_closed(nni_pipe *); + // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index 0fa776f1..f4e59af5 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,6 +399,16 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } +bool +nni_sock_closing(nni_sock *s) +{ + bool rv; + nni_mtx_lock(&s->s_mx); + rv = s->s_closing; + nni_mtx_unlock(&s->s_mx); + return (rv); +} + static void sock_destroy(nni_sock *s) { @@ -1372,36 +1382,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -static void -pipe_start(nni_pipe *p) -{ - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - - // As the callback above that would close the pipe runs on - // this thread, we can skip the lock. - if (p->p_closed) { - return; - } - - if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); -} - void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing || - (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) { + if (s->s_closed || d->d_closing) { nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); return; } @@ -1413,7 +1403,7 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - pipe_start(p); + nni_pipe_start(p); } static void @@ -1483,15 +1473,14 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +nni_listener_add_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing || - (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) { + if (s->s_closed || l->l_closing) { nni_mtx_unlock(&s->s_mx); + nni_pipe_close(p); return; } p->p_listener = l; @@ -1500,7 +1489,7 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - pipe_start(p); + nni_pipe_start(p); } static void @@ -1611,10 +1600,7 @@ nni_pipe_remove(nni_pipe *p) p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; - if (!s->s_closing) { - dialer_timer_start_locked( - d); // Kick the timer to redial. - } + dialer_timer_start_locked(d); // Kick the timer to redial. } if (s->s_closing) { nni_cv_wake(&s->s_cv); diff --git a/src/core/socket.h b/src/core/socket.h index cebe87ca..4b9c4642 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -62,6 +62,8 @@ extern uint32_t nni_sock_flags(nni_sock *); // should be executed. extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); +extern bool nni_sock_closing(nni_sock *sock); + // nni_ctx_open is used to open/create a new context structure. // Contexts are not supported by most protocols, but for those that do, // this can offer some improvements for massive concurrency/scalability. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 29e83f7a..569e5cae 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -78,6 +78,7 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_reap_item p_reap; + nni_aio * p_start_aio; }; extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); @@ -86,14 +87,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_sock_remove_listener(nni_sock *, nni_listener *); -extern void nni_dialer_add_pipe(nni_dialer *, void *); +extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_close_rele(nni_dialer *); -extern void nni_listener_add_pipe(nni_listener *, void *); +extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); diff --git a/src/core/transport.h b/src/core/transport.h index 458bfda4..257d232d 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -30,8 +30,7 @@ enum nni_ep_mode { #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 #define NNI_TRANSPORT_V3 0x54520003 -#define NNI_TRANSPORT_V4 0x54520004 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 // Option handlers. struct nni_tran_option { @@ -132,12 +131,19 @@ struct nni_tran_pipe_ops { // make further calls on the same pipe. void (*p_fini)(void *); + // p_start starts the pipe running. This gives the transport a + // chance to hook into any transport specific negotiation + // phase. The pipe will not have its p_send or p_recv calls + // started, and will not be access by the "socket" until the + // pipe has indicated its readiness by finishing the aio. + void (*p_start)(void *, nni_aio *); + // p_stop stops the pipe, waiting for any callbacks that are // outstanding to complete. This is done before tearing down // resources with p_fini. void (*p_stop)(void *); - // p_send queues the message for transmit. If this fails, + // p_aio_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it |
