diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-14 19:43:09 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-14 19:43:09 -0700 |
| commit | 031f7f441f88379d5359a6e96cdd2fe296052070 (patch) | |
| tree | 6ce377b449fb1922ad5461265697d0df991c4837 /src/core/endpt.c | |
| parent | 36746b4f6615607510eedb8e5d168b0fc4897ded (diff) | |
| download | nng-031f7f441f88379d5359a6e96cdd2fe296052070.tar.gz nng-031f7f441f88379d5359a6e96cdd2fe296052070.tar.bz2 nng-031f7f441f88379d5359a6e96cdd2fe296052070.zip | |
Implemented asynchronous (fully) accept.
This logic leaves a race condition in the dial side, which will
be fixed with a subsequent change to convert that to fully asynchronous
as well.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 395 |
1 files changed, 135 insertions, 260 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 8eb7dd12..fef6ac83 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -16,69 +16,31 @@ // Functionality related to end points. -static nni_objhash *nni_eps = NULL; -static void * nni_ep_ctor(uint32_t); -static void nni_ep_dtor(void *); +static void nni_ep_accept_start(nni_ep *); +static void nni_ep_accept_done(void *); + +static nni_idhash *nni_eps; int nni_ep_sys_init(void) { int rv; - rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor); - if (rv != 0) { + if ((rv = nni_idhash_init(&nni_eps)) != 0) { return (rv); } - return (rv); -} - -void -nni_ep_sys_fini(void) -{ - nni_objhash_fini(nni_eps); - nni_eps = NULL; -} - -int -nni_ep_find(nni_ep **epp, uint32_t id) -{ - int rv; - nni_ep *ep; - if ((rv = nni_init()) != 0) { - return (rv); - } + nni_idhash_set_limits( + nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - rv = nni_objhash_find(nni_eps, id, (void **) &ep); - if (rv != 0) { - return (NNG_ECLOSED); - } - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - nni_objhash_unref(nni_eps, id); - return (NNG_ECLOSED); - } - nni_mtx_unlock(&ep->ep_mtx); - if (epp != NULL) { - *epp = ep; - } return (0); } void -nni_ep_hold(nni_ep *ep) -{ - int rv; - - rv = nni_objhash_find(nni_eps, ep->ep_id, NULL); - NNI_ASSERT(rv == 0); -} - -void -nni_ep_rele(nni_ep *ep) +nni_ep_sys_fini(void) { - nni_objhash_unref(nni_eps, ep->ep_id); + nni_idhash_fini(nni_eps); + nni_eps = NULL; } uint32_t @@ -87,47 +49,20 @@ nni_ep_id(nni_ep *ep) return (ep->ep_id); } -static void * -nni_ep_ctor(uint32_t id) -{ - nni_ep *ep; - int rv; - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NULL); - } - ep->ep_closed = 0; - ep->ep_bound = 0; - ep->ep_pipe = NULL; - ep->ep_id = id; - ep->ep_data = NULL; - - NNI_LIST_NODE_INIT(&ep->ep_node); - - nni_pipe_ep_list_init(&ep->ep_pipes); - - if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) { - NNI_FREE_STRUCT(ep); - return (NULL); - } - - if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) { - nni_mtx_fini(&ep->ep_mtx); - NNI_FREE_STRUCT(ep); - return (NULL); - } - - return (ep); -} - static void -nni_ep_dtor(void *ptr) +nni_ep_destroy(nni_ep *ep) { - nni_ep *ep = ptr; - + if (ep == NULL) { + return; + } + nni_aio_fini(&ep->ep_acc_aio); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } + if (ep->ep_id != 0) { + nni_idhash_remove(nni_eps, ep->ep_id); + } + nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -148,10 +83,31 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) return (NNG_EINVAL); } - rv = nni_objhash_alloc(nni_eps, &id, (void **) &ep); + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ep->ep_closed = 0; + ep->ep_bound = 0; + ep->ep_pipe = NULL; + ep->ep_id = id; + ep->ep_data = NULL; + + NNI_LIST_NODE_INIT(&ep->ep_node); + + nni_pipe_ep_list_init(&ep->ep_pipes); + + if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || + ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || + ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep); if (rv != 0) { + nni_ep_destroy(ep); return (rv); } + ep->ep_sock = sock; ep->ep_tran = tran; ep->ep_mode = mode; @@ -165,12 +121,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_ops = *tran->tran_ep; if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { - nni_objhash_unref(nni_eps, id); - return (rv); - } - - if ((rv = nni_sock_ep_add(sock, ep)) != 0) { - nni_objhash_unref(nni_eps, id); + nni_ep_destroy(ep); return (rv); } @@ -179,8 +130,11 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) } void -nni_ep_stop(nni_ep *ep) +nni_ep_close(nni_ep *ep) { + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closed == 0) { ep->ep_closed = 1; @@ -190,38 +144,33 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -void -nni_ep_close(nni_ep *ep) -{ - nni_ep_stop(ep); -} - -void -nni_ep_remove(nni_ep *ep) +static void +nni_ep_reap(nni_ep *ep) { nni_pipe *pipe; - nni_sock *sock = ep->ep_sock; - nni_ep_stop(ep); + nni_ep_close(ep); // Extra sanity. - nni_thr_wait(&ep->ep_thr); + // Take us off the sock list. + nni_sock_ep_remove(ep->ep_sock, ep); - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_close(pipe); - } - nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); +} +void +nni_ep_stop(nni_ep *ep) +{ nni_mtx_lock(&ep->ep_mtx); - while (nni_list_first(&ep->ep_pipes) != NULL) { - nni_cv_wait(&ep->ep_cv); + + // Protection against recursion. + if (ep->ep_stop) { + nni_mtx_unlock(&ep->ep_mtx); + return; } + ep->ep_stop = 1; + nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep); + nni_taskq_dispatch(NULL, &ep->ep_reap_tqe); nni_mtx_unlock(&ep->ep_mtx); - - nni_sock_ep_remove(sock, ep); - - nni_thr_fini(&ep->ep_thr); - nni_objhash_unref(nni_eps, ep->ep_id); } static int @@ -253,17 +202,20 @@ nni_ep_connect_sync(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); return (rv); } + pipe->p_ep = ep; nni_list_append(&ep->ep_pipes, pipe); nni_mtx_unlock(&ep->ep_mtx); rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); if (rv != 0) { - nni_pipe_stop(pipe); + if (rv != NNG_ECLOSED) { // HACK ALERT + nni_pipe_stop(pipe); + } return (rv); } nni_pipe_start(pipe); @@ -273,24 +225,6 @@ nni_ep_connect_sync(nni_ep *ep) return (0); } -void -nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) -{ - if (ep == NULL) { - return; - } - nni_mtx_lock(&ep->ep_mtx); - if (nni_list_active(&ep->ep_pipes, pipe)) { - nni_list_remove(&ep->ep_pipes, pipe); - - if (ep->ep_pipe == pipe) { - ep->ep_pipe = NULL; - } - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); -} - // nni_dialer is the thread worker that dials in the background. static void nni_dialer(void *arg) @@ -396,131 +330,81 @@ nni_ep_dial(nni_ep *ep, int flags) return (rv); } -static int -nni_ep_accept_aio(nni_ep *ep, void **pipep) -{ - nni_aio aio; - int rv; +static void nni_ep_accept_start(nni_ep *ep); - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_accept(ep->ep_data, &aio); - nni_aio_wait(&aio); - - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; - } - nni_aio_fini(&aio); - return (rv); -} - -static int -nni_ep_accept_sync(nni_ep *ep) +static void +nni_ep_accept_done(void *arg) { - nni_pipe *pipe; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_acc_aio; + void * tpipe; + nni_pipe * pipe; + int rv; + const nni_tran_pipe *ops; + + ops = ep->ep_tran->tran_pipe; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); + if ((rv = nni_aio_result(aio)) != 0) { + goto done; } - rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); + NNI_ASSERT((tpipe = aio->a_pipe) != NULL); - rv = nni_ep_accept_aio(ep, &pipe->p_tran_data); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { - nni_pipe_stop(pipe); - return (rv); + ops->p_fini(tpipe); + goto done; + } + +done: + + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + nni_ep_accept_start(ep); + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled or closed, no furhter action. + break; + case NNG_ECONNABORTED: + case NNG_ECONNRESET: + // These are remote conditions, no cool down. + // cooldown = 0; + nni_ep_accept_start(ep); + break; + case NNG_ENOMEM: + // We're running low on memory, so its best to wait + // a whole second to give the system a chance to + // recover memory. + // cooldown = 1000000; + nni_ep_accept_start(ep); + break; + default: + // other cases... sleep a tiny bit then try again. + // cooldown = 1000; 10msec + // Add a timeout here instead to avoid spinning. + nni_ep_accept_start(ep); + break; } - nni_pipe_start(pipe); - return (0); + nni_mtx_unlock(&ep->ep_mtx); } static void -nni_listener(void *arg) +nni_ep_accept_start(nni_ep *ep) { - nni_ep *ep = arg; - int rv; - - for (;;) { - nni_time cooldown; - nni_mtx_lock(&ep->ep_mtx); + nni_aio *aio = &ep->ep_acc_aio; - // If we didn't bind synchronously, do it now. - while (!ep->ep_bound && !ep->ep_closed) { - int rv; - - nni_mtx_unlock(&ep->ep_mtx); - rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mtx_lock(&ep->ep_mtx); - - if (rv == 0) { - ep->ep_bound = 1; - break; - } - // Invalid address? Out of memory? Who knows. - // Try again in a bit (10ms). - // XXX: PROPER BACKOFF NEEDED - cooldown = 10000; - cooldown += nni_clock(); - while (!ep->ep_closed) { - rv = nni_cv_until(&ep->ep_cv, cooldown); - if (rv == NNG_ETIMEDOUT) { - break; - } - } - } - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - break; - } - nni_mtx_unlock(&ep->ep_mtx); - - if ((rv = nni_ep_accept_sync(ep)) == 0) { - // Success! Loop around for the next one. - continue; - } - - switch (rv) { - case NNG_ECLOSED: - // This indicates the listening socket got closed. - // We just bail. - return; - - case NNG_ECONNABORTED: - case NNG_ECONNRESET: - // These are remote conditions, no cool down. - cooldown = 0; - break; - case NNG_ENOMEM: - // We're running low on memory, so its best to wait - // a whole second to give the system a chance to - // recover memory. - cooldown = 1000000; - break; - default: - // Other cases we sleep just a tiny bit to avoid - // burning the cpu (e.g. out of files). - cooldown = 1000; // 1 msec - break; - } - cooldown += nni_clock(); - nni_mtx_lock(&ep->ep_mtx); - while (!ep->ep_closed) { - rv = nni_cv_until(&ep->ep_cv, cooldown); - if (rv == NNG_ETIMEDOUT) { - break; - } - } - nni_mtx_unlock(&ep->ep_mtx); + // Call with the Endpoint lock held. + if (ep->ep_closed) { + return; } + + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_accept(ep->ep_data, aio); } int @@ -542,26 +426,17 @@ nni_ep_listen(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + rv = ep->ep_ops.ep_bind(ep->ep_data); + if (rv != 0) { + ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); - rv = ep->ep_ops.ep_bind(ep->ep_data); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); - ep->ep_bound = 1; + return (rv); } + ep->ep_bound = 1; - nni_thr_run(&ep->ep_thr); + nni_ep_accept_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); |
