aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/core/endpt.c
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c85
1 files changed, 43 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9dc33867..9122bb58 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -31,10 +31,10 @@ struct nni_ep {
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_tmo_aio; // backoff timer
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
@@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep)
nni_sock_ep_remove(ep->ep_sock, ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
- nni_aio_fini(&ep->ep_acc_aio);
- nni_aio_fini(&ep->ep_con_aio);
- nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_tmo_aio);
+ nni_aio_fini(ep->ep_acc_aio);
+ nni_aio_fini(ep->ep_con_aio);
+ nni_aio_fini(ep->ep_con_syn);
+ nni_aio_fini(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_mtx_init(&ep->ep_mtx);
nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
- nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
- nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
- nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
- nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- ep->ep_tmo_aio.a_expire =
- nni_clock() + (backoff ? nni_random() % backoff : 0);
+ nni_aio_set_timeout(ep->ep_tmo_aio,
+ nni_clock() + (backoff ? nni_random() % backoff : 0));
+
ep->ep_tmo_run = 1;
- if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
+ if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
ep->ep_tmo_run = 0;
}
}
@@ -339,7 +340,7 @@ static void
nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_tmo_aio;
+ nni_aio *aio = ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
@@ -356,11 +357,11 @@ static void
nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, aio->a_pipe);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg)
static void
nni_ep_con_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
@@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags)
}
// Synchronous mode: so we have to wait for it to complete.
- aio = &ep->ep_con_syn;
- aio->a_endpt = ep->ep_data;
+ aio = ep->ep_con_syn;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
ep->ep_started = 1;
nni_mtx_unlock(&ep->ep_mtx);
@@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -458,12 +459,12 @@ static void
nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(aio->a_pipe != NULL);
- rv = nni_pipe_create(ep, aio->a_pipe);
+ NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg)
static void
nni_ep_acc_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_pipe = NULL;
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_pipe(aio, NULL);
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}