diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 14 | ||||
| -rw-r--r-- | src/core/endpt.c | 58 | ||||
| -rw-r--r-- | src/core/transport.h | 2 |
3 files changed, 68 insertions, 6 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 6a57ad52..f4512a34 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -50,6 +50,7 @@ nni_aio_fini(nni_aio *aio) nni_mtx_lock(&aio->a_lk); aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled cancelfn = aio->a_prov_cancel; + nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); // Cancel the AIO if it was scheduled. @@ -71,7 +72,15 @@ nni_aio_fini(nni_aio *aio) int nni_aio_result(nni_aio *aio) { - return (aio->a_result); + int rv; + + nni_mtx_lock(&aio->a_lk); + rv = aio->a_result; + if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) { + rv = NNG_ECANCELED; + } + nni_mtx_unlock(&aio->a_lk); + return (rv); } @@ -96,7 +105,7 @@ void nni_aio_wait(nni_aio *aio) { nni_mtx_lock(&aio->a_lk); - while ((aio->a_flags & NNI_AIO_WAKE) == 0) { + while ((aio->a_flags & (NNI_AIO_WAKE|NNI_AIO_FINI)) == 0) { nni_cv_wait(&aio->a_cv); } nni_mtx_unlock(&aio->a_lk); @@ -143,6 +152,7 @@ nni_aio_stop(nni_aio *aio) nni_mtx_lock(&aio->a_lk); aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; + nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); // This either aborts the task, or waits for it to complete if already diff --git a/src/core/endpt.c b/src/core/endpt.c index b24d7643..20874c90 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -237,15 +237,40 @@ nni_ep_remove(nni_ep *ep) static int +nni_ep_connect_aio(nni_ep *ep, void **pipep) +{ + nni_aio aio; + int rv; + + nni_aio_init(&aio, NULL, NULL); + aio.a_endpt = ep->ep_data; + ep->ep_ops.ep_connect(ep->ep_data, &aio); + nni_aio_wait(&aio); + + if ((rv = nni_aio_result(&aio)) == 0) { + *pipep = aio.a_pipe; + } + nni_aio_fini(&aio); + return (rv); +} + + +static int nni_ep_connect_sync(nni_ep *ep) { nni_pipe *pipe; int rv; - if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { + rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + if (rv != 0) { return (rv); } - rv = ep->ep_ops.ep_connect_sync(ep->ep_data, &pipe->p_tran_data); + if (ep->ep_ops.ep_connect != NULL) { + rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); + } else { + rv = ep->ep_ops.ep_connect_sync(ep->ep_data, + &pipe->p_tran_data); + } if (rv != 0) { nni_pipe_remove(pipe); return (rv); @@ -400,6 +425,25 @@ nni_ep_dial(nni_ep *ep, int flags) static int +nni_ep_accept_aio(nni_ep *ep, void **pipep) +{ + nni_aio aio; + int rv; + + nni_aio_init(&aio, NULL, NULL); + aio.a_endpt = ep->ep_data; + ep->ep_ops.ep_accept(ep->ep_data, &aio); + nni_aio_wait(&aio); + + if ((rv = nni_aio_result(&aio)) == 0) { + *pipep = aio.a_pipe; + } + nni_aio_fini(&aio); + return (rv); +} + + +static int nni_ep_accept_sync(nni_ep *ep) { nni_pipe *pipe; @@ -408,10 +452,16 @@ nni_ep_accept_sync(nni_ep *ep) if (ep->ep_closed) { return (NNG_ECLOSED); } - if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) { + rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + if (rv != 0) { return (rv); } - rv = ep->ep_ops.ep_accept_sync(ep->ep_data, &pipe->p_tran_data); + if (ep->ep_ops.ep_accept != NULL) { + rv = nni_ep_accept_aio(ep, &pipe->p_tran_data); + } else { + rv = ep->ep_ops.ep_accept_sync(ep->ep_data, + &pipe->p_tran_data); + } if (rv != 0) { nni_pipe_remove(pipe); return (rv); diff --git a/src/core/transport.h b/src/core/transport.h index 5a057c93..05f2e7f7 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -52,6 +52,7 @@ struct nni_tran_ep { // transport specific endpoint, and the second is a pointer to // receive a newly created transport-specific pipe structure. int (*ep_connect_sync)(void *, void **); + void (*ep_connect)(void *, nni_aio *); // ep_bind just does the bind() and listen() work, // reserving the address but not creating any connections. @@ -64,6 +65,7 @@ struct nni_tran_ep { // is the transport-specific endpoint, and the second is a pointer to // a transport-specific pipe, created by this function. int (*ep_accept_sync)(void *, void **); + void (*ep_accept)(void *, nni_aio *); // ep_close stops the endpoint from operating altogether. It does // not affect pipes that have already been created. |
