diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 45 | ||||
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/endpt.c | 9 | ||||
| -rw-r--r-- | src/core/url.c | 4 | ||||
| -rw-r--r-- | src/nng.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 5 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 6 | ||||
| -rw-r--r-- | src/platform/windows/win_tcp.c | 10 | ||||
| -rw-r--r-- | src/supplemental/http/http_client.c | 2 | ||||
| -rw-r--r-- | src/supplemental/http/http_public.c | 2 | ||||
| -rw-r--r-- | src/supplemental/http/http_server.c | 2 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 9 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 135 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 8 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 8 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 8 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 16 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 12 |
18 files changed, 125 insertions, 161 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index ee1ddf2a..1c293020 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -74,17 +74,14 @@ struct nng_aio { // Read/write operations. nni_iov *a_iov; - int a_niov; + unsigned a_niov; nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short nni_iov *a_iovalloc; // dynamically allocated IOVs - int a_niovalloc; // number of allocated IOVs + unsigned a_niovalloc; // number of allocated IOVs // Message operations. nni_msg *a_msg; - // Connect/accept operations. - void *a_pipe; // opaque pipe handle - // User scratch data. Consumers may store values here, which // must be preserved by providers and the framework. void *a_user_data[4]; @@ -218,18 +215,6 @@ nni_aio_get_msg(nni_aio *aio) } void -nni_aio_set_pipe(nni_aio *aio, void *p) -{ - aio->a_pipe = p; -} - -void * -nni_aio_get_pipe(nni_aio *aio) -{ - return (aio->a_pipe); -} - -void nni_aio_set_data(nni_aio *aio, int index, void *data) { if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) { @@ -330,6 +315,9 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; aio->a_active = 1; + for (int i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { + aio->a_outputs[i] = NULL; + } // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { @@ -370,8 +358,7 @@ nni_aio_abort(nni_aio *aio, int rv) // I/O provider related functions. static void -nni_aio_finish_impl( - nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg) +nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) { nni_mtx_lock(&nni_aio_lk); @@ -380,12 +367,9 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); aio->a_pend = 1; - aio->a_result = result; + aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; - if (pipe) { - aio->a_pipe = pipe; - } if (msg) { aio->a_msg = msg; } @@ -409,27 +393,20 @@ nni_aio_finish_impl( void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_aio_finish_impl(aio, result, count, NULL, NULL); + nni_aio_finish_impl(aio, result, count, NULL); } void nni_aio_finish_error(nni_aio *aio, int result) { - nni_aio_finish_impl(aio, result, 0, NULL, NULL); -} - -void -nni_aio_finish_pipe(nni_aio *aio, void *pipe) -{ - NNI_ASSERT(pipe != NULL); - nni_aio_finish_impl(aio, 0, 0, pipe, NULL); + nni_aio_finish_impl(aio, result, 0, NULL); } void nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) { NNI_ASSERT(msg != NULL); - nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg); } void @@ -609,7 +586,7 @@ nni_aio_iov_count(nni_aio *aio) { size_t resid = 0; - for (int i = 0; i < aio->a_niov; i++) { + for (unsigned i = 0; i < aio->a_niov; i++) { resid += aio->a_iov[i].iov_len; } return (resid); diff --git a/src/core/aio.h b/src/core/aio.h index b17c8e97..718eeb91 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -78,8 +78,6 @@ extern void *nni_aio_get_output(nni_aio *, int); // XXX: These should be refactored in terms of generic inputs and outputs. extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); -extern void nni_aio_set_pipe(nni_aio *, void *); -extern void * nni_aio_get_pipe(nni_aio *); // nni_aio_set_synch sets a synchronous completion flag on the AIO. // When this is set, the next time the AIO is completed, the callback @@ -126,7 +124,6 @@ extern int nni_aio_list_active(nni_aio *); // nni_aio_finish is called by the provider when an operation is complete. extern void nni_aio_finish(nni_aio *, int, size_t); extern void nni_aio_finish_error(nni_aio *, int); -extern void nni_aio_finish_pipe(nni_aio *, void *); extern void nni_aio_finish_msg(nni_aio *, nni_msg *); // nni_aio_abort is used to abort an operation. Any pending I/O or diff --git a/src/core/endpt.c b/src/core/endpt.c index 4a2c3097..b7167ad7 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -364,7 +364,7 @@ nni_ep_con_cb(void *arg) int rv; if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); + rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); } nni_mtx_lock(&ep->ep_mtx); switch (rv) { @@ -448,7 +448,7 @@ nni_ep_dial(nni_ep *ep, int flags) // As we're synchronous, we also have to handle the completion. if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) { + ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { nni_mtx_lock(&ep->ep_mtx); ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); @@ -464,8 +464,8 @@ nni_ep_acc_cb(void *arg) int rv; if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(nni_aio_get_pipe(aio) != NULL); - rv = nni_pipe_create(ep, nni_aio_get_pipe(aio)); + NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL); + rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); } nni_mtx_lock(&ep->ep_mtx); @@ -503,7 +503,6 @@ nni_ep_acc_start(nni_ep *ep) if (ep->ep_closing) { return; } - nni_aio_set_pipe(aio, NULL); ep->ep_ops.ep_accept(ep->ep_data, aio); } diff --git a/src/core/url.c b/src/core/url.c index 6a2a4e53..2cdb43c2 100644 --- a/src/core/url.c +++ b/src/core/url.c @@ -284,7 +284,7 @@ nni_url_parse(nni_url **urlp, const char *raw) rv = NNG_ENOMEM; goto error; } - for (int i = 0; i < len; i++) { + for (size_t i = 0; i < len; i++) { url->u_scheme[i] = tolower(s[i]); } url->u_scheme[len] = '\0'; @@ -334,7 +334,7 @@ nni_url_parse(nni_url **urlp, const char *raw) } // Copy the host portion, but make it lower case (hostnames are // case insensitive). - for (int i = 0; i < len; i++) { + for (size_t i = 0; i < len; i++) { url->u_host[i] = tolower(s[i]); } url->u_host[len] = '\0'; @@ -1140,7 +1140,7 @@ void nng_aio_finish(nng_aio *aio, int rv) { // Preserve the count. - return (nni_aio_finish(aio, rv, nni_aio_count(aio))); + nni_aio_finish(aio, rv, nni_aio_count(aio)); } #if 0 diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 931ed052..b5e65826 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -74,7 +74,8 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) if (rv != 0) { nni_aio_finish_error(aio, rv); } else { - nni_aio_finish_pipe(aio, pd); + nni_aio_set_output(aio, 0, pd); + nni_aio_finish(aio, 0, 0); } } @@ -317,7 +318,6 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) // connection is ready for us. There isn't anything else for us to // do really, as that will have been done in listen. nni_mtx_lock(&ed->mtx); - nni_aio_set_pipe(aio, NULL); // If we can't start, it means that the AIO was stopped. if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); @@ -343,7 +343,6 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) int fd; nni_mtx_lock(&ed->mtx); - nni_aio_set_pipe(aio, NULL); // If we can't start, it means that the AIO was stopped. if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 63ba8a67..9851af25 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -321,7 +321,8 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static void @@ -448,7 +449,8 @@ nni_win_ipc_conn_thr(void *arg) ((rv = nni_win_iocp_register(p)) != 0)) { goto fail; } - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); continue; fail: diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 254cf40b..6d58d495 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -105,7 +105,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) DWORD niov; DWORD flags; nni_plat_tcp_pipe *pipe = evt->ptr; - int i; + unsigned i; unsigned naiov; nni_iov * aiov; WSABUF * iov; @@ -469,7 +469,8 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) memcpy(&pipe->sockname, sa1, len1); memcpy(&pipe->peername, sa2, len2); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static int @@ -512,7 +513,6 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) void nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_aio_set_pipe(aio, NULL); nni_win_event_submit(&ep->acc_ev, aio); } @@ -562,7 +562,8 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) len = sizeof(pipe->sockname); (void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } static int @@ -632,7 +633,6 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) extern void nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_aio_set_pipe(aio, NULL); nni_win_event_submit(&ep->con_ev, aio); } diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 918b7b09..9058b6e2 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -44,7 +44,7 @@ http_conn_done(void *arg) nni_mtx_lock(&c->mtx); rv = nni_aio_result(c->connaio); - p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL; + p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL; if ((aio = nni_list_first(&c->aios)) == NULL) { if (p != NULL) { nni_plat_tcp_pipe_fini(p); diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c index 9d23ed8e..5437a000 100644 --- a/src/supplemental/http/http_public.c +++ b/src/supplemental/http/http_public.c @@ -737,7 +737,7 @@ void nng_http_client_connect(nng_http_client *cli, nng_aio *aio) { #ifdef NNG_SUPP_HTTP - return (nni_http_client_connect(cli, aio)); + nni_http_client_connect(cli, aio); #else NNI_ARG_UNUSED(cli); if (nni_aio_start(aio, NULL, NULL)) { diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 4a426c8c..4c89708d 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -719,7 +719,7 @@ http_server_acccb(void *arg) nni_mtx_unlock(&s->mtx); return; } - tcp = nni_aio_get_pipe(aio); + tcp = nni_aio_get_output(aio, 0); if (s->closed) { // If we're closing, then reject this one. nni_plat_tcp_pipe_fini(tcp); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index ad4ce196..05a2c62c 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1174,7 +1174,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio) ws->ready = true; if ((aio = nni_list_first(&l->aios)) != NULL) { nni_list_remove(&l->aios, aio); - nni_aio_finish_pipe(aio, ws); + nni_aio_set_output(aio, 0, ws); + nni_aio_finish(aio, 0, 0); } else { nni_list_append(&l->pend, ws); } @@ -1269,7 +1270,8 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) nni_list_remove(&d->wspend, ws); ws->ready = true; ws->useraio = NULL; - nni_aio_finish_pipe(uaio, ws); + nni_aio_set_output(uaio, 0, ws); + nni_aio_finish(uaio, 0, 0); nni_mtx_unlock(&d->mtx); return; err: @@ -1621,7 +1623,8 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) } if ((ws = nni_list_first(&l->pend)) != NULL) { nni_list_remove(&l->pend, ws); - nni_aio_finish_pipe(aio, ws); + nni_aio_set_output(aio, 0, ws); + nni_aio_finish(aio, 0, 0); } else { nni_list_append(&l->aios, aio); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6329a627..f9f438ab 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv) +nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) { nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - void * pipe; nni_aio_list_remove(aio); - pipe = nni_aio_get_pipe(aio); - nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } if (rv == 0) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } else { - if (pipe != NULL) { - nni_inproc_pipe_fini(pipe); - } + NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } } @@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED); + nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); } nni_mtx_unlock(&nni_inproc.mx); } static void -nni_inproc_accept_clients(nni_inproc_ep *server) +nni_inproc_accept_clients(nni_inproc_ep *srv) { - nni_inproc_ep * client, *nclient; - nni_aio * saio, *caio; - nni_inproc_pair *pair; - int rv; + nni_inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { - nclient = nni_list_first(&server->clients); - while ((client = nclient) != NULL) { - nclient = nni_list_next(&server->clients, nclient); - NNI_LIST_FOREACH (&client->aios, caio) { - if ((saio = nni_list_first(&server->aios)) == NULL) { + nni_inproc_pipe *cpipe; + nni_inproc_pipe *spipe; + nni_inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. break; } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM); - nni_inproc_conn_finish(saio, NNG_ENOMEM); + nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); + nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); continue; } + nni_mtx_init(&pair->mx); - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + spipe = cpipe = NULL; + if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + + if (cpipe != NULL) { + nni_inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + nni_inproc_pipe_fini(spipe); + } + nni_inproc_conn_finish(caio, rv, NULL); + nni_inproc_conn_finish(saio, rv, NULL); nni_inproc_pair_destroy(pair); - nni_inproc_conn_finish(caio, rv); - nni_inproc_conn_finish(saio, rv); continue; } - nni_mtx_init(&pair->mx); - - pair->pipes[0] = nni_aio_get_pipe(caio); - pair->pipes[1] = nni_aio_get_pipe(saio); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[1]->peer = pair->pipes[0]->proto; - pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; - - nni_inproc_conn_finish(caio, 0); - nni_inproc_conn_finish(saio, 0); + spipe->peer = cpipe->proto; + cpipe->peer = spipe->proto; + pair->pipes[0] = cpipe; + pair->pipes[1] = spipe; + pair->refcnt = 2; + cpipe->pair = spipe->pair = pair; + cpipe->rq = spipe->wq = pair->q[0]; + cpipe->wq = spipe->rq = pair->q[1]; + + nni_inproc_conn_finish(caio, 0, cpipe); + nni_inproc_conn_finish(saio, 0, spipe); } - if (nni_list_first(&client->aios) == NULL) { - // No more outstanding client connects. Normally - // there should only be one. - if (nni_list_active(&server->clients, client)) { - nni_list_remove(&server->clients, client); + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); } } } @@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = nni_aio_get_prov_data(aio); - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = nni_aio_get_pipe(aio)) != NULL) { - nni_aio_set_pipe(aio, NULL); - nni_inproc_pipe_fini(pipe); - } nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); @@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_ep * server; - int rv; - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = arg; + nni_inproc_ep *server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_pipe *pipe; - int rv; + nni_inproc_ep *ep = arg; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) } // We are already on the master list of servers, thanks to bind. - - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - - // Insert us into the pending server aios, and then run the - // accept list. + // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6475e43b..9ebf3145 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -579,20 +579,20 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1504d0ee..475a77ff 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -631,19 +631,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 09c59582..fc993501 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -653,19 +653,19 @@ nni_tls_ep_finish(nni_tls_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 2fe7804c..9740cdde 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,7 +205,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) { ws_pipe *p; int rv; - nni_aio *aio; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -225,11 +224,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) p->lproto = ep->lproto; p->ws = ws; - if ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_pipe(aio, p); - } - *pipep = p; return (0); } @@ -621,7 +615,7 @@ ws_ep_conn_cb(void *arg) nni_mtx_lock(&ep->mtx); if (nni_aio_result(caio) == 0) { - ws = nni_aio_get_pipe(caio); + ws = nni_aio_get_output(caio, 0); } if ((uaio = nni_list_first(&ep->aios)) == NULL) { // The client stopped caring about this! @@ -639,7 +633,8 @@ ws_ep_conn_cb(void *arg) nni_ws_fini(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } nni_mtx_unlock(&ep->mtx); } @@ -672,7 +667,7 @@ ws_ep_acc_cb(void *arg) nni_aio_finish_error(uaio, rv); } } else { - nni_ws *ws = nni_aio_get_pipe(aaio); + nni_ws *ws = nni_aio_get_output(aaio, 0); if (uaio != NULL) { ws_pipe *p; // Make a pipe @@ -681,7 +676,8 @@ ws_ep_acc_cb(void *arg) nni_ws_close(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } } } diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d6762d25..6dddbb9a 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -690,7 +690,8 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); ep->ze_laddr = 0; - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void @@ -2351,7 +2352,8 @@ zt_ep_doaccept(zt_ep *ep) } p->zp_peer = creq.cr_proto; zt_pipe_send_conn_ack(p); - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } } @@ -2398,14 +2400,16 @@ zt_ep_conn_req_cb(void *arg) ep->ze_creq_active = 0; switch ((rv = nni_aio_result(aio))) { case 0: + p = nni_aio_get_output(aio, 0); // Already canceled, or already handled? if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { nni_aio_list_remove(uaio); - nni_aio_finish_pipe(uaio, nni_aio_get_pipe(aio)); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } else { // We have a pipe, but nowhere to stick it. // Just discard it. - zt_pipe_fini(nni_aio_get_pipe(aio)); + zt_pipe_fini(p); } ep->ze_creq_try = 0; break; |
