diff options
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 135 |
1 files changed, 61 insertions, 74 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6329a627..f9f438ab 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv) +nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) { nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - void * pipe; nni_aio_list_remove(aio); - pipe = nni_aio_get_pipe(aio); - nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } if (rv == 0) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } else { - if (pipe != NULL) { - nni_inproc_pipe_fini(pipe); - } + NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } } @@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED); + nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); } nni_mtx_unlock(&nni_inproc.mx); } static void -nni_inproc_accept_clients(nni_inproc_ep *server) +nni_inproc_accept_clients(nni_inproc_ep *srv) { - nni_inproc_ep * client, *nclient; - nni_aio * saio, *caio; - nni_inproc_pair *pair; - int rv; + nni_inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { - nclient = nni_list_first(&server->clients); - while ((client = nclient) != NULL) { - nclient = nni_list_next(&server->clients, nclient); - NNI_LIST_FOREACH (&client->aios, caio) { - if ((saio = nni_list_first(&server->aios)) == NULL) { + nni_inproc_pipe *cpipe; + nni_inproc_pipe *spipe; + nni_inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. break; } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM); - nni_inproc_conn_finish(saio, NNG_ENOMEM); + nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); + nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); continue; } + nni_mtx_init(&pair->mx); - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + spipe = cpipe = NULL; + if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + + if (cpipe != NULL) { + nni_inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + nni_inproc_pipe_fini(spipe); + } + nni_inproc_conn_finish(caio, rv, NULL); + nni_inproc_conn_finish(saio, rv, NULL); nni_inproc_pair_destroy(pair); - nni_inproc_conn_finish(caio, rv); - nni_inproc_conn_finish(saio, rv); continue; } - nni_mtx_init(&pair->mx); - - pair->pipes[0] = nni_aio_get_pipe(caio); - pair->pipes[1] = nni_aio_get_pipe(saio); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[1]->peer = pair->pipes[0]->proto; - pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; - - nni_inproc_conn_finish(caio, 0); - nni_inproc_conn_finish(saio, 0); + spipe->peer = cpipe->proto; + cpipe->peer = spipe->proto; + pair->pipes[0] = cpipe; + pair->pipes[1] = spipe; + pair->refcnt = 2; + cpipe->pair = spipe->pair = pair; + cpipe->rq = spipe->wq = pair->q[0]; + cpipe->wq = spipe->rq = pair->q[1]; + + nni_inproc_conn_finish(caio, 0, cpipe); + nni_inproc_conn_finish(saio, 0, spipe); } - if (nni_list_first(&client->aios) == NULL) { - // No more outstanding client connects. Normally - // there should only be one. - if (nni_list_active(&server->clients, client)) { - nni_list_remove(&server->clients, client); + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); } } } @@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = nni_aio_get_prov_data(aio); - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = nni_aio_get_pipe(aio)) != NULL) { - nni_aio_set_pipe(aio, NULL); - nni_inproc_pipe_fini(pipe); - } nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); @@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_ep * server; - int rv; - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = arg; + nni_inproc_ep *server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_pipe *pipe; - int rv; + nni_inproc_ep *ep = arg; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) } // We are already on the master list of servers, thanks to bind. - - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - - // Insert us into the pending server aios, and then run the - // accept list. + // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); |
