diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-31 12:33:58 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-08-05 18:45:04 +0300 |
| commit | d7f7c896c0ede24249ef63b1e45b1878bf4bd473 (patch) | |
| tree | 32eece7d91a648f24cb174096fb9667cab978f37 /src/core | |
| parent | ccc24a8e508131a2226474642a038baaa2cbcc8c (diff) | |
| download | nng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.tar.gz nng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.tar.bz2 nng-d7f7c896c0ede24249ef63b1e45b1878bf4bd473.zip | |
fixes #599 nng_dial sync should not return until added to socket
fixes #208 pipe start should occur before connect / accept
fixes #616 Race condition closing between header & body
This refactors the transports to handle their own connection
handshaking before passing the pipe to the socket. This
changes and simplifies the setup. This also fixes a rather
challenging race condition described by #616.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 10 | ||||
| -rw-r--r-- | src/core/dialer.c | 12 | ||||
| -rw-r--r-- | src/core/listener.c | 19 | ||||
| -rw-r--r-- | src/core/pipe.c | 67 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 52 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 5 | ||||
| -rw-r--r-- | src/core/transport.h | 12 |
9 files changed, 63 insertions, 121 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 40638bce..294a0b92 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -217,11 +217,17 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { + nni_aio_cancelfn cancelfn; + nni_mtx_lock(&nni_aio_lk); - aio->a_stop = true; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - nni_aio_abort(aio, NNG_ECANCELED); + if (cancelfn != NULL) { + cancelfn(aio, NNG_ECANCELED); + } nni_aio_wait(aio); } diff --git a/src/core/dialer.c b/src/core/dialer.c index 34e90891..4b2f105d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -232,24 +232,18 @@ dialer_timer_cb(void *arg) static void dialer_connect_cb(void *arg) { - nni_dialer *d = arg; - nni_pipe * p; + nni_dialer *d = arg; nni_aio * aio = d->d_con_aio; int rv; bool synch; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); - } nni_mtx_lock(&d->d_mtx); synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - switch (rv) { + switch ((rv = nni_aio_result(aio))) { case 0: - nni_dialer_add_pipe(d, p); + nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0)); break; case NNG_ECLOSED: // No further action. case NNG_ECANCELED: // No further action. diff --git a/src/core/listener.c b/src/core/listener.c index debfa5f1..17062d6e 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -181,6 +181,8 @@ nni_listener_close(nni_listener *l) } l->l_closed = true; nni_mtx_unlock(&listeners_lk); + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); // Remove us from the table so we cannot be found. // This is done fairly early in the teardown process. @@ -228,28 +230,25 @@ listener_timer_cb(void *arg) static void listener_accept_cb(void *arg) { - nni_listener *l = arg; - nni_pipe * p; + nni_listener *l = arg; nni_aio * aio = l->l_acc_aio; - int rv; - if ((rv = nni_aio_result(aio)) == 0) { - void *data = nni_aio_get_output(aio, 0); - NNI_ASSERT(data != NULL); - rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); - } - switch (rv) { + switch (nni_aio_result(aio)) { case 0: - nni_listener_add_pipe(l, p); + nni_listener_add_pipe(l, nni_aio_get_output(aio, 0)); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown case NNG_ECONNRESET: // remote condition, no cooldown + case NNG_EPEERAUTH: // peer validation failure listener_accept_start(l); break; case NNG_ECLOSED: // no further action case NNG_ECANCELED: // no further action break; + case NNG_ENOMEM: + case NNG_ENOFILES: + case NNG_ENOSPC: default: // We don't really know why we failed, but we backoff // here. This is because errors here are probably due diff --git a/src/core/pipe.c b/src/core/pipe.c index 4f50ac7c..374c45c8 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -75,9 +75,6 @@ pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); - // Wait for neg callbacks to finish. (Already closed). - nni_aio_stop(p->p_start_aio); - if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_stop(p->p_proto_data); } @@ -93,7 +90,6 @@ pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } - nni_aio_fini(p->p_start_aio); nni_cv_fini(&p->p_cv); nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); @@ -154,9 +150,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - // abort any pending negotiation/start process. - nni_aio_close(p->p_start_aio); - nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -178,49 +171,12 @@ nni_pipe_close(nni_pipe *p) nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p); } -bool -nni_pipe_closed(nni_pipe *p) -{ - bool rv; - nni_mtx_lock(&p->p_mtx); - rv = p->p_closed; - nni_mtx_unlock(&p->p_mtx); - return (rv); -} - uint16_t nni_pipe_peer(nni_pipe *p) { return (p->p_tran_ops.p_peer(p->p_tran_data)); } -static void -nni_pipe_start_cb(void *arg) -{ - nni_pipe *p = arg; - nni_sock *s = p->p_sock; - nni_aio * aio = p->p_start_aio; - - if (nni_aio_result(aio) != 0) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); - if (nni_pipe_closed(p)) { - nni_pipe_close(p); - return; - } - - if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || - nni_sock_closing(s)) { - nni_pipe_close(p); - return; - } - - nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); -} - int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { @@ -228,6 +184,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) int rv; void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + uint64_t id; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -236,7 +193,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } // Make a private copy of the transport ops. - p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -253,14 +209,11 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); - if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { - uint64_t id; - nni_mtx_lock(&nni_pipe_lk); - if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { - p->p_id = (uint32_t) id; - } - nni_mtx_unlock(&nni_pipe_lk); + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = (uint32_t) id; } + nni_mtx_unlock(&nni_pipe_lk); if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { @@ -296,16 +249,6 @@ nni_pipe_getopt( return (NNG_ENOTSUP); } -void -nni_pipe_start(nni_pipe *p) -{ - if (p->p_tran_ops.p_start == NULL) { - nni_aio_finish(p->p_start_aio, 0, 0); - } else { - p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); - } -} - void * nni_pipe_get_proto_data(nni_pipe *p) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 1d73ce51..1e2f2b5d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -59,11 +59,6 @@ extern uint32_t nni_pipe_listener_id(nni_pipe *); // nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). extern uint32_t nni_pipe_dialer_id(nni_pipe *); -// nni_pipe_closed returns true if nni_pipe_close was called. -// (This is used by the socket to determine if user closed the pipe -// during callback.) -extern bool nni_pipe_closed(nni_pipe *); - // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index f4e59af5..0fa776f1 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -399,16 +399,6 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&sock_lk); } -bool -nni_sock_closing(nni_sock *s) -{ - bool rv; - nni_mtx_lock(&s->s_mx); - rv = s->s_closing; - nni_mtx_unlock(&s->s_mx); - return (rv); -} - static void sock_destroy(nni_sock *s) { @@ -1382,16 +1372,36 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } +static void +pipe_start(nni_pipe *p) +{ + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); + + // As the callback above that would close the pipe runs on + // this thread, we can skip the lock. + if (p->p_closed) { + return; + } + + if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) { + nni_pipe_close(p); + return; + } + + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); +} + void -nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) { nni_sock *s = d->d_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { + if (s->s_closed || d->d_closing || + (nni_pipe_create(&p, d->d_sock, d->d_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } @@ -1403,7 +1413,7 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - nni_pipe_start(p); + pipe_start(p); } static void @@ -1473,14 +1483,15 @@ nni_dialer_reap(nni_dialer *d) } void -nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +nni_listener_add_pipe(nni_listener *l, void *tpipe) { nni_sock *s = l->l_sock; + nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { + if (s->s_closed || l->l_closing || + (nni_pipe_create(&p, l->l_sock, l->l_tran, tpipe) != 0)) { nni_mtx_unlock(&s->s_mx); - nni_pipe_close(p); return; } p->p_listener = l; @@ -1489,7 +1500,7 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p) nni_mtx_unlock(&s->s_mx); // Start the initial negotiation I/O... - nni_pipe_start(p); + pipe_start(p); } static void @@ -1600,7 +1611,10 @@ nni_pipe_remove(nni_pipe *p) p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; - dialer_timer_start_locked(d); // Kick the timer to redial. + if (!s->s_closing) { + dialer_timer_start_locked( + d); // Kick the timer to redial. + } } if (s->s_closing) { nni_cv_wake(&s->s_cv); diff --git a/src/core/socket.h b/src/core/socket.h index 4b9c4642..cebe87ca 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -62,8 +62,6 @@ extern uint32_t nni_sock_flags(nni_sock *); // should be executed. extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); -extern bool nni_sock_closing(nni_sock *sock); - // nni_ctx_open is used to open/create a new context structure. // Contexts are not supported by most protocols, but for those that do, // this can offer some improvements for massive concurrency/scalability. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 569e5cae..29e83f7a 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -78,7 +78,6 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_reap_item p_reap; - nni_aio * p_start_aio; }; extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); @@ -87,14 +86,14 @@ extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_sock_remove_listener(nni_sock *, nni_listener *); -extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); +extern void nni_dialer_add_pipe(nni_dialer *, void *); extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_close_rele(nni_dialer *); -extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); diff --git a/src/core/transport.h b/src/core/transport.h index 257d232d..458bfda4 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -30,7 +30,8 @@ enum nni_ep_mode { #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 #define NNI_TRANSPORT_V3 0x54520003 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 +#define NNI_TRANSPORT_V4 0x54520004 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4 // Option handlers. struct nni_tran_option { @@ -131,19 +132,12 @@ struct nni_tran_pipe_ops { // make further calls on the same pipe. void (*p_fini)(void *); - // p_start starts the pipe running. This gives the transport a - // chance to hook into any transport specific negotiation - // phase. The pipe will not have its p_send or p_recv calls - // started, and will not be access by the "socket" until the - // pipe has indicated its readiness by finishing the aio. - void (*p_start)(void *, nni_aio *); - // p_stop stops the pipe, waiting for any callbacks that are // outstanding to complete. This is done before tearing down // resources with p_fini. void (*p_stop)(void *); - // p_aio_send queues the message for transmit. If this fails, + // p_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it |
