From 7f95fde8d752dd93c20ff0a209334f4aec549111 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 15 Jul 2017 15:45:48 -0700 Subject: Some initial progress on *connect* async. This actually is breaking at the moment, because we don't have good integration with timeouts, and there are some frustrating races with timeouts at points that can cause apparent hangs. --- src/core/endpt.c | 207 +++++++++++++++++++++++++++++++++++++----------------- src/core/endpt.h | 3 +- src/core/pipe.c | 9 +-- src/core/socket.c | 25 ++++++- src/core/socket.h | 4 ++ 5 files changed, 172 insertions(+), 76 deletions(-) (limited to 'src') diff --git a/src/core/endpt.c b/src/core/endpt.c index fef6ac83..a99cd21a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -18,6 +18,8 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); +static void nni_ep_connect_start(nni_ep *); +static void nni_ep_connect_done(void *); static nni_idhash *nni_eps; @@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep) return; } nni_aio_fini(&ep->ep_acc_aio); + nni_aio_fini(&ep->ep_con_aio); + nni_aio_fini(&ep->ep_con_syn); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } - nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_pipe = NULL; ep->ep_id = id; ep->ep_data = NULL; + ep->ep_refcnt = 0; NNI_LIST_NODE_INIT(&ep->ep_node); @@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) nni_ep_destroy(ep); return (rv); } + rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } ep->ep_sock = sock; ep->ep_tran = tran; @@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) void nni_ep_close(nni_ep *ep) { - // Abort any in-flight operations. - nni_aio_stop(&ep->ep_acc_aio); - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); + if (ep->ep_closed) { + nni_mtx_unlock(&ep->ep_mtx); + return; } - nni_cv_wake(&ep->ep_cv); + ep->ep_closed = 1; nni_mtx_unlock(&ep->ep_mtx); + + ep->ep_ops.ep_close(ep->ep_data); } static void @@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep) nni_ep_close(ep); // Extra sanity. + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); + // Make sure any other unlocked users (references) are gone + // before we actually remove the memory. We should not have + // to wait long as we have closed the underlying pipe and + // done everything we can to wake any waiter (synchronous connect) + // gracefully. + nni_mtx_lock(&ep->ep_mtx); + while (ep->ep_refcnt != 0) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); } @@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_aio(nni_ep *ep, void **pipep) +static void +nni_ep_connect_done(void *arg) { - nni_aio aio; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_con_aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_connect(ep->ep_data, &aio); - nni_aio_wait(&aio); + int rv; + + nni_mtx_lock(&ep->ep_mtx); + if ((rv = nni_aio_result(aio)) == 0) { + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } } - nni_aio_fini(&aio); - return (rv); + +done: + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // cooldown = nni_clock() + rtime; + // rtime *= 2; + // if ((maxrtime >= defrtime) && (rtime > maxrtime)) { + // rtime = maxrtime; + // } + + // XXX: We need to inject a cooldown, and then try again. For + // now we just try again immediately. This is very suboptimal. + nni_ep_connect_start(ep); + break; + } + nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_sync(nni_ep *ep) +static void +nni_ep_connect_start(nni_ep *ep) { - nni_pipe *pipe; - int rv; + nni_aio *aio = &ep->ep_acc_aio; - nni_mtx_lock(&ep->ep_mtx); + // Call with the Endpoint lock held. if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); + return; } - pipe->p_ep = ep; - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); - if (rv != 0) { - if (rv != NNG_ECLOSED) { // HACK ALERT - nni_pipe_stop(pipe); - } - return (rv); - } - nni_pipe_start(pipe); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_pipe = pipe; - nni_mtx_unlock(&ep->ep_mtx); - return (0); + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); } // nni_dialer is the thread worker that dials in the background. +#if 0 // this has the logic for timing out... remove when done. static void nni_dialer(void *arg) { @@ -287,13 +330,20 @@ nni_dialer(void *arg) nni_mtx_unlock(&ep->ep_mtx); } } +#endif int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; + int rv = 0; + nni_aio * aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; nni_mtx_lock(&ep->ep_mtx); + ops = ep->ep_tran->tran_pipe; + if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); @@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + if ((flags & NNG_FLAG_SYNCH) == 0) { + nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_sync(ep); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); + return (0); } - nni_thr_run(&ep->ep_thr); + // This one is kind of special, since we need + // to block for the connection to complete. Ick. + aio = &ep->ep_con_syn; + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); + + // We're about to drop the lock, but we cannot allow the + // endpoint to be removed. Put a hold on it. + ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); + nni_aio_wait(aio); + + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt--; + + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_result(aio); + } + + if (rv != 0) { + ep->ep_started = 0; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); + } + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } else { + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + } + + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } -static void nni_ep_accept_start(nni_ep *ep); - static void nni_ep_accept_done(void *arg) { @@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg) if ((rv = nni_aio_result(aio)) != 0) { goto done; } - NNI_ASSERT((tpipe = aio->a_pipe) != NULL); + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { diff --git a/src/core/endpt.h b/src/core/endpt.h index a47586a0..3350bc59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -26,18 +26,19 @@ struct nni_ep { nni_list_node ep_node; // per socket list nni_sock * ep_sock; char ep_addr[NNG_MAXADDRLEN]; - nni_thr ep_thr; int ep_mode; int ep_started; int ep_stop; int ep_closed; // full shutdown int ep_bound; // true if we bound locally + int ep_refcnt; nni_mtx ep_mtx; nni_cv ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) nni_list ep_pipes; nni_aio ep_acc_aio; nni_aio ep_con_aio; + nni_aio ep_con_syn; // used for sync connect nni_taskq_ent ep_reap_tqe; }; diff --git a/src/core/pipe.c b/src/core/pipe.c index 8f2099a9..f1e8014e 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -91,8 +91,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - nni_sock *sock = p->p_sock; - nni_mtx_lock(&p->p_mtx); if (p->p_reap == 1) { // We already did a close. @@ -200,18 +198,13 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran) nni_pipe_destroy(p); return (rv); } - p->p_sock = sock; // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *tran->tran_pipe; - // Save the protocol destructor. - p->p_proto_dtor = sock->s_pipe_ops.pipe_fini; - // Initialize protocol pipe data. - rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data); - if (rv != 0) { + if ((rv = nni_sock_pipe_init(sock, p)) != 0) { nni_pipe_destroy(p); return (rv); } diff --git a/src/core/socket.c b/src/core/socket.c index 3a4f36e5..75ae8450 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -76,6 +77,27 @@ nni_sock_rele(nni_sock *sock) nni_objhash_unref(nni_socks, sock->s_id); } +int +nni_sock_pipe_init(nni_sock *sock, nni_pipe *pipe) +{ + int rv; + + // Initialize protocol pipe data. + nni_mtx_lock(&sock->s_mx); + rv = sock->s_pipe_ops.pipe_init( + &pipe->p_proto_data, pipe, sock->s_data); + if (rv != 0) { + nni_mtx_lock(&sock->s_mx); + return (rv); + } + // Save the protocol destructor. + pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; + pipe->p_sock = sock; + nni_list_append(&sock->s_pipes, pipe); + nni_mtx_unlock(&sock->s_mx); + return (0); +} + int nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) { @@ -98,9 +120,6 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) return (rv); } - // We have claimed ownership of the pipe, so add it to the list. - // Up until this point, the caller could destroy the pipe. - nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); diff --git a/src/core/socket.h b/src/core/socket.h index 2dc06009..56cf60c0 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -87,6 +87,10 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); +// nni_sock_pipe_init adds the pipe to the socket. It is called by +// the generic pipe creation code. +extern int nni_sock_pipe_init(nni_sock *, nni_pipe *); + extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for -- cgit v1.2.3-70-g09d2