diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-15 15:45:48 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-15 15:45:48 -0700 |
| commit | 7f95fde8d752dd93c20ff0a209334f4aec549111 (patch) | |
| tree | f6226f1e9741ae855a96d215600dacb006927434 /src/core/endpt.c | |
| parent | 5fe345c66139fc3242c4fdbd78bf05e5670581e8 (diff) | |
| download | nng-7f95fde8d752dd93c20ff0a209334f4aec549111.tar.gz nng-7f95fde8d752dd93c20ff0a209334f4aec549111.tar.bz2 nng-7f95fde8d752dd93c20ff0a209334f4aec549111.zip | |
Some initial progress on *connect* async.
This actually is breaking at the moment, because we don't have
good integration with timeouts, and there are some frustrating
races with timeouts at points that can cause apparent hangs.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 207 |
1 files changed, 143 insertions, 64 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index fef6ac83..a99cd21a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -18,6 +18,8 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); +static void nni_ep_connect_start(nni_ep *); +static void nni_ep_connect_done(void *); static nni_idhash *nni_eps; @@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep) return; } nni_aio_fini(&ep->ep_acc_aio); + nni_aio_fini(&ep->ep_con_aio); + nni_aio_fini(&ep->ep_con_syn); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } - nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_pipe = NULL; ep->ep_id = id; ep->ep_data = NULL; + ep->ep_refcnt = 0; NNI_LIST_NODE_INIT(&ep->ep_node); @@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) nni_ep_destroy(ep); return (rv); } + rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } ep->ep_sock = sock; ep->ep_tran = tran; @@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) void nni_ep_close(nni_ep *ep) { - // Abort any in-flight operations. - nni_aio_stop(&ep->ep_acc_aio); - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); + if (ep->ep_closed) { + nni_mtx_unlock(&ep->ep_mtx); + return; } - nni_cv_wake(&ep->ep_cv); + ep->ep_closed = 1; nni_mtx_unlock(&ep->ep_mtx); + + ep->ep_ops.ep_close(ep->ep_data); } static void @@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep) nni_ep_close(ep); // Extra sanity. + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); + // Make sure any other unlocked users (references) are gone + // before we actually remove the memory. We should not have + // to wait long as we have closed the underlying pipe and + // done everything we can to wake any waiter (synchronous connect) + // gracefully. + nni_mtx_lock(&ep->ep_mtx); + while (ep->ep_refcnt != 0) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); } @@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_aio(nni_ep *ep, void **pipep) +static void +nni_ep_connect_done(void *arg) { - nni_aio aio; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_con_aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_connect(ep->ep_data, &aio); - nni_aio_wait(&aio); + int rv; + + nni_mtx_lock(&ep->ep_mtx); + if ((rv = nni_aio_result(aio)) == 0) { + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } } - nni_aio_fini(&aio); - return (rv); + +done: + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // cooldown = nni_clock() + rtime; + // rtime *= 2; + // if ((maxrtime >= defrtime) && (rtime > maxrtime)) { + // rtime = maxrtime; + // } + + // XXX: We need to inject a cooldown, and then try again. For + // now we just try again immediately. This is very suboptimal. + nni_ep_connect_start(ep); + break; + } + nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_sync(nni_ep *ep) +static void +nni_ep_connect_start(nni_ep *ep) { - nni_pipe *pipe; - int rv; + nni_aio *aio = &ep->ep_acc_aio; - nni_mtx_lock(&ep->ep_mtx); + // Call with the Endpoint lock held. if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); + return; } - pipe->p_ep = ep; - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); - if (rv != 0) { - if (rv != NNG_ECLOSED) { // HACK ALERT - nni_pipe_stop(pipe); - } - return (rv); - } - nni_pipe_start(pipe); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_pipe = pipe; - nni_mtx_unlock(&ep->ep_mtx); - return (0); + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); } // nni_dialer is the thread worker that dials in the background. +#if 0 // this has the logic for timing out... remove when done. static void nni_dialer(void *arg) { @@ -287,13 +330,20 @@ nni_dialer(void *arg) nni_mtx_unlock(&ep->ep_mtx); } } +#endif int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; + int rv = 0; + nni_aio * aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; nni_mtx_lock(&ep->ep_mtx); + ops = ep->ep_tran->tran_pipe; + if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); @@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + if ((flags & NNG_FLAG_SYNCH) == 0) { + nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_sync(ep); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); + return (0); } - nni_thr_run(&ep->ep_thr); + // This one is kind of special, since we need + // to block for the connection to complete. Ick. + aio = &ep->ep_con_syn; + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); + + // We're about to drop the lock, but we cannot allow the + // endpoint to be removed. Put a hold on it. + ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); + nni_aio_wait(aio); + + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt--; + + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_result(aio); + } + + if (rv != 0) { + ep->ep_started = 0; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); + } + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } else { + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + } + + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } -static void nni_ep_accept_start(nni_ep *ep); - static void nni_ep_accept_done(void *arg) { @@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg) if ((rv = nni_aio_result(aio)) != 0) { goto done; } - NNI_ASSERT((tpipe = aio->a_pipe) != NULL); + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { |
