diff options
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 207 |
1 files changed, 143 insertions, 64 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index fef6ac83..a99cd21a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -18,6 +18,8 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); +static void nni_ep_connect_start(nni_ep *); +static void nni_ep_connect_done(void *); static nni_idhash *nni_eps; @@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep) return; } nni_aio_fini(&ep->ep_acc_aio); + nni_aio_fini(&ep->ep_con_aio); + nni_aio_fini(&ep->ep_con_syn); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } - nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_pipe = NULL; ep->ep_id = id; ep->ep_data = NULL; + ep->ep_refcnt = 0; NNI_LIST_NODE_INIT(&ep->ep_node); @@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) nni_ep_destroy(ep); return (rv); } + rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); + if (rv != 0) { + nni_ep_destroy(ep); + return (rv); + } ep->ep_sock = sock; ep->ep_tran = tran; @@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) void nni_ep_close(nni_ep *ep) { - // Abort any in-flight operations. - nni_aio_stop(&ep->ep_acc_aio); - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed == 0) { - ep->ep_closed = 1; - ep->ep_ops.ep_close(ep->ep_data); + if (ep->ep_closed) { + nni_mtx_unlock(&ep->ep_mtx); + return; } - nni_cv_wake(&ep->ep_cv); + ep->ep_closed = 1; nni_mtx_unlock(&ep->ep_mtx); + + ep->ep_ops.ep_close(ep->ep_data); } static void @@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep) nni_ep_close(ep); // Extra sanity. + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); + // Make sure any other unlocked users (references) are gone + // before we actually remove the memory. We should not have + // to wait long as we have closed the underlying pipe and + // done everything we can to wake any waiter (synchronous connect) + // gracefully. + nni_mtx_lock(&ep->ep_mtx); + while (ep->ep_refcnt != 0) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); } @@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_aio(nni_ep *ep, void **pipep) +static void +nni_ep_connect_done(void *arg) { - nni_aio aio; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_con_aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_connect(ep->ep_data, &aio); - nni_aio_wait(&aio); + int rv; + + nni_mtx_lock(&ep->ep_mtx); + if ((rv = nni_aio_result(aio)) == 0) { + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } } - nni_aio_fini(&aio); - return (rv); + +done: + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // cooldown = nni_clock() + rtime; + // rtime *= 2; + // if ((maxrtime >= defrtime) && (rtime > maxrtime)) { + // rtime = maxrtime; + // } + + // XXX: We need to inject a cooldown, and then try again. For + // now we just try again immediately. This is very suboptimal. + nni_ep_connect_start(ep); + break; + } + nni_mtx_unlock(&ep->ep_mtx); } -static int -nni_ep_connect_sync(nni_ep *ep) +static void +nni_ep_connect_start(nni_ep *ep) { - nni_pipe *pipe; - int rv; + nni_aio *aio = &ep->ep_acc_aio; - nni_mtx_lock(&ep->ep_mtx); + // Call with the Endpoint lock held. if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); + return; } - pipe->p_ep = ep; - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); - if (rv != 0) { - if (rv != NNG_ECLOSED) { // HACK ALERT - nni_pipe_stop(pipe); - } - return (rv); - } - nni_pipe_start(pipe); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_pipe = pipe; - nni_mtx_unlock(&ep->ep_mtx); - return (0); + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); } // nni_dialer is the thread worker that dials in the background. +#if 0 // this has the logic for timing out... remove when done. static void nni_dialer(void *arg) { @@ -287,13 +330,20 @@ nni_dialer(void *arg) nni_mtx_unlock(&ep->ep_mtx); } } +#endif int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; + int rv = 0; + nni_aio * aio; + void * tpipe; + nni_pipe * pipe; + const nni_tran_pipe *ops; nni_mtx_lock(&ep->ep_mtx); + ops = ep->ep_tran->tran_pipe; + if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); @@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + if ((flags & NNG_FLAG_SYNCH) == 0) { + nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); - rv = nni_ep_connect_sync(ep); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); + return (0); } - nni_thr_run(&ep->ep_thr); + // This one is kind of special, since we need + // to block for the connection to complete. Ick. + aio = &ep->ep_con_syn; + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, aio); + + // We're about to drop the lock, but we cannot allow the + // endpoint to be removed. Put a hold on it. + ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); + nni_aio_wait(aio); + + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt--; + + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + rv = nni_aio_result(aio); + } + + if (rv != 0) { + ep->ep_started = 0; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); + } + + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); + if (rv != 0) { + ops->p_fini(tpipe); + } else { + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + } + + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } -static void nni_ep_accept_start(nni_ep *ep); - static void nni_ep_accept_done(void *arg) { @@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg) if ((rv = nni_aio_result(aio)) != 0) { goto done; } - NNI_ASSERT((tpipe = aio->a_pipe) != NULL); + tpipe = aio->a_pipe; + NNI_ASSERT(tpipe != NULL); rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { |
