diff options
| -rw-r--r-- | src/core/aio.c | 14 | ||||
| -rw-r--r-- | src/core/endpt.c | 58 | ||||
| -rw-r--r-- | src/core/transport.h | 2 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 318 | ||||
| -rw-r--r-- | tests/inproc.c | 1 |
5 files changed, 264 insertions, 129 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 6a57ad52..f4512a34 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -50,6 +50,7 @@ nni_aio_fini(nni_aio *aio) nni_mtx_lock(&aio->a_lk); aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled cancelfn = aio->a_prov_cancel; + nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); // Cancel the AIO if it was scheduled. @@ -71,7 +72,15 @@ nni_aio_fini(nni_aio *aio) int nni_aio_result(nni_aio *aio) { - return (aio->a_result); + int rv; + + nni_mtx_lock(&aio->a_lk); + rv = aio->a_result; + if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) { + rv = NNG_ECANCELED; + } + nni_mtx_unlock(&aio->a_lk); + return (rv); } @@ -96,7 +105,7 @@ void nni_aio_wait(nni_aio *aio) { nni_mtx_lock(&aio->a_lk); - while ((aio->a_flags & NNI_AIO_WAKE) == 0) { + while ((aio->a_flags & (NNI_AIO_WAKE|NNI_AIO_FINI)) == 0) { nni_cv_wait(&aio->a_cv); } nni_mtx_unlock(&aio->a_lk); @@ -143,6 +152,7 @@ nni_aio_stop(nni_aio *aio) nni_mtx_lock(&aio->a_lk); aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; + nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); // This either aborts the task, or waits for it to complete if already diff --git a/src/core/endpt.c b/src/core/endpt.c index b24d7643..20874c90 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -237,15 +237,40 @@ nni_ep_remove(nni_ep *ep) static int +nni_ep_connect_aio(nni_ep *ep, void **pipep) +{ + nni_aio aio; + int rv; + + nni_aio_init(&aio, NULL, NULL); + aio.a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, &aio); + nni_aio_wait(&aio); + + if ((rv = nni_aio_result(&aio)) == 0) { + *pipep = aio.a_pipe; + } + nni_aio_fini(&aio); + return (rv); +} + + +static int nni_ep_connect_sync(nni_ep *ep) { nni_pipe *pipe; int rv; - if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { + rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + if (rv != 0) { return (rv); } - rv = ep->ep_ops.ep_connect_sync(ep->ep_data, &pipe->p_tran_data); + if (ep->ep_ops.ep_connect != NULL) { + rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); + } else { + rv = ep->ep_ops.ep_connect_sync(ep->ep_data, + &pipe->p_tran_data); + } if (rv != 0) { nni_pipe_remove(pipe); return (rv); @@ -400,6 +425,25 @@ nni_ep_dial(nni_ep *ep, int flags) static int +nni_ep_accept_aio(nni_ep *ep, void **pipep) +{ + nni_aio aio; + int rv; + + nni_aio_init(&aio, NULL, NULL); + aio.a_endpt = ep->ep_data; + ep->ep_ops.ep_accept(ep->ep_data, &aio); + nni_aio_wait(&aio); + + if ((rv = nni_aio_result(&aio)) == 0) { + *pipep = aio.a_pipe; + } + nni_aio_fini(&aio); + return (rv); +} + + +static int nni_ep_accept_sync(nni_ep *ep) { nni_pipe *pipe; @@ -408,10 +452,16 @@ nni_ep_accept_sync(nni_ep *ep) if (ep->ep_closed) { return (NNG_ECLOSED); } - if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { + rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + if (rv != 0) { return (rv); } - rv = ep->ep_ops.ep_accept_sync(ep->ep_data, &pipe->p_tran_data); + if (ep->ep_ops.ep_accept != NULL) { + rv = nni_ep_accept_aio(ep, &pipe->p_tran_data); + } else { + rv = ep->ep_ops.ep_accept_sync(ep->ep_data, + &pipe->p_tran_data); + } if (rv != 0) { nni_pipe_remove(pipe); return (rv); diff --git a/src/core/transport.h b/src/core/transport.h index 5a057c93..05f2e7f7 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -52,6 +52,7 @@ struct nni_tran_ep { // transport specific endpoint, and the second is a pointer to // receive a newly created transport-specific pipe structure. int (*ep_connect_sync)(void *, void **); + void (*ep_connect)(void *, nni_aio *); // ep_bind just does the bind() and listen() work, // reserving the address but not creating any connections. @@ -64,6 +65,7 @@ struct nni_tran_ep { // is the transport-specific endpoint, and the second is a pointer to // a transport-specific pipe, created by this function. int (*ep_accept_sync)(void *, void **); + void (*ep_accept)(void *, nni_aio *); // ep_close stops the endpoint from operating altogether. It does // not affect pipes that have already been created. diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 5e06f3a8..d41e5505 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -33,6 +33,7 @@ struct nni_inproc_pipe { nni_msgq * rq; nni_msgq * wq; uint16_t peer; + uint16_t proto; }; // nni_inproc_pair represents a pair of pipes. Because we control both @@ -42,18 +43,17 @@ struct nni_inproc_pair { int refcnt; nni_msgq * q[2]; nni_inproc_pipe * pipes[2]; - char addr[NNG_MAXADDRLEN+1]; }; struct nni_inproc_ep { - char addr[NNG_MAXADDRLEN+1]; - int mode; - int closed; - nni_list_node node; - uint16_t proto; - nni_cv cv; - nni_list clients; - nni_inproc_pipe * cpipe; // connected pipe (DIAL only) + char addr[NNG_MAXADDRLEN+1]; + int mode; + int closed; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_list aios; }; #define NNI_INPROC_EP_IDLE 0 @@ -113,13 +113,15 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) static int -nni_inproc_pipe_init(nni_inproc_pipe **pipep) +nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) { nni_inproc_pipe *pipe; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + pipe->proto = ep->proto; + pipe->addr = ep->addr; *pipep = pipe; return (0); } @@ -227,16 +229,12 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_cv_init(&ep->cv, &nni_inproc.mx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } ep->mode = NNI_INPROC_EP_IDLE; ep->closed = 0; ep->proto = nni_sock_proto(sock); - NNI_LIST_NODE_INIT(&ep->node); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); + NNI_LIST_INIT(&ep->aios, nni_aio, a_prov_node); (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -249,59 +247,184 @@ nni_inproc_ep_fini(void *arg) { nni_inproc_ep *ep = arg; - if (!ep->closed) { - nni_panic("inproc_ep_destroy while not closed!"); - } - nni_cv_fini(&ep->cv); + NNI_ASSERT(ep->closed); NNI_FREE_STRUCT(ep); } static void +nni_inproc_conn_finish(nni_aio *aio, int rv) +{ + nni_inproc_ep *ep = aio->a_endpt; + + if (rv != 0) { + if (aio->a_pipe != NULL) { + nni_inproc_pipe_fini(aio->a_pipe); + aio->a_pipe = NULL; + } + } + if (ep != NULL) { + if (nni_list_active(&ep->aios, aio)) { + nni_list_remove(&ep->aios, aio); + } + + if ((ep->mode != NNI_INPROC_EP_LISTEN) && + (nni_list_first(&ep->aios) == NULL)) { + if (nni_list_active(&ep->clients, ep)) { + nni_list_remove(&ep->clients, ep); + } + } + } + nni_aio_finish(aio, rv, 0); +} + + +static void nni_inproc_ep_close(void *arg) { nni_inproc_ep *ep = arg; + nni_inproc_ep *client; + nni_aio *aio; + + nni_mtx_lock(&nni_inproc.mx); + ep->closed = 1; + if (nni_list_active(&nni_inproc.servers, ep)) { + nni_list_remove(&nni_inproc.servers, ep); + } + // Notify any waiting clients that we are closed. + while ((client = nni_list_first(&ep->clients)) != NULL) { + while ((aio = nni_list_first(&client->aios)) != NULL) { + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + } + nni_list_remove(&ep->clients, client); + } + while ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_inproc_conn_finish(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&nni_inproc.mx); +} + + +static void +nni_inproc_connect_abort(nni_aio *aio) +{ + nni_inproc_ep *ep = aio->a_endpt; nni_mtx_lock(&nni_inproc.mx); - if (!ep->closed) { - ep->closed = 1; - if (ep->mode == NNI_INPROC_EP_LISTEN) { - nni_list_remove(&nni_inproc.servers, ep); - for (;;) { - // Notify waiting clients that we are closed. - nni_inproc_ep *client; - client = nni_list_first(&ep->clients); - if (client == NULL) { - break; - } - nni_list_remove(&ep->clients, client); - client->mode = NNI_INPROC_EP_IDLE; - nni_cv_wake(&client->cv); + + if (aio->a_pipe != NULL) { + nni_inproc_pipe_fini(aio->a_pipe); + aio->a_pipe = NULL; + } + if (ep != NULL) { + if (nni_list_active(&ep->aios, aio)) { + nni_list_remove(&ep->aios, aio); + } + + if ((ep->mode != NNI_INPROC_EP_LISTEN) && + (nni_list_first(&ep->aios) == NULL)) { + if (nni_list_active(&ep->clients, ep)) { + nni_list_remove(&ep->clients, ep); } } - nni_cv_wake(&ep->cv); } nni_mtx_unlock(&nni_inproc.mx); } -static int -nni_inproc_ep_connect_sync(void *arg, void **pipep) +static void +nni_inproc_accept_clients(nni_inproc_ep *server) +{ + nni_inproc_ep *client, *nclient; + nni_aio *saio, *caio; + nni_inproc_pair *pair; + int rv; + + nclient = nni_list_first(&server->clients); + while ((client = nclient) != NULL) { + nclient = nni_list_next(&server->clients, nclient); + NNI_LIST_FOREACH (&client->aios, caio) { + if ((saio = nni_list_first(&server->aios)) == NULL) { + // No outstanding accept() calls. + break; + } + + if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { + nni_inproc_conn_finish(caio, NNG_ENOMEM); + nni_inproc_conn_finish(saio, NNG_ENOMEM); + continue; + } + + if (((rv = nni_mtx_init(&pair->mx)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + nni_inproc_pair_destroy(pair); + nni_inproc_conn_finish(caio, rv); + nni_inproc_conn_finish(saio, rv); + continue; + } + + pair->pipes[0] = caio->a_pipe; + pair->pipes[1] = saio->a_pipe; + pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; + pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; + pair->pipes[0]->pair = pair->pipes[1]->pair = pair; + pair->pipes[1]->peer = pair->pipes[0]->proto; + pair->pipes[0]->peer = pair->pipes[1]->proto; + pair->refcnt = 2; + client->mode = NNI_INPROC_EP_IDLE; // XXX: ?? + + nni_inproc_conn_finish(caio, 0); + nni_inproc_conn_finish(saio, 0); + } + + if (nni_list_first(&client->aios) == NULL) { + // No more outstanding client connects. Normally + // there should only be one. + if (nni_list_active(&server->clients, client)) { + nni_list_remove(&server->clients, client); + } + } + } +} + + +static void +nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_pipe *pipe; nni_inproc_ep *ep = arg; + nni_inproc_pipe *pipe; nni_inproc_ep *server; int rv; if (ep->mode != NNI_INPROC_EP_IDLE) { - return (NNG_EINVAL); + nni_aio_finish(aio, NNG_EINVAL, 0); + return; + } + if (nni_list_active(&ep->clients, ep)) { + nni_aio_finish(aio, NNG_EINVAL, 0); + return; } - if ((rv = nni_inproc_pipe_init(&pipe)) != 0) { - return (rv); + if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { + nni_aio_finish(aio, rv, 0); + return; } nni_mtx_lock(&nni_inproc.mx); + aio->a_pipe = pipe; + + if (nni_list_active(&ep->clients, ep)) { + // We already have a pending connection... + nni_inproc_conn_finish(aio, NNG_EINVAL); + return; + } + + if (ep->closed) { + nni_inproc_conn_finish(aio, rv); + nni_mtx_unlock(&nni_inproc.mx); + return; + } // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { @@ -313,38 +436,17 @@ nni_inproc_ep_connect_sync(void *arg, void **pipep) } } if (server == NULL) { + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); nni_mtx_unlock(&nni_inproc.mx); - nni_inproc_pipe_fini(pipe); - return (NNG_ECONNREFUSED); + return; } ep->mode = NNI_INPROC_EP_DIAL; - ep->cpipe = pipe; nni_list_append(&server->clients, ep); + nni_list_append(&ep->aios, aio); - while (ep->mode != NNI_INPROC_EP_IDLE) { - if (ep->closed) { - nni_list_remove(&server->clients, ep); - nni_mtx_unlock(&nni_inproc.mx); - nni_inproc_pipe_fini(pipe); - return (NNG_ECLOSED); - } - nni_cv_wake(&server->cv); - nni_cv_wait(&ep->cv); - } - - // If we got here, either we connected successfully, or the far end - // server closed on us. In the former case our cpipe will be NULL, - // having been cleared by the server. In the latter, the cpipe will - // still be set, indicating server shutdown. - if (ep->cpipe != NULL) { - nni_mtx_unlock(&nni_inproc.mx); - nni_inproc_pipe_fini(pipe); - return (NNG_ECONNRESET); - } + nni_inproc_accept_clients(server); nni_mtx_unlock(&nni_inproc.mx); - *pipep = pipe; - return (0); } @@ -379,69 +481,39 @@ nni_inproc_ep_bind(void *arg) } -static int -nni_inproc_ep_accept_sync(void *arg, void **pipep) +static void +nni_inproc_ep_accept(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; - nni_inproc_ep *client; - nni_inproc_pair *pair; - int rv; nni_inproc_pipe *pipe; + int rv; if (ep->mode != NNI_INPROC_EP_LISTEN) { - return (NNG_EINVAL); + nni_aio_finish(aio, NNG_EINVAL, 0); } - // Preallocate the pair, so we don't do it while holding a lock - if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_mtx_init(&pair->mx)) != 0) { - NNI_FREE_STRUCT(pair); - return (rv); - } - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || - ((rv = nni_msgq_init(&pair->q[1], 4)) != 0) || - ((rv = nni_inproc_pipe_init(&pipe)) != 0)) { - nni_inproc_pair_destroy(pair); - return (rv); + if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { + nni_aio_finish(aio, rv, 0); + return; } nni_mtx_lock(&nni_inproc.mx); - for (;;) { - if (ep->closed) { - // This is the only possible error path from the - // time we acquired the lock. - nni_mtx_unlock(&nni_inproc.mx); - nni_inproc_pair_destroy(pair); - nni_inproc_pipe_fini(pipe); - return (NNG_ECLOSED); - } - if ((client = nni_list_first(&ep->clients)) != NULL) { - break; - } - nni_cv_wait(&ep->cv); - } + aio->a_pipe = pipe; - nni_list_remove(&ep->clients, client); - pair->pipes[0] = client->cpipe; - pair->pipes[1] = pipe; - (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[0]->addr = pair->pipes[1]->addr = pair->addr; - pair->pipes[1]->peer = client->proto; - pair->pipes[0]->peer = ep->proto; - pair->refcnt = 2; - client->mode = NNI_INPROC_EP_IDLE; - client->cpipe = NULL; - nni_cv_wake(&client->cv); + // We are already on the master list of servers, thanks to bind. - *pipep = pipe; - nni_mtx_unlock(&nni_inproc.mx); + if (ep->closed) { + // This is the only possible error path from the + // time we acquired the lock. + nni_inproc_conn_finish(aio, NNG_ECLOSED); + return; + } - return (0); + // Insert us into the pending server aios, and then run the + // accept list. + nni_list_append(&ep->aios, aio); + nni_inproc_accept_clients(ep); + nni_mtx_unlock(&nni_inproc.mx); } @@ -455,14 +527,14 @@ static nni_tran_pipe nni_inproc_pipe_ops = { }; static nni_tran_ep nni_inproc_ep_ops = { - .ep_init = nni_inproc_ep_init, - .ep_fini = nni_inproc_ep_fini, - .ep_connect_sync = nni_inproc_ep_connect_sync, - .ep_bind = nni_inproc_ep_bind, - .ep_accept_sync = nni_inproc_ep_accept_sync, - .ep_close = nni_inproc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_inproc_ep_init, + .ep_fini = nni_inproc_ep_fini, + .ep_connect = nni_inproc_ep_connect, + .ep_bind = nni_inproc_ep_bind, + .ep_accept = nni_inproc_ep_accept, + .ep_close = nni_inproc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the inproc transport linkage, and should be the only global diff --git a/tests/inproc.c b/tests/inproc.c index 0375320f..0d1cafbf 100644 --- a/tests/inproc.c +++ b/tests/inproc.c @@ -16,4 +16,5 @@ TestMain("Inproc Transport", { trantest_test_all("inproc://TEST"); + nni_fini(); }) |
