diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-08-20 09:00:46 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-08-20 09:26:20 -0700 |
| commit | 6e5f6a26beec0a44d25625cacb5095cdc7a94146 (patch) | |
| tree | 7d6ef720b239a30d4906f1f2303fed77707b2761 /src | |
| parent | 3e70013111b70f1439b2f9991211c887a8eefff3 (diff) | |
| download | nng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.tar.gz nng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.tar.bz2 nng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.zip | |
fixes #664 aio cancellation could be better
This changes the signature of the aio cancellation routines
to take the argument for cancellation directly, so we do not
need to lookup the argument using the nni_aio_get_prov_data.
We should probably consider eliminating nni_aio_get_prov_data,
and co, and changing the prov_extra to reflect prov_data. Later.
Diffstat (limited to 'src')
33 files changed, 196 insertions, 176 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 0a9c744d..15abfe8d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -95,7 +95,8 @@ struct nng_aio { void *a_outputs[4]; // Provider-use fields. - nni_aio_cancelfn a_prov_cancel; + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; void * a_prov_data; nni_list_node a_prov_node; void * a_prov_extra[4]; // Extra data used by provider @@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } // Wait for the aio to be "done"; this ensures that we don't @@ -217,16 +221,19 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); + if (fn != NULL) { + fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); @@ -237,16 +244,19 @@ void nni_aio_close(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } } } @@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio) aio->a_result = NNG_ECANCELED; aio->a_count = 0; nni_list_node_remove(&aio->a_expire_node); - aio->a_prov_cancel = NULL; - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; nni_mtx_unlock(&nni_aio_lk); nni_task_dispatch(aio->a_task); return (NNG_ECANCELED); } - aio->a_result = 0; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; + aio->a_result = 0; + aio->a_count = 0; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } @@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancelfn; + aio->a_cancel_arg = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); @@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) void nni_aio_abort(nni_aio *aio, int rv) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. - if (cancelfn != NULL) { - cancelfn(aio, rv); + if (fn != NULL) { + fn(aio, arg, rv); } } @@ -430,9 +444,10 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); - aio->a_result = rv; - aio->a_count = count; - aio->a_prov_cancel = NULL; + aio->a_result = rv; + aio->a_count = count; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } @@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; nni_time now; nni_aio * aio; int rv; @@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg) nni_list_remove(aios, aio); rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; - if ((cancelfn = aio->a_prov_cancel) != NULL) { - + if ((fn = aio->a_cancel_fn) != NULL) { + void *arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. nni_aio_expire_aio = aio; @@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg) // terminate the aio - we've tried, but it has to run // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); - cancelfn(aio, rv); + fn(aio, arg, rv); nni_mtx_lock(&nni_aio_lk); nni_aio_expire_aio = NULL; @@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) } static void -nni_sleep_cancel(nng_aio *aio, int rv) +nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { + NNI_ARG_UNUSED(arg); + nni_mtx_lock(&nni_aio_lk); if (!aio->a_sleep) { nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 5462c40b..0febc85e 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -18,7 +18,7 @@ typedef struct nni_aio_ops nni_aio_ops; -typedef void (*nni_aio_cancelfn)(nni_aio *, int); +typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // nni_aio_init initializes an aio object. The callback is called with // the supplied argument when the operation is complete. If NULL is diff --git a/src/core/device.c b/src/core/device.c index 0fd23add..fee108a8 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -30,28 +30,29 @@ typedef struct nni_device_data { int npath; nni_device_path paths[2]; nni_mtx mtx; - int running; + bool running; } nni_device_data; typedef struct nni_device_pair nni_device_pair; static void -nni_device_cancel(nni_aio *aio, int rv) +nni_device_cancel(nni_aio *aio, void *arg, int rv) { - nni_device_data *dd = nni_aio_get_prov_data(aio); + nni_device_data *dd = arg; // cancellation is the only path to shutting it down. nni_mtx_lock(&dd->mtx); - if (dd->running == 0) { + if ((!dd->running) || (dd->user != aio)) { nni_mtx_unlock(&dd->mtx); return; } - dd->running = 0; + dd->running = false; + dd->user = NULL; nni_mtx_unlock(&dd->mtx); nni_sock_shutdown(dd->paths[0].src); nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(dd->user, rv); + nni_aio_finish_error(aio, rv); } static void @@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user) p->state = NNI_DEVICE_STATE_RECV; nni_sock_recv(p->src, p->aio); } - dd->running = 1; + dd->running = true; nni_mtx_unlock(&dd->mtx); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index e58fe6f5..62a3893b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio, int rv) +nni_msgq_cancel(nni_aio *aio, void *arg, int rv) { - nni_msgq *mq = nni_aio_get_prov_data(aio); + nni_msgq *mq = arg; nni_mtx_lock(&mq->mq_lock); if (nni_aio_list_active(aio)) { diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 5a56d23e..0dae8bae 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -247,9 +247,9 @@ ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -ipc_conn_cancel(nni_aio *aio, int rv) +ipc_conn_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { @@ -352,7 +352,7 @@ ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, return (0); #elif defined(NNG_HAVE_SOCKPEERCRED) struct sockpeercred uc; - socklen_t len = sizeof(uc); + socklen_t len = sizeof(uc); if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { return (nni_plat_errno(errno)); } diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 13732911..3d0ad0d8 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -78,9 +78,9 @@ nni_ipc_dialer_fini(nni_ipc_dialer *d) } static void -ipc_dialer_cancel(nni_aio *aio, int rv) +ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + nni_ipc_dialer *d = arg; nni_ipc_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index 7134c4e3..6ac3eaa8 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -172,9 +172,9 @@ ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -ipc_listener_cancel(nni_aio *aio, int rv) +ipc_listener_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_listener *l = nni_aio_get_prov_data(aio); + nni_ipc_listener *l = arg; // This is dead easy, because we'll ignore the completion if there // isn't anything to do the accept on! diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index aea823a0..1c56778e 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -57,17 +57,17 @@ struct resolv_item { }; static void -resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item; + resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); - if ((item = nni_aio_get_prov_data(aio)) == NULL) { + if (item != nni_aio_get_prov_extra(aio, 0)) { // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); if (nni_aio_list_active(aio)) { // We have not been picked up by a resolver thread yet, // so we can just discard everything. @@ -263,6 +263,7 @@ resolv_ip(const char *host, const char *serv, int passive, int family, if (resolv_fini) { rv = NNG_ECLOSED; } else { + nni_aio_set_prov_extra(aio, 0, item); rv = nni_aio_schedule(aio, resolv_cancel, item); } if (rv != 0) { @@ -310,7 +311,7 @@ resolv_worker(void *notused) continue; } - item = nni_aio_get_prov_data(aio); + item = nni_aio_get_prov_extra(aio, 0); nni_aio_list_remove(aio); // Now attempt to do the work. This runs synchronously. @@ -321,7 +322,7 @@ resolv_worker(void *notused) // Check to make sure we were not canceled. if ((aio = item->aio) != NULL) { nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); item->aio = NULL; memcpy(sa, &item->sa, sizeof(*sa)); nni_aio_finish(aio, rv, 0); diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index c0352c55..788c4e9f 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -243,9 +243,9 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_conn_cancel(nni_aio *aio, int rv) +tcp_conn_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 21f6ecfe..ab3f3545 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -78,9 +78,9 @@ nni_tcp_dialer_fini(nni_tcp_dialer *d) } static void -tcp_dialer_cancel(nni_aio *aio, int rv) +tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_dialer *d = arg; nni_tcp_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index 8c186885..bc414cd0 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -165,9 +165,9 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_listener_cancel(nni_aio *aio, int rv) +tcp_listener_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_listener *l = arg; // This is dead easy, because we'll ignore the completion if there // isn't anything to do the accept on! diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 873a02a1..6b1ef399 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -269,9 +269,9 @@ nni_plat_udp_close(nni_plat_udp *udp) } void -nni_plat_udp_cancel(nni_aio *aio, int rv) +nni_plat_udp_cancel(nni_aio *aio, void *arg, int rv) { - nni_plat_udp *udp = nni_aio_get_prov_data(aio); + nni_plat_udp *udp = arg; nni_mtx_lock(&udp->udp_mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index b8e95f6c..38807e30 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -87,9 +87,9 @@ nni_win_iocp_handler(void *arg) } static void -nni_win_event_cancel(nni_aio *aio, int rv) +nni_win_event_cancel(nni_aio *aio, void *arg, int rv) { - nni_win_event *evt = nni_aio_get_prov_data(aio); + nni_win_event *evt = arg; nni_mtx_lock(&evt->mtx); if (aio == evt->active) { diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index d8ef4e4e..d4e7f5ac 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -100,9 +100,9 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) nni_aio_finish_synch(aio, rv, num); } static void -ipc_recv_cancel(nni_aio *aio, int rv) +ipc_recv_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; @@ -226,9 +226,9 @@ ipc_send_cb(nni_win_io *io, int rv, size_t num) } static void -ipc_send_cancel(nni_aio *aio, int rv) +ipc_send_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index 429bcedf..67865687 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -134,10 +134,10 @@ ipc_dial_thr(void *arg) } static void -ipc_dial_cancel(nni_aio *aio, int rv) +ipc_dial_cancel(nni_aio *aio, void *arg, int rv) { + nni_ipc_dialer *d = arg; ipc_dial_work * w = &ipc_connecter; - nni_ipc_dialer *d = nni_aio_get_prov_data(aio); nni_mtx_lock(&w->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 20bb8548..0fc1c9fc 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -222,9 +222,9 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) } static void -ipc_accept_cancel(nni_aio *aio, int rv) +ipc_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_listener *l = nni_aio_get_prov_data(aio); + nni_ipc_listener *l = arg; nni_mtx_unlock(&l->mtx); if (aio == nni_list_first(&l->aios)) { diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index cc876d04..aca07ecb 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -45,16 +45,16 @@ struct resolv_item { }; static void -resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item; + resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); - if ((item = nni_aio_get_prov_data(aio)) == NULL) { + if (item != nni_aio_get_prov_extra(aio, 0)) { nni_mtx_unlock(&resolv_mtx); return; } - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); if (nni_aio_list_active(aio)) { // We have not been picked up by a resolver thread yet, // so we can just discard everything. @@ -236,6 +236,7 @@ resolv_ip(const char *host, const char *serv, int passive, int family, if (resolv_fini) { rv = NNG_ECLOSED; } else { + nni_aio_set_prov_extra(aio, 0, item); rv = nni_aio_schedule(aio, resolv_cancel, item); } if (rv != 0) { @@ -283,7 +284,7 @@ resolv_worker(void *notused) continue; } - item = nni_aio_get_prov_data(aio); + item = nni_aio_get_prov_extra(aio, 0); nni_aio_list_remove(aio); // Now attempt to do the work. This runs synchronously. @@ -294,7 +295,7 @@ resolv_worker(void *notused) // Check to make sure we were not canceled. if ((aio = item->aio) != NULL) { nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); item->aio = NULL; memcpy(sa, &item->sa, sizeof(*sa)); nni_aio_finish(aio, rv, 0); diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index 08b759ca..68c72b9b 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -96,9 +96,9 @@ tcp_recv_cb(nni_win_io *io, int rv, size_t num) } static void -tcp_recv_cancel(nni_aio *aio, int rv) +tcp_recv_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; @@ -186,9 +186,9 @@ again: } static void -tcp_send_cancel(nni_aio *aio, int rv) +tcp_send_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 5283ea81..99308ceb 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -94,9 +94,9 @@ nni_tcp_dialer_fini(nni_tcp_dialer *d) } static void -tcp_dial_cancel(nni_aio *aio, int rv) +tcp_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_dialer *d = arg; nni_tcp_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 1f197246..f0e195be 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -236,9 +236,9 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) } static void -tcp_accept_cancel(nni_aio *aio, int rv) +tcp_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_listener *l = arg; nni_tcp_conn * c; nni_mtx_lock(&l->mtx); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f9ce58fd..586f7143 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -131,9 +131,9 @@ rep0_ctx_init(void **ctxp, void *sarg) } static void -rep0_ctx_cancel_send(nni_aio *aio, int rv) +rep0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_ctx * ctx = arg; rep0_sock *s = ctx->sock; nni_mtx_lock(&s->lk); @@ -448,9 +448,9 @@ rep0_pipe_send_cb(void *arg) } static void -rep0_cancel_recv(nni_aio *aio, int rv) +rep0_cancel_recv(nni_aio *aio, void *arg, int rv) { - rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_ctx * ctx = arg; rep0_sock *s = ctx->sock; nni_mtx_lock(&s->lk); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 6220697b..49d0ea66 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -590,9 +590,9 @@ req0_ctx_reset(req0_ctx *ctx) } static void -req0_ctx_cancel_recv(nni_aio *aio, int rv) +req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) { - req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); @@ -666,9 +666,9 @@ req0_ctx_recv(void *arg, nni_aio *aio) } static void -req0_ctx_cancel_send(nni_aio *aio, int rv) +req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 4e0a5263..80e3f2f8 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -133,9 +133,9 @@ resp0_ctx_init(void **ctxp, void *sarg) } static void -resp0_ctx_cancel_send(nni_aio *aio, int rv) +resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_ctx * ctx = arg; resp0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); @@ -437,9 +437,9 @@ resp0_pipe_send_cb(void *arg) } static void -resp0_cancel_recv(nni_aio *aio, int rv) +resp0_cancel_recv(nni_aio *aio, void *arg, int rv) { - resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_ctx * ctx = arg; resp0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index da4ae5c8..1639b3ec 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -232,9 +232,9 @@ nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp) } static void -http_dial_cancel(nni_aio *aio, int rv) +http_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_client *c = nni_aio_get_prov_data(aio); + nni_http_client *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 169918e9..b17b02cf 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -350,9 +350,9 @@ http_rd_cb(void *arg) } static void -http_rd_cancel(nni_aio *aio, int rv) +http_rd_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_conn *conn = nni_aio_get_prov_data(aio); + nni_http_conn *conn = arg; nni_mtx_lock(&conn->mtx); if (aio == conn->rd_uaio) { @@ -469,9 +469,9 @@ done: } static void -http_wr_cancel(nni_aio *aio, int rv) +http_wr_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_conn *conn = nni_aio_get_prov_data(aio); + nni_http_conn *conn = arg; nni_mtx_lock(&conn->mtx); if (aio == conn->wr_uaio) { diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index 42333783..0d15ae9f 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -371,9 +371,9 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) } static void -nni_tls_cancel(nni_aio *aio, int rv) +nni_tls_cancel(nni_aio *aio, void *arg, int rv) { - nni_tls *tp = nni_aio_get_prov_data(aio); + nni_tls *tp = arg; nni_mtx_lock(&tp->lk); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index efa0fdf8..a1553704 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -517,9 +517,9 @@ ws_start_write(nni_ws *ws) } static void -ws_cancel_close(nni_aio *aio, int rv) +ws_cancel_close(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; nni_mtx_lock(&ws->mtx); if (ws->wclose) { ws->wclose = false; @@ -616,15 +616,13 @@ ws_write_cb(void *arg) } static void -ws_write_cancel(nni_aio *aio, int rv) +ws_write_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws * ws; + nni_ws * ws = arg; ws_msg * wm; ws_frame *frame; - // Is this aio active? We can tell by looking at the - // active tx frame. - ws = nni_aio_get_prov_data(aio); + // Is this aio active? We can tell by looking at the active tx frame. nni_mtx_lock(&ws->mtx); if (!nni_aio_list_active(aio)) { @@ -1038,9 +1036,9 @@ ws_read_cb(void *arg) } static void -ws_read_cancel(nni_aio *aio, int rv) +ws_read_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; ws_msg *wm; nni_mtx_lock(&ws->mtx); @@ -1676,9 +1674,9 @@ nni_ws_listener_proto(nni_ws_listener *l, const char *proto) } static void -ws_accept_cancel(nni_aio *aio, int rv) +ws_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws_listener *l = nni_aio_get_prov_data(aio); + nni_ws_listener *l = arg; nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { @@ -2031,9 +2029,9 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) } static void -ws_dial_cancel(nni_aio *aio, int rv) +ws_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; nni_mtx_lock(&ws->mtx); if (aio == ws->useraio) { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 89f9b024..3194a56d 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -245,10 +245,9 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) +inproc_conn_finish( + nni_aio *aio, int rv, nni_inproc_ep *ep, nni_inproc_pipe *pipe) { - nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - nni_aio_list_remove(aio); if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) { @@ -289,12 +288,12 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); + inproc_conn_finish(aio, NNG_ECONNREFUSED, ep, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); + inproc_conn_finish(aio, NNG_ECLOSED, ep, NULL); } nni_mtx_unlock(&nni_inproc.mx); } @@ -322,8 +321,10 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); - nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); + inproc_conn_finish( + caio, NNG_ENOMEM, cli, NULL); + inproc_conn_finish( + saio, NNG_ENOMEM, srv, NULL); continue; } nni_mtx_init(&pair->mx); @@ -340,8 +341,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) if (spipe != NULL) { nni_inproc_pipe_fini(spipe); } - nni_inproc_conn_finish(caio, rv, NULL); - nni_inproc_conn_finish(saio, rv, NULL); + inproc_conn_finish(caio, rv, cli, NULL); + inproc_conn_finish(saio, rv, srv, NULL); nni_inproc_pair_destroy(pair); continue; } @@ -357,8 +358,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) nni_msgq_set_filter(spipe->rq, inproc_filter, spipe); nni_msgq_set_filter(cpipe->rq, inproc_filter, cpipe); - nni_inproc_conn_finish(caio, 0, cpipe); - nni_inproc_conn_finish(saio, 0, spipe); + inproc_conn_finish(caio, 0, cli, cpipe); + inproc_conn_finish(saio, 0, srv, spipe); } if (nni_list_first(&cli->aios) == NULL) { @@ -372,9 +373,9 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) } static void -nni_inproc_ep_cancel(nni_aio *aio, int rv) +nni_inproc_ep_cancel(nni_aio *aio, void *arg, int rv) { - nni_inproc_ep *ep = nni_aio_get_prov_data(aio); + nni_inproc_ep *ep = arg; nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { @@ -405,7 +406,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 5d0064e5..05e0b895 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -451,9 +451,9 @@ error: } static void -ipctran_pipe_send_cancel(nni_aio *aio, int rv) +ipctran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -544,9 +544,9 @@ ipctran_pipe_send(void *arg, nni_aio *aio) } static void -ipctran_pipe_recv_cancel(nni_aio *aio, int rv) +ipctran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -686,9 +686,9 @@ ipctran_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) } static void -ipctran_pipe_conn_cancel(nni_aio *aio, int rv) +ipctran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index c58d66ec..3a4e018e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -193,9 +193,9 @@ tcptran_pipe_init(tcptran_pipe **pipep, tcptran_ep *ep) } static void -tcptran_pipe_conn_cancel(nni_aio *aio, int rv) +tcptran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { @@ -483,9 +483,9 @@ recv_error: } static void -tcptran_pipe_send_cancel(nni_aio *aio, int rv) +tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -576,9 +576,9 @@ tcptran_pipe_send(void *arg, nni_aio *aio) } static void -tcptran_pipe_recv_cancel(nni_aio *aio, int rv) +tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index aff98c27..49bc932d 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -197,9 +197,9 @@ tlstran_pipe_reap(tlstran_pipe *p) } static void -tlstran_pipe_conn_cancel(nni_aio *aio, int rv) +tlstran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { @@ -493,9 +493,9 @@ recv_error: } static void -tlstran_pipe_send_cancel(nni_aio *aio, int rv) +tlstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -578,9 +578,9 @@ tlstran_pipe_send(void *arg, nni_aio *aio) } static void -tlstran_pipe_recv_cancel(nni_aio *aio, int rv) +tlstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 81638d95..9b3f67f4 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -120,9 +120,9 @@ ws_pipe_recv_cb(void *arg) } static void -ws_pipe_recv_cancel(nni_aio *aio, int rv) +ws_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - ws_pipe *p = nni_aio_get_prov_data(aio); + ws_pipe *p = arg; nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { nni_mtx_unlock(&p->mtx); @@ -155,9 +155,9 @@ ws_pipe_recv(void *arg, nni_aio *aio) } static void -ws_pipe_send_cancel(nni_aio *aio, int rv) +ws_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - ws_pipe *p = nni_aio_get_prov_data(aio); + ws_pipe *p = arg; nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { nni_mtx_unlock(&p->mtx); @@ -299,9 +299,9 @@ ws_listener_bind(void *arg) } static void -ws_listener_cancel(nni_aio *aio, int rv) +ws_listener_cancel(nni_aio *aio, void *arg, int rv) { - ws_listener *l = nni_aio_get_prov_data(aio); + ws_listener *l = arg; nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { @@ -337,9 +337,9 @@ ws_listener_accept(void *arg, nni_aio *aio) } static void -ws_dialer_cancel(nni_aio *aio, int rv) +ws_dialer_cancel(nni_aio *aio, void *arg, int rv) { - ws_dialer *d = nni_aio_get_prov_data(aio); + ws_dialer *d = arg; nni_mtx_lock(&d->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d1072931..0a68c1ca 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1840,9 +1840,9 @@ zt_pipe_send(void *arg, nni_aio *aio) } static void -zt_pipe_cancel_recv(nni_aio *aio, int rv) +zt_pipe_cancel_recv(nni_aio *aio, void *arg, int rv) { - zt_pipe *p = nni_aio_get_prov_data(aio); + zt_pipe *p = arg; nni_mtx_lock(&zt_lk); if (p->zp_user_rxaio == aio) { p->zp_user_rxaio = NULL; @@ -2331,9 +2331,9 @@ zt_ep_bind(void *arg) } static void -zt_ep_cancel(nni_aio *aio, int rv) +zt_ep_cancel(nni_aio *aio, void *arg, int rv) { - zt_ep *ep = nni_aio_get_prov_data(aio); + zt_ep *ep = arg; nni_mtx_lock(&zt_lk); if (nni_aio_list_active(aio)) { @@ -2417,9 +2417,9 @@ zt_ep_accept(void *arg, nni_aio *aio) } static void -zt_ep_conn_req_cancel(nni_aio *aio, int rv) +zt_ep_conn_req_cancel(nni_aio *aio, void *arg, int rv) { - zt_ep *ep = nni_aio_get_prov_data(aio); + zt_ep *ep = arg; // We don't have much to do here. The AIO will have been // canceled as a result of the "parent" AIO canceling. nni_mtx_lock(&zt_lk); |
