diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 75 | ||||
| -rw-r--r-- | src/core/aio.h | 26 | ||||
| -rw-r--r-- | src/core/endpt.c | 85 | ||||
| -rw-r--r-- | src/core/event.h | 3 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 59 | ||||
| -rw-r--r-- | src/core/pipe.c | 14 | ||||
| -rw-r--r-- | src/core/socket.c | 10 |
7 files changed, 183 insertions, 89 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 5b8c7970..9b308cd1 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -56,23 +56,34 @@ static nni_list nni_aio_expire_aios; static void nni_aio_expire_add(nni_aio *); -void -nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) +int +nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) { + nni_aio *aio; + + if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { + return (NNG_ENOMEM); + } memset(aio, 0, sizeof(*aio)); nni_cv_init(&aio->a_cv, &nni_aio_lk); aio->a_expire = NNI_TIME_NEVER; aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); + *aiop = aio; + return (0); } void nni_aio_fini(nni_aio *aio) { - nni_aio_stop(aio); + if (aio != NULL) { + nni_aio_stop(aio); - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); + // At this point the AIO is done. + nni_cv_fini(&aio->a_cv); + + NNI_FREE_STRUCT(aio); + } } // nni_aio_stop cancels any oustanding operation, and waits for the @@ -97,6 +108,60 @@ nni_aio_stop(nni_aio *aio) nni_aio_wait(aio); } +void +nni_aio_set_timeout(nni_aio *aio, nni_time when) +{ + aio->a_expire = when; +} + +void +nni_aio_set_msg(nni_aio *aio, nni_msg *msg) +{ + aio->a_msg = msg; +} + +nni_msg * +nni_aio_get_msg(nni_aio *aio) +{ + return (aio->a_msg); +} + +void +nni_aio_set_pipe(nni_aio *aio, void *p) +{ + aio->a_pipe = p; +} + +void * +nni_aio_get_pipe(nni_aio *aio) +{ + return (aio->a_pipe); +} + +void +nni_aio_set_ep(nni_aio *aio, void *ep) +{ + aio->a_endpt = ep; +} + +void * +nni_aio_get_ep(nni_aio *aio) +{ + return (aio->a_endpt); +} + +void +nni_aio_set_data(nni_aio *aio, void *data) +{ + aio->a_data = data; +} + +void * +nni_aio_get_data(nni_aio *aio) +{ + return (aio->a_data); +} + int nni_aio_result(nni_aio *aio) { diff --git a/src/core/aio.h b/src/core/aio.h index 9114c9fe..eae17446 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -52,6 +52,9 @@ struct nni_aio { // Resolver operations. nni_sockaddr *a_addr; + // Extra user data. + void *a_data; + // Provider-use fields. nni_aio_cancelfn a_prov_cancel; void * a_prov_data; @@ -65,7 +68,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern void nni_aio_init(nni_aio *, nni_cb, void *); +extern int nni_aio_init(nni_aio **, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any @@ -83,6 +86,27 @@ extern void nni_aio_fini(nni_aio *); // use nni_aio_cancel instead.) extern void nni_aio_stop(nni_aio *); +// nni_aio_set_data sets user data. This should only be done by the +// consumer, initiating the I/O. The intention is to be able to store +// additional data for use when the operation callback is executed. +extern void nni_aio_set_data(nni_aio *, void *); + +// nni_aio_get_data returns the user data that was previously stored +// with nni_aio_set_data. +extern void *nni_aio_get_data(nni_aio *); + +extern void nni_aio_set_msg(nni_aio *, nni_msg *); +extern nni_msg *nni_aio_get_msg(nni_aio *); +extern void nni_aio_set_pipe(nni_aio *, void *); +extern void * nni_aio_get_pipe(nni_aio *); +extern void nni_aio_set_ep(nni_aio *, void *); +extern void * nni_aio_get_ep(nni_aio *); + +// nni_aio_set_timeout sets the timeout (absolute) when the AIO will +// be canceled. The cancelation does not happen until after nni_aio_start +// is called. +extern void nni_aio_set_timeout(nni_aio *, nni_time); + // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is // complete (such as when the callback is executed or after nni_aio_wait diff --git a/src/core/endpt.c b/src/core/endpt.c index 9dc33867..9122bb58 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -31,10 +31,10 @@ struct nni_ep { nni_mtx ep_mtx; nni_cv ep_cv; nni_list ep_pipes; - nni_aio ep_acc_aio; - nni_aio ep_con_aio; - nni_aio ep_con_syn; // used for sync connect - nni_aio ep_tmo_aio; // backoff timer + nni_aio * ep_acc_aio; + nni_aio * ep_con_aio; + nni_aio * ep_con_syn; // used for sync connect + nni_aio * ep_tmo_aio; // backoff timer nni_duration ep_maxrtime; // maximum time for reconnect nni_duration ep_currtime; // current time for reconnect nni_duration ep_inirtime; // initial time for reconnect @@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep) nni_sock_ep_remove(ep->ep_sock, ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); - nni_aio_fini(&ep->ep_acc_aio); - nni_aio_fini(&ep->ep_con_aio); - nni_aio_fini(&ep->ep_con_syn); - nni_aio_fini(&ep->ep_tmo_aio); + nni_aio_fini(ep->ep_acc_aio); + nni_aio_fini(ep->ep_con_aio); + nni_aio_fini(ep->ep_con_syn); + nni_aio_fini(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { @@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) nni_mtx_init(&ep->ep_mtx); nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep); - nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep); - nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep); - nni_aio_init(&ep->ep_con_syn, NULL, NULL); - if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); @@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. - nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED); + nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED); + nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); @@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep) nni_ep_shutdown(ep); - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_tmo_aio); + nni_aio_stop(ep->ep_acc_aio); + nni_aio_stop(ep->ep_con_aio); + nni_aio_stop(ep->ep_con_syn); + nni_aio_stop(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { @@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep) // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - ep->ep_tmo_aio.a_expire = - nni_clock() + (backoff ? nni_random() % backoff : 0); + nni_aio_set_timeout(ep->ep_tmo_aio, + nni_clock() + (backoff ? nni_random() % backoff : 0)); + ep->ep_tmo_run = 1; - if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { + if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { ep->ep_tmo_run = 0; } } @@ -339,7 +340,7 @@ static void nni_ep_tmo_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_tmo_aio; + nni_aio *aio = ep->ep_tmo_aio; nni_mtx_lock(&ep->ep_mtx); if (nni_aio_result(aio) == NNG_ETIMEDOUT) { @@ -356,11 +357,11 @@ static void nni_ep_con_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, aio->a_pipe); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); switch (rv) { @@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg) static void nni_ep_con_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_con_aio; + nni_aio *aio = ep->ep_con_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_endpt = ep->ep_data; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); } @@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags) } // Synchronous mode: so we have to wait for it to complete. - aio = &ep->ep_con_syn; - aio->a_endpt = ep->ep_data; + aio = ep->ep_con_syn; + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_connect(ep->ep_data, aio); ep->ep_started = 1; nni_mtx_unlock(&ep->ep_mtx); @@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags) // As we're synchronous, we also have to handle the completion. if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) { + ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) { nni_mtx_lock(&ep->ep_mtx); ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); @@ -458,12 +459,12 @@ static void nni_ep_acc_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; int rv; if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(aio->a_pipe != NULL); - rv = nni_pipe_create(ep, aio->a_pipe); + NNI_ASSERT(nni_aio_get_pipe(aio) != NULL); + rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); } nni_mtx_lock(&ep->ep_mtx); @@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg) static void nni_ep_acc_start(nni_ep *ep) { - nni_aio *aio = &ep->ep_acc_aio; + nni_aio *aio = ep->ep_acc_aio; // Call with the Endpoint lock held. if (ep->ep_closing) { return; } - aio->a_pipe = NULL; - aio->a_endpt = ep->ep_data; + nni_aio_set_pipe(aio, NULL); + nni_aio_set_ep(aio, ep->ep_data); ep->ep_ops.ep_accept(ep->ep_data, aio); } diff --git a/src/core/event.h b/src/core/event.h index 22af096e..9536d206 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -25,7 +26,7 @@ struct nng_notify { void * n_arg; int n_type; nni_sock * n_sock; - nni_aio n_aio; + nni_aio * n_aio; }; extern void nni_ev_init(nni_event *, int, nni_sock *); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index ef7dd5a2..6fd95f98 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq) size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; + msg = nni_aio_get_msg(waio); len = nni_msg_len(msg); // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(raio); nni_aio_list_remove(waio); @@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq) mq->mq_put = 0; } mq->mq_len++; - waio->a_msg = NULL; + nni_aio_set_msg(waio, NULL); nni_aio_finish(waio, 0, len); continue; } @@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq) mq->mq_get = 0; } mq->mq_len--; - raio->a_msg = msg; nni_aio_list_remove(raio); nni_aio_finish_msg(raio, msg); continue; @@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq) // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; - waio->a_msg = NULL; + msg = nni_aio_get_msg(waio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); nni_aio_list_remove(raio); @@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - nni_msgq_aio_get(mq, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) == 0) { - *msgp = aio.a_msg; - aio.a_msg = NULL; - } - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_msgq_aio_get(mq, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) == 0) { + *msgp = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + } + nni_aio_fini(aio); return (rv); } int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { - nni_aio aio; - int rv; - - nni_aio_init(&aio, NULL, NULL); - aio.a_expire = expire; - aio.a_msg = msg; - nni_msgq_aio_put(mq, &aio); - nni_aio_wait(&aio); - rv = nni_aio_result(&aio); - nni_aio_fini(&aio); + nni_aio *aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); + nni_msgq_aio_put(mq, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); return (rv); } diff --git a/src/core/pipe.c b/src/core/pipe.c index 0c024c66..0670ed01 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -30,7 +30,7 @@ struct nni_pipe { nni_mtx p_mtx; nni_cv p_cv; nni_list_node p_reap_node; - nni_aio p_start_aio; + nni_aio * p_start_aio; }; static nni_idhash *nni_pipes; @@ -99,7 +99,7 @@ nni_pipe_destroy(nni_pipe *p) } // Stop any pending negotiation. - nni_aio_stop(&p->p_start_aio); + nni_aio_stop(p->p_start_aio); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -112,7 +112,7 @@ nni_pipe_destroy(nni_pipe *p) // We have exclusive access at this point, so we can check if // we are still on any lists. - nni_aio_fini(&p->p_start_aio); + nni_aio_fini(p->p_start_aio); if (nni_list_node_active(&p->p_ep_node)) { nni_ep_pipe_remove(p->p_ep, p); @@ -172,7 +172,7 @@ nni_pipe_close(nni_pipe *p) nni_mtx_unlock(&p->p_mtx); // abort any pending negotiation/start process. - nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); + nni_aio_cancel(p->p_start_aio, NNG_ECLOSED); } void @@ -205,7 +205,7 @@ static void nni_pipe_start_cb(void *arg) { nni_pipe *p = arg; - nni_aio * aio = &p->p_start_aio; + nni_aio * aio = p->p_start_aio; int rv; if ((rv = nni_aio_result(aio)) != 0) { @@ -270,9 +270,9 @@ void nni_pipe_start(nni_pipe *p) { if (p->p_tran_ops.p_start == NULL) { - nni_aio_finish(&p->p_start_aio, 0, 0); + nni_aio_finish(p->p_start_aio, 0, 0); } else { - p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio); + p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio); } } diff --git a/src/core/socket.c b/src/core/socket.c index 6a242558..01fd9a0c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -222,7 +222,7 @@ nni_sock_cansend_cb(void *arg) nni_notify *notify = arg; nni_sock * sock = notify->n_sock; - if (nni_aio_result(¬ify->n_aio) != 0) { + if (nni_aio_result(notify->n_aio) != 0) { return; } @@ -235,7 +235,7 @@ nni_sock_canrecv_cb(void *arg) nni_notify *notify = arg; nni_sock * sock = notify->n_sock; - if (nni_aio_result(¬ify->n_aio) != 0) { + if (nni_aio_result(notify->n_aio) != 0) { return; } @@ -259,11 +259,11 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) switch (type) { case NNG_EV_CAN_RCV: nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); + nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio); break; case NNG_EV_CAN_SND: nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); + nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio); break; default: NNI_FREE_STRUCT(notify); @@ -276,7 +276,7 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) void nni_sock_unnotify(nni_sock *sock, nni_notify *notify) { - nni_aio_fini(¬ify->n_aio); + nni_aio_fini(notify->n_aio); NNI_FREE_STRUCT(notify); } |
