From d606317f5c028fa8fba5d5384b0ccd90ffa4eab5 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 8 Feb 2018 14:16:23 -0800 Subject: fixes #171 Refactor aio to use generic data fields This addresses the use of the pipe special field, and eliminates it. The message APIs (recvmsg, sendmsg) need to be updated as well still, but I want to handle that as part of a separate issue. While here we fixed various compiler warnings, etc. --- src/transport/inproc/inproc.c | 135 +++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 74 deletions(-) (limited to 'src/transport/inproc') diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6329a627..f9f438ab 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv) +nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) { nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - void * pipe; nni_aio_list_remove(aio); - pipe = nni_aio_get_pipe(aio); - nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } if (rv == 0) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } else { - if (pipe != NULL) { - nni_inproc_pipe_fini(pipe); - } + NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } } @@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED); + nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); } nni_mtx_unlock(&nni_inproc.mx); } static void -nni_inproc_accept_clients(nni_inproc_ep *server) +nni_inproc_accept_clients(nni_inproc_ep *srv) { - nni_inproc_ep * client, *nclient; - nni_aio * saio, *caio; - nni_inproc_pair *pair; - int rv; + nni_inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { - nclient = nni_list_first(&server->clients); - while ((client = nclient) != NULL) { - nclient = nni_list_next(&server->clients, nclient); - NNI_LIST_FOREACH (&client->aios, caio) { - if ((saio = nni_list_first(&server->aios)) == NULL) { + nni_inproc_pipe *cpipe; + nni_inproc_pipe *spipe; + nni_inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. break; } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM); - nni_inproc_conn_finish(saio, NNG_ENOMEM); + nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); + nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); continue; } + nni_mtx_init(&pair->mx); - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + spipe = cpipe = NULL; + if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + + if (cpipe != NULL) { + nni_inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + nni_inproc_pipe_fini(spipe); + } + nni_inproc_conn_finish(caio, rv, NULL); + nni_inproc_conn_finish(saio, rv, NULL); nni_inproc_pair_destroy(pair); - nni_inproc_conn_finish(caio, rv); - nni_inproc_conn_finish(saio, rv); continue; } - nni_mtx_init(&pair->mx); - - pair->pipes[0] = nni_aio_get_pipe(caio); - pair->pipes[1] = nni_aio_get_pipe(saio); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[1]->peer = pair->pipes[0]->proto; - pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; - - nni_inproc_conn_finish(caio, 0); - nni_inproc_conn_finish(saio, 0); + spipe->peer = cpipe->proto; + cpipe->peer = spipe->proto; + pair->pipes[0] = cpipe; + pair->pipes[1] = spipe; + pair->refcnt = 2; + cpipe->pair = spipe->pair = pair; + cpipe->rq = spipe->wq = pair->q[0]; + cpipe->wq = spipe->rq = pair->q[1]; + + nni_inproc_conn_finish(caio, 0, cpipe); + nni_inproc_conn_finish(saio, 0, spipe); } - if (nni_list_first(&client->aios) == NULL) { - // No more outstanding client connects. Normally - // there should only be one. - if (nni_list_active(&server->clients, client)) { - nni_list_remove(&server->clients, client); + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); } } } @@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = nni_aio_get_prov_data(aio); - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = nni_aio_get_pipe(aio)) != NULL) { - nni_aio_set_pipe(aio, NULL); - nni_inproc_pipe_fini(pipe); - } nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); @@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_ep * server; - int rv; - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = arg; + nni_inproc_ep *server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_pipe *pipe; - int rv; + nni_inproc_ep *ep = arg; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) } // We are already on the master list of servers, thanks to bind. - - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - - // Insert us into the pending server aios, and then run the - // accept list. + // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); -- cgit v1.2.3-70-g09d2