aboutsummaryrefslogtreecommitdiff
path: root/src/transport/inproc/inproc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/inproc/inproc.c')
-rw-r--r--src/transport/inproc/inproc.c135
1 files changed, 61 insertions, 74 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 6329a627..f9f438ab 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg)
}
static void
-nni_inproc_conn_finish(nni_aio *aio, int rv)
+nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe)
{
nni_inproc_ep *ep = nni_aio_get_prov_data(aio);
- void * pipe;
nni_aio_list_remove(aio);
- pipe = nni_aio_get_pipe(aio);
- nni_aio_set_pipe(aio, NULL);
if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) &&
nni_list_empty(&ep->aios)) {
@@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv)
}
if (rv == 0) {
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
} else {
- if (pipe != NULL) {
- nni_inproc_pipe_fini(pipe);
- }
+ NNI_ASSERT(pipe == NULL);
nni_aio_finish_error(aio, rv);
}
}
@@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg)
// Notify any waiting clients that we are closed.
while ((client = nni_list_first(&ep->clients)) != NULL) {
while ((aio = nni_list_first(&client->aios)) != NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED);
+ nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
}
nni_list_remove(&ep->clients, client);
}
while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_inproc_conn_finish(aio, NNG_ECLOSED);
+ nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL);
}
nni_mtx_unlock(&nni_inproc.mx);
}
static void
-nni_inproc_accept_clients(nni_inproc_ep *server)
+nni_inproc_accept_clients(nni_inproc_ep *srv)
{
- nni_inproc_ep * client, *nclient;
- nni_aio * saio, *caio;
- nni_inproc_pair *pair;
- int rv;
+ nni_inproc_ep *cli, *nclient;
+
+ nclient = nni_list_first(&srv->clients);
+ while ((cli = nclient) != NULL) {
+ nni_aio *caio;
+ nclient = nni_list_next(&srv->clients, nclient);
+ NNI_LIST_FOREACH (&cli->aios, caio) {
- nclient = nni_list_first(&server->clients);
- while ((client = nclient) != NULL) {
- nclient = nni_list_next(&server->clients, nclient);
- NNI_LIST_FOREACH (&client->aios, caio) {
- if ((saio = nni_list_first(&server->aios)) == NULL) {
+ nni_inproc_pipe *cpipe;
+ nni_inproc_pipe *spipe;
+ nni_inproc_pair *pair;
+ nni_aio * saio;
+ int rv;
+
+ if ((saio = nni_list_first(&srv->aios)) == NULL) {
// No outstanding accept() calls.
break;
}
if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
- nni_inproc_conn_finish(caio, NNG_ENOMEM);
- nni_inproc_conn_finish(saio, NNG_ENOMEM);
+ nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL);
+ nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL);
continue;
}
+ nni_mtx_init(&pair->mx);
- if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
+ spipe = cpipe = NULL;
+ if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) ||
+ ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) ||
+ ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
+
+ if (cpipe != NULL) {
+ nni_inproc_pipe_fini(cpipe);
+ }
+ if (spipe != NULL) {
+ nni_inproc_pipe_fini(spipe);
+ }
+ nni_inproc_conn_finish(caio, rv, NULL);
+ nni_inproc_conn_finish(saio, rv, NULL);
nni_inproc_pair_destroy(pair);
- nni_inproc_conn_finish(caio, rv);
- nni_inproc_conn_finish(saio, rv);
continue;
}
- nni_mtx_init(&pair->mx);
-
- pair->pipes[0] = nni_aio_get_pipe(caio);
- pair->pipes[1] = nni_aio_get_pipe(saio);
- pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
- pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1];
- pair->pipes[0]->pair = pair->pipes[1]->pair = pair;
- pair->pipes[1]->peer = pair->pipes[0]->proto;
- pair->pipes[0]->peer = pair->pipes[1]->proto;
- pair->refcnt = 2;
-
- nni_inproc_conn_finish(caio, 0);
- nni_inproc_conn_finish(saio, 0);
+ spipe->peer = cpipe->proto;
+ cpipe->peer = spipe->proto;
+ pair->pipes[0] = cpipe;
+ pair->pipes[1] = spipe;
+ pair->refcnt = 2;
+ cpipe->pair = spipe->pair = pair;
+ cpipe->rq = spipe->wq = pair->q[0];
+ cpipe->wq = spipe->rq = pair->q[1];
+
+ nni_inproc_conn_finish(caio, 0, cpipe);
+ nni_inproc_conn_finish(saio, 0, spipe);
}
- if (nni_list_first(&client->aios) == NULL) {
- // No more outstanding client connects. Normally
- // there should only be one.
- if (nni_list_active(&server->clients, client)) {
- nni_list_remove(&server->clients, client);
+ if (nni_list_first(&cli->aios) == NULL) {
+ // No more outstanding client connects.
+ // Normally there should only be one.
+ if (nni_list_active(&srv->clients, cli)) {
+ nni_list_remove(&srv->clients, cli);
}
}
}
@@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
static void
nni_inproc_ep_cancel(nni_aio *aio, int rv)
{
- nni_inproc_ep * ep = nni_aio_get_prov_data(aio);
- nni_inproc_pipe *pipe;
+ nni_inproc_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&nni_inproc.mx);
if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_list_node_remove(&ep->node);
- if ((pipe = nni_aio_get_pipe(aio)) != NULL) {
- nni_aio_set_pipe(aio, NULL);
- nni_inproc_pipe_fini(pipe);
- }
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&nni_inproc.mx);
@@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv)
static void
nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
- nni_inproc_ep * ep = arg;
- nni_inproc_ep * server;
- int rv;
- nni_inproc_pipe *pipe;
+ nni_inproc_ep *ep = arg;
+ nni_inproc_ep *server;
+ int rv;
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
@@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
return;
}
- if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
- nni_aio_set_pipe(aio, pipe);
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED);
+ nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
@@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg)
static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
- nni_inproc_ep * ep = arg;
- nni_inproc_pipe *pipe;
- int rv;
+ nni_inproc_ep *ep = arg;
+ int rv;
nni_mtx_lock(&nni_inproc.mx);
@@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
}
// We are already on the master list of servers, thanks to bind.
-
- if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
- nni_aio_set_pipe(aio, pipe);
-
- // Insert us into the pending server aios, and then run the
- // accept list.
+ // Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
nni_inproc_accept_clients(ep);
nni_mtx_unlock(&nni_inproc.mx);