From d606317f5c028fa8fba5d5384b0ccd90ffa4eab5 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 8 Feb 2018 14:16:23 -0800 Subject: fixes #171 Refactor aio to use generic data fields This addresses the use of the pipe special field, and eliminates it. The message APIs (recvmsg, sendmsg) need to be updated as well still, but I want to handle that as part of a separate issue. While here we fixed various compiler warnings, etc. --- src/core/aio.c | 45 +++-------- src/core/aio.h | 3 - src/core/endpt.c | 9 +-- src/core/url.c | 4 +- src/nng.c | 2 +- src/platform/posix/posix_epdesc.c | 5 +- src/platform/windows/win_ipc.c | 6 +- src/platform/windows/win_tcp.c | 10 +-- src/supplemental/http/http_client.c | 2 +- src/supplemental/http/http_public.c | 2 +- src/supplemental/http/http_server.c | 2 +- src/supplemental/websocket/websocket.c | 9 ++- src/transport/inproc/inproc.c | 135 +++++++++++++++------------------ src/transport/ipc/ipc.c | 8 +- src/transport/tcp/tcp.c | 8 +- src/transport/tls/tls.c | 8 +- src/transport/ws/websocket.c | 16 ++-- src/transport/zerotier/zerotier.c | 12 ++- tests/httpserver.c | 2 +- tests/resolv.c | 22 +++--- 20 files changed, 137 insertions(+), 173 deletions(-) diff --git a/src/core/aio.c b/src/core/aio.c index ee1ddf2a..1c293020 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -74,17 +74,14 @@ struct nng_aio { // Read/write operations. nni_iov *a_iov; - int a_niov; + unsigned a_niov; nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short nni_iov *a_iovalloc; // dynamically allocated IOVs - int a_niovalloc; // number of allocated IOVs + unsigned a_niovalloc; // number of allocated IOVs // Message operations. nni_msg *a_msg; - // Connect/accept operations. - void *a_pipe; // opaque pipe handle - // User scratch data. Consumers may store values here, which // must be preserved by providers and the framework. void *a_user_data[4]; @@ -217,18 +214,6 @@ nni_aio_get_msg(nni_aio *aio) return (aio->a_msg); } -void -nni_aio_set_pipe(nni_aio *aio, void *p) -{ - aio->a_pipe = p; -} - -void * -nni_aio_get_pipe(nni_aio *aio) -{ - return (aio->a_pipe); -} - void nni_aio_set_data(nni_aio *aio, int index, void *data) { @@ -330,6 +315,9 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; aio->a_active = 1; + for (int i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { + aio->a_outputs[i] = NULL; + } // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { @@ -370,8 +358,7 @@ nni_aio_abort(nni_aio *aio, int rv) // I/O provider related functions. static void -nni_aio_finish_impl( - nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg) +nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) { nni_mtx_lock(&nni_aio_lk); @@ -380,12 +367,9 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); aio->a_pend = 1; - aio->a_result = result; + aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; - if (pipe) { - aio->a_pipe = pipe; - } if (msg) { aio->a_msg = msg; } @@ -409,27 +393,20 @@ nni_aio_finish_impl( void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_aio_finish_impl(aio, result, count, NULL, NULL); + nni_aio_finish_impl(aio, result, count, NULL); } void nni_aio_finish_error(nni_aio *aio, int result) { - nni_aio_finish_impl(aio, result, 0, NULL, NULL); -} - -void -nni_aio_finish_pipe(nni_aio *aio, void *pipe) -{ - NNI_ASSERT(pipe != NULL); - nni_aio_finish_impl(aio, 0, 0, pipe, NULL); + nni_aio_finish_impl(aio, result, 0, NULL); } void nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) { NNI_ASSERT(msg != NULL); - nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg); } void @@ -609,7 +586,7 @@ nni_aio_iov_count(nni_aio *aio) { size_t resid = 0; - for (int i = 0; i < aio->a_niov; i++) { + for (unsigned i = 0; i < aio->a_niov; i++) { resid += aio->a_iov[i].iov_len; } return (resid); diff --git a/src/core/aio.h b/src/core/aio.h index b17c8e97..718eeb91 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -78,8 +78,6 @@ extern void *nni_aio_get_output(nni_aio *, int); // XXX: These should be refactored in terms of generic inputs and outputs. extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); -extern void nni_aio_set_pipe(nni_aio *, void *); -extern void * nni_aio_get_pipe(nni_aio *); // nni_aio_set_synch sets a synchronous completion flag on the AIO. // When this is set, the next time the AIO is completed, the callback @@ -126,7 +124,6 @@ extern int nni_aio_list_active(nni_aio *); // nni_aio_finish is called by the provider when an operation is complete. extern void nni_aio_finish(nni_aio *, int, size_t); extern void nni_aio_finish_error(nni_aio *, int); -extern void nni_aio_finish_pipe(nni_aio *, void *); extern void nni_aio_finish_msg(nni_aio *, nni_msg *); // nni_aio_abort is used to abort an operation. Any pending I/O or diff --git a/src/core/endpt.c b/src/core/endpt.c index 4a2c3097..b7167ad7 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -364,7 +364,7 @@ nni_ep_con_cb(void *arg) int rv; if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); + rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); } nni_mtx_lock(&ep->ep_mtx); switch (rv) { @@ -448,7 +448,7 @@ nni_ep_dial(nni_ep *ep, int flags) // As we're synchronous, we also have to handle the completion. if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) { + ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { nni_mtx_lock(&ep->ep_mtx); ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); @@ -464,8 +464,8 @@ nni_ep_acc_cb(void *arg) int rv; if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(nni_aio_get_pipe(aio) != NULL); - rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); + NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL); + rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); } nni_mtx_lock(&ep->ep_mtx); @@ -503,7 +503,6 @@ nni_ep_acc_start(nni_ep *ep) if (ep->ep_closing) { return; } - nni_aio_set_pipe(aio, NULL); ep->ep_ops.ep_accept(ep->ep_data, aio); } diff --git a/src/core/url.c b/src/core/url.c index 6a2a4e53..2cdb43c2 100644 --- a/src/core/url.c +++ b/src/core/url.c @@ -284,7 +284,7 @@ nni_url_parse(nni_url **urlp, const char *raw) rv = NNG_ENOMEM; goto error; } - for (int i = 0; i < len; i++) { + for (size_t i = 0; i < len; i++) { url->u_scheme[i] = tolower(s[i]); } url->u_scheme[len] = '\0'; @@ -334,7 +334,7 @@ nni_url_parse(nni_url **urlp, const char *raw) } // Copy the host portion, but make it lower case (hostnames are // case insensitive). - for (int i = 0; i < len; i++) { + for (size_t i = 0; i < len; i++) { url->u_host[i] = tolower(s[i]); } url->u_host[len] = '\0'; diff --git a/src/nng.c b/src/nng.c index ef5b5292..f552e26b 100644 --- a/src/nng.c +++ b/src/nng.c @@ -1140,7 +1140,7 @@ void nng_aio_finish(nng_aio *aio, int rv) { // Preserve the count. - return (nni_aio_finish(aio, rv, nni_aio_count(aio))); + nni_aio_finish(aio, rv, nni_aio_count(aio)); } #if 0 diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 931ed052..b5e65826 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -74,7 +74,8 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) if (rv != 0) { nni_aio_finish_error(aio, rv); } else { - nni_aio_finish_pipe(aio, pd); + nni_aio_set_output(aio, 0, pd); + nni_aio_finish(aio, 0, 0); } } @@ -317,7 +318,6 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) // connection is ready for us. There isn't anything else for us to // do really, as that will have been done in listen. nni_mtx_lock(&ed->mtx); - nni_aio_set_pipe(aio, NULL); // If we can't start, it means that the AIO was stopped. if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); @@ -343,7 +343,6 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int fd; nni_mtx_lock(&ed->mtx); - nni_aio_set_pipe(aio, NULL); // If we can't start, it means that the AIO was stopped. if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 63ba8a67..9851af25 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -321,7 +321,8 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static void @@ -448,7 +449,8 @@ nni_win_ipc_conn_thr(void *arg) ((rv = nni_win_iocp_register(p)) != 0)) { goto fail; } - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); continue; fail: diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 254cf40b..6d58d495 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -105,7 +105,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) DWORD niov; DWORD flags; nni_plat_tcp_pipe *pipe = evt->ptr; - int i; + unsigned i; unsigned naiov; nni_iov * aiov; WSABUF * iov; @@ -469,7 +469,8 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) memcpy(&pipe->sockname, sa1, len1); memcpy(&pipe->peername, sa2, len2); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static int @@ -512,7 +513,6 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_aio_set_pipe(aio, NULL); nni_win_event_submit(&ep->acc_ev, aio); } @@ -562,7 +562,8 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) len = sizeof(pipe->sockname); (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static int @@ -632,7 +633,6 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_aio_set_pipe(aio, NULL); nni_win_event_submit(&ep->con_ev, aio); } diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 918b7b09..9058b6e2 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -44,7 +44,7 @@ http_conn_done(void *arg) nni_mtx_lock(&c->mtx); rv = nni_aio_result(c->connaio); - p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL; + p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL; if ((aio = nni_list_first(&c->aios)) == NULL) { if (p != NULL) { nni_plat_tcp_pipe_fini(p); diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c index 9d23ed8e..5437a000 100644 --- a/src/supplemental/http/http_public.c +++ b/src/supplemental/http/http_public.c @@ -737,7 +737,7 @@ void nng_http_client_connect(nng_http_client *cli, nng_aio *aio) { #ifdef NNG_SUPP_HTTP - return (nni_http_client_connect(cli, aio)); + nni_http_client_connect(cli, aio); #else NNI_ARG_UNUSED(cli); if (nni_aio_start(aio, NULL, NULL)) { diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 4a426c8c..4c89708d 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -719,7 +719,7 @@ http_server_acccb(void *arg) nni_mtx_unlock(&s->mtx); return; } - tcp = nni_aio_get_pipe(aio); + tcp = nni_aio_get_output(aio, 0); if (s->closed) { // If we're closing, then reject this one. nni_plat_tcp_pipe_fini(tcp); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index ad4ce196..05a2c62c 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1174,7 +1174,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio) ws->ready = true; if ((aio = nni_list_first(&l->aios)) != NULL) { nni_list_remove(&l->aios, aio); - nni_aio_finish_pipe(aio, ws); + nni_aio_set_output(aio, 0, ws); + nni_aio_finish(aio, 0, 0); } else { nni_list_append(&l->pend, ws); } @@ -1269,7 +1270,8 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) nni_list_remove(&d->wspend, ws); ws->ready = true; ws->useraio = NULL; - nni_aio_finish_pipe(uaio, ws); + nni_aio_set_output(uaio, 0, ws); + nni_aio_finish(uaio, 0, 0); nni_mtx_unlock(&d->mtx); return; err: @@ -1621,7 +1623,8 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) } if ((ws = nni_list_first(&l->pend)) != NULL) { nni_list_remove(&l->pend, ws); - nni_aio_finish_pipe(aio, ws); + nni_aio_set_output(aio, 0, ws); + nni_aio_finish(aio, 0, 0); } else { nni_list_append(&l->aios, aio); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6329a627..f9f438ab 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv) +nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) { nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - void * pipe; nni_aio_list_remove(aio); - pipe = nni_aio_get_pipe(aio); - nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } if (rv == 0) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } else { - if (pipe != NULL) { - nni_inproc_pipe_fini(pipe); - } + NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } } @@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED); + nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); } nni_mtx_unlock(&nni_inproc.mx); } static void -nni_inproc_accept_clients(nni_inproc_ep *server) +nni_inproc_accept_clients(nni_inproc_ep *srv) { - nni_inproc_ep * client, *nclient; - nni_aio * saio, *caio; - nni_inproc_pair *pair; - int rv; + nni_inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { - nclient = nni_list_first(&server->clients); - while ((client = nclient) != NULL) { - nclient = nni_list_next(&server->clients, nclient); - NNI_LIST_FOREACH (&client->aios, caio) { - if ((saio = nni_list_first(&server->aios)) == NULL) { + nni_inproc_pipe *cpipe; + nni_inproc_pipe *spipe; + nni_inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. break; } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM); - nni_inproc_conn_finish(saio, NNG_ENOMEM); + nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); + nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); continue; } + nni_mtx_init(&pair->mx); - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + spipe = cpipe = NULL; + if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + + if (cpipe != NULL) { + nni_inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + nni_inproc_pipe_fini(spipe); + } + nni_inproc_conn_finish(caio, rv, NULL); + nni_inproc_conn_finish(saio, rv, NULL); nni_inproc_pair_destroy(pair); - nni_inproc_conn_finish(caio, rv); - nni_inproc_conn_finish(saio, rv); continue; } - nni_mtx_init(&pair->mx); - - pair->pipes[0] = nni_aio_get_pipe(caio); - pair->pipes[1] = nni_aio_get_pipe(saio); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[1]->peer = pair->pipes[0]->proto; - pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; - - nni_inproc_conn_finish(caio, 0); - nni_inproc_conn_finish(saio, 0); + spipe->peer = cpipe->proto; + cpipe->peer = spipe->proto; + pair->pipes[0] = cpipe; + pair->pipes[1] = spipe; + pair->refcnt = 2; + cpipe->pair = spipe->pair = pair; + cpipe->rq = spipe->wq = pair->q[0]; + cpipe->wq = spipe->rq = pair->q[1]; + + nni_inproc_conn_finish(caio, 0, cpipe); + nni_inproc_conn_finish(saio, 0, spipe); } - if (nni_list_first(&client->aios) == NULL) { - // No more outstanding client connects. Normally - // there should only be one. - if (nni_list_active(&server->clients, client)) { - nni_list_remove(&server->clients, client); + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); } } } @@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = nni_aio_get_prov_data(aio); - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = nni_aio_get_pipe(aio)) != NULL) { - nni_aio_set_pipe(aio, NULL); - nni_inproc_pipe_fini(pipe); - } nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); @@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_ep * server; - int rv; - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = arg; + nni_inproc_ep *server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_pipe *pipe; - int rv; + nni_inproc_ep *ep = arg; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) } // We are already on the master list of servers, thanks to bind. - - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - - // Insert us into the pending server aios, and then run the - // accept list. + // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6475e43b..9ebf3145 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -579,20 +579,20 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1504d0ee..475a77ff 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -631,19 +631,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 09c59582..fc993501 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -653,19 +653,19 @@ nni_tls_ep_finish(nni_tls_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 2fe7804c..9740cdde 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,7 +205,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) { ws_pipe *p; int rv; - nni_aio *aio; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -225,11 +224,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) p->lproto = ep->lproto; p->ws = ws; - if ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_pipe(aio, p); - } - *pipep = p; return (0); } @@ -621,7 +615,7 @@ ws_ep_conn_cb(void *arg) nni_mtx_lock(&ep->mtx); if (nni_aio_result(caio) == 0) { - ws = nni_aio_get_pipe(caio); + ws = nni_aio_get_output(caio, 0); } if ((uaio = nni_list_first(&ep->aios)) == NULL) { // The client stopped caring about this! @@ -639,7 +633,8 @@ ws_ep_conn_cb(void *arg) nni_ws_fini(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } nni_mtx_unlock(&ep->mtx); } @@ -672,7 +667,7 @@ ws_ep_acc_cb(void *arg) nni_aio_finish_error(uaio, rv); } } else { - nni_ws *ws = nni_aio_get_pipe(aaio); + nni_ws *ws = nni_aio_get_output(aaio, 0); if (uaio != NULL) { ws_pipe *p; // Make a pipe @@ -681,7 +676,8 @@ ws_ep_acc_cb(void *arg) nni_ws_close(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } } } diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d6762d25..6dddbb9a 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -690,7 +690,8 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); ep->ze_laddr = 0; - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void @@ -2351,7 +2352,8 @@ zt_ep_doaccept(zt_ep *ep) } p->zp_peer = creq.cr_proto; zt_pipe_send_conn_ack(p); - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } } @@ -2398,14 +2400,16 @@ zt_ep_conn_req_cb(void *arg) ep->ze_creq_active = 0; switch ((rv = nni_aio_result(aio))) { case 0: + p = nni_aio_get_output(aio, 0); // Already canceled, or already handled? if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { nni_aio_list_remove(uaio); - nni_aio_finish_pipe(uaio, nni_aio_get_pipe(aio)); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } else { // We have a pipe, but nowhere to stick it. // Just discard it. - zt_pipe_fini(nni_aio_get_pipe(aio)); + zt_pipe_fini(p); } ep->ze_creq_try = 0; break; diff --git a/tests/httpserver.c b/tests/httpserver.c index a62bae25..bbdd8423 100644 --- a/tests/httpserver.c +++ b/tests/httpserver.c @@ -200,7 +200,7 @@ TestMain("HTTP Server", { nng_aio_wait(aio); So(nng_aio_result(aio) == 0); - h = nni_aio_get_output(aio, 0); + h = nng_aio_get_output(aio, 0); So(h != NULL); So(nng_http_req_alloc(&req, url) == 0); So(nng_http_res_alloc(&res) == 0); diff --git a/tests/resolv.c b/tests/resolv.c index 607b3b11..9d83e6df 100644 --- a/tests/resolv.c +++ b/tests/resolv.c @@ -52,20 +52,20 @@ ip6tostr(void *addr) // the normal assumptions on Linux do *not* hold true. #if 0 Convey("Localhost IPv6 resolves", { - nni_aio aio; - memset(&aio, 0, sizeof (aio)); + nng_aio *aio; const char *str; - nni_aio_init(&aio, NULL, NULL); + nng_sockaddr sa; + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + So(nng_aio_set_input(aio, 0, &sa) == 0); nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET6, 1, - &aio); - nni_aio_wait(&aio); - So(nni_aio_result(&aio) == 0); - So(aio.a_naddrs == 1); - So(aio.a_addrs[0].s_un.s_in6.sa_family == NNG_AF_INET6); - So(aio.a_addrs[0].s_un.s_in6.sa_port == ntohs(80)); - str = ip6tostr(&aio.a_addrs[0].s_un.s_in6.sa_addr); + aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + So(sa.s_un.s_in6.sa_family == NNG_AF_INET6); + So(sa.s_un.s_in6.sa_port == ntohs(80)); + str = ip6tostr(&sa.s_un.s_in6.sa_addr); So(strcmp(str, "::1") == 0); - nni_aio_fini(&aio); + nng_aio_free(aio); } #endif -- cgit v1.2.3-70-g09d2