aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c85
1 files changed, 43 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9dc33867..9122bb58 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -31,10 +31,10 @@ struct nni_ep {
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_tmo_aio; // backoff timer
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
@@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep)
nni_sock_ep_remove(ep->ep_sock, ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
- nni_aio_fini(&ep->ep_acc_aio);
- nni_aio_fini(&ep->ep_con_aio);
- nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_tmo_aio);
+ nni_aio_fini(ep->ep_acc_aio);
+ nni_aio_fini(ep->ep_con_aio);
+ nni_aio_fini(ep->ep_con_syn);
+ nni_aio_fini(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_mtx_init(&ep->ep_mtx);
nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
- nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
- nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
- nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
- nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- ep->ep_tmo_aio.a_expire =
- nni_clock() + (backoff ? nni_random() % backoff : 0);
+ nni_aio_set_timeout(ep->ep_tmo_aio,
+ nni_clock() + (backoff ? nni_random() % backoff : 0));
+
ep->ep_tmo_run = 1;
- if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
+ if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
ep->ep_tmo_run = 0;
}
}
@@ -339,7 +340,7 @@ static void
nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_tmo_aio;
+ nni_aio *aio = ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
@@ -356,11 +357,11 @@ static void
nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, aio->a_pipe);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg)
static void
nni_ep_con_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
@@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags)
}
// Synchronous mode: so we have to wait for it to complete.
- aio = &ep->ep_con_syn;
- aio->a_endpt = ep->ep_data;
+ aio = ep->ep_con_syn;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
ep->ep_started = 1;
nni_mtx_unlock(&ep->ep_mtx);
@@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -458,12 +459,12 @@ static void
nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(aio->a_pipe != NULL);
- rv = nni_pipe_create(ep, aio->a_pipe);
+ NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg)
static void
nni_ep_acc_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_pipe = NULL;
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_pipe(aio, NULL);
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}