diff options
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 85 |
1 files changed, 43 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 9dc33867..9122bb58 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -31,10 +31,10 @@ struct nni_ep { nni_mtx ep_mtx; nni_cv ep_cv; nni_list ep_pipes; - nni_aio ep_acc_aio; - nni_aio ep_con_aio; - nni_aio ep_con_syn; // used for sync connect - nni_aio ep_tmo_aio; // backoff timer + nni_aio * ep_acc_aio; + nni_aio * ep_con_aio; + nni_aio * ep_con_syn; // used for sync connect + nni_aio * ep_tmo_aio; // backoff timer nni_duration ep_maxrtime; // maximum time for reconnect nni_duration ep_currtime; // current time for reconnect nni_duration ep_inirtime; // initial time for reconnect @@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep) nni_sock_ep_remove(ep->ep_sock, ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); - nni_aio_fini(&ep->ep_acc_aio); - nni_aio_fini(&ep->ep_con_aio); - nni_aio_fini(&ep->ep_con_syn); - nni_aio_fini(&ep->ep_tmo_aio); + nni_aio_fini(ep->ep_acc_aio); + nni_aio_fini(ep->ep_con_aio); + nni_aio_fini(ep->ep_con_syn); + nni_aio_fini(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { @@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) nni_mtx_init(&ep->ep_mtx); nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep); - nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep); - nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep); - nni_aio_init(&ep->ep_con_syn, NULL, NULL); - if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); @@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. - nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED); + nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); @@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep) nni_ep_shutdown(ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { @@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep) // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - ep->ep_tmo_aio.a_expire = - nni_clock() + (backoff ? nni_random() % backoff : 0); + nni_aio_set_timeout(ep->ep_tmo_aio, + nni_clock() + (backoff ? nni_random() % backoff : 0)); + ep->ep_tmo_run = 1; - if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { + if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { ep->ep_tmo_run = 0; } } @@ -339,7 +340,7 @@ static void nni_ep_tmo_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_tmo_aio; + nni_aio *aio = ep->ep_tmo_aio; nni_mtx_lock(&ep->ep_mtx); if (nni_aio_result(aio) == NNG_ETIMEDOUT) { @@ -356,11 +357,11 @@ static void nni_ep_con_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, aio->a_pipe); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); switch (rv) { @@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg) static void nni_ep_con_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_endpt = ep->ep_data; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); } @@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags) } // Synchronous mode: so we have to wait for it to complete. - aio = &ep->ep_con_syn; - aio->a_endpt = ep->ep_data; + aio = ep->ep_con_syn; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); ep->ep_started = 1; nni_mtx_unlock(&ep->ep_mtx); @@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags) // As we're synchronous, we also have to handle the completion. if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) { + ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) { nni_mtx_lock(&ep->ep_mtx); ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); @@ -458,12 +459,12 @@ static void nni_ep_acc_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(aio->a_pipe != NULL); - rv = nni_pipe_create(ep, aio->a_pipe); + NNI_ASSERT(nni_aio_get_pipe(aio) != NULL); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); @@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg) static void nni_ep_acc_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_pipe = NULL; - aio->a_endpt = ep->ep_data; + nni_aio_set_pipe(aio, NULL); + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_accept(ep->ep_data, aio); } |
