diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/core | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 75 | ||||
| -rw-r--r-- | src/core/aio.h | 26 | ||||
| -rw-r--r-- | src/core/endpt.c | 85 | ||||
| -rw-r--r-- | src/core/event.h | 3 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 59 | ||||
| -rw-r--r-- | src/core/pipe.c | 14 | ||||
| -rw-r--r-- | src/core/socket.c | 10 |
7 files changed, 183 insertions, 89 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 5b8c7970..9b308cd1 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -56,23 +56,34 @@ static nni_list nni_aio_expire_aios; static void nni_aio_expire_add(nni_aio *); -void -nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) +int +nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) { + nni_aio *aio; + + if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { + return (NNG_ENOMEM); + } memset(aio, 0, sizeof(*aio)); nni_cv_init(&aio->a_cv, &nni_aio_lk); aio->a_expire = NNI_TIME_NEVER; aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); + *aiop = aio; + return (0); } void nni_aio_fini(nni_aio *aio) { - nni_aio_stop(aio); + if (aio != NULL) { + nni_aio_stop(aio); - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); + // At this point the AIO is done. + nni_cv_fini(&aio->a_cv); + + NNI_FREE_STRUCT(aio); + } } // nni_aio_stop cancels any oustanding operation, and waits for the @@ -97,6 +108,60 @@ nni_aio_stop(nni_aio *aio) nni_aio_wait(aio); } +void +nni_aio_set_timeout(nni_aio *aio, nni_time when) +{ + aio->a_expire = when; +} + +void +nni_aio_set_msg(nni_aio *aio, nni_msg *msg) +{ + aio->a_msg = msg; +} + +nni_msg * +nni_aio_get_msg(nni_aio *aio) +{ + return (aio->a_msg); +} + +void +nni_aio_set_pipe(nni_aio *aio, void *p) +{ + aio->a_pipe = p; +} + +void * +nni_aio_get_pipe(nni_aio *aio) +{ + return (aio->a_pipe); +} + +void +nni_aio_set_ep(nni_aio *aio, void *ep) +{ + aio->a_endpt = ep; +} + +void * +nni_aio_get_ep(nni_aio *aio) +{ + return (aio->a_endpt); +} + +void +nni_aio_set_data(nni_aio *aio, void *data) +{ + aio->a_data = data; +} + +void * +nni_aio_get_data(nni_aio *aio) +{ + return (aio->a_data); +} + int nni_aio_result(nni_aio *aio) { diff --git a/src/core/aio.h b/src/core/aio.h index 9114c9fe..eae17446 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -52,6 +52,9 @@ struct nni_aio { // Resolver operations. nni_sockaddr *a_addr; + // Extra user data. + void *a_data; + // Provider-use fields. nni_aio_cancelfn a_prov_cancel; void * a_prov_data; @@ -65,7 +68,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern void nni_aio_init(nni_aio *, nni_cb, void *); +extern int nni_aio_init(nni_aio **, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any @@ -83,6 +86,27 @@ extern void nni_aio_fini(nni_aio *); // use nni_aio_cancel instead.) extern void nni_aio_stop(nni_aio *); +// nni_aio_set_data sets user data. This should only be done by the +// consumer, initiating the I/O. The intention is to be able to store +// additional data for use when the operation callback is executed. +extern void nni_aio_set_data(nni_aio *, void *); + +// nni_aio_get_data returns the user data that was previously stored +// with nni_aio_set_data. +extern void *nni_aio_get_data(nni_aio *); + +extern void nni_aio_set_msg(nni_aio *, nni_msg *); +extern nni_msg *nni_aio_get_msg(nni_aio *); +extern void nni_aio_set_pipe(nni_aio *, void *); +extern void * nni_aio_get_pipe(nni_aio *); +extern void nni_aio_set_ep(nni_aio *, void *); +extern void * nni_aio_get_ep(nni_aio *); + +// nni_aio_set_timeout sets the timeout (absolute) when the AIO will +// be canceled. The cancelation does not happen until after nni_aio_start +// is called. +extern void nni_aio_set_timeout(nni_aio *, nni_time); + // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is // complete (such as when the callback is executed or after nni_aio_wait diff --git a/src/core/endpt.c b/src/core/endpt.c index 9dc33867..9122bb58 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -31,10 +31,10 @@ struct nni_ep { nni_mtx ep_mtx; nni_cv ep_cv; nni_list ep_pipes; - nni_aio ep_acc_aio; - nni_aio ep_con_aio; - nni_aio ep_con_syn; // used for sync connect - nni_aio ep_tmo_aio; // backoff timer + nni_aio * ep_acc_aio; + nni_aio * ep_con_aio; + nni_aio * ep_con_syn; // used for sync connect + nni_aio * ep_tmo_aio; // backoff timer nni_duration ep_maxrtime; // maximum time for reconnect nni_duration ep_currtime; // current time for reconnect nni_duration ep_inirtime; // initial time for reconnect @@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep) nni_sock_ep_remove(ep->ep_sock, ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); - nni_aio_fini(&ep->ep_acc_aio); - nni_aio_fini(&ep->ep_con_aio); - nni_aio_fini(&ep->ep_con_syn); - nni_aio_fini(&ep->ep_tmo_aio); + nni_aio_fini(ep->ep_acc_aio); + nni_aio_fini(ep->ep_con_aio); + nni_aio_fini(ep->ep_con_syn); + nni_aio_fini(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { @@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) nni_mtx_init(&ep->ep_mtx); nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep); - nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep); - nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep); - nni_aio_init(&ep->ep_con_syn, NULL, NULL); - if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); @@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. - nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED); + nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); @@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep) nni_ep_shutdown(ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { @@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep) // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - ep->ep_tmo_aio.a_expire = - nni_clock() + (backoff ? nni_random() % backoff : 0); + nni_aio_set_timeout(ep->ep_tmo_aio, + nni_clock() + (backoff ? nni_random() % backoff : 0)); + ep->ep_tmo_run = 1; - if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { + if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { ep->ep_tmo_run = 0; } } @@ -339,7 +340,7 @@ static void nni_ep_tmo_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_tmo_aio; + nni_aio *aio = ep->ep_tmo_aio; nni_mtx_lock(&ep->ep_mtx); if (nni_aio_result(aio) == NNG_ETIMEDOUT) { @@ -356,11 +357,11 @@ static void nni_ep_con_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, aio->a_pipe); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); switch (rv) { @@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg) static void nni_ep_con_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_endpt = ep->ep_data; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); } @@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags) } // Synchronous mode: so we have to wait for it to complete. - aio = &ep->ep_con_syn; - aio->a_endpt = ep->ep_data; + aio = ep->ep_con_syn; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); ep->ep_started = 1; nni_mtx_unlock(&ep->ep_mtx); @@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags) // As we're synchronous, we also have to handle the completion. if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) { + ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) { nni_mtx_lock(&ep->ep_mtx); ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); @@ -458,12 +459,12 @@ static void nni_ep_acc_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(aio->a_pipe != NULL); - rv = nni_pipe_create(ep, aio->a_pipe); + NNI_ASSERT(nni_aio_get_pipe(aio) != NULL); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); @@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg) static void nni_ep_acc_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_pipe = NULL; - aio->a_endpt = ep->ep_data; + nni_aio_set_pipe(aio, NULL); + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_accept(ep->ep_data, aio); } diff --git a/src/core/event.h b/src/core/event.h index 22af096e..9536d206 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -25,7 +26,7 @@ struct nng_notify { void * n_arg; int n_type; nni_sock * n_sock; - nni_aio n_aio; + nni_aio * n_aio; }; extern void nni_ev_init(nni_event *, int, nni_sock *); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index ef7dd5a2..6fd95f98 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq) size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; + msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(raio); nni_aio_list_remove(waio); @@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq) mq->mq_put = 0; } mq->mq_len++; - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_finish(waio, 0, len); continue; } @@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; - raio->a_msg = msg; nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); continue; @@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq) // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; - waio->a_msg = NULL; + msg = nni_aio_get_msg(waio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); nni_aio_list_remove(raio); @@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - nni_msgq_aio_get(mq, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) == 0) { - *msgp = aio.a_msg; - aio.a_msg = NULL; - } - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_msgq_aio_get(mq, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) == 0) { + *msgp = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + } + nni_aio_fini(aio); return (rv); } int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - aio.a_msg = msg; - nni_msgq_aio_put(mq, &aio); - nni_aio_wait(&aio); - rv = nni_aio_result(&aio); - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); + nni_msgq_aio_put(mq, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); return (rv); } diff --git a/src/core/pipe.c b/src/core/pipe.c index 0c024c66..0670ed01 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -30,7 +30,7 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_list_node p_reap_node; - nni_aio p_start_aio; + nni_aio * p_start_aio; }; static nni_idhash *nni_pipes; @@ -99,7 +99,7 @@ nni_pipe_destroy(nni_pipe *p) } // Stop any pending negotiation. - nni_aio_stop(&p->p_start_aio); + nni_aio_stop(p->p_start_aio); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -112,7 +112,7 @@ nni_pipe_destroy(nni_pipe *p) // We have exclusive access at this point, so we can check if // we are still on any lists. - nni_aio_fini(&p->p_start_aio); + nni_aio_fini(p->p_start_aio); if (nni_list_node_active(&p->p_ep_node)) { nni_ep_pipe_remove(p->p_ep, p); @@ -172,7 +172,7 @@ nni_pipe_close(nni_pipe *p) nni_mtx_unlock(&p->p_mtx); // abort any pending negotiation/start process. - nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); + nni_aio_cancel(p->p_start_aio, NNG_ECLOSED); } void @@ -205,7 +205,7 @@ static void nni_pipe_start_cb(void *arg) { nni_pipe *p = arg; - nni_aio * aio = &p->p_start_aio; + nni_aio * aio = p->p_start_aio; int rv; if ((rv = nni_aio_result(aio)) != 0) { @@ -270,9 +270,9 @@ void nni_pipe_start(nni_pipe *p) { if (p->p_tran_ops.p_start == NULL) { - nni_aio_finish(&p->p_start_aio, 0, 0); + nni_aio_finish(p->p_start_aio, 0, 0); } else { - p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio); + p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); } } diff --git a/src/core/socket.c b/src/core/socket.c index 6a242558..01fd9a0c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -222,7 +222,7 @@ nni_sock_cansend_cb(void *arg) nni_notify *notify = arg; nni_sock * sock = notify->n_sock; - if (nni_aio_result(¬ify->n_aio) != 0) { + if (nni_aio_result(notify->n_aio) != 0) { return; } @@ -235,7 +235,7 @@ nni_sock_canrecv_cb(void *arg) nni_notify *notify = arg; nni_sock * sock = notify->n_sock; - if (nni_aio_result(¬ify->n_aio) != 0) { + if (nni_aio_result(notify->n_aio) != 0) { return; } @@ -259,11 +259,11 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) switch (type) { case NNG_EV_CAN_RCV: nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); + nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio); break; case NNG_EV_CAN_SND: nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); + nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio); break; default: NNI_FREE_STRUCT(notify); @@ -276,7 +276,7 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) void nni_sock_unnotify(nni_sock *sock, nni_notify *notify) { - nni_aio_fini(¬ify->n_aio); + nni_aio_fini(notify->n_aio); NNI_FREE_STRUCT(notify); } |
