diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-15 15:45:48 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-15 15:45:48 -0700 |
| commit | 7f95fde8d752dd93c20ff0a209334f4aec549111 (patch) | |
| tree | f6226f1e9741ae855a96d215600dacb006927434 /src | |
| parent | 5fe345c66139fc3242c4fdbd78bf05e5670581e8 (diff) | |
| download | nng-7f95fde8d752dd93c20ff0a209334f4aec549111.tar.gz nng-7f95fde8d752dd93c20ff0a209334f4aec549111.tar.bz2 nng-7f95fde8d752dd93c20ff0a209334f4aec549111.zip | |
Some initial progress on *connect* async.
This actually is breaking at the moment, because we don't have
good integration with timeouts, and there are some frustrating
races with timeouts at points that can cause apparent hangs.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 207 | ||||
| -rw-r--r-- | src/core/endpt.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 9 | ||||
| -rw-r--r-- | src/core/socket.c | 25 | ||||
| -rw-r--r-- | src/core/socket.h | 4 |
5 files changed, 172 insertions, 76 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index fef6ac83..a99cd21a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -18,6 +18,8 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); +static void nni_ep_connect_start(nni_ep *); +static void nni_ep_connect_done(void *); static nni_idhash *nni_eps; @@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep) return; } nni_aio_fini(&ep->ep_acc_aio); + nni_aio_fini(&ep->ep_con_aio); + nni_aio_fini(&ep->ep_con_syn); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } - nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_pipe = NULL; ep->ep_id = id; ep->ep_data = NULL; + ep->ep_refcnt = 0; NNI_LIST_NODE_INIT(&ep->ep_node); @@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) nni_ep_destroy(ep); return (rv); } + rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } ep->ep_sock = sock; ep->ep_tran = tran; @@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) void nni_ep_close(nni_ep *ep) { - // Abort any in-flight operations. - nni_aio_stop(&ep->ep_acc_aio); - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); + if (ep->ep_closed) { + nni_mtx_unlock(&ep->ep_mtx); + return; } - nni_cv_wake(&ep->ep_cv); + ep->ep_closed = 1; nni_mtx_unlock(&ep->ep_mtx); + + ep->ep_ops.ep_close(ep->ep_data); } static void @@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep) nni_ep_close(ep); // Extra sanity. + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); + // Make sure any other unlocked users (references) are gone + // before we actually remove the memory. We should not have + // to wait long as we have closed the underlying pipe and + // done everything we can to wake any waiter (synchronous connect) + // gracefully. + nni_mtx_lock(&ep->ep_mtx); + while (ep->ep_refcnt != 0) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); } @@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_aio(nni_ep *ep, void **pipep) +static void +nni_ep_connect_done(void *arg) { - nni_aio aio; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_con_aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_connect(ep->ep_data, &aio); - nni_aio_wait(&aio); + int rv; + + nni_mtx_lock(&ep->ep_mtx); + if ((rv = nni_aio_result(aio)) == 0) { + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } } - nni_aio_fini(&aio); - return (rv); + +done: + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // cooldown = nni_clock() + rtime; + // rtime *= 2; + // if ((maxrtime >= defrtime) && (rtime > maxrtime)) { + // rtime = maxrtime; + // } + + // XXX: We need to inject a cooldown, and then try again. For + // now we just try again immediately. This is very suboptimal. + nni_ep_connect_start(ep); + break; + } + nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_sync(nni_ep *ep) +static void +nni_ep_connect_start(nni_ep *ep) { - nni_pipe *pipe; - int rv; + nni_aio *aio = &ep->ep_acc_aio; - nni_mtx_lock(&ep->ep_mtx); + // Call with the Endpoint lock held. if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); + return; } - pipe->p_ep = ep; - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); - if (rv != 0) { - if (rv != NNG_ECLOSED) { // HACK ALERT - nni_pipe_stop(pipe); - } - return (rv); - } - nni_pipe_start(pipe); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_pipe = pipe; - nni_mtx_unlock(&ep->ep_mtx); - return (0); + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); } // nni_dialer is the thread worker that dials in the background. +#if 0 // this has the logic for timing out... remove when done. static void nni_dialer(void *arg) { @@ -287,13 +330,20 @@ nni_dialer(void *arg) nni_mtx_unlock(&ep->ep_mtx); } } +#endif int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; + int rv = 0; + nni_aio * aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; nni_mtx_lock(&ep->ep_mtx); + ops = ep->ep_tran->tran_pipe; + if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); @@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + if ((flags & NNG_FLAG_SYNCH) == 0) { + nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_sync(ep); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); + return (0); } - nni_thr_run(&ep->ep_thr); + // This one is kind of special, since we need + // to block for the connection to complete. Ick. + aio = &ep->ep_con_syn; + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); + + // We're about to drop the lock, but we cannot allow the + // endpoint to be removed. Put a hold on it. + ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); + nni_aio_wait(aio); + + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt--; + + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_result(aio); + } + + if (rv != 0) { + ep->ep_started = 0; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); + } + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } else { + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + } + + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } -static void nni_ep_accept_start(nni_ep *ep); - static void nni_ep_accept_done(void *arg) { @@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg) if ((rv = nni_aio_result(aio)) != 0) { goto done; } - NNI_ASSERT((tpipe = aio->a_pipe) != NULL); + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { diff --git a/src/core/endpt.h b/src/core/endpt.h index a47586a0..3350bc59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -26,18 +26,19 @@ struct nni_ep { nni_list_node ep_node; // per socket list nni_sock * ep_sock; char ep_addr[NNG_MAXADDRLEN]; - nni_thr ep_thr; int ep_mode; int ep_started; int ep_stop; int ep_closed; // full shutdown int ep_bound; // true if we bound locally + int ep_refcnt; nni_mtx ep_mtx; nni_cv ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) nni_list ep_pipes; nni_aio ep_acc_aio; nni_aio ep_con_aio; + nni_aio ep_con_syn; // used for sync connect nni_taskq_ent ep_reap_tqe; }; diff --git a/src/core/pipe.c b/src/core/pipe.c index 8f2099a9..f1e8014e 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -91,8 +91,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { - nni_sock *sock = p->p_sock; - nni_mtx_lock(&p->p_mtx); if (p->p_reap == 1) { // We already did a close. @@ -200,18 +198,13 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran) nni_pipe_destroy(p); return (rv); } - p->p_sock = sock; // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *tran->tran_pipe; - // Save the protocol destructor. - p->p_proto_dtor = sock->s_pipe_ops.pipe_fini; - // Initialize protocol pipe data. - rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data); - if (rv != 0) { + if ((rv = nni_sock_pipe_init(sock, p)) != 0) { nni_pipe_destroy(p); return (rv); } diff --git a/src/core/socket.c b/src/core/socket.c index 3a4f36e5..75ae8450 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -77,6 +78,27 @@ nni_sock_rele(nni_sock *sock) } int +nni_sock_pipe_init(nni_sock *sock, nni_pipe *pipe) +{ + int rv; + + // Initialize protocol pipe data. + nni_mtx_lock(&sock->s_mx); + rv = sock->s_pipe_ops.pipe_init( + &pipe->p_proto_data, pipe, sock->s_data); + if (rv != 0) { + nni_mtx_lock(&sock->s_mx); + return (rv); + } + // Save the protocol destructor. + pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; + pipe->p_sock = sock; + nni_list_append(&sock->s_pipes, pipe); + nni_mtx_unlock(&sock->s_mx); + return (0); +} + +int nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) { int rv; @@ -98,9 +120,6 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) return (rv); } - // We have claimed ownership of the pipe, so add it to the list. - // Up until this point, the caller could destroy the pipe. - nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); diff --git a/src/core/socket.h b/src/core/socket.h index 2dc06009..56cf60c0 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -87,6 +87,10 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); +// nni_sock_pipe_init adds the pipe to the socket. It is called by +// the generic pipe creation code. +extern int nni_sock_pipe_init(nni_sock *, nni_pipe *); + extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for |
