diff options
33 files changed, 196 insertions, 176 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 0a9c744d..15abfe8d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -95,7 +95,8 @@ struct nng_aio { void *a_outputs[4]; // Provider-use fields. - nni_aio_cancelfn a_prov_cancel; + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; void * a_prov_data; nni_list_node a_prov_node; void * a_prov_extra[4]; // Extra data used by provider @@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } // Wait for the aio to be "done"; this ensures that we don't @@ -217,16 +221,19 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); + if (fn != NULL) { + fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); @@ -237,16 +244,19 @@ void nni_aio_close(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } } } @@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio) aio->a_result = NNG_ECANCELED; aio->a_count = 0; nni_list_node_remove(&aio->a_expire_node); - aio->a_prov_cancel = NULL; - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; nni_mtx_unlock(&nni_aio_lk); nni_task_dispatch(aio->a_task); return (NNG_ECANCELED); } - aio->a_result = 0; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; + aio->a_result = 0; + aio->a_count = 0; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } @@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancelfn; + aio->a_cancel_arg = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); @@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) void nni_aio_abort(nni_aio *aio, int rv) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. - if (cancelfn != NULL) { - cancelfn(aio, rv); + if (fn != NULL) { + fn(aio, arg, rv); } } @@ -430,9 +444,10 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); - aio->a_result = rv; - aio->a_count = count; - aio->a_prov_cancel = NULL; + aio->a_result = rv; + aio->a_count = count; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } @@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; nni_time now; nni_aio * aio; int rv; @@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg) nni_list_remove(aios, aio); rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; - if ((cancelfn = aio->a_prov_cancel) != NULL) { - + if ((fn = aio->a_cancel_fn) != NULL) { + void *arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. nni_aio_expire_aio = aio; @@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg) // terminate the aio - we've tried, but it has to run // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); - cancelfn(aio, rv); + fn(aio, arg, rv); nni_mtx_lock(&nni_aio_lk); nni_aio_expire_aio = NULL; @@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) } static void -nni_sleep_cancel(nng_aio *aio, int rv) +nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { + NNI_ARG_UNUSED(arg); + nni_mtx_lock(&nni_aio_lk); if (!aio->a_sleep) { nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 5462c40b..0febc85e 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -18,7 +18,7 @@ typedef struct nni_aio_ops nni_aio_ops; -typedef void (*nni_aio_cancelfn)(nni_aio *, int); +typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // nni_aio_init initializes an aio object. The callback is called with // the supplied argument when the operation is complete. If NULL is diff --git a/src/core/device.c b/src/core/device.c index 0fd23add..fee108a8 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -30,28 +30,29 @@ typedef struct nni_device_data { int npath; nni_device_path paths[2]; nni_mtx mtx; - int running; + bool running; } nni_device_data; typedef struct nni_device_pair nni_device_pair; static void -nni_device_cancel(nni_aio *aio, int rv) +nni_device_cancel(nni_aio *aio, void *arg, int rv) { - nni_device_data *dd = nni_aio_get_prov_data(aio); + nni_device_data *dd = arg; // cancellation is the only path to shutting it down. nni_mtx_lock(&dd->mtx); - if (dd->running == 0) { + if ((!dd->running) || (dd->user != aio)) { nni_mtx_unlock(&dd->mtx); return; } - dd->running = 0; + dd->running = false; + dd->user = NULL; nni_mtx_unlock(&dd->mtx); nni_sock_shutdown(dd->paths[0].src); nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(dd->user, rv); + nni_aio_finish_error(aio, rv); } static void @@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user) p->state = NNI_DEVICE_STATE_RECV; nni_sock_recv(p->src, p->aio); } - dd->running = 1; + dd->running = true; nni_mtx_unlock(&dd->mtx); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index e58fe6f5..62a3893b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio, int rv) +nni_msgq_cancel(nni_aio *aio, void *arg, int rv) { - nni_msgq *mq = nni_aio_get_prov_data(aio); + nni_msgq *mq = arg; nni_mtx_lock(&mq->mq_lock); if (nni_aio_list_active(aio)) { diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 5a56d23e..0dae8bae 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -247,9 +247,9 @@ ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -ipc_conn_cancel(nni_aio *aio, int rv) +ipc_conn_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { @@ -352,7 +352,7 @@ ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, return (0); #elif defined(NNG_HAVE_SOCKPEERCRED) struct sockpeercred uc; - socklen_t len = sizeof(uc); + socklen_t len = sizeof(uc); if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &len) != 0) { return (nni_plat_errno(errno)); } diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 13732911..3d0ad0d8 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -78,9 +78,9 @@ nni_ipc_dialer_fini(nni_ipc_dialer *d) } static void -ipc_dialer_cancel(nni_aio *aio, int rv) +ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_dialer *d = nni_aio_get_prov_data(aio); + nni_ipc_dialer *d = arg; nni_ipc_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index 7134c4e3..6ac3eaa8 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -172,9 +172,9 @@ ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -ipc_listener_cancel(nni_aio *aio, int rv) +ipc_listener_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_listener *l = nni_aio_get_prov_data(aio); + nni_ipc_listener *l = arg; // This is dead easy, because we'll ignore the completion if there // isn't anything to do the accept on! diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index aea823a0..1c56778e 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -57,17 +57,17 @@ struct resolv_item { }; static void -resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item; + resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); - if ((item = nni_aio_get_prov_data(aio)) == NULL) { + if (item != nni_aio_get_prov_extra(aio, 0)) { // Already canceled? nni_mtx_unlock(&resolv_mtx); return; } - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); if (nni_aio_list_active(aio)) { // We have not been picked up by a resolver thread yet, // so we can just discard everything. @@ -263,6 +263,7 @@ resolv_ip(const char *host, const char *serv, int passive, int family, if (resolv_fini) { rv = NNG_ECLOSED; } else { + nni_aio_set_prov_extra(aio, 0, item); rv = nni_aio_schedule(aio, resolv_cancel, item); } if (rv != 0) { @@ -310,7 +311,7 @@ resolv_worker(void *notused) continue; } - item = nni_aio_get_prov_data(aio); + item = nni_aio_get_prov_extra(aio, 0); nni_aio_list_remove(aio); // Now attempt to do the work. This runs synchronously. @@ -321,7 +322,7 @@ resolv_worker(void *notused) // Check to make sure we were not canceled. if ((aio = item->aio) != NULL) { nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); item->aio = NULL; memcpy(sa, &item->sa, sizeof(*sa)); nni_aio_finish(aio, rv, 0); diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index c0352c55..788c4e9f 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -243,9 +243,9 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_conn_cancel(nni_aio *aio, int rv) +tcp_conn_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 21f6ecfe..ab3f3545 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -78,9 +78,9 @@ nni_tcp_dialer_fini(nni_tcp_dialer *d) } static void -tcp_dialer_cancel(nni_aio *aio, int rv) +tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_dialer *d = arg; nni_tcp_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index 8c186885..bc414cd0 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -165,9 +165,9 @@ tcp_listener_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_listener_cancel(nni_aio *aio, int rv) +tcp_listener_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_listener *l = arg; // This is dead easy, because we'll ignore the completion if there // isn't anything to do the accept on! diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 873a02a1..6b1ef399 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -269,9 +269,9 @@ nni_plat_udp_close(nni_plat_udp *udp) } void -nni_plat_udp_cancel(nni_aio *aio, int rv) +nni_plat_udp_cancel(nni_aio *aio, void *arg, int rv) { - nni_plat_udp *udp = nni_aio_get_prov_data(aio); + nni_plat_udp *udp = arg; nni_mtx_lock(&udp->udp_mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index b8e95f6c..38807e30 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -87,9 +87,9 @@ nni_win_iocp_handler(void *arg) } static void -nni_win_event_cancel(nni_aio *aio, int rv) +nni_win_event_cancel(nni_aio *aio, void *arg, int rv) { - nni_win_event *evt = nni_aio_get_prov_data(aio); + nni_win_event *evt = arg; nni_mtx_lock(&evt->mtx); if (aio == evt->active) { diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index d8ef4e4e..d4e7f5ac 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -100,9 +100,9 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) nni_aio_finish_synch(aio, rv, num); } static void -ipc_recv_cancel(nni_aio *aio, int rv) +ipc_recv_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; @@ -226,9 +226,9 @@ ipc_send_cb(nni_win_io *io, int rv, size_t num) } static void -ipc_send_cancel(nni_aio *aio, int rv) +ipc_send_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = nni_aio_get_prov_data(aio); + nni_ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index 429bcedf..67865687 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -134,10 +134,10 @@ ipc_dial_thr(void *arg) } static void -ipc_dial_cancel(nni_aio *aio, int rv) +ipc_dial_cancel(nni_aio *aio, void *arg, int rv) { + nni_ipc_dialer *d = arg; ipc_dial_work * w = &ipc_connecter; - nni_ipc_dialer *d = nni_aio_get_prov_data(aio); nni_mtx_lock(&w->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 20bb8548..0fc1c9fc 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -222,9 +222,9 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) } static void -ipc_accept_cancel(nni_aio *aio, int rv) +ipc_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_listener *l = nni_aio_get_prov_data(aio); + nni_ipc_listener *l = arg; nni_mtx_unlock(&l->mtx); if (aio == nni_list_first(&l->aios)) { diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index cc876d04..aca07ecb 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -45,16 +45,16 @@ struct resolv_item { }; static void -resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, void *arg, int rv) { - resolv_item *item; + resolv_item *item = arg; nni_mtx_lock(&resolv_mtx); - if ((item = nni_aio_get_prov_data(aio)) == NULL) { + if (item != nni_aio_get_prov_extra(aio, 0)) { nni_mtx_unlock(&resolv_mtx); return; } - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); if (nni_aio_list_active(aio)) { // We have not been picked up by a resolver thread yet, // so we can just discard everything. @@ -236,6 +236,7 @@ resolv_ip(const char *host, const char *serv, int passive, int family, if (resolv_fini) { rv = NNG_ECLOSED; } else { + nni_aio_set_prov_extra(aio, 0, item); rv = nni_aio_schedule(aio, resolv_cancel, item); } if (rv != 0) { @@ -283,7 +284,7 @@ resolv_worker(void *notused) continue; } - item = nni_aio_get_prov_data(aio); + item = nni_aio_get_prov_extra(aio, 0); nni_aio_list_remove(aio); // Now attempt to do the work. This runs synchronously. @@ -294,7 +295,7 @@ resolv_worker(void *notused) // Check to make sure we were not canceled. if ((aio = item->aio) != NULL) { nng_sockaddr *sa = nni_aio_get_input(aio, 0); - nni_aio_set_prov_data(aio, NULL); + nni_aio_set_prov_extra(aio, 0, NULL); item->aio = NULL; memcpy(sa, &item->sa, sizeof(*sa)); nni_aio_finish(aio, rv, 0); diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index 08b759ca..68c72b9b 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -96,9 +96,9 @@ tcp_recv_cb(nni_win_io *io, int rv, size_t num) } static void -tcp_recv_cancel(nni_aio *aio, int rv) +tcp_recv_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; @@ -186,9 +186,9 @@ again: } static void -tcp_send_cancel(nni_aio *aio, int rv) +tcp_send_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn *c = nni_aio_get_prov_data(aio); + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 5283ea81..99308ceb 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -94,9 +94,9 @@ nni_tcp_dialer_fini(nni_tcp_dialer *d) } static void -tcp_dial_cancel(nni_aio *aio, int rv) +tcp_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_dialer *d = nni_aio_get_prov_data(aio); + nni_tcp_dialer *d = arg; nni_tcp_conn * c; nni_mtx_lock(&d->mtx); diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 1f197246..f0e195be 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -236,9 +236,9 @@ nni_tcp_listener_listen(nni_tcp_listener *l, nni_sockaddr *sa) } static void -tcp_accept_cancel(nni_aio *aio, int rv) +tcp_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_listener *l = nni_aio_get_prov_data(aio); + nni_tcp_listener *l = arg; nni_tcp_conn * c; nni_mtx_lock(&l->mtx); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f9ce58fd..586f7143 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -131,9 +131,9 @@ rep0_ctx_init(void **ctxp, void *sarg) } static void -rep0_ctx_cancel_send(nni_aio *aio, int rv) +rep0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_ctx * ctx = arg; rep0_sock *s = ctx->sock; nni_mtx_lock(&s->lk); @@ -448,9 +448,9 @@ rep0_pipe_send_cb(void *arg) } static void -rep0_cancel_recv(nni_aio *aio, int rv) +rep0_cancel_recv(nni_aio *aio, void *arg, int rv) { - rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_ctx * ctx = arg; rep0_sock *s = ctx->sock; nni_mtx_lock(&s->lk); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 6220697b..49d0ea66 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -590,9 +590,9 @@ req0_ctx_reset(req0_ctx *ctx) } static void -req0_ctx_cancel_recv(nni_aio *aio, int rv) +req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) { - req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); @@ -666,9 +666,9 @@ req0_ctx_recv(void *arg, nni_aio *aio) } static void -req0_ctx_cancel_send(nni_aio *aio, int rv) +req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_ctx * ctx = arg; req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 4e0a5263..80e3f2f8 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -133,9 +133,9 @@ resp0_ctx_init(void **ctxp, void *sarg) } static void -resp0_ctx_cancel_send(nni_aio *aio, int rv) +resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) { - resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_ctx * ctx = arg; resp0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); @@ -437,9 +437,9 @@ resp0_pipe_send_cb(void *arg) } static void -resp0_cancel_recv(nni_aio *aio, int rv) +resp0_cancel_recv(nni_aio *aio, void *arg, int rv) { - resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_ctx * ctx = arg; resp0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index da4ae5c8..1639b3ec 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -232,9 +232,9 @@ nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp) } static void -http_dial_cancel(nni_aio *aio, int rv) +http_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_client *c = nni_aio_get_prov_data(aio); + nni_http_client *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 169918e9..b17b02cf 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -350,9 +350,9 @@ http_rd_cb(void *arg) } static void -http_rd_cancel(nni_aio *aio, int rv) +http_rd_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_conn *conn = nni_aio_get_prov_data(aio); + nni_http_conn *conn = arg; nni_mtx_lock(&conn->mtx); if (aio == conn->rd_uaio) { @@ -469,9 +469,9 @@ done: } static void -http_wr_cancel(nni_aio *aio, int rv) +http_wr_cancel(nni_aio *aio, void *arg, int rv) { - nni_http_conn *conn = nni_aio_get_prov_data(aio); + nni_http_conn *conn = arg; nni_mtx_lock(&conn->mtx); if (aio == conn->wr_uaio) { diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index 42333783..0d15ae9f 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -371,9 +371,9 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) } static void -nni_tls_cancel(nni_aio *aio, int rv) +nni_tls_cancel(nni_aio *aio, void *arg, int rv) { - nni_tls *tp = nni_aio_get_prov_data(aio); + nni_tls *tp = arg; nni_mtx_lock(&tp->lk); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index efa0fdf8..a1553704 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -517,9 +517,9 @@ ws_start_write(nni_ws *ws) } static void -ws_cancel_close(nni_aio *aio, int rv) +ws_cancel_close(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; nni_mtx_lock(&ws->mtx); if (ws->wclose) { ws->wclose = false; @@ -616,15 +616,13 @@ ws_write_cb(void *arg) } static void -ws_write_cancel(nni_aio *aio, int rv) +ws_write_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws * ws; + nni_ws * ws = arg; ws_msg * wm; ws_frame *frame; - // Is this aio active? We can tell by looking at the - // active tx frame. - ws = nni_aio_get_prov_data(aio); + // Is this aio active? We can tell by looking at the active tx frame. nni_mtx_lock(&ws->mtx); if (!nni_aio_list_active(aio)) { @@ -1038,9 +1036,9 @@ ws_read_cb(void *arg) } static void -ws_read_cancel(nni_aio *aio, int rv) +ws_read_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; ws_msg *wm; nni_mtx_lock(&ws->mtx); @@ -1676,9 +1674,9 @@ nni_ws_listener_proto(nni_ws_listener *l, const char *proto) } static void -ws_accept_cancel(nni_aio *aio, int rv) +ws_accept_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws_listener *l = nni_aio_get_prov_data(aio); + nni_ws_listener *l = arg; nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { @@ -2031,9 +2029,9 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) } static void -ws_dial_cancel(nni_aio *aio, int rv) +ws_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws *ws = nni_aio_get_prov_data(aio); + nni_ws *ws = arg; nni_mtx_lock(&ws->mtx); if (aio == ws->useraio) { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 89f9b024..3194a56d 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -245,10 +245,9 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) +inproc_conn_finish( + nni_aio *aio, int rv, nni_inproc_ep *ep, nni_inproc_pipe *pipe) { - nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - nni_aio_list_remove(aio); if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) { @@ -289,12 +288,12 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); + inproc_conn_finish(aio, NNG_ECONNREFUSED, ep, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); + inproc_conn_finish(aio, NNG_ECLOSED, ep, NULL); } nni_mtx_unlock(&nni_inproc.mx); } @@ -322,8 +321,10 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); - nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); + inproc_conn_finish( + caio, NNG_ENOMEM, cli, NULL); + inproc_conn_finish( + saio, NNG_ENOMEM, srv, NULL); continue; } nni_mtx_init(&pair->mx); @@ -340,8 +341,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) if (spipe != NULL) { nni_inproc_pipe_fini(spipe); } - nni_inproc_conn_finish(caio, rv, NULL); - nni_inproc_conn_finish(saio, rv, NULL); + inproc_conn_finish(caio, rv, cli, NULL); + inproc_conn_finish(saio, rv, srv, NULL); nni_inproc_pair_destroy(pair); continue; } @@ -357,8 +358,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) nni_msgq_set_filter(spipe->rq, inproc_filter, spipe); nni_msgq_set_filter(cpipe->rq, inproc_filter, cpipe); - nni_inproc_conn_finish(caio, 0, cpipe); - nni_inproc_conn_finish(saio, 0, spipe); + inproc_conn_finish(caio, 0, cli, cpipe); + inproc_conn_finish(saio, 0, srv, spipe); } if (nni_list_first(&cli->aios) == NULL) { @@ -372,9 +373,9 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) } static void -nni_inproc_ep_cancel(nni_aio *aio, int rv) +nni_inproc_ep_cancel(nni_aio *aio, void *arg, int rv) { - nni_inproc_ep *ep = nni_aio_get_prov_data(aio); + nni_inproc_ep *ep = arg; nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { @@ -405,7 +406,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 5d0064e5..05e0b895 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -451,9 +451,9 @@ error: } static void -ipctran_pipe_send_cancel(nni_aio *aio, int rv) +ipctran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -544,9 +544,9 @@ ipctran_pipe_send(void *arg, nni_aio *aio) } static void -ipctran_pipe_recv_cancel(nni_aio *aio, int rv) +ipctran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -686,9 +686,9 @@ ipctran_pipe_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_opt_type t) } static void -ipctran_pipe_conn_cancel(nni_aio *aio, int rv) +ipctran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = nni_aio_get_prov_data(aio); + ipctran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index c58d66ec..3a4e018e 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -193,9 +193,9 @@ tcptran_pipe_init(tcptran_pipe **pipep, tcptran_ep *ep) } static void -tcptran_pipe_conn_cancel(nni_aio *aio, int rv) +tcptran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { @@ -483,9 +483,9 @@ recv_error: } static void -tcptran_pipe_send_cancel(nni_aio *aio, int rv) +tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -576,9 +576,9 @@ tcptran_pipe_send(void *arg, nni_aio *aio) } static void -tcptran_pipe_recv_cancel(nni_aio *aio, int rv) +tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - tcptran_pipe *p = nni_aio_get_prov_data(aio); + tcptran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index aff98c27..49bc932d 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -197,9 +197,9 @@ tlstran_pipe_reap(tlstran_pipe *p) } static void -tlstran_pipe_conn_cancel(nni_aio *aio, int rv) +tlstran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->ep->mtx); if (aio == p->useraio) { @@ -493,9 +493,9 @@ recv_error: } static void -tlstran_pipe_send_cancel(nni_aio *aio, int rv) +tlstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -578,9 +578,9 @@ tlstran_pipe_send(void *arg, nni_aio *aio) } static void -tlstran_pipe_recv_cancel(nni_aio *aio, int rv) +tlstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - tlstran_pipe *p = nni_aio_get_prov_data(aio); + tlstran_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 81638d95..9b3f67f4 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -120,9 +120,9 @@ ws_pipe_recv_cb(void *arg) } static void -ws_pipe_recv_cancel(nni_aio *aio, int rv) +ws_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - ws_pipe *p = nni_aio_get_prov_data(aio); + ws_pipe *p = arg; nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { nni_mtx_unlock(&p->mtx); @@ -155,9 +155,9 @@ ws_pipe_recv(void *arg, nni_aio *aio) } static void -ws_pipe_send_cancel(nni_aio *aio, int rv) +ws_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - ws_pipe *p = nni_aio_get_prov_data(aio); + ws_pipe *p = arg; nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { nni_mtx_unlock(&p->mtx); @@ -299,9 +299,9 @@ ws_listener_bind(void *arg) } static void -ws_listener_cancel(nni_aio *aio, int rv) +ws_listener_cancel(nni_aio *aio, void *arg, int rv) { - ws_listener *l = nni_aio_get_prov_data(aio); + ws_listener *l = arg; nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { @@ -337,9 +337,9 @@ ws_listener_accept(void *arg, nni_aio *aio) } static void -ws_dialer_cancel(nni_aio *aio, int rv) +ws_dialer_cancel(nni_aio *aio, void *arg, int rv) { - ws_dialer *d = nni_aio_get_prov_data(aio); + ws_dialer *d = arg; nni_mtx_lock(&d->mtx); if (nni_aio_list_active(aio)) { diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d1072931..0a68c1ca 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1840,9 +1840,9 @@ zt_pipe_send(void *arg, nni_aio *aio) } static void -zt_pipe_cancel_recv(nni_aio *aio, int rv) +zt_pipe_cancel_recv(nni_aio *aio, void *arg, int rv) { - zt_pipe *p = nni_aio_get_prov_data(aio); + zt_pipe *p = arg; nni_mtx_lock(&zt_lk); if (p->zp_user_rxaio == aio) { p->zp_user_rxaio = NULL; @@ -2331,9 +2331,9 @@ zt_ep_bind(void *arg) } static void -zt_ep_cancel(nni_aio *aio, int rv) +zt_ep_cancel(nni_aio *aio, void *arg, int rv) { - zt_ep *ep = nni_aio_get_prov_data(aio); + zt_ep *ep = arg; nni_mtx_lock(&zt_lk); if (nni_aio_list_active(aio)) { @@ -2417,9 +2417,9 @@ zt_ep_accept(void *arg, nni_aio *aio) } static void -zt_ep_conn_req_cancel(nni_aio *aio, int rv) +zt_ep_conn_req_cancel(nni_aio *aio, void *arg, int rv) { - zt_ep *ep = nni_aio_get_prov_data(aio); + zt_ep *ep = arg; // We don't have much to do here. The AIO will have been // canceled as a result of the "parent" AIO canceling. nni_mtx_lock(&zt_lk); |
